Kafka producer and consumer using Kafka brokers with a username and password instead of SSL certificates

qv7cva1a  posted on 2021-06-06  in Kafka

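How can I implement a camel-kafka producer and consumer that connect to Kafka brokers with a username and password instead of SSL certificates?
My Kafka broker configuration looks like this: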

{
  "kafka_brokers_sasl": [
    "kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093"
  ],
  "user": "**********",
  "password": "********"
}
mum43rcc  #1

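To connect to Message Hub, you need at least Apache Camel 2.20.
Then add the following query parameters to the URI:
saslMechanism=PLAIN
securityProtocol=SASL_SSL
sslProtocol=TLSv1.2
sslEnabledProtocols=TLSv1.2
saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";
For example, for a producer: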

final String brokers = "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093";

final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";

// Forward messages from direct:in to the Kafka topic "test", authenticating with SASL/PLAIN over TLS
from("direct:in")
.to("kafka:test?"
    + "brokers=" + brokers
    + "&saslMechanism=PLAIN"
    + "&securityProtocol=SASL_SSL"
    + "&sslProtocol=TLSv1.2"
    + "&sslEnabledProtocols=TLSv1.2"
    + "&sslEndpointAlgorithm=HTTPS"
    + "&saslJaasConfig=" + saslJaasConfig);
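
For orientation, the URI options in the route above correspond to standard Kafka client properties. The sketch below builds the equivalent `java.util.Properties` using only the JDK. The class name `SaslPlainClientProps`, the method `build`, and the placeholder broker addresses are illustrative assumptions, not part of Camel or of the original answer.

```java
import java.util.Properties;

public class SaslPlainClientProps {

    // Hypothetical helper: builds Kafka client properties that mirror
    // the Camel URI options used in the route (SASL/PLAIN over TLS).
    public static Properties build(String brokers, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);                      // brokers
        props.setProperty("security.protocol", "SASL_SSL");                   // securityProtocol
        props.setProperty("sasl.mechanism", "PLAIN");                         // saslMechanism
        props.setProperty("ssl.protocol", "TLSv1.2");                         // sslProtocol
        props.setProperty("ssl.enabled.protocols", "TLSv1.2");                // sslEnabledProtocols
        props.setProperty("ssl.endpoint.identification.algorithm", "HTTPS");  // sslEndpointAlgorithm
        props.setProperty("sasl.jaas.config",                                 // saslJaasConfig
            "org.apache.kafka.common.security.plain.PlainLoginModule required"
            + " username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute the real broker list and credentials.
        Properties props = build("broker1:9093,broker2:9093", "USERNAME", "PASSWORD");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Comparing these keys with the URI options can help when checking a working plain Kafka client configuration against the Camel endpoint.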

For a consumer:

final String brokers = "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093";

final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";

// Consume from the Kafka topic "test" as consumer group "mygroup" and print each message to stdout
from("kafka:test?"
    + "brokers=" + brokers
    + "&saslMechanism=PLAIN"
    + "&securityProtocol=SASL_SSL"
    + "&sslProtocol=TLSv1.2"
    + "&sslEnabledProtocols=TLSv1.2"
    + "&sslEndpointAlgorithm=HTTPS"
    + "&saslJaasConfig=" + saslJaasConfig
    + "&groupId=mygroup")
.to("stream:out");
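
Since the producer and consumer URIs differ only in `groupId`, the string assembly could be shared. The following is a minimal sketch that takes the three values from the broker configuration in the question (broker list, user, password) and produces the endpoint URI. `KafkaEndpointUriSketch`, `jaasConfig`, and `endpointUri` are hypothetical names introduced here, and the broker addresses in `main` are placeholders. The code is plain string handling and has no Camel dependency.

```java
import java.util.Arrays;
import java.util.List;

public class KafkaEndpointUriSketch {

    // Builds the JAAS line for SASL/PLAIN in the same form as the examples above.
    public static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
            + " username=\"" + username + "\" password=\"" + password + "\";";
    }

    // Assembles the camel-kafka endpoint URI. Pass groupId = null for a producer.
    public static String endpointUri(String topic, List<String> brokers,
                                     String username, String password, String groupId) {
        StringBuilder uri = new StringBuilder("kafka:").append(topic)
            .append("?brokers=").append(String.join(",", brokers))
            .append("&saslMechanism=PLAIN")
            .append("&securityProtocol=SASL_SSL")
            .append("&sslProtocol=TLSv1.2")
            .append("&sslEnabledProtocols=TLSv1.2")
            .append("&sslEndpointAlgorithm=HTTPS")
            .append("&saslJaasConfig=").append(jaasConfig(username, password));
        if (groupId != null) {
            uri.append("&groupId=").append(groupId); // consumer only
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        // Placeholder broker addresses and credentials.
        List<String> brokers = Arrays.asList("broker1:9093", "broker2:9093");
        System.out.println(endpointUri("test", brokers, "USERNAME", "PASSWORD", null));
        System.out.println(endpointUri("test", brokers, "USERNAME", "PASSWORD", "mygroup"));
    }
}
```

With such a helper, the routes would read `from("direct:in").to(endpointUri("test", brokers, user, password, null))` for the producer and `from(endpointUri("test", brokers, user, password, "mygroup")).to("stream:out")` for the consumer.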
