Do you use or want to use Kafka? Want to learn how to integrate Kafka with Solace PubSub+ event brokers?
Using the Kafka Connect API, the Solace-designed PubSub+ Kafka Connectors allow you to both on-ramp and off-ramp data between Solace and Kafka.
Solace allows a wide variety of standard protocols and APIs to connect directly to the broker, such as MQTT, AMQP 1.0, REST, WebSocket, and JMS. This, coupled with Solace's dynamic hierarchical topic structure and multi/hybrid-cloud event mesh capability, allows Kafka architectures to extend much further than the standard Connector framework alone allows.
Solace has released the PubSub+ Connectors for Kafka as open source; you can find them on GitHub, as shown in the following sections of this CodeLab.
The Solace PubSub+ Connectors will work with either standard Apache Kafka, or the proprietary Confluent variant.
Download and unzip Kafka onto your server or local machine. This tutorial will assume that it is installed in ~/kafka_2.13-2.7.0/, the current version of Apache Kafka at the time of this writing.
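For example, on Linux, Mac, or WSL (the URL below follows the Apache archive layout and may change as newer versions are released):
wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -xzf kafka_2.13-2.7.0.tgz -C ~/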
Download and unzip Confluent Platform onto your server or local machine. This tutorial will assume that it is installed in ~/confluent-6.1.0/, the current version of Confluent Platform at the time of this writing.
Follow the appropriate Quickstart guide (links in previous section) and verify you can publish and subscribe to Kafka topics. From the quickstart guides, the basic steps for this are (with examples for standard Kafka given):
Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Then, in a second terminal, start the Kafka broker:
bin/kafka-server-start.sh config/server.properties
In another terminal, create a topic with two partitions and verify it:
bin/kafka-topics.sh --create --topic quickstart-events --partitions 2 --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: quickstart-events Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Start a console producer and type in a few messages:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>First message
>2nd one
>third
>4
>5
>6
>7
>8
>9
>10
In another terminal, start a console consumer reading from the beginning of the topic:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
2nd one
4
5
6
7
10
First message
third
8
9
- Notice the messages are somewhat out-of-order. This is because Kafka only has ordering at the partition level. In this case, within each of the two partitions message order is maintained; but not across the whole Kafka topic.
With the consumer still running, go back to the producer terminal and type a few more messages; they should appear in the consumer as you publish them:
>11
>12
>13
>
Next, we are ready to fetch all the required components for the Solace PubSub+ Connectors for Kafka. Point your favourite browser to https://github.com/SolaceProducts and search for kafka:
Click on the "source" one.
You can download either or both; the build and installation steps are the same. For simplicity, we will only do the Source Connector. Download the ZIP, or clone the project:
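For example, to clone the Source Connector project with Git (repository name as it appears on GitHub at the time of writing):
git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-source.git
cd pubsubplus-connector-kafka-source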
The Connectors use Gradle as the Java build tool. There is no need to install Gradle if you do not have it; everything is self-contained within the Connector distributions.
Simply run ./gradlew assemble on Linux, Mac, or WSL, or .\gradlew.bat assemble on Windows Command Prompt or PowerShell. It might take a little bit of time while the appropriate dependencies are downloaded:
alee@LAPTOP-OQFKDPM0:/mnt/c/Users/AaronLee/Downloads/pubsubplus-connector-kafka-source-master$ ./gradlew assemble
Downloading https://services.gradle.org/distributions/gradle-6.1.1-bin.zip
.........10%.........20%.........30%..........40%.........50%.........60%..........70%.........80%.........90%.........100%
Welcome to Gradle 6.1.1!
Here are the highlights of this release:
- Reusable dependency cache
- Configurable compilation order between Groovy/Kotlin/Java/Scala
- New sample projects in Gradle's documentation
For more details see https://docs.gradle.org/6.1.1/release-notes.html
Starting a Gradle Daemon (subsequent builds will be faster)
BUILD SUCCESSFUL in 39s
4 actionable tasks: 2 executed, 2 up-to-date
alee@LAPTOP-OQFKDPM0:/mnt/c/Users/AaronLee/Downloads/pubsubplus-connector-kafka-source-master$
PS C:\Users\AaronLee\Downloads\pubsubplus-connector-kafka-sink-master> .\gradlew.bat clean assemble
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :compileJava
BUILD SUCCESSFUL in 23s
5 actionable tasks: 5 executed
PS C:\Users\AaronLee\Downloads\pubsubplus-connector-kafka-sink-master>
Look inside the directory ./build/libs/ and there should be a single JAR file there. This is the PubSub+ Connector JAR and must be copied inside the Kafka distribution:
- For standard Apache Kafka: copy it into ~/kafka_2.13-2.7.0/libs
- For Confluent: create a directory kafka-connect-solace inside ~/confluent-6.1.0/share/java/ and copy it there
Repeat the same procedure for the PubSub+ Sink Connector.
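For example, on Linux, Mac, or WSL, assuming the directory layout used in this CodeLab (adjust names and versions to match your build):
cp build/libs/pubsubplus-connector-kafka-source-*.jar ~/kafka_2.13-2.7.0/libs/
Or, for Confluent:
mkdir -p ~/confluent-6.1.0/share/java/kafka-connect-solace
cp build/libs/pubsubplus-connector-kafka-source-*.jar ~/confluent-6.1.0/share/java/kafka-connect-solace/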
If you want the latest release you can download a pre-compiled version. On the right side of the screen, click "Releases" and download the latest ZIP or TAR file. Open the archive and look in the lib directory. Copy the pubsubplus-connector-kafka-[source|sink]-x.x.x.jar file into the Kafka installation location as in the step above.
Next, we need the Solace application APIs so the Connectors know how to connect to Solace. Point your favourite browser to https://solace.com/downloads, tick the box "Solace APIs" under Categories, and scroll towards the bottom, looking for "Messaging APIs & Protocols":
Click on "Java / JCSMP", and select "Download":
Once the ZIP is downloaded, look inside the lib directory and copy all JARs (should be about 7 files) into the appropriate Kafka location:
- For standard Apache Kafka: ~/kafka_2.13-2.7.0/libs
- For Confluent: the kafka-connect-solace directory inside ~/confluent-6.1.0/share/java/ that you created earlier
Included with the Source and Sink Connectors, inside the etc directory, are example configuration files. Both properties and JSON formats are provided, depending on your runtime requirements.
Copy the example configuration file(s) you need into the config directory of Kafka. For this CodeLab, the examples will use the standalone mode of the standard Apache Kafka.
Open the properties file and note all of the various configuration options exposed. There are both Solace Java (JCSMP) API properties and Kafka Connect properties available to be configured. For more information, refer to the PubSub+ Connector documentation on GitHub.
At the least, you will need to edit the properties/config files to add the basic Solace connection parameters, so the Source and/or Sink connectors know how to connect to the Solace PubSub+ broker. These properties are:
sol.host
sol.username
sol.password
sol.vpn_name
Other useful properties are explained below:
- kafka.topic: which Kafka topic to either write to or read from. The default is "test" but it can be set to anything.
- sol.topics: one or more (comma-separated) topics the Source Connector subscribes to using Direct messaging
- sol.queue: to use Persistent/Guaranteed messaging, the Source Connector binds to a queue instead
- sol.message_processor_class: which Processor to use when receiving a Solace message before writing into Kafka; the simple processor uses null for the key and copies the message payload into the Kafka record
- sol.kafka_message_key: when writing records into Kafka, this allows you to specify which Solace message attribute to use as the Kafka key. The limited options can be expanded by customizing the source code.
- sol.topics: one or more (comma-separated) topics the Sink Connector will publish to with Direct messaging
- sol.queue: to use Persistent/Guaranteed messaging, the Sink Connector publishes to a queue instead
- sol.record_processor_class: which Processor to use when creating a Solace message to publish for each Kafka record
- sol.kafka_message_key: the Sink Connector's counterpart, specifying how the Kafka record key is mapped into the Solace message. The limited options can be expanded by customizing the source code.
The sample Solace PubSub+ Connector properties files assume the Kafka topic to be test.
If you have access to a Solace PubSub+ broker (local Docker container, free Solace Cloud, etc.), let's connect the Solace Source Connector to it and verify it works. Otherwise, skip to the next step to use an existing Solace broker.
Create the Kafka topic test, then start a console consumer on it that also prints each record's key:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
bin/kafka-console-consumer.sh --topic test --property print.key=true --from-beginning --bootstrap-server localhost:9092
Edit the solace_source.properties file which you copied into the config folder (or the corresponding JSON if you want to run distributed) with the appropriate info. For example:
# PubSub+ connection information
sol.host=tcp://localhost
sol.username=default
sol.password=default
sol.vpn_name=default
# Solace PubSub+ topics to subscribe to
sol.topics=samples/>
Note that samples/> uses the Solace multi-level wildcard > to match every topic beginning with samples/. Now run the Connector in standalone mode:
bin/connect-standalone.sh config/connect-standalone.properties config/solace_source.properties
Watch the startup logs for lines similar to:
INFO Connected to host 'orig=tcp://localhost, scheme=tcp://, host=localhost'
INFO Adding subscription for topic samples/>
Now publish a message on the Solace broker to a topic matching the subscription (e.g. samples/hello), for example using the "Try Me!" tab in PubSub+ Broker Manager. You should see the message arrive in the Kafka consumer:
null    Hello world!
Next, we will use an existing Solace demo stream of messages: the Solace taxi demo. You can visualize the datastream here: Solace Taxi Map Demo.
Stop the Connector instance (Ctrl-C), but leave the Kafka Consumer running. We'll repoint the Source Connector to the taxi demo stream.
Edit the solace_source.properties file again with the following. Note that the Processor is now changing to the "keyed" processor, and we'll use the messages' Destination (aka Topic) as the key:
# PubSub+ connection information
sol.host=tcp://taxi.messaging.solace.cloud
sol.username=public-taxi
sol.password=iliketaxis
sol.vpn_name=nyc-modern-taxi
# Solace PubSub+ topics to subscribe to
sol.topics=taxinyc/>
# Which Processor class to use
sol.message_processor_class=com.solace.connector.kafka.connect.source.msgprocessors.SolaceSampleKeyedMessageProcessor
# What attribute to use for the Kafka message key?
sol.kafka_message_key=DESTINATION
Restart the Connector in standalone mode:
bin/connect-standalone.sh config/connect-standalone.properties config/solace_source.properties
Records should start flowing into the Kafka consumer, with the Solace topic as the key and the JSON payload as the value:
taxinyc/ops/ride/updated/v1/enroute/00000190/54846519/040.74084/-073.94572
{
"ride_id": "7a63e37c-b4f2-4085-9b3e-0b24873cea00",
"information_source": "RideDispatcher",
"point_idx": 442,
"latitude": 40.74084,
"longitude": -73.94572,
"heading": 100,
"speed": 4,
"timestamp": "2021-03-18T09:55:30.017-04:00",
"meter_reading": 9.26,
"meter_increment": 0.0209495,
"ride_status": "enroute",
"passenger_count": 1,
"driver": {
"driver_id": 190,
"first_name": "Allison",
"last_name": "Schwager",
"rating": 2.75,
"car_class": "Sedan"
},
"passenger": {
"passenger_id": 54846519,
"first_name": "Joe",
"last_name": "Caswell",
"rating": 4.75
}
}
Now that we have made the Source connector go from Solace→Kafka, let's configure the basic Sink Connector to publish data from Kafka→Solace.
On the Solace broker, start a subscriber on the topic sink/> (e.g. using the "Try Me!" tab in PubSub+ Broker Manager). This will echo/display any Solace messages received that match that subscription.
Edit the solace_sink.properties file which you copied into the config folder (or the corresponding JSON if you want to run distributed) with the appropriate info. Let's use the Kafka topic "quickstart-events":
# Kafka topics to read from
topics=quickstart-events
# PubSub+ connection information
sol.host=tcp://localhost
sol.username=default
sol.password=default
sol.vpn_name=default
# Solace PubSub+ topics to PUBLISH to
sol.topics=sink/events
Run the Sink Connector in standalone mode:
bin/connect-standalone.sh config/connect-standalone.properties config/solace_sink.properties
Then use the Kafka console producer to publish a few records into quickstart-events; each one should arrive at your Solace subscriber on the topic sink/events.
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
In the previous section, the very basic SolSimpleRecordProcessor Sink Connector simply published on a single Solace topic that was defined in the properties file (e.g. sink/events). But Solace supports a very dynamic and expressive topic hierarchy, where each level of the topic can represent a piece of metadata from the message. For example, from the taxi data feed:
TOPIC: taxinyc/ops/ride/updated/v1/enroute/00000190/54846519/040.74084/-073.94572
LEVELS: app/division/type/verb/version/status/driver_id/pass_id/latitude/longitude
PAYLOAD:
{
"ride_id": "7a63e37c-b4f2-4085-9b3e-0b24873cea00",
"information_source": "RideDispatcher",
"point_idx": 442,
"latitude": 40.74084,
"longitude": -73.94572,
"heading": 100,
"speed": 4,
"timestamp": "2021-03-18T09:55:30.017-04:00",
"meter_reading": 9.26,
"meter_increment": 0.0209495,
"ride_status": "enroute",
"passenger_count": 1,
"driver": {
"driver_id": 190,
"first_name": "Allison",
"last_name": "Schwager",
"rating": 2.75,
"car_class": "Sedan"
},
"passenger": {
"passenger_id": 54846519,
"first_name": "Joe",
"last_name": "Caswell",
"rating": 4.75
}
}
To be able to dynamically change which Solace topic the Sink Connector publishes to, check out the source code for the SolDynamicDestinationRecordProcessor. You'll find this under folder src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor
of the Solace PubSub+ Sink Connector code which you downloaded from GitHub.
Note the processRecord()
method, which takes a Kafka record as a parameter and returns a Solace message. Inside the method, you can see how the (example) code examines the payload of the Kafka record for certain strings and values, and then builds the corresponding Solace topic. This particular example is looking at bus alerting data (comms and control messages) coming from Kafka, and builds dynamic Solace topics. Very cool!
public class SolDynamicDestinationRecordProcessor implements SolRecordProcessorIF {
private static final Logger log =
LoggerFactory.getLogger(SolDynamicDestinationRecordProcessor.class);
@Override
public BytesXMLMessage processRecord(String skey, SinkRecord record) {
BytesXMLMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);
// Add Record Topic,Partition,Offset to Solace Msg
String kafkaTopic = record.topic();
msg.setApplicationMessageType("ResendOfKafkaTopic: " + kafkaTopic);
...
String payload = "";
Topic topic;
String busId = payload.substring(0, 4);
String busMsg = payload.substring(5, payload.length());
if (busMsg.toLowerCase().contains("stop")) {
topic = JCSMPFactory.onlyInstance().createTopic("ctrl/bus/" + busId + "/stop");
log.debug("================ Dynamic Topic = " + topic.getName());
} else if (busMsg.toLowerCase().contains("start")) {
topic = JCSMPFactory.onlyInstance().createTopic("ctrl/bus/" + busId + "/start");
log.debug("================ Dynamic Topic = " + topic.getName());
} else {
topic = JCSMPFactory.onlyInstance().createTopic("comms/bus/" + busId);
log.debug("================ Dynamic Topic = " + topic.getName());
}
// Also include topic in dynamicDestination header
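// (userHeader is an SDTMap created earlier in the method; its creation is not shown in this excerpt)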
try {
userHeader.putDestination("dynamicDestination", topic);
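// ... (remainder of the method elided; the header map is attached to the message, which is then returned)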
You can modify this source code to build topics for your specific data, or duplicate it, rebuild the JAR using Gradle, and copy the Sink JAR back into the Kafka distribution. Or you can deploy a new JAR containing only the modified Processor classes, so you don't need to replace the previous Sink JAR.
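As a rough sketch of what a custom Processor could look like, the following hypothetical class (the class name and topic structure are made up for illustration; it is not part of the Connector distribution) builds the Solace topic from the Kafka topic name and record key, reusing the dynamicDestination header pattern shown above:

package com.solace.connector.kafka.connect.sink.recordprocessor;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.connect.sink.SinkRecord;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.Topic;

public class MyDynamicTopicRecordProcessor implements SolRecordProcessorIF {

  @Override
  public BytesXMLMessage processRecord(String skey, SinkRecord record) {
    BytesXMLMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);

    // Copy the Kafka record value into the Solace message body
    Object value = record.value();
    if (value instanceof byte[]) {
      msg.writeAttachment((byte[]) value);
    } else if (value != null) {
      msg.writeAttachment(value.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Build a dynamic Solace topic from the Kafka topic name and record key,
    // e.g. "fromKafka/quickstart-events/someKey"
    String keyPart = (record.key() == null) ? "nokey" : record.key().toString();
    Topic topic = JCSMPFactory.onlyInstance()
        .createTopic("fromKafka/" + record.topic() + "/" + keyPart);

    // Pass the dynamic destination to the Connector via the user-property header,
    // the same mechanism used by SolDynamicDestinationRecordProcessor
    try {
      SDTMap userHeader = JCSMPFactory.onlyInstance().createMap();
      userHeader.putDestination("dynamicDestination", topic);
      msg.setProperties(userHeader);
    } catch (SDTException e) {
      throw new RuntimeException("Could not set dynamic destination header", e);
    }
    return msg;
  }
}

Note that the Sink Connector must also be configured to use dynamic destinations rather than the fixed sol.topics; see the Sink Connector's GitHub documentation for the relevant setting.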
Not many apps are written to take data out of Kafka... typically, Kafka is seen as a massive ingest engine. However, if data that's generated in Kafka (e.g. with Streams or KSQL or something) needs to be sent to edge devices or microservices that aren't all connected directly to Kafka, the Solace Sink is a great way to publish a message with a very specific topic that can find its way through the Solace Event Mesh to your application.
The following are all for the standard Apache Kafka variant.
Create a topic:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 2 --topic test2
Use 2 partitions.
Then use the console publisher to publish 10 messages into the Kafka topic:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2
Then start a consumer and look at the order in which the records are returned; this demonstrates that ordering is not guaranteed across a whole Kafka topic, only within each partition:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning --property print.key=true
Note the --property print.key=true. This comes in handy when using (e.g.) the Solace Topic (aka Destination) as the key.
At this point, try running the Source connector. Hopefully you have access to some streaming data. We (CTO team at Solace) are building a demo that will provide streaming taxi location info publicly.
Due to the custom capabilities in the Solace PubSub+ Connector framework, it is easy to extend the Processors to convert from one payload format (e.g. Google ProtoBufs in Solace) to another (e.g. AVRO for Kafka).
Much more detailed documentation on configuration and usage can be found on the Solace PubSub+ Connectors GitHub pages. Please refer there for more information.
✅ For more information on the Kafka Source, Sink and Consumer connectors check out the PubSub+ Connector Hub page
Thanks for participating in this codelab! Let us know what you thought in the Solace Community Forum! If you found any issues along the way we'd appreciate it if you'd raise them by clicking the Report a mistake button at the bottom left of this codelab.