If your requirement is to build modern, real-time applications using event-driven architecture (EDA), this codelab will walk you through the steps of how to build Event API products, define asynchronous APIs, implement them using Spring Cloud Stream microservices, and globally distribute them across multi-cloud and on-premises environments using an Solace PubSub+ platform.

In this codelab, we will create API Products that helps others to consume Business capabilities of a SmartTown where APIs are a way of life. We will also

Prerequisites

πŸ›  This page covers the setup needed to perform this codelab. πŸ› 

PubSub+ Event Broker

βœ… You need to sign for a free Solace Cloud Account using this link.

Complete the following steps:

  1. Log into the PubSub+ Cloud Console To start using PubSub+ Cloud, log in to the PubSub+ Cloud Console. For more information on accessing the Cloud Console, check out the documentation
  2. Create an Event Broker Service Create an event broker service (or simply a service) using Cluster Manager tool and name the service as solace-eap. For more information on accessing the Cloud Console, check out the documentation
  3. Create a Client Username Connect to the broker service and open the Message VPN console to add client username under Access Control setting. Create a client username smarttown with password smarttown.

Ensure that you have captured the following details on connection information that will be used in this codelab.

AsyncAPI Generator Requirements

βœ… Install instructions available here

Spring Cloud Stream Requirements

βœ… Spring Cloud Stream just requires Java and Maven to use πŸš€

Clone Git Repository

A ready to use code of few of the microservices referred in this workshop are available in Git. We will use this codebase and build new code wherever necessary to complete this workshop.

This would create a directory by name smarttown and checkout the git repository contents.

Solace PubSub+ Event Portal

The Market's First API Portal...for Events

PubSub+ Event Portal lets you design your event-driven applications, events, and schemas as interconnected network diagrams your team can go over in design reviews.

With the PubSub+ Event Portal, you can:

Foundational Elements of PubSub+ Event Portal

Application Domain

An application domain represents a namespace where applications, events, and schemas can live. Application domains are useful to decompose the enterprise and create organizational boundaries within which teams can work. Within the application domain, you can create a suite of applications, events and schemas that are isolated from other application domains.

Schema

In simple terms, a schema represents the contract to describe the payload of an event. Producers and consumers of an event can trust that the event's payload matches the schema definition assigned to that event. Schemas define a type of payload through JSON, AVRO, XML, Binary, or Text. JSON, AVRO, and XML schemas have content that describes each property of the schema. The content is either in JSON or AVRO Schema format, or XSD/DTD format.

Event

The event represents a business moment or an action that can be communicated with zero or more interested applications. The event is where you define metadata that describes and categorizes the event. An event is produced on a specific topic that must be defined for that event. From a modelling perspective, events reference payload schemas, and events are referenced by applications, thus forming the glue that binds applications together.

Application

An application represents a piece of software that produces and consumes events. Applications connect to the event broker in an event-driven architecture and communicate with other applications via events. A single application represents a class of applications that are running the same code base; therefore, there is no need to create multiple applications for each instance in a cluster.

Icons used in Designer:

SmartTown uses information and communication technology to improve operational efficiency, share information with the public and provide a better quality of government service and citizen welfare. SmartTown uses internal microservice architecture with functionality exposed through APIs and externally exposed API products while hiding the implementation details.

Application Domains

Though the SmartTown EDA design shall encompass various aspects of automation and control - we will focus on the Analytics and Operations applications for this workshop.

Applications

A view that captures the relationship between Applications within the Domain based on their publish/subscribe action on the domain events.

Events

References:

We will create an event-driven design for SmartTown functions in the Event Portal using Designer tool. Let us start with defining Application Domain.

Define Application Domains

We will be creating two application domains:

  1. SmartTown - Analytics and
  2. SmartTown - Operations

βœ… Launch Solace PubSub+ Event Portal and open Designer tool

βœ… Create Application Domain - SmartTown - Analytics

βœ… Create Application Domain - SmartTown - Operations

βœ… Now, you should be able to see both the domains in the topology view

βœ… Create schema under Smart - Operations Application

βœ… Launch create schema page

Create Schema - TemperatureReading

The following section describes the event schema to be used for TemperatureReading Event. It captures the geo-location of the sensor (lat, long), city name, city name and cpu-load attributes.

{
    "$schema": "http://json-schema.org/draft-07/schema",
    "type": "object",
    "title": "The root schema",
    "description": "The root schema comprises the entire JSON document.",
    "default": {},
    "examples": [
      {
        "m_city": "San Francisco",
        "m_cpu_temp": 55.6875,
        "m_cpu_load": 3.0550130208333335,
        "m_latitude": 13.04302978515625,
        "m_longitude": 80.27391815185547
      }
    ],
    "required": [
      "m_city",
      "m_cpu_temp",
      "m_cpu_load",
      "m_latitude",
      "m_longitude"
    ],
    "properties": {
      "m_city": {
        "type": "string",
        "title": "The m_city schema",
        "description": "An explanation about the purpose of this instance.",
        "default": "",
        "examples": [
          "San Francisco"
        ]
      },
      "m_cpu_temp": {
        "type": "number",
        "title": "The m_cpu_temp schema",
        "description": "An explanation about the purpose of this instance.",
        "default": 0,
        "examples": [
          55.6875
        ]
      },
      "m_cpu_load": {
        "type": "number",
        "title": "The m_cpu_load schema",
        "description": "An explanation about the purpose of this instance.",
        "default": 0,
        "examples": [
          3.0550130208333335
        ]
      },
      "m_latitude": {
        "type": "number",
        "title": "The m_latitude schema",
        "description": "An explanation about the purpose of this instance.",
        "default": 0,
        "examples": [
          13.04302978515625
        ]
      },
      "m_longitude": {
        "type": "number",
        "title": "The m_longitude schema",
        "description": "An explanation about the purpose of this instance.",
        "default": 0,
        "examples": [
          80.27391815185547
        ]
      }
    },
    "additionalProperties": true
  }

In the Create Schema page:

Define Schema - OperationalAlert

The following section describes the event schema to be used for OperationalAlert Event. It captures the geo-location of the sensor (lat, long), temperature, city name, attributes.

{
  "$schema": "http://json-schema.org/draft-07/schema",
  "type": "object",
  "title": "The root schema",
  "description": "The root schema comprises the entire JSON document.",
  "default": {},
  "examples": [
    {
      "lat": -71.518929,
      "long": -71.518929,
      "city": "Hopkinton",
      "alertType": "HighTemperature",
      "severity": "Medium",
      "temperature": 99.3
    }
  ],
  "required": [
    "lat",
    "long",
    "city",
    "alertType",
    "severity",
    "temperature"
  ],
  "properties": {
    "lat": {
      "type": "number",
      "title": "The lat schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        -71.518929
      ]
    },
    "long": {
      "type": "number",
      "title": "The long schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        -71.518929
      ]
    },
    "city": {
      "type": "string",
      "title": "The city schema",
      "description": "An explanation about the purpose of this instance.",
      "default": "",
      "examples": [
        "Hopkinton"
      ]
    },
    "alertType": {
      "type": "string",
      "title": "The alertType schema",
      "description": "An explanation about the purpose of this instance.",
      "default": "",
      "examples": [
        "HighTemperature"
      ]
    },
    "severity": {
      "type": "string",
      "title": "The severity schema",
      "description": "An explanation about the purpose of this instance.",
      "default": "",
      "examples": [
        "Medium"
      ]
    },
    "temperature": {
      "type": "number",
      "title": "The temperature schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        99.3
      ]
    }
  },
  "additionalProperties": true
}

In the Create Schema page:

βœ… Create event under Smart - Operations Application Domain

βœ… Launch create event page

Create Event - TemperatureReading

The following section describes how to create an event and set its attributes - name, logical event mesh, topic address and schema.

In the Create Event page:

βœ… Launch Set Topic Name page

βœ… Set Topic page

βœ… Create level SmartTown of Literal type

βœ… Create level Operations at the path SmartTown of Literal type

βœ… Create level temperatureReading at the path SmartTown/Operations of Literal type

βœ… Create level created at the path SmartTown/Operations/temperatureReading of Literal type

βœ… Create level v1 at the path SmartTown/Operations/temperatureReading/created of Literal type

βœ… Create level city at the path SmartTown/Operations/temperatureReading/created/v1 of Variable type

βœ… Create level latitude at the path SmartTown/Operations/temperatureReading/created/v1/city of Variable type

βœ… Create level longitude at the path SmartTown/Operations/temperatureReading/created/v1/city/latitude of Variable type

Create Event - OperationalAlert

The following section describes how to create an event and set its attributes - name, logical event mesh, topic address and schema.

In the Create Event page:

βœ… Launch Set Topic Name page

βœ… Create level SmartTown of Literal type

βœ… Create level Operations at the path SmartTown of Literal type

βœ… Create level OperationalAlert at the path SmartTown/Operations of Literal type

βœ… Create level created at the path SmartTown/Operations/OperationalAlert of Literal type

βœ… Create level v1 at the path SmartTown/Operations/OperationalAlert/created of Literal type

βœ… Create level AlertPriority at the path SmartTown/Operations/OperationalAlert/created/v1 of Variable type

βœ… Create level AlertType at the path SmartTown/Operations/OperationalAlert/created/v1/AlertPriority of Variable type

βœ… Create application under SmartTown - Operations

βœ… Launch create application option

Create Application - Temperature Sensors

The following section describes how to create an application and set its attributes - name, application type and event association.

In the Create Application page:

βœ… Launch Manage events page

βœ… Set Publish/Subscribe permissions on Events

In the Manage Events page:

Create Application - Heating Cooling Controllers - EXTERNAL

The following section describes how to create an application and set its attributes - name, application type and event association.

In the Create Application page:

βœ… Launch Manage events page and set Publish/Subscribe permissions on Events

In the Manage Events page:

Applications List

An Event API Product can be composed of events from either Solace PubSub+ Event Broker or Kafka for external developers' consumption, enabling them to build event-driven applications that can subscribe to and/or publish the specified events.

The events bundled in an Event API Product can be added from one or more application domains, so you can mix and match events. Event API is all about identifying and bundling high-value events for external consumption similar to REST APIs.

The following diagram demonstrates the co-existence of REST & Events based APIs serving both internal and external applications.

When it comes to value and exposure, Event-driven applications can combine the classic system and business events to create a bundle as a specification without exposing any of the internal or implementation details. Solace PubSub+ uses AsyncAPI as the standard for API specification for exposing such Event API bundles. Consuming applications also benefits from AsyncAPI's code-generation facility to focus just on the business logic and not the mundane tasks.

References:

Next step in building an Event API product involves bundling of high-value, share-worthy events. The resulting bundle is released as a specification for public consumption. The generated specification is based on a commonly understood standard - like AsyncAPI.

AsyncAPI also provides tools to generate code (supports popular languages) helping consumers directly go from spec-to-code, leaving the only responsibility of updating the business logic.

βœ… Launch API Products

Create EventAPI Product

βœ… Configure the API Product settings

In the Create Event API Product page:

βœ… Configure Event settings

βœ… Event API Product list

βœ… HVAC Controller Details

A newly created Event API Product can be made externally available or shared through the following methods:

These URLs are available only when the API Product is marked as Released. To release the API Product, click on the Release button at the top right.

More info about the API release and website hosting:

With this, the Event API Product named HVAC Controller is ready and published for access on public URLs.

Exposed AsyncAPI URL

Interested parties can download the AsyncAPI specification from this public URL and build applications with minimal code using AsyncAPI codegen tools.

AsyncAPI has emerged as the industry standard for defining asynchronous, event-driven APIs; you can think of it as OpenAPI for the asynchronous world.

It is an open source initiative that provides both

AsyncAPI Document

An AsyncAPI document that defines the application that you want to develop. This document can be represented as JSON objects conforming to the JSON standards, or an YAML file. You can either manually create the document or use an event portal.

The AsyncAPI initiative provides a handy, interactive tool called the AsyncAPI playground to make the document creation easier.

AsyncAPI Playground

Alternatively you can generate this document from EDA tools such as Event Portal of Solace PubSub+ platform. Typically they are design-time tools allowing architects and developers to collaborate using a GUI to design the event-driven architecture.

Having a catalog of well-organized channels and events for reuse will also save you both time and headaches while collaborating, instead of having to comb through a bunch of files in various locations.

AsyncAPI Generator

AsyncAPI Generator is a tool that can generate a skeleton code from the AsyncAPI document, reducing the need to laboriously create boilerplate code saving time and effort.

The AsyncAPI Code Generator supports templates to generate code for a variety of different languages and protocols, but for this workshop we're going to use the Spring Cloud Stream template.

The Spring Cloud Stream framework provides an easy way to get started with event-driven microservices by providing binders that allow the developer to create their microservices without having to learn messaging APIs.

Example:

ag ~/AsyncApiDocument.yaml https://github.com/asyncapi/java-spring-cloud-stream-template.git

Coding Business Logic

The generated spring project is a regular Spring Boot Java project with generated classes under the javaPackage and an application.yml file for configuration. It contains:

References:

Review API Product URL

You can locate the website URL where the EventAPI Product is hosted from the Event API Product settings.

βœ… Open the website URL in the browser (click on the link)

You can click on the Download YAML button and download the document from this portal - just you need to move it to the cloudstream folder.

Run the following command on a terminal window

mv ~/Downloads/asyncapi.yaml ~/github/smarttown/cloudstream/

Cloud Stream Intro

Spring Cloud Stream is a framework for creating highly scalable, event-driven microservices connected by pluggable messaging services. Messaging services are pluggable via Binders that we'll cover in a bit. The framework is based on Spring Boot and Spring Integration.

Spring Cloud Stream has three different types of message exchange contracts as pictures below.

  1. Suppliers are sources of events
  2. Sinks are consumers of events
  3. Processors are both consumers and subscribers of events

Message Exchange Contracts Map to Java Functions

As of Spring Cloud Stream v3 the preferred programming model is to use Java Functions to implement your cloud stream apps.

We just mentioned the 3 types of Cloud Stream message exchange contract are "Sources", "Sinks" and "Processors". Those map to functions as follows:

Binders

Spring Cloud Stream Binders are really what make the framework useful. Binders provide an abstraction layer between your code and the messaging systems over which events are flowing. This allows you to write your code without having to worry about learning messaging APIs! When messages are sent or received from the messaging system they pass through the messaging system specific binder which knows how to communicate with that specific message broker.

As of the creation of this codelab the available Cloud Stream Binders are:

Communication Models

Instead of having to learn Messaging APIs, developers just have to understand the communication models that Spring Cloud Stream supports. There are 3 supported models, but support varies per binder.

  1. Publish-subscribe: subscribers are independent from each other & receive events in order
  2. Consumer groups: fan-out and load-balancing across multiple consumers
  3. Stateful partitioning support: in-order processing for consistency and performance

References:

Launch the Broker Service from PubSub+ Console and open the connection tab.

Click on Get Started button next to the Spring Cloud Stream library

Make a note of the Host URI from the connection detailsx

The directory github/cloudstream/ac-city-iot-simulator contains a prebuilt spring cloud stream project that can readily publish temperature reading data. You just have to update the application.yml configuration with host details.

Import the projects under github/cloudstream/ directory.

Launch Spring Tool Suite

Review ac-city-iot-simulator project

TemperatureReading.java

βœ… Review the TemperatureReading.java file. It carries temperature reading data - a simple POJO with attributes and corresponding getters/setters.

Application.java

βœ… Review the Application.java, specifically the Supplier function.

  @Bean
  Supplier<Message<TemperatureReading>> publishTemperatureData()  {
    // Collect CPU metrics 
    return () -> {
      SystemInfo si = new SystemInfo();
      HardwareAbstractionLayer hal = si.getHardware();
      CentralProcessor processor = hal.getProcessor();
        
      double[] loadAverage = processor.getSystemLoadAverage(1);
          
      BigDecimal cpuLoad = new BigDecimal((loadAverage[0] < 0 ? 0 : loadAverage[0]));
      BigDecimal cpuTemp = new BigDecimal(50.0 + cpuLoad.doubleValue() * 8.0);
        
      // Construct the topic name with alert type and priority as per the Topic hierarchy design
      // SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}
      String topic = "SmartTown/Operations/temperatureReading/created/v1/" + city + "/" + latitude + "/" + longitude;	        			
      TemperatureReading data = new TemperatureReading(cpuLoad, city, latitude, longitude, cpuTemp);

      logger.info(data.toString());	

      // Set the target destination as the constructed topic name
      return MessageBuilder.withPayload(data)
                .setHeader(BinderHeaders.TARGET_DESTINATION, topic)
                .build();		
    };
  }	

βœ… The Supplier function is a Cloud Stream message exchange contract that publishes an event to configured Event Broker.

βœ… Notice the topic name construction:

SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}

The values for variables city, latitude and longitude are read from the configuration file via @Value annotation.

    @Value("${application.latitude}")
    public BigDecimal latitude;
    @Value("${application.longitude}")
    public BigDecimal longitude;
    @Value("${application.city}")
    public String city;

The topic name is dynamically constructed by concatenating the root topic name and dynamic values picked up from the application configuration.

βœ… The return value from the Supplier function is a Spring Cloud Stream Message that is set with destination as the dynamic topic and published to the Broker.

βœ… The temperature simulation is closely tied to CPU load on the machine, which can be manipulated using stress system utility. The simulation logic is built to generate a temperature value as a multiple of CPU load.

application.yml

βœ… Review the application.yml

server.port: 8082
application:
  # e.g., New York City
  city: New York City      
  # e.g., 40.713050
  latitude: 40.713050       
  # e.g., -74.007230 
  longitude: -74.007230      
spring:
  cloud:
    function:
      definition: publishTemperatureData
    stream:
      bindings:
        publishTemperatureData-out-0:
          destination: SmartTown/Operations/temperatureReading/created/v1/*/*/*
      binders:
        solace-binder:
          type: solace
          environment:
            solace:
              java:
                # e.g., 'tcps://mrri685heajs1.messaging.solace.cloud:55443'
                host: tcps://mrri685heajs1.messaging.solace.cloud:55443  
                msgVpn: solace-eap
                clientUsername: smarttown
                clientPassword: smarttown
logging:
  level:
    root: info
    org:
      springframework: info

Affect the following changes on the application configuration file:

  1. Goto Latlang.net and enter your city name to get latitude and longitude values
  2. In the yml file
    • Update your city name
    • Update latitude
    • Update longitude
    • Update the host name (instructor to provide the host name)
  3. Save the file

Running IoT Data Simulation

βœ… Open a terminal, change directory to ac-city-iot-simulator project and run the following maven command.

cd ~/github/smarttown/cloudstream/ac-city-iot-simulator
mvn clean spring-boot:run

This should run the simulator microservice and publish temperature reading events to the Event Broker.

mvn clean 
mvn spring-boot:run
................................
2021-09-30 18:04:59.401  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:00.409  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:01.416  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:02.423  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.7255859375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=103.8046875]
2021-09-30 18:05:03.428  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.26708984375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=100.13671875]
2021-09-30 18:05:04.432  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
TemperatureReading [mCpuLoad=6.26708984375, mCity=New York City, mLatitude=40.71305, mLongitude=-74.00723, mCpuTemp=100.13671875]
2021-09-30 18:05:05.435  INFO 5981 --- [   scheduling-1] c.e.s.a.datacollector.Application        :
................................

Before we proceed, we have to make minor changes to the AsyncAPI yml file. The yml file is located under ~/github/smarttown/cloudstream/ directory (the last step in Download AsyncAPI Document).

βœ… Make the following updates to the asyncapi.yaml file

Let's add a few of the template's configuration options to the download AsyncAPI document.

Add x-scs-function-name: processTemperatureReading under the subscribe operation and the publish operation under our two channels. By adding this you are telling the generator the name of the function you would like to handle events being exchanged and by adding the same function-name for both the subscribe and the publish operation you are saying you want them handled by the same function!

Append the line below 

      x-scs-function-name: processTemperatureReading

a) After 

channels:
  SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}:
    subscribe:

b) And after

  SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}:
    publish:

With this change, the channels section of the YAML file will look like this

channels:
  SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}:
    subscribe:
      x-scs-function-name: processTemperatureReading
      message:
        $ref: "#/components/messages/OperationalAlert"
    parameters:
      AlertType:
        schema:
          type: "string"
      AlertPriority:
        schema:
          type: "string"
  SmartTown/Operations/temperatureReading/created/v1/{city}/{latitude}/{longitude}:
    publish:
      x-scs-function-name: processTemperatureReading
      message:
        $ref: "#/components/messages/TemperatureReading"
    parameters:
      city:
        schema:
          type: "string"
      latitude:
        schema:
          type: "string"
      longitude:
        schema:
          type: "string"

In the last section of the yml file, in the info section remove the x-view: "provider".

info:
  x-generated-time: "2021-09-30 13:15 UTC"
  description: ""
  title: "HVAC Controller"
  x-view: "provider"
  version: "1"

With this change, info section of the YAML file will look like this

info:
  x-generated-time: "2021-09-30 13:15 UTC"
  description: ""
  title: "HVAC Controller"
  version: "1"

βœ… Save the asyncapi.yaml file

We will be building this microservice using AsyncAPI Generator tool from the AsyncAPI document hosted by the PubSub+ Event Portal.

Generate code using AsyncAPI Code Generator

βœ… Change directory to cloudstream and run the following command

cd ~/github/smarttown/cloudstream

Run the following command to invoke AsyncAPI code generator utility.

ag -o ac-city-alert-generator -p view=provider -p binder=solace -p dynamicType=header -p artifactId=ac-city-alert-generator  -p groupId=com.eap -p javaPackage=com.eap.scs.asyncapi.alertgenerator -p host=tcps://mrri685heajs7.messaging.solace.cloud:55443 -p username=smarttown -p password=smarttown -p msgVpn=solace-eap asyncapi.yaml @asyncapi/java-spring-cloud-stream-template --force-write

This command will take some time (minute or so) and complete with the following message.

Done! ✨
Check out your shiny new generated files at /home/ubuntu/github/smarttown/cloudstream/ac-city-alert-generator.

Open Spring Tool Suite

TemperatureReading.java

βœ… Review the TemperatureReading.java file. It carries temperature reading data - a simple POJO with attributes and corresponding getters/setters.

It is same as the TemperatureReading POJO present in the ac-city-iot-simulator microservice.

OperationalAlert.java

βœ… Review the OperationalAlert.java file. It carries operational alert data - a simple POJO with attributes and corresponding getters/setters.

Application.java

βœ… Review the Application.java, specifically the Supplier function.

Let us review the Function

@Bean
  public Function<TemperatureReading, Message<OperationalAlert>> processTemperatureReading() {
    return data -> {
      // Add business logic here.
      logger.info(data.toString());
      String alertType = "string";
      String alertPriority = "string";
      String topic = String.format("SmartTown/Operations/OperationalAlert/created/v1/%s/%s",
        alertType, alertPriority);
      OperationalAlert payload = new OperationalAlert();
      Message message = MessageBuilder
        .withPayload(payload)
        .setHeader(BinderHeaders.TARGET_DESTINATION, topic)
        .build();

      return message;
    };
  }

This Spring Cloud Stream Function contract is a processor that subscribes to TemperatureReading message, and publishes a OperationalAlert message.

The business logic of what transpires in this function is something we will be coding here. Our goal is to generate Alerts of HighTemperature type with three distinct priority levels:

With the updated logic, the Function contract will be

@Bean
public Function<TemperatureReading, Message<OperationalAlert>> processTemperatureReading() {
  return data -> {
    // NOTE: A return value of null indicates that no message will be published to the Broker 
    if (data.getMCpuTemp().doubleValue() <= 60)
      return null;
    
    // Since the goal is to generate temperature alerts, set the alertType to a 
    // default value of 'HighTemperature' 
    String alertType = "HighTemperature"; // HighCpuLoad
    
    // Based on the defined bounds for Low, Medium and High temperature,
    // check the incoming temperature reading and set the alert priority appropriately
    String alertPriority = "High";
    if (data.getMCpuTemp().doubleValue() > 60 && data.getMCpuTemp().doubleValue() <= 70)
      alertPriority = "Low";
    else if (data.getMCpuTemp().doubleValue() > 70 && data.getMCpuTemp().doubleValue() <= 80)
      alertPriority = "Medium";
    
    // Construct the topic name with alert type and priority as per the Topic hierarchy design
    //		SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}
    String topic = String.format("SmartTown/Operations/OperationalAlert/created/v1/%s/%s",
      alertType, alertPriority);
    
    // Construct an OperatinalAlert object 
    OperationalAlert payload = new OperationalAlert(alertPriority, alertType, data.getMCity(), data.getMCpuTemp(), 
                            data.getMLatitude(), data.getMLongitude());
    
    logger.info("Operational Alert: \n" + payload.toString());
    

    // Add OperationalAlert as type parameter to Message declaration (AsyncAPI codegen will fix this soon)
    Message<OperationalAlert> message = MessageBuilder
      .withPayload(payload)
      .setHeader(BinderHeaders.TARGET_DESTINATION, topic)
      .build();

    return message;
  };
}

There could be other internal services like monitor and external applications like analytics, dashboard could subscribe to this Alert event.

βœ… Notice the topic name construction:

SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}

The topic name is dynamically constructed by concatenating the root topic name and computed values for AlertPriority, and AlertType .

βœ… The return value from the Function function is a Spring Cloud Stream Message that is set with destination as the dynamic topic and published to the Broker.

βœ… The temperature simulation is closely tied to CPU load on the machine, which can be manipulated using stress system utility. The simulation logic is built to generate a temperature value as a multiple of CPU load.

application.yml

βœ… Review the application.yml

spring:
  cloud:
    function:
      definition: processTemperatureReading
    stream:
      bindings:
        processTemperatureReading-out-0:
          destination: 'SmartTown/Operations/OperationalAlert/created/v1/{AlertPriority}/{AlertType}'
        processTemperatureReading-in-0:
          destination: SmartTown/Operations/temperatureReading/created/v1/*/*/*
      binders:
        solace-binder:
          type: solace
          environment:
            solace:
              java:
                host: 'tcps://xxxx.messaging.solace.cloud:55443' # connect to your cloud account and get the Host URI from the Spring Boot Java API service
                msgVpn: sprint-eap
                clientUsername: smarttown
                clientPassword: smarttown
logging:
  level:
    root: info
    org:
      springframework: info

Running Alert Generator

βœ… Open a terminal, change directory to ac-city-alert-generator project and run the following maven command.

cd ~/github/smarttown/cloudstream/ac-city-alert-generator
mvn clean spring-boot:run

This should run the alert generator microservice that subscribes to temperature reading event and publishes appropriate operational alert events to the Event Broker.

mvn clean 
mvn spring-boot:run
................................
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:20.852  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:21.862  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:22.859  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 80.25 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:23.860  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 81.66796875 lat: 40.71305 _long: -74.00723 ]
2021-10-04 06:03:24.867  INFO 45472 --- [pool-4-thread-1] c.e.s.a.alertgenerator.Application       : Operational Alert:
OperationalAlert [ severity: High alertType: HighTemperature city: New York City temperature: 81.66796875 lat: 40.71305 _long: -74.00723 ]
................................

Running IoT Data Simulation

βœ… Open a terminal, change directory to ac-city-iot-simulator project and run the following maven command.

cd ~/github/smarttown/cloudstream/ac-city-iot-simulator
mvn clean spring-boot:run

This should run the simulator microservice and publish temperature reading events to the Event Broker.

Running Alert Generator

βœ… Open a terminal, change directory to ac-city-alert-generator project and run the following maven command.

cd ~/github/smarttown/cloudstream/ac-city-iot-generator
mvn clean spring-boot:run

This should run the alert generator microservice that subscribes to temperature reading event and publish appropriate operational alert events to the Event Broker.

This should run the simulator microservice and publish temperature reading events to the Event Broker.

Simulating CPU load to generate

βœ… Open a terminal, change directory to ac-city-alert-generator project and run the following maven command.

To run the stress command, type:

stress -c 6 -t 180  -v    

Hint: Simulate load on CPU with 6 worker threads and exit after 180 seconds.

This will simulate load on the VM and till timeout after 180 seconds. As the CPU load varies, it gets translated to temperature variation in the Alert Generator microservice and the readings gets tagged as Low, Medium or High severity appropriately.

Observe that the alerts generated are qualified with Low, High and Medium severity types appropriately as the CPU Load varies.

We will extend the workshop to generate an aggregated alert (over a period/duration of 30 seconds) with average temperature and alert nature. In turn these aggregated alerts will be subscribed by a web application using MQTT client in Javascript and plots geo-location markers on Google Maps.

To accomplish that, we would extend the EDA design:

Create Schema - AggregateAlert

βœ… Create schema AggregateAlert under Smart - Analytics Application

βœ… Launch create schema page

The following section describes the event schema to be used for AggregateAlert Event. It captures the geo-location of the sensor (lat, long), city name, city name, aggregate window duration, sample size and average temperature, timestamp along with the severity and alert type attributes.

{
  "$schema": "http://json-schema.org/draft-07/schema",
  "type": "object",
  "title": "The root schema",
  "description": "The root schema comprises the entire JSON document.",
  "default": {},
  "examples": [
    {
      "lat": -71.518929,
      "long": -71.518929,
      "city": "Hopkinton",
      "alertType": "HighTemperature",
      "severity": "Medium",
      "temperature": 99.3,
      "count": 30,
      "timeStamp": "2021-11-29'T'18:37:22.323333",
      "duration": 30
    }
  ],
  "required": [
    "lat",
    "long",
    "city",
    "alertType",
    "severity",
    "temperature",
    "count",
    "timeStamp",
    "duration"
  ],
  "properties": {
    "lat": {
      "type": "number",
      "title": "The lat schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        -71.518929
      ]
    },
    "long": {
      "type": "number",
      "title": "The long schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        -71.518929
      ]
    },
    "city": {
      "type": "string",
      "title": "The city schema",
      "description": "An explanation about the purpose of this instance.",
      "default": "",
      "examples": [
        "Hopkinton"
      ]
    },
    "alertType": {
      "type": "string",
      "title": "The alertType schema",
      "description": "An explanation about the purpose of this instance.",
      "default": "",
      "examples": [
        "HighTemperature"
      ]
    },
    "severity": {
      "type": "string",
      "title": "The severity schema",
      "description": "An explanation about the purpose of this instance.",
      "default": "",
      "examples": [
        "Medium"
      ]
    },
    "temperature": {
      "type": "number",
      "title": "The temperature schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        99.3
      ]
    },
    "count": {
      "type": "number",
      "title": "The count schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        30
      ]
    },
    "timeStamp": {
      "type": "string",
      "title": "The timeStamp schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        "2021-11-29'T'18:37:22.323333"
      ]
    },
    "duration": {
      "type": "number",
      "title": "The duration schema",
      "description": "An explanation about the purpose of this instance.",
      "default": 0,
      "examples": [
        30
      ]
    }
  },
  "additionalProperties": true
}

In the Create Schema page:

Create Event - AggregateAlert

βœ… Create event under Smart - Analytics Application Domain

βœ… Launch create event page

The following section describes how to create an event and set its attributes - name, logical event mesh, topic address and schema.

In the Create Event page:

βœ… Launch Set Topic Name page

βœ… Set Topic page

βœ… Create level AggregateAlert at the path SmartTown/Analytics of Literal type

βœ… Create level created at the path SmartTown/Analytics/AggregateAlert of Literal type

βœ… Create level v1 at the path SmartTown/Analytics/AggregateAlert/created of Literal type

βœ… Create level city at the path SmartTown/Analytics/AggregateAlert/created/v1 of Variable type

βœ… Create level AlertPriority at the path SmartTown/Analytics/AlertPriority/created/v1/city of Variable type

βœ… Create level AlertType at the path SmartTown/Analytics/AggregateAlert/created/v1/city/AlertPriority of Variable type

Create Application - Heating Cooling Controllers - DASHBOARD

The following section describes how to create an application and set its attributes - name, application type and event association.

In the Create Application page:

βœ… Launch Manage events page and set Publish/Subscribe permissions on Events

In the Manage Events page:

Applications List

Review ac-city-alert-aggregator project

OperationalAlert.java

βœ… Review the OperationalAlert.java file. It carries temperature reading data - a simple POJO with alert attributes and corresponding getters/setters.

AggregateAlert.java

βœ… Review the AggregateAlert.java file. It carries average temperature reading and geo details - a simple POJO with attributes and corresponding getters/setters.

Application.java

βœ… Review the Application.java file. The critical aspect of collecting OperationalAlert events (30 seconds time window) and computing city-wise average temperature, severity and publish AggregateAlert events is carried out here.

application.yml

βœ… Review the application.yml

server.port: 8084
spring:
  cloud:
    function:
      definition: aggregateTemperature
    stream:
      bindings:
        aggregateTemperature-out-0:
          destination: 'SmartTown/Operations/AggregateAlert/created/v1/*/*/*'
        aggregateTemperature-in-0:
          destination: 'SmartTown/Operations/OperationalAlert/created/v1/*/*'
      binders:
        solace-binder:
          type: solace
          environment:
            solace:
              java:
                # e.g., 'tcps://mrri685heajs1.messaging.solace.cloud:55443'
                host: tcps://mrri685heajs7.messaging.solace.cloud:55443  
                msgVpn: solace-eap
                clientUsername: smarttown
                clientPassword: smarttown
logging:
  level:
    root: info
    org:
      springframework: info

Running Alert Aggregator

βœ… Open a terminal, change directory to ac-city-alert-aggregator project and run the following maven command.

cd ~/github/smarttown/cloudstream/ac-city-alert-aggregator
mvn clean spring-boot:run

This should run the simulator microservice and publish temperature reading events to the Event Broker.

mvn clean 
mvn spring-boot:run
................................
Aggregate for city: San Francisco
AggregateAlert [count=10, severity=Medium, alertType=HighTemperature, city=San Francisco, timeStamp=2021-10-04T12:59:11.730+05:30, temperature=74.2976562500000028421709430404007434844970703125, lat=37.774929, _long=-122.419418]
Aggregate for city: New York City
AggregateAlert [count=10, severity=Medium, alertType=HighTemperature, city=New York City, timeStamp=2021-10-04T12:59:11.750+05:30, temperature=74.487890625000005684341886080801486968994140625, lat=40.71305, _long=-74.00723]
Aggregate for city: Sau Paulo
AggregateAlert [count=9, severity=Medium, alertType=HighTemperature, city=Sau Paulo, timeStamp=2021-10-04T12:59:11.778+05:30, temperature=74.466579861111114269078825600445270538330078125, lat=-23.550520, _long=-46.633308]
Aggregate for city: London
AggregateAlert [count=9, severity=Medium, alertType=HighTemperature, city=London, timeStamp=2021-10-04T12:59:11.803+05:30, temperature=74.466579861111114269078825600445270538330078125, lat=51.507351, _long=-0.127758]
Aggregate for city: Melbourne
AggregateAlert [count=10, severity=Medium, alertType=HighTemperature, city=Melbourne, timeStamp=2021-10-04T12:59:11.830+05:30, temperature=74.2976562500000028421709430404007434844970703125, lat=-37.813629, _long=144.963058]
................................

Review ac-city-alert-dashboard project

Alert Dashboard is a Spring Boot Application that serves static web application with HTML pages & JS scripts. The web application makes a MQTT client connection to the Broker and subscribes to AggregateAlert event, on receipt it simply plots them on the Google Map.

A tooltip facility details the alert nature with information on timestamp, temperature, sample size, city name and geo-coordinates.

βœ… Review the shared.js

The connection details to establish a MQTT client session is retrieved from the following page.

  // eclipse test server
  var deets = {
    host: 'mrri685heajs7.messaging.solace.cloud',
    port: 8443,
    ssl: true,
    username: 'smarttown',
    password: 'smarttown',
  }
  
  var topicName = 'SmartTown/Operations/AggregateAlert/created/v1/+/+/+';
  var windowSizeSecs = 10000;

  // this is for MQTT, it should return a connected or connecting valid Paho client
  function getClientConnection(uniqueID,onMessageArrived,onConnectionLost,onConnect) {
    var client = new Paho.MQTT.Client(deets['host'], Number(deets['port']), uniqueID); // AWS SGP Nano
    // set the callback handlers
    client.onConnectionLost = onConnectionLost;
    client.onMessageArrived = onMessageArrived;
    // define connection options
    var connectOptions = {};
    if (deets['ssl'] == true) {
      connectOptions["useSSL"] = true;
    } else {
      connectOptions["useSSL"] = false;
    }
    //connectOptions["reconnect"] = true;
    connectOptions["userName"] = deets['username'];
    connectOptions["password"] = deets['password'];  // AWS SGP Nano
    connectOptions["onSuccess"] = onConnect;
    // try to connect!
    client.connect(connectOptions);
    return client;
  }

your Broker using Spring Cloud Stream client library.

Running Alert Dashboard

βœ… Open a terminal, change directory to ac-city-alert-dashboard project and run the following maven command.

cd ~/github/smarttown/cloudstream/ac-city-alert-dashboard
mvn clean spring-boot:run

This should run the simulator microservice and publish temperature reading events to the Event Broker.

mvn clean 
mvn spring-boot:run
................................
2021-10-04 12:49:35.174  INFO 62510 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2021-10-04 12:49:35.176  INFO 62510 --- [  restartedMain] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-10-04 12:49:35.177  INFO 62510 --- [  restartedMain] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.48]
2021-10-04 12:49:35.194  INFO 62510 --- [  restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-10-04 12:49:35.194  INFO 62510 --- [  restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 363 ms
2021-10-04 12:49:35.248  INFO 62510 --- [  restartedMain] o.s.b.a.w.s.WelcomePageHandlerMapping    : Adding welcome page: class path resource [static/index.html]
2021-10-04 12:49:35.270  WARN 62510 --- [  restartedMain] ion$DefaultTemplateResolverConfiguration : Cannot find template location: classpath:/templates/ (please add some templates or check your Thymeleaf configuration)
2021-10-04 12:49:35.278  INFO 62510 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2021-10-04 12:49:35.284  INFO 62510 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-10-04 12:49:35.287  INFO 62510 --- [  restartedMain] c.e.scs.asyncapi.dashboard.Application   : Started Application in 0.503 seconds (JVM running for 1303.432)
2021-10-04 12:49:35.289  INFO 62510 --- [  restartedMain] .ConditionEvaluationDeltaLoggingListener : Condition evaluation unchanged
2021-10-04 12:50:09.348  INFO 62510 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-10-04 12:50:09.349  INFO 62510 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-10-04 12:50:09.349  INFO 62510 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
................................

Access Alert Dashboard

βœ… Open a browser and navigate to localhost:8080

solly_wave

solly_wave

Thanks for participating in this codelab! Let us know what you thought in the Solace Community Forum and if you found any issues along the way we'd appreciate it if you'd raise them by clicking the Report a mistake button at the bottom left of this codelab.