简介
我目前正在使用Spring v2中的EnableBinding
,StreamListener
:https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/org/springframework/cloud/stream/annotation/package-summary.html
我的项目代码使用了这些注解:
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.enterprise.production.model.exceptions.ProductionStoreException;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;
import lombok.extern.slf4j.Slf4j;
@EnableBinding(Sink.class)
@Slf4j
@Service
public class ProductionMessageConsumer {
@Autowired
private ProductionStoreService productionService;
@Autowired
private Clock clock;
@StreamListener(target = Sink.INPUT)
public void handleEnergyProductionMessage(@Payload EnergyProductionMessage energyProductionMessage) throws ProductionStoreException {
Instant start = clock.instant();
log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
energyProductionMessage.getDeviceId());
log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
productionService.saveProductions(energyProductionMessage);
log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
.between(start, clock.instant()).toMillis());
Instant startNormalization = clock.instant();
log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
energyProductionMessage.getDeviceId());
productionService.saveProductions30m(energyProductionMessage);
log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
.between(startNormalization, clock.instant()).toMillis());
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
log.error("Fail to read message with error '{}'", message.getPayload());
}
}
字符串
问题
我需要迁移到Spring v4,但这些注解在Spring v4中不可用:https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/annotation/package-summary.html
提问
有人知道如何将这些注解从Spring v2迁移到Spring v4吗?
2条答案
按热度按时间ldxq2e6h1#
您正在使用的注解已弃用,并在Spring Cloud Stream的3.2.x的相应部分中提到
基于注解的编程模型。基本上,@EnableBInding,@ Streaming和所有相关的注解现在都被弃用,以支持函数式编程模型。有关更多详细信息,请参阅Spring Cloud Function支持。
当你点击上面的链接时,你会看到
从Spring Cloud Stream v2.1开始,定义流处理程序和源的另一种选择是使用Spring Cloud Function的内置支持,它们可以表示为
java.util.function.[Supplier/Function/Consumer]
类型的bean。为了迁移到v4,您需要将
@StreamListener
的实际配置声明为在特定类中定义的实际Spring bean,并使用@org.springframework.context.annotationConfiguration
注解字符串
对于错误处理部分,通过defaut,它将创建一个包含消息有效负载的日志,但您可以按照本节的说明手动处理它
zzzyeukh2#
这些注解是折旧的。您可以使用Consumer定义它
字符串
也请检查这个答案@EnableBinding @从3.1开始已弃用,支持函数式编程模型