This codelab is a follow-on to the Spring Cloud Stream Basics one. If you aren't yet familiar with the Spring Cloud Stream framework, go ahead and jump over there to get a quick introduction. This codelab goes into more detail (Beyond the Basics) on developing your Cloud Stream microservice with imperative functions. We will be using the Solace binder and Event Broker throughout. The majority of the features we will learn today apply at the framework level and do not depend on the underlying broker/binder of choice; however, a few are Solace specific, and I will try to point that out when necessary.
Also note that while Spring Cloud Stream supports both imperative and reactive functions this codelab will be focused on the use of imperative ones.
You'll Learn:
Valuable Resources:
This page covers the setup needed to perform this codelab.
Spring Cloud Stream just requires Java and Maven to use.
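Check your Java version using java -version and your Maven version using mvn -version. If you use Homebrew, you can install Maven with brew install maven.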
(1) Sign up for a free Solace Cloud account
Navigate to this link and fill out the required information. No credit card required! If you already have an account, skip right to #2.
(2) Create a messaging service
After you create your Solace Cloud account and sign in to the Solace Cloud Console, you'll be routed to the Solace Cloud Landing page.
Click on "Cluster Manager" and all the messaging services associated with your account will show up if you have any already created. To create a new service, click either button as depicted in the image below:
Fill out all the details for your messaging service, and then click "Create" at the bottom of the page. Note: make sure you choose the "Developer" option for the Service Type.
Your service should be ready to use in a few minutes.
(3) Take note of the connection details
If your messaging service was created successfully, you'll be routed to the summary page of your new messaging service. From the service summary page, click on the "Connect" tab so we can take note of the connection details we'll need later.
After you click the "Connect" tab, sort the supported client library menu by Language and click on the "Connect with Spring" box to expand it.
Click on the Get Started button next to the Spring Cloud Stream option.
Take note of the "Connect to Service" section and you'll see that the connection details are already configured in the spring.cloud.stream.binders part of the config to connect a Spring Cloud Stream microservice to your PubSub+ Messaging Service. We'll be using this soon.
(1) Create an Empty Spring Cloud Stream Microservice to use throughout this codelab
Navigate to start.spring.io and choose the "Solace PubSub+" and the "Cloud Stream" dependencies. If you used that link we already added them for you.
You can leave everything else as the default or modify the Java version, package info, etc. as desired.
Click the "Generate" button which will download the project. You can then unzip it and import the project into your preferred Java IDE as a Maven project. I'll be using Spring Tool Suite 4 for Eclipse in the screenshots. Your imported project should look like this:
(2) Add your Connection Details from the Messaging Service
Rename the application.properties file to application.yml and copy and paste the spring.cloud.stream.binders part of the Spring Cloud Stream configuration previously found in the "Connect to Service" widget in your Messaging Service. It should look something like this, but with the values that were provided by your Messaging Service.
spring:
  cloud:
    stream:
      binders:
        solace-cloud:
          type: solace
          environment:
            solace:
              java:
                clientUsername: default
                clientPassword: default
                host: 'tcp://localhost:55555'
                msgVpn: default
Setup complete! Let's get going!
As discussed in the Spring Cloud Stream Basics codelab, the Cloud Stream framework supports multiple communication models. Since the Solace Binder supports both Publish-Subscribe and Consumer Groups we will focus on those. Note that this decision is made on an input binding, or where you configure consumption of events/messages.
Before we jump into choosing our consumption model, which is done via configuration, let's create a quick Consumer app that we can use. Open the DemoApplication.java file in your microservice and add a Consumer function that receives a String:
@Bean
public Consumer<String> myConsumer(){
    return v -> {
        System.out.println("Received: " + v);
    };
}
Now that we have a function we'll look at the different options to configure the binding which tells the binder how to connect your function to the underlying message broker.
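When using Spring Cloud Stream with the Solace Binder you can decide between 3 options, each covered in turn below: non-durable Publish-Subscribe, durable Publish-Subscribe, and Consumer Groups.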
There are many use cases where you need to process events in order as they are published to a defined topic. In order to do this with Spring Cloud Stream you would create your Function or Consumer function and configure the input binding for it to receive messages from a specific destination. There are two ways to do this with the Solace binder. The standard way, with no Solace specific configurations, is non-durable and we'll start there.
Under the covers this option will use a Non-Durable Anonymous Queue on the broker to hold the messages for the consuming microservice. Let's check it out! This option will deliver events in order to your microservice while it remains up and running. To do this we just need to specify a destination, but NO group, on your input binding.
So open your application.yml file and add the following config to what already exists. This configuration is telling Spring Cloud Stream that you want your myConsumer function to receive events that are published to the spring/cloud/stream topic.
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: spring/cloud/stream
Go ahead and run your app from your IDE or use mvn clean spring-boot:run. You should see the app start up and connect to the event broker. Note that under the covers the Solace Binder will bind your function to a Non-Durable Anonymous (or Temporary) queue on the broker.
This endpoint type in Solace is a temporary queue that will deliver messages to your app in order while your app remains online. However, it is NOT a durable endpoint and will be removed after your application goes offline for more than 30 seconds.
You'll see the name of your Anonymous Queue shown in a log entry on the console from the SolaceQueueProvisioner class:
SolaceQueueProvisioner : Subscribing queue #P2P/QTMP/v:b0e95afab69a/scst/an/59e7cbe3-ec38-4da8-a8fa-d1e428f6eb56/plain/spring/cloud/stream to topic spring/cloud/stream
Let's test this out using the "Try-Me" tab in the PubSub+ Manager.
Open the PubSub+ Manager (localhost:8080 by default when using Software, but otherwise get the address from your Admin).
Log in (admin and admin by default).
Select the default Message VPN.
Open the "Try-Me" tab and publish a message to the spring/cloud/stream topic.
This option is great when we don't need to store messages/events when the app is offline, but what if we do!?
Sometimes it isn't enough to be able to deliver messages/events only while the app remains connected to the broker; there are many scenarios where you want messages stored while the app is offline while still maintaining order. In this case the Solace Binder allows you to follow the Publish-Subscribe pattern and consume from an Exclusive Durable Queue that will do just that.
In order to modify the input binding to create an Exclusive Durable Queue we need to add a group to our binding and also set the Solace specific property queueAccessType to use an exclusive queue. We do that with the configuration below.
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: spring/cloud/stream
          group: exclusive
      solace:
        bindings:
          myConsumer-in-0:
            consumer:
              queue-access-type: 1 #1 is Exclusive; 0 is Non-Exclusive (and default)
Now if you restart your app in your IDE or use ctrl-c followed by mvn clean spring-boot:run on the cli you'll see that a durable queue was created.
SolaceQueueProvisioner : Subscribing queue scst/wk/exclusive/plain/spring/cloud/stream to topic spring/cloud/stream
And if you look in PubSub+ Manager you'll find that the queue is "Exclusive" which means that the first microservice which connects to it will receive all the messages in the order they are received by the broker.
Go ahead and test it out by sending more messages with the "Try-Me" tab. Now that the queue is durable you can even stop your app, send a bunch of messages and then start it back up to see that they are delivered in order.
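To make the difference between the two queue access types more concrete, here is a minimal plain-Java sketch that does not use any Solace or Spring classes. The class QueueDispatchSketch, the AccessType enum and the dispatch method are hypothetical names used only for illustration, and the round-robin distribution in the non-exclusive case is a simplifying assumption, not a description of the broker's actual algorithm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: models which bound consumer receives each message.
class QueueDispatchSketch {

    enum AccessType { EXCLUSIVE, NON_EXCLUSIVE }

    // Returns consumer name -> messages delivered to it, in delivery order.
    static Map<String, List<String>> dispatch(AccessType type, List<String> consumers, List<String> messages) {
        Map<String, List<String>> delivered = new LinkedHashMap<>();
        for (String consumer : consumers) {
            delivered.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return delivered; // nobody is bound, so nothing is delivered in this model
        }
        for (int i = 0; i < messages.size(); i++) {
            // EXCLUSIVE: the first consumer that bound receives everything, in order.
            // NON_EXCLUSIVE: assumed round-robin across all bound consumers.
            int target = (type == AccessType.EXCLUSIVE) ? 0 : i % consumers.size();
            delivered.get(consumers.get(target)).add(messages.get(i));
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("app-1", "app-2");
        List<String> messages = List.of("m1", "m2", "m3", "m4");
        System.out.println("exclusive:     " + dispatch(AccessType.EXCLUSIVE, consumers, messages));
        System.out.println("non-exclusive: " + dispatch(AccessType.NON_EXCLUSIVE, consumers, messages));
    }
}
```

In the exclusive case the second consumer stays idle, which is what preserves ordering; in the non-exclusive case the work is shared, at the cost of a single global order.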
Another popular consuming pattern is using what are called Consumer Groups. Consumer groups in Spring Cloud Stream allow you to have multiple consumers sharing the processing of events. To do this you just need to specify a group on your input binding.
To configure this on our app that has the myConsumer Consumer function your config would look like this.
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: spring/cloud/stream
          group: nonexclusive
When using the Solace Binder and specifying a group the binder will actually create a Durable Non-Exclusive Queue Endpoint by default. This durable queue will hold messages for your microservices if they get disconnected. Go ahead and run the app via your IDE or using mvn clean spring-boot:run, and you'll see the following log message that specifies the queue.
SolaceQueueProvisioner : Subscribing queue scst/wk/nonexclusive/plain/spring/cloud/stream to topic spring/cloud/stream
And in the PubSub+ Manager you'll see that it created a non-exclusive queue:
So we're using the Consumer Group pattern (which uses a Non-Exclusive Queue!) so we can have multiple consumers share the processing of events. What are the choices for scaling!?
Option 1 - Multiple Instances of your Microservice
The first option is just to start up more instances of your microservice. This is common in a Kubernetes environment where your microservice runs in its own container. Each instance of the microservice will connect to the same non-exclusive queue on the Solace broker and the broker will spread the messages across all consumers.
Option 2 - Concurrent Message Consumption in a Single Microservice
The second option is to use the concurrency consumer property to enable concurrent message consumption for a particular consumer binding. Under the covers the Solace binder will create a separate flow for each concurrent consumer that will handle events in its own thread. Note that this approach allows the concurrent consumers to share the same Solace Session.
Modify your microservice's configuration to set a concurrency of 5 as shown below:
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: spring/cloud/stream
          group: nonexclusive
          consumer:
            concurrency: 5
When you restart your app in your IDE or use ctrl-c followed by mvn clean spring-boot:run on the cli you'll see that the Solace binder creates 5 separate "flow receivers". Note that they all share the same Solace session and will process events on separate threads.
JCSMPInboundChannelAdapter : Creating consumer 1 of 5 for inbound adapter 5722ebd9-7f2a-40ba-b635-235a86938638
FlowReceiverContainer : Binding flow receiver container 76e517b1-d1a8-4ab2-975f-e4eb0b12535a
JCSMPInboundChannelAdapter : Creating consumer 2 of 5 for inbound adapter 5722ebd9-7f2a-40ba-b635-235a86938638
FlowReceiverContainer : Binding flow receiver container 0aba1d4b-fe21-42ed-ae3b-db6fc5a62d61
JCSMPInboundChannelAdapter : Creating consumer 3 of 5 for inbound adapter 5722ebd9-7f2a-40ba-b635-235a86938638
FlowReceiverContainer : Binding flow receiver container 67fa395b-6f50-40b5-ab34-b7fff446c50e
JCSMPInboundChannelAdapter : Creating consumer 4 of 5 for inbound adapter 5722ebd9-7f2a-40ba-b635-235a86938638
FlowReceiverContainer : Binding flow receiver container 798380cc-6140-4194-9f14-4bbfe7821485
JCSMPInboundChannelAdapter : Creating consumer 5 of 5 for inbound adapter 5722ebd9-7f2a-40ba-b635-235a86938638
If you modify your code to add a logger and change the print line to a log entry, as seen below:
package com.example.demo;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DemoApplication {

    private static final Logger logger = LoggerFactory.getLogger(DemoApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Consumer<String> myConsumer(){
        return v -> {
            logger.info("Received: " + v);
        };
    }
}
and send a bunch of messages from the "Try-Me" tab, you'll see that each message is handled in its own thread, allowing one microservice to process multiple messages at once!
2021-05-13 16:37:34.137 INFO 85614 --- [pool-4-thread-1] com.example.demo.DemoApplication : Received: Hello World
2021-05-13 16:37:34.237 INFO 85614 --- [pool-4-thread-2] com.example.demo.DemoApplication : Received: Hello World
2021-05-13 16:37:34.314 INFO 85614 --- [pool-4-thread-3] com.example.demo.DemoApplication : Received: Hello World
2021-05-13 16:37:34.333 INFO 85614 --- [pool-4-thread-4] com.example.demo.DemoApplication : Received: Hello World
2021-05-13 16:37:34.377 INFO 85614 --- [pool-4-thread-5] com.example.demo.DemoApplication : Received: Hello World
In most cases you'll want to do some performance testing to see what mix of scaling works best for you! Maybe you have 2 instances of your microservice each with a concurrency of 5. Unfortunately there is no magic answer that works across the board.
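As a rough mental model of what a concurrency of 5 means, the plain-Java sketch below hands each message to a fixed pool of worker threads. It is not how the Solace binder is implemented (the binder creates flow receivers, as the log output above shows); the class ConcurrencySketch and the consume method are hypothetical names chosen for this illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustration only: each message is handled by one of N worker threads.
class ConcurrencySketch {

    // Processes all messages with `concurrency` workers and returns the names of the threads that did work.
    static Set<String> consume(List<String> messages, int concurrency, Consumer<String> handler) {
        Set<String> threadsUsed = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (String message : messages) {
            pool.execute(() -> {
                threadsUsed.add(Thread.currentThread().getName());
                handler.accept(message);
            });
        }
        pool.shutdown();
        try {
            // Wait for in-flight work to finish before returning.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return threadsUsed;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("a", "b", "c", "d", "e", "f", "g", "h");
        Set<String> threads = consume(messages, 5,
                m -> System.out.println(Thread.currentThread().getName() + " received: " + m));
        System.out.println("Distinct worker threads used: " + threads.size());
    }
}
```

Keep in mind that once several workers run in parallel, the order in which messages finish processing is no longer guaranteed, which is one of the things worth measuring in your performance tests.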
By default when coding your Spring Cloud Stream microservice you are writing Spring Cloud Function beans that can be re-used for multiple purposes and can leverage the framework's Content Type Negotiation to pass your POJOs directly into the function while decoupling your business logic from the specific runtime target and triggering mechanism (web endpoint, stream processor, task). This is convenient, but sometimes when creating a function for a stream processor our business logic requires the use of metadata in the message headers that we need access to on the Consuming side or need to set on the Publishing side.
We'll start with the consuming side. In order to get access to the headers you'll need to set the input argument to a Message<?> type. Once you have the Spring Message object you can retrieve a map of the headers using the getHeaders() method. Note that because the input argument is now a Message<?> you would now use the getPayload() method to get the actual payload itself.
For example, if we modify the Consumer from the previous section to take in a Message<String> we can now access the headers as seen below.
@Bean
public Consumer<Message<String>> myConsumer(){
    return v -> {
        logger.info("Received: " + v.getPayload());
        logger.info("All Headers: " + v.getHeaders());
    };
}
And if we want to access individual headers we can then read them from that map. Note that a list of common Solace Headers is found in the Solace Binder docs.
@Bean
public Consumer<Message<String>> myConsumer(){
    return v -> {
        logger.info("Received: " + v.getPayload());
        logger.info("Destination: " + v.getHeaders().get("solace_destination"));
        logger.info("TTL: " + v.getHeaders().get("solace_timeToLive"));
    };
}
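Because the header map hands values back as Object, it can help to read them through a small typed helper so that a missing or unexpectedly typed header does not cause a NullPointerException or ClassCastException. The sketch below works on a plain Map<String, Object> (Spring's MessageHeaders is such a map); the class HeaderLookupSketch and the header method are hypothetical names, and the sample header values are made up for the demo.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustration only: null-safe, type-safe header access on a plain map.
class HeaderLookupSketch {

    // Returns the header value if present and of the expected type, otherwise Optional.empty().
    static <T> Optional<T> header(Map<String, Object> headers, String name, Class<T> type) {
        Object value = headers.get(name);
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Sample values only; a real message carries whatever the binder populated.
        Map<String, Object> headers = new HashMap<>();
        headers.put("solace_destination", "spring/cloud/stream");
        headers.put("solace_timeToLive", 0L);

        String destination = header(headers, "solace_destination", String.class).orElse("unknown");
        long ttl = header(headers, "solace_timeToLive", Long.class).orElse(-1L);
        String custom = header(headers, "Key", String.class).orElse("<not set>");

        System.out.println("Destination: " + destination);
        System.out.println("TTL: " + ttl);
        System.out.println("My Custom Header: " + custom);
    }
}
```

Inside a consumer you could pass v.getHeaders() as the first argument.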
On the source/publishing side of things we sometimes also need to set headers that downstream listeners may need access to. In order to do this we will need the output argument of our Function to also be a Message<?> object. Note that if you don't return a Message<?> object the framework will re-use the headers on the inbound message on the outbound one, minus the headers defined or filtered by SpringIntegrationProperties.messageHandlerNotPropagatedHeaders or the Solace Binder headerExclusions producer property.
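The idea of "inbound headers minus an exclusion list" can be pictured with the small plain-Java sketch below. It only illustrates the concept and is not the framework's or the binder's code; the class HeaderExclusionSketch, the method withoutExcluded and the sample header names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustration only: copy inbound headers to the outbound side, skipping excluded names.
class HeaderExclusionSketch {

    static Map<String, Object> withoutExcluded(Map<String, Object> inbound, Set<String> exclusions) {
        Map<String, Object> outbound = new LinkedHashMap<>(inbound); // copy, so the inbound map is untouched
        outbound.keySet().removeAll(exclusions);
        return outbound;
    }

    public static void main(String[] args) {
        Map<String, Object> inbound = new LinkedHashMap<>();
        inbound.put("Key", "Value");
        inbound.put("internal-trace", "abc123"); // hypothetical header we do not want to publish

        System.out.println(withoutExcluded(inbound, Set.of("internal-trace")));
    }
}
```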
For example, the code below sets a header named "Key" to the value "Value" on an outbound message.
@Bean
Supplier<Message<String>> mySupplier(){
    return () -> {
        return MessageBuilder.withPayload("Hello Headers").setHeader("Key", "Value").build();
    };
}
In order to run it go ahead and modify your app config to look like the below:
spring:
  cloud:
    function:
      definition: myConsumer;mySupplier
    stream:
      poller:
        fixed-delay: 10000
      bindings:
        myConsumer-in-0:
          destination: spring/cloud/stream
          group: nonexclusive
          consumer:
            concurrency: 5
        mySupplier-out-0:
          destination: spring/cloud/stream
And modify your Consumer function to print out the "Key" header:
@Bean
public Consumer<Message<String>> myConsumer() {
    return v -> {
        logger.info("Received myConsumer: " + v.getPayload());
        logger.info("Destination: " + v.getHeaders().get("solace_destination"));
        logger.info("TTL: " + v.getHeaders().get("solace_timeToLive"));
        logger.info("My Custom Header: " + v.getHeaders().get("Key"));
    };
}
Note the Solace Binder offers two producer properties that may come in handy for publishing apps that want to set headers:
The spring.cloud.stream.solace.bindings.BINDING_NAME.producer.headerExclusions property allows you to exclude headers from the published message.
The spring.cloud.stream.solace.bindings.BINDING_NAME.producer.nonserializableHeaderConvertToString property allows you to include the toString version of a non-serializable header. Note that if this is not set to true and a non-serializable header is set, an exception is thrown.
Since we're using the Solace binder we really want to be able to make topic subscriptions with wildcards. The good news is that we're in luck! There are 2 different options for configuring your topic subscriptions on consuming functions. Both options are configured in the Spring application properties.
The first way to do it is in the destination property itself. The Solace binder uses the destination property both to name the queue that the app will bind to and as a topic subscription on the queue.
To test this functionality out go ahead and change the configuration of myConsumer-in-0 to have a destination using wildcards and restart your app in your IDE or using ctrl-c followed by mvn clean spring-boot:run on the cli.
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: 'spring/*/stream/>'
          group: nonexclusive
          consumer:
            concurrency: 5
Now go ahead and use the "Try-Me" tab to send a few test messages that match the pattern. Maybe publish to spring/cloud/stream/5 and spring/boot/stream/anything. You should see that the application receives the messages!
If you were to navigate to the queue in the PubSub+ Manager you'll see that the created queue substituted the wildcards with underscores in the queue name as they are invalid characters in a queue name, but applied the proper topic subscription to the queue.
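If you want to reason about which topics a wildcard subscription will attract, the plain-Java sketch below models the two Solace wildcards in a simplified way: a * at the end of a level matches the rest of that single level, and a > as the last level matches one or more remaining levels. The matching itself happens on the broker; the class TopicMatchSketch and the matches method are hypothetical names, and edge cases are deliberately ignored.

```java
// Illustration only: a simplified model of Solace topic wildcard matching.
class TopicMatchSketch {

    static boolean matches(String subscription, String topic) {
        String[] sub = subscription.split("/", -1);
        String[] top = topic.split("/", -1);
        for (int i = 0; i < sub.length; i++) {
            boolean lastSubLevel = (i == sub.length - 1);
            if (lastSubLevel && sub[i].equals(">")) {
                // '>' must stand for at least one remaining topic level
                return top.length > i;
            }
            if (i >= top.length) {
                return false; // topic has fewer levels than the subscription
            }
            if (sub[i].endsWith("*")) {
                // '*' matches the remainder of this one level (prefix match)
                String prefix = sub[i].substring(0, sub[i].length() - 1);
                if (!top[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!sub[i].equals(top[i])) {
                return false;
            }
        }
        return sub.length == top.length;
    }

    public static void main(String[] args) {
        String subscription = "spring/*/stream/>";
        for (String topic : new String[] {
                "spring/cloud/stream/5", "spring/boot/stream/anything", "spring/cloud/stream" }) {
            System.out.println(topic + " -> " + matches(subscription, topic));
        }
    }
}
```

Note that spring/cloud/stream itself does not match spring/*/stream/> in this model, because > needs at least one more level after stream.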
The second place you can add topic subscriptions and also use wildcards when using the Solace binder is the queueAdditionalSubscriptions consumer property. This property is available under spring.cloud.stream.solace.bindings.BINDING_NAME.consumer.queueAdditionalSubscriptions and allows for 1 to many extra topic subscriptions to be added.
For example, if we wanted to add pub/*/plus and a/b/> subscriptions to our app we could add those subscriptions to the queue by doing the following:
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: 'spring/*/stream/>'
          group: nonexclusive
          consumer:
            concurrency: 5
      solace:
        bindings:
          myConsumer-in-0:
            consumer:
              queueAdditionalSubscriptions:
                - 'a/b/>'
                - 'pub/*/plus'
After restarting our app we can see that the subscriptions on our queue have been updated to include our two additions:
We can now use Solace wildcards to filter for the exact events that we're interested in!
Now that we know how to subscribe using wildcards we're halfway to taking advantage of Solace's dynamic topics. The second part of that is of course to be able to publish to whatever topic we need to! We want to publish to a unique topic for each event. No worries, I've got you covered!
The Spring Cloud Stream framework allows for 2 different ways to publish to dynamic topics using the imperative style:
Using StreamBridge
Using the BinderHeaders.TARGET_DESTINATION header
The first option for dynamic publishing is using StreamBridge. Note that this option is processed at the framework level and will work with any Cloud Stream binder that you choose to use. StreamBridge will cache a channel within Spring for each destination that you publish to. This option is solid if you're going to be publishing to a small number of destinations, as the channel will remain in cache and you can look up the channel for monitoring/metrics if you desire. Note that the number of channels cached is configurable via spring.cloud.stream.dynamic-destination-cache-size.
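To picture why a bounded cache suits a small number of destinations, here is a plain-Java sketch of a size-limited cache keyed by destination. It is not StreamBridge's implementation: the class BoundedChannelCacheSketch and its methods are hypothetical, and evicting the oldest entry once the limit is exceeded is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustration only: a size-limited cache of "channels" keyed by destination. Not thread-safe.
class BoundedChannelCacheSketch<V> {

    private final Map<String, V> cache;

    BoundedChannelCacheSketch(int maxSize) {
        // Insertion-ordered map that drops its eldest entry once maxSize is exceeded (assumed policy).
        this.cache = new LinkedHashMap<String, V>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Returns the cached value for the destination, creating and caching it on first use.
    V resolve(String destination, Function<String, V> factory) {
        V existing = cache.get(destination);
        if (existing != null) {
            return existing;
        }
        V created = factory.apply(destination);
        cache.put(destination, created);
        return created;
    }

    boolean isCached(String destination) {
        return cache.containsKey(destination);
    }

    int cachedCount() {
        return cache.size();
    }

    public static void main(String[] args) {
        BoundedChannelCacheSketch<String> channels = new BoundedChannelCacheSketch<>(2);
        for (String topic : new String[] { "solace/cid/1", "solace/cid/2", "solace/cid/3" }) {
            channels.resolve(topic, t -> "channel-for-" + t);
        }
        System.out.println("cid/1 still cached? " + channels.isCached("solace/cid/1"));
        System.out.println("entries cached: " + channels.cachedCount());
    }
}
```

With many distinct destinations, a cache like this keeps creating and discarding entries, which is the situation where the header-based option discussed later may be a better fit.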
Let's update our myConsumer function to publish to dynamic topics.
@Bean
public Consumer<Message<String>> myConsumer(StreamBridge sb) {
    return v -> {
        logger.info("Received myConsumer: " + v.getPayload());
        logger.info("CorrelationID: " + v.getHeaders().get("solace_correlationId"));

        // Use whatever business logic you'd like to figure out the topic!
        String cid = (String) v.getHeaders().get("solace_correlationId");
        if (cid == null) {
            cid = Integer.toString(1);
        }
        String myTopic = "solace/cid/".concat(cid);
        logger.info("Publishing to: " + myTopic);
        sb.send(myTopic, v.getPayload());
    };
}
To test this out go ahead and open up the "Try-Me" tab. On the Subscriber side subscribe to "solace/cid/>". On the Publisher side, click "Show Advanced", set a "Correlation ID" of 1 and send a message to the a/b/c topic. Change the "Correlation ID" value to 2 and send again. You should now see that the app is publishing to dynamic topics and your Subscriber is consuming them.
You should see something like the image below:
Now you know how to use StreamBridge to publish to dynamic topics!
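One design option worth considering is to pull the topic-selection logic out of the consumer into a small pure function, so it can be unit tested without a broker and reused wherever a dynamic topic is needed. The sketch below does that with a plain Map standing in for the message headers; the class DynamicTopicSketch and the method topicFor are hypothetical names, and the fallback of 1 simply mirrors the consumer code above.

```java
import java.util.Map;

// Illustration only: derive the publish topic from the correlation ID header.
class DynamicTopicSketch {

    static final String CORRELATION_ID_HEADER = "solace_correlationId";

    static String topicFor(Map<String, Object> headers) {
        Object cid = headers.get(CORRELATION_ID_HEADER);
        // Fall back to "1" when no correlation ID is present; toString() avoids a hard cast to String.
        String suffix = (cid == null) ? "1" : cid.toString();
        return "solace/cid/" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(topicFor(Map.of(CORRELATION_ID_HEADER, "2")));
        System.out.println(topicFor(Map.of()));
    }
}
```

Inside the consumer you could then call sb.send(DynamicTopicSketch.topicFor(v.getHeaders()), v.getPayload()).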
The second way to publish to a dynamic destination is to use the BinderHeaders.TARGET_DESTINATION header. Note that this option will only work with binders that explicitly support the feature, including Solace. When setting this header the framework is actually delegating the dynamic publishing to the Binder itself, and it may therefore result in better performance than the StreamBridge option depending on the binder's implementation. If using the Solace binder, this dynamic publishing option results in lower latencies as the Binder doesn't create/cache Spring Integration channels.
Unlike with StreamBridge, when using the BinderHeaders.TARGET_DESTINATION option you would actually use a Function or Supplier and return a Message<?> with the header set to the destination you'd like the message to be published to. If the BinderHeaders.TARGET_DESTINATION header is set it will override the default destination that is configured on the output binding. This allows you to configure a default destination that is used a majority of the time and only override it when necessary.
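The override rule described above boils down to "use the header if it is set, otherwise use the binding's default". A minimal plain-Java sketch of that rule follows. The class DestinationResolutionSketch and the resolve method are hypothetical, and the header key string is an assumption meant to mirror BinderHeaders.TARGET_DESTINATION; check the constant in your framework version rather than relying on the literal.

```java
import java.util.Map;

// Illustration only: pick the publish destination for one outbound message.
class DestinationResolutionSketch {

    // Assumed to mirror the value of BinderHeaders.TARGET_DESTINATION; verify against your framework version.
    static final String TARGET_DESTINATION = "scst_targetDestination";

    static String resolve(Map<String, Object> headers, String defaultDestination) {
        Object target = headers.get(TARGET_DESTINATION);
        if (target instanceof String && !((String) target).isEmpty()) {
            return (String) target; // header present: it overrides the binding's destination
        }
        return defaultDestination;  // no header: fall back to the configured destination
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of(TARGET_DESTINATION, "solace/cid/2"), "my/default/topic"));
        System.out.println(resolve(Map.of(), "my/default/topic"));
    }
}
```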
Let's create a myFunction Function that will perform the same processing that we just implemented with StreamBridge.
Your code will look something like this:
@Bean
public Function<Message<String>, Message<String>> myFunction() {
    return v -> {
        logger.info("Received myFunction: " + v.getPayload());
        logger.info("CorrelationID: " + v.getHeaders().get("solace_correlationId"));

        // Use whatever business logic you'd like to figure out the topic!
        String cid = (String) v.getHeaders().get("solace_correlationId");
        if (cid == null) {
            cid = Integer.toString(1);
        }
        String myTopic = "solace/cid/".concat(cid);
        logger.info("Publishing to: " + myTopic);
        return MessageBuilder.withPayload(v.getPayload()).setHeader(BinderHeaders.TARGET_DESTINATION, myTopic).build();
    };
}
Go ahead and update your Spring configuration to look like the following. This configuration sets the input and output bindings for your function myFunction and also sets the default output topic to my/default/topic, which the app overrides.
spring:
  cloud:
    function:
      definition: myFunction
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: nonexclusive
          consumer:
            concurrency: 5
        myFunction-out-0:
          destination: 'my/default/topic'
Go ahead and test the function by repeating the "Try-Me" steps that we used above to test the StreamBridge implementation. You should see the same result :)
Sometimes when following the Supplier or the Function pattern you may need to send more than one output message for each one that you process. As we saw earlier, you can use StreamBridge to send messages whenever you'd like, but there is also another option. That option is to return a Collection<Message<?>> object in your Function. When doing this Spring Cloud Stream will send each member of the collection as its own message.
Let's check it out! Go ahead and comment out your previous code and create a myFunction function that takes in a String and returns a Collection<Message<String>>. It should look something like this:
@Bean
public Function<String, Collection<Message<String>>> myFunction() {
    return v -> {
        logger.info("Received: " + v);

        ArrayList<Message<String>> msgList = new ArrayList<Message<String>>();
        msgList.add(MessageBuilder.withPayload("Payload 1").build());
        msgList.add(MessageBuilder.withPayload("Payload 2").build());
        msgList.add(MessageBuilder.withPayload("Payload 3").build());

        return msgList;
    };
}
The code above will result in 3 messages being sent to the default output binding destination each time a message is received on the input binding.
If you still have the application properties configured from the previous section you can leave them be; if not, go ahead and add these properties. Note that the default output binding destination is my/default/topic.
spring:
  cloud:
    function:
      definition: myFunction
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: nonexclusive
        myFunction-out-0:
          destination: 'my/default/topic'
Test it out by starting your app via your IDE or using mvn clean spring-boot:run inside of your project. Use the "Try-Me" Subscriber to subscribe to the my/default/topic topic and then use the Publisher to send a message to the a/b/c topic. You should see your Subscriber receive 3 messages for each message that you send.
That's cool! But what's even cooler? You can combine this batch publishing functionality with the dynamic publishing tricks we learned earlier.
Check out this Function, noting a few things:
You can use both StreamBridge & BinderHeaders.TARGET_DESTINATION in conjunction with Batch Publishing.
When using StreamBridge you don't need to actually build the Message object, just like before.
When using BinderHeaders.TARGET_DESTINATION you can have some messages go to the default binding destination and others go to dynamic ones!
@Bean
public Function<String, Collection<Message<String>>> myFunction(StreamBridge sb) {
    return v -> {
        logger.info("Received: " + v);

        // Do some processing & use StreamBridge to send an Alert to a dynamic topic
        sb.send("some/other/topic/1", v);

        // Do some more processing and create a list of messages to send upon returning
        ArrayList<Message<String>> msgList = new ArrayList<Message<String>>();
        // Send to default topic
        msgList.add(MessageBuilder.withPayload("Payload 1").build());
        // Send to dynamic topics using BinderHeaders.TARGET_DESTINATION
        msgList.add(MessageBuilder.withPayload("Payload 2").setHeader(BinderHeaders.TARGET_DESTINATION, "some/other/topic/2").build());
        msgList.add(MessageBuilder.withPayload("Payload 3").setHeader(BinderHeaders.TARGET_DESTINATION, "some/other/topic/3").build());

        return msgList;
    };
}
Add a topic subscription of some/other/topic/> to your "Try-Me" Subscriber, resend a message to a/b/c and check out what your Subscriber receives.
You'll note that you received 1 message on the default binding destination of my/default/topic and the other 3 messages went to some/other/topic/X where X is a number as defined in the code.
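To double-check where the messages returned in the collection end up, the plain-Java sketch below walks a returned list and counts messages per destination. It is a model, not framework code: the Outbound class stands in for a Spring Message, the names BatchPublishSketch and countByDestination are hypothetical, and the header key is assumed to mirror BinderHeaders.TARGET_DESTINATION.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: each element of the returned collection is published on its own.
class BatchPublishSketch {

    // Assumed to mirror the value of BinderHeaders.TARGET_DESTINATION.
    static final String TARGET_DESTINATION = "scst_targetDestination";

    // Stand-in for a Spring Message: a payload plus headers.
    static final class Outbound {
        final String payload;
        final Map<String, Object> headers;

        Outbound(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Counts how many of the returned messages go to each destination.
    static Map<String, Integer> countByDestination(List<Outbound> returned, String defaultDestination) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Outbound msg : returned) {
            Object target = msg.headers.get(TARGET_DESTINATION);
            String destination = (target != null) ? target.toString() : defaultDestination;
            counts.merge(destination, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Outbound> returned = List.of(
                new Outbound("Payload 1", Map.of()),
                new Outbound("Payload 2", Map.of(TARGET_DESTINATION, "some/other/topic/2")),
                new Outbound("Payload 3", Map.of(TARGET_DESTINATION, "some/other/topic/3")));
        System.out.println(countByDestination(returned, "my/default/topic"));
    }
}
```

The fourth message in the exercise, the one on some/other/topic/1, does not appear here because it is sent separately through StreamBridge before the function returns.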
Batch consumers are a feature that allows consuming messages in batches rather than individually. This feature is particularly useful when dealing with high-volume data processing scenarios where processing messages individually may introduce performance overhead.
Batching of messages is applied only when spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode is set to true. The number of messages in a batch is dependent on the availability of messages in the queue and the timeout to receive messages from the queue.
Other parameters that can control the batching are:
batch-max-size: The maximum number of messages per batch. Only applicable when batch-mode is true. The default value is 255.
batch-timeout: The maximum wait time in milliseconds to receive a batch of messages. If this timeout is reached, then the messages that have already been received will be used to create the batch. A value of 0 means wait forever. Only applicable when batch-mode is true. The default value is 5000.
It should be noted that a batch that the binder creates is a collection of individual messages and must not be treated as a single consistent unit.
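The interplay of batch-max-size and batch-timeout can be sketched in plain Java as "collect until the batch is full or the timeout expires, whichever comes first". This is only a model of the behaviour described above, not the binder's implementation: the class BatchAssemblySketch and the method nextBatch are hypothetical names, and the special case where a timeout of 0 means wait forever is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: assemble a batch by size limit or timeout, whichever is hit first.
class BatchAssemblySketch {

    // Collects up to maxSize messages, or fewer if timeoutMs elapses first.
    static List<String> nextBatch(BlockingQueue<String> queue, int maxSize, long timeoutMs) {
        List<String> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // timeout reached: hand over what we have so far
                }
                String message = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (message == null) {
                    break; // nothing more arrived before the timeout
                }
                batch.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and return the partial batch
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 12; i++) {
            queue.add("message-" + i);
        }
        // With 12 queued messages and a max size of 5, the last batch is cut short by the timeout.
        while (!queue.isEmpty()) {
            System.out.println("Received Batch Size: " + nextBatch(queue, 5, 200).size());
        }
    }
}
```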
Let's check it out!
@Bean
Consumer<Message<List<String>>> batchConsume() {
    return batchMsg -> {
        List<?> data = batchMsg.getPayload();
        MessageHeaders headers = batchMsg.getHeaders();
        // The headers of the individual messages are read by the same index as the payload list
        List<?> dataHeaders = (List<?>) headers.get(SolaceBinderHeaders.BATCHED_HEADERS);

        logger.info("Received Batch Size: " + data.size());
        for (int i = 0; i < data.size(); i++) {
            logger.info("Batch Headers: " + dataHeaders.get(i));
            // Each payload is cast to byte[] and decoded as UTF-8
            logger.info("Batch Payload: " + new String((byte[]) data.get(i), StandardCharsets.UTF_8));
        }
    };
}
The code above will receive batches of messages based on the batch settings configured for the consumer on the Solace binding in the application configuration.
We will amend the application properties from the previous section by adding a consumer with batch settings configured.
Note that in the previous exercise the processor, upon receiving a message, published multiple messages to dynamic topics matching some/other/topic/>. We will build the consumer to subscribe to these messages.
spring:
  cloud:
    function:
      definition: myFunction;batchConsume
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: nonexclusive
        myFunction-out-0:
          destination: 'my/default/topic'
        batchConsume-in-0:
          destination: 'some/other/topic/>'
          group: batch
          consumer:
            batch-mode: true
            useNativeDecoding: true
      solace:
        bindings:
          batchConsume-in-0:
            consumer:
              batch-max-size: 5
              batch-timeout: 5000
Test it out by starting your app via your IDE or using mvn clean spring-boot:run inside of your project. Use the "Try-Me" Publisher to send a message to the a/b/c topic. You should see your Subscriber receive 3 messages for each message that you send. You won't see a full batch yet, because the number of messages is less than the configured max batch size.
Now, go ahead and click on the publish button of the "Try-Me" Publisher 4 times. On the console, you should be able to see that the messages were delivered in batches of 5+5+2.
2023-07-06T07:44:42.824+05:30 INFO 9858 --- [ool-12-thread-1] com.example.demo.DemoApplication :
Received: Hello world!
2023-07-06T07:44:42.845+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Received Batch Size: 5
2023-07-06T07:44:42.845+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/1, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001eaf, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, target-protocol=kafka, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=181ee60e-0809-8121-34e2-e8ad11609457, contentType=application/json, timestamp=1688609682842}
2023-07-06T07:44:42.845+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Hello world!
2023-07-06T07:44:42.845+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/2, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001eb2, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=2e640bb5-a047-a275-2e77-1057acd8359c, contentType=application/json, timestamp=1688609682843}
2023-07-06T07:44:42.845+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 2
2023-07-06T07:44:42.852+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/3, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001eb4, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=6f07d2ad-a433-2615-0a56-0fd9cd893332, contentType=application/json, timestamp=1688609682843}
2023-07-06T07:44:42.852+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 3
2023-07-06T07:44:42.853+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/1, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001eb8, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, target-protocol=kafka, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=6b57ac4a-c4c3-e476-bcb7-0f8b268880fa, contentType=application/json, timestamp=1688609682844}
2023-07-06T07:44:42.853+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Hello world!
2023-07-06T07:44:42.857+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/2, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001ebb, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=751a506f-2598-d369-9d33-0bbefdf0583e, contentType=application/json, timestamp=1688609682844}
2023-07-06T07:44:42.857+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 2
2023-07-06T07:44:43.173+05:30 INFO 9858 --- [ool-12-thread-1] com.example.demo.DemoApplication :
Received: Hello world!
2023-07-06T07:44:43.540+05:30 INFO 9858 --- [ool-12-thread-1] com.example.demo.DemoApplication :
Received: Hello world!
2023-07-06T07:44:43.548+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Received Batch Size: 5
2023-07-06T07:44:43.548+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/3, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001ebd, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=0b86f672-97e9-1167-fc5f-cc2152eed3c0, contentType=application/json, timestamp=1688609683546}
2023-07-06T07:44:43.548+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 3
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/1, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001ec1, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, target-protocol=kafka, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=5f48b49a-625f-c5f1-b157-252d5a0f721d, contentType=application/json, timestamp=1688609683547}
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Hello world!
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/2, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001ec4, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=82252888-6e99-4175-8cc9-bf81db0db086, contentType=application/json, timestamp=1688609683547}
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 2
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/3, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001ec6, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=85fdd815-e914-f375-0233-daad0929adac, contentType=application/json, timestamp=1688609683548}
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 3
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/1, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001eca, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, target-protocol=kafka, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=38b762fb-7a48-feb2-e97f-1619a8783ca0, contentType=application/json, timestamp=1688609683548}
2023-07-06T07:44:43.549+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Hello world!
2023-07-06T07:44:48.563+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Received Batch Size: 2
2023-07-06T07:44:48.563+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/2, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001ecd, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=d6179b9e-b3c0-957d-639e-3d293596245c, contentType=application/json, timestamp=1688609688563}
2023-07-06T07:44:48.564+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 2
2023-07-06T07:44:48.564+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Headers: {solace_scst_messageVersion=1, solace_expiration=0, solace_destination=some/other/topic/3, solace_replicationGroupMessageId=rmid1:24c78-197d007757c-00000000-00001ecf, solace_isReply=false, solace_timeToLive=0, solace_receiveTimestamp=0, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=4, solace_redelivered=false, id=ba3610fb-67f0-a0d3-1a6c-17a11251b2cb, contentType=application/json, timestamp=1688609688563}
2023-07-06T07:44:48.564+05:30 INFO 9858 --- [ool-13-thread-1] com.example.demo.DemoApplication :
Batch Payload: Payload 3
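The 5+5+2 split seen in the log above is simple arithmetic: 4 publishes produce 3 messages each, and a batch is capped at batch-max-size. The sketch below is only an idealized model of that arithmetic, assuming all messages are already waiting when batching starts; the class and method names are hypothetical and it does not use the binder at all.

```java
import java.util.ArrayList;
import java.util.List;

// Idealized model of how a backlog of messages is cut into batches.
// Not binder code: BatchSplitSketch and batchSizes are hypothetical names.
final class BatchSplitSketch {

    /** Returns the size of each batch when messageCount messages are capped at maxBatchSize. */
    static List<Integer> batchSizes(int messageCount, int maxBatchSize) {
        List<Integer> sizes = new ArrayList<>();
        int remaining = messageCount;
        while (remaining > 0) {
            int size = Math.min(remaining, maxBatchSize);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 4 publishes x 3 messages each, with batch-max-size: 5
        System.out.println(batchSizes(4 * 3, 5)); // [5, 5, 2]
        // A single publish yields 3 messages, which is less than one full batch
        System.out.println(batchSizes(3, 5));     // [3]
    }
}
```

In the real application the last, partial batch only shows up after the batch timeout, which matches the roughly 5 second gap before the final batch of 2 in the log.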
By default, when using Spring Cloud Stream with imperative functions (not reactive!), the framework automatically acknowledges a message when the Function successfully exits. However, sometimes you want more control. In this section we'll cover how you can use client/manual acknowledgements to handle this situation.
Using Client/Manual Acknowledgements can be simplified into a two step process:
Since we're dealing with acknowledgements we know we're essentially dealing with a message/event broker and we'll need to receive the Message<?> object itself. Once we have the Message<?> object we can access the AcknowledgmentCallback in the header and disable auto ack as follows:
// Disable Auto-Ack
AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(v);
ackCallback.noAutoAck();
Now that we've disabled auto ack we are in charge of handling the acknowledgement and can choose from 3 options using Spring's AckUtils. When using Manual Acknowledgements make sure you ALWAYS acknowledge the message! We'll talk more about when you should use these options in the Consumer Error Handling section of this codelab.
When using the Solace binder and handling your events in multiple threads you'll also want to ensure that you catch the SolaceAcknowledgmentException, which may get thrown in REQUEUE scenarios.
// Acknowledge the Message!
try {
    AckUtils.accept(ackCallback);
    //AckUtils.requeue(ackCallback);
    //AckUtils.reject(ackCallback);
} catch (SolaceAcknowledgmentException e) {
    //TODO Log a warning? Message was re-queued on broker and will be re-delivered to a consumer
}
If using the Solace Binder you can learn how it handles the different AckUtils Status options in the Manual Message Acknowledgement binder docs. Refer to the AckUtils documentation and AcknowledgmentCallback documentation for more info on these objects at the Spring level.
Let's go ahead and put it all together with a simple sample Function (comment out the previous code) that receives a Message<String>, disables auto-ack, executes some simple business logic and decides whether it wants to accept, reject or requeue the message.
1️⃣ First off, let's go ahead and change our application configuration to create a fresh queue and set queueMaxMsgRedelivery so we don't get stuck in an infinite loop of rejecting/receiving the same message over and over again. Note that the queue name will be different because we changed the group to clientAck and the group is used as part of the queue naming convention.
spring:
  cloud:
    function:
      definition: myFunction
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: clientAck
        myFunction-out-0:
          destination: 'my/default/topic'
      solace:
        bindings:
          myFunction-in-0:
            consumer:
              queueMaxMsgRedelivery: 2
2️⃣ Next, go ahead and modify your Java code to add the following Function (comment out or delete other code so it doesn't interfere).
@Bean
public Function<Message<String>, String> myFunction() {
    return v -> {
        logger.info("Received: " + v);

        // Disable Auto-Ack
        AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(v);
        ackCallback.noAutoAck();

        // TODO Execute Business Logic + Maybe even pass to another thread?
        // Use CorrelationID for easy business logic...
        String cid = (String) v.getHeaders().get("solace_correlationId");
        if (cid == null) {
            cid = "none";
        }

        // Acknowledge the Message!
        try {
            if (cid.equals("accept")) {
                logger.info("Accepting the Message");
                AckUtils.accept(ackCallback);
            } else if (cid.equals("requeue")) {
                logger.info("Requeuing the Message");
                AckUtils.requeue(ackCallback);
                Thread.sleep(60000);
            } else {
                logger.info("Rejecting the Message");
                AckUtils.reject(ackCallback);
                Thread.sleep(60000);
            }
        } catch (SolaceAcknowledgmentException e) {
            logger.warn("Warning, exception occurred but message will be re-queued on broker and re-delivered", e);
            return null; // Don't send an output message
        } catch (InterruptedException e) {
            // Thread.sleep is a checked call inside a lambda; restore the interrupt flag and carry on
            Thread.currentThread().interrupt();
        }
        return "My Payload";
    };
}
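One way to keep a function like the one above testable is to pull the accept/requeue/reject decision out into a pure method, so it can be checked without a broker. The sketch below is an assumption about how you might structure that; AckDecision and decide are hypothetical names, chosen to mirror the three acknowledgement outcomes used in this section.

```java
// Hypothetical helper: decides the acknowledgement outcome from the correlation ID.
// It performs no acknowledgement itself; the caller would map the result to
// AckUtils.accept / AckUtils.requeue / AckUtils.reject.
final class AckDecisionSketch {

    enum AckDecision { ACCEPT, REQUEUE, REJECT }

    /** Same rules as the sample Function: "accept", "requeue", anything else is rejected. */
    static AckDecision decide(String correlationId) {
        if ("accept".equals(correlationId)) {
            return AckDecision.ACCEPT;
        }
        if ("requeue".equals(correlationId)) {
            return AckDecision.REQUEUE;
        }
        return AckDecision.REJECT; // also covers a missing (null) correlation ID
    }

    public static void main(String[] args) {
        System.out.println(decide("accept"));  // ACCEPT
        System.out.println(decide("requeue")); // REQUEUE
        System.out.println(decide(null));      // REJECT
    }
}
```

Comparing with the literal on the left ("accept".equals(...)) avoids the explicit null check on the header value.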
Let's test this out using the "Try-Me" tool as usual. In the Publisher set the "Correlation Id" (which is under "Show Advanced") to "accept" and publish a message to the a/b/c topic.
You should see that your microservice accepts the message and, if you navigate to your queue, which should be named scst/wk/clientAck/plain/a/b/_, you'll see that there are no messages remaining on the queue.
Now change the "Correlation Id" to "requeue" and send the message again. You'll see that the message gets requeued and, if you navigate to your queue in PubSub+ Manager in a timely manner (within 2 minutes, since we set retries to 2, are using a single consumer, and sleep for 60 seconds after requeuing), you'll see the message remains on the queue.
Now that you know how to use Manual Acknowledgements we'll talk about when to use them as part of Consumer Error Handling in the next section!
There are several mechanisms available to the developer when consuming messages with a Spring Cloud Stream microservice. Some of these mechanisms are internal to the framework and available for any binder, while others are specific to the binder that you're using. In this codelab I'll cover the framework options as well as error handling options made available by the Solace Binder.
Retries
The Error Handling section in the Spring Cloud Stream reference guide goes into more detail around the available options, but in general, for developers using imperative functions, the framework retries failed invocations using the Spring Retry library, while developers creating reactive functions can take advantage of the retryBackoff capabilities in Project Reactor.
The framework provides reasonable defaults, but when using the Spring Retry library you can configure the RetryTemplate to set options such as the number of retry attempts, the backoff interval and even which exceptions to retry or not retry. If you need custom retry logic you can also provide your own instance of the RetryTemplate. It is important to note that these retries take place inside of the microservice itself and the message is not sent back to the underlying messaging system.
Configuring the RetryTemplate on an input binding would look something like this:
spring:
  cloud:
    function:
      definition: myFunction
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: clientAck
          consumer:
            max-attempts: 2
            back-off-initial-interval: 1
            back-off-multiplier: 3
            default-retryable: true
            retryable-exceptions:
              java.lang.IllegalStateException: true
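To get a feel for what these settings mean in practice, the sketch below computes the waits between attempts for an exponential backoff. It is a simplified model, not the Spring Retry implementation: it assumes the wait starts at the initial interval, is multiplied after each failed attempt, is capped at a maximum interval (10000 ms is assumed here as the cap), and that no wait follows the final attempt. BackoffSketch and backoffDelays are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of exponential backoff between delivery attempts.
// Not Spring Retry code; names and the capping rule are assumptions for illustration.
final class BackoffSketch {

    /** Waits (in ms) between attempts: there are maxAttempts - 1 of them. */
    static List<Long> backoffDelays(int maxAttempts, long initialIntervalMs,
                                    double multiplier, long maxIntervalMs) {
        List<Long> delays = new ArrayList<>();
        double interval = initialIntervalMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            delays.add((long) Math.min(interval, maxIntervalMs));
            interval *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // The configuration above: max-attempts 2, initial interval 1 ms, multiplier 3
        System.out.println(backoffDelays(2, 1, 3.0, 10_000));    // [1]
        // A longer run with the same interval and multiplier
        System.out.println(backoffDelays(5, 1, 3.0, 10_000));    // [1, 3, 9, 27]
        // The cap kicks in once the interval would exceed it
        System.out.println(backoffDelays(4, 6000, 2.0, 10_000)); // [6000, 10000, 10000]
    }
}
```

With max-attempts set to 2 there is only one retry, so the multiplier never comes into play for the configuration shown.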
Spring Cloud Stream pre-v3.2.5
If you are using a Spring Cloud Stream version prior to 3.2.5, continue reading. Otherwise, continue to the next section, Spring Cloud Stream v3.2.5 or higher. The error channel specification follows the convention of @ServiceActivator, an annotation-based approach for specifying global and destination-specific error handlers.
After the retries have been exhausted the Cloud Stream framework will next send an ErrorMessage to a binding specific error channel, which is formatted as <destination>.<group>.errors. You can configure a @ServiceActivator to listen on that Spring Integration channel to try to handle the Exception. If you do not register a listener then the framework will pass the ErrorMessage along to the global errorChannel Spring Integration channel where a different @ServiceActivator can listen. However, if listening on this global errorChannel do not try to handle the message itself, as the binder will already have been notified that the message has failed and will be implementing its own error handling. This global errorChannel is more useful for logging or publishing an alert elsewhere.
Here is an example @ServiceActivator listening to the binding specific error channel. Note that the inputChannel name is derived from the yaml above that defines the destination as a/b/> and the group as clientAck. If your binding specific error handler exits successfully then the binder will acknowledge/accept the message back to the broker; if an exception is thrown then the binder error handling process will kick in. Note that if you are using Client/Manual acknowledgements you can also use them in the binding specific error handler.
@ServiceActivator(inputChannel = "a/b/>.clientAck.errors")
public void handleError(ErrorMessage message) {
    logger.info("Binding Specific Error Handler executing business logic for: " + message.toString());
    logger.info("Exception is here: " + message.getPayload());
}
And here is an example of a @ServiceActivator listening on the global error channel. Note that the global error channel will only receive the ErrorMessage if there is no binding specific error channel.
@ServiceActivator(inputChannel = "errorChannel")
public void handleNotificationErrorChannel(ErrorMessage message) {
    logger.info("Global errorChannel received msg. NO BUSINESS LOGIC HERE! Notify ONLY!" + message.toString());
}
Spring Cloud Stream v3.2.5 or higher
If the Maven BOM in your pom file does not bring in Spring Cloud Stream v3.2.5 (or higher), you can explicitly add it to the dependencies section.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>3.2.5</version>
</dependency>
After the retries have been exhausted the Cloud Stream framework will next send an ErrorMessage to a binding specific error handler, as specified by spring.cloud.stream.bindings.<binding-name>.error-handler-definition. You can configure a @Bean function to handle the Exception. If you do not register a function then the framework will pass the ErrorMessage along to the global error handler, a different @Bean function assigned to handle the error message. This can be specified as spring.cloud.stream.default.error-handler-definition. In the global handler, do not try to handle the message itself, as the binder will already have been notified that the message has failed and will be implementing its own error handling. The global handler is more useful for logging or publishing an alert elsewhere.
Here is an example of registering binding-specific and global error handlers in the configuration.
spring:
  cloud:
    function:
      definition: myFunction;myFunction2;handleGlobalError;handleBindingError;
    stream:
      default:
        error-handler-definition: handleGlobalError
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: clientAck
          error-handler-definition: handleBindingError
          consumer:
            max-attempts: 2
            back-off-initial-interval: 1
            back-off-multiplier: 3
            default-retryable: true
            retryable-exceptions:
              java.lang.IllegalStateException: true
          binder: solace-broker
        myFunction2-in-0:
          destination: 'a/c/>'
          group: clientAck
          consumer:
            max-attempts: 2
            back-off-initial-interval: 1
            back-off-multiplier: 3
            default-retryable: true
            retryable-exceptions:
              java.lang.IllegalStateException: true
          binder: solace-broker
...
Here is an example of a binding-specific error handler function. If your binding specific error handler exits successfully then the binder will acknowledge/accept the message back to the broker; if an exception is thrown then the binder error handling process will kick in. Note that if you are using Client/Manual acknowledgements you can also use them in the binding specific error handler.
@Bean
public Consumer<ErrorMessage> handleBindingError() {
    return message -> {
        logger.info("Binding Specific Error Handler executing business logic for: " + message.toString());
        logger.info("Original Message: " + message.getOriginalMessage());
    };
}
And here is an example of a global error handler, an error handling function that will be invoked to handle errors from all bindings that do not have a binding-specific error handler.
@Bean
public Consumer<ErrorMessage> handleGlobalError() {
    return message -> {
        logger.info("Global errorChannel received msg. NO BUSINESS LOGIC HERE! Notify ONLY!" + message.toString());
        logger.info("Original Message: " + message.getOriginalMessage());
    };
}
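The routing rule described in this section (use the binding's own handler if one is configured, otherwise fall back to the default one) can be modelled in a few lines. This is only an illustration of the rule using the handler and binding names from the configuration above; it is not how the framework resolves handlers internally, and ErrorHandlerRoutingSketch and resolveHandler are hypothetical names.

```java
import java.util.Map;

// Illustrative model of "binding-specific handler first, default handler otherwise".
// Not framework code; it only mirrors the rule described in the text.
final class ErrorHandlerRoutingSketch {

    /** Returns the handler name configured for the binding, or the default handler. */
    static String resolveHandler(Map<String, String> bindingHandlers,
                                 String defaultHandler, String bindingName) {
        return bindingHandlers.getOrDefault(bindingName, defaultHandler);
    }

    public static void main(String[] args) {
        // Only myFunction-in-0 declares its own error-handler-definition
        Map<String, String> bindingHandlers = Map.of("myFunction-in-0", "handleBindingError");

        System.out.println(resolveHandler(bindingHandlers, "handleGlobalError", "myFunction-in-0"));
        // prints handleBindingError
        System.out.println(resolveHandler(bindingHandlers, "handleGlobalError", "myFunction2-in-0"));
        // prints handleGlobalError
    }
}
```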
Testing Error Handler functions
You can easily test this out by changing your myFunction to throw a RuntimeException like below:
@Bean
public Function<Message<String>, String> myFunction() {
    return v -> {
        logger.info("Received: " + v);

        // Logic to avoid an infinite loop of the message being re-delivered when testing error
        // handling during the codelab. DO NOT USE IN PRODUCTION CODE
        // Boolean.TRUE.equals also copes with a missing header (null) without throwing
        if (Boolean.TRUE.equals(v.getHeaders().get("solace_redelivered"))) {
            logger.warn("Exiting successfully to ACK msg and avoid infinite redeliveries");
            return null;
        }
        throw new RuntimeException("Oh no!");
    };
}
Go ahead and use the "Try-Me" tool to publish a message to the "a/b/c" topic. You should see that the binding specific error handler received the message.
You can test the global handler the same way by changing your myFunction2 to throw a RuntimeException like below:
@Bean
public Function<Message<String>, String> myFunction2() {
    return v -> {
        logger.info("Received: " + v);

        // Logic to avoid an infinite loop of the message being re-delivered when testing error
        // handling during the codelab. DO NOT USE IN PRODUCTION CODE
        // Boolean.TRUE.equals also copes with a missing header (null) without throwing
        if (Boolean.TRUE.equals(v.getHeaders().get("solace_redelivered"))) {
            logger.warn("Exiting successfully to ACK msg and avoid infinite redeliveries");
            return null;
        }
        throw new RuntimeException("Oh no!");
    };
}
Go ahead and use the "Try-Me" tool to publish a message to the "a/c/d" topic. You should see that the global error handler received the message.
Don't send a message!
It is common to have a microservice that receives an event, processes it, and publishes an outbound event. But what if I don't want to send an output message!? The framework makes this easy: just return null and no outbound message will be published.
@Bean
public Function<String, String> myFunction() {
    return v -> {
        logger.info("Received: " + v);

        if (!sendMessageDownstream(v)) {
            logger.warn("Not Sending an Outbound Message");
            return null; // Don't send a message, but ACCEPT it to remove it from the queue
        } else {
            return processMessage(v);
        }
    };
}
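The snippet above relies on two helpers, sendMessageDownstream and processMessage, that are not defined in this codelab. A minimal, framework-free sketch with made-up implementations of both (forward only non-blank payloads, and upper-case them) shows the null-return pattern in isolation; the helper bodies are purely assumptions for illustration.

```java
import java.util.function.Function;

// Framework-free illustration of "return null to publish nothing".
// The two helper implementations are hypothetical stand-ins.
final class NullReturnSketch {

    /** Hypothetical rule: only non-blank payloads are worth forwarding. */
    static boolean sendMessageDownstream(String payload) {
        return payload != null && !payload.trim().isEmpty();
    }

    /** Hypothetical processing step. */
    static String processMessage(String payload) {
        return payload.toUpperCase();
    }

    static Function<String, String> myFunction() {
        // A null result is what tells the framework not to publish an outbound message
        return v -> sendMessageDownstream(v) ? processMessage(v) : null;
    }

    public static void main(String[] args) {
        System.out.println(myFunction().apply("hello")); // HELLO
        System.out.println(myFunction().apply("   "));   // null
    }
}
```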
So what happens after internal retries are exhausted!? The message is given back to the binder for further error handling. I'll cover error handling by example in this codelab, but be sure to read the Failed Consumer Message Error Handling section of the Solace Binder docs for all the detail.
At a high level the Solace Binder offers 2 options:
Setting the autoBindErrorQueue consumer property to true results in the Binder creating a separate error queue to publish errors to (by default). The Error Queue will be a Durable Queue on the Solace Event Broker and can be configured by the options starting with "errorQueue" in the Solace Consumer Properties. This option allows your messages that result in an exception to be published elsewhere for further error handling.
Here is an example of how you would configure the error queue to be used on our myFunction example.
spring:
  cloud:
    function:
      definition: myFunction
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: clientAck
          consumer:
            max-attempts: 2
        myFunction-out-0:
          destination: 'my/default/topic'
      solace:
        bindings:
          myFunction-in-0:
            consumer:
              autoBindErrorQueue: true
              errorQueueMaxDeliveryAttempts: 3
              errorQueueAccessType: 1
              errorQueueRespectsMsgTtl: true
If we start our app with the configuration above we'll see that an error queue named scst/error/wk/clientAck/plain/a/b/_
is created. You can see it in PubSub+ Manager.
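The queue and error queue names seen in this codelab follow a visible pattern: a fixed prefix, the consumer group, and the destination with its wildcard replaced by an underscore. The sketch below reproduces just that pattern. It is an assumption based on the names shown here, not the binder's implementation, and the actual naming is configurable and may differ between binder versions. QueueNameSketch and its methods are hypothetical.

```java
// Reproduces the queue naming pattern observed in this codelab.
// An assumption for illustration only; not taken from the binder source.
final class QueueNameSketch {

    /** Replaces topic wildcards so the destination can be embedded in a queue name. */
    static String sanitize(String destination) {
        return destination.trim().replaceAll("[*>]", "_");
    }

    static String queueName(String group, String destination) {
        return "scst/wk/" + group.trim() + "/plain/" + sanitize(destination);
    }

    static String errorQueueName(String group, String destination) {
        return "scst/error/wk/" + group.trim() + "/plain/" + sanitize(destination);
    }

    public static void main(String[] args) {
        System.out.println(queueName("clientAck", "a/b/>"));      // scst/wk/clientAck/plain/a/b/_
        System.out.println(errorQueueName("clientAck", "a/b/>")); // scst/error/wk/clientAck/plain/a/b/_
    }
}
```

This also shows why changing the group (for example from nonexclusive to clientAck) results in a brand new queue on the broker.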
The easiest way to test these options out is to have your function throw new RuntimeException("Oh no!!") and use the "Try-Me" tab to send test messages.
@Bean
public Function<Message<String>, String> myFunction() {
    return v -> {
        logger.info("Received: " + v);

        // Logic to avoid an infinite loop of the message being re-delivered when testing error
        // handling during the codelab. DO NOT USE IN PRODUCTION CODE
        // Boolean.TRUE.equals also copes with a missing header (null) without throwing
        if (Boolean.TRUE.equals(v.getHeaders().get("solace_redelivered"))) {
            logger.warn("Exiting successfully to ACK msg and avoid infinite redeliveries");
            return null;
        }
        throw new RuntimeException("Oh no!");
    };
}
Okay, so we have all of these options. How do we choose what to do when handling errors? It of course all goes back to your requirements.
Configure the RetryTemplate (would retrying really help?) and the binding specific error channel appropriately.
If failed messages need to be kept, have the binder publish them to an error queue (autoBindErrorQueue).
Use Client/Manual Acknowledgements to REQUEUE messages that end in an error scenario that may be successful if retried, even if by another instance in the Consumer Group. For example, maybe an infrastructure issue where your microservice couldn't get a response from a downstream service.
For messages that will not succeed on a retry, enable the autoBindErrorQueue option and use the Client/Manual Ack to REJECT the message and let the binder handle it for you. However, if you prefer to send to several destinations for error processing then use StreamBridge as covered in the Dynamic Publishing section to publish where you'd like. After publishing be sure to use the Client/Manual Ack to ACCEPT the message.
Now that we know about exception handling options on the Consumer side we'll cover Publisher error handling in the next section!
When creating event-driven microservices you are using asynchronous communications by default. This can sometimes make it tricky to handle publishing errors. Luckily there are a few options available to you when using Spring Cloud Stream with the Solace Binder. If you are using another binder, please check its documentation, as error handling options may differ.
The two options are Producer Error Channels and Publisher Confirmations.
Producer Error Channels allow you to remain asynchronous and have a callback triggered when a send/publishing failure occurs. This can be enabled by setting the errorChannelEnabled
producer property to true. Note that this functionality is disabled by default.
Let's continue using our myFunction
function and configure the outbound side to enable the publisher error channel.
spring:
  cloud:
    function:
      definition: myFunction
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: clientAck
        myFunction-out-0:
          destination: 'my/default/topic'
          producer:
            error-channel-enabled: true
Next we need to add a listener in the code:
@ServiceActivator(inputChannel = "my/default/topic.errors")
public void handlePublishError(ErrorMessage message) {
    logger.warn("Message Publish Failed for: " + message);
    logger.info("Original Message: " + message.getOriginalMessage());
}
The easiest way to test this out when using Solace is to prevent our client-username from being able to publish to our output destination.
Let's use the "Try-Me" tool to send a message to the a/b/c topic and trigger our function.
You should see that our listener logged that the message failed to publish, and you can add custom error handling logic.
If you need to wait to ensure your message was definitely received by the broker before continuing processing, or you just want to keep the code simple and avoid extra callbacks, then Publisher Confirmations in conjunction with StreamBridge may be the way to go for publishing messages from your microservice. This option allows you to use a Future
to wait for publish confirmations and may differ per binder. With the Solace binder, for each message you create a CorrelationData
instance and set it as your SolaceBinderHeaders.CONFIRM_CORRELATION
header value. You can then use CorrelationData.getFuture().get()
to wait for the publish acknowledgement from the broker. If the publish failed then an exception would be thrown. Read more in the Publisher Confirmations section of the Solace Binder docs.
To test this out go ahead and delete or comment out your current function and app config and replace it with the Function and binding config below. This code will wire up the myConsumer
Consumer that will receive a message and use StreamBridge
to send outgoing messages. By using StreamBridge we can send while still in our function and use the Future
to wait for the publish to succeed or fail. Note that the code below will result in a failure if a message is published to the a/b/c
topic with a payload of "fail" assuming that we still have the ACL Profile set up to deny publishing to the my/default/topic
as configured earlier in this section.
Java Code
@Bean
public Consumer<String> myConsumer(StreamBridge sb) {
    return v -> {
        logger.info("Received myConsumer: " + v);

        CorrelationData correlationData = new CorrelationData();
        Message<String> message = MessageBuilder.withPayload("My Payload")
                .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, correlationData).build();

        if (v.equals("fail")) {
            sb.send("my/default/topic", message);
        } else {
            sb.send("pub/sub/plus", message);
        }

        try {
            correlationData.getFuture().get(30, TimeUnit.SECONDS);
            logger.info("Publish Successful");
            // Do success logic
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            logger.error("Publish Failed");
            // Do failure logic
        }
    };
}
Spring Config - for application.yml
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: 'a/b/>'
Go ahead and use the "Try-Me" tool to send a message to the a/b/c topic with a payload of "Hello World". You'll see the app processes the message and publishing succeeds. If you then change the payload of the message to "fail" you'll see that the publish to the my/default/topic fails and "Publish Failed" is logged. This allows the developer to execute error handling prior to exiting the function.
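The code above treats every exception from the Future the same way, but the three failure modes mean different things: the broker rejected the publish, no confirmation arrived in time, or the waiting thread was interrupted. The sketch below separates them using a plain java.util.concurrent.Future as a stand-in for the future returned by CorrelationData.getFuture(). The class name, the method name and the returned labels are hypothetical; no Solace or Spring classes are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-alone illustration of waiting on a publish confirmation future.
// A CompletableFuture plays the role of the binder-provided future here.
final class PublishConfirmSketch {

    /** Waits for the confirmation and reports which outcome occurred. */
    static String awaitConfirm(Future<Void> confirmation, long timeoutMs) {
        try {
            confirmation.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "CONFIRMED";
        } catch (ExecutionException e) {
            return "FAILED";      // the publish itself failed; e.getCause() holds the reason
        } catch (TimeoutException e) {
            return "TIMED_OUT";   // outcome unknown: no confirmation within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> confirmed = CompletableFuture.completedFuture(null);

        CompletableFuture<Void> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("publish denied"));

        CompletableFuture<Void> pending = new CompletableFuture<>(); // never completed

        System.out.println(awaitConfirm(confirmed, 100)); // CONFIRMED
        System.out.println(awaitConfirm(rejected, 100));  // FAILED
        System.out.println(awaitConfirm(pending, 100));   // TIMED_OUT
    }
}
```

Distinguishing a timeout from a failure matters because a timed-out publish may still have reached the broker, so retrying it can produce a duplicate.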
Just a reminder: go ahead and remove that ACL Profile Exception before you forget and get weird exceptions later.
This section will (might) be expanded in the future; for now these are the things to keep in mind:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <scope>test</scope>
    <classifier>test-binder</classifier>
    <type>test-jar</type>
</dependency>
Hopefully you learned quite a bit about the use of Spring Cloud Stream to create event-driven microservices, especially when it comes to using it with the Solace binder and Solace PubSub+ Event Brokers.
For next steps I would recommend:
Checking out the AsyncAPI Code Generator Template for Spring Cloud Stream. This template allows you to start with a design and generate a Spring Cloud Stream microservice that is pre-configured with the input and output bindings necessary for your microservice to be wired up to the underlying messaging system. Don't want to read? Here is a YouTube video covering AsyncAPI code gen with Spring Cloud Stream!
Learning about Solace Event Broker's multi-protocol capabilities that allow your Spring Cloud Stream apps to exchange events/messages with other Spring and non-Spring apps. YouTube Video!
Learning about the Solace Event Portal, which allows you to design your EDA and export AsyncAPI documents to help kickstart code generation.
Thanks for participating in this codelab! Let us know what you thought in the Solace Community Forum! If you found any issues along the way we'd appreciate it if you'd raise them by clicking the Report a mistake button at the bottom left of this codelab.