camel:如何使用streamlist从sql组件流式处理

qij5mzcb  于 2021-07-12  发布在  Java
关注(0)|答案(1)|浏览(356)

我正在尝试使用camels sql组件从数据库中使用outputtype=streamlist进行流式处理。我从一个带有consumertemplate的java类中获取resultiterator:

public Flux<String> CreateFlux () {
ConsumerTemplate consumer = camelContext.createConsumerTemplate();

    ResultSetIterator resultSetIterator = consumer.receiveBody(
            "sql:SELECT DATA FROM TRANSAKSJON WHERE REQ_ID='" + recId + "'?outputType=StreamList", ResultSetIterator.class);
    ...
    while (result.hasNext()) {
        Map<String, String> map = (Map<String, String>) result.next();
        String data = map.get("DATA");

    }
}

尝试迭代ResultSeterator时出现以下错误:
org.h2.jdbc.jdbcsqlexception:对象已关闭[90007-197]
经检查,我看到连接处已关闭。连接={hikariproxyconnection@16287} "hikariproxyconnection@1048081993 Package com.zaxxer.hikari.pool.proxyconnection.closedconnection“
如何使用camel sql组件来流式处理?我必须从一个不在 Camel 路线内的豆子上使用它。我发现只有在camel路由中使用sql组件时,流才能工作。 
camel版本是:2.24.1
更新1:在看了源代码之后,它是有意的。ondone关闭连接。我正在尝试在defaultexchange上设置unitofwork,以便通过将exchange标记为“未完成”来保持连接打开。
update2:通过设置unitof work:

ProducerTemplate pTmp = camelContext.createProducerTemplate();

        DefaultExchange defaultExchange = new DefaultExchange(camelContext);
        UnitOfWork unitOfWork = new DefaultUnitOfWork(defaultExchange);
        defaultExchange.setUnitOfWork(unitOfWork);
        pTmp.send("direct:DbStream", defaultExchange);

route dbstream执行上述sql select

8nuwlpux

8nuwlpux1#

不要使用receivebody,而只使用receive来获取 Exchange 回来。然后,您可以从迭代器的消息体中获取迭代器,在使用之后,您可以完成交换(参见javadoc)

相关问题