Deploying Microservices Architecture with C#, Part 1
What's all the hubbub with microservices? The benefits are in application management and delivery. Paul Mooney shows how in this sample C# app.
Microservices are groups of lightweight services that are interconnected yet independent of one another, with no direct coupling or dependency. As such, microservices allow flexibility in terms of infrastructure: application traffic is routed to collections of services that may be distributed across CPU, disk, machine and network, as opposed to a single monolithic platform designed to manage all traffic.
In its simplest form, a microservice consists of an event-listener and a message-dispatcher (see Figure 1). The event-listener polls a service-bus -- generally a durable message-queue -- and handles incoming messages. Messages consist of instructions bound in metadata and encoded in a data-interchange format such as JSON or Protobuf.
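To make the idea of "instructions bound in metadata" concrete, here is a minimal sketch of one possible message shape and its JSON encoding. The Envelope and EnvelopeCodec types and their property names are hypothetical and are not part of the library discussed in this article; System.Text.Json is assumed as the serializer, which the article does not specify.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical message shape: one instruction plus free-form metadata.
public sealed class Envelope
{
    public string Instruction { get; set; } = "";
    public Dictionary<string, string> Metadata { get; set; } = new Dictionary<string, string>();
}

public static class EnvelopeCodec
{
    // Serializes the envelope to JSON and returns the UTF-8 bytes a queue would carry.
    public static byte[] Encode(Envelope envelope)
    {
        return JsonSerializer.SerializeToUtf8Bytes(envelope);
    }

    // Parses UTF-8 JSON bytes back into a typed envelope.
    public static Envelope Decode(byte[] payload)
    {
        return JsonSerializer.Deserialize<Envelope>(payload)
            ?? throw new InvalidOperationException("Payload was JSON null.");
    }
}
```

A listener would call Decode on each incoming payload and route on the Instruction value; a dispatcher would call Encode before handing the bytes to the queue.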
Connecting to RabbitMQ
Everything starts with an abstraction. The AMQP library abstracts all AMQP components, and provides RabbitMQ implementations (see Figure 2).
First, we need to connect to RabbitMQ. Listing 1 shows the Connect method in the RabbitMQAdapter class.
Listing 1: Connect() Method
public override void Connect() {
  var connectionFactory = new ConnectionFactory {
    HostName = hostName,
    Port = port,
    UserName = userName,
    Password = password,
    RequestedHeartbeat = heartbeat
  };
  if (!string.IsNullOrEmpty(virtualHost)) connectionFactory.VirtualHost = virtualHost;
  _connection = connectionFactory.CreateConnection();
}
A connection is established on application start-up, and is ideally maintained for the duration of the application's lifetime.
Consuming Messages
A single running instance (generally an *.exe) can connect to RabbitMQ and consume messages in a single-threaded, blocking manner. However, this is not the most scalable solution. Processes that read messages from RabbitMQ must subscribe to a Queue or Exchange. Once subscribed, RabbitMQ manages message delivery, either distributing messages evenly through round-robin or biasing distribution according to your Quality of Service (QoS) configuration. Please refer to this post for a more detailed explanation as to how this works.
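The difference between even and biased distribution can be illustrated with a toy model. The sketch below is not RabbitMQ's actual dispatch algorithm; DispatchModel is a hypothetical helper that hands messages out in round-robin order and skips any consumer already holding its prefetch limit of unacknowledged messages. It also assumes nothing is ever acknowledged, and treats a limit of 0 as "accept nothing" (in RabbitMQ itself, a prefetch count of 0 means no limit).

```csharp
using System;

public static class DispatchModel
{
    // Returns how many messages each consumer received.
    // prefetch[i] is the maximum number of unacknowledged messages consumer i may hold.
    public static int[] Dispatch(int messageCount, int[] prefetch)
    {
        var delivered = new int[prefetch.Length];
        int next = 0;
        for (int m = 0; m < messageCount; m++)
        {
            // Skip consumers that are already at their prefetch limit.
            int tried = 0;
            while (tried < prefetch.Length && delivered[next] >= prefetch[next])
            {
                next = (next + 1) % prefetch.Length;
                tried++;
            }
            if (tried == prefetch.Length) break; // every consumer is full

            delivered[next]++;
            next = (next + 1) % prefetch.Length;
        }
        return delivered;
    }
}
```

With equal, generous limits the model yields an even split: Dispatch(6, new[] { 10, 10, 10 }) gives 2, 2, 2. With a tight limit on one consumer the split becomes biased: Dispatch(6, new[] { 1, 10 }) gives 1, 5.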
For now, consider that our microservice executable can generate multiple processes, each running on a dedicated thread, to consume messages from RabbitMQ in a parallel manner. The AMQPAdapter class shown in Listing 2 contains a method designed to invoke such processes.
Listing 2: AMQPAdapter Class
public void ConsumeAsync(AMQPConsumer consumer) {
  if (!IsConnected) Connect();
  var thread = new Thread(o => consumer.Start(this));
  thread.Start();
  while (!thread.IsAlive)
    Thread.Sleep(1);
}
Notice the input variable of type "AMQPConsumer." Let's take a look at that class in more detail. Essentially, the class contains Start and Stop methods, and an event-handler to handle message-delivery, as shown in Listing 3.
Listing 3: AMQPConsumer Class
public event EventHandler<MessageReceivedEventArgs> MessageReceived;

public virtual void Start(AMQPAdapter amqpAdapter) {
  stopConsuming = false;
}

public void Stop() {
  stopConsuming = true;
}

protected void OnMessageReceived(MessageReceivedEventArgs e) {
  var handler = MessageReceived;
  if (handler != null) handler(this, e);
}
Like most classes in this project, this is an AMQP abstraction. Listing 4 shows the RabbitMQ implementation.
Listing 4: RabbitMQ's Version of Listing 3
protected void Start(AMQPAdapter amqpAdapter, bool catchAllExceptions) {
  base.Start(amqpAdapter);
  try {
    var connection = (IConnection) amqpAdapter.GetConnection();
    using (var channel = connection.CreateModel()) {
      if (createQueue) channel.QueueDeclare(queueName, true, false, false, queueArgs);
      channel.BasicQos(0, prefetchCount, false);
      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume(queueName, noAck, consumer);
      // Poll until Stop() is called.
      while (!stopConsuming) {
        try {
          BasicDeliverEventArgs basicDeliverEventArgs;
          var messageIsAvailable = consumer.Queue.Dequeue(timeout, out basicDeliverEventArgs);
          if (!messageIsAvailable) continue;
          var payload = basicDeliverEventArgs.Body;
          var message = Encoding.UTF8.GetString(payload);
          OnMessageReceived(new MessageReceivedEventArgs {
            Message = message,
            EventArgs = basicDeliverEventArgs
          });
          if (implicitAck && !noAck) channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
        }
        catch (Exception exception) {
          // Report processing failures through the same event as valid messages.
          OnMessageReceived(new MessageReceivedEventArgs {
            Exception = new AMQPConsumerProcessingException(exception)
          });
          if (!catchAllExceptions) Stop();
        }
      }
    }
  }
  catch (Exception exception) {
    // Failures while setting up the channel or subscription land here.
    OnMessageReceived(new MessageReceivedEventArgs {
      Exception = new AMQPConsumerInitialisationException(exception)
    });
  }
}
Connect to a RabbitMQ instance as follows:
var adapter = RabbitMQAdapter.Instance;
adapter.Init("hostName", 1234, "userName", "password", 50);
adapter.Connect();
Notice the static declaration of the RabbitMQAdapter class. RabbitMQ connections in this library are thread-safe; a single connection will facilitate all requests to RabbitMQ.
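One common way to get a single, shared, thread-safe instance in C# is Lazy<T>. The following is a minimal sketch of that pattern, not the library's actual implementation of RabbitMQAdapter.Instance; ConnectionHolder and FakeConnection are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for an expensive, shareable connection object.
public sealed class FakeConnection
{
    private static int created;
    public static int CreatedCount { get { return created; } }
    public FakeConnection() { Interlocked.Increment(ref created); }
}

public sealed class ConnectionHolder
{
    // Lazy<T> in its default mode runs the factory at most once,
    // even when several threads request Instance at the same time.
    private static readonly Lazy<ConnectionHolder> instance =
        new Lazy<ConnectionHolder>(() => new ConnectionHolder());

    public static ConnectionHolder Instance { get { return instance.Value; } }

    public FakeConnection Connection { get; private set; }

    // Private constructor: the only way to obtain a holder is through Instance.
    private ConnectionHolder() { Connection = new FakeConnection(); }
}
```

Every caller that reads ConnectionHolder.Instance receives the same object, so only one connection is ever constructed for the lifetime of the process.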
RabbitMQ implements the concept of Channels, which are logical segments of an underlying physical Connection. Once a Connection is established, Channels can be created in order to interface with RabbitMQ. A single RabbitMQ connection can support up to 65,535 Channels (although I would personally scale out client instances rather than establish such a high number of Channels). Let's look at publishing a message to RabbitMQ, as shown in Listing 5.
Listing 5: Publishing Message to RabbitMQ
public override void Publish(string message, string exchangeName, string routingKey,
  IBasicProperties messageProperties = null) {
  if (!IsConnected) Connect();
  using (var channel = _connection.CreateModel()) {
    var payload = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchangeName, routingKey,
      messageProperties ?? RabbitMQProperties.CreateDefaultProperties(channel), payload);
  }
}
Notice the _connection.CreateModel() method call, which establishes a Channel to interface with RabbitMQ. The Channel is encapsulated within a using block; once we've completed our operation, the Channel may be disposed. Channels are relatively cheap to create, in terms of resources, and may be created and dropped liberally.
Messages are sent as UTF-8-encoded bytes. Here is how to publish a message to RabbitMQ:
var message = "Hello, World!";
adapter.Publish(message, "queueName");
The Publish method also has overloads that accept exchangeName and routingKey parameters. These are used to control the flow of messages through RabbitMQ resources. This concept is well documented here.
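The UTF-8 conversion that Publish performs on the way out, and that a consumer reverses on the way in, can be checked in isolation. This is a small sketch using only System.Text.Encoding; the Utf8Payload helper is a hypothetical name and not part of the library.

```csharp
using System;
using System.Text;

public static class Utf8Payload
{
    // String to bytes, as done before a message is published.
    public static byte[] ToPayload(string message)
    {
        return Encoding.UTF8.GetBytes(message);
    }

    // Bytes back to string, as done when a message is consumed.
    public static string FromPayload(byte[] payload)
    {
        return Encoding.UTF8.GetString(payload);
    }
}
```

Note that the payload length is measured in bytes, not characters: "Hello, World!" occupies 13 bytes, while a string containing non-ASCII characters occupies more bytes than it has characters.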
Now let's attempt to read our message back from RabbitMQ:
string output;
BasicDeliverEventArgs eventArgs;
adapter.TryGetNextMessage("queueName", out output, out eventArgs, 50);
The TryGetNextMessage method reads the next message from the specified Queue, when available. The method will return false in the event that the Queue is empty, after the specified timeout period has elapsed.
Listing 6 contains the complete code.
Listing 6: TryGetNextMessage Method
private static void Main(string[] args) {
  var adapter = RabbitMQAdapter.Instance;
  adapter.Init("hostName", 1234, "userName", "password", 50);
  adapter.Connect();

  var message = "Hello, World!";
  adapter.Publish(message, "queueName");

  string output;
  BasicDeliverEventArgs eventArgs;
  adapter.TryGetNextMessage("queueName", out output, out eventArgs, 50);
}
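The return-false-on-timeout behavior described for TryGetNextMessage can be modeled in-process without a broker. The sketch below assumes a hypothetical InMemoryQueue class built on BlockingCollection<T>; it mirrors the semantics described above but is not how the library talks to RabbitMQ.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class InMemoryQueue
{
    // BlockingCollection<T> is backed by a FIFO queue by default.
    private readonly BlockingCollection<string> messages = new BlockingCollection<string>();

    public void Publish(string message)
    {
        messages.Add(message);
    }

    // Waits up to timeoutMilliseconds for a message.
    // Returns false, with output set to null, if the queue stays empty.
    public bool TryGetNextMessage(out string output, int timeoutMilliseconds)
    {
        return messages.TryTake(out output, timeoutMilliseconds);
    }
}
```

A caller checks the Boolean result before touching the output value, exactly as it would with the adapter's method.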
Consistent Message Polling
Reading one message at a time may not be the most efficient means of consuming messages. I mentioned the AMQPConsumer class at the beginning of this post. This code outlines a means to continuously read messages from a RabbitMQ Queue:
var consumer = new RabbitMQConsumerCatchAll("queueName", 10);
adapter.ConsumeAsync(consumer);
Console.ReadLine();
adapter.StopConsumingAsync(consumer);
Note the RabbitMQConsumerCatchAll class instantiation. This class is an implementation of RabbitMQConsumer. All potential exceptions that occur will be handled by this consumer and persisted back to the client along the same Channel as valid messages. As an alternative, the RabbitMQConsumerCatchOne instance can be leveraged instead. Both classes achieve the same purpose, with the exception of their error-handling logic. The RabbitMQConsumerCatchOne class will disconnect from RabbitMQ should an exception occur.
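The difference between the two error-handling policies can be sketched without RabbitMQ. In the example below, LoopConsumer and ReceivedArgs are hypothetical stand-ins for the consumer classes and MessageReceivedEventArgs; the loop follows the same shape as Listing 4, with a catchAllExceptions flag deciding whether a failure stops consumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for MessageReceivedEventArgs.
public sealed class ReceivedArgs : EventArgs
{
    public string Message { get; set; }
    public Exception Exception { get; set; }
}

public sealed class LoopConsumer
{
    private readonly bool catchAllExceptions;
    private volatile bool stopConsuming;

    public event EventHandler<ReceivedArgs> MessageReceived;

    public LoopConsumer(bool catchAllExceptions)
    {
        this.catchAllExceptions = catchAllExceptions;
    }

    public void Stop() { stopConsuming = true; }

    // Processes each message in turn. Failures are reported through the same
    // event as successes; when catchAllExceptions is false, the first failure
    // also stops the loop.
    public void Run(IEnumerable<string> messages, Action<string> process)
    {
        stopConsuming = false;
        foreach (var message in messages)
        {
            if (stopConsuming) break;
            try
            {
                process(message);
                Raise(new ReceivedArgs { Message = message });
            }
            catch (Exception exception)
            {
                Raise(new ReceivedArgs { Exception = exception });
                if (!catchAllExceptions) Stop();
            }
        }
    }

    private void Raise(ReceivedArgs e)
    {
        var handler = MessageReceived;
        if (handler != null) handler(this, e);
    }
}
```

With catchAllExceptions set to true, a failure on one message is reported and the loop carries on with the next; with false, the first failure is reported and no further messages are processed.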
Now that we're at this point, it's time to put the app into production, which we'll do in Part 2. There, we'll also explore a number of issues that you need to deal with to ensure that messages reach their intended targets.
About the Author
Paul Mooney is a technology consultant proficient in C#, JavaScript, Java, and Golang, and a well-known software-development mentor. He also created the Encrypted Token Pattern and ARMOR, its .NET implementation. Contact Paul through his blog, http://insidethecpu.com/.