本文整理了 Java 中 org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties 类的一些代码示例，展示了 KafkaProducerProperties 类的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台，是从一些精选项目中提取出来的代码，具有较强的参考意义，能在一定程度上帮助到你。KafkaProducerProperties 类的具体详情如下：
包路径:org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties
类名称:KafkaProducerProperties
[英]Extended producer properties for Kafka binder.
[中]Kafka binder 的扩展生产者属性。
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the value reported by {@code kafkaProducerProperties.getBatchTimeout()}
 */
public int getBatchTimeout() {
    final int batchTimeout = this.kafkaProducerProperties.getBatchTimeout();
    return batchTimeout;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the value reported by {@code kafkaProducerProperties.getBufferSize()}
 */
public int getBufferSize() {
    final int bufferSize = this.kafkaProducerProperties.getBufferSize();
    return bufferSize;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the value reported by {@code kafkaProducerProperties.getCompressionType()}
 */
public @NotNull CompressionType getCompressionType() {
    final CompressionType compressionType = this.kafkaProducerProperties.getCompressionType();
    return compressionType;
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
String.valueOf(producerProperties.getExtension().getBufferSize()));
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
producerProperties.getExtension().getCompressionType().toString());
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
ProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
if (producerProperties.isPartitioned()) {
SpelExpressionParser parser = new SpelExpressionParser();
setPartitionIdExpression(parser.parseExpression("headers['" + BinderHeaders.PARTITION_HEADER + "']"));
}
if (producerProperties.getExtension().isSync()) {
setSync(true);
}
this.producerFactory = producerFactory;
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the map reported by {@code kafkaProducerProperties.getConfiguration()}
 */
public Map<String, String> getConfiguration() {
    final Map<String, String> configuration = this.kafkaProducerProperties.getConfiguration();
    return configuration;
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the array reported by {@code kafkaProducerProperties.getHeaderPatterns()}
 */
public String[] getHeaderPatterns() {
    final String[] headerPatterns = this.kafkaProducerProperties.getHeaderPatterns();
    return headerPatterns;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11-core
/**
 * Looks up the producer properties bound to a channel.
 *
 * @param channelName key used to look up the entry in {@code bindings}
 * @return the producer properties of the matching binding, or a fresh
 *         {@code KafkaProducerProperties} when there is no binding for the
 *         channel or the binding has no producer
 */
@Override
public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
    final boolean hasProducerBinding = this.bindings.containsKey(channelName)
            && this.bindings.get(channelName).getProducer() != null;
    return hasProducerBinding
            ? this.bindings.get(channelName).getProducer()
            : new KafkaProducerProperties();
}
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the flag reported by {@code kafkaProducerProperties.isSync()}
 */
public boolean isSync() {
    final boolean sync = this.kafkaProducerProperties.isSync();
    return sync;
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the expression reported by {@code kafkaProducerProperties.getMessageKeyExpression()}
 */
public Expression getMessageKeyExpression() {
    final Expression messageKeyExpression = this.kafkaProducerProperties.getMessageKeyExpression();
    return messageKeyExpression;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka
String.valueOf(producerProperties.getExtension().getBufferSize()));
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
producerProperties.getExtension().getCompressionType().toString());
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
ProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
if (producerProperties.isPartitioned()) {
SpelExpressionParser parser = new SpelExpressionParser();
setPartitionIdExpression(parser.parseExpression("headers['" + BinderHeaders.PARTITION_HEADER + "']"));
}
if (producerProperties.getExtension().isSync()) {
setSync(true);
}
this.producerFactory = producerFactory;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the map reported by {@code kafkaProducerProperties.getConfiguration()}
 */
public Map<String, String> getConfiguration() {
    final Map<String, String> configuration = this.kafkaProducerProperties.getConfiguration();
    return configuration;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the array reported by {@code kafkaProducerProperties.getHeaderPatterns()}
 */
public String[] getHeaderPatterns() {
    final String[] headerPatterns = this.kafkaProducerProperties.getHeaderPatterns();
    return headerPatterns;
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
/**
 * Binds an outbound {@code KStream} to the named destination.
 *
 * <p>The destination is provisioned with a default-constructed
 * {@code KafkaProducerProperties}, the key and value serdes are resolved from
 * the supplied properties, and the stream is handed to {@code to(...)}.
 *
 * @param name destination name, also used as the binding name
 * @param outboundBindTarget the stream being bound
 * @param properties producer properties used to resolve the serdes and the native-encoding flag
 * @return a {@code DefaultBinding} for the name and stream
 */
@Override
@SuppressWarnings("unchecked")
protected Binding<KStream<Object, Object>> doBindProducer(String name, KStream<Object, Object> outboundBindTarget,
        ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
    // Provisioning uses default Kafka producer properties, not the streams-specific ones.
    final KafkaProducerProperties defaultKafkaProperties = new KafkaProducerProperties();
    final ExtendedProducerProperties<KafkaProducerProperties> provisioningProperties =
            new ExtendedProducerProperties<>(defaultKafkaProperties);
    this.kafkaTopicProvisioner.provisionProducerDestination(name, provisioningProperties);

    final KafkaStreamsProducerProperties streamsExtension = properties.getExtension();
    final Serde<Object> outboundKeySerde =
            (Serde<Object>) this.keyValueSerdeResolver.getOuboundKeySerde(streamsExtension);
    final Serde<Object> outboundValueSerde =
            (Serde<Object>) this.keyValueSerdeResolver.getOutboundValueSerde(properties, streamsExtension);

    to(properties.isUseNativeEncoding(), name, outboundBindTarget, outboundKeySerde, outboundValueSerde);
    return new DefaultBinding<>(name, null, outboundBindTarget, null);
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the flag reported by {@code kafkaProducerProperties.isSync()}
 */
public boolean isSync() {
    final boolean sync = this.kafkaProducerProperties.isSync();
    return sync;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka-core
/**
 * Delegates to the wrapped {@code kafkaProducerProperties}.
 *
 * @return the expression reported by {@code kafkaProducerProperties.getMessageKeyExpression()}
 */
public Expression getMessageKeyExpression() {
    final Expression messageKeyExpression = this.kafkaProducerProperties.getMessageKeyExpression();
    return messageKeyExpression;
}
代码示例来源:origin: spring-cloud/spring-cloud-stream-binder-kafka
String.valueOf(producerProperties.getExtension().getBufferSize()));
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
producerProperties.getExtension().getCompressionType().toString());
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka11
/**
 * Builds the message handler on top of the given template and configures it
 * from the supplied producer properties.
 *
 * @param kafkaTemplate template handed to the superclass constructor
 * @param topic topic name, wrapped in a literal expression
 * @param producerProperties source of the key expression, partitioned flag and sync flag
 * @param producerFactory factory stored on this handler
 */
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
        ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
        ProducerFactory<byte[], byte[]> producerFactory) {
    super(kafkaTemplate);
    setTopicExpression(new LiteralExpression(topic));
    setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
    setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
    if (producerProperties.isPartitioned()) {
        SpelExpressionParser parser = new SpelExpressionParser();
        // Use the indexer form (headers['name']) rather than dotted property access:
        // it looks the header up by key, so it does not depend on the header name
        // being a valid SpEL identifier.
        setPartitionIdExpression(parser.parseExpression("headers['" + BinderHeaders.PARTITION_HEADER + "']"));
    }
    if (producerProperties.getExtension().isSync()) {
        setSync(true);
    }
    this.producerFactory = producerFactory;
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-stream-binder-kafka
Map<String, String> configuration = this.transactionManager == null ? dlqProducerProperties.getConfiguration()
: this.configurationProperties.getTransaction().getProducer().getConfiguration();
if (record.key() != null && !record.key().getClass().isInstance(byte[].class)) {
内容来源于网络,如有侵权,请联系作者删除!