tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ~/app/
export ZK_HOME=/home/bizzbee/app/zookeeper-3.4.5-cdh5.7.0
export PATH=$ZK_HOME/bin:$PATH
#source 一下
dataDir=/home/bizzbee/app/zk_tmp
```
* 启动zk
`./zkServer.sh start`
## 2.单节点单broker
* 解压kafka
```
tar -zxvf kafka_2.11-2.1.1.tgz -C ~/app/
```
* 修改配置`config/server.properties
listeners = PLAINTEXT://spark:9092
log.dirs=/home/bizzbee/app/kafka-logs
#log地址同样是自己创建目录,不能在/tmp下面
zookeeper.connect=spark:2181
* 启动kafka,启动之前配置kafka环境到配置文件,并source生效。
kafka-server-start.sh $KAFKA_HOME/config/server.properties
* 创建一个topic(之前启动的zookeeper不要关掉)
kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 1 --partitions 1 --topic bizzbee-topic
* 查看所有topic
[bizzbee@spark config]$ kafka-topics.sh --list --zookeeper spark:2181
bizzbee-topic
* 发送消息(生产消息)
[bizzbee@spark config]$ kafka-console-producer.sh --broker-list spark:9092 --topic bizzbee-topic
* 接受消息(消费)
kafka-console-consumer.sh --bootstrap-server spark:9092 --topic bizzbee-topic --from-beginning
![](http://img.saoniuhuo.com/images/202110/61361633587074919.jpg)
对于**消费者**,kafka中有两个设置的地方:对于老的消费者,由**\--zookeeper参数**设置;对于新的消费者,由**\--bootstrap-server参数**设置
如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中
查看的方法是使用./zookeeper-client,然后 ls /consumers/\[group\_id\]/offsets/\[topic\]/\[broker\_id-part\_id\],这个是查看某个group\_id的某个topic的offset
如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中。
> 所以注意,新的消费者应该是kafka所在的9092端口。老的消费者应该从zk的2181端口拿消息。
* 在生产端输入消息
![](http://img.saoniuhuo.com/images/202110/21111633587075199.jpg)
* from-beginning的意思是,消费者启动时之前已经消费的消息也会接收到。就是从最开始接收。
* 查看所有topic详细信息。
kafka-topics.sh --describe --zookeer spark:2181
## 3.单节点多broker部署
* 复制三个之前的配置文件。
[bizzbee@spark config]$ cp server.properties server1.properties
[bizzbee@spark config]$ cp server.properties server2.properties
[bizzbee@spark config]$ cp server.properties server3.properties
* 三个都进行修改(一下是需要修改的)
broker.id=1
#端口9093,94,95
listeners = PLAINTEXT://spark:9093
log.dirs=/home/bizzbee/app/tmp/kafka-logs1
* 这次以后台daemon的方式启动kafka(不占用终端)
kafka-server-start.sh -daemon $KAFKA_HOME/config/server1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server3.properties &
* 创建三个副本的topic
kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 3 --partitions 1 --topic bizzbee-replicated-topic
* 查看刚创建的topic
[bizzbee@spark config]$ kafka-topics.sh --describe --zookeeper spark:2181 --topic bizzbee-replicated-topic
Topic:bizzbee-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: bizzbee-replicated-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
* 创建生产者:
kafka-console-producer.sh --broker-list spark:9093,spark:9094,spark:9095 --topic bizzbee-replicated-topic
* 创建消费者:
kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic
> 这里从9093,94,95 都可以取到消息。
内容来源于网络,如有侵权,请联系作者删除!