using CommunityToolkit.HighPerformance;
using CommunityToolkit.HighPerformance.Buffers;
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace HighPerformance
{
internal class Program
{
static void Main(string[] args)
{
var producerConsumerService = new ProducerConsumerWithBlockingCollection();
Console.WriteLine("开始运行生产者-消费者演示...");
// 启动生产者任务
var producerTask = Task.Run(() => producerConsumerService.StartProducing());
// 启动多个消费者任务
var consumerTasks = new Task[5];
for (int i = 0; i < 5; i++)
{
int consumerId = i + 1;
consumerTasks[i] = Task.Run(() => producerConsumerService.StartConsuming(consumerId));
}
// 等待一段时间后停止生产
Thread.Sleep(5000);
producerConsumerService.StopProducing();
// 等待所有消费者完成
Task.WaitAll(consumerTasks);
Console.WriteLine("演示结束");
Console.ReadKey();
}
}
///
/// 使用 BlockingCollection 的生产者-消费者服务
///
public class ProducerConsumerWithBlockingCollection
{
private readonly BlockingCollection _collection;
private volatile bool _shouldStop = false;
public ProducerConsumerWithBlockingCollection()
{
// 设置容量限制
_collection = new BlockingCollection(boundedCapacity: 100);
}
///
/// 开始生产数据
///
public async Task StartProducing()
{
int itemCounter = 0;
while (!_shouldStop)
{
var item = Interlocked.Increment(ref itemCounter);
// 尝试添加到集合(如果已满则等待)
if (_collection.TryAdd(item, 100)) // 100ms 超时
{
Console.WriteLine($"生产者: 生产了项目 #{item}");
}
else
{
Console.WriteLine($"生产者: 队列满,跳过项目 #{item}");
}
// 模拟生产延迟
await Task.Delay(10);
}
// 标记已完成添加
_collection.CompleteAdding();
Console.WriteLine("生产者: 停止生产");
}
///
/// 开始消费数据
///
/// 消费者ID
public async Task StartConsuming(int consumerId)
{
foreach (var item in _collection.GetConsumingEnumerable())
{
// 模拟处理时间
await Task.Delay(100);
Console.WriteLine($"消费者 {consumerId}: 处理了项目 #{item}");
}
Console.WriteLine($"消费者 {consumerId}: 完成工作,退出");
}
///
/// 停止生产
///
public void StopProducing()
{
_shouldStop = true;
}
}
}