如何将中断器与多种消息类型结合使用

我的系统有两种不同类型的消息 - 类型 A 和 B。每条消息都有不同的结构 - 类型 A 包含一个 int 成员,类型 B 包含一个双精度成员。我的系统需要将这两种类型的消息传递到多个业务逻辑线程。减少延迟非常重要,因此我正在研究使用 Disruptor 以机械交感的方式将消息从主线程传递到业务逻辑线程。

我的问题是,干扰器只接受环形缓冲区中的一种类型的对象。这是有道理的,因为干扰器预先分配了环形缓冲区中的对象。但是,这也使得通过 Disruptor 将两种不同类型的消息传递到我的业务逻辑线程变得很困难。据我所知,我有四个选择:

  1. 将干扰器配置为使用包含固定大小字节数组的对象(如应如何使用 Disruptor(Disruptor 模式)构建真实消息系统?在这种情况下,主线程必须在将消息发布到中断器之前将消息编码为字节数组,并且每个业务逻辑线程必须在收到时将字节数组解码回对象。此设置的缺点是业务逻辑线程没有真正共享来自干扰器的内存 - 相反,它们从干扰器提供的字节数组创建新对象(从而创建垃圾)。此设置的好处是,所有业务逻辑线程都可以从同一中断器读取多个不同类型的消息。

  2. 将干扰器配置为使用单一类型的对象,但创建多个干扰器,每个对象类型一个。在上面的例子中,将有两个单独的干扰器 - 一个用于A类型的对象,另一个用于B类型的对象。此设置的好处是,主线程不必将对象编码为字节数组,并且业务较少的逻辑线程可以共享与中断器中使用的对象相同的对象(不创建垃圾)。此设置的缺点是,每个业务逻辑线程都必须以某种方式订阅来自多个中断器的消息。

  3. 将干扰器配置为使用包含消息 A 和 B 的所有字段的单一类型的“超级”对象。这非常反对OO风格,但允许在选项#1和#2之间进行折衷。

  4. 将干扰器配置为使用对象引用。但是,在这种情况下,我失去了对象预分配和内存排序的性能优势。

对于这种情况,您有什么建议?我觉得选项#2是最干净的解决方案,但我不知道消费者是否可以或如何在技术上订阅来自多个破坏者的消息。如果有人能提供如何实现选项#2的示例,将不胜感激!


答案 1

将干扰器配置为使用包含固定大小字节数组的对象(如应如何使用 Disruptor(Disruptor 模式)构建真实消息系统?在这种情况下,主线程必须在将消息发布到中断器之前将消息编码为字节数组,并且每个业务逻辑线程必须在收到时将字节数组解码回对象。此设置的缺点是业务逻辑线程没有真正共享来自干扰器的内存 - 相反,它们从干扰器提供的字节数组创建新对象(从而创建垃圾)。此设置的好处是,所有业务逻辑线程都可以从同一中断器读取多个不同类型的消息。

这将是我的首选方法,但我稍微改变了我们的用例,几乎每个我们使用Disruptor的地方,它要么从某种I / O设备接收,要么发送到某种I / O设备,所以我们的基本货币是字节数组。您可以通过使用蝇量级方法进行编组来绕过对象创建。为了看到这方面的一个例子,我在Devoxx(https://github.com/mikeb01/ticketing)上展示了Javolution的Struct和Union类。如果您可以在从事件处理程序的 onEvent 调用返回之前完全处理该对象,则此方法效果很好。如果事件需要超出该点,那么您需要对数据进行某种复制,例如将其反序列化为对象。

将干扰器配置为使用单一类型的对象,但创建多个干扰器,每个对象类型一个。在上面的例子中,将有两个单独的干扰器 - 一个用于A类型的对象,另一个用于B类型的对象。此设置的好处是,主线程不必将对象编码为字节数组,并且业务较少的逻辑线程可以共享与中断器中使用的对象相同的对象(不创建垃圾)。此设置的缺点是,每个业务逻辑线程都必须以某种方式订阅来自多个中断器的消息。

如果不尝试此方法,则可能需要一个可以从多个环形缓冲区轮询的自定义事件处理器。

将干扰器配置为使用包含消息 A 和 B 的所有字段的单一类型的“超级”对象。这非常反对OO风格,但允许在选项#1和#2之间进行折衷。将干扰器配置为使用对象引用。但是,在这种情况下,我失去了对象预分配和内存排序的性能优势。

我们已经在一些情况下这样做了,在某些情况下,缺乏预分配是可以容忍的。它工作正常。如果要传递对象,则需要确保在使用者端完成它们后将其清空。我们发现,对“super”对象使用双重调度模式可以保持实现相当干净。这样做的一个缺点是,由于GC在标记阶段有更多的活动对象要遍历,因此您将获得稍微更长的GC停滞时间,因为GC会比直接的对象数组稍长。

对于这种情况,您有什么建议?我觉得选项#2是最干净的解决方案,但我不知道消费者是否可以或如何在技术上订阅来自多个破坏者的消息。如果有人能提供如何实现选项#2的示例,将不胜感激!

如果您希望在数据使用方面具有完全的灵活性,另一种选择是不使用环形缓冲区,而是直接与Sequencer交谈,并按照您认为最合适的方式定义对象布局。


答案 2

Ben Baumgold,我相信你现在已经找到了解决方案。您的#4(或#3)可以通过创建事件持有者来轻松实现。可以将其视为对象的枚举。为了加快查找速度,应使用枚举类型丰富事件。请注意,我在持有者中存储了对原始事件的引用。创建复制构造函数或 clone() 并在插入到环形缓冲区时复制事件可能更合适。

通过示例说明:

这是在事件中使用的枚举

public enum MyEventEnum {
EVENT_TIMER,
EVENT_MARKETDATA;
}

这是持有人。在任何时候,ringbuffer 中的这个实例只包含一个由 array[ type.ordinal() ] 索引的事件。为什么数组应该从代码中显而易见。

public class RingBufferEventHolder {    
 private MyEventEnum;   
 private EventBase array[];

 public RingBufferEventHolder() {
    array=new EventBase[MyEventEnum.values().length]; 
 }

 // TODO: null the rest
 public void setEvent(EventBase event) {
    type=event.getType();
    switch( event.getType() ) {
        case EVENT_TIMER:
            array[MyEventEnum.EVENT_TIMER.ordinal()]=event;
            break;
        case EVENT_MARKETDATA:
            array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event;
            break;
        default:
            throw new RuntimeException("Unknown event type " + event );
    }
}

发布事件

   EventBase newEvent=new EventMarketData(....);
   // prepare
   long nextSequence = ringBuffer.next(); 
   RingBufferEventHolder holder = ringBuffer.get(nextSequence);
   holder.setEvent(newEvent);
   // make the event available to EventProcessors 
   ringBuffer.publish(nextSequence);

推荐