| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- using CommunityToolkit.HighPerformance;
- using CommunityToolkit.HighPerformance.Buffers;
- using System;
- using System.Buffers;
- using System.Collections.Concurrent;
- using System.Threading;
- using System.Threading.Tasks;
- namespace HighPerformance
- {
- internal class Program
- {
- static void Main(string[] args)
- {
- var producerConsumerService = new ProducerConsumerWithBlockingCollection();
- Console.WriteLine("开始运行生产者-消费者演示...");
- // 启动生产者任务
- var producerTask = Task.Run(() => producerConsumerService.StartProducing());
- // 启动多个消费者任务
- var consumerTasks = new Task[5];
- for (int i = 0; i < 5; i++)
- {
- int consumerId = i + 1;
- consumerTasks[i] = Task.Run(() => producerConsumerService.StartConsuming(consumerId));
- }
- // 等待一段时间后停止生产
- Thread.Sleep(5000);
- producerConsumerService.StopProducing();
- // 等待所有消费者完成
- Task.WaitAll(consumerTasks);
- Console.WriteLine("演示结束");
- Console.ReadKey();
- }
- }
- /// <summary>
- /// 使用 BlockingCollection 的生产者-消费者服务
- /// </summary>
- public class ProducerConsumerWithBlockingCollection
- {
- private readonly BlockingCollection<int> _collection;
- private volatile bool _shouldStop = false;
- public ProducerConsumerWithBlockingCollection()
- {
- // 设置容量限制
- _collection = new BlockingCollection<int>(boundedCapacity: 100);
- }
- /// <summary>
- /// 开始生产数据
- /// </summary>
- public async Task StartProducing()
- {
- int itemCounter = 0;
- while (!_shouldStop)
- {
- var item = Interlocked.Increment(ref itemCounter);
- // 尝试添加到集合(如果已满则等待)
- if (_collection.TryAdd(item, 100)) // 100ms 超时
- {
- Console.WriteLine($"生产者: 生产了项目 #{item}");
- }
- else
- {
- Console.WriteLine($"生产者: 队列满,跳过项目 #{item}");
- }
- // 模拟生产延迟
- await Task.Delay(10);
- }
- // 标记已完成添加
- _collection.CompleteAdding();
- Console.WriteLine("生产者: 停止生产");
- }
- /// <summary>
- /// 开始消费数据
- /// </summary>
- /// <param name="consumerId">消费者ID</param>
- public async Task StartConsuming(int consumerId)
- {
- foreach (var item in _collection.GetConsumingEnumerable())
- {
- // 模拟处理时间
- await Task.Delay(100);
- Console.WriteLine($"消费者 {consumerId}: 处理了项目 #{item}");
- }
- Console.WriteLine($"消费者 {consumerId}: 完成工作,退出");
- }
- /// <summary>
- /// 停止生产
- /// </summary>
- public void StopProducing()
- {
- _shouldStop = true;
- }
- }
- }
|