"Lock wait timeout exceeded" when trying to implement the outbox pattern with Spring Integration and MySQL


I am trying to implement the outbox pattern using Spring Integration. I have configured the following beans:

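import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.store.ChannelMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.Header;

@Configuration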
public class SpringIntegrationTestApplicationConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpringIntegrationTestApplicationConfiguration.class);

    public static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";

    @MessagingGateway
    public interface EmailGateway {
        @Gateway(requestChannel = "mailbox")
        void sendEmail(String mailBody,
                       @Header String target);
    }

    @Bean
    public JdbcChannelMessageStore messageStore(DataSource dataSource) {
        JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
        jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
                new MySqlChannelMessageStoreQueryProvider());
        return jdbcChannelMessageStore;
    }

    @Bean
    ConcurrentMetadataStore concurrentMetadataStore(DataSource dataSource) {
        JdbcMetadataStore jdbcMetadataStore = new JdbcMetadataStore(dataSource);
        jdbcMetadataStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        return jdbcMetadataStore;
    }

    @Bean
    MessageHandler sendEmailMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String target = (String) message.getHeaders().get("target");
                LOGGER.info("not sending email with body: {} {}", message, target);
                throw new RuntimeException("");
            }
        };
    }

    @Bean
    QueueChannel mailboxChannel(JdbcChannelMessageStore jdbcChannelMessageStore) {
        return MessageChannels.queue(jdbcChannelMessageStore, "mailbox").getObject();
    }

    @Bean
    public IntegrationFlow buildFlow(ChannelMessageStore channelMessageStore,
                                     MessageHandler sendEmailMessageHandler) {
        return IntegrationFlow.from("mailbox")
                              .routeToRecipients(routes -> {
                                  routes
                                          .transactional()
                                          .recipientFlow(flow -> flow
                                                  .channel(channels -> channels.queue(channelMessageStore, "outbox"))
                                                  .handle(sendEmailMessageHandler, e -> e.poller(poller -> poller.fixedDelay(1000).transactional()))
                                          );
                              }).get();
    }

}

I also have an ApplicationRunner to test this:

@Component
public class Runner implements ApplicationRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(Runner.class);

    private final SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway;

    public Runner(SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway) {
        this.emailGateway = emailGateway;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        LOGGER.info("Sending 1");
        emailGateway.sendEmail("This is my body", "target");
        LOGGER.info("Sending 2");
        emailGateway.sendEmail("This is my body2", "target2");
    }
}


I set up MySQL with docker-compose:

version: "3.9"
services:
  db:
    image: mysql:8-oracle
    environment:
      MYSQL_ROOT_PASSWORD: 'root'
      MYSQL_ALLOW_EMPTY_PASSWORD: 1
      MYSQL_ROOT_HOST: "%"
      MYSQL_DATABASE: 'sidb'
    ports:
      - "3306:3306"
    healthcheck:
      test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]
      timeout: 10s
      interval: 5s
      retries: 10


and configured the application to use that database through application.properties:

spring.datasource.url=jdbc:mysql://localhost:3306/sidb
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver


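The required database tables are created via Flyway (see https://github.com/wimdeblauwe/spring-integration-test/blob/main/src/main/resources/db/migration/V1__init.sql).
The full source code can be viewed at https://github.com/wimdeblauwe/spring-integration-test/tree/main
The goal is to simulate sending 2 emails at startup, with the handler that sends the email failing. I expect to see Spring Integration retry sending the email every 1 second, and the database to contain entries for these emails, so that if I stop the program and restart it, the retries continue.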
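The intended behaviour can be sketched without Spring Integration or a database. The following is only an in-memory simulation under stated assumptions: `OutboxSketch`, `enqueue`, `pollOnce`, and `pending` are hypothetical names, the `Deque` stands in for the `_spring_integration_CHANNEL_MESSAGE` table, and putting a message back on failure stands in for the rollback of a transactional poll.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** In-memory stand-in for a persistent outbox; all names here are hypothetical. */
final class OutboxSketch {

    private final Deque<String> store = new ArrayDeque<>();

    /** Corresponds to the gateway call: the message is stored before any delivery attempt. */
    void enqueue(String message) {
        store.addLast(message);
    }

    /**
     * One poll cycle: take the oldest message and hand it to the handler.
     * If the handler throws, the message is put back at the head, which mimics
     * a rolled-back transaction. Returns true only when delivery succeeded.
     */
    boolean pollOnce(Consumer<String> handler) {
        String message = store.pollFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        try {
            handler.accept(message);
            return true; // "commit": the message stays removed
        } catch (RuntimeException e) {
            store.addFirst(message); // "rollback": the message is retried on the next poll
            return false;
        }
    }

    /** Number of messages still waiting for a successful delivery. */
    int pending() {
        return store.size();
    }
}
```

With a handler that always throws, as in `sendEmailMessageHandler`, every poll leaves both messages pending, which is the "retry every second, survive a restart" behaviour the real JDBC-backed store is expected to provide.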
When I run the program now, about 90% of the time I get an exception:

org.springframework.dao.CannotAcquireLockException: PreparedStatementCallback; SQL [INSERT into _spring_integration_CHANNEL_MESSAGE(
    MESSAGE_ID,
    GROUP_KEY,
    REGION,
    CREATED_DATE,
    MESSAGE_PRIORITY,
    MESSAGE_BYTES)
values (?, ?, ?, ?, ?, ?)
]; Lock wait timeout exceeded; try restarting transaction
    at org.springframework.jdbc.support.SQLExceptionSubclassTranslator.doTranslate(SQLExceptionSubclassTranslator.java:78) ~[spring-jdbc-6.1.1.jar:6.1.1]
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:107) ~[spring-jdbc-6.1.1.jar:6.1.1]


One thing I noticed is that if I change the poller from:

poller.fixedDelay(1000).transactional()

to:

poller.fixedDelay(1000,1000).transactional()


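then the problem no longer seems to occur, probably because the "emails" have already been inserted into the database by the time polling starts. But this is probably just a workaround rather than a real fix, since I guess this locking problem could show up at any time in a real application?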

Answer #1

The fix for your project looks like this:

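// Needs org.springframework.integration.transaction.TransactionInterceptorBuilder
// and org.springframework.transaction.annotation.Isolation.
.handle(sendEmailMessageHandler, e ->
        e.poller(poller ->
                poller.fixedDelay(1000)
                      // Run each poll in a READ_COMMITTED transaction instead of the connection default.
                      .transactional(
                              new TransactionInterceptorBuilder()
                                      .isolation(Isolation.READ_COMMITTED)
                                      .build())))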

That is, if you cannot somehow set transaction-isolation = READ-COMMITTED for the DB itself.
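For context, InnoDB's default isolation level is REPEATABLE READ, under which locking reads can take gap locks that block concurrent inserts. That is a plausible reason for the timed-out INSERT, although it is not confirmed in this thread. Besides the poller-level setting, the isolation level can also be changed on an individual JDBC connection. Below is a minimal sketch using only `java.sql`; the class `IsolationLevels` and its methods are hypothetical helper names, not part of Spring Integration or of the project in the question.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper, not part of Spring Integration or the original project. */
final class IsolationLevels {

    private IsolationLevels() {
    }

    /** Maps a java.sql.Connection isolation constant to a readable name. */
    static String name(int level) {
        switch (level) {
            case Connection.TRANSACTION_NONE:
                return "NONE";
            case Connection.TRANSACTION_READ_UNCOMMITTED:
                return "READ_UNCOMMITTED";
            case Connection.TRANSACTION_READ_COMMITTED:
                return "READ_COMMITTED";
            case Connection.TRANSACTION_REPEATABLE_READ:
                return "REPEATABLE_READ";
            case Connection.TRANSACTION_SERIALIZABLE:
                return "SERIALIZABLE";
            default:
                throw new IllegalArgumentException("Unknown isolation level: " + level);
        }
    }

    /**
     * Switches the connection to READ_COMMITTED if it is not already using it.
     * Returns the name of the level that was active before the call.
     */
    static String useReadCommitted(Connection connection) throws SQLException {
        int previous = connection.getTransactionIsolation();
        if (previous != Connection.TRANSACTION_READ_COMMITTED) {
            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        }
        return name(previous);
    }
}
```

In a Spring Boot application the level would more typically be set once, through the connection pool or the MySQL server configuration, rather than per connection. The sketch only shows which JDBC calls are involved.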
