SpringCloud Stream消息驱动

是什么?

Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过input通道或者output通道来与Spring Cloud Stream中binder交互,通过配置来binding. 而Spring Cloud Stream的binder负责与中间件交互, 消息的中间件有(RabbitMQ, Kafka),简化了消息中间件的操作步骤

binder中的inputs和outputs相当于我们平常所了解的输入输出,在这里input表示接收通知,outputs表示发送通知

为什么要使用stream?

  • 解耦(发布者和订阅者可以互相解藕)

  • 日志处理

  • 可以很好地切换消息中间件(RabbitMQ, Kafka)

  • 流量削峰

  • 简化了对消息中间件的操作

    ……

了解Stream中重要的概念 binder 绑定器

1. 发布, 订阅模式

应用间通信遵照发布-订阅模型, 消息通过共享主题进行广播.
下图显示了交互的Spring Cloud Stream 应用的典型布局未处理的传感数据发布到raw-sensor-data的Topic进行广播, Averages 和IngestHDFS同时订阅了此消息, 收到消息后触发自身的处理逻辑.
Topic可能对应不同的概念, 在RabbitMQ表示的是Exchange, Kafka中对应Topic.

2.消息分组

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。
默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置spring.cloud.stream.bindings.
.group属性即可。

假如两个消费端都是同一个分组那么,当发布者发布一条消息的时候,消费端就会以轮询的方式来接收通知,反正都是不同的分组那就每个订阅者都会接收到通知

代码示例

先看下示例项目的目录结构

发布者8801

image-20200404234222849

消费者8802

消费者8803

具体代码

8801

pom

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: localhost
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitMQ的服务信息
        defaultRabbit: # 表示定义的名称,用于binding的整合
          type: rabbit # 消息中间件类型
          environment: # 设置rabbitMQ的相关环境配置
            spring:
              rabbitmq:
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
    lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
    instance-id: send-8801.com # 主机名
    prefer-ip-address: true # 显示ip

业务接口

public interface IMessageProvider {
    /**
     * 消息发送
     * @return
     */
    String send();
}

发布业务实现

/**
 * EnableBinding 定义消息的推送管道
 */
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());//模拟发送流水号
        System.out.println("serial = " + serial);
        return serial;
    }
}

controller

@RestController
public class SendMessageController {

    @Resource
    private IMessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
         return messageProvider.send();
    }
}

发布者完成

消费者8801

pom

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

yml

server:
  port: 8802


spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: localhost
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitMQ的服务信息
        defaultRabbit: # 表示定义的名称,用于binding的整合
          type: rabbit # 消息中间件类型
          environment: # 设置rabbitMQ的相关环境配置
            spring:
              rabbitmq:
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: spectrumrpcA

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
    lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
    instance-id: receive-8802.com #主机名
    prefer-ip-address: true # 显示ip

controller:实时监控,要是订阅的有发布者发布立马接收并打印出来

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    //获取当前服务的端口号
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1号,----->接收到的消息:"+ message.getPayload() +"\t port:" + serverPort);
    }
}

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议

只有不断的努力才会有更大的惊喜等着你去发现!!