SpringBoot 整合Kafka实现消息的发送和接收

在项目中我们常常遇到这样的场景:后台在进行业务处理的过程中,当遇到一些特殊的场景,会及时的把异常信息及数据在前台页面给予提示;当遇到这样的场景,我们经常考虑使用kafka进行实现。

安装Kafka

一、下载Kafka安装包

下载地址:https://kafka.apache.org/downloads.html

SpringBoot 整合Kafka实现消息的发送和接收

二、下载完成后,进行解压

解压目录:D:kafka

SpringBoot 整合Kafka实现消息的发送和接收

注:kafka依赖Java环境,首先需要安装Java 8或以上版本的Java,关于jdk的安装这里不再详细介绍。

三、启动zookeeper

cmd窗口进入kafka安装目录,执行

.binwindowszookeeper-server-start.bat  .configzookeeper.properties

启动成功后,发现zookeeper默认端口为2181

SpringBoot 整合Kafka实现消息的发送和接收

如果需要更改端口的话,可以编辑配置文件zookeeper.properties,找到clientPort 进行修改即可。

SpringBoot 整合Kafka实现消息的发送和接收
SpringBoot 整合Kafka实现消息的发送和接收

四、zookeeper启动完成后,接下来启动kafka

cmd窗口进入kafka安装目录,执行下面命令,等待kafka启动成功。

.binwindowskafka-server-start.bat .configserver.properties

注意:若修改了zookeeper默认的启动端口2181,我们需要找到config文件夹下的server.properties配置文件,并编辑修改zookeeper的连接端口并做同步修改:zookeeper.connect=localhost:2181

SpringBoot 整合Kafka实现消息的发送和接收

项目创建

创建父项目xxkfz-kafka-master,并创建两个子模块分别为:消息生产端模块Spring-kafka-producer、消息消费端模块spring-kafka-consumer

整体的项目结构如下:

SpringBoot 整合Kafka实现消息的发送和接收

项目创建完成后分别在两个子模块pom.xml添加所需依赖:

        <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.13</version>
</dependency>

消息生产端

一、新建消息传输实体Message.java

package com.xxkfz.simplememory.entity;

import lombok.*;

import java.io.Serializable;

@Data
@ToString
public class Message implements Serializable {

    private static final long serialVersionUID = -118L;
    /**
     * 发送人
     */

    private String sendUserName;

    /**
     * 发送时间
     */

    private String sendTime;

    /**
     * 发送内容
     */

    private String sendContent;
}

二、新建Kafka简单工具类KafkaUtils.java

package com.xxkfz.simplememory.util;


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @author 公众号: SimpleMemory
 * @version 1.0.0
 * @ClassName KafkaUtils.java
 * @Description TODO
 * @createTime 2022年05月22日 19:53:00
 */

@Component
@Slf4j
public class KafkaUtils {

 @Autowired
 private KafkaTemplate<String, Object> kafkaTemplate;

 public void send(String topic, Object message) {
  String messageJson = "";
  if (null != message) {
   messageJson = JSON.toJSONString(message);
  }
  // 发送消息
  ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, messageJson);

  // 监听回调
  future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
   @Override
   public void onFailure(Throwable ex) {
    log.info("发送消息失败......");
   }

   @Override
   public void onSuccess(SendResult<String, Object> result) {
    log.error("成功发送信息:{}......", result);
   }
  });
 }
}

三、新建MessageController.java

import com.xxkfz.simplememory.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author 公众号: SimpleMemory
 * @version 1.0.0
 * @ClassName MessageController.java
 * @Description TODO
 * @createTime 2022年05月22日 19:53:00
 */

@RestController
public class MessageController {

    @Autowired
    private MessageService messageService;

    @RequestMapping(path = "/send/{content}")
    public void sendMessage(@PathVariable String content) {
        messageService.sendMsg(content);
    }

}

四、新建MessageServiceImpl.java

package com.xxkfz.simplememory.service.impl;

import cn.hutool.core.date.DateUtil;
import com.xxkfz.simplememory.entity.Message;
import com.xxkfz.simplememory.service.MessageService;
import com.xxkfz.simplememory.util.KafkaUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author 公众号: SimpleMemory
 * @version 1.0.0
 * @ClassName MessageServiceImpl.java
 * @Description TODO
 * @createTime 2022年05月22日 19:54:00
 */

@Service
public class MessageServiceImpl implements MessageService {

    @Resource
    private KafkaUtils kafkaUtils;

    @Value("${spring.kafka.topic}")
    private String topicName;

    @Override
    public void sendMsg(Object msg) {
        Message message = new Message();
        message.setSendUserName("xk_admin");
        message.setSendTime(DateUtil.now());
        message.setSendContent((String) msg);
        kafkaUtils.send(topicName, message);
    }
}

五、新建消息生产者启动类KafkaProducerMainApplication.java

package com.xxkfz.simplememory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaProducerMainApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerMainApplication.classargs);
    }

}

六、新建kafka生产者拦截器CustomProducerInterceptor.java

package com.xxkfz.simplememory.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.*;

/**
 * 自定义生产者拦截器
 */

@Slf4j
public class CustomProducerInterceptor implements ProducerInterceptor {

    /**
     * 在发送前做一些处理
     *
     * @param producerRecord
     * @return
     */

    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        log.info("正在发送消息: {}", producerRecord.value().toString());
        return producerRecord;
    }

    /**
     * 在消息应答前,或者消息发送失败时被调用
     *
     * @param recordMetadata
     * @param e
     */

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    /**
     * 关闭interceptor,主要用于执行一些资源清理工作
     */

    @Override
    public void close() {

    }

    /**
     * 获取配置信息和初始化数据时调用
     *
     * @param map
     */

    @Override
    public void configure(Map<String, ?> map) {

    }
}

七、新建kafka自定义分区器CustomPartitioner.java

package com.xxkfz.simplememory.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * xxkfz
 * 20211023
 * 自定义分区器
 */

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer integer = cluster.partitionCountForTopic(topic);
        if (null == integer) {
            return Utils.toPositive(ThreadLocalRandom.current().nextInt()) % integer;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % integer;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

八、新建生产端配置文件application.yml

server:
  port: 8083

spring:
  kafka:
    # 配置消息的主题
    topic: xxkfz
    producer:
      client-id: 1
      #  kafka server的地址,如果有多个,使用逗号分割
      bootstrap-servers: 127.0.0.1:9092
      #  生产者消息key和消息value的序列化处理类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #  生产者发送失败时,重试次数
      retries: 3
      properties:
        # 自定义消费者拦截器
        interceptor.classes: com.xxkfz.simplememory.interceptor.CustomProducerInterceptor
        #自定义分区器
        partitioner.classes: com.xxkfz.simplememory.partitioner.CustomPartitioner
        # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
        # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
        # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
        # 批量发送的消息数量
      batch-size: 1000
        # 32MB的批处理缓冲区
      buffer-memory: 33554432
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

消息消费端

一、新建消费端消息监听类MessageListener.java

package com.xxkfz.simplememory.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * 消息接收监听器
 */

@Component
@Slf4j
public class MessageListener {

    @KafkaListener(id = "messageGroup", topics = "${spring.kafka.topic}")
    public void listenerMessage(ConsumerRecord<?, ?> record) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.error("消费者接收到消息:{} ", msg);
        }
    }
}

二、新建消费端消息拦截器CustomConsumerInterceptor.java

package com.xxkfz.simplememory.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.*;

/**
 * 消费者拦截器
 */

@Slf4j
public class CustomConsumerInterceptor implements ConsumerInterceptor {
    /**
     * 拉取消息时,被调用
     *
     * @param consumerRecords
     * @return
     */

    @Override
    public ConsumerRecords onConsume(ConsumerRecords consumerRecords) {

        log.error("消息{}已被拉取", consumerRecords.toString());
        return consumerRecords;
    }

    @Override
    public void close() {

    }

    /**
     * 提交请求响应成功时被调用
     *
     * @param map
     */

    @Override
    public void onCommit(Map map) {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

三、新建消费端启动类KafkaConsumerMainApplication.java

package com.xxkfz.simplememory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerMainApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerMainApplication.classargs);
    }

}

四、新建消费端配置文件application.yml

server:
  port: 8082
spring:
  kafka:
    topic: xxkfz
    consumer:
      client-id: 1
      bootstrap-servers: 127.0.0.1:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      properties:
        interceptor.classes: com.xxkfz.simplememory.interceptor.CustomConsumerInterceptor
        # 默认消费者组
      group-id: crm-user-service
        # 最早未被消费的offset
      auto-offset-reset: earliest
        # 批量一次最大拉取数据量
      max-poll-records: 4000
       # 是否自动提交
      enable-auto-commit: false
       # 自动提交时间间隔,单位ms
      auto-commit-interval: 1000
      batch:
        # 批消费并发量,小于或等于Topic的分区数
        concurrency: 3

验证并测试

  • 分别启动消息生产端、消费端启动类。
  • 启动成功后,在浏览器地址栏输入:http://127.0.0.1:8083/send/xxkfz,分别查看控制台日志情况
SpringBoot 整合Kafka实现消息的发送和接收
SpringBoot 整合Kafka实现消息的发送和接收

到此,生产者成功发送消息,消费者成功进行消息的接收消费。

源码获取

关于本项目完整代码获取方式:关注公众号并回复文章标题即可获取;同时为了便于大家快速学习,也包含kafka安装包哦~


SpringBoot 整合Kafka实现消息的发送和接收

SpringBoot 整合Kafka实现消息的发送和接收

SpringCloud Alibaba 集成Dubbo实现远程服务调用

并发编程系列(一):并发编程的挑战

Mybatis plus如何解决大数据量查询慢的问题


原文始发于微信公众号(SimpleMemory):SpringBoot 整合Kafka实现消息的发送和接收

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137789.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!