Usage of the org.apache.flume.Context.put() method, with code examples


This article collects Java code examples of the org.apache.flume.Context.put() method and shows how it is used in practice. The examples come from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of Context.put() are as follows:
Package path: org.apache.flume.Context
Class: Context
Method: put

About Context.put

Associates the specified value with the specified key in this context. If the context previously contained a mapping for the key, the old value is replaced by the specified value.
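
The replace-on-duplicate-key semantics are easy to verify. The following is a minimal, hypothetical sketch (not taken from the projects below; it assumes a standalone class with flume-ng-core on the classpath):

import org.apache.flume.Context;

public class ContextPutDemo {
 public static void main(String[] args) {
  Context context = new Context();
  // The first put() creates the mapping for the key...
  context.put("capacity", "10000");
  // ...and a second put() with the same key replaces the old value.
  context.put("capacity", "20000");
  System.out.println(context.getString("capacity"));            // prints "20000"
  System.out.println(context.getString("missing", "fallback")); // no mapping, so prints "fallback"
 }
}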

Code examples

Example source: apache/flume

@Override
public void configure(Context context) {
 if (context.getString(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES) == null) {
  context.put(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES, 
    "org.apache.solr.client.solrj.SolrServerException");      
 }
 super.configure(context);
}

Example source: kaaproject/kaa

@Override
public BucketWriter load(HdfsSinkKey key) throws Exception {
 HDFSWriter hdfsWriter = new HDFSDataStream();
 String path = key.getPath() + Path.SEPARATOR + filePrefix;
 context.put("serializer", AvroKaaEventSerializer.Builder.class.getName());
 LOG.info("Creating new writer for key: " + key);
 return new BucketWriter(rollInterval, rollSize, rollCount,
   batchSize, defaultBlockSize, context, path, hdfsWriter,
   timedRollerPool, proxyTicket, sinkCounter);
}

Example source: kaaproject/kaa

private Context prepareContext() throws IOException {
 Context context = new Context();
 // Channel parameters
 context.put("capacity", "100000000");
 context.put("transactionCapacity", "10000000");
 context.put("keep-alive", "1");
 context.put("port", "31333");
 context.put("bind", "localhost");
 context.put(ConfigurationConstants.CONFIG_ROOT_HDFS_PATH, fileSystem.makeQualified(new Path("/logs")).toString());
 context.put(ConfigurationConstants.CONFIG_HDFS_TXN_EVENT_MAX, "100000");
 context.put(ConfigurationConstants.CONFIG_HDFS_THREAD_POOL_SIZE, "20");
 context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_TIMER_POOL_SIZE, "1");
 context.put(ConfigurationConstants.CONFIG_HDFS_MAX_OPEN_FILES, "5000");
 context.put(ConfigurationConstants.CONFIG_HDFS_CALL_TIMEOUT, "10000");
 context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_INTERVAL, "86400000"); // milliseconds
 context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_SIZE, "0"); // bytes (0 means don't roll by size)
 context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_COUNT, "5500000"); // records count
 context.put(ConfigurationConstants.CONFIG_HDFS_BATCH_SIZE, "" + flushRecordsCount); // flush records count
 context.put(ConfigurationConstants.CONFIG_HDFS_DEFAULT_BLOCK_SIZE, "" + blockSize); // default dfs block size in bytes
 context.put(ConfigurationConstants.CONFIG_HDFS_FILE_PREFIX, "data");
 context.put(ConfigurationConstants.CONFIG_STATISTICS_INTERVAL, "10");
 context.put("serializer.compressionCodec", "null");
 context.put("serializer.avro.schema.source", "local");
 context.put("serializer.avro.schema.local.root", logSchemasRootDir.getAbsolutePath());
 return context;
}

Example source: apache/flume

private void filterValue(Context c, String contextKey) {
 for (ConfigFilter configFilter : configFiltersInstances) {
  try {
   Pattern pattern = configFilterPatternCache.get(configFilter.getName());
   String currentValue = c.getString(contextKey);
   Matcher matcher = pattern.matcher(currentValue);
   String filteredValue = currentValue;
   while (matcher.find()) {
    String key = matcher.group("key");
    LOGGER.debug("Replacing {} from config filter {}", key, configFilter.getName());
    String filtered = configFilter.filter(key);
    if (filtered == null) {
     continue;
    }
    String fullMatch = matcher.group();
    filteredValue = filteredValue.replace(fullMatch, filtered);
   }
   c.put(contextKey, filteredValue);
  } catch (Exception e) {
   e.printStackTrace();
   LOGGER.error("Error while matching and filtering configFilter: {} and key: {}",
     new Object[]{configFilter.getName(), contextKey, e});
  }
 }
}

Example source: apache/flume

for (String key : keys) {
 if (key.startsWith(CONF_MONITOR_PREFIX)) {
  context.put(key.substring(CONF_MONITOR_PREFIX.length()),
    systemProps.getProperty(key));
 }
}

Example source: apache/flume

private boolean addComponentConfig(
  String key, String value, String configPrefix, Map<String, Context> contextMap
) {
 ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix);
 if (parsed != null) {
  String name = parsed.getComponentName().trim();
  LOGGER.info("Processing:{}", name);
  Context context = contextMap.get(name);
  if (context == null) {
   LOGGER.debug("Created context for {}: {}", name, parsed.getConfigKey());
   context = new Context();
   contextMap.put(name, context);
  }
  context.put(parsed.getConfigKey(), value);
  return true;
 }
 return false;
}

Example source: apache/flume

private void handleDeprecatedParameter(Context context, String newParam, String oldParam) {
 if (!context.containsKey(newParam) && context.containsKey(oldParam)) {
  context.put(newParam, context.getString(oldParam));
 }
}

Example source: apache/flume

private Context prepareDefaultContext() {
 // Prepares a default context with Kafka Server Properties
 Context context = new Context();
 context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
 context.put(BATCH_SIZE, "1");
 return context;
}

Example source: apache/flume

private void translateOldProps(Context ctx) {
 if (!ctx.containsKey(TOPIC_CONFIG)) {
  ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
  logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
 }
 if (!ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG)) {
  String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
  if (brokerList == null || brokerList.isEmpty()) {
   throw new ConfigurationException("Bootstrap Servers must be specified");
  } else {
   ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
   logger.warn("{} is deprecated. Please use the parameter {}",
     BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
  }
 }
 if (!ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG)) {
  String oldGroupId = ctx.getString(GROUP_ID_FLUME);
  if (oldGroupId != null && !oldGroupId.isEmpty()) {
   ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
   logger.warn("{} is deprecated. Please use the parameter {}",
     GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
  }
 }
 if (!ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
  Boolean oldReadSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET);
  if (oldReadSmallest != null) {
   String auto = oldReadSmallest ? "earliest" : "latest";
   ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto);
   logger.warn("{} is deprecated. Please use the parameter {}",
     READ_SMALLEST_OFFSET,
     KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
  }
 }
}

Example source: apache/flume

private void translateOldProps(Context ctx) {
 if (!ctx.containsKey(TOPIC_CONFIG)) {
  ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
  logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
 }
 if (!ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG)) {
  String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
  if (brokerList == null || brokerList.isEmpty()) {
   throw new ConfigurationException("Bootstrap Servers must be specified");
  } else {
   ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
   logger.warn("{} is deprecated. Please use the parameter {}",
     BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
  }
 }
 if (!ctx.containsKey(BATCH_SIZE)) {
  String oldBatchSize = ctx.getString(OLD_BATCH_SIZE);
  if (oldBatchSize != null && !oldBatchSize.isEmpty()) {
   ctx.put(BATCH_SIZE, oldBatchSize);
   logger.warn("{} is deprecated. Please use the parameter {}", OLD_BATCH_SIZE, BATCH_SIZE);
  }
 }
 if (!ctx.containsKey(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG)) {
  String requiredKey = ctx.getString(KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
  if (requiredKey != null && !requiredKey.isEmpty()) {
   ctx.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, requiredKey);
   logger.warn("{} is deprecated. Please use the parameter {}", REQUIRED_ACKS_FLUME_KEY,
     KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG);
  }
 }
}

Example source: apache/flume

@Test
public void testKafkaProperties() {
 KafkaSink kafkaSink = new KafkaSink();
 Context context = new Context();
 context.put(KAFKA_PREFIX + TOPIC_CONFIG, "");
 context.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
       "override.default.serializer");
 context.put("kafka.producer.fake.property", "kafka.property.value");
 context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
 context.put("brokerList", "real-broker-list");
 Configurables.configure(kafkaSink, context);
 Properties kafkaProps = kafkaSink.getKafkaProps();
 //check that we have defaults set
 assertEquals(kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG),
        DEFAULT_KEY_SERIALIZER);
 //check that kafka properties override the default and get correct name
 assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG),
        "override.default.serializer");
 //check that any kafka-producer property gets in
 assertEquals(kafkaProps.getProperty("fake.property"),
        "kafka.property.value");
 //check that documented property overrides defaults
 assertEquals(kafkaProps.getProperty("bootstrap.servers"),
        "localhost:9092,localhost:9092");
}

Example source: apache/flume

@Test
public void testOldProperties() {
 KafkaSink kafkaSink = new KafkaSink();
 Context context = new Context();
 context.put("topic", "test-topic");
 context.put(OLD_BATCH_SIZE, "300");
 context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
 context.put(REQUIRED_ACKS_FLUME_KEY, "all");
 Configurables.configure(kafkaSink, context);
 Properties kafkaProps = kafkaSink.getKafkaProps();
 assertEquals(kafkaSink.getTopic(), "test-topic");
 assertEquals(kafkaSink.getBatchSize(), 300);
 assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
        "localhost:9092,localhost:9092");
 assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
}

Example source: apache/flume

@Test
public void testStaticTopic() {
 Context context = prepareDefaultContext();
 // add the static topic
 context.put(TOPIC_CONFIG, TestConstants.STATIC_TOPIC);
 String msg = "static-topic-test";
 try {
  Sink.Status status = prepareAndSend(context, msg);
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, TestConstants.STATIC_TOPIC);
}

Example source: apache/flume

/**
 * Tests that sub-properties (kafka.producer.*) apply correctly across multiple invocations
 * of configure() (fix for FLUME-2857).
 */
@Test
public void testDefaultSettingsOnReConfigure() {
 String sampleProducerProp = "compression.type";
 String sampleProducerVal = "snappy";
 Context context = prepareDefaultContext();
 context.put(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX + sampleProducerProp, sampleProducerVal);
 KafkaSink kafkaSink = new KafkaSink();
 Configurables.configure(kafkaSink, context);
 Assert.assertEquals(sampleProducerVal,
   kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
 context = prepareDefaultContext();
 Configurables.configure(kafkaSink, context);
 Assert.assertNull(kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
}

Example source: apache/flume

if (bufferMaxLineLength != null && deserializerType != null &&
  deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
 deserializerContext.put(LineDeserializer.MAXLINE_KEY,
   bufferMaxLineLength.toString());
}

Example source: apache/flume

private void doPartitionErrors(PartitionOption option, Sink kafkaSink) throws Exception {
 Context context = prepareDefaultContext();
 context.put(KafkaSinkConstants.PARTITION_HEADER_NAME, "partition-header");
 // ... remainder of the test method truncated in the original excerpt
}

Example source: apache/flume

Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, "false");
context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, "foo");

Example source: apache/flume

Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(AVRO_EVENT, "true");
Configurables.configure(kafkaSink, context);
Channel memoryChannel = new MemoryChannel();

Example source: apache/flume

@Test
public void testReplaceSubStringOfTopicWithHeaders() {
 String topic = TestConstants.HEADER_1_VALUE + "-topic";
 Sink kafkaSink = new KafkaSink();
 Context context = prepareDefaultContext();
 context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
 Configurables.configure(kafkaSink, context);
 Channel memoryChannel = new MemoryChannel();
 Configurables.configure(memoryChannel, context);
 kafkaSink.setChannel(memoryChannel);
 kafkaSink.start();
 String msg = "test-replace-substring-of-topic-with-headers";
 Map<String, String> headers = new HashMap<>();
 headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
 Transaction tx = memoryChannel.getTransaction();
 tx.begin();
 Event event = EventBuilder.withBody(msg.getBytes(), headers);
 memoryChannel.put(event);
 tx.commit();
 tx.close();
 try {
  Sink.Status status = kafkaSink.process();
  if (status == Sink.Status.BACKOFF) {
   fail("Error Occurred");
  }
 } catch (EventDeliveryException ex) {
  // ignore
 }
 checkMessageArrived(msg, topic);
}

Example source: apache/flume

Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, customTopicHeader);
Configurables.configure(kafkaSink, context);
Channel memoryChannel = new MemoryChannel();
