Comments
Splitting a transaction at different levels (horizontal) for revert and contention avoidance.
Code
static ActivityHost[] processes; 2: 3: static void Main(string[] args) 4: { 5: var routingSlip = new RoutingSlip(new WorkItem[] 6: { 7: new WorkItem(new WorkItemArguments{{"vehicleType", "Compact"}}), 8: new WorkItem (new WorkItemArguments{{"roomType", "Suite"}}), 9: new WorkItem (new WorkItemArguments{{"destination", "DUS"}}) 10: }); 11: 12: 13: // imagine these being completely separate processes with queues between them 14: processes = new ActivityHost[] 15: { 16: new ActivityHost (Send), 17: new ActivityHost (Send), 18: new ActivityHost (Send) 19: }; 20: 21: // hand off to the first address 22: Send(routingSlip.ProgressUri, routingSlip); 23: } 24: 25: static void Send(Uri uri, RoutingSlip routingSlip) 26: { 27: // this is effectively the network dispatch 28: foreach (var process in processes) 29: { 30: if (process.AcceptMessage(uri, routingSlip)) 31: { 32: break; 33: } 34: } 35: }
The activities each implement a reservation step and an undo step. Here's the one for cars:
class ReserveCarActivity : Activity 2: { 3: static Random rnd = new Random(2); 4: 5: public override WorkLog DoWork(WorkItem workItem) 6: { 7: Console.WriteLine("Reserving car"); 8: var car = workItem.Arguments["vehicleType"]; 9: var reservationId = rnd.Next(100000); 10: Console.WriteLine("Reserved car {0}", reservationId); 11: return new WorkLog(this, new WorkResult { { "reservationId", reservationId } }); 12: } 13: 14: public override bool Compensate(WorkLog item, RoutingSlip routingSlip) 15: { 16: var reservationId = item.Result["reservationId"]; 17: Console.WriteLine("Cancelled car {0}", reservationId); 18: return true; 19: } 20: 21: public override Uri WorkItemQueueAddress 22: { // The sb transport allows Oracle products (bpm) to synchronously invoke an Oracle Service Bus Proxy service using RMI 23: get { return new Uri("sb://./carReservations"); } 24: } 25: 26: public override Uri CompensationQueueAddress 27: { 28: get { return new Uri("sb://./carCancellactions"); } 29: } 30: }
The chaining happens solely through the routing slip. The routing slip is "serializable" (it's not, pretend that it is) and it's the only piece of information that flows between the collaborating activities.
There is no central coordination. All work is local on the nodes and once a node is done, it either hands the routing slip forward (on success) or backward (on failure). For forward progress data, the routing slip has a queue and for backwards items it maintains a stack. The routing slip also handles resolving and invoking whatever the "next" thing to call is on the way forward and backward.
class RoutingSlip 2: { 3: readonly StackcompletedWorkLogs = new Stack (); 4: readonly Queue nextWorkItem = new Queue (); 5: 6: public RoutingSlip() 7: { 8: } 9: 10: public RoutingSlip(IEnumerable workItems) 11: { 12: foreach (var workItem in workItems) 13: { 14: this.nextWorkItem.Enqueue(workItem); 15: } 16: } 17: 18: public bool IsCompleted 19: { 20: get { return this.nextWorkItem.Count == 0; } 21: } 22: 23: public bool IsInProgress 24: { 25: get { return this.completedWorkLogs.Count > 0; } 26: } 27: 28: public bool ProcessNext() 29: { 30: if (this.IsCompleted) 31: { 32: throw new InvalidOperationException(); 33: } 34: 35: var currentItem = this.nextWorkItem.Dequeue(); 36: var activity = (Activity)Activator.CreateInstance(currentItem.ActivityType); 37: try 38: { 39: var result = activity.DoWork(currentItem); 40: if (result != null) 41: { 42: this.completedWorkLogs.Push(result); 43: return true; 44: } 45: } 46: catch (Exception e) 47: { 48: Console.WriteLine("Exception {0}", e.Message); 49: } 50: return false; 51: } 52: 53: public Uri ProgressUri 54: { 55: get 56: { 57: if (IsCompleted) 58: { 59: return null; 60: } 61: else 62: { 63: return 64: ((Activity)Activator.CreateInstance(this.nextWorkItem.Peek().ActivityType)). 65: WorkItemQueueAddress; 66: } 67: } 68: } 69: 70: public Uri CompensationUri 71: { 72: get 73: { 74: if (!IsInProgress) 75: { 76: return null; 77: } 78: else 79: { 80: return 81: ((Activity)Activator.CreateInstance(this.completedWorkLogs.Peek().ActivityType)). 82: CompensationQueueAddress; 83: } 84: } 85: } 86: 87: public bool UndoLast() 88: { 89: if (!this.IsInProgress) 90: { 91: throw new InvalidOperationException(); 92: } 93: 94: var currentItem = this.completedWorkLogs.Pop(); 95: var activity = (Activity)Activator.CreateInstance(currentItem.ActivityType); 96: try 97: { 98: return activity.Compensate(currentItem, this); 99: } 100: catch (Exception e) 101: { 102: Console.WriteLine("Exception {0}", e.Message); 103: throw; 104: } 105: 106: } 107: }
The local work and making the decisions is encapsulated in the ActivityHost, which calls ProcessNext() on the routing slip to resolve the next activity and call its DoWork() function on the way forward or will resolve the last executed activity on the way back and invoke its Compensate() function. Again, there's nothing centralized here; all that work hinges on the routing slip and the three activities and their execution is completely disjoint.
abstract class ActivityHost 2: { 3: Actionsend; 4: 5: public ActivityHost(Action send) 6: { 7: this.send = send; 8: } 9: 10: public void ProcessForwardMessage(RoutingSlip routingSlip) 11: { 12: if (!routingSlip.IsCompleted) 13: { 14: // if the current step is successful, proceed 15: // otherwise go to the Unwind path 16: if (routingSlip.ProcessNext()) 17: { 18: // recursion stands for passing context via message 19: // the routing slip can be fully serialized and passed 20: // between systems. 21: this.send(routingSlip.ProgressUri, routingSlip); 22: } 23: else 24: { 25: // pass message to unwind message route 26: this.send(routingSlip.CompensationUri, routingSlip); 27: } 28: } 29: } 30: 31: public void ProcessBackwardMessage(RoutingSlip routingSlip) 32: { 33: if (routingSlip.IsInProgress) 34: { 35: // UndoLast can put new work on the routing slip 36: // and return false to go back on the forward 37: // path 38: if (routingSlip.UndoLast()) 39: { 40: // recursion stands for passing context via message 41: // the routing slip can be fully serialized and passed 42: // between systems 43: this.send(routingSlip.CompensationUri, routingSlip); 44: } 45: else 46: { 47: this.send(routingSlip.ProgressUri, routingSlip); 48: } 49: } 50: } 51: 52: public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip); 53: }