This document demonstrates how to integrate the Solace Java Message Service (JMS) with Flink Streaming source functions for consumption of JMS messages. The goal of this document is to outline best practices for this integration to enable efficient use of both Flink Streaming and Solace JMS.
The target audience of this document is developers with knowledge of both Flink and JMS in general. As such this document focuses on the technical steps required to achieve the integration. For detailed background on either Solace JMS or Flink refer to the referenced documents below.
This document is divided into the following sections to cover the Solace JMS integration with Flink Streaming:
These links contain information related to this guide:
This tutorial requires access to Solace PubSub+ event broker and requires that you know several connectivity properties about your event broker. Specifically you need to know the following:
Resource | Value | Description |
Host | String | This is the address clients use when connecting to the event broker to send and receive messages. (Format: |
Message VPN | String | The event broker Message VPN that this client should connect to. |
Client Username | String | The client username. (See Notes below) |
Client Password | String | The client password. (See Notes below) |
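For orientation, these four values typically end up in the JNDI environment that a JMS client passes to an InitialContext. The sketch below is a minimal illustration, not code from the sample project. The class name SolaceJndiEnv is hypothetical, and the sketch assumes the Solace JNDI initial context factory class com.solacesystems.jndi.SolJNDIInitialContextFactory and the username@message-vpn form of the security principal. The host string is passed through unchanged, so supply it in whatever format your broker's connection details give.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper: collects the connectivity properties into a JNDI environment.
final class SolaceJndiEnv {

    // Assumed class name of the Solace JNDI initial context factory.
    static final String SOLACE_CONTEXT_FACTORY =
            "com.solacesystems.jndi.SolJNDIInitialContextFactory";

    private SolaceJndiEnv() {
    }

    static Hashtable<String, String> build(String host, String messageVpn,
                                           String username, String password) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, SOLACE_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, host);
        // Assumption: the Message VPN is appended to the username as user@vpn.
        env.put(Context.SECURITY_PRINCIPAL, username + "@" + messageVpn);
        // Hashtable rejects null values, so pass an empty string when no password is set.
        env.put(Context.SECURITY_CREDENTIALS, password);
        return env;
    }
}
```

The returned Hashtable has the shape a JMS client would hand to new InitialContext(env) before looking up its connection factory.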
There are several ways you can get access to Solace messaging and find these required properties.
This is a discussion of an approach for consuming messages from a Java Message Service (JMS) bus in Flink containers. The full code is freely available on GitHub as part of this project in src/flink-jms-connector.
The general Flink Streaming support for connectors is documented in the Flink Streaming Documentation. The configuration outlined in this document makes use of a custom SourceFunction to achieve the desired integration with Solace via JMS.
This integration guide demonstrates how to configure a Flink Streaming application to receive JMS messages using a custom receiver. Accomplishing this requires completion of the following steps.
This integration guide demonstrates how to create a Solace JMS custom receiver and configure it to receive messages. This section outlines the resources that are required or created, and that are used in the subsequent sections.
The following Solace Event Broker resources are required.
Resource | Value | Description |
Solace Event Broker Host | Refer to section 2- Get Solace Messaging for values | |
Message VPN | ||
Client Username | ||
Client Password | ||
Solace Queue | Q/receive | Solace destination of persistent messages consumed |
JNDI Connection Factory | JNDI/CF/flink | The JNDI Connection factory for controlling Solace JMS connection properties |
JNDI Queue Name | JNDI/Q/receive | The JNDI name of the queue used in the samples |
The Solace event broker can be obtained in one of two ways.
The Solace JMS jars are required. They can be obtained on Solace Developer Portal Downloads or from Maven Central.
The easiest way to integrate Solace and Flink is using the client libraries available via public Maven Repositories, for example:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>com.solacesystems</groupId>
        <artifactId>sol-jms</artifactId>
        <version>10.0.0</version>
    </dependency>
</dependencies>
Or if you downloaded the libraries and are referencing them directly, the following resources are all required:
Resource | Value | Description |
Solace Common | sol-common-VERSION.jar | Solace common utilities library. |
Solace JCSMP | sol-jcsmp-VERSION.jar | Underlying Solace wireline support libraries. |
Solace JMS | sol-jms-VERSION.jar | Solace JMS 1.1 compliant libraries. |
Apache Commons language | commons-lang-2.6.jar | Common language libraries. |
Apache Commons logging | commons-logging-1.1.3.jar | Common logging libraries |
Apache Geronimo | geronimo-jms_1.1_spec-1.1.1.jar | Apache Geronimo's packaging of the JMS 1.1 API specification (the javax.jms interfaces). |
The Solace Event Broker needs to be configured with the following configuration objects at a minimum to enable JMS to send and receive messages within the Flink application.
The recommended approach for configuring an event broker is to use Solace PubSub+ Manager, Solace's browser-based administration console packaged with the Solace PubSub+ event broker. This document uses the CLI as the reference to remain concise; look for the related settings if you are using Solace PubSub+ Manager.
For more details related to event broker CLI see Solace-CLI. Wherever possible, default values will be used to minimize the required configuration. The CLI commands listed also assume that the CLI user has a Global Access Level set to Admin. For details on CLI access levels please see User Authentication and Authorization.
If you are using Solace Cloud you can skip this step because a message-VPN is already assigned. For the name, refer to the "Message VPN" in the connection details page.
This section outlines how to create a message-VPN called "Solace_Flink_VPN" on the event broker with authentication disabled and 2GB of message spool quota for Guaranteed Messaging. This message-VPN name is required in the configuration when connecting to the messaging event broker. In practice, appropriate values for authentication, message spool and other message-VPN properties should be chosen depending on the end application's use case.
> home
> enable
# configure
(config)# create message-vpn Solace_Flink_VPN
(config-msg-vpn)# authentication
(config-msg-vpn-auth)# user-class client
(config-msg-vpn-auth-user-class)# basic auth-type none
(config-msg-vpn-auth-user-class)# exit
(config-msg-vpn-auth)# exit
(config-msg-vpn)# no shutdown
(config-msg-vpn)# exit
(config)#
(config)# message-spool message-vpn Solace_Flink_VPN
(config-message-spool)# max-spool-usage 2000
(config-message-spool)# exit
(config)#
This section outlines how to update the default client-profile and how to create a client username for connecting to the Solace Event Broker. For the client-profile, it is important to enable guaranteed messaging for JMS messaging and transacted sessions if using transactions.
The chosen client username of "flink_user" will be required by the Flink application when connecting to the Solace Event Broker.
(config)# client-profile default message-vpn Solace_Flink_VPN
(config-client-profile)# message-spool allow-guaranteed-message-receive
(config-client-profile)# message-spool allow-guaranteed-message-send
(config-client-profile)# message-spool allow-transacted-sessions
(config-client-profile)# exit
(config)#
(config)# create client-username flink_user message-vpn Solace_Flink_VPN
(config-client-username)# acl-profile default
(config-client-username)# client-profile default
(config-client-username)# no shutdown
(config-client-username)# exit
(config)#
This integration guide shows receiving messages within the Flink application from a single JMS Queue. For illustration purposes, this queue is chosen to be an exclusive queue with a message spool quota of 2GB, matching the quota associated with the message VPN. The queue name chosen is "Q/receive".
(config)# message-spool message-vpn Solace_Flink_VPN
(config-message-spool)# create queue Q/receive
(config-message-spool-queue)# access-type exclusive
(config-message-spool-queue)# max-spool-usage 2000
(config-message-spool-queue)# permission all delete
(config-message-spool-queue)# no shutdown
(config-message-spool-queue)# exit
(config-message-spool)# exit
(config)#
To enable the JMS clients to connect and look up the Queue destination required by Flink, two JNDI objects are required on the Solace Event Broker: a connection factory (JNDI/CF/flink) and a queue destination (JNDI/Q/receive).
They are configured as follows:
(config)# jndi message-vpn Solace_Flink_VPN
(config-jndi)# create connection-factory JNDI/CF/flink
(config-jndi-connection-factory)# property-list messaging-properties
(config-jndi-connection-factory-pl)# property default-delivery-mode persistent
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# property-list transport-properties
(config-jndi-connection-factory-pl)# property direct-transport false
(config-jndi-connection-factory-pl)# property "reconnect-retry-wait" "3000"
(config-jndi-connection-factory-pl)# property "reconnect-retries" "20"
(config-jndi-connection-factory-pl)# property "connect-retries-per-host" "5"
(config-jndi-connection-factory-pl)# property "connect-retries" "1"
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# exit
(config-jndi)#
(config-jndi)# create queue JNDI/Q/receive
(config-jndi-queue)# property physical-name Q/receive
(config-jndi-queue)# exit
(config-jndi)#
(config-jndi)# no shutdown
(config-jndi)# exit
(config)#
The Flink Streaming Documentation describes how to build a custom SourceFunction and provides a template. This section uses that template to build a JMSQueueSource.
The JMSQueueSource implements org.apache.flink.streaming.api.functions.source.SourceFunction. This results in the following methods:
JMSQueueSource constructor – Synchronously called once as the JMSQueueSource is initially created.
void run(SourceContext<OUT> ctx) – Asynchronously called once as the SourceFunction is started; it loops accepting messages.
void cancel() – Asynchronously called once as the SourceFunction is stopped.
The custom SourceFunction also implements ResultTypeQueryable. This helps other generic Flink internals operate with the generic OUT type that each specific SourceFunction implementation produces.
TypeInformation<OUT> getProducedType() – Invoked by Flink to retrieve the TypeInformation of the custom SourceFunction's generic output.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JMSQueueSource<OUT> implements SourceFunction<OUT>, ResultTypeQueryable<OUT> {
    public JMSQueueSource() {
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        // TODO: connect, then loop and emit each message with ctx.collect()
    }

    @Override
    public void cancel() {
        // TODO: signal the loop in run() to stop
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        // TODO: return the TypeInformation that describes OUT
        return null;
    }
}
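The split of responsibilities between run() and cancel() can be illustrated without any Flink or JMS dependency. The following is a minimal sketch, not the connector's implementation: PollingLoopSketch is a hypothetical class in which a BlockingQueue stands in for the JMS MessageConsumer and a List stands in for Flink's SourceContext. It shows a common pattern in which cancel() only clears a volatile flag, and run() uses a timed poll so that it notices the flag promptly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source: polls a queue until cancel() is called.
final class PollingLoopSketch {

    // Stand-in for a JMS MessageConsumer.
    private final BlockingQueue<String> inbound = new LinkedBlockingQueue<>();
    // Stand-in for SourceContext.collect(); written only by the thread in run().
    private final List<String> collected = new ArrayList<>();
    // volatile: written by the cancelling thread, read by the thread in run().
    private volatile boolean running = true;

    void publish(String message) {
        inbound.add(message);
    }

    // Plays the role of SourceFunction.run(): wait briefly, emit, re-check the flag.
    void run() {
        try {
            while (running) {
                String message = inbound.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    collected.add(message);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and leave the loop.
            Thread.currentThread().interrupt();
        }
    }

    // Plays the role of SourceFunction.cancel(): only flips the flag.
    void cancel() {
        running = false;
    }

    // Read this only after run() has returned.
    List<String> collected() {
        return collected;
    }
}
```

In a real source the same shape would apply, with the timed poll replaced by a timed receive on the JMS consumer. A blocking receive with no timeout would not see the flag until the next message arrived.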
The constructor collects the information needed to connect to Solace and build the JMS environment. To keep the source generic across JMS implementations, this information is passed in a way that complies with the JMS 1.1 and JNDI standards. The vendor-agnostic way to do this in JMS is to use a JNDI InitialContext to look up a JMS ConnectionFactory and all JMS resources used (e.g., queues or topics). The JMS ConnectionFactory is used to create a JMS Connection, which is then used to consume JMS Messages from JMS Topics or Queues (source types for both are provided in the sample connector library; the example below shows the topic variant, JMSTopicSource).
jmsEnvironment – The JMS environment properties for the InitialContext; this is used to look up the connection factory.
cfName – The connection-factory name to retrieve from the JNDI InitialContext and use for all connection creation.
topicName – Topic string to subscribe to; platform-dependent, supporting whatever wildcarding the underlying platform supports.
deserializer – A JMS translator instance that knows how to translate specific JMS messages to specific target types.
private Hashtable<String, String> _jmsEnv;
private JMSTranslator<OUT> _deserializer;
private String _cfName;
private String _topicName;

public JMSTopicSource(Hashtable<String, String> jmsEnvironment,
                      String cfName, String topicName,
                      JMSTranslator<OUT> deserializer) {
    _jmsEnv = jmsEnvironment;
    _cfName = cfName;
    _topicName = topicName;
    _deserializer = deserializer;
}
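// Cleared by cancel() so that the receive loop in run() can exit.
private volatile boolean _running = true;

@Override
public void run(SourceContext<OUT> sourceContext) throws Exception {
    // Look up the connection factory through JNDI, then connect.
    InitialContext jndiContext = new InitialContext(_jmsEnv);
    ConnectionFactory factory = (ConnectionFactory) jndiContext.lookup(_cfName);
    Connection connection = factory.createConnection();
    try {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createTopic(_topicName));
        connection.start();
        while (_running) {
            // Timed receive, so the loop re-checks the running flag regularly.
            Message msg = consumer.receive(1000);
            if (msg == null) {
                continue;
            }
            sourceContext.collect(_deserializer.translate(msg));
            // Acknowledge only after the record has been handed to Flink.
            msg.acknowledge();
        }
    } finally {
        // Closing the connection also closes its sessions and consumers.
        connection.close();
    }
}

@Override
public void cancel() {
    _running = false;
}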
In the example above, we are using a simple JMSTranslator function for JMS TextMessages that is part of the sample project in GitHub: src/flink-jms-connector. Typically, you would implement a custom translator to marshal the contents of the inbound JMS Messages to the type your Flink application expects. While Flink has internal Serializer implementations, we tactically chose to implement serialization separately as it allows us to account for JMS Message properties when translating them into Flink objects.
public class JMSTextTranslator extends JMSTranslator<String> {
    @Override
    public String translate(Message msg) throws JMSException {
        TextMessage txtmsg = (TextMessage) msg;
        return txtmsg.getText();
    }

    @Override
    public Class<String> outputType() {
        return String.class;
    }
}
Note that the JMSTranslator exposes an outputType() method that returns the output Class for this translator. Both the JMSTopicSource and JMSQueueSource implementations expose this type through the generic ResultTypeQueryable interface, which Flink uses to determine the type a source produces:
@Override
public TypeInformation<OUT> getProducedType() {
    return TypeInformation.of(_deserializer.outputType());
}
The Solace JMS API Online Reference Documentation section "Establishing Connection and Creating Sessions" provides details on how to enable the Solace JMS connection to automatically reconnect to the standby event broker in the case of a HA failover of a Solace Event Broker. By default Solace JMS connections will reconnect to the standby event broker in the case of an HA failover.
In general the Solace documentation contains the following note regarding reconnection:
Note: When using HA redundant event brokers, a fail-over from one event broker to its mate will typically
occur in under 30 seconds, however, applications should attempt to reconnect for at least five minutes.
In the previous section "Setting up Solace JNDI References", the Solace CLI commands correctly configured the required JNDI properties to reasonable values. These commands are repeated here for completeness.
(config)# jndi message-vpn Solace_Flink_VPN
(config-jndi)# connection-factory JNDI/CF/flink
(config-jndi-connection-factory)# property-list transport-properties
(config-jndi-connection-factory-pl)# property "reconnect-retry-wait" "3000"
(config-jndi-connection-factory-pl)# property "reconnect-retries" "20"
(config-jndi-connection-factory-pl)# property "connect-retries-per-host" "5"
(config-jndi-connection-factory-pl)# property "connect-retries" "1"
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# exit
(config-jndi)# exit
(config)#
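As a rough cross-check of these values against the five-minute guidance quoted above, the reconnect window can be estimated with simple arithmetic. The sketch below is an approximation under stated assumptions, not a description of the API's exact retry algorithm. It assumes a single host, that each of the reconnect-retries makes connect-retries-per-host attempts, and that reconnect-retry-wait elapses before each attempt. The class and method names are hypothetical.

```java
// Hypothetical helper: rough estimate of how long a client keeps trying to reconnect.
final class ReconnectWindow {

    private ReconnectWindow() {
    }

    // Assumption: total wait = reconnect retries x attempts per host x wait per attempt.
    // The time taken by the connection attempts themselves is not included.
    static long estimateMillis(int reconnectRetries, int retriesPerHost, long retryWaitMillis) {
        return (long) reconnectRetries * retriesPerHost * retryWaitMillis;
    }
}
```

With the values configured here, ReconnectWindow.estimateMillis(20, 5, 3000L) returns 300000 milliseconds, which is five minutes. Time spent on the connection attempts themselves comes on top of this estimate.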
Finally, ensure that the JNDI destination you are using points to a Topic, not a Queue:
(config)# jndi message-vpn Solace_Flink_VPN
(config-jndi)# create topic JNDI/T/receive
(config-jndi-topic)# property physical-name Topic/Receive
(config-jndi-topic)# exit
The key component for debugging integration issues with the Solace JMS API is the API logging that can be enabled. How to enable logging in the Solace API is described below.
Flink logs through the SLF4J API, with log4j as its default logging backend, and the Solace JMS API uses the Jakarta Commons Logging API (JCL), which can also log through log4j. Configuring Solace JMS API logging is therefore very similar to configuring logging for any other Flink application. The following example shows how to enable debug logging in the Solace JMS API using log4j.
One note to consider is that since the Solace JMS API has a dependency on the Solace Java API (JCSMP) both of the following logging components should be enabled and tuned when debugging to get full information. For example to set both to debug level:
log4j.category.com.solacesystems.jms=DEBUG
log4j.category.com.solacesystems.jcsmp=DEBUG
By default, info logs are written to the console. This section focuses on using log4j as the logging library and tuning Solace JMS API logs through the log4j properties. To enable Solace JMS API logging, a user must therefore do two things:
Below is an example Log4j properties file that will enable debug logging within the Solace JMS API.
log4j.rootCategory=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
log4j.category.com.solacesystems.jms=DEBUG
log4j.category.com.solacesystems.jcsmp=DEBUG
With this you can get output in a format similar to the following which can help in understanding what is happening within the Solace JMS API.
14:35:01,171 DEBUG main client.ClientRequestResponse:75 - Starting request timer (SMP-EstablishP2pSub) (10000 ms)
14:35:01,171 DEBUG Context_2_ReactorThread client.ClientRequestResponse:83 - Stopping request timer (SMP-EstablishP2pSub)
14:35:01,173 INFO main jms.SolConnection:151 - Connection created.
14:35:01,173 INFO main connection.CachingConnectionFactory:298 - Established shared JMS Connection: com.solacesystems.jms.SolConnection@ca3f2d
14:35:01,180 INFO main jms.SolConnection:327 - Entering start()
14:35:01,180 INFO main jms.SolConnection:338 - Leaving start() : Connection started.
14:35:01,180 INFO jmsContainer-1 jms.SolConnection:252 - Entering createSession()
A working sample with a Maven pom build is provided on GitHub with this guide: