等待所有线程在春季集成中完成

我有一个自我可执行的jar程序,它严重依赖于Spring Integration。我遇到的问题是,在其他Spring bean完全完成之前,程序正在终止

以下是我正在使用的代码的精简版本,如果需要,我可以提供更多代码/配置。入口点是一个 main() 方法,它引导 Spring 并开始导入过程:

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

DataImporter 包含一个简单的循环,该循环将消息触发到 Spring 集成网关。这为流提供了一种主动的“推送”方法,而不是常见的数据轮询方法。这就是我的问题所在:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

为了完整起见,流 XML 如下所示:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

流有效地启动和处理项目,但是一旦 startImport() 循环完成,主线程就会立即终止并拆除所有 Spring 集成线程。这会导致争用条件,当程序终止时,最后 (n) 个项目未完全处理。

我有一个想法,即维护我正在处理的项目的参考计数,但这被证明是相当复杂的,因为流经常将消息拆分/路由到多个服务激活器 - 这意味着很难确定每个项目是否“完成”。

我认为我需要的是某种方法来检查没有Spring bean仍在执行,或者标记发送到网关的所有项目在终止之前都已完全处理。

我的问题是,我该如何去做这些,或者有没有更好的方法来解决我没有想到的问题?


答案 1

您在此处未使用请求-响应模式。

出站通道适配器是一个触发并忘记的操作,如果你想等待响应,你应该使用一个将等待响应的出站网关,并将响应连接到原始网关,然后在java中发送AndReceive不仅仅是发布。


答案 2

如果您可以确定是否仍然需要它(processingFinished()或在后端阶段执行的类似操作),则可以在中央机构注册所有 s,该中央机构跟踪未完成的数量并有效地确定终止条件。ItemItemItem

如果这种方法是可行的,您甚至可以考虑将项目打包成 -objects 或使用 中的类似概念。FutureTaskjava.util.concurrent

编辑:第二个想法:

你有没有想过让渠道更智能?发送方在不再发送任何数据时关闭通道。在此方案中,工作线程不必是 deamon 线程,但可以根据已关闭的空输入通道确定其终止条件。