SpringCloud Stream消息驱动
是什么?
Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过input通道或者output通道来与Spring Cloud Stream中binder交互,通过配置来binding. 而Spring Cloud Stream的binder负责与中间件交互, 消息的中间件有(RabbitMQ, Kafka),简化了消息中间件的操作步骤
binder中的inputs和outputs相当于我们平常所了解的输入输出,在这里input表示接收通知,outputs表示发送通知
为什么要使用stream?
-
解耦(发布者和订阅者可以互相解藕)
-
日志处理
-
可以很好地切换消息中间件(RabbitMQ, Kafka)
-
简化了对消息中间件的操作
……
了解Stream中重要的概念 binder 绑定器
1. 发布, 订阅模式
应用间通信遵照发布-订阅模型, 消息通过共享主题进行广播.
下图显示了交互的Spring Cloud Stream 应用的典型布局未处理的传感数据发布到raw-sensor-data的Topic进行广播, Averages 和IngestHDFS同时订阅了此消息, 收到消息后触发自身的处理逻辑.
Topic可能对应不同的概念, 在RabbitMQ表示的是Exchange, Kafka中对应Topic.
2.消息分组
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。
默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能,实现的方式非常简单,我们只需要在服务消费者端设置spring.cloud.stream.bindings..group属性即可。
假如两个消费端都是同一个分组那么,当发布者发布一条消息的时候,消费端就会以轮询的方式来接收通知,反正都是不同的分组那就每个订阅者都会接收到通知
代码示例
先看下示例项目的目录结构
发布者8801
消费者8802
消费者8803
具体代码
8801
pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
rabbitmq:
host: localhost
cloud:
stream:
binders: # 在此处配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于binding的整合
type: rabbit # 消息中间件类型
environment: # 设置rabbitMQ的相关环境配置
spring:
rabbitmq:
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
instance-id: send-8801.com # 主机名
prefer-ip-address: true # 显示ip
业务接口
public interface IMessageProvider {
/**
* 消息发送
* @return
*/
String send();
}
发布业务实现
/**
* EnableBinding 定义消息的推送管道
*/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
/**
* 消息发送管道
*/
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());//模拟发送流水号
System.out.println("serial = " + serial);
return serial;
}
}
controller
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
发布者完成
消费者8801
pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: localhost
cloud:
stream:
binders: # 在此处配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于binding的整合
type: rabbit # 消息中间件类型
environment: # 设置rabbitMQ的相关环境配置
spring:
rabbitmq:
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: spectrumrpcA
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
instance-id: receive-8802.com #主机名
prefer-ip-address: true # 显示ip
controller:实时监控,要是订阅的有发布者发布立马接收并打印出来
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
//获取当前服务的端口号
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号,----->接收到的消息:"+ message.getPayload() +"\t port:" + serverPort);
}
}
Q.E.D.
Comments | 0 条评论