在学习kafka的时候,虚拟机里搭建了kafka集群,Linux里边通过kafka-console-producer.sh 和 kafka-console-consumer.sh 发送消息接收消息都没问题。但是写生产者代码的时候发送消息,没报错,查看消费者就是接收不到消息。这是什么鬼???
到网上查了各种原因:
1、config/server.properties 该配置文件里边将
advertised.listeners=PLAINTEXT://hostsname:9092
改成
advertised.listeners=PLAINTEXT://IP地址:9092
我改完没生效。。。
2、在本地电脑 C:\Windows\System32\drives\etc 的hosts文件里边加上集群各个ip和端口号
如:193.168.100.10 server01
我改完又没生效。。。
既然别人改完有效,我改完没生效,那应该就是代码写的有问题了。因为第一次写,有些默认参数在代码中其实并不需要写,但是为了练习一下就写了几个(画蛇添足了)。代码如下:
我之所以发送不成功是因为: properties.put(“buffer.memory”,33554432);该配置我给把后边的值写成1了,正常情况下buffer.memory值小于batch.size启动会报错,但是我设成1启动并没有报错,就是consumer接收不到消息。将这个值设置成大于batch.size的值就可以正常发送消息了。
/**
* 普通生产者
* @author Administrator
*
*/
public class MyProducer {
public static void main(String[] args) {
/**
* 生产者发送消息,总体可以分为四步:
* 1、创建kafka生产者的配置信息。
* 2、创建生产者对象,需要引入kafka配置信息。
* 3、发送消息。
* 4、关闭资源。
*/
//1、创建kafka生产者的配置信息。
Properties properties = new Properties();
/**
* 1.1指定连接kafka集群地址,注意:在Linux里连接kafka集群是--broker-list,在代码里是bootstrap.server。
* 生产环境当中, 这里要配置多个,保证有服务器挂掉还能继续发送消息。
*/
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.67.128:9092,192.168.67.129:9092,192.168.67.130:9092");
//properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka01:9092,kafka02:9092,kafka03:9092");
/*以下五条代码里可以不用写,kafka都有默认设置*/
/**
* properties.put("acks","all");
* 1.1、设置ack应答级别有3种类型。代表生产者和leader之间返回值的设置。ack(acknowledgement:确认收到)。
*
* (1)acks=0,生产者(producer)不会等待broker任何返回值(ack)。优点:提供了一个最低延迟。缺点:broker故障有可能丢失数据。
* 例如:producer发送消息和broker,消息有没有发送成功,并不知道,因为没有返回任何发送成功的提示。
* (2)acks=1,生产者(producer)等待broker的返回值(ack)。只要partition的leader接收消息成功后就返回ack,不管follower。
* 缺点:如果在follower同步之前leader故障,那么将会丢失数据。
* (3)acks=-1(all),生产者(producer)等待broker的返回值(ack)。partition的leader和ISR里边的所有follower消息全部同步成功后才返回ack。
* 缺点:如果在follower同步成功之后,broker返回ack之前,leader发生了故障,那么会造成数据重复。
*/
properties.put("acks","all");
//1.2、重试次数,比如消息发送失败,重新发送的次数。
properties.put("retries",3);
//1.3、批次大小,一次达到多大的消息开始发送,16384代表16k。
properties.put("batch.size",16384);
//1.4、控制消息发送延时行为的,该参数默认值是0。表示消息需要被立即发送,无须关系batch是否被填满。
properties.put("linger.ms",10);
//1.5、RecordAccumulator缓冲区大小,设置的是最大能存多大内存的消息。如存放batch的消息,所以必须大于batch.size。默认33554432代表32兆。
properties.put("buffer.memory",33554432);
//1.6key和value的序列化类。
// 因此消息的各个组件都必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化只用的。
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 指定value.serializer属性。必填,无默认值。和key.serializer类似。此被用来对消息体即消息value部分做序列化。
// 将消息value部分转换成字节数组。
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(properties);
//3、发送消息,这里设置发6条消息。
for (int i = 0; i <6; i++) {
producer.send(new ProducerRecord<String, String>("topic01","这是发送的消息--"+i));
System.out.println("发送成功--"+i);
}
//4、关闭资源(关闭生产者和集群的连接),该方法会做资源回收,必须关闭。如果不关闭,消费者接收不到消息。
producer.close();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/15935.html