RabbitMQ 消息队列 发表于 2021-04-19 更新于 2021-04-19
字数总计: 4.6k 阅读时长: 18分钟 阅读量: 上海
什么是 MQ 消息队列(Message Queue,简称 MQ),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已。其主要用途:不同进程 Process/线程 Thread 之间通信。
为什么会产生消息队列?有几个原因:
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
RabbitMQ
RabbitMQ 简介
开发语言:Erlang - 面向并发的编程语言
AMQP 协议
学习五种队列
RabbitMQ 的第一个程序 第一种模型(直连)
P:生产者:也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息的到来
queue:消息队列,图中红色部分,类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
建立一个 maven 项目
导入 RabbitMQ 的客户端依赖 1 2 3 4 5 6 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.7.2</version > </dependency >
编写生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Test public void testSendMessage () throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory (); connectionFactory.setHost("192.168.90.140" ); connectionFactory.setPort(5672 ); connectionFactory.setVirtualHost("/ems" ); connectionFactory.setUsername("ems" ); connectionFactory.setPassword("ems" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello" ,false ,false ,false ,null ); channel.basicPublish("" ,"hello" ,null ,"hello rabbitmq" .getBytes()); channel.close(); connection.close(); }
编写消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ConnectionFactory connectionFactory = new ConnectionFactory ();connectionFactory.setHost("192.168.90.140" ); connectionFactory.setPort(5672 ); connectionFactory.setVirtualHost("/ems" ); connectionFactory.setUsername("ems" ); connectionFactory.setPassword("ems" ); Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("hello" , false , false , false , null ); channel.basicConsume("hello" , true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("new String(body)==>" + new String (body)); } });
注意:需要在 rabbitmq 管理页面中添加用户和虚拟主机
编写连接工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class RabbitMQUtils { public static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory (); connectionFactory.setHost("192.168.159.140" ); connectionFactory.setPort(5672 ); connectionFactory.setVirtualHost("/ems" ); connectionFactory.setUsername("ems" ); connectionFactory.setPassword("ems" ); } public static Connection getConnection () throws IOException, TimeoutException { try { return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null ; } public static void closeConnectionAndChanel (Channel channel, Connection connection) { try { if (channel != null ) { channel.close(); } if (connection != null ) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } }
第二种模型(work queue) Work queue
,也被称为(Task queue
),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理,此时就可以使用 work 模型,让多个消费者绑定到一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
P:生产者:任务的发布者
C1:消费者:领取任务并且完成任务,假设完成速度较慢
C2:消费者 2:领取任务并完成任务,假设完成速度快
编写生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work" , true , false , false , null ); for (int i = 0 ; i < 20 ; i++) { channel.basicPublish("" , "work" , null , (i + "hello work queue" ).getBytes()); } RabbitMQUtils.closeConnectionAndChanel(channel, connection);
编写消费者-1 1 2 3 4 5 6 7 8 9 10 11 12 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work" ,true ,false ,false ,null ); channel.basicConsume("work" ,true ,new DefaultConsumer (channel){ @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte [] body) throws IOException { System.out.println("消费者--1:" +new String (body)); } });
编写消费者-2 1 2 3 4 5 6 7 8 9 10 11 12 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work" ,true ,false ,false ,null ); channel.basicConsume("work" ,true ,new DefaultConsumer (channel){ @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte [] body) throws IOException { System.out.println("消费者--2:" +new String (body)); } });
测试结果
==总结:默认情况下,RabbitMQ 将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式成为循环 ==
消息自动确认机制
完成一项任务可能只需要几秒钟。您可能想知道,如果其中一个消费者启动了一个很长的任务,并且只完成了部分任务而死亡,会发生什么情况。在我们当前的代码中,一旦 RabbitMQ 向消费者发送消息,它就会立即标记该消息为删除。在本例中,如果您杀死一个 worker,我们将丢失它正在处理的消息。我们还将丢失所有已发送到这个特定工作器但尚未处理的消息。
1 2 3 4 5 6 7 8 9 10 11 channel.basicQos(1 ); channel.basicConsume("work" ,false ,new DefaultConsumer (channel){ @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("消费者--1:" +new String (body)); channel.basicAck(envelope.getDeliveryTag(),false ); } });
设置通道一次只能消费一个消息
关闭消息的自动确认,开启手动确认消息
第三种模型(fanout) ==fanout
扇出 也称为广播==
在广播模式下,消息发送流程是这样的:
可以有多个消费者
每个消费者有自己的 queue (队列)
每个队列都要绑定到 Exchange (交换机)
生产者发送的消息,只能发送到交换机 ,交换机来决定要发给哪个队列,生产者无法决定
交换机把消息发送给绑定过的所有队列
队列的消费者都能拿到消息,实现一条消息被多个消费者消费
编写生产者 1 2 3 4 5 6 7 8 9 10 11 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs" , "fanout" ); channel.basicPublish("logs" , "" , null , "fanout type message" .getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel, connection);
编写消费者-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs" , "fanout" ); String queue = channel.queueDeclare().getQueue();channel.queueBind(queue, "logs" , "" ); channel.basicConsume(queue, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte [] body) throws IOException { System.out.println("消费者1==>" + new String (body)); } });
编写消费者-2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs" , "fanout" ); String queue = channel.queueDeclare().getQueue();channel.queueBind(queue, "logs" , "" ); channel.basicConsume(queue, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte [] body) throws IOException { System.out.println("消费者2==>" + new String (body)); } });
第四种模型(Routing) Routing 之订阅模型 -Direct(直连) ==在 Fanout 模式中,一条消息,会被所有订阅的队列消息。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的 Exchange。==
在 Direct 模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey
(路由 key)
消息的发送方在向 Exchange 发送消息是,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey
进行判断,只有队列的RoutingKey
与消息的 RoutingKey
完全一致,才会接收到消息
流程:
图解:
P:生产者,向 Exchange 发送消息,发送消息是,会指定一个 Routing Key
X:Exchange(交换机),接收生产者消息,然后把消息递交给与 Routing Key 完全匹配的队列
C1:消费者,其所在队列指定了需要 Routing Key 为 error 的消息
C2:消费者,其所在队列指定了需要 Routing Key 为 info、 error、warning 的消息
编写生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();String exchangeName = "logs_direct" ;channel.exchangeDeclare(exchangeName, "direct" ); String routingKey = "info" ;channel.basicPublish(exchangeName, routingKey, null , ("这是direct模型发布对的基于routing key[" +routingKey+"]==>发送的消息" ).getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel, connection);
编写消费者-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();String exchangeName = "logs_direct" ;String queue = channel.queueDeclare().getQueue();channel.queueBind(queue, exchangeName, "error" ); channel.basicConsume(queue, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte [] body) throws IOException { System.out.println("消费者1==>" + new String (body)); } });
编写消费者-2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();String exchangeName = "logs_direct" ;String queue = channel.queueDeclare().getQueue();channel.queueBind(queue, exchangeName, "info" ); channel.queueBind(queue, exchangeName, "error" ); channel.queueBind(queue, exchangeName, "warning" ); channel.basicConsume(queue, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte [] body) throws IOException { System.out.println("消费者2==>" + new String (body)); } });
Routing 之订阅模型 -Topic Topic
类型的 Exchange
与 Direct
相比,都可以根据 RoutingKey
把消息路由到不用的队列。只不过 Topic
类型的 Exchange
可以让队列在绑定 RoutingKey
的时候使用通配符!这种模型 RoutingKey
一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如: item.insert
编写生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("topics" , "topic" ); String routingKey = "user.save" ;channel.basicPublish("topics" , routingKey, null , ("这里是topic动态路由模型,routingKey:" + routingKey).getBytes()); RabbitMQUtils.closeConnectionAndChanel(channel, connection);
编写消费者-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();String queue = channel.queueDeclare().getQueue();channel.queueBind(queue, "topics" , "user.*" ); channel.basicConsume(queue, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte [] body) throws IOException { System.out.println("消费者1 ==>" + new String (body)); } });
编写消费者-2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();String queue = channel.queueDeclare().getQueue();channel.queueBind(queue, "topics" , "user.#" ); channel.basicConsume(queue, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("消费者2 ==>" + new String (body)); } });
结果:
SpringBoot 整合 RabbitMQ 搭建初始环境 引入依赖 1 2 3 4 5 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
配置配置文件 1 2 3 4 5 6 7 8 9 spring: application: name: rabbitmq-springboot rabbitmq: host: 192.168 .80 .140 port: 5672 username: ems password: ems virtual-host: /ems
==RabbitTemplate
用来简化操作 使用时候直接在项目中注入即可使用==
HelloWorld 模型 编写生产者 1 2 3 4 5 6 7 8 9 @Autowired private RabbitTemplate rabbitTemplate;@Test public void testHello () { rabbitTemplate.convertAndSend("hello" , "hello world" ); }
编写消费者 1 2 3 4 5 6 7 8 9 @Component @RabbitListener(queuesToDeclare = @Queue("hello")) public class HelloConsumer { @RabbitHandler public void read (String message) { System.out.println("message==" + message); } }
Work 模型 编写生产者 1 2 3 4 5 6 7 8 9 10 11 @Autowired private RabbitTemplate rabbitTemplate;@Test public void testWork () { for (int i = 0 ; i < 10 ; i++) { rabbitTemplate.convertAndSend("work" , "work模型" + i); } }
编写消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Component public class WorkConsumer { @RabbitListener(queuesToDeclare = @Queue("work")) public void read1 (String message) { System.out.println("message1=" + message); } @RabbitListener(queuesToDeclare = @Queue("work")) public void read2 (String message) { System.out.println("message2=" + message); } }
==说明:默认在 Spring AMQP 实现中 Work 这种方式就是公平调度,如果需要实现能者多劳需要额外配置 ==
Fanout 广播模型 编写生产者 1 2 3 4 5 6 7 8 9 @Autowired private RabbitTemplate rabbitTemplate;@Test public void testFanout () { rabbitTemplate.convertAndSend("logs" , "" , "Fanout的模型发送的消息" ); }
编写消费者-1 1 2 3 4 5 6 7 8 9 @RabbitListener(bindings = { @QueueBinding( value = @Queue,//绑定临时队列 exchange = @Exchange(value = "logs", type = "fanout") //绑定的交换机 ) }) public void read1 (String message) { System.out.println("message1=" +message); }
编写消费者-2 1 2 3 4 5 6 7 8 9 @RabbitListener(bindings = { @QueueBinding( value = @Queue,//绑定临时队列 exchange = @Exchange(value = "logs", type = "fanout") //绑定的交换机 ) }) public void read2 (String message) { System.out.println("message2=" +message); }
Routing 路由模型 编写生产者 1 2 3 4 5 6 7 8 9 @Autowired private RabbitTemplate rabbitTemplate;@Test public void testRoute () { rabbitTemplate.convertAndSend("directs" , "info" , "发送info的key的路由信息" ); }
编写消费者-1 1 2 3 4 5 6 7 8 9 10 @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定义交换机名称和类型 key = {"info", "error", "warn"} ) }) public void read1 (String message) { System.out.println("message1==>" + message); }
编写消费者-2 1 2 3 4 5 6 7 8 9 10 @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时队列 exchange = @Exchange(value = "directs", type = "direct"), //自定义交换机名称和类型 key = {"info"} ) }) public void read2 (String message) { System.out.println("message1==>" + message); }
Topic 动态路由模型 编写生产者 1 2 3 4 5 6 7 8 9 @Autowired private RabbitTemplate rabbitTemplate;@Test public void testTopic () { rabbitTemplate.convertAndSend("topics" , "user.save" , "user.save 路由消息" ); }
编写消费者-1 1 2 3 4 5 6 7 8 9 10 @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", value = "topics"), key = {"user.save", "user.*"} ) }) public void read1 (String message) { System.out.println("message1==>" + message); }
编写消费者-2 1 2 3 4 5 6 7 8 9 10 @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic", value = "topics"), key = {"user.save", "user.*"} ) }) public void read2 (String message) { System.out.println("message2==>" + message); }
MQ 的应用场景 异步处理 ==场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1. 串行的方式 2. 并行的方式==
串行方式: 讲注册信息写入数据库后,发送注册邮件, 再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待没有必要等待的东西。
并行方式: 将信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回客户端,并行的方式能提高处理的时间。
消息队列: 假设三个业务点分别使用 50ms,串行方式使用时间 150ms,并行使用时间 100ms。虽然并行已经提高了处理时间,但是,前面说过,邮件和短信不对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。引入消息队列后,把发送邮件,短信等不是必须的业务逻辑异步处理。
应用解耦 ==场景说明:双 11 是购物狂欢节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口==
这样做法有一个缺点:
当库存系统出现故障时,订单就会失效。订单系统和库存系统高耦合,引入消息队列
订单系统: 用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统: 订阅下单的消息,获取下单消息,进行库操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失
流量削锋 ==场景说明:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。==
作用:
可以控制活动人数,超过此一定阈值的订单直接丢弃
可以缓解短时间的高流量压垮应用