Pipes and Filters Pattern

 

 

Comments

 

Refactoring of a large scale service into workflows consisting of pipes and filters i.e. input process (same re-usable code held on one service) output rather than multiple instances of same code in different services.

 

Code

 

TheServiceBusPipeFilter class shown below provides an example. It demonstrates how you can implement a filter that receives input messages from a queue, processes these messages, and posts the results to another queue.

 

public class ServiceBusPipeFilter
{
  ...
  private readonly string inQueuePath;
  private readonly string outQueuePath;
  ...
  private QueueClient inQueue;
  private QueueClient outQueue;
  ...

  public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
  {
     ...
     this.inQueuePath = inQueuePath;
     this.outQueuePath = outQueuePath;
  }

  public void Start()
  {
    ...
    // Create the outbound filter queue if it does not exist.
    ...
    this.outQueue = QueueClient.CreateFromConnectionString(...);
    
    ...
    // Create the inbound and outbound queue clients.
    this.inQueue = QueueClient.CreateFromConnectionString(...);
  }

  public void OnPipeFilterMessageAsync(
    Func> asyncFilterTask, ...) 
  {
    ...

    this.inQueue.OnMessageAsync(
      async (msg) =>
    {
      ...
      // Process the filter and send the output to the 
      // next queue in the pipeline.
      var outMessage = await asyncFilterTask(msg);

      // Send the message from the filter processor 
      // to the next queue in the pipeline.
      if (outQueue != null)
      {
        await outQueue.SendAsync(outMessage);
      }

      // Note: There is a chance that the same message could be sent twice 
      // or that a message may be processed by an upstream or downstream 
      // filter at the same time.
      // This would happen in a situation where processing of a message was
      // completed, it was sent to the next pipe/queue, and then failed 
      // to complete when using the PeekLock method.
      // Idempotent message processing and concurrency should be considered 
      // in a real-world implementation.
    },
    options);
  }

  public async Task Close(TimeSpan timespan)
  {
    // Pause the processing threads.
    this.pauseProcessingEvent.Reset();

    // There is no clean approach for waiting for the threads to complete
    // the processing. This example simply stops any new processing, waits
    // for the existing thread to complete, then closes the message pump 
    // and finally returns.
    Thread.Sleep(timespan);

    this.inQueue.Close();
    ...
  }

  ...
}

The Start method in the ServiceBusPipeFilter class connects to a pair of input and output queues, and the Close method disconnects from the input queue.

 

The OnPipeFilterMessageAsync method performs the actual processing of messages; the asyncFilterTask parameter to this method specifies the processing to be performed.

 

The OnPipeFilterMessageAsync method waits for incoming messages on the input queue, runs the code specified by the asyncFilterTask parameter over each messages as it arrives, and posts the results to the output queue. The queues themselves are specified by the constructor.

 

The sample solution implements filters in a set of worker roles. Each worker role can be scaled independently, depending on the complexity of the business processing that it performs or the resources that it requires to perform this processing. Additionally, multiple instances of each worker role can be run in parallel to improve throughput.

 

The following code shows a Azure worker role named PipeFilterARoleEntry, which is defined in the PipeFilterA project in the sample solution.

 

public class PipeFilterARoleEntry : RoleEntryPoint
{
  ...
  private ServiceBusPipeFilter pipeFilterA;

  public override bool OnStart()
  {
    ...
    this.pipeFilterA = new ServiceBusPipeFilter(
      ...,
      Constants.QueueAPath,
      Constants.QueueBPath);

    this.pipeFilterA.Start();
    ...
  }

  public override void Run()
  {
    this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
    {
      // Clone the message and update it.
      // Properties set by the broker (Deliver count, enqueue time, ...) 
      // are not cloned and must be copied over if required.
      var newMsg = msg.Clone();
      
      await Task.Delay(500); // DOING WORK

      Trace.TraceInformation("Filter A processed message:{0} at {1}", 
        msg.MessageId, DateTime.UtcNow);

      newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");

      return newMsg;
    });

    ...
  }

  ...
}

This role contains a ServiceBusPipeFilter object. The OnStart method in the role connects to the queues for receiving input messages and posting output messages (the names of the queues are defined in the Constants class).

 

The Run method invokes the OnPipeFilterMessagesAsync method to perform some processing on each message that is received (in this example, the processing is simulated by waiting for a short period of time). When processing is complete, a new message is constructed containing the results (in this case, the input message is simply augmented with a custom property), and this message is posted to the output queue.

 

The sample code contains another worker role named PipeFilterBRoleEntry in the PipeFilterB project. This role is similar to PipeFilterARoleEntryexcept that it performs different processing in the Run method. In the example solution, these two roles are combined to construct a pipeline; the output queue for the PipeFilterARoleEntry role is the input queue for the PipeFilterBRoleEntry role.

 

The sample solution also provides two further roles named InitialSenderRoleEntry (in the InitialSender project) and FinalReceiverRoleEntry (in the FinalReceiver project). The InitialSenderRoleEntry role provides the initial message in the pipeline. The OnStart method connects to a single queue and the Run method posts a method to this queue.

 

This queue is the input queue used by the PipeFilterARoleEntry role, so posting a message to this queue causes the message to be received and processed by the PipeFilterARoleEntry role. The processed message then passes through thePipeFilterBRoleEntry role.

 

The input queue for the FinalReceiveRoleEntry role is the output queue for the PipeFilterBRoleEntry role. The Run method in theFinalReceiveRoleEntry role, shown below, receives the message and performs some final processing. Then it writes the values of the custom properties added by the filters in the pipeline to the trace output.

 

public class FinalReceiverRoleEntry : RoleEntryPoint
{
  ...
  // Final queue/pipe in the pipeline from which to process data.
  private ServiceBusPipeFilter queueFinal;

  public override bool OnStart()
  {
    ...
    // Set up the queue.
    this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
    this.queueFinal.Start();
    ...
  }

  public override void Run()
  {
    this.queueFinal.OnPipeFilterMessageAsync(
      async (msg) =>
      {
        await Task.Delay(500); // DOING WORK

        // The pipeline message was received.
        Trace.TraceInformation(
          "Pipeline Message Complete - FilterA:{0} FilterB:{1}",
          msg.Properties[Constants.FilterAMessageKey],
          msg.Properties[Constants.FilterBMessageKey]);

        return null;
      });
    ...
  }

  ...
}