public class FooScheme implements Scheme {
public Values deserialize(final byte[] _line) {
try{
Values values = new Values();
JSONObject msg = (JSONObject) JSONValue.parseWithException(new String(_line));
values.add((String) msg.get("a"));
values.add((String) msg.get("b"))
values.add(msg)
}
catch(ParseException e) {
//handle the exception
return null;
}
}
public Fields getOutputFields() {
return new Fields("a", "b", "json");}
}
你把它和你的嘴一起用,就像这样:
SpoutConfig spoutConfig = new SpoutConfig( ... your config here ...);
spoutConfig.scheme = new SchemeAsMultiScheme(new FooScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
topology.setSpout("kafka-spout", 1).setNumTasks(1);
现在您可以使用按“a”或“b”或两者进行分组的字段了。
FooBolt bolt = new FooBolt();
topology.setBolt("foo-bolt", new FooBolt(), 1).setNumtasks(1)
.fieldsGrouping("kafka-spout", new Fields("a","b"));
1条答案
按热度按时间tjvv9vkg1#
你需要实施
backtype.storm.spout.scheme
界面,基本上是这样的:你把它和你的嘴一起用,就像这样:
现在您可以使用按“a”或“b”或两者进行分组的字段了。
享受