Comments
Splitting a transaction horizontally into separate steps, each with its own undo (compensation) action, so that a failure can be reverted step by step and contention is avoided.
Code
// Hosts that Send offers every message to, in array order.
static ActivityHost[] processes;

/// <summary>
/// Builds a routing slip with three work items, creates one host per step
/// and hands the slip to the address of the first pending step.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    // NOTE(review): the listing had lost its generic type arguments
    // ("new WorkItem(...)", "new ActivityHost(Send)"), which cannot compile:
    // ActivityHost is abstract and RoutingSlip reads ActivityType from each
    // work item. ReserveCarActivity is defined in this file;
    // ReserveHotelActivity, ReserveFlightActivity, WorkItem<T> and
    // ActivityHost<T> are assumed to be defined alongside it - confirm.
    var routingSlip = new RoutingSlip(new WorkItem[]
    {
        new WorkItem<ReserveCarActivity>(new WorkItemArguments { { "vehicleType", "Compact" } }),
        new WorkItem<ReserveHotelActivity>(new WorkItemArguments { { "roomType", "Suite" } }),
        new WorkItem<ReserveFlightActivity>(new WorkItemArguments { { "destination", "DUS" } })
    });

    // imagine these being completely separate processes with queues between them
    processes = new ActivityHost[]
    {
        new ActivityHost<ReserveCarActivity>(Send),
        new ActivityHost<ReserveHotelActivity>(Send),
        new ActivityHost<ReserveFlightActivity>(Send)
    };

    // hand off to the first address
    Send(routingSlip.ProgressUri, routingSlip);
}

/// <summary>
/// Stand-in for the network dispatch: offers the routing slip to each host
/// in <c>processes</c> in order and stops at the first host whose
/// AcceptMessage returns true. If no host accepts, the message is dropped
/// without any notification.
/// </summary>
/// <param name="uri">Address the routing slip is sent to.</param>
/// <param name="routingSlip">The routing slip being passed along.</param>
static void Send(Uri uri, RoutingSlip routingSlip)
{
    foreach (var process in processes)
    {
        if (process.AcceptMessage(uri, routingSlip))
        {
            break;
        }
    }
}
The activities each implement a reservation step and an undo step. Here's the one for cars:
/// <summary>
/// Activity for the car step: DoWork makes a (simulated) reservation and
/// Compensate cancels it again.
/// </summary>
class ReserveCarActivity : Activity
{
    // Fixed seed, so every run yields the same sequence of reservation ids.
    static readonly Random rnd = new Random(2);

    /// <summary>
    /// Simulates reserving a car and records the generated reservation id.
    /// </summary>
    /// <param name="workItem">Work item whose arguments hold "vehicleType".</param>
    /// <returns>A work log carrying the "reservationId" of the reservation.</returns>
    public override WorkLog DoWork(WorkItem workItem)
    {
        Console.WriteLine("Reserving car");
        // The value is looked up but not used by the simulation; the lookup is
        // kept so the step still touches its expected argument.
        var car = workItem.Arguments["vehicleType"];
        var reservationId = rnd.Next(100000);
        Console.WriteLine("Reserved car {0}", reservationId);
        return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
    }

    /// <summary>
    /// Simulates cancelling the reservation recorded in the given work log.
    /// </summary>
    /// <param name="item">Work log produced by DoWork.</param>
    /// <param name="routingSlip">The routing slip being unwound; not used here.</param>
    /// <returns>Always true.</returns>
    public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
    {
        var reservationId = item.Result["reservationId"];
        Console.WriteLine("Cancelled car {0}", reservationId);
        return true;
    }

    /// <summary>
    /// Address that routing slips are sent to when this step's work is next.
    /// </summary>
    public override Uri WorkItemQueueAddress
    {
        get { return new Uri("sb://./carReservations"); }
    }

    /// <summary>
    /// Address that routing slips are sent to when this step must be undone.
    /// </summary>
    public override Uri CompensationQueueAddress
    {
        // NOTE(review): "carCancellactions" looks like a misspelling of
        // "carCancellations". Left unchanged because the address is a runtime
        // identifier that senders and receivers must agree on.
        get { return new Uri("sb://./carCancellactions"); }
    }
}
The chaining happens solely through the routing slip. The routing slip is "serializable" (it's not, pretend that it is) and it's the only piece of information that flows between the collaborating activities.
There is no central coordination. All work is local on the nodes and once a node is done, it either hands the routing slip forward (on success) or backward (on failure). For forward progress data, the routing slip has a queue and for backwards items it maintains a stack. The routing slip also handles resolving and invoking whatever the "next" thing to call is on the way forward and backward.
/// <summary>
/// Carries the state of one multi-step operation between activities: a queue
/// of work items still to run and a stack of work logs for the steps that
/// have completed (most recent on top).
/// </summary>
class RoutingSlip
{
    // Stack<T> and Queue<T> come from System.Collections.Generic. The element
    // types are required: members below read ActivityType from the items.
    readonly Stack<WorkLog> completedWorkLogs = new Stack<WorkLog>();
    readonly Queue<WorkItem> nextWorkItem = new Queue<WorkItem>();

    /// <summary>Creates an empty routing slip.</summary>
    public RoutingSlip()
    {
    }

    /// <summary>
    /// Creates a routing slip whose pending work is the given items, in order.
    /// </summary>
    /// <param name="workItems">Work items to enqueue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItems"/> is null.</exception>
    public RoutingSlip(IEnumerable<WorkItem> workItems)
    {
        if (workItems == null)
        {
            throw new ArgumentNullException("workItems");
        }

        foreach (var workItem in workItems)
        {
            this.nextWorkItem.Enqueue(workItem);
        }
    }

    /// <summary>True when no work items are left to process.</summary>
    public bool IsCompleted
    {
        get { return this.nextWorkItem.Count == 0; }
    }

    /// <summary>True when at least one completed step is recorded.</summary>
    public bool IsInProgress
    {
        get { return this.completedWorkLogs.Count > 0; }
    }

    /// <summary>
    /// Dequeues the next work item and runs its activity's DoWork.
    /// </summary>
    /// <returns>
    /// True when DoWork returned a work log (which is pushed onto the
    /// completed stack); false when it returned null or threw.
    /// </returns>
    /// <exception cref="InvalidOperationException">No work items are left.</exception>
    public bool ProcessNext()
    {
        if (this.IsCompleted)
        {
            throw new InvalidOperationException("The routing slip has no work items left to process.");
        }

        var currentItem = this.nextWorkItem.Dequeue();
        // Created outside the try block: a failure to instantiate the
        // activity propagates to the caller instead of counting as a failed step.
        var activity = CreateActivity(currentItem.ActivityType);
        try
        {
            var result = activity.DoWork(currentItem);
            if (result != null)
            {
                this.completedWorkLogs.Push(result);
                return true;
            }
        }
        catch (Exception e)
        {
            // A throwing step is reported as a failed step (false), not rethrown.
            Console.WriteLine("Exception {0}", e.Message);
        }

        return false;
    }

    /// <summary>
    /// Work-item address of the next pending step, or null when the slip is completed.
    /// </summary>
    public Uri ProgressUri
    {
        get
        {
            if (this.IsCompleted)
            {
                return null;
            }

            return CreateActivity(this.nextWorkItem.Peek().ActivityType).WorkItemQueueAddress;
        }
    }

    /// <summary>
    /// Compensation address of the most recently completed step, or null
    /// when no completed step is recorded.
    /// </summary>
    public Uri CompensationUri
    {
        get
        {
            if (!this.IsInProgress)
            {
                return null;
            }

            return CreateActivity(this.completedWorkLogs.Peek().ActivityType).CompensationQueueAddress;
        }
    }

    /// <summary>
    /// Pops the most recent work log and runs its activity's Compensate.
    /// </summary>
    /// <returns>The value returned by Compensate.</returns>
    /// <exception cref="InvalidOperationException">No completed step is recorded.</exception>
    public bool UndoLast()
    {
        if (!this.IsInProgress)
        {
            throw new InvalidOperationException("The routing slip has no completed work to undo.");
        }

        var currentItem = this.completedWorkLogs.Pop();
        var activity = CreateActivity(currentItem.ActivityType);
        try
        {
            return activity.Compensate(currentItem, this);
        }
        catch (Exception e)
        {
            // Log and rethrow: unlike ProcessNext, a failing compensation is not swallowed.
            Console.WriteLine("Exception {0}", e.Message);
            throw;
        }
    }

    // Instantiates the activity for a work item or work log through its
    // parameterless constructor.
    static Activity CreateActivity(Type activityType)
    {
        return (Activity)Activator.CreateInstance(activityType);
    }
}
The local work and the decision-making are encapsulated in the ActivityHost. On the way forward it calls ProcessNext() on the routing slip, which resolves the next activity and calls its DoWork() function; on the way back it calls UndoLast(), which resolves the last executed activity and invokes its Compensate() function. Again, there's nothing centralized here; all that work hinges on the routing slip, and the three activities and their execution are completely disjoint.
/// <summary>
/// Base class for a host that processes one routing-slip step and then hands
/// the slip on through the supplied send delegate, forward on success or
/// backward on failure.
/// </summary>
abstract class ActivityHost
{
    // Delegate used to pass the routing slip to the next address. It is
    // invoked with (Uri, RoutingSlip), so it needs both type arguments.
    readonly Action<Uri, RoutingSlip> send;

    /// <summary>Creates a host that forwards routing slips via <paramref name="send"/>.</summary>
    /// <param name="send">Called with the target address and the routing slip.</param>
    /// <exception cref="ArgumentNullException"><paramref name="send"/> is null.</exception>
    public ActivityHost(Action<Uri, RoutingSlip> send)
    {
        if (send == null)
        {
            throw new ArgumentNullException("send");
        }

        this.send = send;
    }

    /// <summary>
    /// Runs the next pending step of the routing slip and sends the slip on.
    /// Does nothing when the slip is already completed.
    /// </summary>
    /// <param name="routingSlip">The routing slip to advance.</param>
    public void ProcessForwardMessage(RoutingSlip routingSlip)
    {
        if (!routingSlip.IsCompleted)
        {
            // if the current step is successful, proceed
            // otherwise go to the Unwind path
            if (routingSlip.ProcessNext())
            {
                // recursion stands for passing context via message
                // the routing slip can be fully serialized and passed
                // between systems.
                // Note: after the last step succeeds, ProgressUri is null
                // and send is invoked with a null address.
                this.send(routingSlip.ProgressUri, routingSlip);
            }
            else
            {
                // pass message to unwind message route
                // Note: CompensationUri is null when no step has completed yet.
                this.send(routingSlip.CompensationUri, routingSlip);
            }
        }
    }

    /// <summary>
    /// Undoes the most recently completed step of the routing slip and sends
    /// the slip on. Does nothing when no completed step is recorded.
    /// </summary>
    /// <param name="routingSlip">The routing slip to unwind.</param>
    public void ProcessBackwardMessage(RoutingSlip routingSlip)
    {
        if (routingSlip.IsInProgress)
        {
            // UndoLast can put new work on the routing slip
            // and return false to go back on the forward
            // path
            if (routingSlip.UndoLast())
            {
                // recursion stands for passing context via message
                // the routing slip can be fully serialized and passed
                // between systems
                this.send(routingSlip.CompensationUri, routingSlip);
            }
            else
            {
                this.send(routingSlip.ProgressUri, routingSlip);
            }
        }
    }

    /// <summary>
    /// Offers a message to this host.
    /// </summary>
    /// <param name="uri">Address the message was sent to.</param>
    /// <param name="routingSlip">The routing slip carried by the message.</param>
    /// <returns>True when this host accepted the message; otherwise false.</returns>
    public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip);
}