Usage and code examples of the backtype.storm.Config class

x33g5p2x, reposted on 2022-01-18 under Other

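This article collects code examples for the Java class backtype.storm.Config and shows how the Config class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should serve as a useful reference. Details of the Config class:
Package path: backtype.storm.Config
Class name: Config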

About Config

Topology configs are specified as a plain old map. This class provides a convenient way to create a topology config map by providing setter methods for all the configs that can be set. It also makes it easier to do things like add serializations.
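
To make the "plain old map" point concrete, here is a minimal sketch rather than Storm's actual source. `SketchConfig` and `SketchConfigDemo` are hypothetical names, and the class imitates only two setters to show that each one reduces to a `put` on the underlying map. The key strings `topology.debug` and `topology.workers` are the ones Storm uses for these two settings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for backtype.storm.Config; not the real class.
class SketchConfig extends HashMap<String, Object> {
    static final String TOPOLOGY_DEBUG = "topology.debug";
    static final String TOPOLOGY_WORKERS = "topology.workers";

    // Each setter is only a typed wrapper around put().
    void setDebug(boolean isOn) {
        put(TOPOLOGY_DEBUG, isOn);
    }

    void setNumWorkers(int workers) {
        put(TOPOLOGY_WORKERS, workers);
    }
}

class SketchConfigDemo {
    public static void main(String[] args) {
        SketchConfig conf = new SketchConfig();
        conf.setDebug(true);
        conf.setNumWorkers(3);

        // Because the config is a map, it can be handed to any code that expects one.
        Map<String, Object> plain = conf;
        System.out.println(plain);
    }
}
```

The same effect could be had with `conf.put("topology.workers", 3)`; the setters mainly guard against mistyped keys and wrong value types.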

This class also provides constants for all the configurations possible on a Storm cluster and Storm topology. Each constant is paired with a schema that defines the validity criterion of the corresponding field. Default values for these configs can be found in defaults.yaml.
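
The pairing of a constant with a schema can be pictured roughly as follows. This is a hedged sketch of the idea only: `SchemaSketch`, `FieldValidator`, and `SCHEMAS` are illustrative names invented here, and the real validation code in Storm may be organized differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "constant plus schema" idea; not Storm's API.
class SchemaSketch {
    interface FieldValidator {
        void validate(String name, Object value);
    }

    // The config key and, next to it, the rule that a value for that key must satisfy.
    static final String TOPOLOGY_WORKERS = "topology.workers";
    static final FieldValidator TOPOLOGY_WORKERS_SCHEMA = (name, value) -> {
        // null is accepted here and taken to mean "fall back to the default"
        if (value != null && !(value instanceof Integer)) {
            throw new IllegalArgumentException(name + " must be an Integer, got " + value);
        }
    };

    static final Map<String, FieldValidator> SCHEMAS = new HashMap<>();
    static {
        SCHEMAS.put(TOPOLOGY_WORKERS, TOPOLOGY_WORKERS_SCHEMA);
    }

    // Checks every key that has a schema; keys without one are skipped.
    static void validate(Map<String, Object> conf) {
        for (Map.Entry<String, Object> entry : conf.entrySet()) {
            FieldValidator validator = SCHEMAS.get(entry.getKey());
            if (validator != null) {
                validator.validate(entry.getKey(), entry.getValue());
            }
        }
    }
}
```

With this sketch, a map holding `topology.workers = 4` passes, while `topology.workers = "four"` is rejected with an `IllegalArgumentException`.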

Note that you may put other configurations in any of the configs. Storm will ignore anything it doesn't recognize, but your topologies are free to make use of them by reading them in the prepare method of Bolts or the open method of Spouts.
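
Reading an application-specific entry back out of the config might look like the sketch below. `BatchingBoltSketch` and the key `myapp.batch.size` are made up for illustration, and the `prepare` method here is simplified: the real method on a Storm bolt also receives a topology context and an output collector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bolt; it only shows how a custom key could be read.
class BatchingBoltSketch {
    static final String BATCH_SIZE_KEY = "myapp.batch.size"; // made-up application key
    static final int DEFAULT_BATCH_SIZE = 100;

    private int batchSize;

    // Simplified signature: only the config map is passed in.
    void prepare(Map<String, Object> stormConf) {
        Object raw = stormConf.get(BATCH_SIZE_KEY);
        // Go through Number so that Integer and Long values are both accepted;
        // fall back to the default when the key is missing or not numeric.
        batchSize = (raw instanceof Number) ? ((Number) raw).intValue() : DEFAULT_BATCH_SIZE;
    }

    int getBatchSize() {
        return batchSize;
    }
}
```

On the submitting side, the value would be set with an ordinary `conf.put("myapp.batch.size", 25)` before the topology is submitted.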

Code examples

Code example source: alibaba/jstorm

public static void testDrpc() {
  TopologyBuilder builder = new TopologyBuilder();
  LocalDRPC drpc = new LocalDRPC();
  
  DRPCSpout spout = new DRPCSpout("exclamation", drpc);
  builder.setSpout("drpc", spout);
  builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
  builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
  
  LocalCluster cluster = new LocalCluster();
  Config conf = new Config();
  cluster.submitTopology("exclaim", conf, builder.createTopology());
  
  JStormUtils.sleepMs(30 * 1000);
  
  try {
    System.out.println(drpc.execute("exclamation", "aaa"));
    System.out.println(drpc.execute("exclamation", "bbb"));
  } catch (Exception e) {
    Assert.fail("Failed to test drpc");
  }
  
  drpc.shutdown();
  cluster.shutdown();
}

Code example source: alibaba/jstorm

public static void test() {
  TopologyBuilder builder = new TopologyBuilder();
  
  builder.setSpout("spout", new RandomSentenceSpout(), 5);
  
  builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
  builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
  
  Config conf = new Config();
  conf.setDebug(true);
  
  String[] className = Thread.currentThread().getStackTrace()[1].getClassName().split("\\.");
  String topologyName = className[className.length - 1];
  
  try {
    JStormHelper.runTopology(builder.createTopology(), topologyName, conf, 60,
        new JStormHelper.CheckAckedFail(conf), isLocal);
  } catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    Assert.fail("Failed");
  }
}

Code example source: alibaba/jstorm

int spout_Parallelism_hint = JStormUtils.parseInt(conf.get(TOPOLOGY_SPOUT_PARALLELISM_HINT), 1);
int split_Parallelism_hint = JStormUtils.parseInt(conf.get(TOPOLOGY_SPLIT_PARALLELISM_HINT), 1);
int count_Parallelism_hint = JStormUtils.parseInt(conf.get(TOPOLOGY_COUNT_PARALLELISM_HINT), 2);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new FastRandomSentenceSpout(), spout_Parallelism_hint);
builder.setBolt("split", new SplitSentence(), split_Parallelism_hint).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), count_Parallelism_hint).fieldsGrouping("split", new Fields("word"));
conf.put(ConfigExtension.TOPOLOGY_BACKPRESSURE_ENABLE, true);
conf.setNumWorkers(8);

Code example source: alibaba/jstorm

@Override
public Map<String, Object> getComponentConfiguration() {
  Config conf = new Config();
  conf.setMaxTaskParallelism(1);
  return conf;
}

Code example source: alibaba/jstorm

private static Config createTopologyConfiguration() {
  Config conf = new Config();
  conf.setDebug(true);
  return conf;
}

Code example source: alibaba/jstorm

@Override
public Map<String, Object> getComponentConfiguration() {
  Config ret = new Config();
  ret.setMaxTaskParallelism(1);
  ret.registerSerialization(TransactionAttempt.class);
  return ret;
}

Code example source: jasonTangxd/recommendSys

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("spout", new DemoSpout(), 1);
  //builder.setBolt("bolt", new RecommenderBolt(), 1).shuffleGrouping("spout");
  builder.setBolt("detailBolt", new RecommenderDetailBolt(), 1).shuffleGrouping("spout");
  builder.setBolt("statBolt", new RecommenderStatBolt(), 2).fieldsGrouping("detailBolt", new Fields("userId"));
  Config config = new Config();
  // The line below asks Storm to deliver a special tick tuple every 10 seconds; the tuple's source component is SYSTEM_COMPONENT_ID = "__system"
  //config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
  
  config.setDebug(true);
  // Submit to a remote cluster when a topology name is given
  if (args != null && args.length > 0) {
    config.setNumWorkers(3); // use three worker processes
    StormSubmitter.submitTopology(args[0], config, builder.createTopology());
  }
  else {
    // Otherwise run in local mode
    LocalCluster cluster = new LocalCluster();
    config.setNumWorkers(3);
    cluster.submitTopology("recommenderTopology", config, builder.createTopology());
  }
}

Code example source: udacity/ud381

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("word", new TestWordSpout(), 10);
  builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
  builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
  Config conf = new Config();
  conf.setDebug(true);
  // The excerpt drops the branch around the two submissions; the if/else below is
  // reconstructed on the assumption that it follows the usual cluster-vs-local pattern.
  if (args != null && args.length > 0) {
    conf.setNumWorkers(3);
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
  } else {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("exclamation", conf, builder.createTopology());
    // ... (the excerpt omits whatever runs between submit and kill)
    cluster.killTopology("exclamation");
  }
}

Code example source: alibaba/jstorm

public static void test() {
  TopologyBuilder builder = new TopologyBuilder();
  
  builder.setSpout("spout", new InOrderSpout(), 8);
  builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1"));
  
  Config conf = new Config();
  conf.setMaxSpoutPending(20);
  
  String[] className = Thread.currentThread().getStackTrace()[1].getClassName().split("\\.");
  String topologyName = className[className.length - 1];
  
  if (isLocal) {
    drpc = new LocalDRPC();
  }
  
  try {
    JStormHelper.runTopology(buildTopology(drpc), topologyName, conf, 60, new DrpcValidator(), isLocal);
  } catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    Assert.fail("Failed");
  }
}

Code example source: dongeforever/middlewarerace

public static void  allInOneRaceV2(boolean local){
  Config conf = new Config();
  conf.put("is.stat.enable",false);
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("all_in_one_spout_2", new AllInOneSpoutV2(), 1);
  // Write to Tair
  BoltDeclarer flushBolt = builder.setBolt("flush_tair_bolt", new FlushTairBolt(),1);
  flushBolt.shuffleGrouping("all_in_one_spout_2");
  String topologyName = RaceConfig.JstormTopologyName;
  try {
    if(local){
      submitLocal(topologyName,conf,builder.createTopology());
    }else {
      StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
    }
  } catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }
}

Code example source: alibaba/jstorm

int spout_Parallelism_hint = JStormUtils.parseInt(conf.get(TOPOLOGY_SPOUT_PARALLELISM_HINT), 1);
int bolt_Parallelism_hint = JStormUtils.parseInt(conf.get(TOPOLOGY_BOLT_PARALLELISM_HINT), 2);
builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME, new SequenceSpout(), spout_Parallelism_hint);
boolean isEnableSplit = JStormUtils.parseBoolean(conf.get("enable.split"), false);
// ... (the bolt declarations that these grouping calls belong to are omitted in this excerpt)
      .localOrShuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
// ...
      .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, SequenceTopologyDef.TRADE_STREAM_ID);
boolean kryoEnable = JStormUtils.parseBoolean(conf.get("kryo.enable"), false);
if (kryoEnable) {
  System.out.println("Use Kryo ");
  boolean useJavaSer = JStormUtils.parseBoolean(conf.get("fall.back.on.java.serialization"), true);
  Config.setFallBackOnJavaSerialization(conf, useJavaSer);
  Config.registerSerialization(conf, TradeCustomer.class);
  Config.registerSerialization(conf, Pair.class);
}
int ackerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), 1);
Config.setNumAckers(conf, ackerNum);
int workerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS), 20);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);

Code example source: openimaj/openimaj

public static void main(String[] args) {
    final Config conf = new Config();
    conf.setDebug(false);
    conf.setNumWorkers(2);
    conf.setMaxSpoutPending(1);
    conf.setFallBackOnJavaSerialization(false);
    conf.setSkipMissingKryoRegistrations(false);
    final LocalCluster cluster = new LocalCluster();
    final TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("randomSpout1", new RandomFieldSpout(2, 0, 0, 1)); // (nfields,seed,min,max)
    builder.setSpout("randomSpout2", new RandomFieldSpout(2, 10, 0, 1)); // (nfields,seed,min,max)
    JoinBolt.connectNewBolt(builder);
    final StormTopology topology = builder.createTopology();
    cluster.submitTopology("playTopology", conf, topology);
    Utils.sleep(10000);
    cluster.killTopology("playTopology");
    cluster.shutdown();

  }
}

Code example source: alibaba/jstorm

public static void test() {
  
  String[] className = Thread.currentThread().getStackTrace()[1].getClassName().split("\\.");
  String topologyName = className[className.length - 1];
  
  try {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("integer", new RandomIntegerSpout(), 1);
    builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
        .shuffleGrouping("integer");
    builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
        .shuffleGrouping("slidingsum");
    builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
    
    conf.setDebug(true);
    
    JStormHelper.runTopology(builder.createTopology(), topologyName, conf, 60,
        new JStormHelper.CheckAckedFail(conf), isLocal);
  } catch (Exception e) {
    // Print the stack trace so the cause is visible before the assertion fails
    e.printStackTrace();
    Assert.fail("Failed to submit topology");
  }
}

Code example source: alibaba/jstorm

public static void test() {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(SPOUT_NAME, new RandomIntegerSpout(), 2);
  builder.setBolt(BOLT1_NAME, new CheckBolt(false), 3).localOrShuffleGrouping(SPOUT_NAME);
  builder.setBolt(BOLT2_NAME, new CheckBolt(true), 3).localOrShuffleGrouping(SPOUT_NAME);
  Config conf = new Config();
  conf.setDebug(true);
    conf.setNumWorkers(workerNumber);

Code example source: alibaba/jstorm

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    Map config = new Config();
    config.put(ConfigExtension.TOPOLOGY_MASTER_USER_DEFINED_STREAM_CLASS, "com.alipay.dw.jstorm.example.tm.TMUdfHandler");
    config.put(Config.TOPOLOGY_WORKERS, 2);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("TMUdfSpout", new TMUdfSpout(), 2);
    builder.setBolt("TMUdfBolt", new TMUdfBolt(), 4);
    StormTopology topology = builder.createTopology();

    StormSubmitter.submitTopology("TMUdfTopology", config, topology);
  }
}

Code example source: wurstmeister/storm-kafka-0.8-plus-test

public static void main(String[] args) throws Exception {

    String kafkaZk = args[0];
    KafkaSpoutTestTopology kafkaSpoutTestTopology = new KafkaSpoutTestTopology(kafkaZk);
    Config config = new Config();
    config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);

    StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology();
    // args[2] is read below, so a remote submit needs at least three arguments
    if (args != null && args.length > 2) {
      String name = args[1];
      String dockerIp = args[2];
      config.setNumWorkers(2);
      config.setMaxTaskParallelism(5);
      config.put(Config.NIMBUS_HOST, dockerIp);
      config.put(Config.NIMBUS_THRIFT_PORT, 6627);
      config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
      config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
      StormSubmitter.submitTopology(name, config, stormTopology);
    } else {
      config.setNumWorkers(2);
      config.setMaxTaskParallelism(2);
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("kafka", config, stormTopology);
    }
  }
}

Code example source: alibaba/jstorm

public static void test() {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout("word", new TestWordSpout(), 10);
  builder.setBolt("exclaim1", new ExclamationLoggingBolt(), 3).noneGrouping("word");
  builder.setBolt("exclaim2", new ExclamationLoggingBolt(), 2).shuffleGrouping("exclaim1");
  conf.put(Config.ISOLATION_SCHEDULER_MACHINES, hosts);

Code example source: alibaba/jstorm

public static void test() throws Exception {
  TransactionTopologyBuilder builder = new TransactionTopologyBuilder();
  if (isLocal) {
    conf.put("tuple.num.per.batch", 5);
    conf.put("transaction.scheduler.spout", false);
    conf.put("transaction.exactly.cache.type", "default");
  }
  int spoutParallelism = JStormUtils.parseInt(conf.get(SPOUT_PARALLELISM_HINT), 1);
  int splitParallelism = JStormUtils.parseInt(conf.get(SPLIT_PARALLELISM_HINT), 2);
  int countParallelism = JStormUtils.parseInt(conf.get(COUNT_PARALLELISM_HINT), 2);
  boolean isScheduleSpout = JStormUtils.parseBoolean(conf.get("transaction.scheduler.spout"), true);
  if (isScheduleSpout)
    // Generate batch by configured time. "transaction.schedule.batch.delay.ms: 1000 # 1sec"
    builder.setSpout("spout", new ScheduleTxSpout(), spoutParallelism);
  else
    // Generate batch by user when calling emitBarrier
    builder.setSpout("spout", new BasicTxSpout(), spoutParallelism, false);
  builder.setBolt("split", new TxSplitSentence(), splitParallelism).localOrShuffleGrouping("spout");
  builder.setBolt("count", new TxWordCount(), countParallelism).fieldsGrouping("split", new Fields("word"));
  String[] className = Thread.currentThread().getStackTrace()[1].getClassName().split("\\.");
  String topologyName = className[className.length - 1];
  StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
}

Code example source: pereferrera/trident-hackaton

public static void main(String[] args) throws Exception {
    Config conf = new Config();

    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("hackaton", conf, buildTopology(drpc));
  }
}

Code example source: alibaba/jstorm

hosts = JStormHelper.getSupervisorHosts();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 1);
builder.setBolt(BOLT_NAME, new ExclamationLoggingBolt(), hosts.size()).localFirstGrouping("word")
    .addConfigurations(componentMap);
    conf.setNumWorkers(hosts.size() + 3);
  } else {
    conf.setNumWorkers(hosts.size());
