Kafka 在代码中producer发送消息consumer接收不到

导读:本篇文章讲解 Kafka 在代码中producer发送消息consumer接收不到,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

在学习kafka的时候,虚拟机里搭建了kafka集群,Linux里边通过kafka-console-producer.sh 和 kafka-console-consumer.sh 发送消息接收消息都没问题。但是写生产者代码的时候发送消息,没报错,查看消费者就是接收不到消息。这是什么鬼???

到网上查了各种原因:

1、config/server.properties 该配置文件里边将

advertised.listeners=PLAINTEXT://hostsname:9092

改成

advertised.listeners=PLAINTEXT://IP地址:9092

我改完没生效。。。 

2、在本地电脑 C:\Windows\System32\drives\etc 的hosts文件里边加上集群各个ip和端口号

如:193.168.100.10  server01

我改完又没生效。。。 

既然别人改完有效,我改完没生效,那应该就是代码写的有问题了。因为第一次写,有些默认参数在代码中其实并不需要写,但是为了练习一下就写了几个(画蛇添足了)。代码如下:

我之所以发送不成功是因为: properties.put(“buffer.memory”,33554432);该配置我给把后边的值写成1了,正常情况下buffer.memory值小于batch.size启动会报错,但是我设成1启动并没有报错,就是consumer接收不到消息。将这个值设置成大于batch.size的值就可以正常发送消息了。

/**
 * 普通生产者
 * @author Administrator
 *
 */
public class MyProducer {
	public static void main(String[] args) {
		/**
		 * 生产者发送消息,总体可以分为四步:
		 * 1、创建kafka生产者的配置信息。
		 * 2、创建生产者对象,需要引入kafka配置信息。
		 * 3、发送消息。
		 * 4、关闭资源。
		 */
		
		//1、创建kafka生产者的配置信息。
		Properties properties = new Properties();
		/**
		 * 1.1指定连接kafka集群地址,注意:在Linux里连接kafka集群是--broker-list,在代码里是bootstrap.server。
		 * 生产环境当中, 这里要配置多个,保证有服务器挂掉还能继续发送消息。
		 */
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.67.128:9092,192.168.67.129:9092,192.168.67.130:9092");
		//properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka01:9092,kafka02:9092,kafka03:9092");
		
		/*以下五条代码里可以不用写,kafka都有默认设置*/
		/**
		 * properties.put("acks","all");
		 * 1.1、设置ack应答级别有3种类型。代表生产者和leader之间返回值的设置。ack(acknowledgement:确认收到)。
		 * 
		 * (1)acks=0,生产者(producer)不会等待broker任何返回值(ack)。优点:提供了一个最低延迟。缺点:broker故障有可能丢失数据。
		 * 		例如:producer发送消息和broker,消息有没有发送成功,并不知道,因为没有返回任何发送成功的提示。
		 * (2)acks=1,生产者(producer)等待broker的返回值(ack)。只要partition的leader接收消息成功后就返回ack,不管follower。
		 * 		缺点:如果在follower同步之前leader故障,那么将会丢失数据。
		 * (3)acks=-1(all),生产者(producer)等待broker的返回值(ack)。partition的leader和ISR里边的所有follower消息全部同步成功后才返回ack。
		 *      缺点:如果在follower同步成功之后,broker返回ack之前,leader发生了故障,那么会造成数据重复。
		 */
		properties.put("acks","all");
		//1.2、重试次数,比如消息发送失败,重新发送的次数。
		properties.put("retries",3);
		//1.3、批次大小,一次达到多大的消息开始发送,16384代表16k。
		properties.put("batch.size",16384);
		//1.4、控制消息发送延时行为的,该参数默认值是0。表示消息需要被立即发送,无须关系batch是否被填满。
		properties.put("linger.ms",10);
		//1.5、RecordAccumulator缓冲区大小,设置的是最大能存多大内存的消息。如存放batch的消息,所以必须大于batch.size。默认33554432代表32兆。
		properties.put("buffer.memory",33554432);
		
		//1.6key和value的序列化类。
		 // 因此消息的各个组件都必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化只用的。
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 指定value.serializer属性。必填,无默认值。和key.serializer类似。此被用来对消息体即消息value部分做序列化。
		// 将消息value部分转换成字节数组。
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		//2、创建生产者对象
		KafkaProducer<String,String> producer = new KafkaProducer<String,String>(properties);
		
		//3、发送消息,这里设置发6条消息。
		for (int i = 0; i <6; i++) {
			producer.send(new ProducerRecord<String, String>("topic01","这是发送的消息--"+i));
			System.out.println("发送成功--"+i);
		}
		//4、关闭资源(关闭生产者和集群的连接),该方法会做资源回收,必须关闭。如果不关闭,消费者接收不到消息。
		producer.close();
	}
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/15935.html

(0)
小半的头像小半

相关推荐

半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!