如何在kafkaspout和bolt之间实现字段分组

c0vxltue  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(239)

我一直在给Kafka主题发送消息,这些消息在主题中是json格式的,我正在使用 KafkaSpout 用于从Kafka获取消息并使用随机分组将其发送给博尔特。现在我想实现字段之间的分组 KafkaSpout 和螺栓。谁能帮我一下吗。如何实现字段分组 KafkaSpout 和螺栓。

tjvv9vkg

tjvv9vkg1#

你需要实施 backtype.storm.spout.scheme 界面,基本上是这样的:

public class FooScheme implements Scheme {

  public Values deserialize(final byte[] _line) {

     try{
           Values values = new Values();
           JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line));
           values.add((String) msg.get("a"));
           values.add((String) msg.get("b"))
           values.add(msg)
        }
        catch(ParseException e) {
            //handle the exception
            return null;
        }

  }

  public Fields getOutputFields() {
     return new Fields("a", "b", "json");}
}

你把它和你的嘴一起用,就像这样:

SpoutConfig spoutConfig = new SpoutConfig( ... your config here ...);
spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
topology.setSpout("kafka-spout", 1).setNumTasks(1);

现在您可以使用按“a”或“b”或两者进行分组的字段了。

FooBolt bolt = new FooBolt();
topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1)
         .fieldsGrouping("kafka-spout", new Fields("a","b"));

享受

相关问题