一、RabbitMQ简介
RabbitMQ是非常优秀的消息队列解决方案,下图是RabbitMQ的工作模型:
Broker表示RabbitMQ服务,每个Broker里面至少有一个Virtual_host虚拟主机,每个虚拟主机中有自己的Exchange交换机、Queue队列以及Exchange交换机与Queue队列之间的绑定关系Binding。producer(生产者)和consumer(消费者)通过与Broker建立Connection来保持连接,然后在Connection的基础上建立若干Channel信道,用来发送与接收消息 。
二、RabbitMQ安装
1、RabbitMQ依赖erlang和socat,需要先安装这两个。
1、安装erlang
# rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm –force –nodeps
# erl -v
2安装socat
# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm –force –nodeps
3安装rabbitmq
# rpm -ivh rabbitmq-server-3.6.5-1.suse.noarch.rpm –force –nodeps
2、配置web访问和用户名密码
开启web插件
# rabbitmq-plugins enable rabbitmq_management
# 后台启动
# rabbitmq-server -detached
# 用户和权限
# rabbitmqctl add_user root 123456
# rabbitmqctl set_permissions -p “/” root “.*” “.*” “.*”
# 查询状态
# rabbitmqctl list_user_permissions root
# 设置管理员
# rabbitmqctl set_user_tags root administrator
# 配置 vhost
# rabbitmqctl add_vhost config
三、项目中实战
1、首先pom和yml中需要配置RabbitMQ
<!-- rabbitmq 配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# RabbitMQ 配置
rabbitmq:
host: xx.xx.xx.xx
port: 5672
username: root
password: 123456
virtual-host: config
listener:
simple:
prefetch: 1
2、项目中增加配置,设置交换机和队列
@Configuration
public class RabbitMqConfig {
@Bean
DirectExchange exchange() {
return new DirectExchange(CommonConstants.EXCHANGE);
}
@Bean
Queue sendMsgQueue() {
Map<String, Object> map = new HashMap<>();
map.put("x-max-priority", 40);
return new Queue(CommonConstants.UNTREATED_MSG_QUEUE, true, false, false, map);
}
@Bean
public Binding bindingSendMsg() {
return BindingBuilder.bind(sendMsgQueue()).to(exchange()).with(CommonConstants.UNTREATED_MSG_QUEUE);
}
@Bean
Queue processedMsgQueue() {
Map<String, Object> map = new HashMap<>();
map.put("x-max-priority", 40);
return new Queue(CommonConstants.PROCESSED_MSG_QUEUE, true, false, false, map);
}
@Bean
public Binding bindingProcessedMsg() {
return BindingBuilder.bind(processedMsgQueue()).to(exchange()).with(CommonConstants.PROCESSED_MSG_QUEUE);
}
}
3、RabbitMQ发送和接收消息
queue就是发送的队列名称,msg就是发送的消息。
@Component
@RequiredArgsConstructor
public class SendMsgHandle {
private static Logger logger = LoggerFactory.getLogger(SendMsgHandle.class);
private final AmqpTemplate amqpTemplate;
public void send(String queue,String msg) {
logger.info("向队列:{},发送消息:{}",queue,msg);
try {
amqpTemplate.convertAndSend(queue, msg);
} catch (AmqpException e) {
logger.error("向队列:{},发送消息:{},出现异常",queue,msg,e.getMessage(),e);
}
}
}
content就是收到的消息,@RabbitListener是监听队列的注解,concurrency 是一个一个接收。
@Component
public class ReceiveMsgHandle {
private static Logger logger = LoggerFactory.getLogger(ReceiveMsgHandle.class);
@RabbitListener(queues = {CommonConstants.PROCESSED_MSG_QUEUE}, concurrency = "1")
public void processedQueue(String content, Message message, Channel channel) {
try {
logger.info("接收到消息==> content:{},message :{},channel:{}",content, message, channel);
}catch (Exception e){
logger.info(e.getMessage());
}finally {
return;
}
}
}
四、集成测试
private final SendMsgHandle sendMsgHandle;
@ApiOperation(value = "RabbitMQ集成测试" , notes = "RabbitMQ集成测试")
@GetMapping("/rabbitMqTest")
public Result rabbitMqTest(){
sendMsgHandle.send(CommonConstants.PROCESSED_MSG_QUEUE,"RabbitMQ集成测试");
return Result.ok();
}
到此,集成成功,可以使用RabbitMQ了。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/101728.html