spring cloud stream入门应用

发布时间:2020-03-05

概述

Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动的微服务应用。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点,实现了自动化配置的功能帮忙我们可以快速的上手使用

详细

spring cloud stream主要包含以下核心概念和内容:

  • Spring Cloud Stream的应用模型

  • 绑定抽象

  • 持久化发布/订阅支持

  • 消费组

  • 消息分区

  • 可插拔绑定api 应用模型

一、运行效果

image.png


二、实现过程

①、在父pom.xml中添加如下主要依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
</properties>
 
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.6.RELEASE</version>
</parent>
 
<dependencies>
 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
 
</dependencies>
 
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

②、构建消费者工程实例spring-cloud-stream-receiver

1、在pom.xml中添加如下依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

其等价使用spring-cloud-stream-binder-rabbit依赖。

2、新建一个启动类,作为一个普通的springboot项目:

@SpringBootApplication
public class ReceiverApp {
    public static void main(String[] args) {
        SpringApplication.run(ReceiverApp.class, args);
    }
}

3、SpringCloudStream已经预定义了Sink、Source、Processor,具体如下

{@link org.springframework.cloud.stream.messaging.Processor},

{@link org.springframework.cloud.stream.messaging.Source},

{@link org.springframework.cloud.stream.messaging.Sink}

采用Sink作为默认的消息订阅通道,定义如下:

@EnableBinding(value = {Sink.class})
public class SinkReceiver {
    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        LOGGER.info("Received from default channel : {}", payload.toString());
    }
}

Note:

将@EnableBinding注解至spring应用的一个配置类中,即可将spring应用变成Spring Cloud Stream应用。@EnableBinding注解本身就包含@Configuration注解,并且会触发Spring Cloud Stream基本配置;

将Sink.class作为@EnableBinding注解的参数,其指定了需要绑定的目标接口;

@StreamListener注解中描述具体监听的通道名称;

4、在application.properties文件中添加如下rabbitmq配置:

#configure rabbitmq

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest


5、通过单元测试,实现消息的生产者:

@RunWith(SpringRunner.class)
@EnableBinding(value = {ReceiverAppTest.SinkSender.class})
public class ReceiverAppTest {
    @Autowired
    private SinkSender sinkSender;
 
    @Test
    public void sinkSenderTester() {
        sinkSender.output().send(MessageBuilder.withPayload("hello this is my produce a message to " + Sink.INPUT + " channel").build());
    }
 
    public interface SinkSender {
        @Output(Sink.INPUT)
        MessageChannel output();
    }
}


  • 如上生产者定义主要参考org.springframework.cloud.stream.messaging.Source定义:

public interface Source {
   String OUTPUT = "output";
   @Output(Source.OUTPUT)
   MessageChannel output();
}


  • 为了与接收消息的通道一致,故修改@Output注解参数为Sink.INPUT;


6、启动服务,可以看到如下log,声明了一个queue,并添加了订阅者:

image.png

7、查看rabbitmq控制台的Queus页签中存在上述队列:

8、并执行单元测试可以看到如下log信息:

2020-03-05 16:27:29.874  INFO 313972 --- [jegRC3FaSe5Ww-1] com.zxh.cloud.stream.sink.SinkReceiver  .receive[17] : Received from default channel : hello this is my produce a message to input channel


以上即实现了一个最简单的demo示例。



三、项目结构图

image.png

四、补充

应用通过Spring Cloud Stream插入的input和output通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。


绑定抽象

Binder绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑。这一点在实现消息总线时,从RabbitMQ切换到Kafka的过程中,已经能够让我们体验到这一好处。

目前版本的Spring Cloud Stream为主流的消息中间件产品RabbitMQ和Kafka提供了默认的Binder实现。


持久化发布/订阅支持

应用间通信遵照发布-订阅模型,消息通过共享主题进行广播。下图所示,显示了交互的Spring Cloud Stream 应用的典型布局。



本实例支付的费用只是购买源码的费用,如有疑问欢迎在文末留言交流,如需作者在线代码指导、定制等,在作者开启付费服务后,可以点击“购买服务”进行实时联系,请知悉,谢谢
手机上随时阅读、收藏该文章 ?请扫下方二维码