Comments
Competing Consumers Pattern RabbitMQ-server message broker
Code
The PriorityQueue solution in the code available with this guidance contains an implementation of this approach. This solution contains two worker roles projects named PriorityQueue.
High and PriorityQueue. Low. These two worker roles inherit from a class called PriorityWorkerRole which contains the functionality for connecting to a specified subscription in the OnStart method.
The PriorityQueue.High and PriorityQueue.Low worker roles connect to different subscriptions, defined by their configuration settings. An administrator can configure different numbers of each role to be run; typically there will be more instances of the PriorityQueue.High worker role than the PriorityQueue.
Low worker role. The Run method in the PriorityWorkerRole class arranges for the virtual ProcessMessage method (also defined in the PriorityWorkerRole class) to be executed for each message received on the queue.
The following code shows the Run and ProcessMessage methods. The QueueManager class, defined in the PriorityQueue.Shared project, provides helper methods for using Azure Service Bus queues.
public class PriorityWorkerRole : RoleEntryPoint { private QueueManager queueManager; ... public override void Run() { // Start listening for messages on the subscription. var subscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName"); this.queueManager.ReceiveMessages(subscriptionName, this.ProcessMessage); ...; } ... protected virtual async Task ProcessMessage(BrokeredMessage message) { // Simulating processing. await Task.Delay(TimeSpan.FromSeconds(2)); } }
The PriorityQueue.High and PriorityQueue.Low worker roles both override the default functionality of the ProcessMessage method. The code below shows the ProcessMessage method for the PriorityQueue.High worker role.
protected override async Task ProcessMessage(BrokeredMessage message) { // Simulate message processing for High priority messages. await base.ProcessMessage(message); Trace.TraceInformation("High priority message processed by " + RoleEnvironment.CurrentRoleInstance.Id + " MessageId: " + message.MessageId); }
When an application posts messages to the topic associated with the subscriptions used by the PriorityQueue.High and PriorityQueue.Low worker roles, it specifies the priority by using the Priority custom property, as shown in the following code example.
This code (which is implemented in theWorkerRole class in the PriorityQueue.Sender project), uses the SendBatchAsync helper method of the QueueManager class to post messages to a topic in batches.
// Send a low priority batch. var lowMessages = new List(); for (int i = 0; i < 10; i++) { var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() }; message.Properties["Priority"] = Priority.Low; lowMessages.Add(message); } this.queueManager.SendBatchAsync(lowMessages).Wait(); ... // Send a high priority batch. var highMessages = new List (); for (int i = 0; i < 10; i++) { var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() }; message.Properties["Priority"] = Priority.High; highMessages.Add(message); } this.queueManager.SendBatchAsync(highMessages).Wait();