Introduction to Spring Cloud

Spring Cloud? What is that?

Spring Cloud is an extension of Spring Boot provided through various libraries and aimed at addressing different cloud-native patterns. In this case, Spring Cloud Stream aims to simplify the chaining together of services via messaging.

To use any Spring Cloud library, we need to add the following chunk to the bottom of our build.gradle file:

    dependencyManagement { 
      imports { 
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}" 
      } 
    }

This preceding fragment uses Spring's Dependency Management Gradle plugin to pull in the Spring Cloud BOM (Bill of Materials). It references a variable, springCloudVersion, which we need to set.

Spring Cloud has release trains, which means that each library has its own version, but all the versions are coordinated. By selecting one train, we get a fleet of tools to choose from (and we will throughout the rest of this book!).

The Spring Cloud release train tied to Spring Boot 2.0 is Finchley, so let's put that right next to our version of Boot at the top:

    buildscript { 
      ext { 
        springBootVersion = '2.0.0.M5' 
        springCloudVersion = 'Finchley.M3' 
      } 
      ... 
    }
If you're curious about the various release trains of Spring Cloud, check out its project page at http://projects.spring.io/spring-cloud/.
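The BOM only manages versions; we still need a Spring Cloud Stream binder on the classpath. A sketch of the extra dependency, assuming RabbitMQ as the transport (spring-cloud-starter-stream-rabbit is the RabbitMQ binder starter; its version is supplied by the BOM imported earlier):

```groovy
dependencies {
  // No version needed -- the Spring Cloud BOM manages it
  compile('org.springframework.cloud:spring-cloud-starter-stream-rabbit')
}
```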

With Spring Cloud's BOM and Spring Cloud Stream added to our build, let's return to configuring messaging using Spring Cloud Stream's core interfaces, as follows:

    @Controller 
    @EnableBinding(Source.class) 
    public class CommentController { 
 
      private final MeterRegistry meterRegistry; 
      private FluxSink<Message<Comment>> commentSink; 
      private Flux<Message<Comment>> flux; 
 
      public CommentController(MeterRegistry meterRegistry) { 
        this.meterRegistry = meterRegistry; 
        this.flux = Flux.<Message<Comment>>create( 
          emitter -> this.commentSink = emitter, 
          FluxSink.OverflowStrategy.IGNORE) 
          .publish() 
          .autoConnect(); 
      } 
 
      @PostMapping("/comments") 
      public Mono<String> addComment(Mono<Comment> newComment) { 
        if (commentSink != null) { 
          return newComment 
           .map(comment -> commentSink.next(MessageBuilder 
           .withPayload(comment) 
           .build())) 
           .then(Mono.just("redirect:/")); 
        } else { 
            return Mono.just("redirect:/"); 
        } 
      } 
 
      @StreamEmitter 
      public void emit(@Output(Source.OUTPUT) FluxSender output) { 
        output.send(this.flux); 
      } 
 
    } 

This last code is very similar to the CommentController that we created earlier in this chapter, but with the following differences:

  • @EnableBinding(Source.class) flags this app as a source for new events. Spring Cloud Stream uses this annotation to signal the creation of channels, which, in RabbitMQ, translate to exchanges and queues.
  • The constructor proceeds to set up a FluxSink, the mechanism to emit new messages into a downstream Flux. This sink is configured to ignore downstream backpressure events. The Flux is turned into a hot stream via publish(), and autoConnect() makes it start emitting as soon as the first subscriber attaches.
  • The objects being emitted are Message<Comment>, which is Spring's abstraction for a POJO wrapped as a transportable message. This includes the ability to add headers and other information.
  • Inside addComment, if the sink has been established, it maps newComment into a Message<Comment> using Spring Messaging APIs. Finally, it transmits the message into the sink.
  • When the message is successfully emitted to the Flux, a redirect is issued.
  • To transmit the Flux of Message<Comment> objects, a separate method, emit, is wired up with the @StreamEmitter annotation. This method is fed a FluxSender, which provides a Reactor-friendly means to transmit messages into a channel. It lets us hook up the Flux tied to our FluxSink.
  • The @Output(Source.OUTPUT) annotation marks which channel it gets piped to (visiting Source.OUTPUT reveals the channel name as output).

That's a lot of stuff packed into this controller. To better understand it, there are some fundamental concepts to realize.

First of all, it's not common practice to create a Flux and then add to it. The paradigm is to wrap it around something else. To drive this point home, Flux itself is an abstract class. You can't instantiate it. Instead, you must use its various static helper methods to craft one. So, when we want to take a behavior that is tied to users clicking on a site and link it to a Flux that was created when the application started, we need something like FluxSink to bridge these two things together.

Spring Cloud Stream focuses on chaining together streams of messages with source/sink semantics. When it comes to Reactor, this means adapting a Flux of messages onto a channel, a concept curated for several years by Spring Integration. Given that the concrete nature of the channel is abstracted away, it doesn't matter what transport technology we use. Thanks to the power of Spring Boot, this is defined by dependencies on the classpath. Nevertheless, we'll continue using RabbitMQ because it's darn simple and powerful at the same time.

By the way, we'll see this concept of connecting a sink to a Flux again when we visit Chapter 8, WebSockets with Spring Boot. It's a common Reactor pattern for connecting one-off objects to established flows.
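To make the bridging idea concrete, here is a stripped-down, plain-Java analogy (no Reactor involved; Sink and SinkBridge are hypothetical names invented purely for illustration): the pipeline is built once at startup, a sink is captured during construction, and later one-off events, such as HTTP POSTs, flow through that sink into the pre-built pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SinkBridge {

    // Stand-in for FluxSink<T>: something we can push events into later.
    interface Sink<T> { void next(T item); }

    private Sink<String> sink;                      // captured at construction time
    private final List<String> pipeline = new ArrayList<>();

    SinkBridge() {
        // Analogous to Flux.create(emitter -> this.commentSink = emitter, ...):
        // the factory hands us an emitter, and we stash it for later use.
        create(emitter -> this.sink = emitter);
    }

    private void create(Consumer<Sink<String>> onSubscribe) {
        onSubscribe.accept(pipeline::add);          // the sink feeds the pipeline
    }

    // Analogous to addComment(): a request handler pushes into the captured sink.
    void handleRequest(String comment) {
        if (sink != null) {
            sink.next(comment);
        }
    }

    List<String> drained() {
        return pipeline;
    }

    public static void main(String[] args) {
        SinkBridge bridge = new SinkBridge();
        bridge.handleRequest("first!");
        bridge.handleRequest("second!");
        System.out.println(bridge.drained());       // prints [first!, second!]
    }
}
```

The key move, in both the analogy and the real controller, is capturing the emitter handed to us during construction so that code running much later can feed the same long-lived stream.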

To declare a Spring Cloud Stream consumer, we merely need to update our CommentService as follows:

    @Service 
    @EnableBinding(CustomProcessor.class) 
    public class CommentService { 

At the top of CommentService, we need to add @EnableBinding(CustomProcessor.class). If this were the only Spring Cloud Stream component, we could have used @EnableBinding(Processor.class); however, we can't share the same channel, output, with the CommentController. So we need to code a custom set of channels, CustomProcessor, as shown below:

    public interface CustomProcessor { 
 
      String INPUT = "input"; 
      String OUTPUT = "emptyOutput"; 
 
      @Input(CustomProcessor.INPUT) 
      SubscribableChannel input(); 
 
      @Output(CustomProcessor.OUTPUT) 
      MessageChannel output(); 
    }

This custom processor is quite similar to Spring Cloud Stream's Processor:

  • It's a declarative interface.
  • It has two channel names, INPUT and OUTPUT. The INPUT channel uses the same name, input, as Processor. To avoid colliding with the OUTPUT channel of Source, we create a different channel name, emptyOutput. (Why call it emptyOutput? We'll see in a moment!)
  • There is a SubscribableChannel for inputs and a MessageChannel for outputs.

This flags our application as both a Sink and a Source for events. Remember how we had to subscribe earlier when consuming with RabbitTemplate?

Thankfully, Spring Cloud Stream is Reactor-friendly. When dealing with Reactive Streams, our code shouldn't be the termination point for processing. So, receiving an incoming Flux of Comment objects must result in an outgoing Flux that the framework can invoke as we'll soon see.

Further down in CommentService, we need to update our save method as follows:

    @StreamListener 
    @Output(CustomProcessor.OUTPUT) 
    public Flux<Void> save(@Input(CustomProcessor.INPUT) 
     Flux<Comment> newComments) { 
       return repository 
        .saveAll(newComments) 
        .flatMap(comment -> { 
          meterRegistry 
            .counter("comments.consumed", "imageId", comment.getImageId()) 
            .increment(); 
          return Mono.empty(); 
        }); 
    }

Let's tear apart this preceding updated version of save:

  • The @RabbitListener annotation has been replaced with @StreamListener, indicating that it's transport-agnostic.
  • The argument newComments is tied to the input channel via the @Input() annotation.
  • Since we've marked it as a Flux, we can immediately consume it with our MongoDB repository.
  • Since we have to hand a stream back to the framework, we have marked up the whole method with @Output.
  • From there, we can flatMap it to generate metrics, returning Mono.empty() for each comment. The resulting Flux<Void> ensures that no more processing is done by the framework.

This method follows the same concept as all Spring @*Listener annotations: invoke the method with optional domain objects. But this time, it receives them from whatever underlying transport we have configured Spring Cloud Stream to use. The benefit is that this code is slim, easy to manage, and no longer bound directly to RabbitMQ.

That being said, we need to express to Spring Cloud Stream that our source and sink need to communicate through the same RabbitMQ exchange. To do so, we need to provide settings in application.yml:

    spring: 
      cloud: 
        stream: 
          bindings: 
            input: 
              destination: learning-spring-boot-comments 
              group: learning-spring-boot 
            output: 
              destination: learning-spring-boot-comments 
              group: learning-spring-boot 

This last application configuration contains the following details:

  • spring.cloud.stream.bindings is configured so that both the input and the output channels' destination is learning-spring-boot-comments. When using RabbitMQ bindings, this is the name of the exchange, and Spring Cloud Stream uses topic exchanges by default.
  • We take advantage of Spring Cloud Stream's support for consumer groups by also setting the group property. This ensures that even if there are multiple stream listeners to a given channel, only one listener will consume any one message. This type of guarantee is required in cloud-native environments, where we can expect to run multiple instances.
As stated earlier in this book, you can use either application.properties or application.yml. If you find yourself configuring many settings with the same prefix, use YAML to make it easier to read and to avoid repetition.
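To illustrate, the same bindings written as application.properties entries would repeat the spring.cloud.stream.bindings prefix on every line:

```properties
spring.cloud.stream.bindings.input.destination=learning-spring-boot-comments
spring.cloud.stream.bindings.input.group=learning-spring-boot
spring.cloud.stream.bindings.output.destination=learning-spring-boot-comments
spring.cloud.stream.bindings.output.group=learning-spring-boot
```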

By the way, remember having to define a Jackson2JsonMessageConverter bean earlier in this chapter to handle serialization? It's no longer needed. Spring Cloud Stream uses Esoteric Software's Kryo library for serialization/deserialization (https://github.com/EsotericSoftware/kryo), which means we can chuck that bean definition. Talk about thinning out the code!

If we run the simulator again (spring.profiles.active=simulator) and check http://localhost:8080/application/metrics, we can see our custom metrics tabulating everything.

With this, we have managed to change the comments solution and yet retain the same set of metrics.

However, by switching to Spring Cloud Stream, we have gathered a whole new fleet of metrics, as seen in this screenshot:

This is a subset (too many to fill a book) covering the input and output channels.

Remember how we wrote a custom health check in the last chapter? It would be handy to have one for RabbitMQ and its bindings. Guess what? It's already done. Check it out:

In this last screenshot, we can see the following:

  • The RabbitMQ broker is up and operational
  • Our RabbitMQ binders are operational as well

With this in place, we have a nicely working comment system.
