rabbitMQ学习-交换机(Exchanges)

如果你不相信努力和时光,那么成果就会是第一个选择辜负你的。不要去否定你自己的过去,也不要用你的过去牵扯你现在的努力和对未来的展望。不是因为拥有希望你才去努力,而是去努力了,你才有可能看到希望的光芒。rabbitMQ学习-交换机(Exchanges),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

交换机(exchanges)

当使用到交换机的时候,我们用的就不是普通的模式了,而是发布订阅模式了。

生产者生成的消息不会直接发送到队列,而是直接将消息先发送到交换机,并且只能发送到交换机,之钱的我们可以直接发送到队列(事实上我们走的是默认交换机),然后队列交给消费者,现在不行了,改成先发送给交换机,交换机在发给队列。

交换机的工作方式很简单,他接收来自生产者的消息,另一方面将他们推送到队列。

交换机的类型

直接(direct),主题(topic)标题(heads)扇出(fanout)

无名exchanges

事实上就是默认类型,我们通过空字符(“”)串进行标识。

channel.basicPublic("","hello",null,message.getBytes());

实际上第一个参数就是交换机的名字,空字符串表示默认或者无名称交换机,消息能路由发送到队列中,其实是由routing(bindingkey)绑定key指定的,如果它存在的话。

临时队列

实际上是自定义的,一旦断开链接,这个队列就会被删除。

绑定

实际上就是交换机和队列的桥梁,他告诉我们交换机和那个队列进行绑定关系


Fanout

fanout是一种交换机的类型,这种类型非常简单,正如名称中猜想的那样,他是将接收的所有消息广播到他知道的所有队列中,系统中默认有exchanges交换机类型。
在这里插入图片描述

/*
发消息 交换机
 */
public class EmitLog {
    //交换机的名称
    public static  final  String EXCHANGE_NAME="logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner sc = new Scanner(System.in);

        while (sc.hasNext()){
            String message = sc.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息:" + message);

        }
    }
}
/*
消息接收,客户端
 */
public class ReceiveLog {
    //交换机的名称
    public static final  String EXCHANG_NAME="logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        //声明一个交换机
      channel.exchangeDeclare(EXCHANG_NAME,"fanout");
        //声明一个队列,来一个临时队列
        /*
        队列的名称是随机的。
        当消费者断开与队列的连接的时候,队列就可以自动删除
         */
      String queueName =  channel.queueDeclare().getQueue();
        /*
        绑定交换机和队列
         */
      channel.queueBind(queueName,EXCHANG_NAME,"");
        System.out.println("等待接受消息,把消息显示在屏幕上");
        //接受消息
        DeliverCallback deliverCallback = (consumer,message)->{
            System.out.println("Receive控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
        };
        //消费者取消消息时回调接口
        channel.basicConsume(queueName,true,deliverCallback,consumer->{

        });
    }
}
Direct

在这里插入图片描述
从图上我们可以看到,X绑定了两个队列,绑定类型是direct,队列Q1绑定建为orange,队列2绑定键有两个,一个为black,另一个为green.

在这种情况下,发布这发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1绑定键为blackgreen的消息会被发布到队列Q2,其他消息类型就被丢弃。
多重绑定
在这里插入图片描述
当然如果exchange的绑定类型是direct,但是他绑定的多个队列的key如果多相同在这种情况下虽然绑定类型是direct,但是他表现的就和fanout有点想类似了,就和广播差不多了。
在这里插入图片描述
这边就可以指定发给谁了,我不发给谁他就收不到消息,只有我允许他接收他才能得到接收。

public class DirectLogs {

    //交换机的名称
    public static  final  String EXCHANGE_NAME="direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        Scanner sc = new Scanner(System.in);

        while (sc.hasNext()){
            String message = sc.next();
            channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息:" + message);
        }
    }
}
public class ReceiveLogDirect01 {
    public  static final  String EXCHANGE_NAME ="direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个交换机
        channel.queueDeclare("console",false,false,false,null);
        /*
        绑定交换机和队列
         */
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");
        //接受消息
        DeliverCallback deliverCallback = (consumer, message)->{
            System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
        };

        //消费者取消消息时回调接口
        channel.basicConsume("console",true,deliverCallback,consumer->{});
        
    }
}
public class ReceiveLogDirect02 {
    public  static final  String EXCHANGE_NAME ="direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明一个交换机
        channel.queueDeclare("disk",false,false,false,null);
        /*
        绑定交换机和队列
         */
        channel.queueBind("disk",EXCHANGE_NAME,"error");
        //接受消息
        DeliverCallback deliverCallback = (consumer, message)->{
            System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
        };

        //消费者取消消息时回调接口
        channel.basicConsume("disk",true,deliverCallback,consumer->{});
    }
}
Topic

对比上面两种交换机更加完美,当存在我们要接受日志类型有info.base和info.advantage,某个队列只想info.base的消息,那这个·时候上边两种交换机就做不到了。此时我们采用的就是topic交换机类型了。

发送类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,他必须是一个单纯列表,以点号隔开,这个单次可以是任意单词。”nysc.xxx”等。限制要求单词列表不能超过255个字节。

在这里插入图片描述

需要注意的是

‘ * ’ 代表的是一个单词

‘#’ 代表的是可以替换零个或多个单词。

下图绑定关系如下

Q1–> 绑定的是

中间代orange带3个单词的字符串(* .orange. *)

Q2–> 绑定的是

最后一个单词是rabbit的3个单词(* . *rabbit)

第一个单词是lazy的多个单词(lazy.#)

在这里插入图片描述
下边我们将上图之间的数据接受情况列举出来

quick.orange.rabbit 被队列Q1Q2接受到

lazy.orange.elephant 被队列Q1Q2接受到

quick.orange.fox 被队列Q1接受到

lazy.brown.fox 被队列Q2接受到

lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接受一次

quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃

quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃

lazy.orange.male.rabbit 是四个单词但匹配Q2

上述加粗的是符合TOPIC类型交换机的单词指令

是最强大的,也是使用最广的

  • 当队列绑定关系是#,那么这个队列将接收所有的数据,有点像Fanout了。
  • 如果队列绑定的键中没有#和*出现,那么该队列绑定类类型就是direct了。

消费者1

/*
声明主题交换机
 */
public class ReceiveLogsTopic01 {

    //老样子定义一个交换机名
    public static  final String EXCHANGE_NAME= "topic_logs";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
       //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);

        //绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接受消息");

        DeliverCallback deliverCallback = (consumer,message)->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接受队列:——> " +queueName +"绑定键:"+ message.getEnvelope().getRoutingKey());
        };
        //接收消息

        channel.basicConsume(queueName,true,deliverCallback,consumer->{});
    }

}

等待接受消息

被队列Q1Q2接受到

接受队列:——> Q1绑定键:lazy.orange.elephant

被队列Q1Q2接受到

接受队列:——> Q1绑定键:quick.orange.rabbit

被队列Q1接受到

接受队列:——> Q1绑定键:quick.orange.fox

消费者2

/*
声明主题交换机
 */
public class ReceiveLogsTopic02 {

    //老样子定义一个交换机名
    public static  final String EXCHANGE_NAME= "topic_logs";

    //接受消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
       //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName = "Q2";
        channel.queueDeclare(queueName,false,false,false,null);

        //绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        System.out.println("等待接受消息");

        DeliverCallback deliverCallback = (consumer,message)->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接受队列:——> " +queueName +"绑定键:"+ message.getEnvelope().getRoutingKey());
        };
        //接收消息

        channel.basicConsume(queueName,true,deliverCallback,consumer->{});
    }
}

等待接受消息
被队列Q1Q2接受到
接受队列:——> Q2绑定键:lazy.orange.elephant
被队列Q2接受到
接受队列:——> Q2绑定键:lazy.brown.fox
被队列Q1Q2接受到
接受队列:——> Q2绑定键:quick.orange.rabbit
虽然满足两个绑定但只被队列Q2接受一次
接受队列:——> Q2绑定键:lazy.pink.rabbit
是四个单词但匹配Q2
接受队列:——> Q2绑定键:lazy.orange.male.rabbit

生产者

/*
生产者
 */
public class EmitLogTopic {
    //老样子,还是定义一个交换机名字,注意这个交换
    // 机名字需要和消费者里边定义的交换机一样,避免出错

    private static final  String EXCHANGE_NAME="topic_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取信道链接
        Channel channel = GetConnection.getChannel();
        /*
        Q1--> 绑定的是
	    中间代orange带3个单词的字符串(* .orange. *)
        Q2--> 绑定的是
        最后一个单词是rabbit的3个单词(* . *rabbit)
        第一个单词是lazy的多个单词(lazy.#)
        quick.orange.rabbit         被队列Q1Q2接受到
        lazy.orange.elephant 		被队列Q1Q2接受到
        quick.orange.fox			被队列Q1接受到
        lazy.brown.fox			被队列Q2接受到
        lazy.pink.rabbit			虽然满足两个绑定但只被队列Q2接受一次
        quick.brown.fox			不匹配任何绑定不会被任何队列接收到会被丢弃
        quick.orange.male.rabbit		是四个单词不匹配任何绑定会被丢弃
        lazy.orange.male.rabbit 		是四个单词但匹配Q2


         */
        Map<String,String> map  = new HashMap<>();
        map.put("quick.orange.rabbit","被队列Q1Q2接受到");
        map.put("lazy.orange.elephant","被队列Q1Q2接受到");
        map.put("quick.orange.fox","被队列Q1接受到");
        map.put("lazy.brown.fox","被队列Q2接受到");
        map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接受一次");
        map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        map.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");

        for (Map.Entry<String,  String> mapEntry : map.entrySet()) {
           String routingKey = mapEntry.getKey();
           String message = mapEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息: " + message);
        }
    }
}

生产者发送消息: 是四个单词不匹配任何绑定会被丢弃
生产者发送消息: 不匹配任何绑定不会被任何队列接收到会被丢弃
生产者发送消息: 被队列Q1Q2接受到
生产者发送消息: 被队列Q2接受到
生产者发送消息: 被队列Q1Q2接受到
生产者发送消息: 被队列Q1接受到
生产者发送消息: 虽然满足两个绑定但只被队列Q2接受一次
生产者发送消息: 是四个单词但匹配Q2

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/197985.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!