Compensating Design Pattern

 

Comments

 

Splitting a transaction at different levels (horizontal) for revert and contention avoidance.

 

Code

 

 

static ActivityHost[] processes;
   2:   
   3:  static void Main(string[] args)
   4:  {
   5:      var routingSlip = new RoutingSlip(new WorkItem[]
   6:          {
   7:              new WorkItem(new WorkItemArguments{{"vehicleType", "Compact"}}),
   8:              new WorkItem(new WorkItemArguments{{"roomType", "Suite"}}),
   9:              new WorkItem(new WorkItemArguments{{"destination", "DUS"}})
  10:          });
  11:   
  12:   
  13:      // imagine these being completely separate processes with queues between them
  14:      processes = new ActivityHost[]
  15:                          {
  16:                              new ActivityHost(Send),
  17:                              new ActivityHost(Send),
  18:                              new ActivityHost(Send)
  19:                          };
  20:   
  21:      // hand off to the first address
  22:      Send(routingSlip.ProgressUri, routingSlip);
  23:  }
  24:   
  25:  static void Send(Uri uri, RoutingSlip routingSlip)
  26:  {
  27:      // this is effectively the network dispatch
  28:      foreach (var process in processes)
  29:      {
  30:          if (process.AcceptMessage(uri, routingSlip))
  31:          {
  32:              break;
  33:          }
  34:      }
  35:  }

The activities each implement a reservation step and an undo step. Here's the one for cars:

 

class ReserveCarActivity : Activity
   2:  {
   3:      static Random rnd = new Random(2);
   4:   
   5:      public override WorkLog DoWork(WorkItem workItem)
   6:      {
   7:          Console.WriteLine("Reserving car");
   8:          var car = workItem.Arguments["vehicleType"];
   9:          var reservationId = rnd.Next(100000);
  10:          Console.WriteLine("Reserved car {0}", reservationId);
  11:          return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
  12:      }
  13:   
  14:      public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
  15:      {
  16:          var reservationId = item.Result["reservationId"];
  17:          Console.WriteLine("Cancelled car {0}", reservationId);
  18:          return true;
  19:      }
  20:   
  21:      public override Uri WorkItemQueueAddress
  22:      {
// The sb transport allows Oracle products (bpm) to synchronously invoke an Oracle Service Bus Proxy service using RMI
  23:          get { return new Uri("sb://./carReservations"); }
  24:      }
  25:   
  26:      public override Uri CompensationQueueAddress
  27:      {
  28:          get { return new Uri("sb://./carCancellactions"); }
  29:      }
  30:  }

The chaining happens solely through the routing slip. The routing slip is "serializable" (it's not, pretend that it is) and it's the only piece of information that flows between the collaborating activities.

 

There is no central coordination. All work is local on the nodes and once a node is done, it either hands the routing slip forward (on success) or backward (on failure). For forward progress data, the routing slip has a queue and for backwards items it maintains a stack. The routing slip also handles resolving and invoking whatever the "next" thing to call is on the way forward and backward.

 

class RoutingSlip
   2:  {
   3:      readonly Stack completedWorkLogs = new Stack();
   4:      readonly Queue nextWorkItem = new Queue();
   5:   
   6:      public RoutingSlip()
   7:      {
   8:      }
   9:   
  10:      public RoutingSlip(IEnumerable workItems)
  11:      {
  12:          foreach (var workItem in workItems)
  13:          {
  14:              this.nextWorkItem.Enqueue(workItem);
  15:          }
  16:      }
  17:   
  18:      public bool IsCompleted
  19:      {
  20:          get { return this.nextWorkItem.Count == 0; }
  21:      }
  22:   
  23:      public bool IsInProgress
  24:      {
  25:          get { return this.completedWorkLogs.Count > 0; }
  26:      }
  27:   
  28:      public bool ProcessNext()
  29:      {
  30:          if (this.IsCompleted)
  31:          {
  32:              throw new InvalidOperationException();
  33:          }
  34:   
  35:          var currentItem = this.nextWorkItem.Dequeue();
  36:          var activity = (Activity)Activator.CreateInstance(currentItem.ActivityType);
  37:          try
  38:          {
  39:              var result = activity.DoWork(currentItem);
  40:              if (result != null)
  41:              {
  42:                  this.completedWorkLogs.Push(result);
  43:                  return true;
  44:              }
  45:          }
  46:          catch (Exception e)
  47:          {
  48:              Console.WriteLine("Exception {0}", e.Message);
  49:          }
  50:          return false;
  51:      }
  52:   
  53:      public Uri ProgressUri
  54:      {
  55:          get
  56:          {
  57:              if (IsCompleted)
  58:              {
  59:                  return null;
  60:              }
  61:              else
  62:              {
  63:                  return
  64:                      ((Activity)Activator.CreateInstance(this.nextWorkItem.Peek().ActivityType)).
  65:                          WorkItemQueueAddress;
  66:              }
  67:          }
  68:      }
  69:   
  70:      public Uri CompensationUri
  71:      {
  72:          get
  73:          {
  74:              if (!IsInProgress)
  75:              {
  76:                  return null;
  77:              }
  78:              else
  79:              {
  80:                  return
  81:                      ((Activity)Activator.CreateInstance(this.completedWorkLogs.Peek().ActivityType)).
  82:                          CompensationQueueAddress;
  83:              }
  84:          }
  85:      }
  86:   
  87:      public bool UndoLast()
  88:      {
  89:          if (!this.IsInProgress)
  90:          {
  91:              throw new InvalidOperationException();
  92:          }
  93:   
  94:          var currentItem = this.completedWorkLogs.Pop();
  95:          var activity = (Activity)Activator.CreateInstance(currentItem.ActivityType);
  96:          try
  97:          {
  98:              return activity.Compensate(currentItem, this);
  99:          }
 100:          catch (Exception e)
 101:          {
 102:              Console.WriteLine("Exception {0}", e.Message);
 103:              throw;
 104:          }
 105:   
 106:      }
 107:  }

The local work  and making the decisions is encapsulated in the ActivityHost, which calls ProcessNext() on the routing slip to resolve the next activity and call its DoWork() function on the way forward or will resolve the last executed activity on the way back and invoke its Compensate() function. Again, there's nothing centralized here; all that work hinges on the routing slip and the three activities and their execution is completely disjoint.

 

abstract class ActivityHost
   2:  {
   3:      Action send;
   4:   
   5:      public ActivityHost(Action send)
   6:      {
   7:          this.send = send;
   8:      }
   9:   
  10:      public void ProcessForwardMessage(RoutingSlip routingSlip)
  11:      {
  12:          if (!routingSlip.IsCompleted)
  13:          {
  14:              // if the current step is successful, proceed
  15:              // otherwise go to the Unwind path
  16:              if (routingSlip.ProcessNext())
  17:              {
  18:                  // recursion stands for passing context via message
  19:                  // the routing slip can be fully serialized and passed
  20:                  // between systems. 
  21:                  this.send(routingSlip.ProgressUri, routingSlip);
  22:              }
  23:              else
  24:              {
  25:                  // pass message to unwind message route
  26:                  this.send(routingSlip.CompensationUri, routingSlip);
  27:              }
  28:          }
  29:      }
  30:   
  31:      public void ProcessBackwardMessage(RoutingSlip routingSlip)
  32:      {
  33:          if (routingSlip.IsInProgress)
  34:          {
  35:              // UndoLast can put new work on the routing slip
  36:              // and return false to go back on the forward 
  37:              // path
  38:              if (routingSlip.UndoLast())
  39:              {
  40:                  // recursion stands for passing context via message
  41:                  // the routing slip can be fully serialized and passed
  42:                  // between systems 
  43:                  this.send(routingSlip.CompensationUri, routingSlip);
  44:              }
  45:              else
  46:              {
  47:                  this.send(routingSlip.ProgressUri, routingSlip);
  48:              }
  49:          }
  50:      }
  51:   
  52:      public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip);
  53:  }