SpringCloud 08 —— 消息总线Spring Cloud Bus 之 RabbitMQ

Spring cloud bus通过轻量消息代理连接各个分布的节点。这会用在广播状态的变化(例如配置变化)或者其他的消息指令。Spring bus的一个核心思想是通过分布式的启动器对spring boot应用进行扩展,也可以用来建立一个多个应用之间的通信频道。目前唯一实现的方式是用AMQP消息代理作为通道,同样特性的设置(有些取决于通道的设置)在更多通道的文档中。
SpringCloud Bus会向外提供一个http接口,如/ actuator / bus-refresh。我们将这个接口配置到git的webhook上,当git上的内容发生改变时,就会自动调用/ actuator / bus-refresh接口。Bus就会通知ConfigServer,ConfigServer会发布更新消息到消息总线的消息队列,其他服务订阅到该消息就会信息刷新,从而实现整个微服务进行自动刷新。

MQ 中间件的作用
MQ中间件就是我们常说的消息代理,主要用于消息的接收和转发消息,可以将消息生产者和消息消费者完全解耦,不必直接调用对方的API。一般常用在邮件服务、短信服务、日志服务等。

目前常用的MQ有:

  • Kafka
  • RabbitMQ
  • RocketMQ
  • ActiveMQ等

Spring Cloud Bus 目前仅支持两款MQ中间件: RabbitMQ和Kafka。后面讲主要介绍这两种中间件配合Spring Cloud Bus 的使用

RabbitMQ 介绍

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现,是一个轻量级的消息中间件,为很多语言提供了调用工具,例如:Java,.NET,PHP,Python,JavaScript,Ruby,Go等。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  • 可靠性(Reliability)
  • 灵活的路由(Flexible Routing)
  • 消息集群(Clustering)
  • 高可用(Highly Available Queues)
  • 多种协议(Multi-protocol)
  • 多语言客户端(Many Clients)
  • 管理界面(Management UI)
  • 跟踪机制(Tracing)
  • 插件机制(Plugin System)

RabbitMQ 实现原理

RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念

  • Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  • Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  • Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Connection
    网络连接,比如一个TCP连接。
  • Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  • Broker
    表示消息队列服务器实体。

RabbitMQ 工作模式

RabbitMQ 有6中工作模式,分别是:

simple模式(即最简单的收发模式):一个生产者,一个消费者

simple模式

work工作模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。

work工作模式

publish/subscribe发布订阅:一个生产者发送的消息会被多个消费者获取。

发布订阅

routing路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key

topic 主题模式

topic 主题模式:路由模式的一种,将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

主题模式

RPC模式: 当客户端启动时,创建一个匿名的回调队列。客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。请求被发送到rpc_queue队列中。RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了

RPC模式

发布者确认:消费者确认解决的问题是确认消息是否被消费者"成功消费",发布者确认是异步发出的,可以确认单个消息或一组消息,发出确认的确切时刻取决于消息的传递模式(持久性与瞬态)以及消息路由到的队列的属性

RabbitMQ 安装

这个应该没什么说的吧,以windows为例到官网下载,安装即可(注意:rabbitMQ基于Erlang编写所以要先安装Erlang,官网也有对应连接).

安装完成后,进入RabbitMQ的安装目录/sbin,在用命令安装RabbitMq-Plugins。

控制台运行

rabbitmq-plugins enable rabbitmq_management

如果执行失败,执行rabbitmq-service stop,在输入rqbbitmq-service remove,在输入rabbitmq-service install,再输入rabbit-service start,最后重新输入rabbitmq-plugins enable rabbitmq_management

安装完成后访问localhost:15672/#/

默认账号和密码都是:guest

登录进来后,可以看到Connections、Channels、Exchanges、Queues等功能

关闭服务:
在服务中找到对应的服务关闭即可。

Windows 下 快捷键win+R 弹出小窗口,输入services.msc

RabbitMQ的基本使用

这里只是一个基本使用的示例,如果想要详细学习RabbitMQ的可以自行到官网了解。RabbitMQ并不复杂且官方也有详细的示例代码和说明。浏览器打开RabbitMQ 客户端,可以自己先熟悉熟悉使用。

我们先在Rabbit上面创建一个账号cloud

创建完成后,在上面的用户列表中可以看到新创建的账号,点击进去设置权限。

下面通过一个SpringBoot 整合RabbitMQ 的例子熟悉一下RabbitMQ的功能

新建项目模块:cloud-mq-rabbitmq,引入依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

注意这里引入的是Spring-RabbitMQ,spring-boot-starter-amqp封装了RabbitMQ 的API

新建启动类

@SpringBootApplication
public class RabbitMqApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMqApplication.class, args);
    }
}

创建RabbitMQ配置类

使用MQ前,我们需要设置一个Topic ,还可以配置交换器、路由等信息,我们这里以简单使用为主。

@Configuration
public class RabbitMqConfig {

    /**
     * 注册一个名为hello的队列
     * @return
     */
    @Bean
    public Queue helloQueue(){
        return  new Queue("hello");
    }
}

SpringBoot应用配置信息

设置RabbitMQ 连接配置信息,这里设置的用户是我们刚刚创建的cloud用户

server:
  port: 8781
spring:
  application:
    name: cloud-mq-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: cloud
    password: cloud@321 #这里是创建账号时的密码

创建消息生产者

通过注入AmqpTemplate 来发送我们自定义产生的消息,AmqpTemplate 已经为我们定义了一套AMQP协议的基本操作。我们也将信息通过log日志打印出来

@Component
@Slf4j
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){
      String message="Hello World:"+ DateUtil.now();
      log.info("Sender:"+message);
      //第一个参数是topic,第二个参数是内容
      amqpTemplate.convertAndSend("hello",message);
    }
}

创建消费者

@Component
@RabbitListener(queues = "hello")
@Slf4j
public class Receiver {

    @RabbitHandler
    public void process(String msg){
        log.info("Receiver:{}",msg);
    }
}

启动项目

启动之后,控制台可以看到程序创建了一个和127.0.0.1:5672的连接,用户名是cloud

通过前面的RabbitMQ 的web 管理可以看到Connections和Channels连接信息

创建单元测试执行send():

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class SenderTest {
    @Autowired
    private Sender sender;
    @Test
    void send() {
        for (int i = 0; i <10 ; i++) {
            sender.send();
        }
    }
}

同时可以看到web管理界面queue已经存在

上面只是演示了最简单的收发模式,RabbitMQ还有跟多特性(订阅模式、路由模式、RPC模式等),可以到RabbitMQ官网详细了解.

Spring Cloud Config 改造(RabbitMQ)

在config配置章节中我们提过,如果一个ConfigServer 有多个客户端,ConfigServer 修改了数据,这个时候需要一个个刷新客户端的/actuator/refresh 对开发很不友好,下面我们通过引入Spring Cloud Bus 总线来解决问题,ConfigServer 变更后推送给总线,由总线来向各个客户端刷新配置。过程如下图:

新建项目cloud-config-amqp-client,引入依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
    </dependencies>

启动类

@EnableDiscoveryClient
@SpringBootApplication
public class ConfigAmqpClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConfigAmqpClientApplication.class, args);
    }
}

配置文件bootstrap.yml
这里我们选用基于git 且使用服务发现的方式(先把Config Server启动起来)
较fcloud-config-native-client我们吧添加了rabbitmq相关的配置信息
并且把bus-refresh接口暴漏出来

server:
  port: 8779
spring:
  application:
    name: register-eureka-client
  cloud:
    config:
      profile: dev
      label: master
      discovery: #基于服务发现的
        enabled: true
        service-id: config-server
  rabbitmq:
    host: localhost
    port: 5672
    username: cloud
    password: cloud@321
management:
  endpoints:
    web:
      exposure:
        include: refresh,health,info,bus-refresh

注意:Spring boot 2.0的改动较大,/ bus / refresh全部整合到执行器里面了,变成了/ actuator / bus-refresh,所以之前1.x的management.security.enabled全部失效,不适用于2.0 ,2.0的性能配置是这样的:

management:
  endpoints:
    web:
      exposure:
        include:*

启动项目并postman 测试
localhost:8779/api/version get请求
返回结果

eureka-2.0

修改version的值为3.0,提交git并postman测试
localhost:8779/actuator/bus-refresh post请求
返回结果

eureka-3.0

github设置自动触发

配置一个可以直接访问的地址,不要是localhost,可以再每次变更之后动态刷新配置

这样便使用Spring Cloud Bus 与 Spring Cloud Config的整合,将RabbitMQ作为消息代理,实现了配置的动态更新。

更新时间:2020-04-26 19:22:45

本文由 寻非 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
原文链接:https://www.zhouning.group/archives/springcloud08消息总线springcloudbus之rabbitmq
最后更新:2020-04-26 19:22:45

评论

Your browser is out of date!

Update your browser to view this website correctly. Update my browser now

×