Competing Consumers Pattern

 

Comments

 

RabbitMQ Message Broker Implementation

 

Code

 

The following code shows from the QueueManager class in CompetingConsumers solution of the examples available for download for this guidance shows how you can create a queue by using a QueueClient instance in the Start event handler in a web or worker role.

 

private string queueName = ...;
private string connectionString = ...;
...

public async Task Start()
{
  // Check if the queue already exists.
  var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
  if (!manager.QueueExists(this.queueName))
  {
    var queueDescription = new QueueDescription(this.queueName);

    // Set the maximum delivery count for messages in the queue. A message 
    // is automatically dead-lettered after this number of deliveries. The
    // default value for dead letter count is 10.
    queueDescription.MaxDeliveryCount = 3;

    await manager.CreateQueueAsync(queueDescription);
  }
  ...

  // Create the queue client. By default the PeekLock method is used.
  this.client = QueueClient.CreateFromConnectionString(
    this.connectionString, this.queueName);
}

The next code snippet shows how an application can create and send a batch of messages to the queue.

 

public async Task SendMessagesAsync()
{
  // Simulate sending a batch of messages to the queue.
  var messages = new List();

  for (int i = 0; i < 10; i++)
  {
    var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
    messages.Add(message);
  }
  await this.client.SendBatchAsync(messages);
}

The following code shows how a consumer service instance can receive messages from the queue by following an event-driven approach. TheprocessMessageTask parameter to the ReceiveMessages method is a delegate that references the code to run when a message is received. This code is run asynchronously.

 

private ManualResetEvent pauseProcessingEvent;
...

public void ReceiveMessages(Func processMessageTask)
{
  // Set up the options for the message pump.
  var options = new OnMessageOptions();

  // When AutoComplete is disabled it is necessary to manually
  // complete or abandon the messages and handle any errors.
  options.AutoComplete = false;
  options.MaxConcurrentCalls = 10;
  options.ExceptionReceived += this.OptionsOnExceptionReceived;

  // Use of the Service Bus OnMessage message pump. 
  // The OnMessage method must be called once, otherwise an exception will occur.
  this.client.OnMessageAsync(
    async (msg) =>
    {
      // Will block the current thread if Stop is called.
      this.pauseProcessingEvent.WaitOne();

      // Execute processing task here.
      await processMessageTask(msg);
    },
    options);
}
...

private void OptionsOnExceptionReceived(object sender, 
  ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
  ...
}