Architects and developers often use messaging services such as Azure Service Bus or Event Hub to move real-time data between apps running locally. However, organizations need this real-time information to flow across various regions, clouds, and/or data centers. To accomplish that, one needs an Event Mesh that provides hybrid cloud connectivity.
Solace is an industry leader, and its PubSub+ event brokers make such an Event Mesh a reality.
This codelab specifically covers how you can easily stream data from the Azure messaging service "ServiceBus" to Solace using an Azure function.
You will find the Azure function code here.
To learn how to stream data from Solace PubSub+ broker to Azure messaging services, click here.
In this example we will create two queues in Solace PubSub+: one will receive messages from the Azure function over HTTP (REST), and the other will receive messages sent through the Solace C# API. Log on to the Solace console and run the following commands:
solace> enable
solace# configure
solace(configure)# message-spool message-vpn <VPN Name>
solace(configure/message-spool/message-vpn)# create queue azure-rest-queue
solace(configure/message-vpn/my-azure-queue )# permission all consume
solace(configure/message-vpn/my-azure-queue )# subscription topic azure/2/solace-rest
solace(configure/message-vpn/my-azure-queue )# no shutdown
solace(configure/message-vpn/my-azure-queue )# exit
solace(configure/message-spool/message-vpn)# create queue azure-c#-queue
solace(configure/message-vpn/my-azure-queue )# permission all consume
solace(configure/message-vpn/my-azure-queue )# subscription topic azure/2/solace
solace(configure/message-vpn/my-azure-queue )# no shutdown
solace(configure/message-vpn/my-azure-queue )# end
Azure allows you to use multiple programming languages and APIs. However, for this codelab, I will walk you through using C# with both the Solace C# API and REST.
//Update the Service Bus end point connection string below "SBConnection": "Endpoint=....windows.net/;SharedAccessKeyName=ListenOnly;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxx",
//Update the Solace Host (SMF) URL string below "solace-host": "mr1xi40mbgzuj7.messaging.solace.cloud",
//Update the Solace Username string below "solace-username": "solace-cloud-client",
//Update the Solace Password string below "solace-password": "abcgdjsjj",
//Update the Solace VPN Name string below "solace-vpnname": "sumeet"
//Update the Solace Topic string below "solace-topic": "azure/2/solace"
using System;
using SolaceSystems.Solclient.Messaging;
using System.Threading;
using System.Text;
namespace SB2SolaceCSharp
{
/// <summary>
/// Publishes text messages to a single Solace PubSub+ topic over one session
/// that is opened by the constructor and kept for the lifetime of the instance.
/// </summary>
/// <remarks>
/// Call <see cref="Dispose()"/> to disconnect. Cleanup is intentionally NOT done in a
/// finalizer: finalization order is not guaranteed, so the session and context
/// objects may already have been finalized by the time a finalizer would touch them.
/// </remarks>
public class SolacePublisher : IDisposable
{
    private IContext context = null;
    private ISession session = null;
    private string sUserName = "default";
    private string sPassword = "default";
    private string sVPNName = "default";
    private string sHost = "default";
    private string sTopic = "default";
    private bool disposed = false;

    /// <summary>
    /// Stores the connection settings and immediately connects to the broker.
    /// </summary>
    /// <param name="Host">Solace host assigned to the session's Host property.</param>
    /// <param name="UserName">Client username.</param>
    /// <param name="Password">Client password.</param>
    /// <param name="VPNName">Message VPN name.</param>
    /// <param name="Topic">Topic every message is published to.</param>
    public SolacePublisher(string Host, string UserName, string Password, string VPNName, string Topic)
    {
        this.sHost = Host;
        this.sUserName = UserName;
        this.sPassword = Password;
        this.sVPNName = VPNName;
        this.sTopic = Topic;
        connect2Solace();
    }

    /// <summary>
    /// Disconnects the session and releases the session and context.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the session and context when <paramref name="disposing"/> is true.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; false when called from a
    /// finalizer of a derived type, in which case managed objects are not touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposed)
        {
            return;
        }

        if (disposing)
        {
            if (session != null)
            {
                session.Disconnect();
                session.Dispose();
                session = null;
            }
            if (context != null)
            {
                context.Dispose();
                context = null;
            }
        }

        disposed = true;
    }

    /// <summary>
    /// Publishes <paramref name="msg"/> as a Direct message to the configured topic.
    /// </summary>
    /// <param name="msg">Text payload; it is sent as a UTF-8 encoded binary attachment.</param>
    /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The session reported a non-OK return code.</exception>
    public void sendMessage2Solace(String msg)
    {
        if (msg == null)
        {
            throw new ArgumentNullException(nameof(msg));
        }
        if (disposed || session == null)
        {
            throw new ObjectDisposedException(nameof(SolacePublisher));
        }

        // 'using' guarantees the message is released even when Send throws.
        using (IMessage message = ContextFactory.Instance.CreateMessage())
        {
            message.Destination = ContextFactory.Instance.CreateTopic(sTopic);
            message.DeliveryMode = MessageDeliveryMode.Direct;
            // UTF-8 instead of ASCII: ASCII encoding replaces every non-ASCII character with '?'.
            message.BinaryAttachment = Encoding.UTF8.GetBytes(msg);
            Console.WriteLine("About to send message '{0}' to topic '{1}'", msg, sTopic);

            ReturnCode sendResult = session.Send(message);
            if (sendResult != ReturnCode.SOLCLIENT_OK)
            {
                throw new InvalidOperationException($"Sending to topic '{sTopic}' failed with return code {sendResult}");
            }
        }
        Console.WriteLine("Message sent. Exiting.");
    }

    /// <summary>
    /// Initializes the Solace API, creates the context and session from the stored
    /// settings, and connects the session.
    /// </summary>
    /// <exception cref="InvalidOperationException">Connect returned a non-OK return code.</exception>
    public void connect2Solace()
    {
        Console.WriteLine("In connect2Solace");

        ContextFactoryProperties cfp = new ContextFactoryProperties();
        // Set log level.
        cfp.SolClientLogLevel = SolLogLevel.Warning;
        // Log errors to console.
        cfp.LogToConsoleError();
        // Must init the API before using any of its artifacts.
        ContextFactory.Instance.Init(cfp);

        ContextProperties contextProps = new ContextProperties();
        SessionProperties sessionProps = new SessionProperties();
        sessionProps.Host = sHost;
        sessionProps.UserName = sUserName;
        sessionProps.Password = sPassword;
        // NOTE(review): certificate validation is switched off. Acceptable for a demo only;
        // enable it before using a TLS connection in production.
        sessionProps.SSLValidateCertificate = false;
        sessionProps.VPNName = sVPNName;

        // Connection retry logic
        sessionProps.ConnectRetries = 3; // -1 means try to connect forever.
        sessionProps.ConnectTimeoutInMsecs = 5000; // 5 seconds
        sessionProps.ReconnectRetries = 3; // -1 means try to reconnect forever.
        sessionProps.ReconnectRetriesWaitInMsecs = 5000; // wait for 5 seconds before retry

        // Compression is set as a number from 0-9, where 0 means "disable
        // compression", and 9 means max compression. The default is no
        // compression.
        // Selecting a non-zero compression level auto-selects the
        // compressed SMF port on the appliance, as long as no SMF port is
        // explicitly specified.
        //sessionProps.CompressionLevel = 9;

        #region Create the Context
        context = ContextFactory.Instance.CreateContext(contextProps, null);
        #endregion

        #region Create and connect the Session
        session = context.CreateSession(sessionProps, null, null);
        ReturnCode connectResult = session.Connect();
        if (connectResult != ReturnCode.SOLCLIENT_OK)
        {
            throw new InvalidOperationException($"Connecting to Solace host '{sHost}' failed with return code {connectResult}");
        }
        #endregion
    }
}
}
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using SolaceSystems.Solclient.Messaging;
using System.Threading;
using System.Text;
namespace SB2SolaceCSharp
{
/// <summary>
/// Azure Function that forwards each message arriving on the
/// "azure2solacecsharp" Service Bus queue to Solace through a shared
/// <see cref="SolacePublisher"/>.
/// </summary>
public static class Function1
{
    // One publisher shared by all invocations; created when the class is first used.
    private static SolacePublisher solaceConnection = createPublisherFromSettings();

    /// <summary>
    /// Reads the Solace connection settings from environment variables and
    /// builds the publisher from them.
    /// </summary>
    private static SolacePublisher createPublisherFromSettings()
    {
        string host = Environment.GetEnvironmentVariable("solace-host");
        string userName = Environment.GetEnvironmentVariable("solace-username");
        string password = Environment.GetEnvironmentVariable("solace-password");
        string vpnName = Environment.GetEnvironmentVariable("solace-vpnname");
        string topic = Environment.GetEnvironmentVariable("solace-topic");

        return new SolacePublisher(host, userName, password, vpnName, topic);
    }

    /// <summary>
    /// Service Bus queue trigger: logs the received message and publishes it to Solace.
    /// </summary>
    /// <param name="myQueueItem">Body of the Service Bus message.</param>
    /// <param name="log">Logger supplied by the Functions host.</param>
    [FunctionName("Function1")]
    public static void Run(
        [ServiceBusTrigger("azure2solacecsharp", Connection = "SBConnection")] string myQueueItem,
        ILogger log)
    {
        string receivedLine = $">>>>>>>>>>>>>>>>>>>>>>>C# ServiceBus queue trigger function processed message: {myQueueItem}";
        log.LogInformation(receivedLine);

        solaceConnection.sendMessage2Solace(myQueueItem);
    }
}
}
//Update all the below property values to point to your environment
"SBConnection": "Endpoint=sb://sumeet.servicebus.windows.net/;SharedAccessKeyName=sendListen;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxx;",
"solace-host": "sumeet-solace.mymaas.net",
"solace-tls": false,
"solace-plain-text-port": 80,
"solace-tls-port": 443,
"solace-username": "default",
"solace-password": "dc7u1ne2ps16r5p1ss2frq4b4n",
"solace-topic": "azure/2/solace-rest"
}
}
using System;
using System.Net.Http;
using System.Reflection;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
namespace SB2SolaceRest
{
/// <summary>
/// Azure Function that forwards each message arriving on the
/// "azure2solacerest" Service Bus queue to Solace with an HTTP POST.
/// </summary>
public static class funcSB2SolaceREST
{
    // Single shared client: creating an HttpClient per invocation exhausts sockets.
    private static readonly HttpClient _Client = new HttpClient();
    private static string solaceRESTUrl = null;
    // Must stay below _Client and solaceRESTUrl: static field initializers run in
    // textual order and initializeHttpClient() writes to both of them.
    private static Boolean init = initializeHttpClient();

    /// <summary>
    /// Service Bus queue trigger: POSTs the message body to the Solace REST URL.
    /// </summary>
    /// <remarks>
    /// Returns a Task rather than being 'async void' so the Functions host waits for
    /// the POST to finish and observes any exception before the invocation is
    /// considered complete.
    /// </remarks>
    /// <param name="myQueueItem">Body of the Service Bus message.</param>
    /// <param name="log">Logger supplied by the Functions host.</param>
    /// <exception cref="InvalidOperationException">The HTTP client could not be initialized.</exception>
    /// <exception cref="HttpRequestException">Solace answered with a non-success status code.</exception>
    [FunctionName("funcSB2SolaceREST")]
    public static async System.Threading.Tasks.Task Run([ServiceBusTrigger("azure2solacerest", Connection = "SBConnection")] string myQueueItem, ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");

        if (!initializeHttpClient())
        {
            log.LogError("Unable to initialize HTTP Client");
            throw new InvalidOperationException("Unable to initialize HTTP Client");
        }

        // Disposing the request also disposes its content.
        using (HttpRequestMessage httpRequestMessage = new HttpRequestMessage
        {
            Method = HttpMethod.Post,
            RequestUri = new Uri(solaceRESTUrl)
        })
        {
            httpRequestMessage.Content = new StringContent(myQueueItem, Encoding.UTF8, "application/text");

            using (HttpResponseMessage response = await _Client.SendAsync(httpRequestMessage))
            {
                string responseString = await response.Content.ReadAsStringAsync();
                log.LogInformation($"Response from Solace :{responseString}, Response Code :{response.StatusCode}");

                // Fail the invocation on a non-2xx answer instead of silently dropping the message.
                response.EnsureSuccessStatusCode();
            }
        }
    }

    /// <summary>
    /// Sets the Basic Authorization header on the shared client and builds the
    /// Solace REST URL from environment variables. Does the work only once.
    /// </summary>
    /// <returns>True once initialization has completed.</returns>
    public static bool initializeHttpClient()
    {
        if (init)
        {
            return true;
        }

        string username = Environment.GetEnvironmentVariable("solace-username");
        string password = Environment.GetEnvironmentVariable("solace-password");
        string auth = "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
        _Client.DefaultRequestHeaders.Add("Authorization", auth);

        string port;
        string protocol;
        if (Boolean.Parse(Environment.GetEnvironmentVariable("solace-tls")))
        {
            port = Environment.GetEnvironmentVariable("solace-tls-port");
            protocol = "https";
            //TODO: If 2 -way TLS, we may need to add Cert related stuff
        }
        else
        {
            port = Environment.GetEnvironmentVariable("solace-plain-text-port");
            protocol = "http";
        }

        string host = Environment.GetEnvironmentVariable("solace-host");
        string topic = Environment.GetEnvironmentVariable("solace-topic");
        solaceRESTUrl = $"{protocol}://{host}:{port}/{topic}";

        init = true;
        return init;
    }
}
}
✅ You have learned how to create an Azure function that helps you stream data from ServiceBus to Solace PubSub+ Event broker.
✅ You can use a similar approach to integrate with Azure Blob Storage, Event Hub, IoT Hub, Cosmos DB, etc.
✅ Even though this codelab uses C#, you can use other programming languages supported by Azure functions.
✅ The code provided here is for demonstration purposes only. It is not production ready, so you must refer to the Solace PubSub+ C# .NET API reference documentation here.
✅ For more information on Azure functions connectors, check out the PubSub+ Connector Hub page.
Thanks for participating in this codelab! Let us know what you thought in the Solace Community Forum! If you found any issues along the way we'd appreciate it if you'd raise them by clicking the Report a mistake button at the bottom left of this codelab.