| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- using System;
- using System.Collections;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Runtime.Remoting.Channels;
- using System.Text;
- using System.Threading;
- using System.Threading.Channels;
- using System.Threading.Tasks;
- namespace AsynchronousTest
- {
- internal class Program
- {
- static void Main(string[] args)
- {
- Console.WriteLine("=== 生产者-消费者模式演示 ===\n");
- // 演示1:基本用法
- BasicDemo();
- Console.WriteLine("\n" + new string('=', 50) + "\n");
- // 演示2:性能对比
- PerformanceComparisonDemo();
- Console.WriteLine("\n按任意键退出...");
- Console.ReadKey();
- }
- static void BasicDemo()
- {
- Console.WriteLine("1. 基本演示:");
- var processor = new DataProcessor(4); // 4个消费者
- processor.StartConsumers();
- // 模拟生产者添加数据
- Console.WriteLine("开始添加数据到队列...");
- var watch = Stopwatch.StartNew();
- for (int i = 1; i <= 20; i++)
- {
- processor.AddItem(i);
- Console.WriteLine($"生产者添加项目: {i}");
- }
- processor.CompleteAdding();
- processor.WaitForCompletion();
- watch.Stop();
- Console.WriteLine($"\n总共处理 {20} 个项目,耗时: {watch.ElapsedMilliseconds}ms");
- }
- static void PerformanceComparisonDemo()
- {
- Console.WriteLine("2. 性能对比演示:");
- const int itemCount = 1000;
- // 串行处理
- Console.WriteLine("\n--- 串行处理 ---");
- var serialWatch = Stopwatch.StartNew();
- for (int i = 1; i <= itemCount; i++)
- {
- SimulateProcessing(i);
- }
- serialWatch.Stop();
- Console.WriteLine($"串行处理 {itemCount} 个项目耗时: {serialWatch.ElapsedMilliseconds}ms");
- // 并行处理(生产者-消费者模式)
- Console.WriteLine("\n--- 并行处理(生产者-消费者) ---");
- var processor = new DataProcessor(Environment.ProcessorCount);
- processor.StartConsumers();
- var parallelWatch = Stopwatch.StartNew();
- // 添加大量数据
- var items = new int[itemCount];
- for (int i = 0; i < itemCount; i++)
- {
- items[i] = i + 1;
- }
- processor.AddItems(items);
- processor.CompleteAdding();
- processor.WaitForCompletion();
- parallelWatch.Stop();
- Console.WriteLine($"并行处理 {itemCount} 个项目耗时: {parallelWatch.ElapsedMilliseconds}ms");
- var speedup = (double)serialWatch.ElapsedMilliseconds / parallelWatch.ElapsedMilliseconds;
- Console.WriteLine($"性能提升: {speedup:F2}x");
- }
- // 模拟单个项目处理
- static void SimulateProcessing(int item)
- {
- Thread.SpinWait(1000); // 模拟0.1ms处理时间
- }
- }
- public class DataProcessor
- {
- private readonly BlockingCollection<int> _queue;
- private readonly CancellationTokenSource _cancellationTokenSource;
- private readonly int _consumerCount;
- public DataProcessor(int consumerCount = 4)
- {
- _queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
- _cancellationTokenSource = new CancellationTokenSource();
- _consumerCount = consumerCount;
- }
- // 启动消费者
- public void StartConsumers()
- {
- for (int i = 0; i < _consumerCount; i++)
- {
- int consumerId = i + 1;
- Task.Run(() => ConsumerWorker(consumerId, _cancellationTokenSource.Token));
- }
- }
- // 消费者工作方法
- private void ConsumerWorker(int consumerId, CancellationToken cancellationToken)
- {
- Console.WriteLine($"消费者 {consumerId} 已启动");
- try
- {
- foreach (var item in _queue.GetConsumingEnumerable(cancellationToken))
- {
- ProcessItem(item, consumerId);
- }
- }
- catch (OperationCanceledException)
- {
- Console.WriteLine($"消费者 {consumerId} 已取消");
- }
- }
- // 处理单个数据项
- private void ProcessItem(int item, int consumerId)
- {
- // 模拟0.1ms的处理时间
- var startTime = DateTime.UtcNow;
- Thread.SpinWait(1000); // 模拟CPU工作(约0.1ms,具体取决于CPU速度)
- var endTime = DateTime.UtcNow;
- Console.WriteLine($"消费者 {consumerId} 处理了项目: {item}, 耗时: {(endTime - startTime).TotalMilliseconds:F2}ms");
- }
- // 添加数据到队列(生产者方法)
- public void AddItem(int item)
- {
- if (!_cancellationTokenSource.Token.IsCancellationRequested)
- {
- _queue.Add(item);
- }
- }
- // 批量添加数据
- public void AddItems(int[] items)
- {
- foreach (var item in items)
- {
- AddItem(item);
- }
- }
- // 完成添加,通知消费者可以结束
- public void CompleteAdding()
- {
- _queue.CompleteAdding();
- }
- // 停止所有消费者
- public void Stop()
- {
- _cancellationTokenSource.Cancel();
- CompleteAdding();
- }
- // 等待所有消费者完成
- public void WaitForCompletion()
- {
- // 等待队列被标记为完成添加
- while (!_queue.IsCompleted)
- {
- Thread.Sleep(10);
- }
- }
- }
- }
|