在数据库中实现save并用java回滚发送到kafka的最佳方法是什么

ars1skjm  于 2021-07-06  发布在  Java
关注(0)|答案(0)|浏览(194)

我有下面的方法,它首先将条目保存在db中,然后将其发送到kafka topic。我希望这两个操作都发生(按顺序)或不发生。

@Transactional(value = TxType.REQUIRED)
    public Long saveAndSend(String topic, byte[] byteArray, MyEntity myEntity ) throws InterruptedException, ExecutionException, TimeoutException{

        Long id= myDao.persistEntity(myEntity);
        MyUtil.sendWithTimeout(eventSender, topic, byteArray);
        return id;
    }

由于发送到kafka主题涉及到一个阻塞调用,我不想挂起我的api,所以我实现了一个sendwithtimeout方法,如下所示:

public static Long sendWithTimeout(EventSenderKafka eventSender, String topic, byte[] byteArray) throws InterruptedException, ExecutionException, TimeoutException {

        bytes = byteArray;
        ev = eventSender;
        top = topic;
        ExecutorService executor = Executors.newCachedThreadPool();
        Callable<String> task = new Callable<String>() { 

           public String call( ) {
               ev.sendWithByte(top, bytes);
               return "OK";
           }
        };
        Future<String> future = executor.submit(task);
        Object result = future.get(2, TimeUnit.SECONDS); 

        return null;
    }

由于上面提到在api的业务流中使用异常,我想知道有没有更好的方法来实现这一点?我认为在业务流中使用异常是个坏主意,如果我错了,请纠正我。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题