本文整理了Java中org.apache.flume.Context.put()
方法的一些代码示例,展示了Context.put()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Context.put()
方法的具体详情如下:
包路径:org.apache.flume.Context
类名称:Context
方法名:put
[英]Associates the specified value with the specified key in this context. If the context previously contained a mapping for the key, the old value is replaced by the specified value.
[中]在此上下文中将指定的值与指定的键相关联。如果上下文先前包含键的映射,则旧值将替换为指定值。
代码示例来源:origin: apache/flume
@Override
public void configure(Context context) {
if (context.getString(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES) == null) {
context.put(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES,
"org.apache.solr.client.solrj.SolrServerException");
}
super.configure(context);
}
代码示例来源:origin: kaaproject/kaa
@Override
public BucketWriter load(HdfsSinkKey key) throws Exception {
HDFSWriter hdfsWriter = new HDFSDataStream();
String path = key.getPath() + Path.SEPARATOR + filePrefix;
context.put("serializer", AvroKaaEventSerializer.Builder.class.getName());
LOG.info("Creating new writer for key: " + key);
return new BucketWriter(rollInterval, rollSize, rollCount,
batchSize, defaultBlockSize, context, path, hdfsWriter,
timedRollerPool, proxyTicket, sinkCounter);
}
代码示例来源:origin: kaaproject/kaa
private Context prepareContext() throws IOException {
Context context = new Context();
// Channel parameters
context.put("capacity", "100000000");
context.put("transactionCapacity", "10000000");
context.put("keep-alive", "1");
context.put("port", "31333");
context.put("bind", "localhost");
context.put(ConfigurationConstants.CONFIG_ROOT_HDFS_PATH, fileSystem.makeQualified(new Path("/logs")).toString());
context.put(ConfigurationConstants.CONFIG_HDFS_TXN_EVENT_MAX, "100000");
context.put(ConfigurationConstants.CONFIG_HDFS_THREAD_POOL_SIZE, "20");
context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_TIMER_POOL_SIZE, "1");
context.put(ConfigurationConstants.CONFIG_HDFS_MAX_OPEN_FILES, "5000");
context.put(ConfigurationConstants.CONFIG_HDFS_CALL_TIMEOUT, "10000");
context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_INTERVAL, "86400000"); // milliseconds
context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_SIZE, "0"); // bytes (0 means don't roll by size)
context.put(ConfigurationConstants.CONFIG_HDFS_ROLL_COUNT, "5500000"); // records count
context.put(ConfigurationConstants.CONFIG_HDFS_BATCH_SIZE, "" + flushRecordsCount); // flush records count
context.put(ConfigurationConstants.CONFIG_HDFS_DEFAULT_BLOCK_SIZE, "" + blockSize); // default dfs block size in bytes
context.put(ConfigurationConstants.CONFIG_HDFS_FILE_PREFIX, "data");
context.put(ConfigurationConstants.CONFIG_STATISTICS_INTERVAL, "10");
context.put("serializer.compressionCodec", "null");
context.put("serializer.avro.schema.source", "local");
context.put("serializer.avro.schema.local.root", logSchemasRootDir.getAbsolutePath());
return context;
}
代码示例来源:origin: apache/flume
private void filterValue(Context c, String contextKey) {
for (ConfigFilter configFilter : configFiltersInstances) {
try {
Pattern pattern = configFilterPatternCache.get(configFilter.getName());
String currentValue = c.getString(contextKey);
Matcher matcher = pattern.matcher(currentValue);
String filteredValue = currentValue;
while (matcher.find()) {
String key = matcher.group("key");
LOGGER.debug("Replacing {} from config filter {}", key, configFilter.getName());
String filtered = configFilter.filter(key);
if (filtered == null) {
continue;
}
String fullMatch = matcher.group();
filteredValue = filteredValue.replace(fullMatch, filtered);
}
c.put(contextKey, filteredValue);
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("Error while matching and filtering configFilter: {} and key: {}",
new Object[]{configFilter.getName(), contextKey, e});
}
}
}
代码示例来源:origin: apache/flume
for (String key : keys) {
if (key.startsWith(CONF_MONITOR_PREFIX)) {
context.put(key.substring(CONF_MONITOR_PREFIX.length()),
systemProps.getProperty(key));
代码示例来源:origin: apache/flume
private boolean addComponentConfig(
String key, String value, String configPrefix, Map<String, Context> contextMap
) {
ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix);
if (parsed != null) {
String name = parsed.getComponentName().trim();
LOGGER.info("Processing:{}", name);
Context context = contextMap.get(name);
if (context == null) {
LOGGER.debug("Created context for {}: {}", name, parsed.getConfigKey());
context = new Context();
contextMap.put(name, context);
}
context.put(parsed.getConfigKey(), value);
return true;
}
return false;
}
代码示例来源:origin: apache/flume
private void handleDeprecatedParameter(Context context, String newParam, String oldParam) {
if (!context.containsKey(newParam) && context.containsKey(oldParam)) {
context.put(newParam, context.getString(oldParam));
}
}
代码示例来源:origin: apache/flume
private Context prepareDefaultContext() {
// Prepares a default context with Kafka Server Properties
Context context = new Context();
context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
context.put(BATCH_SIZE, "1");
return context;
}
代码示例来源:origin: apache/flume
private void translateOldProps(Context ctx) {
ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
throw new ConfigurationException("Bootstrap Servers must be specified");
} else {
ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
logger.warn("{} is deprecated. Please use the parameter {}",
BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
String oldGroupId = ctx.getString(GROUP_ID_FLUME);
if (oldGroupId != null && !oldGroupId.isEmpty()) {
ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId);
logger.warn("{} is deprecated. Please use the parameter {}",
GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
auto = "latest";
ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto);
logger.warn("{} is deprecated. Please use the parameter {}",
READ_SMALLEST_OFFSET,
代码示例来源:origin: apache/flume
private void translateOldProps(Context ctx) {
ctx.put(TOPIC_CONFIG, ctx.getString("topic"));
logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
throw new ConfigurationException("Bootstrap Servers must be specified");
} else {
ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
logger.warn("{} is deprecated. Please use the parameter {}",
BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG);
String oldBatchSize = ctx.getString(OLD_BATCH_SIZE);
if ( oldBatchSize != null && !oldBatchSize.isEmpty()) {
ctx.put(BATCH_SIZE, oldBatchSize);
logger.warn("{} is deprecated. Please use the parameter {}", OLD_BATCH_SIZE, BATCH_SIZE);
KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
if (!(requiredKey == null) && !(requiredKey.isEmpty())) {
ctx.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, requiredKey);
logger.warn("{} is deprecated. Please use the parameter {}", REQUIRED_ACKS_FLUME_KEY,
KAFKA_PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG);
代码示例来源:origin: apache/flume
@Test
public void testKafkaProperties() {
KafkaSink kafkaSink = new KafkaSink();
Context context = new Context();
context.put(KAFKA_PREFIX + TOPIC_CONFIG, "");
context.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"override.default.serializer");
context.put("kafka.producer.fake.property", "kafka.property.value");
context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
context.put("brokerList", "real-broker-list");
Configurables.configure(kafkaSink, context);
Properties kafkaProps = kafkaSink.getKafkaProps();
//check that we have defaults set
assertEquals(kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG),
DEFAULT_KEY_SERIALIZER);
//check that kafka properties override the default and get correct name
assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG),
"override.default.serializer");
//check that any kafka-producer property gets in
assertEquals(kafkaProps.getProperty("fake.property"),
"kafka.property.value");
//check that documented property overrides defaults
assertEquals(kafkaProps.getProperty("bootstrap.servers"),
"localhost:9092,localhost:9092");
}
代码示例来源:origin: apache/flume
@Test
public void testOldProperties() {
KafkaSink kafkaSink = new KafkaSink();
Context context = new Context();
context.put("topic", "test-topic");
context.put(OLD_BATCH_SIZE, "300");
context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
context.put(REQUIRED_ACKS_FLUME_KEY, "all");
Configurables.configure(kafkaSink, context);
Properties kafkaProps = kafkaSink.getKafkaProps();
assertEquals(kafkaSink.getTopic(), "test-topic");
assertEquals(kafkaSink.getBatchSize(), 300);
assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
"localhost:9092,localhost:9092");
assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
}
代码示例来源:origin: apache/flume
@Test
public void testStaticTopic() {
Context context = prepareDefaultContext();
// add the static topic
context.put(TOPIC_CONFIG, TestConstants.STATIC_TOPIC);
String msg = "static-topic-test";
try {
Sink.Status status = prepareAndSend(context, msg);
if (status == Sink.Status.BACKOFF) {
fail("Error Occurred");
}
} catch (EventDeliveryException ex) {
// ignore
}
checkMessageArrived(msg, TestConstants.STATIC_TOPIC);
}
代码示例来源:origin: apache/flume
/**
* Tests that sub-properties (kafka.producer.*) apply correctly across multiple invocations
* of configure() (fix for FLUME-2857).
*/
@Test
public void testDefaultSettingsOnReConfigure() {
String sampleProducerProp = "compression.type";
String sampleProducerVal = "snappy";
Context context = prepareDefaultContext();
context.put(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX + sampleProducerProp, sampleProducerVal);
KafkaSink kafkaSink = new KafkaSink();
Configurables.configure(kafkaSink, context);
Assert.assertEquals(sampleProducerVal,
kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
context = prepareDefaultContext();
Configurables.configure(kafkaSink, context);
Assert.assertNull(kafkaSink.getKafkaProps().getProperty(sampleProducerProp));
}
代码示例来源:origin: apache/flume
if (bufferMaxLineLength != null && deserializerType != null &&
deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
deserializerContext.put(LineDeserializer.MAXLINE_KEY,
bufferMaxLineLength.toString());
代码示例来源:origin: apache/flume
private void doPartitionErrors(PartitionOption option, Sink kafkaSink) throws Exception {
Context context = prepareDefaultContext();
context.put(KafkaSinkConstants.PARTITION_HEADER_NAME, "partition-header");
代码示例来源:origin: apache/flume
Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, "false");
context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, "foo");
代码示例来源:origin: apache/flume
Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(AVRO_EVENT, "true");
Configurables.configure(kafkaSink, context);
Channel memoryChannel = new MemoryChannel();
代码示例来源:origin: apache/flume
@Test
public void testReplaceSubStringOfTopicWithHeaders() {
String topic = TestConstants.HEADER_1_VALUE + "-topic";
Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
Configurables.configure(kafkaSink, context);
Channel memoryChannel = new MemoryChannel();
Configurables.configure(memoryChannel, context);
kafkaSink.setChannel(memoryChannel);
kafkaSink.start();
String msg = "test-replace-substring-of-topic-with-headers";
Map<String, String> headers = new HashMap<>();
headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
Transaction tx = memoryChannel.getTransaction();
tx.begin();
Event event = EventBuilder.withBody(msg.getBytes(), headers);
memoryChannel.put(event);
tx.commit();
tx.close();
try {
Sink.Status status = kafkaSink.process();
if (status == Sink.Status.BACKOFF) {
fail("Error Occurred");
}
} catch (EventDeliveryException ex) {
// ignore
}
checkMessageArrived(msg, topic);
}
代码示例来源:origin: apache/flume
Sink kafkaSink = new KafkaSink();
Context context = prepareDefaultContext();
context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, customTopicHeader);
Configurables.configure(kafkaSink, context);
Channel memoryChannel = new MemoryChannel();
内容来源于网络,如有侵权,请联系作者删除!