Program.cs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Linq;
  7. using System.Runtime.Remoting.Channels;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Channels;
  11. using System.Threading.Tasks;
  12. namespace AsynchronousTest
  13. {
  14. internal class Program
  15. {
  16. static void Main(string[] args)
  17. {
  18. Console.WriteLine("=== 生产者-消费者模式演示 ===\n");
  19. // 演示1:基本用法
  20. BasicDemo();
  21. Console.WriteLine("\n" + new string('=', 50) + "\n");
  22. // 演示2:性能对比
  23. PerformanceComparisonDemo();
  24. Console.WriteLine("\n按任意键退出...");
  25. Console.ReadKey();
  26. }
  27. static void BasicDemo()
  28. {
  29. Console.WriteLine("1. 基本演示:");
  30. var processor = new DataProcessor(4); // 4个消费者
  31. processor.StartConsumers();
  32. // 模拟生产者添加数据
  33. Console.WriteLine("开始添加数据到队列...");
  34. var watch = Stopwatch.StartNew();
  35. for (int i = 1; i <= 20; i++)
  36. {
  37. processor.AddItem(i);
  38. Console.WriteLine($"生产者添加项目: {i}");
  39. }
  40. processor.CompleteAdding();
  41. processor.WaitForCompletion();
  42. watch.Stop();
  43. Console.WriteLine($"\n总共处理 {20} 个项目,耗时: {watch.ElapsedMilliseconds}ms");
  44. }
  45. static void PerformanceComparisonDemo()
  46. {
  47. Console.WriteLine("2. 性能对比演示:");
  48. const int itemCount = 1000;
  49. // 串行处理
  50. Console.WriteLine("\n--- 串行处理 ---");
  51. var serialWatch = Stopwatch.StartNew();
  52. for (int i = 1; i <= itemCount; i++)
  53. {
  54. SimulateProcessing(i);
  55. }
  56. serialWatch.Stop();
  57. Console.WriteLine($"串行处理 {itemCount} 个项目耗时: {serialWatch.ElapsedMilliseconds}ms");
  58. // 并行处理(生产者-消费者模式)
  59. Console.WriteLine("\n--- 并行处理(生产者-消费者) ---");
  60. var processor = new DataProcessor(Environment.ProcessorCount);
  61. processor.StartConsumers();
  62. var parallelWatch = Stopwatch.StartNew();
  63. // 添加大量数据
  64. var items = new int[itemCount];
  65. for (int i = 0; i < itemCount; i++)
  66. {
  67. items[i] = i + 1;
  68. }
  69. processor.AddItems(items);
  70. processor.CompleteAdding();
  71. processor.WaitForCompletion();
  72. parallelWatch.Stop();
  73. Console.WriteLine($"并行处理 {itemCount} 个项目耗时: {parallelWatch.ElapsedMilliseconds}ms");
  74. var speedup = (double)serialWatch.ElapsedMilliseconds / parallelWatch.ElapsedMilliseconds;
  75. Console.WriteLine($"性能提升: {speedup:F2}x");
  76. }
  77. // 模拟单个项目处理
  78. static void SimulateProcessing(int item)
  79. {
  80. Thread.SpinWait(1000); // 模拟0.1ms处理时间
  81. }
  82. }
  83. public class DataProcessor
  84. {
  85. private readonly BlockingCollection<int> _queue;
  86. private readonly CancellationTokenSource _cancellationTokenSource;
  87. private readonly int _consumerCount;
  88. public DataProcessor(int consumerCount = 4)
  89. {
  90. _queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
  91. _cancellationTokenSource = new CancellationTokenSource();
  92. _consumerCount = consumerCount;
  93. }
  94. // 启动消费者
  95. public void StartConsumers()
  96. {
  97. for (int i = 0; i < _consumerCount; i++)
  98. {
  99. int consumerId = i + 1;
  100. Task.Run(() => ConsumerWorker(consumerId, _cancellationTokenSource.Token));
  101. }
  102. }
  103. // 消费者工作方法
  104. private void ConsumerWorker(int consumerId, CancellationToken cancellationToken)
  105. {
  106. Console.WriteLine($"消费者 {consumerId} 已启动");
  107. try
  108. {
  109. foreach (var item in _queue.GetConsumingEnumerable(cancellationToken))
  110. {
  111. ProcessItem(item, consumerId);
  112. }
  113. }
  114. catch (OperationCanceledException)
  115. {
  116. Console.WriteLine($"消费者 {consumerId} 已取消");
  117. }
  118. }
  119. // 处理单个数据项
  120. private void ProcessItem(int item, int consumerId)
  121. {
  122. // 模拟0.1ms的处理时间
  123. var startTime = DateTime.UtcNow;
  124. Thread.SpinWait(1000); // 模拟CPU工作(约0.1ms,具体取决于CPU速度)
  125. var endTime = DateTime.UtcNow;
  126. Console.WriteLine($"消费者 {consumerId} 处理了项目: {item}, 耗时: {(endTime - startTime).TotalMilliseconds:F2}ms");
  127. }
  128. // 添加数据到队列(生产者方法)
  129. public void AddItem(int item)
  130. {
  131. if (!_cancellationTokenSource.Token.IsCancellationRequested)
  132. {
  133. _queue.Add(item);
  134. }
  135. }
  136. // 批量添加数据
  137. public void AddItems(int[] items)
  138. {
  139. foreach (var item in items)
  140. {
  141. AddItem(item);
  142. }
  143. }
  144. // 完成添加,通知消费者可以结束
  145. public void CompleteAdding()
  146. {
  147. _queue.CompleteAdding();
  148. }
  149. // 停止所有消费者
  150. public void Stop()
  151. {
  152. _cancellationTokenSource.Cancel();
  153. CompleteAdding();
  154. }
  155. // 等待所有消费者完成
  156. public void WaitForCompletion()
  157. {
  158. // 等待队列被标记为完成添加
  159. while (!_queue.IsCompleted)
  160. {
  161. Thread.Sleep(10);
  162. }
  163. }
  164. }
  165. }