构建kafka-Java开发环境-生产者

x33g5p2x  于2021-03-14 发布在 其他  
字(5.6k)|赞(0)|评价(0)|浏览(70)
  • 使用idea 创建新的maven工程,使用scala-simple原型。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bizzbee.spark</groupId>
  <artifactId>spark-train</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>


  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.9</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
  • 之后创建一个简单的Kafka生产者类。
package com.bizzbee.spark.kafka;
//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer"

/*
* 简单的生产者示例
*
* */

public class SimpleKafkaProducer{

    public static void main(String[] args) throws InterruptedException {
        // Check arguments length value
//        if(args.length == 0){
//            System.out.println("Enter topic name");
//            return;
//        }

        //Assign topicName to string variable
        String topicName = "bizzbee-replicated-topic";

        // create instance for properties to access producer configs
        Properties props = new Properties();

        //Assign localhost id
        //请一定将服务器ip配到hosts中去!!!!!!!!有毒。。。
        props.put("bootstrap.servers", "spark:9095,spark:9094,spark:9093");
        //
        //Set acknowledgements for producer requests.
        props.put("acks", "all");

        //If the request fails, the producer can automatically retry,
        props.put("retries", 3);

        //Specify buffer size in config
        //props.put("batch.size", 16384);

        //Reduce the no of requests less than 0
        props.put("linger.ms", 1);

        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        //props.put("buffer.memory", 33554432);

        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer
                <String, String>(props);

        for(int i = 0; i < 100; i++){
            producer.send(new ProducerRecord<String, String>(topicName,
                    Integer.toString(i), Integer.toString(i)));
            Thread.sleep(1000);
            System.out.println("Message sent successfully");

        }
        producer.flush();
    }



}
  • 请一定注意将服务器ip配置到hosts当中去!!!!!!!!
    *然后检查是否开启zookeeper以及Kafka。
  • 检查topic对应的bloker。
  • `kafka-topics.sh --describe --zookeeper spark:2181
Topic:bizzbee	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: bizzbee	Partition: 0	Leader: 3	Replicas: 3	Isr: 3
Topic:bizzbee-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: bizzbee-replicated-topic	Partition: 0	Leader: 3	Replicas: 1,3,2	Isr: 3,2,1
Topic:bizzbee-topic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: bizzbee-topic	Partition: 0	Leader: 3	Replicas: 3	Isr: 3
Topic:bizzbee_topic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: bizzbee_topic	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
Topic:jjj	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: jjj	Partition: 0	Leader: 2	Replicas: 2	Isr: 2

```
* 可以看到当前的topic :bizzbee-replicated-topic有三个blocker,并且都是启动状态。所以当前生产者向这三个bloker进行生产。
* 然后在服务器终端起一个消费者。bloker写三个之中的一个就可以了。
`kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic`
![](http://img.saoniuhuo.com/images/202110/88181633587088712.jpg)

相关文章