spring cloud stream入门应用
概述
详细
spring cloud stream主要包含以下核心概念和内容:
Spring Cloud Stream的应用模型
绑定抽象
持久化发布/订阅支持
消费组
消息分区
可插拔绑定api 应用模型
一、运行效果
二、实现过程
①、在父pom.xml中添加如下主要依赖
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.6.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
②、构建消费者工程实例spring-cloud-stream-receiver
1、在pom.xml中添加如下依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
其等价使用spring-cloud-stream-binder-rabbit依赖。
2、新建一个启动类,作为一个普通的springboot项目:
@SpringBootApplication public class ReceiverApp { public static void main(String[] args) { SpringApplication.run(ReceiverApp.class, args); } }
3、SpringCloudStream已经预定义了Sink、Source、Processor,具体如下
{@link org.springframework.cloud.stream.messaging.Processor},
{@link org.springframework.cloud.stream.messaging.Source},
{@link org.springframework.cloud.stream.messaging.Sink}
采用Sink作为默认的消息订阅通道,定义如下:
@EnableBinding(value = {Sink.class}) public class SinkReceiver { @StreamListener(Sink.INPUT) public void receive(Object payload) { LOGGER.info("Received from default channel : {}", payload.toString()); } }
Note:
将@EnableBinding注解至spring应用的一个配置类中,即可将spring应用变成Spring Cloud Stream应用。@EnableBinding注解本身就包含@Configuration注解,并且会触发Spring Cloud Stream基本配置;
将Sink.class作为@EnableBinding注解的参数,其指定了需要绑定的目标接口;
@StreamListener注解中描述具体监听的通道名称;
4、在application.properties文件中添加如下rabbitmq配置:
#configure rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
5、通过单元测试,实现消息的生产者:
@RunWith(SpringRunner.class) @EnableBinding(value = {ReceiverAppTest.SinkSender.class}) public class ReceiverAppTest { @Autowired private SinkSender sinkSender; @Test public void sinkSenderTester() { sinkSender.output().send(MessageBuilder.withPayload("hello this is my produce a message to " + Sink.INPUT + " channel").build()); } public interface SinkSender { @Output(Sink.INPUT) MessageChannel output(); } }
如上生产者定义主要参考org.springframework.cloud.stream.messaging.Source定义:
public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); }
为了与接收消息的通道一致,故修改@Output注解参数为Sink.INPUT;
6、启动服务,可以看到如下log,声明了一个queue,并添加了订阅者:
7、查看rabbitmq控制台的Queus页签中存在上述队列:
8、并执行单元测试可以看到如下log信息:
2020-03-05 16:27:29.874 INFO 313972 --- [jegRC3FaSe5Ww-1] com.zxh.cloud.stream.sink.SinkReceiver .receive[17] : Received from default channel : hello this is my produce a message to input channel
以上即实现了一个最简单的demo示例。
三、项目结构图
四、补充
应用通过Spring Cloud Stream插入的input和output通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。
绑定抽象
Binder绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑。这一点在实现消息总线时,从RabbitMQ切换到Kafka的过程中,已经能够让我们体验到这一好处。
目前版本的Spring Cloud Stream为主流的消息中间件产品RabbitMQ和Kafka提供了默认的Binder实现。
持久化发布/订阅支持
应用间通信遵照发布-订阅模型,消息通过共享主题进行广播。下图所示,显示了交互的Spring Cloud Stream 应用的典型布局。