RingBuffer.cs 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace MvvmScaffoldFrame48.DLL.SystemTools
  8. {
  9. public sealed class LockFreeRingBuffer<T>
  10. {
  11. private readonly T[] _buffer;
  12. private readonly int _mask;
  13. private volatile int _readIndex;
  14. private volatile int _writeIndex;
  15. private readonly int _capacity;
  16. public LockFreeRingBuffer(int capacity)
  17. {
  18. // 容量必须是2的幂次方
  19. if (!IsPowerOfTwo(capacity))
  20. capacity = RoundUpToPowerOfTwo(capacity);
  21. _capacity = capacity;
  22. _mask = _capacity - 1;
  23. _buffer = new T[_capacity];
  24. }
  25. private static bool IsPowerOfTwo(int x)
  26. {
  27. return x > 0 && (x & (x - 1)) == 0;
  28. }
  29. private static int RoundUpToPowerOfTwo(int x)
  30. {
  31. if (x <= 1) return 1;
  32. x--;
  33. x |= x >> 1;
  34. x |= x >> 2;
  35. x |= x >> 4;
  36. x |= x >> 8;
  37. x |= x >> 16;
  38. return x + 1;
  39. }
  40. public bool TryEnqueue(T item)
  41. {
  42. int currentWriteIndex, nextWriteIndex, currentReadIndex;
  43. do
  44. {
  45. currentWriteIndex = _writeIndex;
  46. currentReadIndex = _readIndex;
  47. nextWriteIndex = (currentWriteIndex + 1) & _mask;
  48. // 检查是否已满
  49. if (nextWriteIndex == currentReadIndex)
  50. return false;
  51. } while (Interlocked.CompareExchange(ref _writeIndex, nextWriteIndex, currentWriteIndex) != currentWriteIndex);
  52. // 设置值并更新发布标记
  53. _buffer[currentWriteIndex & _mask] = item;
  54. return true;
  55. }
  56. public bool TryDequeue(out T result)
  57. {
  58. result = default(T);
  59. int currentReadIndex, nextReadIndex, currentWriteIndex;
  60. do
  61. {
  62. currentReadIndex = _readIndex;
  63. currentWriteIndex = _writeIndex;
  64. // 检查是否为空
  65. if (currentReadIndex == currentWriteIndex)
  66. return false;
  67. nextReadIndex = (currentReadIndex + 1) & _mask;
  68. } while (Interlocked.CompareExchange(ref _readIndex, nextReadIndex, currentReadIndex) != currentReadIndex);
  69. result = _buffer[currentReadIndex & _mask];
  70. _buffer[currentReadIndex & _mask] = default(T); // 防止内存泄漏
  71. return true;
  72. }
  73. public int Count
  74. {
  75. get
  76. {
  77. int read = _readIndex;
  78. int write = _writeIndex;
  79. return (write - read) & _mask;
  80. }
  81. }
  82. }
  83. }