using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Runtime.Remoting.Channels; using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace AsynchronousTest { internal class Program { static void Main(string[] args) { Console.WriteLine("=== 生产者-消费者模式演示 ===\n"); // 演示1:基本用法 BasicDemo(); Console.WriteLine("\n" + new string('=', 50) + "\n"); // 演示2:性能对比 PerformanceComparisonDemo(); Console.WriteLine("\n按任意键退出..."); Console.ReadKey(); } static void BasicDemo() { Console.WriteLine("1. 基本演示:"); var processor = new DataProcessor(4); // 4个消费者 processor.StartConsumers(); // 模拟生产者添加数据 Console.WriteLine("开始添加数据到队列..."); var watch = Stopwatch.StartNew(); for (int i = 1; i <= 20; i++) { processor.AddItem(i); Console.WriteLine($"生产者添加项目: {i}"); } processor.CompleteAdding(); processor.WaitForCompletion(); watch.Stop(); Console.WriteLine($"\n总共处理 {20} 个项目,耗时: {watch.ElapsedMilliseconds}ms"); } static void PerformanceComparisonDemo() { Console.WriteLine("2. 性能对比演示:"); const int itemCount = 1000; // 串行处理 Console.WriteLine("\n--- 串行处理 ---"); var serialWatch = Stopwatch.StartNew(); for (int i = 1; i <= itemCount; i++) { SimulateProcessing(i); } serialWatch.Stop(); Console.WriteLine($"串行处理 {itemCount} 个项目耗时: {serialWatch.ElapsedMilliseconds}ms"); // 并行处理(生产者-消费者模式) Console.WriteLine("\n--- 并行处理(生产者-消费者) ---"); var processor = new DataProcessor(Environment.ProcessorCount); processor.StartConsumers(); var parallelWatch = Stopwatch.StartNew(); // 添加大量数据 var items = new int[itemCount]; for (int i = 0; i < itemCount; i++) { items[i] = i + 1; } processor.AddItems(items); processor.CompleteAdding(); processor.WaitForCompletion(); parallelWatch.Stop(); Console.WriteLine($"并行处理 {itemCount} 个项目耗时: {parallelWatch.ElapsedMilliseconds}ms"); var speedup = (double)serialWatch.ElapsedMilliseconds / parallelWatch.ElapsedMilliseconds; Console.WriteLine($"性能提升: {speedup:F2}x"); } // 模拟单个项目处理 static void SimulateProcessing(int item) { Thread.SpinWait(1000); // 模拟0.1ms处理时间 } } public class DataProcessor { private readonly BlockingCollection _queue; private readonly CancellationTokenSource _cancellationTokenSource; private readonly int _consumerCount; public DataProcessor(int consumerCount = 4) { _queue = new BlockingCollection(new ConcurrentQueue()); _cancellationTokenSource = new CancellationTokenSource(); _consumerCount = consumerCount; } // 启动消费者 public void StartConsumers() { for (int i = 0; i < _consumerCount; i++) { int consumerId = i + 1; Task.Run(() => ConsumerWorker(consumerId, _cancellationTokenSource.Token)); } } // 消费者工作方法 private void ConsumerWorker(int consumerId, CancellationToken cancellationToken) { Console.WriteLine($"消费者 {consumerId} 已启动"); try { foreach (var item in _queue.GetConsumingEnumerable(cancellationToken)) { ProcessItem(item, consumerId); } } catch (OperationCanceledException) { Console.WriteLine($"消费者 {consumerId} 已取消"); } } // 处理单个数据项 private void ProcessItem(int item, int consumerId) { // 模拟0.1ms的处理时间 var startTime = DateTime.UtcNow; Thread.SpinWait(1000); // 模拟CPU工作(约0.1ms,具体取决于CPU速度) var endTime = DateTime.UtcNow; Console.WriteLine($"消费者 {consumerId} 处理了项目: {item}, 耗时: {(endTime - startTime).TotalMilliseconds:F2}ms"); } // 添加数据到队列(生产者方法) public void AddItem(int item) { if (!_cancellationTokenSource.Token.IsCancellationRequested) { _queue.Add(item); } } // 批量添加数据 public void AddItems(int[] items) { foreach (var item in items) { AddItem(item); } } // 完成添加,通知消费者可以结束 public void CompleteAdding() { _queue.CompleteAdding(); } // 停止所有消费者 public void Stop() { _cancellationTokenSource.Cancel(); CompleteAdding(); } // 等待所有消费者完成 public void WaitForCompletion() { // 等待队列被标记为完成添加 while (!_queue.IsCompleted) { Thread.Sleep(10); } } } }