Kafka流:处理多个流的程序体系结构

gt0wga4j  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(144)

我正在运行多个单独的Kafka流。为此,我创建了一个保存这些流的流管理器。下面是sm类的本质。

public class StreamManager {

public Map<String, BaseStream> streamMap = new HashMap<String, BaseStream>();
public Map<String, ReadOnlyKeyValueStore<Long, BaseModel>> storeMap = new HashMap<String, ReadOnlyKeyValueStore<Long, BaseModel>>();    

public StreamManager(String bootstrapServer){
    initialize(bootstrapServer);
}

/**
 * initialize streams.  Right now hard coding creation of streams here.
 * @param bootstrapServer
 */
public void initialize(String bootstrapServer){

    Properties s1Props = this.GetStreamingProperties(bootstrapServer, "STREAM_1");
    Properties s2Props = this.GetStreamingProperties(bootstrapServer, "STREAM_1");

    BaseStream s1Stream = new CompositeInfoStream(new KStreamBuilder(), compositeInfoProps);
    BaseStream s2Stream = new ImcInfoStream(new KStreamBuilder(), imcInfoProps);

    streamMap.put(s1Stream.storeName, s1Stream);
    streamMap.put(s2Stream.storeName, s2Stream);

/**
 * Start all streams
 */
public void startStreams(){
    for(BaseStream stream: streamMap.values()){
        stream.start();
    }
}

public static void main(String[] args) throws Exception {

    StreamManager mgr = new StreamManager(StreamingConfig.instance().MESSAGE_SERVER);
    StreamManager.startStreamServices(mgr);

    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            try {

            } finally {
                mgr.closeStreams();

            }
        }
    });

    int i = 0;
    while(true) {
        if (i++ == 0)
            mgr.logAllStreamStates();

        Thread.sleep(60000);
        if (i == 60) i = 0;
    }       
}
}

我初始化并启动流,然后让进程在循环中运行。现在我想要的是对单个流有更多的控制。如果需要的话,启动并杀死它们(出于一些奇怪的原因,我的流经常处于再平衡模式,不会回来。目前,如果其中一个流进入重新平衡,我必须杀死整个sm(所有流)并重新启动。我想做的只是重新启动单个流。
我想了解一下我的架构应该是怎样的。kafka流是否提供了一种机制来管理流集群?我可以使用多处理来完成这个任务吗?如果可以的话,请您给我一些参考资料,让我记住我们使用windows进行开发,使用linux进行部署

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题