This Codelab contains the technical hands-on section of the Solace Masterclass: Implementing Event-Driven-Architectures. Participants will implement this Codelab in the Strigo virtual machine provided as part of the masterclass session. The Strigo virtual machine contains all the software packages required for the hands-on Codelab.
The agenda of the hands-on is as below :
As part of the earlier event storming session, you discussed and brainstormed a few use cases. In that session you identified the systems, events and processes involved in the flow(s) and designed the topic taxonomy following Solace best practices and recommendations.
In the hands-on section of this masterclass, you can choose one of the below industry domains :
and follow it for implementing. Due to time limit considerations, we will be implementing only a selected subset of the whole design.
In case you do not have an active Solace cloud account, you can register for one via the link: Solace trial account registration. You can log in to the newly created Solace cloud account using the link: Solace Cloud Account. Once you have registered and logged in, you can get started with provisioning a Solace developer grade broker which will be used in the next sections.
Give your Solace PubSub+ Cloud service instance a name of your choice and then let it start!
Under the covers, a Solace event broker will be deployed and configured in the cloud you indicated: ports are configured, a load balancer is set up, monitoring is enabled, etc. It takes about 5 minutes, and then you'll be ready!
It is safe to navigate away from the "Solace is starting" page while the broker is being deployed in the cloud. Feel free to explore the rest of Solace Mission Control, including the Event Portal!
Once the Solace broker is up and running, click on the broker name and enter it.
The Event Portal is a cloud-based tool that simplifies the design of your event-driven architecture (EDA). With the Event Portal, you can:
In summary, the Event Portal streamlines event management, making it an essential part of your EDA toolkit.
/home/ubuntu/GitHub/solace-masterclass-code/ep-app-domains
Acme_Bank_App_Domain.json
Acme_Retail_App_Domain
Based on use case of your choice, please follow the appropriate chapter below :
For the retail domain use case, we will be considering a fictitious company called Acme Retail, which has been consistently recognized as a leader among e-commerce websites. Unfortunately, due to legacy and outdated architecture choices and implementation, Acme Retail is encountering major challenges in expanding its operations, as below :
This has impacted its customers' experience, and the company is at risk of losing customers. As a solution, Acme Retail has defined a POC to event-enable the Order-to-Cash (OTC) flow.
The dependencies between the applications and events are described as below :
This POC leverages the following architectural principles and practices :
As a part of the workshop, you will have access to prebuilt applications and artifacts which you will be using in this segment of the workshop.
The Order-Service acts as the entry point to the whole flow and emulates the user experience of creating a basket and converting it into an order. As described earlier in the overall flow definition, it creates the Order-Created events and subscribes to other relevant events, which it uses to show realtime status updates at the order level.
mvn clean spring-boot:run
As Acme Retail has been facing major challenges with fraudulent orders and realtime stock management, the stakeholders want to ensure that the system is not overcommitting to orders for which there is no stock. Additionally, they want to run a fraud check on the customer and order to ensure that only legitimate orders are processed and shipped.
This requirement has been implemented in the Inventory-FraudCheck-Service which subscribes to each incoming Order-Created event and performs the inventory reservation and fraud check. Once this is completed, it triggers an Order-Confirmed event for further processing.
mvn clean install
java -jar target/inventory-fraudcheck-service-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
Did you get an error like the one below, complaining about the absence of a queue object?
2024-03-22T12:42:19.725+01:00 INFO 69425 --- [1_ReactorThread] c.s.jcsmp.impl.flow.BindRequestTask : Client-1: Got BIND ('#P2P/QUE/all-orders-placed') Error Response (503) - Unknown Queue
com.solace.messaging.PubSubPlusClientException$MissingResourceException: 503: Unknown Queue
at com.solace.messaging.receiver.PersistentMessageReceiverImpl.createSolaceConsumer(PersistentMessageReceiverImpl.java:1359)
at com.solace.messaging.receiver.PersistentMessageReceiverImpl.onStart(PersistentMessageReceiverImpl.java:1169)
at com.solace.messaging.receiver.PersistentMessageReceiverImpl.startAsync(PersistentMessageReceiverImpl.java:436)
If you recall the Solace queues which we discussed earlier, we differentiated them into two types :
Since it is critical for Acme Retail that no messages are lost during application downtime, and that no subscribing application can delete the queue object, an architectural choice was made to use Durable - Exclusive queue configurations.
To fix the error encountered above, you need to manually create the Queue object as below :
Enter the queue name all-orders-placed, click Create and, keeping the default queue settings, click Apply.
Then add the following topic subscription to the queue and click Create :
acmeretail/onlineservices/order/created/*/*/*
Now that the missing queue has been created with the proper subscription, restart the Inventory-FraudCheck-Service application using the same command :
java -jar target/inventory-fraudcheck-service-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
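Each `*` in the queue subscription stands for one topic level, which is why the subscription has three trailing wildcards. The sketch below is a simplified, illustrative matcher for Solace-style subscriptions, not the broker's implementation: it assumes `*` may only end a level and `>` may only be the last level. The example topic values (`v1`, `EU`, `1001`) are hypothetical.

```java
/** Simplified, illustrative matcher for Solace-style topic subscriptions (not the broker's code). */
final class TopicSubscriptionMatcher {

    private TopicSubscriptionMatcher() {
    }

    /** Returns true when the topic is attracted by the subscription under the simplified rules. */
    static boolean matches(String subscription, String topic) {
        String[] subLevels = subscription.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < subLevels.length; i++) {
            String sub = subLevels[i];
            // '>' as the last level matches one or more remaining topic levels.
            if (sub.equals(">") && i == subLevels.length - 1) {
                return topicLevels.length > i;
            }
            if (i >= topicLevels.length) {
                return false; // the topic has fewer levels than the subscription
            }
            if (sub.endsWith("*")) {
                // 'abc*' matches any level starting with 'abc'; a bare '*' matches any level.
                String prefix = sub.substring(0, sub.length() - 1);
                if (!topicLevels[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!sub.equals(topicLevels[i])) {
                return false;
            }
        }
        return subLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        String subscription = "acmeretail/onlineservices/order/created/*/*/*";
        // Hypothetical topic with three levels after "created": matches.
        System.out.println(matches(subscription, "acmeretail/onlineservices/order/created/v1/EU/1001"));
        // Only one level after "created": does not match.
        System.out.println(matches(subscription, "acmeretail/onlineservices/order/created/v1"));
    }
}
```

A subscription ending in `>` would instead accept any number of trailing levels, which is convenient when the taxonomy may grow but less precise than spelling out each level.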
If you closely observe the logging on the console, you will see no events coming in from the queue. But how could that be when you clearly created multiple orders from the Order-Service?
If you think about it, you will realize that when you created those orders, the queue all-orders-placed had not been created yet. So even though the Order-Created events were marked as persistent and acknowledged by the broker, they were lost because no queue with a matching subscription existed to store them.
Now that you have created the queue, newly published orders will stay in the queue until a consumer processes and acknowledges them.
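The behaviour described above can be modelled with a toy in-memory broker. This is only a sketch to make the sequence of events concrete; it is not the Solace API, it uses exact-match subscriptions instead of wildcards, and the class name, topic string and payloads are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of topic-to-queue spooling; it is not the Solace API. */
final class ToyBroker {

    // Queue name -> exact-match topic subscription (wildcards omitted to keep the model small).
    private final Map<String, String> subscriptions = new LinkedHashMap<>();
    private final Map<String, Queue<String>> spools = new LinkedHashMap<>();
    private final List<String> discarded = new ArrayList<>();

    void createQueue(String queueName, String topicSubscription) {
        subscriptions.put(queueName, topicSubscription);
        spools.put(queueName, new ArrayDeque<>());
    }

    /** Spools the event to every queue subscribed to the topic; discards it when no queue matches. */
    void publish(String topic, String payload) {
        boolean matched = false;
        for (Map.Entry<String, String> entry : subscriptions.entrySet()) {
            if (entry.getValue().equals(topic)) {
                spools.get(entry.getKey()).add(payload);
                matched = true;
            }
        }
        if (!matched) {
            discarded.add(payload);
        }
    }

    int spooledCount(String queueName) {
        return spools.get(queueName).size();
    }

    int discardedCount() {
        return discarded.size();
    }

    public static void main(String[] args) {
        String topic = "acmeretail/onlineservices/order/created/v1/EU/1001"; // hypothetical topic
        ToyBroker broker = new ToyBroker();
        broker.publish(topic, "order published before the queue exists");
        broker.createQueue("all-orders-placed", topic);
        broker.publish(topic, "order published after the queue exists");
        System.out.println("discarded=" + broker.discardedCount()
                + ", spooled=" + broker.spooledCount("all-orders-placed"));
    }
}
```

Note that the second order is spooled even though no consumer is connected in this model: the queue, not the consumer, is what keeps the event.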
Let's test the flow so far by building and deploying the Order-Service and observing the logs of the Inventory-FraudCheck-Service to see incoming Order-Created events.
For each incoming Order-Created event, an Order-Confirmed event is published once the stock reservation and fraud check pass.
The Order Service needs to subscribe to this Order-Confirmed event. Follow the steps below to enable this flow :
Queue name | Subscription |
all-order-updates |
|
final PersistentMessageReceiver orderUpdatesEventReceiver = messagingService.createPersistentMessageReceiverBuilder().build(Queue.durableExclusiveQueue(configProperties.getOrderUpdatesQueueName()));
orderUpdatesEventReceiver.setReceiveFailureListener(failedReceiveEvent -> System.out.println("### FAILED RECEIVE EVENT " + failedReceiveEvent));
orderUpdatesEventReceiver.start();
orderUpdatesEventReceiver.receiveAsync(buildOrdersUpdatesEventHandler(orderUpdatesEventReceiver));
private MessageReceiver.MessageHandler buildOrdersUpdatesEventHandler(final PersistentMessageReceiver orderUpdatesEventReceiver) {
return (inboundMessage -> {
try {
final String inboundTopic = inboundMessage.getDestinationName();
log.info("Processing message on incoming topic :{} with payload:{}", inboundTopic, inboundMessage.getPayloadAsString());
boolean eventProcessed = processOrderUpdate(inboundTopic, inboundMessage.getPayloadAsString());
if (eventProcessed) {
orderUpdatesEventReceiver.ack(inboundMessage);
}
} catch (RuntimeException runtimeException) {
log.error("Runtime exception encountered while processing incoming event payload :{} on topic:{}. Error is :", inboundMessage.getPayloadAsString(), inboundMessage.getDestinationName(), runtimeException);
}
});
}
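The handler above acknowledges a message only when processing succeeded. The following toy model sketches what that choice means for the queue: acknowledged messages are removed, while unacknowledged ones remain available for a later delivery attempt. It does not use the Solace receiver API; the class, method and message values are hypothetical, and when a real broker redelivers depends on its configuration and on the consumer reconnecting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of acknowledge-after-processing; not the Solace receiver API. */
final class AckAfterProcessingDemo {

    private AckAfterProcessingDemo() {
    }

    /** Delivers every spooled message once; only successfully processed messages are acknowledged. */
    static List<String> deliverOnce(Deque<String> spool, Predicate<String> processor) {
        List<String> acknowledged = new ArrayList<>();
        int pending = spool.size();
        for (int i = 0; i < pending; i++) {
            String message = spool.pollFirst();
            boolean processed;
            try {
                processed = processor.test(message);
            } catch (RuntimeException e) {
                processed = false; // mirrors the handler: the failure is noted and no ack is sent
            }
            if (processed) {
                acknowledged.add(message); // ack: the message leaves the queue
            } else {
                spool.addLast(message); // no ack: the message stays on the queue
            }
        }
        return acknowledged;
    }

    public static void main(String[] args) {
        Deque<String> spool = new ArrayDeque<>(List.of("order-1", "malformed", "order-2"));
        List<String> acknowledged = deliverOnce(spool, message -> message.startsWith("order"));
        System.out.println("acknowledged=" + acknowledged + ", still spooled=" + spool);
    }
}
```

One consequence of this design is that a message which can never be processed stays on the queue indefinitely, so real deployments usually pair it with a redelivery limit or a dead message queue.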
private boolean processOrderUpdate(final String eventTopic, final String eventJson) {
    try {
        // The kind of event, and therefore the new order state, is derived from the topic.
        if (eventTopic.contains("order")) {
            final Order order = objectMapper.readValue(eventJson, Order.class);
            updateCachedOrderState(order.getId(), Order.OrderState.VALIDATED);
        } else if (eventTopic.contains("payment")) {
            final Payment payment = objectMapper.readValue(eventJson, Payment.class);
            updateCachedOrderState(payment.getOrderId(), Order.OrderState.PAYMENT_PROCESSED);
        } else if (eventTopic.contains("shipment")) {
            final Shipping shipment = objectMapper.readValue(eventJson, Shipping.class);
            updateCachedOrderState(shipment.getOrderId(), Order.OrderState.SHIPPED);
        }
        return true;
    } catch (JsonProcessingException jsonProcessingException) {
        log.error("Error encountered while processing event:{}, exception:", eventJson, jsonProcessingException);
        return false;
    }
}

// Updates the state of a cached order. Events for orders that are not in the cache
// (for example after a restart of the Order-Service) are logged and skipped instead of
// causing a NullPointerException.
private void updateCachedOrderState(final String orderId, final Order.OrderState newState) {
    final Order orderObjectFromCache = OrderCache.getInstance().getOrderMap().get(orderId);
    if (orderObjectFromCache == null) {
        log.warn("Ignoring status update for unknown order id:{}", orderId);
        return;
    }
    orderObjectFromCache.setState(newState);
    OrderCache.getInstance().getOrderMap().put(orderId, orderObjectFromCache);
}
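The processOrderUpdate method picks the new state with substring checks on the topic. A possible alternative, sketched below, is to look at one specific topic level instead, so that an identifier elsewhere in the topic cannot accidentally select the wrong branch. This is an assumption-laden sketch: it presumes the entity name is always the third level, as in the acmeretail/onlineservices/order/..., acmeretail/onlineservices/payment/... and acmeretail/shipping/shipment/... topics of this Codelab, and the `OrderStateResolver` class and its enum are hypothetical stand-ins for the application's own types.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper that derives the order state from one topic level instead of substring checks. */
final class OrderStateResolver {

    /** Stand-in for Order.OrderState, limited to the states used in this Codelab. */
    enum OrderState { VALIDATED, PAYMENT_PROCESSED, SHIPPED }

    // Assumption: the third level names the entity, e.g. "order" in acmeretail/onlineservices/order/...
    private static final int ENTITY_LEVEL = 2;

    private static final Map<String, OrderState> STATE_BY_ENTITY = Map.of(
            "order", OrderState.VALIDATED,
            "payment", OrderState.PAYMENT_PROCESSED,
            "shipment", OrderState.SHIPPED);

    private OrderStateResolver() {
    }

    /** Returns the state implied by the topic, or empty when the topic is too short or unknown. */
    static Optional<OrderState> resolve(String topic) {
        String[] levels = topic.split("/");
        if (levels.length <= ENTITY_LEVEL) {
            return Optional.empty();
        }
        return Optional.ofNullable(STATE_BY_ENTITY.get(levels[ENTITY_LEVEL]));
    }

    public static void main(String[] args) {
        // Hypothetical topics following the taxonomy used in this Codelab.
        System.out.println(resolve("acmeretail/onlineservices/payment/created/v1/EU/1001"));
        System.out.println(resolve("acmeretail/shipping/shipment/created/v1/EU/1001"));
        System.out.println(resolve("acmeretail/unknown"));
    }
}
```

Returning an `Optional` makes the "unknown topic" case explicit, so the caller can decide whether to log, skip or reject the event.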
Let us continue with the next applications in the OTC flow :
Payment-Service-Provider (PSP) or Payment Gateway integrations are inherently complex due to error handling and transaction management, asynchronous communication, scalability, etc. Due to legacy architectural choices, Acme Retail has faced considerable challenges in maintaining a stable payment integration, leading to revenue loss and customer dissatisfaction in many cases. Hence, PSP integration is a major component of this POC for validating that Solace and EDA are a viable solution to this problem.
The Payment-Service subscribes to the Order-Confirmed event being published by the Inventory-FraudCheck-Service and processes the payment integration for that order.
To denote that the payment processing has begun, it initially publishes a Payment Created event and, once the payment authorization and confirmation come in, publishes an additional Payment Updated event.
The Order Service in turn subscribes to the Payment Created event for user status updates.
Queue name | Subscription |
all-orders-confirmed |
|
acmeretail/onlineservices/payment/created/v1/*/*
mvn clean install
java -jar target/payment-service-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
and observe the console to see what events are getting processed and subsequently produced.
To do a complete end-to-end test of the current flow, you can quickly publish a few more orders from the Order-Service and see the following events being produced and consumed across the three applications :
You can also see that the status of the order in the Order Service changes from VALIDATED to PAYMENT_PROCESSED.
Similar to the PSP and payment gateway, integration with third-party logistics (3PL) forms an integral part of the OTC flow and is responsible for warehousing, inventory, shipping, tracking, reverse logistics etc. Considering the criticality of the use case, Acme Retail is opting to include 3PL integration as a part of the POC for the following reasons :
The Shipping Service is the final leg of this POC flow. It subscribes to the Payment Created event, integrates with the 3PL services and publishes the Shipment Created and Shipment Updated events.
The Order Service subscribes to the Shipment Created event for user status updates.
Queue name | Subscription |
all-payments-confirmed |
|
acmeretail/shipping/shipment/created/v1/*/*
mvn clean install
java -jar target/shipping-service-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
and observe the console to see what events are getting processed and subsequently produced.
To do a complete end-to-end test of the current flow, you can quickly publish a few more orders from the Order-Service and see the following events being produced and consumed across the four applications :
You can also see that the status of the order in the Order Service changes from PAYMENT_PROCESSED to SHIPPED.
As a result of this POC, Acme Retail has achieved a number of benefits including :
For the banking domain, we will be working with Acme Bank, one of the major banks in the world, which has been a leader in online banking and customer journey. Its large customer base generates a very high volume of transactions.
As the importance of preventing fraudulent transactions and money laundering grows, Acme Bank is compelled to adhere to stringent legal and security standards. This necessitates thorough transaction monitoring across its systems to ensure compliance with regulatory mandates.
On the roadmap to comply with these requirements, Acme Bank is currently facing the following challenges :
As a solution, Acme Bank has defined a POC to event enable account handling, transaction management and fraud detection systems. The application and event dependencies are defined as below :
The Account Management application acts as the entry point for the whole flow and emulates the user experience of applying for a bank account and the background processing associated with that account. As described earlier in the overall flow, it starts by triggering an Account Applied event followed by a corresponding Account Opened event. It also subscribes to other relevant account-related events, namely the Fraud Detected event, and triggers Fraud Confirmed and corresponding Account Suspended events.
mvn clean spring-boot:run
The Core Banking application is responsible for the execution of all the transactions being performed on the bank accounts like transfers, deposits and withdrawals. The Core Banking Application randomly generates the above transactions on all the accounts which are active.
mvn clean install
java -jar target/core-banking-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
Did you get an error like the one below, complaining about the absence of a queue object?
2024-03-22T12:42:19.725+01:00 INFO 69425 --- [1_ReactorThread] c.s.jcsmp.impl.flow.BindRequestTask : Client-1: Got BIND ('#P2P/QUE/accounts-opened') Error Response (503) - Unknown Queue
com.solace.messaging.PubSubPlusClientException$MissingResourceException: 503: Unknown Queue
at com.solace.messaging.receiver.PersistentMessageReceiverImpl.createSolaceConsumer(PersistentMessageReceiverImpl.java:1359)
at com.solace.messaging.receiver.PersistentMessageReceiverImpl.onStart(PersistentMessageReceiverImpl.java:1169)
at com.solace.messaging.receiver.PersistentMessageReceiverImpl.startAsync(PersistentMessageReceiverImpl.java:436)
If you recall the Solace queues which we discussed earlier, we differentiated them into two types :
Given the criticality for Acme Bank to maintain message integrity during application downtime, alongside the imperative that subscribing applications cannot delete queue objects, an architectural decision was made to implement Durable-Exclusive queue configurations.
To fix the error encountered above, you need to manually create the Queue object as below :
Enter the queue name accounts-opened, click Create and, keeping the default queue settings, click Apply.
Then add the following topic subscription to the queue and click Create :
acmebank/solace/account/opened/v1/*
Now that the missing queue has been created with the proper subscription, restart the Core Banking application using the same command :
java -jar target/core-banking-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
If you closely observe the logging on the console, you will see no events coming in from the queue. But how could that be when you clearly created multiple accounts from the Account Management application?
If you think about it, you will realize that when you created those accounts, the queue accounts-opened had not been created yet. So even though the Account Opened events were marked as persistent and acknowledged by the broker, they were lost because no queue with a matching subscription existed to store them.
Now that you have created the queue, newly published Account Opened events will stay in the queue until a consumer processes and acknowledges them.
So trigger a few more Account creation requests from the Account Management Application and see the Account Opened events being triggered. Once this happens, the Core-Banking application should receive those events and start publishing transaction events for the active accounts.
As the need to prevent fraudulent transactions and money laundering intensifies, Acme Bank is obligated to adhere to stringent legal and security standards. This entails conducting comprehensive transaction checks across its system to ensure compliance with regulatory mandates.
This realtime monitoring of transactions is being handled by the Fraud Detection service. It subscribes to all the transaction events being published by the Core Banking application and flags suspicious/potentially fraudulent transactions by publishing a Fraud Detected event.
Queue name | Subscription |
all-transactions |
|
mvn clean install
java -jar target/fraud-detection-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
Event-Driven Architecture enables modular, iterative development, and fosters innovation by providing a scalable, resilient, and flexible framework for building modern software systems. By embracing EDA principles and practices, organizations can drive agility, responsiveness, and innovation in their software development processes.
Till now, we have :
The next steps are performing a validation of those suspicious transactions and taking action on the linked accounts. Let's start with implementing this feature flow across multiple applications in a modular fashion :
The Account Management application subscribes to the Fraud Detected event and validates if the transaction is actually fraudulent. If found fraudulent, a corresponding Fraud Confirmed event is triggered and the linked account is suspended by publishing an Account Suspended event.
Queue name | Subscription |
fraud-detected-events |
|
final PersistentMessageReceiver fraudDetectedEventReceiver = messagingService.createPersistentMessageReceiverBuilder().build(Queue.durableExclusiveQueue(configProperties.getSolaceFraudDetectedEventQueue()));
fraudDetectedEventReceiver.setReceiveFailureListener(failedReceiveEvent -> log.error("### FAILED RECEIVE EVENT " + failedReceiveEvent));
fraudDetectedEventReceiver.start();
fraudDetectedEventReceiver.receiveAsync(buildFraudDetectedEventHandler(fraudDetectedEventReceiver));
private MessageReceiver.MessageHandler buildFraudDetectedEventHandler(PersistentMessageReceiver fraudDetectedEventReceiver) {
return (inboundMessage -> {
try {
final String inboundTopic = inboundMessage.getDestinationName();
log.info("Processing message on incoming topic :{} with payload:{}", inboundTopic, inboundMessage.getPayloadAsString());
boolean eventProcessed = fraudService.processFraudDetectedEvent(inboundMessage.getPayloadAsString());
if (eventProcessed) {
fraudDetectedEventReceiver.ack(inboundMessage);
}
} catch (RuntimeException runtimeException) {
log.error("Runtime exception encountered while processing incoming event payload :{} on topic:{}. Error is :", inboundMessage.getPayloadAsString(), inboundMessage.getDestinationName(), runtimeException);
}
});
}
public boolean processFraudDetectedEvent(final String incomingFraudDetectedEventJson) {
try {
final FraudDetected fraudDetectedEvent = objectMapper.readValue(incomingFraudDetectedEventJson, FraudDetected.class);
boolean isFraud = random.nextBoolean();
if (isFraud) {
log.info("Flagging potential fraud as confirmed :{}", fraudDetectedEvent);
createAndPublishFraudConfirmedEvent(fraudDetectedEvent);
accountService.processAccountSuspensionRequest(fraudDetectedEvent.getAccountNum());
}
return true;
} catch (JsonProcessingException jsonProcessingException) {
log.error("Error encountered while processing FraudDetected event:{}, exception:", incomingFraudDetectedEventJson, jsonProcessingException);
return false;
}
}
public void processAccountSuspensionRequest(final String accountNumber) {
log.info("Processing account suspension");
final AccountAction accountSuspendedAction = createAccountSuspendedEventPayload(accountNumber);
solaceEventPublisher.publishAccountSuspendedEvent(accountSuspendedAction);
Account account = AccountsList.getInstance().getAccountsList().get(accountNumber);
account.setCurrentStatus(Account.Status.SUSPENDED);
account.setComment("Account suspended due to potential suspicious/fraudulent transaction(s)");
AccountsList.getInstance().getAccountsList().put(accountNumber, account);
}
The Account Suspended event published in the previous step is subscribed to by the Core Banking application which stops all transactions on that account number immediately.
Queue name | Subscription |
accounts-suspended |
|
final PersistentMessageReceiver accountSuspendedEventReceiver = messagingService.createPersistentMessageReceiverBuilder().build(Queue.durableExclusiveQueue(configProperties.getAccountsSuspendedQueueName()));
accountSuspendedEventReceiver.setReceiveFailureListener(failedReceiveEvent -> System.out.println("### FAILED RECEIVE EVENT " + failedReceiveEvent));
accountSuspendedEventReceiver.start();
accountSuspendedEventReceiver.receiveAsync(buildAccountsSuspendedEventHandler(accountSuspendedEventReceiver));
private MessageReceiver.MessageHandler buildAccountsSuspendedEventHandler(PersistentMessageReceiver accountSuspendedEventReceiver) {
    return (inboundMessage -> {
        try {
            final String inboundTopic = inboundMessage.getDestinationName();
            log.info("Processing message on incoming topic :{} with payload:{}", inboundTopic, inboundMessage.getPayloadAsString());
            boolean eventProcessed = accountsEventProcessor.processAccountSuspendedEvent(inboundMessage.getPayloadAsString());
            // Acknowledge only after successful processing so that failed events stay on the queue.
            if (eventProcessed) {
                accountSuspendedEventReceiver.ack(inboundMessage);
            }
        } catch (RuntimeException runtimeException) {
            log.error("Runtime exception encountered while processing incoming event payload :{} on topic:{}. Error is :",
                    inboundMessage.getPayloadAsString(), inboundMessage.getDestinationName(), runtimeException);
        }
    });
}
public boolean processAccountSuspendedEvent(final String accountSuspendedActionEventPayload) {
try {
AccountAction accountSuspendedEvent = objectMapper.readValue(accountSuspendedActionEventPayload, AccountAction.class);
Account suspendedAccount = Account.builder().accountNumber(accountSuspendedEvent.getAccountNum()).currentStatus(Account.Status.SUSPENDED).build();
AccountsList.getInstance().getAccountsList().put(suspendedAccount.getAccountNumber(), suspendedAccount);
log.info("After processing the updated map is :{}", AccountsList.getInstance().getAccountsList());
return true;
} catch (JsonProcessingException jsonProcessingException) {
log.error("Error encountered while processing AccountSuspended event:{}, exception:", accountSuspendedActionEventPayload, jsonProcessingException);
return false;
}
}
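The effect of the Account Suspended event on the Core Banking application can be sketched with a small status registry: once an account is marked as suspended, it no longer appears among the accounts eligible for generated transactions. This is a minimal sketch under stated assumptions, not the application's actual code: `AccountStatusRegistry`, its methods and the `OPENED` status are hypothetical, and only the status is tracked. Tracking the status separately also avoids replacing a stored account object with a new one that carries only the account number and status.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that tracks which accounts may still receive generated transactions. */
final class AccountStatusRegistry {

    /** Stand-in for Account.Status; OPENED is an assumed name for the active state. */
    enum Status { OPENED, SUSPENDED }

    // Thread-safe because events arrive on receiver threads while transactions are generated elsewhere.
    private final Map<String, Status> statusByAccount = new ConcurrentHashMap<>();

    /** Called when an Account Opened event is processed. */
    void open(String accountNumber) {
        statusByAccount.put(accountNumber, Status.OPENED);
    }

    /** Called when an Account Suspended event is processed; unknown accounts are recorded as suspended. */
    void suspend(String accountNumber) {
        statusByAccount.put(accountNumber, Status.SUSPENDED);
    }

    /** Returns the sorted account numbers for which transactions may still be generated. */
    List<String> activeAccounts() {
        List<String> active = new ArrayList<>();
        statusByAccount.forEach((accountNumber, status) -> {
            if (status == Status.OPENED) {
                active.add(accountNumber);
            }
        });
        Collections.sort(active);
        return active;
    }

    public static void main(String[] args) {
        AccountStatusRegistry registry = new AccountStatusRegistry();
        registry.open("ACC-1"); // hypothetical account numbers
        registry.open("ACC-2");
        registry.suspend("ACC-2");
        System.out.println("Accounts eligible for transactions: " + registry.activeAccounts());
    }
}
```

A transaction generator built on this registry would call `activeAccounts()` before each round, so a suspension takes effect from the next generated transaction onward.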
mvn clean spring-boot:run
mvn clean install
java -jar target/core-banking-0.0.1-SNAPSHOT.jar -h HOST_URL -v VPN-NAME -u USER_NAME -p PASSWORD
As a result of this POC, Acme Bank has achieved a number of benefits including :
Below are some of the key takeaways from this masterclass :
Thanks for participating in this masterclass! Let us know what you thought in the Solace Community Forum! If you found any issues along the way, we'd appreciate it if you'd raise them by clicking the Report a mistake button at the bottom left of this Codelab.