RabbitMQ实战教程
视频:https://www.bilibili.com/video/BV1dE411K7MG?from=search&seid=15593601763323732951&spm_id_from=333.337.0.0
1.MQ引言 1.1 什么是MQ MQ
(Message Queue):翻译为消息队列
,通过典型的生产者
和消费者
模型,生产者不断向消息队列中生产消息,消费者不断从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的消费和生产,没有业务逻辑的侵入,轻松的实现系统间的解藕。别名为:消息中间件
,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
1.2 MQ有哪些 当今市面上有很多的消息中间件,如老牌的ActiveMQ
,RabbitMQ
,炙手可热的Kafka
、阿里巴巴自主研发的RocketMQ
等。
1.3 不同MQ的特点 1 2 3 4 5 6 7 8 # 1.ActiveMQ ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。他是一个完全支持JMS规范的消息中间件。丰富的API,多种集群架构模式让ActiveMQ称为老牌的消息中间件,在中小企业颇受欢迎。 # 2.Kafka Kafka是LinkedIn公司开源的分布式发布-订阅消息中间件,目前属于Apache顶级项目。Kafka的主要特点是基于pull模式来处理消息消费。追求高吞吐量,一开始的目的是用于日志的收集和传输。0.8版本开始支持复制,不支持事物,对消息的丢失、重复、错误没有严格的要求。适合产生大量数据的互联网服务的数据收集业务。 # 3.RocketMQ RocketMQ是阿里开源的消息中间件。他是纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ的思路起源于Kafka,但并不是Kafka的一个复制,他对消息的可靠传输及事物做了优化,目前的阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流处理、binglog分发等场景。 # 4.RabbitMQ RabbitMQ是使用Erlang语言开发的开源消息中间件系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布订阅)、可靠性、安全。AMQP协议更多用在企业内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(延时低)、可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
2.RabbitMQ的引言 2.1 RabbitMQ简介 RabbitMQ是基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,也是最受欢迎的消息中间件之一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # AMQP协议(后续单独讲) AMQP(Advanced Message Queuing Protocol,高级消息队列协议),在2003年被提出,在最用于解决金融领域不同平台之间消息传递的交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wrie-level protocol(链接协议)。这时其和JMS的本质差异,AMQP不从api层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。 Server +---------------------------+ | Virtual Host | | +--------------------+ | +------------+ | | +-----------+ | | | Publisher | -------->| Exchange | | | | Application| | | +-----+-----+ | | +------------+ | | | | | | | +-----------+ | | +-------------+ | | + Message + | | | Consummer | | | + Queue + --------->| Application | | | +-----------+ | | +-------------+ | +--------------------+ | +---------------------------+
2.2 RabbitMQ的安装 本次安装的环境如下:
系统:centos8 64位
erlang:24.1.7
rabbitmq:3.9.11
2.2.1 erlang下载
因为rabbitMQ是基于erlang开发的,所以先要下载erlang的包:https://github.com/rabbitmq/erlang-rpm
erlang版本和rabbitmq版本对照:https://www.rabbitmq.com/which-erlang.html
本次开发使用的最新的rabbitmq版本为3.9.11
,最小支持的erlang版本为23.2,所以本次erlang使用了24.1.7
版本
2.2.2 RabbitMQ下载
rabbitmq下载地址:https://www.rabbitmq.com/install-rpm.html#downloads
2.2.3 安装启动 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 # 1.将rabbitmq相关包上传到linxu服务器中 使用scp命令上传到服务器两个包: scp ./erlang-24.1.7-1.el8.x86_64.rpm root@10.3.4.5:/root/ scp ./rabbitmq-server-3.9.11-1.el8.noarch.rpm root@10.3.4.5:/root/ # 2.依次安装erlang、rabbitmq 使用rpm命令进行安装,缺少依赖会进行提示: rpm -ivh erlang-24.1.7-1.el8.x86_64.rpm rpm -ivn rabbitmq-server-3.9.11-1.el8.noarch.rpm # 3.修改rabbitmq的配置 使用rpm包的方式安装时,并没有将配置文件也放到指定目录下,所以需要自行创建一个配置文件:/etc/rabbitmq/rabbitmq.conf,具体的配置内容可以参考:https://www.rabbitmq.com/configure.html#config-file-formats。 这里我们需要修改一处:loopback_users=none,表示能够让guest用户进行远程访问。默认情况下,guest用户只能在localhost域名下访问。我们使用的是云服务器,需要使用ip进行访问,所以需要修改这个配置。 # 4.开启管理控制台插件 其实就是开启rabbitmq的一个插件:rabbitmq_management,可以让我们使用web界面管理rabbitmq。执行命令:rabbitmq-plugins enable rabbitmq_management。该命令还另外开启了2个插件:rabbitmq_management_agent、rabbitmq_web_dispatch。 # 5.启动/停止/重启rabbitmq服务 rabbitmq安装的时候,会将其设置为系统服务,使用系统服务命令即可: systemctl start/stop/restart rabbitmq-server.service # 6.查看服务状态 systemctl stauts rabbitmq-server.service 结果如下为正常运行中: ● rabbitmq-server.service - RabbitMQ broker Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled) Active: active (running) since Sun 2021-12-12 23:56:51 CST; 1 day 9h ago Process: 22710 ExecStop=/usr/sbin/rabbitmqctl shutdown (code=exited, status=0/SUCCESS) Main PID: 22758 (beam.smp) Tasks: 23 (limit: 23722) Memory: 94.8M CGroup: /system.slice/rabbitmq-server.service ├─22758 /usr/lib64/erlang/erts-12.1.5/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 12800> ├─22773 erl_child_setup 32768 ├─22827 inet_gethost 4 └─22828 inet_gethost 4 # 7.访问管理界面 默认http后台界面的端口为15672。 http://10.3.4.5:15672/ # 8.登陆 账号/密码:guest/guest
3.RabbitMQ配置 3.1 RabbitMQ管理命令行 1 2 3 4 5 6 7 # 1.服务管理 systemctl start/stop/restart rabbitmq-server.service # 2.管理命令行 可以用来在不使用web管理端的情况下管理rabbitmq。 rabbitmqctl help // 查看所有的命令 # 3.插件管理 rabbitmq-plugins enable/list/disable
3.2 web管理介绍
Connection :连接,无论是消费者还是生产者,都要与rabbitmq建立连接才能进行消息的生产与消费。
Channels :通道,建立连接后,消息的投递和获取都是通过通道来进行的。
Exchages :交换机,用来实现消息的路有。
Queues :队列,消息存放于该队列中,等待消费,消费之后移除。
3.2.1用户和虚拟主机管理 3.2.1.1 用户
1 2 3 4 5 6 # Tags说明: Admin(超级管理员):登录控制台,查看所有信息,并对用户、策略(policy)进行修改。 Monitoring(监控者):登陆控制台,查看rabbitmq的节点(进程数、内存、磁盘等使用情况)信息。 Policymaker(策略制定者):登陆控制台,对policy进行管理。 Management(普通管理员):登陆控制台查看信息。 其他:无法登陆控制台,就是普通的消费者或者生产者。
3.2.1.2 虚拟主机
1 2 # 虚拟主机: 为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Host)概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自都有自己的队列、交换机,互不影响。相当于MySQL中的数据库。
3.2.1.3 用户和虚拟主机绑定
4.RabbitMQ的Java客户端 4.1 AMQP协议回顾
4.2 RabbitMQ支持的消息类型
4.3 引入依赖 1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.14.0</version > </dependency >
4.4 各种模型客户端代码 4.4.1 Direct-直连模式
1 2 3 4 # 相关概念: P:provider,生产者,要发送消息的程序 C:consumer,消费者,消费消息的程序 Queue:队列,存储消息的地方
Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class TestProducer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("101.43.52.186" ); connectionFactory.setPort(5672 ); connectionFactory.setVirtualHost("/adu" ); connectionFactory.setUsername("adu" ); connectionFactory.setPassword("adu" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.basicPublish("" , QUEUE_NAME, null , "hello world" .getBytes(StandardCharsets.UTF_8)); channel.close(); connection.close(); } }
Consumer :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class TestConsumer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("101.43.52.186" ); connectionFactory.setPort(5672 ); connectionFactory.setVirtualHost("/adu" ); connectionFactory.setUsername("adu" ); connectionFactory.setPassword("adu" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.basicConsume(QUEUE_NAME, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("consumer:" + new String(body, StandardCharsets.UTF_8)); } }); channel.close(); connection.close(); } }
4.4.2 Work Queues-工作队列
Work Queue也被称为Task Queue
任务模型。当消息处理比较耗时时,可能消息生产的速度远远超过了消息消费的速度。长此以往,消息就会堆积,此时就可以使用Work Queue模型。让多个消费者绑定同一个队列,共同消费队列中的消息。队列中的消息一旦被消费,就会被删除,因此不会产生重复消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class RabbitMQUtil { public static Connection getConnection () { try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("101.43.52.186" ); connectionFactory.setPort(5672 ); connectionFactory.setVirtualHost("/adu" ); connectionFactory.setUsername("adu" ); connectionFactory.setPassword("adu" ); return connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null ; } public static Channel getChannel (Connection connection) { try { return connection.createChannel(); } catch (IOException e) { e.printStackTrace(); } return null ; } public static void close (Channel channel, Connection connection) { if (channel != null ) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null ) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class WorkQueueProducer { public static final String QUEUE_NAME = "work" ; public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); for (int i = 0 ; i < 10 ; i++) { channel.basicPublish("" , QUEUE_NAME, null , ("hello work queue " + i).getBytes(StandardCharsets.UTF_8)); } RabbitMQUtil.close(channel, connection); } }
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class WorkQueueConsumer1 { public static final String QUEUE_NAME = "work" ; public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.basicConsume(QUEUE_NAME, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8)); } }); } } 和consumer1相同代码,修改消费输出:System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));
结果:
1 2 3 4 5 6 7 8 9 10 11 12 # consumer1 output... consumer1: hello work queue 0 consumer1: hello work queue 2 consumer1: hello work queue 4 consumer1: hello work queue 6 consumer1: hello work queue 8 # consumer2 output... consumer2: hello work queue 1 consumer2: hello work queue 3 consumer2: hello work queue 5 consumer2: hello work queue 7 consumer2: hello work queue 9
总结:
默认情况下,RabbitMQ将按顺序依次将消息发给每个消费者,也就是说每个消费者都会收到相同数量的消息,这种平分消息的方式称为循环
。
4.4.2.1 消息自动确认机制 如果一个消费者在执行任务的过程中宕机,那么在以上代码中,一旦RabbitMQ将将消息传递给消费者,他就会立即标记为删除,那么该消费者中未处理的消息将丢失。我们希望在处理任务的过程中,一个消费者宕机了,能够将任务交给其他消费者处理。这时候我们就可以用到消息自动确认机制
。也就是说,我们可以在处理完任务之后,再通知RabbitMQ可以删除该任务。
Producer:
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class WorkQueueConsumer3 { public static final String QUEUE_NAME = "work" ; public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.basicQos(1 ); channel.basicConsume(QUEUE_NAME, false , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8)); channel.basicAck(envelope.getDeliveryTag(), false ); } }); } } public class WorkQueueConsumer4 { public static final String QUEUE_NAME = "work" ; public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.basicQos(1 ); channel.basicConsume(QUEUE_NAME, false , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { try { Thread.sleep(1500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8)); channel.basicAck(envelope.getDeliveryTag(), false ); } }); } }
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 # consumer1 output... consumer1: hello work queue 1 consumer1: hello work queue 3 consumer1: hello work queue 5 consumer1: hello work queue 8 consumer1: hello work queue 10 consumer1: hello work queue 13 consumer1: hello work queue 15 consumer1: hello work queue 18 # consumer2 output... consumer1: hello work queue 0 consumer1: hello work queue 2 consumer1: hello work queue 4 consumer1: hello work queue 6 consumer1: hello work queue 7 consumer1: hello work queue 9 consumer1: hello work queue 11 consumer1: hello work queue 12 consumer1: hello work queue 14 consumer1: hello work queue 16 consumer1: hello work queue 17 consumer1: hello work queue 19
总结:
实现消息自动确认,需要注意两点:1)设置通道一次只能消费一条消息 2)关闭自动确认,手动确认消息
4.4.3 Publish/Subscribe-发布订阅模式
1 2 3 4 5 # 消息流程: 可以有多个消费者,每个消费者都有自己的queue(队列),每个队列都要绑定要exchange(交换机)。 生产者发送的消息只能发送给交换机,交换机来决定发送给哪个队列,生产者无法决定。 交换机把消息发送给绑定过的所有队列,队列的消费者都能拿到消息,实现一个消息被多个消费者消费。 # 对应交换机类型:fanout
Proudcer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class PublishSubscriptProducer { public static final String QUEUE_NAME = "fanout" ; public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare("logs" , "fanout" ); channel.basicPublish("logs" , "" , null , "hello fanout" .getBytes(StandardCharsets.UTF_8)); RabbitMQUtil.close(channel, connection); } }
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class PublishSubscribeConsumer1 { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare("logs" , "fanout" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "logs" , "" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8)); } }); } } 同consumer1,输出修改为:System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));
结果:
1 2 3 4 5 6 7 # consumer1 output... consumer1: hello fanout, type [ consumer1: hello fanout # consumer2 output... consumer2: hello fanout, type [ consumer2: hello fanout
4.4.4 Routing-路由模式 在发布订阅模式中,一条消息会被所有订阅的的队列消费。但是在某些场景下,我们希望不同的消息类型被不同的队列消费,这时就需要用到Routing模式的direct类型的Exchange。
1 2 3 # 在Direct模式下: 队列与交换机的绑定,不能是任意绑定了,而是需要指定一个Routing Keying,消息发送方在向Exchange发送消息时,也必须指定消息的Routing Key。 Exchange也不再把消息交给每一个与之绑定的队列,而是根据消息的Routing Key判断,只有队列的Routing Key和消息的Routing Key完全一致时,才会发送消息。
1 2 3 4 5 # 说明: P:Proudcer,生产者,向Exchange发送消息,同时会指定一个Routing Key。 X:Exchange,交换机,接收生产者发送的消息,然后把消息传递给与routing key完全匹配的队列。 C1:Consumer,消费者,其所在队列指定了需要routing key为error的消息 C2:Consumer,消费者,其所在队列指定了需要routing key为info、error、warning的消息
Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class RoutingProducer { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare("logs_direct" , "direct" ); String routingKey = "warning" ; channel.basicPublish("logs_direct" , routingKey, null , String.format("routing for type [%s]" , routingKey).getBytes(StandardCharsets.UTF_8)); RabbitMQUtil.close(channel, connection); } }
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class RoutingConsumer1 { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare("logs_direct" , "direct" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "logs_direct" , "info" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8)); } }); } } public class RoutingConsumer2 { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare("logs_direct" , "direct" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "logs_direct" , "info" ); channel.queueBind(queue, "logs_direct" , "error" ); channel.queueBind(queue, "logs_direct" , "warning" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8)); } }); } }
结果:
1 2 3 4 5 6 7 8 9 # 依次发送routing key为info、error、warning结果: ## consummer1 consumer1: routing for type [info] ## consumer2 consumer1: routing for type [info] consumer1: routing for type [error] consumer1: routing for type [warning]
4.4.5 Topics-主题(动态路由模式) Topics
模式同Routing
模式相比,都可以根据Routing Key把消息路由到不同的队列。只不过Topic
类型的Routing Key可以使用通配符。Routing Key一般是由一个或多个单词组成,多个单词之间以‘,’分割。Exchange的类型为topic
。
1 2 3 4 5 6 # 通配符: *(star)can substitute for exactly one word。精确匹配一个单词。 #(hash)can substitute for zero or more words。匹配零个、一个或多个单词。 ## 例如: audit.# :匹配 audit、audit.irs、audit.irs.corporate 等。 audit.* :只能匹配 audit.irs。
Producer:
1 2 3 4 5 6 7 8 9 10 11 public class TopicsProducer { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare("logs_topics" , "topic" ); String routingKey = "user.save.now" ; channel.basicPublish("logs_topics" , routingKey, null , String.format("hello topics, routing key : [%s]" , routingKey).getBytes(StandardCharsets.UTF_8)); RabbitMQUtil.close(channel, connection); } }
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class TopicsConsumer1 { public static final String EXCHANGE_NAME = "logs_topics" ; public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, EXCHANGE_NAME, "user.*" ); channel.basicConsume(queue, false , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8)); } }); } } public class TopicsConsumer2 { public static final String EXCHANGE_NAME = "logs_topics" ; public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = RabbitMQUtil.getChannel(connection); channel.exchangeDeclare(EXCHANGE_NAME, "topic" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, EXCHANGE_NAME, "user.#" ); channel.basicConsume(queue, false , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8)); } }); } }
结果:
1 2 3 4 5 6 7 8 9 # 依次发送routing key为user.save、user、user.save.now的结果: ## conumser1: consumer1: hello topics, routing key : [user.save] ## consumer2: consumer2: hello topics, routing key : [user.save] consumer2: hello topics, routing key : [user] consumer2: hello topics, routing key : [user.save.now]
5.SpringBoot重使用RabbitMQ 5.1 搭建环境 5.1.1 引入依赖 1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
5.1.2 配置信息 1 2 3 4 5 6 7 8 9 spring: application: name: adu-rabbitmq rabbitmq: host: 10.2 .3 .4 port: 5672 username: adu password: adu virtual-host: /adu
5.2 各种模型客户端代码 5.2.1 Hello World-直连模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component @RabbitListener(queuesToDeclare = @Queue(value = "hello")) public class HelloWorldConsumer { @RabbitHandler public void receive (String message) { System.out.println("consumer: " + message); } }
5.2.2 Work Queue - 工作队列模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Component public class WorkQueueConsumer { @RabbitListener(queuesToDeclare = @Queue(value = "work", durable = "true", exclusive = "false", autoDelete = "false")) public void receive1 (String message) { System.out.println("consumer1: " + message); } @RabbitListener(queuesToDeclare = @Queue(value = "work", durable = "true", exclusive = "false", autoDelete = "false")) public void receive2 (String message) { System.out.println("consumer2: " + message); } }
5.2.3 Publish/Subscribe - 发布订阅模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Component public class PublishSubscribeConsumer { @RabbitListener(bindings = @QueueBinding( value = @Queue, // 没有参数的,表示临时队列 exchange = @Exchange(value = "logs", type = "fanout") // 交换机 )) public void receive1 (String message) { System.out.println("consumer1: " + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue, // 没有参数的,表示临时队列 exchange = @Exchange(value = "logs", type = "fanout") // 交换机 )) public void receive2 (String message) { System.out.println("consumer2: " + message); } }
5.2.4 Routing - 路由模式(direct) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Component public class RoutingConsumer { @RabbitListener(bindings = @QueueBinding( value = @Queue, // 临时队列 exchange = @Exchange(value = "logs_direct", type = "direct"), key = "info" // routing key )) public void receive1 (String message) { System.out.println("consumer1: " + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue, // 临时队列 exchange = @Exchange(value = "logs_direct", type = "direct"), key = {"info", "error", "warning"} // routing key )) public void receive2 (String message) { System.out.println("consumer2: " + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue, // 临时队列 exchange = @Exchange(value = "logs_direct", type = "direct"), key = {"warning"} )) public void receive3 (String message) { System.out.println("consumer3: " + message); } }
5.2.5 Topics - 主题(动态路由)模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Component public class TopicsConsumer { @RabbitListener(bindings = @QueueBinding( value = @Queue, // 临时队列 exchange = @Exchange(value = "logs_topics", type = "topic"), key = {"user.*"} )) public void receive1 (String message) { System.out.println("consumer1: " + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue, // 临时队列 exchange = @Exchange(value = "logs_topics", type = "topic"), key = {"user.#"} )) public void receive2 (String message) { System.out.println("consumer2: " + message); } }
5.2.6 客户端测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @SpringBootTest(classes = RabbitMQApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @RunWith(SpringRunner.class) @EnableAutoConfiguration public class RabbitMQApplicationTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testHelloWorld () { rabbitTemplate.convertAndSend("hello" , "hello spring boot starter amqp..." ); } @Test public void testWorkQueue () { for (int i = 0 ; i < 20 ; i++) { rabbitTemplate.convertAndSend("work" , "hello work queue" + i); } } @Test public void testPublishSubscribe () { for (int i = 0 ; i < 10 ; i++) { rabbitTemplate.convertAndSend("logs" , "" , "hello publish subscribe " + i); } } @Test public void testRouting () { String routingKey = "warning" ; rabbitTemplate.convertAndSend("logs_direct" , routingKey, "hello routing, type=" + routingKey); } @Test public void testTopics () { String routingKey = "user.save" ; rabbitTemplate.convertAndSend("logs_topics" , routingKey, "hello topics, type=" + routingKey); } }
6.MQ的应用场景 6.1 异步处理 1 2 3 4 5 6 # 场景说明: 用户注册后,需要发送邮件和注册短信,传统的做法有2种:1)串行方式 2)并行方式 ## 串行方式 将注册信息写入数据库后,再发送邮件,再注册短信,以上三个任务全部返回结果后,才返回给客户端。这里有一个问题,邮件、短信并不是必须的,它只是一个通知,而这种做法需要让客户端等待没有必要等待的东西。 ## 并行方式 将注册信息写入数据库后,发送邮件的同时,发送短信。以上三个任务完成后,返回给客户端,并行的方式能提高处理的效率。
1 2 # 消息队列: 除了以上提到的2种传统方式之外,我们还可以选择 消息队列 的形式。因为发送邮件和发送短信不是必须的,所以我们可以使用消息队列的形式,进行异步处理,相对于并行方式,减少了发送邮件或者短信的时间,相当于只有 注册信息写入数据库 + 写入消息队列 这个操作的耗时。
6.2 应用解藕
1 2 3 4 # 场景: 双十一是购物节,用户下单后,订单系统需要通知库存系统,传统做法是订单系统调用库存系统提供的接口。 # 缺点: 这种做法有个缺点:当库存系统异常时,订单系统就无法使用。订单系统和库存系统就出现了耦合。解决方法引入消息队列。
1 2 3 # 引入消息队列: 订单系统:用户下单后,订单系统完成持久化后,将消息写入 消息队列 ,返回用户下单成功。 库存系统:订阅下单消息,获取下单消息,进行库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
6.3 流量削峰 1 2 3 4 5 6 7 8 # 场景: 秒杀活动,一般会因为流量过大,导致应用挂掉。为了解决这个问题,一般在应用之前加入消息队列。 # 作用: 1.可以控制活动人数,超过一定阈值的订单直接丢弃 2.可以缓解短时间的高流量压垮应用(程序按自己处理能力获取订单)。 # 流程: 1.用户的请求,服务器收到之后先加入到消息队列。假如超过消息队列的最大长度,则直接丢弃用户请求或者跳转到错误页面。 2.秒杀业务根据消息队列中的请求信息,再做后续的处理。
7.RabbitMQ集群 7.1 集群架构(副本集群)
默认情况下:RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是 消息队列,默认情况下,消息队列位于一个节点上,尽管他们可以从任意节点上看到和访问。–官网
也就是说,集群架构的方式只能在节点之间同步(复制)交换机等基本信息,无法同步队列信息。而队列信息是保存数据的地方,如果无法在节点之间进行复制,那么集群将失去一些意义。
7.1.1 架构图
1 2 # 核心解决问题: 当集群中某一时刻Master节点宕机,可以对Queue中信息进行备份。
7.1.2 集群搭建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 PS:使用docker安装 # 0.集群规划 node1: 172.18.0.2 5672 15672 master 主节点 node2: 172.18.0.3 5673 15673 repl1 副本节点 node3: 172.18.0.4 5674 15674 repl2 副本节点 # 1.克隆三台主机名和ip映射,并修改各个虚拟机的hostname 创建网络: docker network create rabbit-net 创建容器: docker run -d --privileged=true --hostname node1 --name node1 -p 15672:15672 -p 5672:5672 --network rabbit-net centos:8.2.2004 /sbin/init 进入容器: docker exec -it node/node1/node2 /bin/bash 复制erlang、rabbit的rpm包到容器: docker cp ./erlang.xxx.rpm node1:/ docker cp ./rabbitmq.xxx.rpm node1:/ 安装: rpm -ihv erlang.xxx.rpm rpm -ivh rabbitmq.xxx.rpm --nodeps --force 配置: 在/etc/hosts加入各个节点的ip和域名对应关系。 问题: 参考:https://blog.csdn.net/weixin_42181917/article/details/105579288 1.System has not been booted with systemd as init system (PID 1). Can't operate.Failed to create bus connection: Host is down 解决:创建容器使用 /sbin/init 2.Could not set property: Failed to set static hostname: Device or resource busy 解决:退出容器,重新进入在设置一次 hostnamectl set-hostname node/node1/node2 exit hostnamectl set-hostname node/node1/node2 # 2.三台机器安装rabbitmq,并同步cookie文件 docker cp 命令将宿主机中的erlang和rabbitmq的rpm包都上传到虚拟机中,启动rabbitmq。 docker cp 命令将node1节点中的 /var/lib/rabbitmq/.erlang.cookie文件复制到宿主机,再通过宿主机复制到node1和node2. 这里需要注意看是否.erlang.cookie文件的权限是否和node相同:chown、chgrp 命令可以修改权限。 # 3.查看cookie文件是否一致 进入三个节点的虚拟机中,执行: cat /var/lib/rabbitmq/.erlang.cookie,查看内容是否一致。 # 4.后台启动rabbit rabbitmq-server -detached # 5.在node2和node3执行加入集群命令 1.关闭 rabbitmqctl stop_app 2.加入集群 rabbitmqctl join_cluster rabbit@node 3.启动服务 rabbitmqctl start_app
搭建成功:
7.2 镜像集群
镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间自动进行同步。且如果其中一个节点不可用,并不会导致消息丢失或者服务不可用的情况,提升MQ集群整体的高可用性。–摘自官网
7.2.1 架构图
7.2.2 集群搭建 镜像集群的搭建,是在副本集群
的基础之上做的额外配置,也就是说必选先搭建好副本集群
才能搭建镜像集群。这个额外配置就是需要创建一个策略
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # 0.策略说明 rabbitmqctl set_policy [--vhost <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition> --vhost:可选参数,针对指定virutal host下的queue进行设置 --priority:可选参数,policy的优先级,越大越优先 --appliy-to:可选参数,表示策略应用的对象,后面跟queues|exchanges|all。 name:策略名称,唯一标识。 pattern:匹配队列的正则表达式。 definition:镜像定义,包括三个部分:ha-mode、ha-params、ha-sync-mode ha-mode:镜像队列模式,可选值:all/exaclty/nodes all:表示在集群的所有节点上进行镜像 exactly:表示在指定个数的节点上进行镜像,节点个数通过ha-params指定 nodes:表示在指定节点上进行镜像,节点名称通过ha-params指定 ha-params:ha-mode需要用到的参数 ha-sync-mode:队列同步方式,可选值为:automatic/manual automatic:自动同步 manual:用户触发同步 # 1.查看当前策略 rabbitmqctl list_policies # 2.添加策略 rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}' 说明:策略正则表示为"^"表示匹配所有队列,"^hello"表示匹配hello开头的队列 # 3.删除策略 rabbitmqctl clear_policy ha-all
搭建成功: