Program.cs 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. using CommunityToolkit.HighPerformance;
  2. using CommunityToolkit.HighPerformance.Buffers;
  3. using System;
  4. using System.Buffers;
  5. using System.Collections.Concurrent;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace HighPerformance
  9. {
  10. internal class Program
  11. {
  12. static void Main(string[] args)
  13. {
  14. var producerConsumerService = new ProducerConsumerWithBlockingCollection();
  15. Console.WriteLine("开始运行生产者-消费者演示...");
  16. // 启动生产者任务
  17. var producerTask = Task.Run(() => producerConsumerService.StartProducing());
  18. // 启动多个消费者任务
  19. var consumerTasks = new Task[5];
  20. for (int i = 0; i < 5; i++)
  21. {
  22. int consumerId = i + 1;
  23. consumerTasks[i] = Task.Run(() => producerConsumerService.StartConsuming(consumerId));
  24. }
  25. // 等待一段时间后停止生产
  26. Thread.Sleep(5000);
  27. producerConsumerService.StopProducing();
  28. // 等待所有消费者完成
  29. Task.WaitAll(consumerTasks);
  30. Console.WriteLine("演示结束");
  31. Console.ReadKey();
  32. }
  33. }
  34. /// <summary>
  35. /// 使用 BlockingCollection 的生产者-消费者服务
  36. /// </summary>
  37. public class ProducerConsumerWithBlockingCollection
  38. {
  39. private readonly BlockingCollection<int> _collection;
  40. private volatile bool _shouldStop = false;
  41. public ProducerConsumerWithBlockingCollection()
  42. {
  43. // 设置容量限制
  44. _collection = new BlockingCollection<int>(boundedCapacity: 100);
  45. }
  46. /// <summary>
  47. /// 开始生产数据
  48. /// </summary>
  49. public async Task StartProducing()
  50. {
  51. int itemCounter = 0;
  52. while (!_shouldStop)
  53. {
  54. var item = Interlocked.Increment(ref itemCounter);
  55. // 尝试添加到集合(如果已满则等待)
  56. if (_collection.TryAdd(item, 100)) // 100ms 超时
  57. {
  58. Console.WriteLine($"生产者: 生产了项目 #{item}");
  59. }
  60. else
  61. {
  62. Console.WriteLine($"生产者: 队列满,跳过项目 #{item}");
  63. }
  64. // 模拟生产延迟
  65. await Task.Delay(10);
  66. }
  67. // 标记已完成添加
  68. _collection.CompleteAdding();
  69. Console.WriteLine("生产者: 停止生产");
  70. }
  71. /// <summary>
  72. /// 开始消费数据
  73. /// </summary>
  74. /// <param name="consumerId">消费者ID</param>
  75. public async Task StartConsuming(int consumerId)
  76. {
  77. foreach (var item in _collection.GetConsumingEnumerable())
  78. {
  79. // 模拟处理时间
  80. await Task.Delay(100);
  81. Console.WriteLine($"消费者 {consumerId}: 处理了项目 #{item}");
  82. }
  83. Console.WriteLine($"消费者 {consumerId}: 完成工作,退出");
  84. }
  85. /// <summary>
  86. /// 停止生产
  87. /// </summary>
  88. public void StopProducing()
  89. {
  90. _shouldStop = true;
  91. }
  92. }
  93. }