kafka Storm集成

ocebsuys  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(266)

我用的是Kafka。请在下面找到测试程序。
我正在使用storm 0.8.1。风暴0.8.2中存在多模式类。我会用的。我只想知道早期版本仅仅通过示例化stringscheme()类是如何工作的?我在哪里可以下载Kafka喷口的早期版本?但我怀疑这将是一个正确的选择比工作风暴0.8.2(困惑)
当我在storm cluster上运行代码(如下所示)(即,当我推送拓扑时)时,我得到以下错误(当scheme部分被注解时会发生这种情况,否则我当然会得到编译器错误,因为类在0.8.1中不存在):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
        at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme

在下面给出的代码中,您可以找到spoutconfig.scheme=newStringScheme();部分评论道。我得到编译器错误,如果我不评论这一行,这是很自然的,因为那里没有构造函数。另外,当我示例化multischeme时,我得到一个错误,因为我在0.8.1中没有这个类。

public class TestTopology {
    public static class PrinterBolt extends BaseBasicBolt {
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.toString());
        }
    }

    public static void main(String [] args) throws Exception {
        List<HostPort> hosts = new ArrayList<HostPort>();
        hosts.add(new HostPort("127.0.0.1",9092));
        LocalCluster cluster = new LocalCluster();
        TopologyBuilder builder = new TopologyBuilder();
        SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
        spoutConfig.zkServers=ImmutableList.of("localhost");
        spoutConfig.zkPort=2181;
        //spoutConfig.scheme=new StringScheme();
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("spout",new KafkaSpout(spoutConfig));
        builder.setBolt("printer", new PrinterBolt())
                .shuffleGrouping("spout");
        Config config = new Config();

        cluster.submitTopology("kafka-test", config, builder.createTopology());

        Thread.sleep(600000);
    }
ohtdti5x

ohtdti5x1#

我也有同样的问题。最终解决了这个问题,我将完整的运行示例放到github上。
欢迎您在这里查看>https://github.com/buildlackey/cep
(点击storm+kafka目录中的一个示例程序,它可以让您启动并运行)。

bgibtngc

bgibtngc2#

我们也有类似的问题。
我们的解决方案:
打开pom.xml
将范围从提供更改为 <scope>compile</scope> 如果您想了解更多关于依赖作用域的信息,请查看maven docu:maven docu-dependency scopes

相关问题