如何创建对activemq主题的dgs graphql订阅

1tu0hz3e  于 2021-07-09  发布在  Java
关注(0)|答案(1)|浏览(389)

我有一个有点复杂的技术堆栈。我正在利用netflix dgs提供graphql服务。幕后是一堆从各种服务发送和接收数据的jms组件。除了graphql订阅之外,我的一切都在工作。
具体来说,我要做的是为来自activemq主题的消息创建一个graphql订阅。
所以我有一个subscriptiondatafetcher,如下所示:

@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {

    private final Publisher<SurveyResult> surveyResultsReactiveSource;

    @Autowired
    public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
        this.surveyResultsReactiveSource = surveyResultsReactiveSource;
    }

    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        return surveyResultsReactiveSource;
    }
}

在我的spring配置中,我使用以下spring集成流:

@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
    SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

    return Flux.from(
            IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .log()
                    .toReactivePublisher())
            .map((message) -> converter.fromMessage(message, SurveyResult.class));
}

我要说几句话:
我有一个单独的房间 @JmsListener 也就是说收到这些离题的信息
即使建立了web套接字连接,我也看不到多个使用者。
如果我将mongo-reactive-spring数据存储库连接到这个graphql订阅,那么客户机将接收数据。
当我将客户端连接到订阅时,会看到以下日志:

PublishSubscribeChannel      : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler          : Subscription started for 1

我怀疑在建立web套接字连接时消息侦听器容器没有被激活。我应该“激活”通道适配器吗?我错过了什么?
技术堆栈:

// spring boots - version 2.4.3
    implementation "org.springframework.boot:spring-boot-starter-web"
    implementation "org.springframework.boot:spring-boot-starter-activemq"
    implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
    implementation "org.springframework.boot:spring-boot-starter-integration"
    implementation 'org.springframework.boot:spring-boot-starter-security'

    // spring integration
    implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'

    // dgs
    implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
    implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'

更新1:
不管它值多少钱,如果我更新订阅到以下内容,我会在客户端得到结果。

@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        // repository is a ReactiveMongoRepository
        return repository.findAll();
    }

更新2:
这是最终确定的bean,以防它根据公认的解决方案帮助某人。我需要一个主题的听众,而不是一个队列。

@Bean
    public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
        SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
                Jms.container(connectionFactory(), surveyDestination).pubSubDomain(true))
                .jmsMessageConverter(converter))
                .toReactivePublisher();
    }
up9lanfz

up9lanfz1#

你的问题是:

return Flux.from(
        IntegrationFlows.from(

而框架却看不到它的内在 IntegrationFlow 示例进行正确的分析和注册。
为了使它发挥作用,你需要考虑宣布这一点 IntegrationFlow 作为顶级豆子。
像这样:

@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {  
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .transform([transform to SurveyResult])
                    .toReactivePublisher();
}

现在框架知道这是合乎逻辑的 IntegrationFlow 必须解析容器,并且必须注册和启动所有bean。
你可能需要重新考虑你的计划 SurveyResultMessageConverter 通俗易懂的逻辑 transform() 如果你不能提供 Jms.messageDrivenChannelAdapter 用你的转换器。
然后在你的 SurveyResultsSubscriptionDataFetcher 你只需要:

return surveyResultsReactiveSource.map(Message::getPayload);

相关问题