经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » 编程经验 » 查看文章
如何兼顾性能+实时性处理缓冲数据?
来源:cnblogs  作者:Artech  时间:2023/5/30 9:34:39  对本文有异议

我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦,本文提供一种简单的实现方式。

一、实例演示
二、待处理的批量数据:Batch<T>
三、感知数据处理的时机:BatchChangeToken
四、接收、缓冲、打包和处理数据:Batcher<T>

一、实例演示

我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher<string>对象来接收应用分发给它的数据,该对象最终会在适当的时机处理它们。 调用Batcher<string>构造函数的三个参数分别表示:

  • processor:批量处理数据的委托对象,它指向的Process方法会将当前时间和处理的数据量输出到控制台上;
  • batchSize:单次处理的数据量,当缓冲的数据累积到这个阈值时会触发数据的自动处理。我们将这个阈值设置为10
  • interval:两次处理处理的最长间隔,我们设置为5秒
  1. var batcher = new Batcher<string>(
  2. processor:Process,
  3. batchSize:10,
  4. interval: TimeSpan.FromSeconds(5));
  5. var random = new Random();
  6. while (true)
  7. {
  8. var count = random.Next(1, 4);
  9. for (var i = 0; i < count; i++)
  10. {
  11. batcher.Add(Guid.NewGuid().ToString());
  12. }
  13. await Task.Delay(1000);
  14. }
  15.  
  16. static void Process(Batch<string> batch)=> Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");

如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。

clip_image002

二、待处理的批量数据:Batch<T>

除了上面实例涉及的Batcher<T>,该解决方案还涉及两个额外的类型,如下这个Batch<T>类型表示最终发送的批量数据。为了避免缓冲数据带来的内存分配,我们使用了一个单独的ArrayPool<T>对象来创建池化的数组,这个功能体现在静态方法CreatePooledArray方法上。由于构建Batch<T>对象提供的数组来源于对象池,在处理完毕后必须回归对象池,所以我们让这个类型实现了IDisposable接口,并将这一操作实现在Dispose方法种。在调用ArrayPool<T>对象的Return方法时,我们特意将数组清空。由于提供的数组来源于对象池,所以并不能保证每个数据元素都承载了有效的数据,实现的迭代器和返回数量的Count属性对此作了相应的处理。

  1. public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class
  2. {
  3. private bool _isDisposed;
  4. private int? _count;
  5. private readonly T[] _data;
  6. private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create();
  7.  
  8. public int Count
  9. {
  10. get
  11. {
  12. if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
  13. if(_count.HasValue) return _count.Value;
  14. var count = 0;
  15. for (int index = 0; index < _data.Length; index++)
  16. {
  17. if (_data[index] is null)
  18. {
  19. break;
  20. }
  21. count++;
  22. }
  23. return (_count = count).Value;
  24. }
  25. }
  26. public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data));
  27. public void Dispose()
  28. {
  29. _pool.Return(_data, clearArray: true);
  30. _isDisposed = true;
  31. }
  32. public IEnumerator<T> GetEnumerator() => new Enumerator(this);
  33. IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
  34. public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize);
  35. private void EnsureNotDisposed()
  36. {
  37. if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
  38. }
  39.  
  40. private sealed class Enumerator : IEnumerator<T>
  41. {
  42. private readonly Batch<T> _batch;
  43. private readonly T[] _data;
  44. private int _index = -1;
  45. public Enumerator(Batch<T> batch)
  46. {
  47. _batch = batch;
  48. _data = batch._data;
  49. }
  50. public T Current
  51. {
  52. get { _batch.EnsureNotDisposed(); return _data[_index]; }
  53. }
  54. object IEnumerator.Current => Current;
  55. public void Dispose() { }
  56. public bool MoveNext()
  57. {
  58. _batch.EnsureNotDisposed();
  59. return ++_index < _data.Length && _data[_index] is not null;
  60. }
  61. public void Reset()
  62. {
  63. _batch.EnsureNotDisposed();
  64. _index = -1;
  65. }
  66. }
  67. }

三、感知数据处理的时机:BatchChangeToken

Batcher具有两个触发数据处理的设置:缓冲的数据量和两次数据处理之间的最长间隔。当累积的数据量或者当前时间与上一次处理的间隔达到阈值,缓冲的数据将自动被处理。.NET Core经常利用一个IChangeToken作为通知的令牌,为此我们定义了如下这个实现了该接口的BatchChangeToken类型。如下面的代码片段所示,上述两个触发条件体现在两个CancellationToken对象上,我们利用它们创建了对应的CancellationChangeToken对象,最后利用这两个CancellationChangeToken创建了一个CompositeChangeToken对象。这个CompositeChangeToken对象最终被用来实现了IChangeToken接口的三个成员。

  1. internal sealed class BatchChangeToken : IChangeToken
  2. {
  3. private readonly IChangeToken _innerToken;
  4. private readonly int _countThreshold;
  5. private readonly CancellationTokenSource _expirationTokenSource;
  6. private readonly CancellationTokenSource _countTokenSource;
  7. private int _counter;
  8.  
  9. public BatchChangeToken(int countThreshold, TimeSpan timeThreshold)
  10. {
  11. _countThreshold = countThreshold;
  12. _countTokenSource = new CancellationTokenSource();
  13. _expirationTokenSource = new CancellationTokenSource(timeThreshold);
  14. var countToken = new CancellationChangeToken(_countTokenSource.Token);
  15. var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token);
  16. _innerToken = new CompositeChangeToken(new IChangeToken[] { countToken, expirationToken });
  17. }
  18.  
  19. public bool HasChanged => _innerToken.HasChanged;
  20. public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks;
  21. public IDisposable RegisterChangeCallback(Action<object?> callback, object? state) => _innerToken.RegisterChangeCallback(s =>
  22. {
  23. callback(s);
  24. _countTokenSource.Dispose();
  25. _expirationTokenSource.Dispose();
  26. }, state);
  27. public void Increase()
  28. {
  29. Interlocked.Increment(ref _counter);
  30. if (_counter >= _countThreshold)
  31. {
  32. _countTokenSource.Cancel();
  33. }
  34. }
  35. }

上述两个CancellationToken来源于对应的CancellationTokenSource,对应的字段为_countTokenSource和_expirationTokenSource。_expirationTokenSource根据设置的数据处理时间间隔创建而成。为了确定缓冲的数据量,我们提供了一个计数器,并利用Increase方法进行计数。在超过设置的数据量时,该方法会调用_expirationTokenSource的Cancel方法。在实现的ActiveChangeCallbacks方法种,我们将针对这两个CancellationTokenSource的释放放在注册的回调中。

四、接收、缓冲、打包和处理数据:Batcher<T>

最终用于打包的Batcher类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action<Batch<T>>委托完成针对打包数据的处理。通过Add方法接收的数据存储在_data字段返回的数组上,它时通过Batch<T>的静态方法CreatePooledArray提供的。我们使用字段_index表示添加数据在_data数组中存储的位置,并使用InterLocked.Increase方法解决并发问题。

  1. public sealed class Batcher<T> : IDisposable where T : class
  2. {
  3. private readonly Action<Batch<T>> _processor;
  4. private T[] _data;
  5. private BatchChangeToken _changeToken = default!;
  6. private readonly int _batchSize;
  7. private int _index = -1;
  8. private readonly IDisposable _scheduler;
  9.  
  10. public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval)
  11. {
  12. _processor = processor ?? throw new ArgumentNullException(nameof(processor));
  13. _batchSize = batchSize;
  14. _data = Batch<T>.CreatePooledArray(batchSize);
  15. _scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(_batchSize, interval), OnChange);
  16.  
  17. void OnChange()
  18. {
  19. var data = Interlocked.Exchange(ref _data, Batch<T>.CreatePooledArray(batchSize));
  20. if (data[0] is not null)
  21. {
  22. Interlocked.Exchange(ref _index, -1);
  23. _ = Task.Run(() => _processor.Invoke(new Batch<T>(data)));
  24. }
  25. }
  26. }
  27.  
  28. public void Add(T item)
  29. {
  30. if (item is null) throw new ArgumentNullException(nameof(item));
  31. var index = Interlocked.Increment(ref _index);
  32. if (index >= _batchSize)
  33. {
  34. SpinWait.SpinUntil(() => _index < _batchSize - 1);
  35. Add(item);
  36. }
  37. _data[index] = item;
  38. _changeToken.Increase();
  39. }
  40.  
  41. public void Dispose() => _scheduler.Dispose();
  42. }

在构造函数中,我们调用了ChangeToken的静态方法OnChange将数据处理操作绑定到创建的BatchChangeToken对象上,并确保每次发送“数据处理”后将重新创建的BatchChangeToken对象赋值到_changeToken字段上,因为Add放到需要调用它的Increase增加计数。当接收到数据处理通知后,我们会调用Batch<T>的静态方法CreatePooledArray构建一个数组将字段 -_data引用的数组替换下来,并将其封装成Batch<T>对象进行处理(如果数据存在)。于此同时,表示添加数据存储索引的_index恢复成-1。Add方法在对_index做自增操作后,如果发现累积的数据量达到阈值,需要等待数据处理完毕。由于数据处理以异步的方式处理,这里的耗时时很低的,所以我们这里选择了自旋锁的方式等待它完成。

原文链接:https://www.cnblogs.com/artech/p/batcher.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号