This document demonstrates how to integrate Solace Java Message Service (JMS) with a Spark Streaming custom receiver to consume JMS messages. Its goal is to outline best practices for this integration so that both Spark Streaming and Solace JMS are used efficiently.
The target audience is developers using Hadoop v2 who have general knowledge of both Spark and JMS. The document therefore focuses on the technical steps required to achieve the integration. For detailed background on either Solace JMS or Spark, refer to the referenced documents below.
This document is divided into the following sections to cover the Solace JMS integration with Spark Streaming:
These links contain information related to this guide:
This tutorial requires access to a Solace PubSub+ event broker, and you need to know several connectivity properties of your event broker. Specifically, you need to know the following:
Resource | Value | Description |
Host | String | This is the address clients use when connecting to the event broker to send and receive messages. (Format: |
Message VPN | String | The event broker Message VPN that this client should connect to. |
Client Username | String | The client username. (See Notes below) |
Client Password | String | The client password. (See Notes below) |
There are several ways you can get access to Solace messaging and find these required properties.
The general Spark Streaming support for custom receivers is documented in the Spark Streaming Custom Receivers Documentation. The configuration outlined in this document makes use of a custom receiver to achieve the desired integration with Solace.
This integration guide demonstrates how to configure a Spark Streaming application to receive JMS messages using a custom receiver. The following steps are required to accomplish this:
This integration guide demonstrates how to create a Solace JMS custom receiver and configure it to receive messages. This section outlines the resources that are required, created, and used in the subsequent sections.
The following event broker resources are required for the integration sample in this document.
Resource | Value | Description |
Event Broker Host | Refer to step Get Solace Messaging for values | |
Message VPN | ||
Client Username | ||
Client Password | ||
Solace Queue | Q/receive | Solace destination of messages produced and consumed |
JNDI Connection Factory | JNDI/Sol/CF | The JNDI connection factory for controlling Solace JMS connection properties |
JNDI Queue Name | JNDI/Q/receive | The JNDI name of the queue used in the samples |
The following Spark resources are required for code integration:
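Resource | Description |
org.apache.spark.storage.StorageLevel | Class defining how Spark stores received data (for example, MEMORY_ONLY_SER_2) |
org.apache.spark.streaming.receiver.Receiver | Base class extended to implement the custom receiver |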
The Solace event broker needs to be configured with the following configuration objects at a minimum to enable JMS to send and receive messages within the Spark application.
The recommended way to configure an event broker is Solace PubSub+ Manager, Solace's browser-based administration console, which is packaged with the Solace PubSub+ event broker. To stay concise, this document uses the CLI as the reference. If you use Solace PubSub+ Manager, look for the equivalent settings there.
For more details on the event broker CLI, see Solace-CLI. Default values are used wherever possible to minimize the required configuration. The CLI commands listed also assume that the CLI user has a Global Access Level set to Admin. For details on CLI access levels, see User Authentication and Authorization.
If you are using Solace Cloud, you can skip this step because a message-VPN is already assigned. For its name, refer to "Message VPN" on the connection details page.
This section outlines how to create a message-VPN called "Solace_Spark_VPN" on the event broker with authentication disabled and 2GB of message spool quota for Guaranteed Messaging. This message-VPN name is required in the configuration when connecting to the event broker. In practice, choose appropriate values for authentication, message spool, and other message-VPN properties based on the end application's use case.
> home
> enable
# configure
(config)# create message-vpn Solace_Spark_VPN
(config-msg-vpn)# authentication
(config-msg-vpn-auth)# user-class client
(config-msg-vpn-auth-user-class)# basic auth-type none
(config-msg-vpn-auth-user-class)# exit
(config-msg-vpn-auth)# exit
(config-msg-vpn)# no shutdown
(config-msg-vpn)# exit
(config)#
(config)# message-spool message-vpn Solace_Spark_VPN
(config-message-spool)# max-spool-usage 2000
(config-message-spool)# exit
(config)#
This section outlines how to update the default client-profile and how to create a client username for connecting to the Solace event broker. In the client-profile, it is important to enable guaranteed messaging for JMS messaging. If you use transactions, also enable transacted sessions.
The chosen client username of "spark_user" will be required by the Spark application when connecting to the Solace event broker.
(config)# client-profile default message-vpn Solace_Spark_VPN
(config-client-profile)# message-spool allow-guaranteed-message-receive
(config-client-profile)# message-spool allow-guaranteed-message-send
(config-client-profile)# message-spool allow-transacted-sessions
(config-client-profile)# exit
(config)#
(config)# create client-username spark_user message-vpn Solace_Spark_VPN
(config-client-username)# acl-profile default
(config-client-username)# client-profile default
(config-client-username)# no shutdown
(config-client-username)# exit
(config)#
This integration guide shows the Spark application receiving messages from a single JMS queue. For illustration purposes, this queue is an exclusive PubSub+ queue with a message spool quota of 2GB, which matches the quota of the message VPN. The queue name chosen is "Q/receive".
(config)# message-spool message-vpn Solace_Spark_VPN
(config-message-spool)# create queue Q/receive
(config-message-spool-queue)# access-type exclusive
(config-message-spool-queue)# max-spool-usage 2000
(config-message-spool-queue)# permission all delete
(config-message-spool-queue)# no shutdown
(config-message-spool-queue)# exit
(config-message-spool)# exit
(config)#
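Two JNDI objects are required on the Solace event broker so that the JMS clients can connect and look up the queue destination required by Spark. These are a connection factory (JNDI/Sol/CF) and a queue destination (JNDI/Q/receive). Note that direct-transport must be disabled on the connection factory for JMS persistent messaging. They are configured as follows: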
(config)# jndi message-vpn Solace_Spark_VPN
(config-jndi)# create connection-factory JNDI/Sol/CF
(config-jndi-connection-factory)# property-list messaging-properties
(config-jndi-connection-factory-pl)# property default-delivery-mode persistent
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# property-list transport-properties
(config-jndi-connection-factory-pl)# property direct-transport false
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# exit
(config-jndi)#
(config-jndi)# create queue JNDI/Q/receive
(config-jndi-queue)# property physical-name Q/receive
(config-jndi-queue)# exit
(config-jndi)#
(config-jndi)# no shutdown
(config-jndi)# exit
(config)#
The Spark Receiver Class Documentation describes how to build a custom receiver and provides a template. We use this template to build a PubSubPlusJMSReceiver. It streams events from the PubSub+ event broker through a JMS connection and uses a JNDI lookup to find the queue to receive from.
The PubSubPlusJMSReceiver extends org.apache.spark.streaming.receiver.Receiver and implements javax.jms.MessageListener. This results in the following methods:
PubSubPlusJMSReceiver constructor: synchronously called once when the Receiver is initially created.
org.apache.spark.streaming.receiver.Receiver.onStart(): asynchronously called once when the Receiver is started.
org.apache.spark.streaming.receiver.Receiver.onStop(): asynchronously called once when the Receiver is stopped.
javax.jms.MessageListener.onMessage(): asynchronously called for every message received from Solace.
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
public class PubSubPlusJMSReceiver extends Receiver<String> implements MessageListener {
    private static final long serialVersionUID = 1L;
    private static final String SOLJMS_INITIAL_CONTEXT_FACTORY =
        "com.solacesystems.jndi.SolJNDIInitialContextFactory";
    // Skeleton constructor; the full version also takes the connection settings
    public PubSubPlusJMSReceiver() {
        super(StorageLevel.MEMORY_ONLY_SER_2());
    }
    @Override
    public void onStart() {
        // TODO Auto-generated from spark.streaming.receiver
    }
    @Override
    public void onStop() {
        // TODO Auto-generated from spark.streaming.receiver
    }
    @Override
    public void onMessage(Message arg0) {
        // TODO Auto-generated from javax.jms.MessageListener
    }
}
In the constructor, we collect the information needed to connect to Solace and build the JMS environment.
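// Class-level logger used by the callbacks (SLF4J is assumed here; any logger works)
private static final org.slf4j.Logger log =
    org.slf4j.LoggerFactory.getLogger(PubSubPlusJMSReceiver.class);
// Connection settings captured on the driver; Strings serialize with the receiver
private String _brokerURL;
private String _vpn;
private String _username;
private String _password;
private String _queueName;
private String _connectionFactory;
// javax.jms.Connection is not Serializable, so it is created in onStart()
private transient Connection _connection;
public PubSubPlusJMSReceiver(String brokerURL,
                             String vpn,
                             String username,
                             String password,
                             String jndiQueueName,
                             String jndiConnectionFactory,
                             StorageLevel storageLevel)
{
    super(storageLevel);
    _brokerURL = brokerURL;
    _vpn = vpn;
    _username = username;
    _password = password;
    _queueName = jndiQueueName;
    _connectionFactory = jndiConnectionFactory;
}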
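Spark serializes the receiver instance on the driver and ships it to an executor, where onStart() runs. That is why the constructor above only captures plain String settings and the JMS connection is opened later. The JDK-only sketch below illustrates this with a hypothetical ReceiverSettings class that stands in for the receiver; it is not the Spark Receiver API. String fields survive a Java serialization round trip, while a transient field comes back as null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the receiver: only its serialization behaviour is modelled.
final class ReceiverSettings implements Serializable {
    private static final long serialVersionUID = 1L;

    final String brokerURL;
    final String vpn;
    // Live resources are transient: they are not written out, so they arrive as null.
    transient Object connection;

    ReceiverSettings(String brokerURL, String vpn) {
        this.brokerURL = brokerURL;
        this.vpn = vpn;
    }

    // Serializes and deserializes the object, roughly what happens when it is shipped.
    static ReceiverSettings roundTrip(ReceiverSettings original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ReceiverSettings) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ReceiverSettings driverSide = new ReceiverSettings("tcp://localhost:55555", "Solace_Spark_VPN");
        driverSide.connection = new Object(); // pretend a connection was opened on the driver
        ReceiverSettings executorSide = roundTrip(driverSide);
        System.out.println(executorSide.brokerURL);          // tcp://localhost:55555
        System.out.println(executorSide.connection == null); // true
    }
}
```

The same reasoning suggests marking any live JMS object (connection, session, consumer) as transient in the real receiver and creating it in onStart().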
Next, in the onStart() method, we look up the JMS connection factory and queue, and then connect to receive messages:
@Override
public void onStart()
{
    log.info("Starting up...");
    try
    {
        // JNDI environment for the Solace initial context
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, SOLJMS_INITIAL_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, _brokerURL);
        env.put(Context.SECURITY_PRINCIPAL, _username);
        env.put(Context.SECURITY_CREDENTIALS, _password);
        env.put(SupportedProperty.SOLACE_JMS_VPN, _vpn);
        // Look up the JNDI objects configured on the event broker
        Context context = new InitialContext(env);
        ConnectionFactory factory = (ConnectionFactory) context.lookup(_connectionFactory);
        Destination queue = (Destination) context.lookup(_queueName);
        _connection = factory.createConnection();
        // JMSReceiverExceptionListener is a helper class of the sample (not shown here)
        _connection.setExceptionListener(new JMSReceiverExceptionListener());
        // Non-transacted session; messages are acknowledged explicitly in onMessage()
        Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queue);
        // Delivery is asynchronous, so onStart() returns without blocking
        consumer.setMessageListener(this);
        _connection.start();
        log.info("Completed startup.");
    } catch (Exception ex) {
        // Caught exception, ask Spark to restart the receiver
        log.error("Callback onStart caught exception, restarting ", ex);
        restart("Callback onStart caught exception, restarting ", ex);
    }
}
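Building the JNDI environment is the only part of onStart() that does not need a broker, so it can be factored out and checked on its own. The sketch below uses a hypothetical helper named JndiEnvironment. The VPN property key is passed in as a parameter so the real receiver can supply SupportedProperty.SOLACE_JMS_VPN instead of a hard-coded string. Because Hashtable rejects null values, a missing setting fails fast with a NullPointerException when the environment is built.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper that assembles the JNDI environment used in onStart().
final class JndiEnvironment {
    static final String SOLJMS_INITIAL_CONTEXT_FACTORY =
        "com.solacesystems.jndi.SolJNDIInitialContextFactory";

    private JndiEnvironment() {}

    // vpnKey is a parameter so the real receiver can pass SupportedProperty.SOLACE_JMS_VPN.
    static Hashtable<String, String> build(String brokerURL, String vpnKey, String vpn,
                                           String username, String password) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, SOLJMS_INITIAL_CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, brokerURL);       // throws NPE if null
        env.put(Context.SECURITY_PRINCIPAL, username);
        env.put(Context.SECURITY_CREDENTIALS, password);
        env.put(vpnKey, vpn);
        return env;
    }

    public static void main(String[] args) {
        Hashtable<String, String> env = build("tcp://localhost:55555", "example.vpn.key",
            "Solace_Spark_VPN", "spark_user", "secret");
        System.out.println(env.get(Context.PROVIDER_URL)); // tcp://localhost:55555
    }
}
```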
Finally, messages received from the PubSub+ broker need to be stored into Spark and then acknowledged:
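@Override
public void onMessage(Message message) {
    log.info("Callback onMessage received " + message);
    try {
        // Store the text body; Message.toString() is implementation-defined and may not be just the body
        String payload = (message instanceof TextMessage)
            ? ((TextMessage) message).getText()
            : message.toString();
        store(payload);
        // CLIENT_ACKNOWLEDGE: acknowledge only after the data has been handed to Spark
        message.acknowledge();
    } catch (JMSException ex) {
        log.error("Callback onMessage failed to read or ack message", ex);
    }
}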
This section demonstrates the JMS receiver with a Java Spark Streaming example program that counts the words in the input stream from the JMS receiver. For details of how the example works, refer to the Java version of the Spark Streaming Quick Example documentation, which demonstrates a similar use case.
The following code snippet shows how PubSubPlusJMSReceiver is invoked:
public static void main(String[] args) throws Exception {
    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
    // Create an input stream with the custom receiver using the provided PubSub+ broker connection config
    // args: host, message VPN, username, password, JNDI queue name, JNDI connection factory
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
        new PubSubPlusJMSReceiver(args[0], args[1], args[2], args[3], args[4], args[5],
            StorageLevel.MEMORY_ONLY_SER_2()));
    ...
}
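For each batch, the word count example splits every received line on spaces and counts how often each word occurs. In the Spark Quick Example this is done with flatMap, mapToPair and reduceByKey. The plain-Java sketch below uses a hypothetical BatchWordCount class with no Spark dependency. It shows what one batch computes, which can help when you compare the printed results with the messages you publish.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the per-batch word count (no Spark involved).
final class BatchWordCount {
    private BatchWordCount() {}

    // Splits each line on single spaces, as the Spark Quick Example does, and counts words.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for readable output
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("hello solace", "hello spark")));
        // {hello=2, solace=1, spark=1}
    }
}
```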
The demo code must first be built and then submitted to Spark. In a cluster of Spark machines, you can build and submit jobs from any machine. For simplicity, a local Spark installation is assumed here.
git clone https://github.com/SolaceLabs/solace-integration-guides.git
cd solace-integration-guides/src/spark-streaming
export PROJECT_HOME=$(pwd)
mvn package
# this will generate jar libraries
# note the jar variant that includes required dependencies
ls target/*.jar
# Substitute "/path/to/spark" with the local Spark install directory
export SPARK_HOME="/path/to/spark"
# Substitute values from the configuration described in the Solace Resources section
export HOST="<Solace Event Broker Host>"
export MESSAGE_VPN="<Message VPN>"
export USERNAME="<Client Username>"
export PASSWORD="<Client Password>"
export JNDI_QUEUE="<JNDI Queue Name>"
export JNDI_CF="<JNDI Connection Factory>"
# This example configures 4 local threads in the --master parameter
$SPARK_HOME/bin/spark-submit \
--class com.solace.sample.PubSubPlusJMSReceiverTest \
--master local[4] \
$PROJECT_HOME/target/pubsubplus-jms-to-spark-streaming-demo-0.0.1-jar-with-dependencies.jar \
$HOST \
$MESSAGE_VPN \
$USERNAME \
$PASSWORD \
$JNDI_QUEUE \
$JNDI_CF
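The main method reads six positional arguments in the same order as the spark-submit command above: host, message VPN, username, password, JNDI queue name and JNDI connection factory. A hypothetical ReceiverArgs helper (not part of the sample repository) could check the argument count before the streaming context is created. A missing value would then produce a usage message instead of an ArrayIndexOutOfBoundsException.

```java
// Hypothetical holder for the six positional arguments passed by spark-submit.
final class ReceiverArgs {
    static final String USAGE = "Usage: PubSubPlusJMSReceiverTest <host> <vpn> <username> "
        + "<password> <jndiQueue> <jndiConnectionFactory>";

    final String host;
    final String vpn;
    final String username;
    final String password;
    final String jndiQueue;
    final String jndiConnectionFactory;

    private ReceiverArgs(String[] a) {
        host = a[0];
        vpn = a[1];
        username = a[2];
        password = a[3];
        jndiQueue = a[4];
        jndiConnectionFactory = a[5];
    }

    // Fails fast with a usage message when the argument count is wrong.
    static ReceiverArgs parse(String[] args) {
        if (args.length != 6) {
            throw new IllegalArgumentException(USAGE);
        }
        return new ReceiverArgs(args);
    }

    public static void main(String[] args) {
        ReceiverArgs parsed = parse(new String[] {
            "tcp://localhost:55555", "Solace_Spark_VPN", "spark_user", "secret",
            "JNDI/Q/receive", "JNDI/Sol/CF"});
        System.out.println(parsed.jndiQueue); // JNDI/Q/receive
    }
}
```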
In the example above, persistent messaging was used on the event broker and the Spark Streaming client connected to a queue. This design pattern provides the highest level of reliability, because each message is persisted on the Solace event broker and is not lost if a client fails. However, this pattern also consumes the most resources on the Solace event broker and is not the most performant.
Direct messaging is the recommended pattern if any of the following applies: the client does not need messages that were missed while it was offline, it does not need older messages when it cannot keep up with the published message flow, or it needs the highest throughput with the lowest latency.
To use direct messaging, enable direct transport on the connection factory and create a JNDI topic for the receiver to consume from:
(config)# jndi message-vpn Solace_Spark_VPN
(config-jndi)# connection-factory JNDI/Sol/CF
(config-jndi-connection-factory)# property-list transport-properties
(config-jndi-connection-factory-pl)# property direct-transport true
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# exit
(config-jndi)#
(config-jndi)# create topic JNDI/T/receive
(config-jndi-topic)# property physical-name T/receive
(config-jndi-topic)# exit
(config-jndi)# exit
(config)#
The "Establishing Connections" and "Managing Sessions" sections of the Developer Guide for the Solace JMS API explain how to make the Solace JMS connection reconnect automatically to the standby event broker after an HA failover. By default, Solace JMS connections reconnect to the standby event broker after an HA failover.
Note: With HA redundant event brokers, a failover from one event broker to its mate typically takes less than 30 seconds. However, applications should attempt to reconnect for at least five minutes.
The Solace CLI commands in Configuring JNDI Objects already set the required JNDI properties to reasonable values. The retry parameters are all defaults. The commands are repeated here for completeness.
(config)# jndi message-vpn Solace_Spark_VPN
(config-jndi)# connection-factory JNDI/Sol/CF
(config-jndi-connection-factory)# property-list transport-properties
(config-jndi-connection-factory-pl)# property "reconnect-retry-wait" "3000"
(config-jndi-connection-factory-pl)# property "reconnect-retries" "20"
(config-jndi-connection-factory-pl)# property "connect-retries-per-host" "5"
(config-jndi-connection-factory-pl)# property "connect-retries" "1"
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# exit
(config-jndi)# exit
(config)#
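To check these values against the five-minute guidance above, you can roughly estimate the reconnect window as reconnect-retries × reconnect-retry-wait. This is a simplifying assumption of the sketch. It ignores connect-retries-per-host and the time each connection attempt itself takes, so treat the result as roughly a lower bound. With the values shown, 20 × 3000 ms is about one minute. Covering five minutes with a 3000 ms wait needs about 100 retries. The class name ReconnectWindow is hypothetical.

```java
// Rough estimate helper (hypothetical): counts each reconnect retry as one
// reconnect-retry-wait interval and ignores per-host retries and connect timeouts.
final class ReconnectWindow {
    private ReconnectWindow() {}

    // Approximate reconnect window in milliseconds for the given settings.
    static long windowMillis(int reconnectRetries, long retryWaitMillis) {
        if (reconnectRetries < 0) {
            throw new IllegalArgumentException("-1 means retry forever; there is no finite window");
        }
        return reconnectRetries * retryWaitMillis;
    }

    // Smallest retry count whose estimated window covers the target duration (ceiling division).
    static int retriesToCover(long targetMillis, long retryWaitMillis) {
        return (int) ((targetMillis + retryWaitMillis - 1) / retryWaitMillis);
    }

    public static void main(String[] args) {
        System.out.println(windowMillis(20, 3000));           // 60000
        System.out.println(retriesToCover(5 * 60_000, 3000)); // 100
    }
}
```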
The key to debugging integration issues with the Solace JMS API is enabling API logging. The Solace JMS API uses the Jakarta Commons Logging API (JCL), and Spark's logging is configured through log4j, so Solace JMS API logging is configured much like logging for any other Spark application. The following example shows how to enable debug logging in the Solace JMS API using log4j. The Solace JMS API depends on the Solace Java API (JCSMP), so enable and tune both of the following logging components when debugging to get full information. By default, info-level logs are written to the console. The following example sets both to debug level:
log4j.category.com.solacesystems.jms=DEBUG
log4j.category.com.solacesystems.jcsmp=DEBUG
Add the above lines to the log4j.properties configuration file in the $SPARK_HOME/conf directory. Note: if no log4j.properties configuration file exists, you can use the contents of the log4j.properties.template file as a starting point. You then get output in a format similar to the following, which can help you understand what is happening within the Solace JMS API.
22/01/31 09:11:22 INFO TcpClientChannel: Client-2: Connecting to host 'orig=tcps://mr-xyz.messaging.solace.cloud:55443, scheme=tcps://, host=mr-xyz.messaging.solace.cloud, port=55443' (host 1 of 1, smfclient 2, attempt 1 of 1, this_host_attempt: 1 of 1)
22/01/31 09:11:22 INFO SNIUtil: Server Name Indication (SNI) automatically applied by using provided hostname
22/01/31 09:11:22 INFO SSLSmfClient: SSLEngine Supported Protocols: [SSLV3, TLSV1, TLSV1.1, TLSV1.2]
22/01/31 09:11:22 INFO SSLSmfClient: Application Specified Protocols: [SSLv3, TLSv1, TLSv1.1, TLSv1.2]
22/01/31 09:11:22 INFO TcpClientChannel: Client-2: Connected to host 'orig=tcps://mr-xyz.messaging.solace.cloud:55443, scheme=tcps://, host=mr-xyz.messaging.solace.cloud, port=55443' (smfclient 2)
JMS client authentication is handled by the PubSub+ event broker. The broker supports a variety of authentication schemes, as described in the Solace documentation. This section shows how to configure the PubSub+ event broker to pass the authentication username and password through to an LDAP (Active Directory) server, so that it integrates with enterprise-level authentication mechanisms. TLS client certificate, OAuth, and Kerberos authentication are also possible.
(config)# create authentication ldap-profile ActiveDirectoryIntegration
(config/authentication/ldap-profile)# admin dn DomainAdmin password xxxxxx
(config/authentication/ldap-profile)# search base-dn dc=lab,dc=solace,dc=com
(config/authentication/ldap-profile)# ldap-server ldap://192.168.1.56 index 1
(config/authentication/ldap-profile)# search filter "(sAMAccountName=$CLIENT_USERNAME)"
(config/authentication/ldap-profile)# no shut
(config/authentication/ldap-profile)# exit
Finally, the LDAP profile needs to be enabled for the message VPN. No application or API code changes are required, because the event broker passes the authentication through to the LDAP server.
(config)# message-vpn Solace_Spark_VPN
(config/message-vpn)# authentication user-class client
(...message-vpn/authentication/user-class)# basic
(...e-vpn/authentication/user-class/basic)# auth-type ldap ActiveDirectoryIntegration
(...e-vpn/authentication/user-class/basic)# exit
TLS may already be enabled on your Event Broker, for example if using PubSub+ Cloud. In this case you can skip this section.
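To enable secure (TLS) connections to the Solace event broker, the following configuration must be updated on the Solace event broker: the server certificate, the TLS/SSL service listen port, and the TLS/SSL service within the message VPN.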
The following sections outline how to configure these items.
Before starting, here is some background information on the server certificate required by the event broker. This is from the Solace documentation:
"To enable TLS/SSL-encryption, you must set the TLS/SSL server certificate file that the Solace PubSub+ event broker is to use. This server certificate is presented to clients during TLS/SSL handshakes. The server certificate must be an x509v3 certificate and include a private key. The server certificate and key use an RSA algorithm for private key generation, encryption and decryption, and they both must be encoded with a Privacy Enhanced Mail (PEM) format."
To configure the server certificate, first copy the server certificate to the Solace event broker. For the purposes of this example, assume the server certificate file is named "mycert.pem".
# copy sftp://[<username>@]<ip-addr>/<remote-pathname>/mycert.pem /certs
<username>@<ip-addr>'s password:
#
Then set the server certificate for the Solace event broker.
(config)# ssl server-certificate mycert.pem
(config)#
By default, the Solace event broker accepts secure messaging client connections on port 55443. If this port is acceptable, no further configuration is required and you can skip this section. If you want a non-default port, follow the steps below. Note that this configuration change disrupts service to all clients of the Solace event broker. Perform it during a maintenance window when client disconnection is acceptable. This example assumes that the new port is 55403.
(config)# service smf
(config-service-smf)# shutdown
All SMF and WEB clients will be disconnected.
Do you want to continue (y/n)? y
(config-service-smf)# listen-port 55403 ssl
(config-service-smf)# no shutdown
(config-service-smf)# exit
(config)#
By default, both the plain-text and SSL services are enabled within Solace message VPNs. If the message VPN defaults are unchanged, you can skip this section. However, if the SSL service has been disabled in the current application VPN, it must be enabled for secure communication to succeed. The steps below show how to enable SSL within the messaging (SMF) service to allow secure client connections from the Spark Streaming client.
(config)# message-vpn Solace_Spark_VPN
(config-msg-vpn)# service smf
(config-msg-vpn-service-smf)# ssl
(config-msg-vpn-service-ssl)# no shutdown
(config-msg-vpn-service-ssl)# exit
(config-msg-vpn-service-smf)# exit
(config-msg-vpn-service)# exit
(config-msg-vpn)# exit
(config)#
To tell the PubSub+ JMS API that the connection should be secure, change the protocol in the "Solace Event Broker Host" value to tcps:
tcps://<host>:<port>
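This host value is passed to the receiver and used as the JNDI provider URL, so a quick pre-flight check can catch a configuration that still uses tcp. The sketch below uses java.net.URI from the JDK. The SecureHostCheck class is hypothetical. It only inspects the scheme and host and leaves all actual TLS handling to the Solace JMS API. It also handles only a single host, so a comma-separated host list would need to be split first.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical pre-flight check that a broker host value requests a TLS (tcps) connection.
final class SecureHostCheck {
    private SecureHostCheck() {}

    static boolean isSecure(String hostValue) {
        try {
            URI uri = new URI(hostValue.trim());
            // Require the tcps scheme and a parseable host name
            return "tcps".equalsIgnoreCase(uri.getScheme()) && uri.getHost() != null;
        } catch (URISyntaxException e) {
            return false; // not a valid URI, e.g. an unfilled placeholder
        }
    }

    public static void main(String[] args) {
        System.out.println(isSecure("tcps://mr-xyz.messaging.solace.cloud:55443")); // true
        System.out.println(isSecure("tcp://mr-xyz.messaging.solace.cloud:55555"));  // false
    }
}
```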
Additionally, if you use a self-signed server certificate, the Solace JMS API must be able to validate the server certificate of the Solace event broker to establish a secure connection. To do this, add the following trust store parameters to the JNDI environment used to create the InitialContext in onStart():
First, give the Solace JMS API the location of a trust store file so that it can verify the Solace event broker's server certificate during connection establishment. This parameter takes a URL or path to the trust store file.
env.put(SupportedProperty.SOLACE_JMS_SSL_TRUST_STORE, "___Path_or_URL___");
It is also required to provide a trust store password. This password allows the Solace JMS API to validate the integrity of the contents of the trust store. This is done through the following parameter.
env.put(SupportedProperty.SOLACE_JMS_SSL_TRUST_STORE_PASSWORD, "___Password___");
There are multiple formats for the trust store file. Solace JMS supports two of them: "jks" for Java Key Store and "pkcs12". By default, Solace JMS assumes the Java Key Store (JKS) format, so you can omit this parameter if the trust store file is in JKS format. The trust store format is set through the following parameter.
env.put(SupportedProperty.SOLACE_JMS_SSL_TRUST_STORE_FORMAT, "jks");
And finally, the authentication scheme must be selected. Solace JMS supports the following authentication schemes for secure connections:
This integration example will use basic authentication. So the required parameter is as follows:
env.put(SupportedProperty.SOLACE_JMS_AUTHENTICATION_SCHEME, SupportedProperty.AUTHENTICATION_SCHEME_BASIC);
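A wrong trust store password or format usually only shows up when the receiver tries to connect. As an optional pre-flight check, the hypothetical TrustStoreCheck class below loads the same file with the standard java.security.KeyStore API, using the same password and format value ("jks" or "pkcs12"). It does not use or replace any Solace API. It only confirms that the JDK can open the trust store.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Hypothetical pre-flight check: open the trust store with the plain JDK KeyStore API.
final class TrustStoreCheck {
    private TrustStoreCheck() {}

    // format is "jks" or "pkcs12"; JCA type names are matched case-insensitively.
    static KeyStore load(InputStream in, String format, char[] password) {
        try {
            KeyStore store = KeyStore.getInstance(format);
            // Typically fails with an IOException on a wrong password or mismatched format
            store.load(in, password);
            return store;
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException(
                "Trust store could not be loaded as '" + format + "': " + e.getMessage(), e);
        }
    }

    // Convenience wrapper for a trust store file on the local file system.
    static KeyStore loadFile(Path file, String format, char[] password) {
        try (InputStream in = Files.newInputStream(file)) {
            return load(in, format, password);
        } catch (IOException e) {
            throw new IllegalStateException("Trust store file could not be read: " + file, e);
        }
    }
}
```

Because onStart() runs on an executor, run such a check on a machine where the receiver will run if the trust store path differs between machines.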
The [Solace-FG] section "Data Center Replication" contains a sub-section on "Application Implementation", which details items to consider when working with Solace's Data Center Replication feature. This integration guide shows the following items, which a Spark Streaming client needs in order to connect to a backup data center using the Solace Data Center Replication feature.
As described in [Solace-Docs], the host list provides the address of the backup data center. In the Spark Streaming client, it is configured through the "Solace Event Broker Host" value passed to the receiver (used as the JNDI provider URL), as follows:
tcp://__IP_active_site:PORT__,tcp://__IP_standby_site:PORT__
The active site and standby site addresses are provided as a comma-separated list of "Connection URIs". When connecting, the Solace JMS connection first tries the active site. If it cannot connect to the active site, it then tries the standby site. The referenced Solace documentation discusses this in much more detail.
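The sketch below uses a hypothetical HostList helper to illustrate how such a list is read. Entries are separated by commas, the helper ignores surrounding spaces and empty entries, and order is preserved so that the first entry is the active site. The Solace JMS API does its own parsing of the host list, and this code is not a substitute for it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split a host list into its ordered connection URIs.
final class HostList {
    private HostList() {}

    static List<String> parse(String hostList) {
        List<String> hosts = new ArrayList<>();
        for (String entry : hostList.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                hosts.add(trimmed); // order matters: the first entry is tried first
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        List<String> hosts = parse("tcp://10.0.0.1:55555, tcp://10.0.0.2:55555");
        System.out.println("active=" + hosts.get(0) + " standby=" + hosts.get(1));
        // active=tcp://10.0.0.1:55555 standby=tcp://10.0.0.2:55555
    }
}
```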
In order to enable applications to successfully reconnect to the standby site in the event of a data center failure, it is required that the Solace JMS connection be configured to attempt connection reconnection for a sufficiently long time to enable the manual switch-over to occur. This time is application specific depending on individual disaster recovery procedures and can range from minutes to hours depending on the application. In general it is best to tune the reconnection by changing the "reconnect retries" parameter within the Solace JNDI to a value large enough to cover the maximum time to detect and execute a disaster recovery switch over. If this time is unknown, it is also possible to use a value of "-1" to force the Solace JMS API to reconnect indefinitely.
The reconnect retries is tuned in the Solace event broker CLI as follows:
(config)# jndi message-vpn Solace_Spark_VPN
(config-jndi)# connection-factory JNDI/Sol/CF
(config-jndi-connection-factory)# property-list transport-properties
(config-jndi-connection-factory-pl)# property "reconnect-retries" "-1"
(config-jndi-connection-factory-pl)# exit
(config-jndi-connection-factory)# exit
(config-jndi)# exit
(config)#