Camel Spring Coud Sleuth导致ActiveMQ/JMS消息头丢失

p1iqtdky  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(141)

版本:

  • Spring启动:2.3.12.发布
  • Spring Cloud:Hoxton.SR12
  • SpringCloud入门级侦探:3.0.3
  • Camel :3.4.6

我想将Sleuth添加到一个预先存在的项目中,该项目现在使用ActiveMQ,而以前它使用JMS。当我这样做时,来自ActiceMQ消息的值被阻止/删除(其中一个是“filename”,它是S2请求的键值)。其他JMS值似乎仍然通过。
我需要了解为什么非JMS值被阻止/删除(我找不到任何有关导致这种情况发生的信息),以防止错误。
我知道我可以使用spring.sleuth.messaging.jms.enabled=true禁用JMS的Sleuth,但接下来我希望能够跟踪ActiveMQ/JMS代码,因此这种解决方案不是特别有吸引力。
由于这是预存在的代码,如果可能的话,我也希望避免重写它。
有没有人让SPring Cloud Sleuth与ActiveMQ/JMS一起工作,并且可能指出哪里出了问题?

编辑:

根据Marcin的初始响应,我们发现以下版本可以编译和执行,但仍存在重大问题:

  • Spring启动:2.4.8
  • Spring Cloud:2020.0.3
  • 驼色:3.7.5

"没有侦探"
ActiveMQ消息详细信息:

日志消息(存在“文件名”、“CamelAwsS 3Etag”等):

2021-08-16 10:10:37.889  INFO [MyApp,,] 28775 --- [umer[taskQueue]] taskQueueConsumer:***HEADERS IN***: {CamelAwsS3ETag=39d029a87fa4c6aaee5f1de643d9f3f6, Content-Type=application/json, filename=_bl001/group0/_bl001-group0-1629104686042.zip, JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-44053-1629104652998-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629104686350, JMSType=null, JMSXGroupID=_bl001-group0, JMSXGroupSeq=0, JMSXUserID=null}

"带着侦探"
ActiveMQ消息详细信息:

日志消息(缺少“文件名”、“CamelAwsS 3Etag”等):

2021-08-16 10:24:33.821  INFO [MyApp,,] 31553 --- [umer[taskQueue]] taskQueueConsumer:***HEADERS IN***: {JMSCorrelationID=null, JMSCorrelationIDAsBytes=null, JMSDeliveryMode=2, JMSDestination=queue://taskQueue, JMSExpiration=0, JMSMessageID=ID:server-42561-1629105658959-1:3:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1629105675306, JMSType=null, JMSXGroupID=_bl000-group1, JMSXGroupSeq=0, JMSXUserID=null}

示例Java代码

@Component
public class MyAppCoreRouter extends RouteBuilder {

    /* Other code */

    @Override
    public void configure() {
        from("activemq:queue:taskQueue?concurrentConsumers=" + taskNumberOfConcurrentConsumers)
                .log("***HEADERS IN***: ${headers}")
                .routeId("taskQueueConsumer")
                .threads(taskNumberOfConcurrentConsumers)
                .pollEnrich().simple("aws-s3://myapp-task?amazonS3Client=#amazonS3Client&fileName=${header.filename}&operation=getObject")
                .choice()
                .when(header(S3Constants.KEY).endsWith(".zip"))
                .to("file://" + taskLocalUnCompressedEndpoint + "?fileName=${header.CamelAwsS3ETag}/${header.CamelAwsS3Key}")
                .process(exchange -> {
                    String camelAwsS3ETag = exchange.getIn().getHeader("CamelAwsS3ETag", String.class);
                    String camelAwsS3Key = exchange.getIn().getHeader("CamelAwsS3Key", String.class);
                    File uniqueDir = new File(taskLocalUnCompressedEndpoint, camelAwsS3ETag);
                    File taskZip = new File(uniqueDir, camelAwsS3Key);
                    new ZipFile(taskZip).extractAll(uniqueDir.getAbsolutePath());
                })
                .setHeader("resourceDirectory", simple(taskLocalUnCompressedEndpoint + "/${header.CamelAwsS3ETag}")).setHeader("schedule.time", simple("${date:now}"))
                .removeHeader("CamelAwsS3Headers")
                .log(LoggingLevel.DEBUG, "Processing batch job ${headers}")
                .to("spring-batch:importMyAppRecordJob")
                .endChoice()
                .otherwise()
                .log(LoggingLevel.WARN, "Unexpected file type, filtering file name ${header.CamelAwsS3Key} ")
                .end();

        /* Other code */
    }
}
s1ag04yj

s1ag04yj1#

您正在使用错误的 Boot 、Cloud和Sleuth版本。若要使用Sleuth 3.0.x,您需要使用Cloud 2020.0.x和Boot 2.4.x或2.5.x。

相关问题