java—在storm拓扑的所有线程中同步一个变量

v7pvogib  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(294)

我有一个storm拓扑,可以将kafka队列中的一些数据写入cassandra数据库。这个程序是多线程的。为了方便插入cassandra db,我将此作为我的dbutils:

public DBUtils() {
    if(session == null) {
        session = CassandraUtil.getInstance().getSession();
        LOG.info("Started a new session for dbUtils-Monitoring.....");
    }
    synchronized(session) {
        testMapper = new MappingManager(session).mapper(TestVO.class);
    }
}

因此,我使用synchronized在所有运行的线程中创建了一个dbutils示例。但是当我查看日志时,似乎会话正在初始化多次。storm拓扑中的dbutils仅在prepare方法中初始化,并且已在prepare/execute/clean-up方法中使用。因此,如果要在所有线程中使用的变量在多个位置使用,则不确定如何使用synchronized block。我的问题是如何在所有线程中只初始化session/dbutils变量一次。

c7rzv4ha

c7rzv4ha1#

由于storm是一个分布式系统,您不能在所有并行运行的螺栓上使用一个共享变量。您只能在单个工作jvm中的执行器上共享一个变量。
为此,您需要创建一个 static 变量并使用共享/静态对象示例来同步其初始化。

相关问题