Creating Event API Products using Spring Cloud Stream, Event Portal and AsyncAPI

1. Overview

If your requirement is to build modern real-time applications using event-driven architecture (EDA), this codelab is for you!

This codelab will walk you through the steps to

✅ build Event API products,
✅ define asynchronous APIs,
✅ implement them using Spring Cloud Stream microservices,
✅ and globally distribute them across multi-cloud and on-premises environments using the Solace PubSub+ platform.

In this codelab, we will create API Products to consume business capabilities of a SmartTown. The SmartTown implements a microservices architecture for the Heating and Cooling system. More details on this will be covered throughout the codelab.

We will also

  • Learn how to define asynchronous APIs.
  • Use the AsyncAPI Generator template for Spring Cloud Stream.
  • Develop event-driven microservices using Spring Cloud Stream and Java.
  • Connect your microservices to PubSub+ Event Brokers and stream events across the globe

Prerequisites

  • Intermediate level of knowledge coding with Java
  • Computer with internet connectivity

2. Requirements

🛠 This page covers the setup needed to perform this codelab. 🛠

Create a new PubSub+ Event Broker

✅ You need to sign up for a free Solace Cloud account using this link.

Complete the following steps:

  1. Log in to the PubSub+ Cloud Console: To start using PubSub+ Cloud, log in to the PubSub+ Cloud Console. For more information on accessing the Cloud Console, check out the documentation.
  2. Create an Event Broker Service: Create an event broker service (or simply a service) using the Cluster Manager tool and name the service solace-eap. For more information, check out the documentation.

Use an existing PubSub+ Event Broker

If you have a PubSub+ Cloud account and an existing Event Broker Service, you can use the existing Message VPN on the Broker Service for this codelab.

  1. Message VPN Name: Make a note of the Message VPN name. You will use this name wherever 'solace-eap' is referred to as the VPN name in this codelab.

AsyncAPI Generator Requirements

✅ Install instructions available here

  • Node.js v12.16+ (Check version using node -v)
  • npm v6.13.7+ (Check version using npm -v)

Spring Cloud Stream Requirements

✅ Spring Cloud Stream just requires Java and Maven to use 🚀

  • Java 1.8+ (Check version using java -version)
  • Maven 3.3+ (Check version using mvn -version)
    • On mac you can brew install maven
    • Other install instructions here
  • Your favorite Java IDE 💥

3. Clone Git Repository

Ready-to-use code for a few of the microservices referred to in this workshop is available on GitHub. We will use this codebase and write new code wherever necessary to complete this workshop.

  • Launch Terminal application
  • Execute the following command
    git clone https://github.com/gvensan/smarttown.git

Below is the directory structure of the newly cloned smarttown folder

4. Introduction to PubSub+ Event Portal

Solace PubSub+ Event Portal

The Solace PubSub+ Event Portal is a single place where architects and developers can collaboratively catalog, share, create and manage all the events, schemas and applications internal and external to the enterprise.

One of the features of the Event Portal is the Catalog, which allows users to assess which events have higher value to consumers (both internal and external) than others. This information is used to build Event API Products.

With the PubSub+ Event Portal, you can:

  • Define and model event-driven systems
  • Visualize existing relationships
  • Develop consistent event-driven applications
  • Discover and share events of interest
  • Govern the event-driven system
  • Integrate with 3rd-party systems for programmatic interactions
  • Manage and audit changes to events, schemas, and applications
  • Discover events at runtime
  • Understand statistics about events

Elements of PubSub+ Event Portal

Application Domain

An application domain represents a namespace where applications, events, and schemas can live.

  • Application domains are useful to decompose the enterprise and create organizational boundaries within which teams can work
  • Within the application domain, you can create a suite of applications, events and schemas that are isolated from other application domains

Application

An application represents a piece of software that produces and consumes events.

  • Applications connect to the event broker in an event-driven architecture and communicate with other applications via events.
  • A single application represents a class of applications that are running the same code base; therefore, there is no need to create multiple applications for each instance in a cluster.

Event

The event represents a business moment or an action that can be communicated with zero or more interested applications.

  • The event is where you define metadata that describes and categorizes the event. An event is produced on a specific topic that must be defined for that event
  • From a modelling perspective, events reference payload schemas, and events are referenced by applications, thus forming the glue that binds applications together

Schema

In simple terms, a schema represents the contract to describe the payload of an event.

  • Producers and consumers of an event can trust that the event's payload matches the schema definition assigned to that event
  • Schemas define a type of payload through JSON, AVRO, XML, Binary, or Text. JSON, AVRO, and XML schemas have content that describes each property of the schema
  • The content is either in JSON or AVRO Schema format, or XSD/DTD format

5. Introduction to Spring Cloud Stream

Cloud Stream Intro

Spring Cloud Stream is a framework for creating highly scalable, event-driven microservices connected by pluggable messaging services. Messaging services are pluggable via Binders that we'll cover in a bit. The framework is based on Spring Boot and Spring Integration.

Spring Cloud Stream has three different types of message exchange contracts:

  1. Suppliers are sources of events
  2. Sinks are consumers of events
  3. Processors are both consumers and producers of events

Spring Cloud Stream Components

Spring Cloud Stream introduces three main components that allow developers to utilize messaging in their code:

  • Binder - The component that implements communication with a specific message broker. For example, there is a RabbitMQ Binder, a Kafka Binder, and so on. Binders abstract the messaging API.
  • Binding - The interface for sending and receiving messages. This component links the abstract channels in your code with a topic or queue that's handled by the binder.
  • Message - The data structure used to communicate with the bindings between your code and your message broker. How this data is packaged and communicated over the message broker is determined by the binder.

Message Exchange Contracts Map to Java Functions

As of Spring Cloud Stream v3 the preferred programming model is to use Java Functions to implement your cloud stream apps.

We just mentioned that the three types of Cloud Stream message exchange contracts are "Sources", "Sinks" and "Processors". Those map to functions as follows:

  • java.util.function.Supplier -> Source [Producer/Publisher of Events]
  • java.util.function.Consumer -> Sink [Subscriber/Consumer of Events]
  • java.util.function.Function -> Processor [ Consumes, Processes, and Produces Events ]
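The mapping above can be illustrated with plain `java.util.function` types. The following is a minimal sketch that uses no Spring classes at all; the class name `ContractsSketch`, the method names, and the hard-coded temperature are assumptions made for illustration. In a real Spring Cloud Stream application each function would be exposed as a bean and the binder would move events between the broker and the function, whereas here the three are chained by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class ContractsSketch {

    // Source: produces an event each time it is polled (value is hypothetical).
    public static Supplier<Double> temperatureSource() {
        return () -> 72.5;
    }

    // Processor: consumes one event and produces another.
    public static Function<Double, String> toAlert() {
        return temp -> temp > 70 ? "ALERT:" + temp : "OK:" + temp;
    }

    // Sink: consumes an event and produces nothing; here it records what it saw.
    public static Consumer<String> sink(List<String> received) {
        return received::add;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        // Hand-wired pipeline: Source -> Processor -> Sink.
        sink(received).accept(toAlert().apply(temperatureSource().get()));
        System.out.println(received); // prints [ALERT:72.5]
    }
}
```

The point of the sketch is only the shape of each contract: a Supplier takes nothing and returns an event, a Consumer takes an event and returns nothing, and a Function does both.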

Binders

Spring Cloud Stream Binders are really what make the framework useful. Binders provide an abstraction layer between your code and the messaging systems over which events are flowing. This allows you to write your code without having to worry about learning messaging APIs! When messages are sent or received from the messaging system they pass through the messaging system specific binder which knows how to communicate with that specific message broker.

As of the creation of this codelab the available Cloud Stream Binders are:

  • RabbitMQ
  • Apache Kafka
  • Amazon Kinesis
  • Google PubSub
  • Solace PubSub+
  • Azure Event Hubs
  • Apache RocketMQ

Communication Models

Instead of having to learn Messaging APIs, developers just have to understand the communication models that Spring Cloud Stream supports. There are 3 supported models, but support varies per binder.

  1. Publish-subscribe: subscribers are independent from each other & receive events in order
  2. Consumer groups: fan-out and load-balancing across multiple consumers
  3. Stateful partitioning support: in-order processing for consistency and performance
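To make the difference between the first two models concrete, here is a small in-memory sketch in plain Java. It does not use Spring Cloud Stream or any broker; the class `DeliveryModelsSketch` and the round-robin distribution are assumptions for illustration only, and a real binder or broker may distribute events within a consumer group differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeliveryModelsSketch {

    // Publish-subscribe: every subscriber receives its own copy of every event, in order.
    public static List<List<String>> fanOut(List<String> events, int subscribers) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            inboxes.add(new ArrayList<>(events));
        }
        return inboxes;
    }

    // Consumer group: each event is delivered to exactly one member of the group.
    // Round-robin is used here purely as an illustrative distribution strategy.
    public static List<List<String>> loadBalance(List<String> events, int members) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < members; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (int i = 0; i < events.size(); i++) {
            inboxes.get(i % members).add(events.get(i));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("e1", "e2", "e3", "e4");
        System.out.println(fanOut(events, 2));      // prints [[e1, e2, e3, e4], [e1, e2, e3, e4]]
        System.out.println(loadBalance(events, 2)); // prints [[e1, e3], [e2, e4]]
    }
}
```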

6. Spring Cloud Stream - Solace API Connection

Launch the Broker Service from the PubSub+ Console and open the connection tab.

Click on the Get Started button next to the Spring Cloud Stream library.

Make a note of the following parameters:

  • Host URI
  • Username
  • Password

7. SmartTown - Event-Driven Design

SmartTown uses information and communication technology to improve operational efficiency, share information with the public and provide a better quality of government service and citizen welfare. SmartTown uses an internal microservice architecture, with functionality exposed through APIs and externally exposed API products that hide the implementation details.

Application Domains

We will focus on the Analytics and Operations applications for this workshop.

Applications

Events

  • TemperatureReading - An event generated by a Temperature Sensor (IoT Device) with temperature reading and a timestamp
  • OperationalAlert - An event generated when the temperature is found to be out of bounds with an alert type and prioritization level.

8. Import Application Domain

We will create an event-driven design for SmartTown functions in the Event Portal using the PubSub+ Designer. To begin with, we will import an application domain export file that contains the required event, schema and application domain definitions. We then just need to create the applications and tie the events to them.

✅ Launch Solace PubSub+ Event Portal and open the Designer

  • Click on the Import button next to Application Domain File, or right-click on the canvas and choose the Import Application Domain option.

✅ In the file open dialog, choose the file asyncapi-designer-topology.json in the cloudstream folder of the cloned repository.

This import action will create

  • Application domains
    • SmartTown - Operations
    • SmartTown - Analytics
  • Events
    • TemperatureReading with a topic address of SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}
    • OperationalAlert with a topic address of SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}
  • Schemas
    • TemperatureReading
    • OperationalAlert
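Because the variable levels in these topic addresses sit at fixed positions, a receiving application can recover their values from a concrete topic by position. The following is a minimal sketch of that idea in plain Java; the class `TopicAddressSketch` and its `extract` method are hypothetical helpers written for this illustration and are not part of Event Portal, the Solace API, or the cloned repository.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TopicAddressSketch {

    // Extracts the values of the {variable} levels from a concrete topic.
    // Throws if the topic does not line up with the template.
    public static Map<String, String> extract(String template, String topic) {
        String[] templateLevels = template.split("/");
        String[] topicLevels = topic.split("/");
        if (templateLevels.length != topicLevels.length) {
            throw new IllegalArgumentException("Level count differs: " + topic);
        }
        Map<String, String> values = new LinkedHashMap<>();
        for (int i = 0; i < templateLevels.length; i++) {
            String level = templateLevels[i];
            if (level.startsWith("{") && level.endsWith("}")) {
                // Variable level: record the value under the variable name.
                values.put(level.substring(1, level.length() - 1), topicLevels[i]);
            } else if (!level.equals(topicLevels[i])) {
                throw new IllegalArgumentException("Static level mismatch at position " + i);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        String template = "SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}";
        String topic = "SmartTown/Operations/OperationalAlert/created/v1/High/HighTemperature";
        System.out.println(extract(template, topic)); // prints {AlertPriority=High, AlertType=HighTemperature}
    }
}
```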

✅ Review the elements under each of the Application Domains to understand what the import created and how the elements are associated.

9. Create Application

Let's create the Temperature Sensors and the External Heating/Cooling Controllers applications. To do so:

✅ Navigate to the SmartTown - Operations Application Domain

✅ Click on the create application option

1️⃣ Temperature Sensors Application

The following section describes how to create an application and set its attributes - name, application type and event association.

In the Create Application page:

  • Enter Temperature Sensors for Name
  • Choose Standard as selected option for Application Type

✅ Click on the Manage events page

In the Manage Events page:

  • Identify the Events relevant to the application
  • Click on the Pub icon on the TemperatureReading Event indicating the event relationship with the application

2️⃣ External Heating/Cooling Controllers Application

In the Create Application page:

  • Enter Heating Cooling Controllers - EXTERNAL for Name
  • Choose Standard as selected option for Application Type

✅ Launch the Manage events page and set Publish/Subscribe permissions on Events

In the Manage Events page:

  • Identify the Events relevant to the application
  • Click on the Sub icon for the TemperatureReading Event
  • Click on the Pub icon for the OperationalAlert Event
  • Click on Save button

Applications List

When you click on the Application List tab you will see the following

10. Introduction to AsyncAPI

AsyncAPI has emerged as the industry standard for defining asynchronous, event-driven APIs; you can think of it as OpenAPI for the asynchronous world.

It is an open source initiative that provides both

  • a specification to describe and document your asynchronous applications in a machine-readable format, and
  • tooling (such as code generators) to make life easier for developers tasked with implementing them.

AsyncAPI Document

An AsyncAPI document defines the application that you want to develop. The document can be written as JSON or as YAML. You can either create the document manually or generate it using an event portal.

The AsyncAPI initiative provides a handy, interactive tool called the AsyncAPI playground to make the document creation easier.

AsyncAPI Playground

Alternatively you can generate this document from EDA tools such as Event Portal of Solace PubSub+ platform. Typically they are design-time tools allowing architects and developers to collaborate using a GUI to design the event-driven architecture.

Having a catalog of well-organized channels and events for reuse will also save you both time and headaches while collaborating, instead of having to comb through a bunch of files in various locations.

AsyncAPI Generator

AsyncAPI Generator is a tool that generates skeleton code from the AsyncAPI document, saving the time and effort of writing boilerplate code by hand.

The AsyncAPI Code Generator supports templates to generate code for a variety of different languages and protocols (nodejs, python, go), but for this workshop we're going to use the Spring Cloud Stream template.

The Spring Cloud Stream framework provides an easy way to get started with event-driven microservices by providing binders that allow the developer to create their microservices without having to learn messaging APIs.

Example:

ag ~/AsyncApiDocument.yaml https://github.com/asyncapi/java-spring-cloud-stream-template.git

Coding Business Logic

The generated spring project is a regular Spring Boot application and contains:

  • Generated classes under javaPackage, including POJOs defined from the schemas in the AsyncAPI document
  • application.yml containing Spring Cloud Stream configuration and binding details
  • Application.java containing implementations of one or more Spring Cloud Stream function interfaces as per the AsyncAPI specification
    • A Supplier function that implements delivery of messages to the channel
    • A Consumer function that handles messages delivered on the channels
    • A Function implementation that processes a delivered message and returns a response message

AsyncAPI in Event Portal

PubSub+ Event Portal makes adopting and using AsyncAPI simple:

  • Faster: you don't have to edit asyncapi specs by hand
  • Ensure Consistency: changes to events and schemas upstream can easily be exported
  • Flexible: Export Event API Products for interested developer and/or model your application interface and export it

11. Event API Product

An Event API Product can be composed of events from either Solace PubSub+ Event Broker or Kafka for external developers' consumption, enabling them to build event-driven applications that can subscribe to and/or publish the specified events.

The events bundled in an Event API Product can be added from one or more application domains, so you can mix and match events. Event API is all about identifying and bundling high-value events for external consumption similar to REST APIs.

When it comes to value and exposure, event-driven applications can combine the classic system and business events into a bundle published as a specification, without exposing any internal or implementation details. Solace PubSub+ uses AsyncAPI as the standard API specification for exposing such Event API bundles. Consuming applications also benefit from AsyncAPI's code-generation facility, which lets developers focus on the business logic rather than the mundane tasks.

References:

  • Getting started with PubSub+ Cloud Documentation
  • Event API Products Documentation
  • Understanding the Concept of an Event Portal - An API Portal for Events Blog

12. Create Event API Product

The next step in building an Event API Product is bundling high-value, share-worthy events. The resulting bundle is released as a specification for public consumption. The generated specification is based on a commonly understood standard such as AsyncAPI.

AsyncAPI also provides tools to generate code (popular languages are supported), helping consumers go directly from spec to code and leaving them responsible only for the business logic.

✅ Launch API Products

Create EventAPI Product

✅ Configure the API Product settings

In the Create Event API Product page:

  • Enter HVAC Controller for Name
  • Choose LogicalEventMesh for Logical Event Mesh
  • Enter localhost:55555 for Server URL
  • Enter smf for Server Protocol

✅ Navigate to the Events tab to configure Event settings

  • Click on the Add Events button

  • Select TemperatureReading and OperationalAlert events
  • Click on the Apply button

  • Mark TemperatureReading with Subscribe property
  • Mark OperationalAlert with Publish property
  • Click on the Save button

✅ Event API Product list

✅ Click on the HVAC Controller Event API product Details

A newly created Event API Product can be made externally available or shared through the following methods:

  • REST API: External developers can call a public REST API to retrieve this Event API Product's AsyncAPI document.
  • Website: External developers can use a website to learn about your available events. Anyone with this URL can access these events.

These URLs are available only when the API Product is marked as Released. To release the API Product, click on the Release button at the top right.

More info about the API release and website hosting:

With this, the Event API Product named HVAC Controller is ready and published for access on public URLs.

Exposed AsyncAPI URL

Interested parties can download the AsyncAPI specification from this public URL and build applications with minimal code using AsyncAPI codegen tools.

13. Download AsyncAPI Document

Review API Product URL

You can locate the website URL where the Event API Product is hosted from the Event API Product settings.

✅ Open the website URL in the browser (click on the link)

You can click on the Download YAML button to download the document from this portal. Let's move it to the cloudstream directory in the previously cloned GitHub repo.

Navigate to the smarttown/cloudstream directory and run the following command in a terminal window

mv ~/Downloads/asyncapi.yaml .

14. Create IoT Data Simulator MicroService

The directory ~/github/smarttown/cloudstream/ac-city-iot-simulator contains a prebuilt Spring Cloud Stream project that can readily publish temperature reading data. You just have to update the application.yml configuration with your host details.

Import the projects under the ~/github/smarttown/cloudstream/ directory.

Launch Spring Tool Suite

  • Open the Spring Tool Suite by clicking on the icon on the desktop
  • Import the cloud stream projects from the smarttown folder as 'Existing Maven Projects'

Review the ac-city-iot-simulator project

TemperatureReading.java

✅ Review the TemperatureReading.java file. It carries temperature reading data - a simple POJO with attributes and corresponding getters/setters.
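For orientation, a POJO of this kind might look roughly like the sketch below. This is a reconstruction, not the file from the repository: the field names are inferred from the log output shown later in this section, the constructor argument order from the Supplier code, and the getter names from the calls used in the alert generator. Setters are shown for only one field to keep the sketch short, and the actual generated class may differ.

```java
import java.math.BigDecimal;

class TemperatureReading {

    private BigDecimal mCpuLoad;
    private String mCity;
    private BigDecimal mLatitude;
    private BigDecimal mLongitude;
    private BigDecimal mCpuTemp;

    // No-argument constructor, typically needed when the payload is deserialized from JSON.
    public TemperatureReading() {
    }

    public TemperatureReading(BigDecimal cpuLoad, String city, BigDecimal latitude,
                              BigDecimal longitude, BigDecimal cpuTemp) {
        this.mCpuLoad = cpuLoad;
        this.mCity = city;
        this.mLatitude = latitude;
        this.mLongitude = longitude;
        this.mCpuTemp = cpuTemp;
    }

    public BigDecimal getMCpuLoad() { return mCpuLoad; }
    public String getMCity() { return mCity; }
    public BigDecimal getMLatitude() { return mLatitude; }
    public BigDecimal getMLongitude() { return mLongitude; }
    public BigDecimal getMCpuTemp() { return mCpuTemp; }

    // One setter shown as an example; the other fields would follow the same pattern.
    public void setMCpuTemp(BigDecimal cpuTemp) { this.mCpuTemp = cpuTemp; }

    @Override
    public String toString() {
        return "TemperatureReading [mCpuLoad=" + mCpuLoad + ", mCity=" + mCity
            + ", mLatitude=" + mLatitude + ", mLongitude=" + mLongitude
            + ", mCpuTemp=" + mCpuTemp + "]";
    }

    public static void main(String[] args) {
        TemperatureReading reading = new TemperatureReading(
            new BigDecimal("6.5"), "New York City",
            new BigDecimal("40.71305"), new BigDecimal("-74.00723"),
            new BigDecimal("102.0"));
        System.out.println(reading);
    }
}
```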

Application.java

✅ Review the Application.java file, specifically the Supplier function.

  @Bean
  Supplier<Message<TemperatureReading>> publishTemperatureData()  {
    // Collect CPU metrics 
    return () -> {
      SystemInfo si = new SystemInfo();
      HardwareAbstractionLayer hal = si.getHardware();
      CentralProcessor processor = hal.getProcessor();
        
      double[] loadAverage = processor.getSystemLoadAverage(1);
          
      BigDecimal cpuLoad = new BigDecimal((loadAverage[0] < 0 ? 0 : loadAverage[0]));
      BigDecimal cpuTemp = new BigDecimal(50.0 + cpuLoad.doubleValue() * 8.0);
        
      // Construct the topic name with city, latitude and longitude as per the Topic hierarchy design
      // SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}
      String topic = "SmartTown/Operations/temperatureReading/created/v1/" + city + "/" + latitude + "/" + longitude;	        			
      TemperatureReading data = new TemperatureReading(cpuLoad, city, latitude, longitude, cpuTemp);

      logger.info(data.toString());	

      // Set the target destination as the constructed topic name
      return MessageBuilder.withPayload(data)
                .setHeader(BinderHeaders.TARGET_DESTINATION, topic)
                .build();		
    };
  }	

✅ The Supplier function is a Cloud Stream message exchange contract that publishes an event to the configured Event Broker.

✅ Notice the topic name construction:

SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}

The values for the variables city, latitude and longitude are read from the configuration file via the @Value annotation.

    @Value("${application.latitude}")
    public BigDecimal latitude;
    @Value("${application.longitude}")
    public BigDecimal longitude;
    @Value("${application.city}")
    public String city;

The topic name is dynamically constructed by concatenating the root topic name and dynamic values picked up from the application configuration.
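The same construction can also be expressed as filling in a topic template. The sketch below is a plain-Java illustration with no Spring dependency; `TopicTemplateSketch` and `fill` are hypothetical names, and the values are passed in as strings rather than injected with @Value as in the simulator.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TopicTemplateSketch {

    // Replaces each {name} placeholder in the template with its configured value.
    public static String fill(String template, Map<String, String> values) {
        String topic = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            topic = topic.replace("{" + entry.getKey() + "}", entry.getValue());
        }
        return topic;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("city", "New York City");
        config.put("latitude", "40.713050");
        config.put("longitude", "-74.007230");

        String template =
            "SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}";
        // prints SmartTown/Operations/temperatureReading/created/v1/New York City/40.713050/-74.007230
        System.out.println(fill(template, config));
    }
}
```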

✅ The return value from the Supplier function is a Spring Cloud Stream Message whose destination is set to the dynamic topic; the Message is then published to the Broker.

✅ The temperature simulation is closely tied to the CPU load on the machine, which can be manipulated using the stress system utility. The simulation logic generates a temperature value as a linear function of CPU load (50 + 8 * load in the code above).
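The arithmetic in the Supplier can be isolated into a small pure function, which makes it easy to check against the log output. The sketch below mirrors the two BigDecimal lines of the Supplier using plain doubles; the class and method names are assumptions for illustration.

```java
class TemperatureSimulationSketch {

    // Mirrors the simulator: a negative load average (not available) is treated as 0,
    // and the temperature is 50 plus 8 times the load.
    public static double simulate(double loadAverage) {
        double cpuLoad = loadAverage < 0 ? 0 : loadAverage;
        return 50.0 + cpuLoad * 8.0;
    }

    public static void main(String[] args) {
        System.out.println(simulate(6.7255859375)); // prints 103.8046875
        System.out.println(simulate(-1.0));         // prints 50.0
    }
}
```

A load of 6.7255859375 yields 103.8046875, which matches the mCpuLoad and mCpuTemp pair in the sample log output of this section.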

application.yml

✅ Review the application.yml

server.port: 8082
application:
  city: New York City      
  latitude: 40.713050       
  longitude: -74.007230      
spring:
  cloud:
    function:
      definition: publishTemperatureData
    stream:
      bindings:
        publishTemperatureData-out-0:
          destination: SmartTown/Operations/temperatureReading/created/v1/*/*/*
      binders:
        solace-binder:
          type: solace
          environment:
            solace:
              java:
                host: tcps://<your_host>.messaging.solace.cloud:55443  
                msgVpn: <your_msgVPN>
                clientUsername: solace-cloud-client
                clientPassword: <your_password>
logging:
  level:
    root: info
    org:
      springframework: info      

Make the following changes to the application configuration file:

  1. Go to LatLong.net and enter your city name to get latitude and longitude values
  2. In the yml file
    • Update your city name
    • Update latitude
    • Update longitude
  3. Save the file

Running IoT Data Simulation

✅ Open a terminal, change directory to the ac-city-iot-simulator project and run the following Maven command.

cd ~/github/smarttown/cloudstream/ac-city-iot-simulator
mvn clean spring-boot:run

Alternatively, you can also run ./mvnw clean spring-boot:run

This should run the simulator microservice and publish temperature reading events to the Event Broker.

mvn clean 
mvn spring-boot:run
................................
2021-09-30 18:04:59.401  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:00.409  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:01.416  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:02.423  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:03.428  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.26708984375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=100.13671875]
2021-09-30 18:05:04.432  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.26708984375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=100.13671875]
2021-09-30 18:05:05.435  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
................................

15. Create Alert Generator Microservice

Before we proceed, we have to make minor changes to the AsyncAPI yml file. The yml file is located under the ~/github/smarttown/cloudstream/ directory (the last step in Download AsyncAPI Document).

✅ Make the following updates to the asyncapi.yaml file

Let's add a few of the template's configuration options to the downloaded AsyncAPI document.

Add x-scs-function-name: processTemperatureReading under the subscribe operation of one channel and under the publish operation of the other. This tells the generator the name of the function that should handle the events being exchanged. Using the same function name for both the subscribe and the publish operation means you want both handled by the same function!

a) After

channels:
  SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}:
    subscribe:

b) And after

  SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}:
    publish:

With this change, the channels section of the YAML file will look like this

channels:
  SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}:
    subscribe:
      x-scs-function-name: processTemperatureReading
      message:
        $ref: "#/components/messages/OperationalAlert"
    parameters:
      AlertType:
        schema:
          type: "string"
      AlertPriority:
        schema:
          type: "string"
  SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}:
    publish:
      x-scs-function-name: processTemperatureReading
      message:
        $ref: "#/components/messages/TemperatureReading"
    parameters:
      city:
        schema:
          type: "string"
      latitude:
        schema:
          type: "string"
      longitude:
        schema:
          type: "string"

Under the info section at the bottom of the yml file, remove the x-view: "provider" line.

info:
  x-generated-time: "2021-09-30 13:15 UTC"
  description: ""
  title: "HVAC Controller"
  x-view: "provider"
  version: "1"

With this change, info section of the YAML file will look like this

info:
  x-generated-time: "2021-09-30 13:15 UTC"
  description: ""
  title: "HVAC Controller"
  version: "1"

✅ Save the asyncapi.yaml file

We will build this microservice with the AsyncAPI Generator tool, using the AsyncAPI document hosted by the PubSub+ Event Portal.

Generate code using AsyncAPI Code Generator

✅ Change directory to cloudstream by running the following command

cd ~/github/smarttown/cloudstream

Run the following command to invoke AsyncAPI code generator utility.

ag -o ac-city-alert-generator -p view=provider -p binder=solace -p dynamicType=header -p artifactId=ac-city-alert-generator  -p groupId=com.eap -p javaPackage=com.eap.scs.asyncapi.alertgenerator -p host=tcps://your_host_name.messaging.solace.cloud:55443 -p username=username -p password=password -p msgVpn=msgVPN asyncapi.yaml @asyncapi/java-spring-cloud-stream-template --force-write

This command will take some time (a minute or so) and complete with the following message.

Done! ✨
Check out your shiny new generated files at /home/ubuntu/github/smarttown/cloudstream/ac-city-alert-generator.

Open Spring Tool Suite

  • Open the Spring Tool Suite by clicking on the icon on the desktop (if not open already)
  • Import the cloud stream projects from the smarttown folder as 'Existing Maven Projects' and choose the smarttown/cloudstream folder.

TemperatureReading.java

✅ Review the TemperatureReading.java file. It carries temperature reading data - a simple POJO with attributes and corresponding getters/setters.

It is the same as the TemperatureReading POJO present in the ac-city-iot-simulator microservice.

OperationalAlert.java

✅ Review the OperationalAlert.java file. It carries operational alert data - a simple POJO with attributes and corresponding getters/setters.

Application.java

✅ Review the Application.java file, specifically the generated Function.

Let us review the Function

@Bean
  public Function<TemperatureReading, Message<OperationalAlert>> processTemperatureReading() {
    return data -> {
      // Add business logic here.
      logger.info(data.toString());
      String alertType = "string";
      String alertPriority = "string";
      String topic = String.format("SmartTown/Operations/OperationalAlert/created/v1/%s/%s",
        alertType, alertPriority);
      OperationalAlert payload = new OperationalAlert();
      Message message = MessageBuilder
        .withPayload(payload)
        .setHeader(BinderHeaders.TARGET_DESTINATION, topic)
        .build();

      return message;
    };
  }

This Spring Cloud Stream Function contract is a processor that subscribes to TemperatureReading messages and publishes OperationalAlert messages.

The business logic of what transpires in this function is something we will be coding here. Our goal is to generate Alerts of HighTemperature type with three distinct priority levels:

  • LOW: When temperature > 60 and <= 70
  • MEDIUM: When temperature > 70 and <= 80
  • HIGH: When temperature > 80
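Before wiring these bounds into the Function bean, it can help to express them as a small pure function and check the boundary values. The sketch below is plain Java with hypothetical names (`AlertPrioritySketch`, `priorityFor`); it returns an empty Optional for readings of 60 or below, which corresponds to publishing no alert.

```java
import java.util.Optional;

class AlertPrioritySketch {

    // Maps a temperature to an alert priority using the bounds listed above.
    // An empty result means the reading is in range and no alert is needed.
    public static Optional<String> priorityFor(double temperature) {
        if (temperature <= 60) {
            return Optional.empty();
        }
        if (temperature <= 70) {
            return Optional.of("Low");
        }
        if (temperature <= 80) {
            return Optional.of("Medium");
        }
        return Optional.of("High");
    }

    public static void main(String[] args) {
        // The boundary values 60, 70 and 80 belong to the lower band in each case.
        for (double t : new double[] {60, 60.5, 70, 80, 103.8}) {
            System.out.println(t + " -> " + priorityFor(t).orElse("no alert"));
        }
    }
}
```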

With the updated logic, the Function contract will be

@Bean
public Function<TemperatureReading, Message<OperationalAlert>> processTemperatureReading() {
  return data -> {
    // NOTE: A return value of null indicates that no message will be published to the Broker 
    if (data.getMCpuTemp().doubleValue() <= 60)
      return null;
    
    // Since the goal is to generate temperature alerts, set the alertType to a 
    // default value of 'HighTemperature' 
    String alertType = "HighTemperature"; // HighCpuLoad
    
    // Based on the defined bounds for Low, Medium and High temperature,
    // check the incoming temperature reading and set the alert priority appropriately
    String alertPriority = "High";
    if (data.getMCpuTemp().doubleValue() > 60 && data.getMCpuTemp().doubleValue() <= 70)
      alertPriority = "Low";
    else if (data.getMCpuTemp().doubleValue() > 70 && data.getMCpuTemp().doubleValue() <= 80)
      alertPriority = "Medium";
    
    // Construct the topic name with alert priority and type as per the Topic hierarchy design
    //    SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}
    String topic = String.format("SmartTown/Operations/OperationalAlert/created/v1/%s/%s",
      alertPriority, alertType);
    
    // Construct an OperationalAlert object 
    OperationalAlert payload = new OperationalAlert(alertPriority, alertType, data.getMCity(), data.getMCpuTemp(), 
                            data.getMLatitude(), data.getMLongitude());
    
    logger.info("Operational Alert: \n" + payload.toString());
    

    // Add OperationalAlert as type parameter to Message declaration (AsyncAPI codegen will fix this soon)
    Message<OperationalAlert> message = MessageBuilder
      .withPayload(payload)
      .setHeader(BinderHeaders.TARGET_DESTINATION, topic)
      .build();

    return message;
  };
}

Other internal services (such as a monitor) and external applications (such as analytics or a dashboard) could subscribe to this Alert event.

✅ Notice the topic name construction:

SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}

The topic name is dynamically constructed by concatenating the root topic name with the computed values for AlertPriority and AlertType.
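The threshold rules and the topic construction can be isolated into two small pure functions, which makes them easy to check without a broker. The following is a minimal sketch, not part of the generated project: the class name AlertTopics and its method names are hypothetical, and it follows the documented hierarchy in which AlertPriority precedes AlertType.

```java
import java.util.Optional;

// Hypothetical helper, for illustration only; not part of the codelab project.
final class AlertTopics {
    static final String ROOT = "SmartTown/Operations/OperationalAlert/created/v1";

    // Maps a temperature to an alert priority.
    // An empty result means the reading is <= 60 and no alert should be published.
    static Optional<String> priorityFor(double temperature) {
        if (temperature <= 60) return Optional.empty();
        if (temperature <= 70) return Optional.of("Low");
        if (temperature <= 80) return Optional.of("Medium");
        return Optional.of("High");
    }

    // Builds {root}/{AlertPriority}/{AlertType}.
    static String topicFor(String alertPriority, String alertType) {
        return String.format("%s/%s/%s", ROOT, alertPriority, alertType);
    }

    public static void main(String[] args) {
        for (double t : new double[] {55.0, 65.0, 75.5, 81.7}) {
            String result = priorityFor(t)
                .map(p -> topicFor(p, "HighTemperature"))
                .orElse("(no alert published)");
            System.out.println(t + " -> " + result);
        }
    }
}
```

Keeping the boundaries in one place avoids repeating the range checks and makes the edge values (60, 70 and 80) explicit.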

✅ The return value of the Function is a Spring Cloud Stream Message whose destination header is set to the dynamic topic; the message is then published to the Broker.

✅ The temperature simulation is closely tied to the CPU load on the machine, which can be manipulated using the stress system utility. The simulation logic generates a temperature value as a multiple of the CPU load.
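The simulator's source is not reproduced in this section, so the following is only a hypothetical sketch of how a CPU load figure could be turned into a temperature. The class name, the normalization by core count and the multiplier of 100 are all assumptions chosen for illustration; the actual simulator may compute the value differently.

```java
import java.lang.management.ManagementFactory;

// Hypothetical sketch; the codelab's simulator may use different logic and constants.
final class CpuTemperatureSketch {
    // Assumed scaling factor: a fully loaded machine maps to a reading of 100.
    static final double MULTIPLIER = 100.0;

    // Normalizes the load average by the core count, clamps it to [0, 1],
    // and scales it into a temperature value.
    static double toTemperature(double loadAverage, int cores) {
        double normalized = Math.max(0.0, Math.min(1.0, loadAverage / cores));
        return normalized * MULTIPLIER;
    }

    public static void main(String[] args) {
        // getSystemLoadAverage() returns a negative value when the platform
        // does not report a load average; the clamp above turns that into 0.
        double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Simulated temperature: " + toTemperature(load, cores));
    }
}
```

Under these assumptions, a load average of 3.0 on a 4-core machine maps to a temperature of 75.0, which would fall in the Medium band.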

application.yml

✅ Review the application.yml

spring:
  cloud:
    function:
      definition: processTemperatureReading
    stream:
      bindings:
        processTemperatureReading-out-0:
          destination: 'SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}'
        processTemperatureReading-in-0:
          destination: SmartTown/Operations/temperatureReading/created/v1/*/*/*
      binders:
        solace-binder:
          type: solace
          environment:
            solace:
              java:
                host: 'tcps://mrm28q29kuoha.messaging.solace.cloud:55443'
                msgVpn: solace-eap
                clientUsername: solace-cloud-client
                clientPassword: <your_password>
logging:
  level:
    root: info
    org:
      springframework: info
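The input binding subscribes with three trailing * wildcards, each of which matches exactly one topic level. The sketch below illustrates that matching rule in a simplified form: it treats * only as a whole-level wildcard and ignores prefix wildcards (such as abc*) and the multi-level > wildcard that Solace also supports. The class name and the sample topic values are hypothetical.

```java
// Simplified illustration of single-level wildcard matching; not a broker implementation.
final class SolaceWildcardSketch {
    // Returns true when every level of the topic equals the corresponding
    // subscription level, or the subscription level is the '*' wildcard.
    static boolean matches(String subscription, String topic) {
        String[] subLevels = subscription.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        if (subLevels.length != topicLevels.length) {
            return false; // '*' never spans more than one level
        }
        for (int i = 0; i < subLevels.length; i++) {
            if (!subLevels[i].equals("*") && !subLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String sub = "SmartTown/Operations/temperatureReading/created/v1/*/*/*";
        // Hypothetical topic values used only to exercise the matcher.
        System.out.println(matches(sub,
            "SmartTown/Operations/temperatureReading/created/v1/London/51.507351/-0.127758"));
        System.out.println(matches(sub,
            "SmartTown/Operations/temperatureReading/created/v1/London/51.507351"));
    }
}
```

Because each * covers a single level, a topic with fewer or more trailing levels than the subscription is not delivered to the function.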

Running Alert Generator

✅ Open a terminal, change directory to the ac-city-alert-generator project and run the following Maven command.

cd ~/github/smarttown/cloudstream/ac-city-alert-generator
mvn clean spring-boot:run

This should run the alert generator microservice, which subscribes to temperature reading events and publishes the appropriate operational alert events to the Event Broker.

mvn clean 
mvn spring-boot:run
................................
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:20.852  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:21.862  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:22.859  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:23.860  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 81.66796875 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:24.867  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 81.66796875 lat: 40.71305 _long: -74.00723 ]
................................

16. Simulate Temperature Variation

Running IoT Data Simulation

✅ Open a terminal, change directory to the ac-city-iot-simulator project and run the following Maven command.

cd ~/github/smarttown/cloudstream/ac-city-iot-simulator
mvn clean spring-boot:run

This should run the simulator microservice and publish temperature reading events to the Event Broker.

Running Alert Generator

✅ Open a terminal, change directory to the ac-city-alert-generator project and run the following Maven command.

cd ~/github/smarttown/cloudstream/ac-city-alert-generator
mvn clean spring-boot:run

This should run the alert generator microservice, which subscribes to temperature reading events and publishes the appropriate operational alert events to the Event Broker.

Simulating CPU load to generate alerts

✅ Open a terminal and run the stress command:

stress -c 6 -t 180  -v    

Hint: This simulates CPU load with 6 worker threads and exits after 180 seconds.

The command loads the VM until it times out after 180 seconds. As the CPU load varies, it is translated into temperature variation, and the Alert Generator microservice tags the readings as Low, Medium or High severity accordingly.

Observe that the generated alerts are qualified with Low, Medium or High severity as the CPU load varies.

17. Alert Aggregator (optional)

We will extend the workshop to generate an aggregated alert (over a 30-second window) carrying the average temperature and the alert nature. A web application will in turn subscribe to these aggregated alerts using an MQTT client in JavaScript and plot geo-location markers on Google Maps.

To accomplish that, we will extend the EDA design:

  • Create a Schema AggregateAlert
  • Create an Event AggregateAlert
  • Create an Application Heating Cooling Controllers - DASHBOARD

1️⃣ Schema - AggregateAlert Schema

✅ Create schema AggregateAlert under the Smart - Analytics Application Domain

✅ Launch create schema page

In the Create Schema page:

  • Enter AggregateAlert for Name
  • Choose JSON for Content Type
  • Enable the checkbox for Shared
  • Under the Content section, click on the Import from file link. Choose the file AggregateAlert-schema.json in ~/github/smarttown/cloudstream folder
  • Click on Save button

2️⃣ Event - AggregateAlert Event

✅ Create event under Smart - Analytics Application Domain

✅ Launch create event page

The following section describes how to create an event and set its attributes - name, logical event mesh, topic address and schema.

In the Create Event page:

  • Enter AggregateAlert for Name
  • Enable the checkbox for Shared
  • Choose LogicalEventMesh for Logical Event Mesh
  • Choose Schema as selected option for Value
    • In the first dropdown select AggregateAlert
    • In the second dropdown select All Versions

✅ Launch Set Topic Name page

✅ Set Topic page

✅ Create level AggregateAlert at the path SmartTown/Analytics of Literal type

✅ Create level created at the path SmartTown/Analytics/AggregateAlert of Literal type

✅ Create level v1 at the path SmartTown/Analytics/AggregateAlert/created of Literal type

✅ Create level city at the path SmartTown/Analytics/AggregateAlert/created/v1 of Variable type

✅ Create level AlertPriority at the path SmartTown/Analytics/AggregateAlert/created/v1/city of Variable type

✅ Create level AlertType at the path SmartTown/Analytics/AggregateAlert/created/v1/city/AlertPriority of Variable type

  • The final topic name structure is SmartTown/Analytics/AggregateAlert/created/v1/{city}/{AlertPriority}/{AlertType}

  • Click on Set and Return to Event button

  • Click on Save button

3️⃣ Application - Heating Cooling Controllers - DASHBOARD

In the Create Application page:

  • Enter Heating Cooling Controllers - DASHBOARD for Name
  • Choose Standard as selected option for Application Type

✅ Launch Manage events page and set Publish/Subscribe permissions on Events

In the Manage Events page:

  • Identify the Events relevant to the application
  • Click on Sub for the OperationalAlert Event
  • Click on Pub for the AggregateAlert Event
  • Click on Save button

Applications List

Review ac-city-alert-aggregator project

OperationalAlert.java

✅ Review the OperationalAlert.java file. It carries temperature reading data - a simple POJO with alert attributes and corresponding getters/setters.

AggregateAlert.java

✅ Review the AggregateAlert.java file. It carries average temperature reading and geo details - a simple POJO with attributes and corresponding getters/setters.

Application.java

✅ Review the Application.java file. This is where the core work happens: it collects OperationalAlert events over a 30-second time window, computes the city-wise average temperature and severity, and publishes AggregateAlert events.
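The aggregation step can be pictured as grouping the alerts collected in one window by city and averaging their temperatures. The following is a minimal sketch under stated assumptions, not the project's Application.java: the class and field names are hypothetical, the window is passed in as an already collected list, and the severity of the aggregate is assumed to be derived from the average using the same 70 and 80 thresholds as the alert generator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of city-wise aggregation over one time window.
final class AlertAggregationSketch {
    static final class Alert {
        final String city;
        final double temperature;
        Alert(String city, double temperature) {
            this.city = city;
            this.temperature = temperature;
        }
    }

    static final class Aggregate {
        final String city;
        final int count;
        final double averageTemperature;
        final String severity;
        Aggregate(String city, int count, double averageTemperature, String severity) {
            this.city = city;
            this.count = count;
            this.averageTemperature = averageTemperature;
            this.severity = severity;
        }
    }

    // Assumption: severity of the aggregate follows the alert generator thresholds.
    static String severityFor(double temperature) {
        if (temperature > 80) return "High";
        if (temperature > 70) return "Medium";
        return "Low";
    }

    // Groups the alerts of one window by city and computes count and average.
    static List<Aggregate> aggregate(List<Alert> window) {
        Map<String, double[]> totals = new LinkedHashMap<>(); // city -> {sum, count}
        for (Alert alert : window) {
            double[] acc = totals.computeIfAbsent(alert.city, k -> new double[2]);
            acc[0] += alert.temperature;
            acc[1] += 1;
        }
        List<Aggregate> result = new ArrayList<>();
        for (Map.Entry<String, double[]> entry : totals.entrySet()) {
            double sum = entry.getValue()[0];
            int count = (int) entry.getValue()[1];
            double average = sum / count;
            result.add(new Aggregate(entry.getKey(), count, average, severityFor(average)));
        }
        return result;
    }
}
```

In a running service, the list passed to aggregate would be whatever was buffered during the last 30 seconds, and each resulting Aggregate would be published as one AggregateAlert event.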

application.yml

✅ Review the application.yml

server.port: 8084
spring:
  cloud:
    function:
      definition: aggregateTemperature
    stream:
      bindings:
        aggregateTemperature-out-0:
          destination: 'SmartTown/Analytics/AggregateAlert/created/v1/*/*/*'
        aggregateTemperature-in-0:
          destination: 'SmartTown/Operations/OperationalAlert/created/v1/*/*'
      binders:
        solace-binder:
          type: solace
          environment:
            solace:
              java:
                host: tcps://mrm28q29kuoha.messaging.solace.cloud:55443  
                msgVpn: solace-eap
                clientUsername: solace-cloud-client
                clientPassword: <your_password>
logging:
  level:
    root: info
    org:
      springframework: error

Running Alert Aggregator

✅ Open a terminal, change directory to the ac-city-alert-aggregator project and run the following Maven command.

cd ~/github/smarttown/cloudstream/ac-city-alert-aggregator
mvn clean spring-boot:run

This should run the alert aggregator microservice, which subscribes to operational alert events and publishes aggregate alert events to the Event Broker.

mvn clean 
mvn spring-boot:run
................................
Aggregate for city: San Francisco
AggregateAlert [count=10, severity=Medium, alertType=HighTemperature, city=San Francisco, timeStamp=2021-10-04T12:59:11.730+05:30, temperature=74.2976562500000028421709430404007434844970703125, lat=37.774929, _long=-122.419418]
Aggregate for city: New York City
AggregateAlert [count=10, severity=Medium, alertType=HighTemperature, city=New York City, timeStamp=2021-10-04T12:59:11.750+05:30, temperature=74.487890625000005684341886080801486968994140625, lat=40.71305, _long=-74.00723]
Aggregate for city: Sau Paulo
AggregateAlert [count=9, severity=Medium, alertType=HighTemperature, city=Sau Paulo, timeStamp=2021-10-04T12:59:11.778+05:30, temperature=74.466579861111114269078825600445270538330078125, lat=-23.550520, _long=-46.633308]
Aggregate for city: London
AggregateAlert [count=9, severity=Medium, alertType=HighTemperature, city=London, timeStamp=2021-10-04T12:59:11.803+05:30, temperature=74.466579861111114269078825600445270538330078125, lat=51.507351, _long=-0.127758]
Aggregate for city: Melbourne
AggregateAlert [count=10, severity=Medium, alertType=HighTemperature, city=Melbourne, timeStamp=2021-10-04T12:59:11.830+05:30, temperature=74.2976562500000028421709430404007434844970703125, lat=-37.813629, _long=144.963058]
................................

18. Alert Dashboard (optional)

Review ac-city-alert-dashboard project

Alert Dashboard is a Spring Boot application that serves a static web application with HTML pages and JS scripts. The web application makes an MQTT client connection to the Broker and subscribes to the AggregateAlert event; on receipt of each event, it plots the alert on Google Maps.

A tooltip facility details the alert nature with information on timestamp, temperature, sample size, city name and geo-coordinates.

✅ Review the shared.js

The connection details needed to establish an MQTT client session are retrieved from the following page.

  // eclipse test server
  var deets = {
    host: '<your_host>.messaging.solace.cloud',
    port: 8443,
    ssl: true,
    username: 'solace-cloud-client',
    password: '<your_password>',
  }
  
  var topicName = 'SmartTown/Analytics/AggregateAlert/created/v1/+/+/+';
  var windowSizeSecs = 10000;

  // this is for MQTT, it should return a connected or connecting valid Paho client
  function getClientConnection(uniqueID,onMessageArrived,onConnectionLost,onConnect) {
    var client = new Paho.MQTT.Client(deets['host'], Number(deets['port']), uniqueID); // AWS SGP Nano
    // set the callback handlers
    client.onConnectionLost = onConnectionLost;
    client.onMessageArrived = onMessageArrived;
    // define connection options
    var connectOptions = {};
    if (deets['ssl'] == true) {
      connectOptions["useSSL"] = true;
    } else {
      connectOptions["useSSL"] = false;
    }
    //connectOptions["reconnect"] = true;
    connectOptions["userName"] = deets['username'];
    connectOptions["password"] = deets['password'];  // AWS SGP Nano
    connectOptions["onSuccess"] = onConnect;
    // try to connect!
    client.connect(connectOptions);
    return client;
  }

MQTT client connection is established with the Broker.

Running Alert Dashboard

✅ Open a terminal, change directory to the ac-city-alert-dashboard project and run the following Maven command.

cd ~/github/smarttown/cloudstream/ac-city-alert-dashboard
mvn clean spring-boot:run

This should run the alert dashboard, a Spring Boot application that serves the web pages on port 8080.

mvn clean 
mvn spring-boot:run
................................
2021-10-04 12:49:35.174  INFO 62510 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2021-10-04 12:49:35.176  INFO 62510 --- [  restartedMain] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-10-04 12:49:35.177  INFO 62510 --- [  restartedMain] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.48]
2021-10-04 12:49:35.194  INFO 62510 --- [  restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-10-04 12:49:35.194  INFO 62510 --- [  restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 363 ms
2021-10-04 12:49:35.248  INFO 62510 --- [  restartedMain] o.s.b.a.w.s.WelcomePageHandlerMapping    : Adding welcome page: class path resource [static/index.html]
2021-10-04 12:49:35.270  WARN 62510 --- [  restartedMain] ion$DefaultTemplateResolverConfiguration : Cannot find template location: classpath:/templates/ (please add some templates or check your Thymeleaf configuration)
2021-10-04 12:49:35.278  INFO 62510 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2021-10-04 12:49:35.284  INFO 62510 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-10-04 12:49:35.287  INFO 62510 --- [  restartedMain] c.e.scs.asyncapi.dashboard.Application   : Started Application in 0.503 seconds (JVM running for 1303.432)
2021-10-04 12:49:35.289  INFO 62510 --- [  restartedMain] .ConditionEvaluationDeltaLoggingListener : Condition evaluation unchanged
2021-10-04 12:50:09.348  INFO 62510 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-10-04 12:50:09.349  INFO 62510 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-10-04 12:50:09.349  INFO 62510 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
................................

Access Alert Dashboard

✅ Open a browser and navigate to localhost:8080

Simulating Temperature Readings from multiple SmartTowns

Launch a terminal and run the following commands.

cd ~/github/smarttown/cloudstream/ac-city-iot-simulator

Run one or more (maybe all) of the following commands to simulate sensor data for different locations.

mvn spring-boot:run -Dspring-boot.run.arguments="--application.city=London --application.latitude=51.507351 --application.longitude=-0.127758 --server.port=8091" &
mvn spring-boot:run -Dspring-boot.run.arguments="--application.city='Sau Paulo' --application.latitude=-23.550520 --application.longitude=-46.633308 --server.port=8092" &
mvn spring-boot:run -Dspring-boot.run.arguments="--application.city=Melbourne --application.latitude=-37.813629 --application.longitude=144.963058 --server.port=8093" &
mvn spring-boot:run -Dspring-boot.run.arguments="--application.city='San Francisco' --application.latitude=37.774929 --application.longitude=-122.419418 --server.port=8094" &
mvn spring-boot:run -Dspring-boot.run.arguments="--application.city=Bangalore --application.latitude=12.971599 --application.longitude=77.594566 --server.port=8095" &
mvn spring-boot:run -Dspring-boot.run.arguments="--application.city=Singapore --application.latitude=1.290453 --application.longitude=103.852038 --server.port=8096" &

19. Takeaways

  • ✅ The Solace PubSub+ Event Portal is an excellent tool to design and visualize your Event-Driven Architecture, discover what events exist, collaborate with your team and kickstart development via exporting of AsyncAPI documents.
  • ✅ AsyncAPI Generator templates allow developers to consistently create event-driven applications by generating code skeletons that are pre-wired with the events and channels defined in the AsyncAPI documents.
  • ✅ Spring Cloud Stream allows developers to implement highly scalable, event-driven microservices without having to learn how to use messaging APIs.

Thanks for participating in this codelab! Let us know what you thought in the Solace Community Forum. If you found any issues along the way, we'd appreciate it if you'd raise them by clicking the Report a mistake button at the bottom left of this codelab.