1、生产者 消费者pom
<!--spring boot 版本依赖-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
2、生产者
application.yml:
spring:
kafka:
#生产者
producer:
#发送失败重试次数
retries: 0
#生产者批次投递大小 16384(16k)默认
batch-size: 16384
#生产缓冲区大小 默认值 (32M)
buffer-memory: 33554432
#配置序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#kafka代理地址
bootstrap-servers: 192.168.92.39:9092
KafkaProviderService
@Service
public class KafkaProviderService {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMsg() {
for (int i = 1; i <= 50; i++) {
kafkaTemplate.send("kafka-spring","spring-key", "hello spring boot");
}
}
}
测试类:
@RunWith(SpringRunner.class)
//主application方法
@SpringBootTest(classes=KafkaProviderApplication.class)
public class KafkaTest {
@Autowired
private KafkaProviderService kafkaProviderService;
@Test
public void mainTest(){
kafkaProviderService.sendMsg();
}
}
3、消费者
application.yml
spring:
kafka:
bootstrap-servers: 192.168.92.39:9092
consumer:
#组iD
group-id: kafka-spring
#反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#配置手动提交
enable-auto-commit: false
listener:
ack-mode: manual
@Service
@Slf4j
public class KafkaCustomer {
@KafkaListener(topics = {"kafka-spring"})
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
try {
log.info("自行确认方式收到消息的key: " + record.key());
log.info("自行确认方式收到消息的value: " + record.value().toString());
ack.acknowledge();
} finally {
log.info("消息确认!");
}
}
}
4、启动消费者生产者测试类
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/15178.html