Migrating from Spring v2 to Spring v4 for org.springframework.cloud.stream.annotation

ltqd579y posted 5 months ago in Spring

Introduction

I am currently using EnableBinding and StreamListener from Spring v2: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/org/springframework/cloud/stream/annotation/package-summary.html

My project code uses these annotations:

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import com.enterprise.production.model.exceptions.ProductionStoreException;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;

import lombok.extern.slf4j.Slf4j;

@EnableBinding(Sink.class)
@Slf4j
@Service
public class ProductionMessageConsumer {

    @Autowired
    private ProductionStoreService productionService;
    @Autowired
    private Clock clock;

    @StreamListener(target = Sink.INPUT)
    public void handleEnergyProductionMessage(@Payload EnergyProductionMessage energyProductionMessage) throws ProductionStoreException {
        Instant start = clock.instant();

        log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
        log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());

        productionService.saveProductions(energyProductionMessage);
        log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                 energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                         .between(start, clock.instant()).toMillis());
        Instant startNormalization = clock.instant();
        log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                 energyProductionMessage.getDeviceId());
        productionService.saveProductions30m(energyProductionMessage);
        log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                 energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                         .between(startNormalization, clock.instant()).toMillis());
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        log.error("Fail to read message with error '{}'", message.getPayload());
    }

}


Problem

I need to migrate to Spring v4, but these annotations are not available in Spring v4: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/annotation/package-summary.html

Question

Does anyone know how to migrate these annotations from Spring v2 to Spring v4?

ldxq2e6h, answer #1

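The annotations you are using are deprecated, as stated in the corresponding section of the Spring Cloud Stream 3.2.x documentation:
Annotation-based programming model. Basically, @EnableBinding, @StreamListener and all related annotations are now deprecated in favor of the functional programming model. See Spring Cloud Function support for more details.
If you follow the link above, you will see:
Starting with Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use the built-in support for Spring Cloud Function, where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].
To migrate to v4, move the logic of the @StreamListener method into a Spring bean declared in a class annotated with @org.springframework.context.annotation.Configuration: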

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class ProductionMessageConsumerConfiguration {

    @Autowired
    private ProductionStoreService productionService;
    @Autowired
    private Clock clock;

    // Replaces the @StreamListener method: the framework binds this Consumer bean to the input.
    @Bean
    public Consumer<EnergyProductionMessage> consumeEnergyProductionMessage() {
        return energyProductionMessage -> {
            // Note: Consumer.accept cannot throw checked exceptions. If ProductionStoreException
            // is checked, wrap the service calls in try/catch and rethrow as an unchecked exception.
            Instant start = clock.instant();

            log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
            log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());

            productionService.saveProductions(energyProductionMessage);
            log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                    energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                            .between(start, clock.instant()).toMillis());
            Instant startNormalization = clock.instant();
            log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
            productionService.saveProductions30m(energyProductionMessage);
            log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                    energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                            .between(startNormalization, clock.instant()).toMillis());
        };
    }

}
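
One detail that tends to come up in this migration: the original @StreamListener method declared `throws ProductionStoreException`, but a java.util.function.Consumer lambda cannot throw a checked exception. The following is a minimal plain-Java sketch of one way to adapt such a handler, assuming the exception is checked. The names `ThrowingConsumer`, `unchecked` and `StoreException` are hypothetical and not part of Spring or of the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: ThrowingConsumer, unchecked and StoreException are hypothetical names.
final class ConsumerMigrationSketch {

    /** Stand-in for a checked exception such as ProductionStoreException. */
    static class StoreException extends Exception {
        StoreException(String message) { super(message); }
    }

    /** Like Consumer, but allowed to throw a checked exception. */
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    /** Adapts a throwing handler to a plain Consumer by rethrowing as unchecked. */
    static <T> Consumer<T> unchecked(ThrowingConsumer<T> handler) {
        return value -> {
            try {
                handler.accept(value);
            } catch (RuntimeException e) {
                throw e; // already unchecked, pass through untouched
            } catch (Exception e) {
                throw new IllegalStateException("Message handling failed", e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();

        // In the real application this lambda would be returned from the @Bean method.
        Consumer<String> handler = unchecked(message -> {
            if (message.isEmpty()) {
                throw new StoreException("empty message");
            }
            stored.add(message);
        });

        handler.accept("production-1");
        System.out.println(stored); // prints [production-1]

        try {
            handler.accept("");
        } catch (IllegalStateException e) {
            System.out.println(e.getCause().getMessage()); // prints empty message
        }
    }
}
```

In the configuration class above, the @Bean method could return `unchecked(energyProductionMessage -> { ... })` instead of the bare lambda. Also note that in the functional model the input binding of a Consumer bean is named after the bean, so the bean above binds as `consumeEnergyProductionMessage-in-0` rather than `input`, and any existing `spring.cloud.stream.bindings.input.*` properties need to be renamed accordingly.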

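As for the error handling part, by default the framework logs the error together with the message payload, but you can handle it manually by following the corresponding section of the documentation.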
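
As a rough sketch of the manual route, assuming Spring Cloud Stream 4.x and the bean name used in this answer: a binding can be pointed at a custom error handler through the `error-handler-name` binding property, where the handler is a bean of type Consumer<ErrorMessage>. The handler bean name `productionErrorHandler` and the destination `energy-productions` below are hypothetical placeholders, so check the reference documentation for your exact version before relying on this.

```properties
# Select the functional bean to bind (needed when several functional beans exist)
spring.cloud.function.definition=consumeEnergyProductionMessage
# Hypothetical destination (topic or queue) name
spring.cloud.stream.bindings.consumeEnergyProductionMessage-in-0.destination=energy-productions
# Hypothetical name of a Consumer<ErrorMessage> bean that takes over the role of @StreamListener("errorChannel")
spring.cloud.stream.bindings.consumeEnergyProductionMessage-in-0.error-handler-name=productionErrorHandler
```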

zzzyeukh, answer #2

These annotations are deprecated. You can define the handler as a Consumer instead:

@Service
public class ProductionMessageConsumer {

    @Autowired
    private ProductionStoreService productionService;

    @Bean
    public Consumer<EnergyProductionMessage> handleEnergyProductionMessage() {
        return energyProductionMessage -> {
            // ...
            productionService.saveProductions(energyProductionMessage);
            // ...
        };
    }

}

Also check this answer: @EnableBinding @deprecated as of 3.1 in favor of the functional programming model
