I am trying to implement the outbox pattern with Spring Integration. I have configured the following beans:
@Configuration
public class SpringIntegrationTestApplicationConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringIntegrationTestApplicationConfiguration.class);
    public static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";

    @MessagingGateway
    public interface EmailGateway {
        @Gateway(requestChannel = "mailbox")
        void sendEmail(String mailBody,
                       @Header String target);
    }

    @Bean
    public JdbcChannelMessageStore messageStore(DataSource dataSource) {
        JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
        jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
                new MySqlChannelMessageStoreQueryProvider());
        return jdbcChannelMessageStore;
    }

    @Bean
    ConcurrentMetadataStore concurrentMetadataStore(DataSource dataSource) {
        JdbcMetadataStore jdbcMetadataStore = new JdbcMetadataStore(dataSource);
        jdbcMetadataStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        return jdbcMetadataStore;
    }

    @Bean
    MessageHandler sendEmailMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String target = (String) message.getHeaders().get("target");
                LOGGER.info("not sending email with body: {} {}", message, target);
                throw new RuntimeException("");
            }
        };
    }

    @Bean
    QueueChannel mailboxChannel(JdbcChannelMessageStore jdbcChannelMessageStore) {
        return MessageChannels.queue(jdbcChannelMessageStore, "mailbox").getObject();
    }

    @Bean
    public IntegrationFlow buildFlow(ChannelMessageStore channelMessageStore,
                                     MessageHandler sendEmailMessageHandler) {
        return IntegrationFlow.from("mailbox")
                .routeToRecipients(routes -> {
                    routes
                            .transactional()
                            .recipientFlow(flow -> flow
                                    .channel(channels -> channels.queue(channelMessageStore, "outbox"))
                                    .handle(sendEmailMessageHandler, e -> e.poller(poller -> poller.fixedDelay(1000).transactional()))
                            );
                }).get();
    }
}
I also have an ApplicationRunner to test this:
@Component
public class Runner implements ApplicationRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(Runner.class);
    private final SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway;

    public Runner(SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway) {
        this.emailGateway = emailGateway;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        LOGGER.info("Sending 1");
        emailGateway.sendEmail("This is my body", "target");
        LOGGER.info("Sending 2");
        emailGateway.sendEmail("This is my body2", "target2");
    }
}
I set up MySQL with docker-compose:
version: "3.9"
services:
  db:
    image: mysql:8-oracle
    environment:
      MYSQL_ROOT_PASSWORD: 'root'
      MYSQL_ALLOW_EMPTY_PASSWORD: 1
      MYSQL_ROOT_HOST: "%"
      MYSQL_DATABASE: 'sidb'
    ports:
      - "3306:3306"
    healthcheck:
      test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]
      timeout: 10s
      interval: 5s
      retries: 10
and configured the application to use that database via application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/sidb
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
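The required tables are created via Flyway (see https://github.com/wimdeblauwe/spring-integration-test/blob/main/src/main/resources/db/migration/V1__init.sql).
The complete source code is available at https://github.com/wimdeblauwe/spring-integration-test/tree/main
The goal is to simulate sending 2 emails at startup, with the processing of the email sending failing. I expect to see Spring Integration retry sending the emails every second, and the database to contain entries for those emails, so that if I stop the program and restart it, the retries continue.
When I run the program now, about 90% of the time I get an exception: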
org.springframework.dao.CannotAcquireLockException: PreparedStatementCallback; SQL [INSERT into _spring_integration_CHANNEL_MESSAGE(
MESSAGE_ID,
GROUP_KEY,
REGION,
CREATED_DATE,
MESSAGE_PRIORITY,
MESSAGE_BYTES)
values (?, ?, ?, ?, ?, ?)
]; Lock wait timeout exceeded; try restarting transaction
at org.springframework.jdbc.support.SQLExceptionSubclassTranslator.doTranslate(SQLExceptionSubclassTranslator.java:78) ~[spring-jdbc-6.1.1.jar:6.1.1]
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:107) ~[spring-jdbc-6.1.1.jar:6.1.1]
One thing I noticed is that if I change the poller from:
poller.fixedDelay(1000).transactional()
to
poller.fixedDelay(1000,1000).transactional()
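then the problem no longer seems to occur, probably because the "emails" have already been inserted into the database by the time polling starts. But this is probably just a workaround rather than a real fix, because I guess this locking problem could show up at any time in a real application?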
1 Answer
The fix for your project is like this:

That is needed if you cannot somehow set transaction-isolation = READ-COMMITTED for the DB.
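For the database-level option mentioned in the answer, here is a minimal sketch of how the docker-compose service from the question might set the isolation level. It assumes the official mysql image forwards `command` arguments that start with `-` to mysqld. Only the `command` key is new and is my assumption, not something taken from the answer; the rest of the service definition stays as in the question.

```yaml
services:
  db:
    image: mysql:8-oracle
    # Assumption: the image entrypoint passes this option to mysqld,
    # making READ-COMMITTED the server-wide default isolation level.
    command: ["--transaction-isolation=READ-COMMITTED"]
```

InnoDB defaults to REPEATABLE READ, so this changes the default for every connection to that server. It may not be appropriate if other applications share the database.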