org.apache.storm.Config.put()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(145)

本文整理了Java中org.apache.storm.Config.put()方法的一些代码示例,展示了Config.put()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Config.put()方法的具体详情如下:
包路径:org.apache.storm.Config
类名称:Config
方法名:put

Config.put介绍

暂无

代码示例

代码示例来源:origin: apache/storm

/**
 * Sets the maximum heap size allowed per worker for this topology.
 *
 * <p>A {@code null} size is ignored and leaves the configuration untouched.
 *
 * @param size the maximum heap size for a worker; stored under
 *             {@link Config#TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB}
 */
public void setTopologyWorkerMaxHeapSize(Number size) {
  if (size == null) {
    return;
  }
  put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, size);
}

代码示例来源:origin: apache/storm

/**
 * Builds a fake config for tests: the result of {@code Utils.readDefaultConfig()} with the
 * Nimbus seed list and Nimbus Thrift port replaced by {@code NIMBUS_HOST} and
 * {@code NIMBUS_PORT}.
 *
 * @return the populated config
 */
private static Config initializedConfig() {
  ArrayList<String> seeds = new ArrayList<>();
  seeds.add(NIMBUS_HOST);

  Config fakeConf = new Config();
  // Defaults are loaded first so that the explicit entries below override them.
  fakeConf.putAll(Utils.readDefaultConfig());
  fakeConf.put(Config.NIMBUS_SEEDS, seeds);
  fakeConf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_PORT);
  return fakeConf;
}

代码示例来源:origin: apache/storm

/**
 * Sets the maximum number of states that will be searched in the constraint solver strategy.
 *
 * @param numStates maximum number of states to search; stored under
 *                  {@link Config#TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH}
 */
public void setTopologyConstraintsMaxStateSearch(int numStates) {
  put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, numStates);
}

代码示例来源:origin: apache/storm

/**
 * Entry point: builds the topology configuration and hands it to
 * {@code Helper.runOnClusterAndPrintMetrics}.
 *
 * <p>Settings are layered in this order, each layer overwriting same-named keys from the
 * previous one: the config file, the hard-coded values below, then command-line options.
 *
 * @param args {@code args[0]} is the run duration in seconds, {@code args[1]} is the
 *             topology config file; any other argument count prints usage and returns
 * @throws Exception propagated from argument parsing, config loading, or the topology run
 */
public static void main(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("args: runDurationSec topConfFile");
    return;
  }

  final Integer runSeconds = Integer.valueOf(args[0]);

  // Layer 1: values from the user-supplied config file.
  Config settings = new Config();
  settings.putAll(Utils.findAndReadConfigFile(args[1]));

  // Layer 2: fixed settings for this run.
  settings.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
  settings.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
  settings.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
  settings.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
  settings.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

  // Layer 3: command-line options take precedence over everything above.
  settings.putAll(Utils.readCommandLineOpts());

  // Submit to Storm cluster
  Helper.runOnClusterAndPrintMetrics(runSeconds, TOPOLOGY_NAME, settings, getTopology(settings));
}
}

代码示例来源:origin: apache/storm

/**
 * Sets the priority for a topology.
 *
 * @param priority the priority value, stored under {@link Config#TOPOLOGY_PRIORITY}
 */
public void setTopologyPriority(int priority) {
  put(Config.TOPOLOGY_PRIORITY, priority);
}

代码示例来源:origin: apache/storm

/**
 * Creates a cluster config seeded from {@code Utils.readDefaultConfig()} and then overridden
 * with test plugin/strategy classes and the given per-component resource values.
 *
 * @param compPcore   value stored under {@link Config#TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT}
 * @param compOnHeap  value stored under {@link Config#TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB}
 * @param compOffHeap value stored under {@link Config#TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB}
 * @param pools       user pools stored under
 *                    {@code DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS}; skipped when {@code null}
 * @return the populated config
 */
public static Config createClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                     Map<String, Map<String, Number>> pools) {
  Config clusterConf = new Config();
  // Defaults go in first; every put below overrides a default of the same key.
  clusterConf.putAll(Utils.readDefaultConfig());

  // Plugin and strategy implementations, registered by class name.
  clusterConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
      GenSupervisorsDnsToSwitchMapping.class.getName());
  clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY,
      DefaultSchedulingPriorityStrategy.class.getName());
  clusterConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
      DefaultResourceAwareStrategy.class.getName());

  // Per-component resource values.
  clusterConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, compPcore);
  clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, compOffHeap);
  clusterConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, compOnHeap);

  if (pools == null) {
    return clusterConf;
  }
  clusterConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS, pools);
  return clusterConf;
}

代码示例来源:origin: apache/storm

/**
 * Sets the scheduler strategy for this topology.
 *
 * @param strategy the value stored under {@link Config#TOPOLOGY_SCHEDULER_STRATEGY}
 */
public void setTopologyStrategy(String strategy) {
  put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategy);
}

代码示例来源:origin: apache/storm

/**
 * Wraps a topology in a {@code TopologyDetails} whose id is {@code name + "-" + launchTime}.
 *
 * <p>The supplied config is copied, then the priority, name, submitter user and worker max
 * heap size entries are set on the copy; the caller's map is not modified.
 *
 * @param name        topology name, also used as the id prefix
 * @param config      base configuration to copy
 * @param topology    the topology to describe
 * @param launchTime  launch time, also used as the id suffix
 * @param priority    value stored under {@link Config#TOPOLOGY_PRIORITY}
 * @param user        submitting user
 * @param maxHeapSize value stored under {@link Config#TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB}
 * @return the new topology details
 */
public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
                          int launchTime, int priority, String user, double maxHeapSize) {
  Config topoConf = new Config();
  topoConf.putAll(config);
  topoConf.put(Config.TOPOLOGY_PRIORITY, priority);
  topoConf.put(Config.TOPOLOGY_NAME, name);
  topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
  topoConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);

  String topoId = name + "-" + launchTime;
  return new TopologyDetails(topoId, topoConf, topology,
      0, genExecsAndComps(topology), launchTime, user);
}

代码示例来源:origin: apache/storm

/**
 * {@inheritDoc}
 */
@Override
public Map<String, Object> getComponentConfiguration() {
  // Request a tick tuple every second to force acknowledgement of pending tuples.
  final int tickEverySecs = 1;
  Config componentConf = new Config();
  componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickEverySecs);
  return componentConf;
}

代码示例来源:origin: apache/storm

/**
 * Builds a topology via {@code buildTopology} and wraps it in a {@code TopologyDetails}
 * whose id is {@code name + "-" + launchTime} and whose owner is {@code "user"}.
 *
 * <p>The supplied config is copied and the topology name is set on the copy.
 *
 * @param name             topology name, also used as the id prefix
 * @param config           base configuration to copy
 * @param numSpout         passed to {@code buildTopology}
 * @param numBolt          passed to {@code buildTopology}
 * @param spoutParallelism passed to {@code buildTopology} and {@code genExecsAndComps}
 * @param boltParallelism  passed to {@code buildTopology} and {@code genExecsAndComps}
 * @param launchTime       launch time, also used as the id suffix
 * @param blacklistEnable  not used by this method
 * @return the new topology details
 */
public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
                     int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {
  Config topoConf = new Config();
  topoConf.putAll(config);
  topoConf.put(Config.TOPOLOGY_NAME, name);

  StormTopology built = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
  String topoId = name + "-" + launchTime;
  return new TopologyDetails(topoId, topoConf, built,
      3, genExecsAndComps(built, spoutParallelism, boltParallelism), launchTime, "user");
}

代码示例来源:origin: apache/storm

/**
 * {@inheritDoc}
 */
@Override
public Map<String, Object> getComponentConfiguration() {
  // The tick tuple frequency comes from this component's tickFrequencyInSeconds field.
  Config componentConf = new Config();
  componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
  return componentConf;
}

代码示例来源:origin: apache/storm

/**
 * Seeds the shared default topology config once before all tests run.
 */
@BeforeAll
public static void initConf() {
  final double workerMaxHeap = 8192.0;
  final int topologyPriority = 0;
  defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, workerMaxHeap);
  defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, topologyPriority);
}

代码示例来源:origin: apache/storm

/**
 * Creates a cluster config via {@code createClusterConfig} and then adds the generic
 * resource map and selects {@code GenericResourceAwareStrategy} as the scheduler strategy.
 *
 * @param compPcore          forwarded to {@code createClusterConfig}
 * @param compOnHeap         forwarded to {@code createClusterConfig}
 * @param compOffHeap        forwarded to {@code createClusterConfig}
 * @param pools              forwarded to {@code createClusterConfig}
 * @param genericResourceMap value stored under {@link Config#TOPOLOGY_COMPONENT_RESOURCES_MAP}
 * @return the populated config
 */
public static Config createGrasClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                       Map<String, Map<String, Number>> pools, Map<String, Double> genericResourceMap) {
  Config grasConf = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
  grasConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, genericResourceMap);
  // Replaces any scheduler strategy already present in the base config.
  String strategyClassName = GenericResourceAwareStrategy.class.getName();
  grasConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategyClassName);
  return grasConf;
}

代码示例来源:origin: apache/storm

/**
 * Entry point: submits the topology built by {@code buildTopology} with an HBase-backed
 * window store factory.
 *
 * @param args optional; {@code args[0]}, when present, overrides the default topology name
 *             {@code "wordCounterWithWindowing"}
 * @throws Exception propagated from topology construction or submission
 */
public static void main(String[] args) throws Exception {
  Config conf = new Config();
  conf.setMaxSpoutPending(20);
  conf.put(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT, 100);
  // window-state table should already be created with cf:tuples column
  // StandardCharsets.UTF_8 avoids the by-name charset lookup and its checked
  // UnsupportedEncodingException; it is fully qualified so no new import is required.
  HBaseWindowsStoreFactory windowStoreFactory =
    new HBaseWindowsStoreFactory(
      new HashMap<String, Object>(),
      "window-state",
      "cf".getBytes(java.nio.charset.StandardCharsets.UTF_8),
      "tuples".getBytes(java.nio.charset.StandardCharsets.UTF_8));
  String topoName = "wordCounterWithWindowing";
  if (args.length > 0) {
    topoName = args[0];
  }
  conf.setNumWorkers(3);
  StormSubmitter.submitTopologyWithProgressBar(topoName, conf, buildTopology(windowStoreFactory));
}

代码示例来源:origin: apache/storm

/**
 * Runs the heterogeneous-cluster test with {@code GenericResourceAwareStrategy} selected
 * on a copy of the default topology config.
 */
@Test
public void testHeterogeneousClusterwithGras() {
  // Work on a clone so the shared default config is left untouched.
  Config grasConf = (Config) defaultTopologyConf.clone();
  String strategyClassName = GenericResourceAwareStrategy.class.getName();
  grasConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategyClassName);

  String strategySimpleName = GenericResourceAwareStrategy.class.getSimpleName();
  testHeterogeneousCluster(grasConf, strategySimpleName);
}

代码示例来源:origin: apache/storm

/**
 * Verifies that {@code FileConfigLoader.load} returns {@code null} when the configured
 * file does not contain valid YAML.
 *
 * @throws Exception if the temporary file cannot be created or written
 */
@Test
public void testMalformedYaml() throws Exception {
  File temp = File.createTempFile("FileLoader", ".yaml");
  temp.deleteOnExit();
  // try-with-resources closes (and thereby flushes) the writer even if write() throws,
  // so the file handle cannot leak.
  try (FileWriter fw = new FileWriter(temp)) {
    fw.write("ThisIsNotValidYaml");
  }
  Config conf = new Config();
  conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX + temp.getCanonicalPath());
  FileConfigLoader testLoader = new FileConfigLoader(conf);
  Map<String, Object> result = testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
  Assert.assertNull("Unexpectedly returned a map", result);
}

代码示例来源:origin: apache/storm

/**
 * Verifies that the Artifactory loader returns {@code null} when the artifact body
 * is not valid YAML.
 *
 * @throws Exception propagated from the loader mock
 */
@Test
public void testMalformedYaml() throws Exception {
  // This is a test where we are configured to point right at a single artifact
  String artifactUri =
      ARTIFACTORY_HTTP_SCHEME_PREFIX + "bogushost.yahoo.com:9999/location/of/this/artifact";
  Config loaderConf = new Config();
  loaderConf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, artifactUri);
  loaderConf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());

  ArtifactoryConfigLoaderMock mockLoader = new ArtifactoryConfigLoaderMock(loaderConf);
  mockLoader.setData("Anything", "/location/of/this/artifact", "{ \"downloadUri\": \"anything\"}");
  mockLoader.setData(null, null, "ThisIsNotValidYaml");

  Map<String, Object> loaded = mockLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
  Assert.assertNull("Unexpectedly returned a map", loaded);
}

代码示例来源:origin: apache/storm

/**
 * Verifies that {@code FileConfigLoader.load} returns {@code null} when the configured
 * URI points at a path that does not exist.
 */
@Test
public void testFileNotThere() {
  String missingFileUri = FILE_SCHEME_PREFIX + "/file/not/exist/";
  Config loaderConf = new Config();
  loaderConf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, missingFileUri);

  FileConfigLoader loader = new FileConfigLoader(loaderConf);
  Map<String, Object> loaded = loader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
  Assert.assertNull("Unexpectedly returned a map", loaded);
}

代码示例来源:origin: apache/storm

/**
 * Runs {@code COMPLETE_TOPOLOGY_TESTJOB} on a simulated-time local cluster with four
 * supervisors and {@link Config#STORM_LOCAL_MODE_ZMQ} set to {@code true}.
 *
 * @throws Exception propagated from the test job
 */
@Test
@IntegrationTest
public void testCompleteTopologyNettySimulated() throws Exception {
  MkClusterParam clusterParam = new MkClusterParam();
  clusterParam.setSupervisors(4);

  Config clusterDaemonConf = new Config();
  clusterDaemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
  clusterParam.setDaemonConf(clusterDaemonConf);

  Testing.withSimulatedTimeLocalCluster(clusterParam, COMPLETE_TOPOLOGY_TESTJOB);
}

代码示例来源:origin: apache/storm

/**
 * Runs {@code COMPLETE_TOPOLOGY_TESTJOB} on a local cluster with four supervisors and
 * {@link Config#STORM_LOCAL_MODE_ZMQ} set to {@code true}.
 *
 * @throws Exception propagated from the test job
 */
@Test
@IntegrationTest
public void testCompleteTopologyNetty() throws Exception {
  MkClusterParam clusterParam = new MkClusterParam();
  clusterParam.setSupervisors(4);

  Config clusterDaemonConf = new Config();
  clusterDaemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
  clusterParam.setDaemonConf(clusterDaemonConf);

  Testing.withLocalCluster(clusterParam, COMPLETE_TOPOLOGY_TESTJOB);
}

相关文章