Compute Resource Consolidation Pattern

 

Comments

 

Every computation unit contains a/several worker process which can cancel on timeout or an event

 

Code

 

public class WorkerRole: RoleEntryPoint
{
  // The cancellation token source used to cooperatively cancel running tasks.
  private readonly CancellationTokenSource cts = new CancellationTokenSource ();

  // List of tasks running on the role instance.
  private readonly List tasks = new List();

  // List of worker tasks to run on this role.
  private readonly List> workerTasks  
                        = new List>
    {
      MyWorkerTask1,
      MyWorkerTask2
    };
  
  ...
}

The MyWorkerTask1 and the MyWorkerTask2 methods are provided to illustrate how to perform different tasks within the same worker role. The following code shows MyWorkerTask1. This is a simple task that sleeps for 30 seconds and then outputs a trace message. It repeats this process indefinitely until the task is cancelled. The code in MyWorkerTask2 is very similar.

 

private static async Task MyWorkerTask1(CancellationToken ct)
{
  // Fixed interval to wake up and check for work and/or do work.
  var interval = TimeSpan.FromSeconds(30);

  try
  {
    while (!ct.IsCancellationRequested)
    {
      // Wake up and do some background processing if not canceled.
      // TASK PROCESSING CODE HERE
      Trace.TraceInformation("Doing Worker Task 1 Work");

      // Go back to sleep for a period of time unless asked to cancel.
      // Task.Delay will throw an OperationCanceledException when canceled.
      await Task.Delay(interval, ct);
    }
  }
  catch (OperationCanceledException)
  {
    // Expect this exception to be thrown in normal circumstances or check
    // the cancellation token. If the role instances are shutting down, a
    // cancellation request will be signaled.
    Trace.TraceInformation("Stopping service, cancellation requested");

    // Re-throw the exception.
    throw;
  }
}

After the worker role has initialized the resources it uses, the Run method starts the two tasks concurrently, as shown here.

 

// RoleEntry Run() is called after OnStart().  
// Returning from Run() will cause a role instance to recycle.
public override void Run()
{
  // Start worker tasks and add them to the task list.
  foreach (var worker in workerTasks)
    tasks.Add(worker(cts.Token));

  Trace.TraceInformation("Worker host tasks started");
  // The assumption is that all tasks should remain running and not return, 
  // similar to role entry Run() behavior.
  try
  {
    Task.WaitAny(tasks.ToArray());
  }
  catch (AggregateException ex)
  {
    Trace.TraceError(ex.Message);

    // If any of the inner exceptions in the aggregate exception 
    // are not cancellation exceptions then re-throw the exception.
    ex.Handle(innerEx => (innerEx is OperationCanceledException));
  }

  // If there was not a cancellation request, stop all tasks and return from Run()
  // An alternative to cancelling and returning when a task exits would be to 
  // restart the task.
  if (!cts.IsCancellationRequested)
  {
    Trace.TraceInformation("Task returned without cancellation request");
    Stop(TimeSpan.FromMinutes(5));
  }
}

In this example, the Run method waits for tasks to be completed. If a task is canceled, the Run method assumes that the role is being shut down and waits for the remaining tasks to be canceled before finishing (it waits for a maximum of five minutes before terminating). If a task fails due to an expected exception, the Run method cancels the task.

 

The Stop method shown in the following code is called when the fabric controller shuts down the role instance (it is invoked from the OnStop method). The code stops each task gracefully by cancelling it. If any task takes more than five minutes to complete, the cancellation processing in the Stopmethod ceases waiting and the role is terminated.

 

// Stop running tasks and wait for tasks to complete before returning 
// unless the timeout expires.
private void Stop(TimeSpan timeout)
{
  Trace.TraceInformation("Stop called. Canceling tasks.");
  // Cancel running tasks.
  cts.Cancel();

  Trace.TraceInformation("Waiting for canceled tasks to finish and return");

  // Wait for all the tasks to complete before returning. Note that the 
  // emulator currently allows 30 seconds and Azure allows five
  // minutes for processing to complete.
  try
  {
    Task.WaitAll(tasks.ToArray(), timeout);
  }
  catch (AggregateException ex)
  {
    Trace.TraceError(ex.Message);

    // If any of the inner exceptions in the aggregate exception 
    // are not cancellation exceptions then re-throw the exception.
    ex.Handle(innerEx => (innerEx is OperationCanceledException));
  }
}