Posts

Taskmatics Scheduler 1.1 Released

We’ve released version 1.1.0.0 today, which contains many performance improvements as well as some new features requested by the community. If you download the installer today you’ll get the new version, and we’ve also published the updated Taskmatics Scheduler package to the NuGet repository. The primary focus of this build was to improve system-wide performance: in installations with a significant number of tasks running at high frequencies, we had seen some slowness. With these changes we’ve measured time improvements of 80% to 90%, not just on the administration UI screens but on the reports as well.

How to get it

To get the 1.1.0.0 installer, go here!

What’s Inside

The full list of changes in this release is as follows:

  • Added the ability to set exclusion dates on the calendar trigger; the scheduled task will not run on those dates.
  • Fixed an intermittent trigger loading error when adding a trigger to an existing scheduled task.
  • Added performance improvements to all reports.
  • Added performance improvements to dashboard loading.
  • Added performance improvements to task dispatching.
  • Added performance improvements to task history search.
  • Database schema changes to help improve overall system performance.
  • Fixed an intermittent out-of-memory exception when querying resource usage.
  • Added filter to the Dashboard screen to allow users to find scheduled tasks that contain the entered term (similar to filter on other listing screens).
  • Added additional error handling and update messaging to the installer process.

Breaking Changes

While it’s always a goal to be fully backwards compatible, this release does contain one breaking change that may require a rebuild of one or more tasks. In the past, each task instance was identified with a Globally Unique Identifier, or GUID. These identifiers are great at ensuring that billions of tasks won’t collide with each other, but the downside to that guarantee of uniqueness is that they don’t perform as well as sequential integers. As part of the new release, we’ve re-mapped all task instances to use 64-bit integers, which gives us the same uniqueness guarantee within the database but performs much better under search.

You should update all of your task libraries to reference the Taskmatics.Scheduler.Core 1.1.0.0 Nuget package. If any of your tasks reference TaskBase.Context.TaskInstanceId, you will need to rebuild and re-deploy those tasks as the TaskBase.Context.TaskInstanceId property is now represented as Int64 instead of Guid.
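As a hedged illustration of the change, the sketch below shows the kind of adjustment an affected task needs. The Context.TaskInstanceId property is the one described above; the TaskBase subclass shape and its Execute override are our assumption of a typical custom task, not taken from the release notes:

// Hypothetical custom task illustrating the Guid -> Int64 change.
public class OrderCleanupTask : Taskmatics.Scheduler.Core.TaskBase
{
    protected override void Execute() // override name assumed
    {
        // 1.0: Guid instanceId = Context.TaskInstanceId;
        // 1.1: the property is now an Int64, so update any code that
        // stored, logged or parsed it as a Guid, then rebuild and re-deploy.
        long instanceId = Context.TaskInstanceId;
        Console.WriteLine("Running task instance " + instanceId);
    }
}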

Upgrade Instructions

As of now, we don’t have an upgradeable installer, which means that to upgrade a 1.0.0.0 installation of Taskmatics Scheduler you should first uninstall the previous version and then run the new 1.1.0.0 installer, pointing it to the existing database that Taskmatics was originally set up to use. Note: uninstalling Taskmatics Scheduler will NEVER remove the data in your database. We suggest following these steps to install the new version on your system:

  1. Stop all coordinator and agent Windows services (done from the Services snap-in).
  2. Backup your existing database. This is precautionary in case anything unexpected happens during the upgrade.
  3. Take note of the following information from your current installation:
    • Your serial number, which is in the email you received when you purchased Taskmatics Scheduler. You can also find it in the license.xml file in the ProgramData\Taskmatics\Scheduler directory.
    • Your current database server, database name and the runtime credentials. These can be found in the ProgramData\Taskmatics\Scheduler\Taskmatics.Scheduler.Coordination.ServiceHost.exe.config file.
    • The runtime users for both the coordinator and agent Windows services, which you can see from the services snap-in.
    • The root filesets path and the root working folders path, which are found in the .config files located in the ProgramData\Taskmatics\Scheduler directory.
  4. Uninstall the existing Taskmatics Scheduler components (done from the Programs and Features screen in Windows).
  5. Run the new Taskmatics.Scheduler installer. You will need to re-enter the information you collected in step 3 to make sure that all the original permissions and mappings are re-used.

Due to the nature of the performance improvements and the significant number of database changes in this update, installation of the new version could take upwards of 30 minutes, depending on the size of your database (for example, if you have millions of task instances). Making the upgrade path smoother for our users is at the top of our roadmap, so we don’t expect this to be the norm going forward.

More to Come

This is just the beginning of the changes that we have in store. Thanks again for using Taskmatics Scheduler, and we hope you enjoy working with the new version even more. Let us know what features you’d like to see in subsequent versions of Taskmatics Scheduler as we’re always looking for ideas from the community.

Where was that download link again?

In case you missed it at the top, to get the 1.1.0.0 installer, go here!

How Asynchronous Operations Can Reduce Performance

It’s well known that asynchronous programming can improve application performance. Developers writing Windows applications try to keep the UI thread responsive as work is performed in the background, while website and web API developers attempt to maximize thread availability for processing new requests. Like any technique, however, asynchronous programming can produce results contrary to expectations. Without some forethought and care into how work is offloaded onto threads, developers can unwittingly put application performance at risk. In this post I want to talk about the different types of asynchronous work, how the operating system schedules that work, and how to determine when new threads should be created in your application when initiating asynchronous operations.

Types of Asynchronous Operations

Most of the functionality we code into our applications falls into one of two main categories. Compute-bound operations are those in which time-consuming calculations are performed. Examples include sorting a large array or calculating the first 1000 digits of pi. In these cases the CPU is responsible for completing the work, but with nearly every modern machine supporting multiple cores, it benefits the application to continue processing on the main thread while these calculations take place on another core. For these scenarios it’s worthwhile to offload the work to a thread pool thread to improve the application’s performance. We can do this by using the Task class to schedule the work on the thread pool, as in the example below:

private static Task<double> DoHeavyCalculationsAsync(int length)
{
    return Task.Run(() =>
    {
        // Do heavy calculations on another thread and return the result.
        var result = 0d;
        for (var i = 0; i < length; i++)
            result += Math.Sqrt(i);

        return result;
    });
}

Most of the other operations we think of as time consuming, such as reading and writing files, database operations and calls made over the network, are classified as I/O-bound operations. At first blush, I/O operations seem like the perfect candidate for offloading to a new thread pool thread: they take an unpredictable amount of time to complete and are therefore likely to cause the application to lose responsiveness. By moving this logic to another thread, the main thread can continue to perform its work and all is right with the world, or so it seems. In reality, because of the way the thread pool schedules work, performance can actually be worsened rather than improved. Let’s look at a simple example:

private static Task<byte[]> ReadFileAsync(string path)
{
    return Task.Run(() =>
    {
        byte[] bytes;

        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
        {
            bytes = new byte[stream.Length];

            // This synchronous Read blocks the pool thread for the full
            // duration of the I/O, which is exactly the problem.
            stream.Read(bytes, 0, (int)stream.Length);
        }

        return bytes;
    });
}

Thread Management under the Hood

While it’s true that this code will not block the calling thread, it’s not doing the application any favors in terms of overall performance. All .NET applications are allocated two sets of threads onto which they can queue work, and as of .NET 4 the number of threads allocated to the process varies based on a number of factors, including the resources available on the machine running the application. The first set contains worker threads, which can execute work in parallel with the main thread of the application. This is the only group of threads, outside of the main application thread, onto which the Task class in .NET 4 can queue work. The second set of threads are known as I/O completion threads. These are threads requested by the CLR when the OS has completed asynchronous I/O operations like reading file contents, making driver calls or making calls over a network. When an I/O operation completes, the completion port bound to that operation notifies the CLR, which obtains a thread from this set on which to execute the callback of the asynchronous operation.

The problem with offloading I/O operations to another thread using Task.Run is that it swaps one thread for another when the OS can perform asynchronous I/O without blocking a thread at all. In the example above, if you had a loop reading the contents of all files within a fairly large folder tree, you could easily have 100 or more threads waiting for their respective files to finish being read. This taxes the application because the thread pool has to create and destroy a large number of worker threads to handle the volume of files being processed, and eventually it can cause resource contention on the machine, since each thread takes a minimum of one megabyte of RAM. About the only thing going to plan is that the UI will be responsive, at least until the system starts to go into resource starvation. So how can we make use of the I/O completion threads in our applications if the .NET thread pool is exclusively tied to queuing operations on worker threads?
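You can observe these two thread sets directly. The snippet below is a small diagnostic sketch using the ThreadPool class, which reports worker threads and I/O completion threads separately:

int workerThreads, completionPortThreads;

// Maximum threads the pool may create, per set.
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("Max worker: {0}, max I/O completion: {1}",
    workerThreads, completionPortThreads);

// Threads currently free, per set.
ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
Console.WriteLine("Available worker: {0}, available I/O completion: {1}",
    workerThreads, completionPortThreads);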

Looking Inward

While it is possible for .NET developers to queue work onto these completion ports using some rather obscure methods on the ThreadPool class and native overlapped structures, it’s quite unlikely you’ll ever really need to. Luckily, the framework is already chock-full of methods that implement this I/O pattern for you: any method that implements the Asynchronous Programming Model (APM), characterized by Begin and End method pairs, or the more recent Task-based Asynchronous Pattern (TAP). These methods have been programmed to use the native calls that schedule work to run on the I/O completion threads. The result is that no thread is held up waiting for the I/O to complete, and overall application performance is better for it. Here is our example from above, rewritten to use built-in TAP methods that use the I/O completion threads:

private static async Task<byte[]> ReadFileAsync(string path)
{
    var bytesRead = 0;
    byte[] fileBytes;

    using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
    {
        fileBytes = new byte[stream.Length];
        bytesRead = await stream.ReadAsync(fileBytes, 0, (int)stream.Length).ConfigureAwait(false);
    }
 
    return fileBytes;
}

In the above example, we aren’t creating a new thread to offload the work onto; we simply call the ReadAsync method of the Stream class. When the await expression is hit, control returns to the caller and the work is queued on the OS, freeing the current thread to perform other work. When the file read completes, a thread from the pool is assigned to handle the remaining work after the await.
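To see the difference at scale, here is a hedged sketch of the folder-tree scenario from earlier, built on the TAP version above; ReadAllFilesAsync and its sequential loop are illustrative only:

private static async Task<long> ReadAllFilesAsync(string rootPath)
{
    long totalBytes = 0;

    // Each await hands the read to the OS and releases the thread back
    // to the pool; no thread sits blocked per file.
    foreach (var path in Directory.EnumerateFiles(rootPath, "*", SearchOption.AllDirectories))
    {
        var bytes = await ReadFileAsync(path).ConfigureAwait(false);
        totalBytes += bytes.Length;
    }

    return totalBytes;
}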

Summary

By differentiating between CPU-bound and I/O-bound operations in your application, and with a little digging into the classes surrounding I/O operations, you can determine when it is truly beneficial to offload work to the thread pool and when to let the framework’s asynchronous methods do the heavy lifting instead.

Going with the Flow: Simplifying Producer/Consumer Processing with TPL Dataflow Structures

In programming, even simple-looking requirements can foreshadow some of the most complex code developers create. Often the complexity is driven by factors that we, as developers, need to consider but that the business defining the requirements does not. These factors normally include performance metrics like throughput, latency and demand on system resources. This article will explore how quickly the complexity of a simple task can grow, and how the TPL Dataflow library helps tackle that complexity with an API that builds on the Task-based Asynchronous Pattern (TAP) introduced in .NET 4.5. To demonstrate the power of the Dataflow library, we’ll show how a completely custom asynchronous, parallelized workflow that would likely have been over 1,000 lines of code can be condensed to fewer than 150 lines.

A Tangled Web Woven

Your average day as a developer extraordinaire starts off with a glance at your task queue. Consider the following innocuous looking task:

Process any newly placed orders to our shipping services and update those orders with tracking numbers.

At first glance this seems pretty straightforward. We can just create a class that retrieves new orders as they are placed, loop over each order sending it to the shipping service, and finally save the updated order information to the database. Well, it’s clear you’ve earned your paycheck today, and you take a coffee break to celebrate. As you sip your invigorating beverage, you begin to think about the performance implications of your design. Maybe it’s the coffee talking, but you see some clear issues with your original solution:

  • The shipping service takes in multiple orders at once, so sending them one at a time is an inefficient use of the network, but sending them all at once will take some time, which will hold up resources until completed. We will need to batch the orders at a reasonable size. Also, if multiple shipping providers are supported, we may need to route the orders to more than one shipping service.
  • Our class that takes in newly placed orders will have to employ some locking mechanism to make sure that the internal queue of orders is managed in a thread safe manner. If there are a lot of orders, processing these one by one may also cause the queue to grow larger and larger, eventually putting pressure on memory resources.
  • Once all of the orders have tracking numbers, the DB has to be updated. This could take time, and there’s no reason other orders shouldn’t be sent to the shipping services while these orders are being saved to the database. Idle hands are the devil’s playthings, after all.

Immediately our simple solution involves batch queuing, concurrent processing and coordination of threads through synchronization objects. The full weight of how much code you’ll actually have to write is upon you. It’s not that any of these problems are insurmountable, but the amount of code to be written, the time spent testing and the complex, error-prone nature of that code will end up determining how long it takes to get a solution like this into production. Looks like it will be a long day; better refill that coffee!

The Power of Dataflow

The TPL Dataflow library is essentially a set of components, called blocks, each serving a specific role in producer/consumer systems. What’s great about these blocks is that they are all built on top of the Task Parallel Library, which means they support the new async/await pattern for asynchronous programming. This makes our process more efficient in its use of threads and system resources while making the code that accomplishes the asynchronous functionality easier to read and maintain. In addition to async support, the blocks can be linked to each other to create entire task workflows, from simple pipelines to complex network flows. The blocks and the linkages between them expose settings that allow you to configure how each component will ingest, process and dispatch the data it receives. This eliminates the need to write verbose code to synchronize the various stages of the workflow yourself. The blocks provided by the library fall into three main categories:

  • Buffering Blocks: These blocks essentially receive and store data so that it can be dispatched to one or more other target blocks for processing. All of these blocks guarantee order in message delivery, with the exception of the WriteOnceBlock<T> which essentially only allows the first message received to be dispatched to its linked targets.
  • Execution Blocks: Blocks that fall into this category take in a specified delegate that is run on each data object that is dispatched to it. The execution of the delegate is done asynchronously, which means that as data is received by these classes, the delegate is scheduled by the task scheduling engine of the TPL to be processed on a thread pool thread. All of these classes will by default process only one data element at a time, buffering the other data until the previous one completes, but each class has settings that allow you to specify the number of data objects that can be processed in parallel and the internals of the class handle the rest.
  • Grouping Blocks: The classes that comprise this category tackle the need to aggregate data elements from one or more data sources. This can be done either by grouping data into sets to be processed as a batch, or synchronizing multiple outputs from other tasks running asynchronously to be processed by another component.

If one of the blocks provided by the library doesn’t quite fit the bill, you can build your own custom component. The Dataflow library exposes three main interfaces that can be implemented to make your custom component compatible and composable with the built-in blocks. ISourceBlock<TOutput> is for components that will buffer and/or dispatch data elements to one or more target blocks, ITargetBlock<TInput> is for components that will receive and process data, and IPropagatorBlock<TInput, TOutput> is for components that act as both a source and a target. If you want to build a wrapper class that utilizes an existing source and target component, the Encapsulate<TInput, TOutput> method can generate a propagator for you to simplify your implementation (a sketch follows below). While we won’t be covering each of the blocks of the library in depth, we will touch on a few of them in more detail as we build our example.
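As a quick illustration of the Encapsulate method just mentioned, here’s a minimal sketch. Since BatchBlock<T> implements both the target and source interfaces, it can stand in for both halves of the pair:

// Wrap an existing target/source pair into a single propagator block
// that composes like any built-in block.
var inner = new BatchBlock<Order>(5);
IPropagatorBlock<Order, Order[]> batchingStage =
    DataflowBlock.Encapsulate(inner, inner);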

One interesting side note about the TPL Dataflow library, which Microsoft makes very clear, is that it is not packaged as part of the .NET 4.5 framework. This means you won’t have access to these classes without importing the library. That’s easily done with the NuGet Package Manager, either by searching the online gallery for “TPL Dataflow” or by typing “Install-Package Microsoft.Tpl.Dataflow” in the Package Manager Console.

Let’s Meet Our Contestants

Let’s go back to our requirement, now armed with knowledge of the Dataflow library, and compose a pipeline that gets orders updated with tracking numbers as quickly and efficiently as possible. The code fragments below are shown in the context of the block being explained, but you can download the fully working demo and play with it yourself. The full network of steps we’ll be implementing looks something like this:

[Diagram: orders flow from a BufferBlock into a BroadcastBlock, then into per-carrier BatchBlock and TransformManyBlock stages, and finally into a shared ActionBlock for persistence.]

The solution has a single point of input, which is then broadcast to one of two shipping pipelines corresponding to the supported carriers, UPS and FedEx. Each shipping pipeline consists of a batching stage to group a set of orders and a processing stage to send those orders to the shipping service and receive the response. Finally, the responses are threaded off to be saved to the database. Now let’s take a look at the Dataflow blocks we will use to build this solution:

BufferBlock<T>

This block is part of the Buffering category and essentially serves as an asynchronous data store. Its sole task is to receive data and asynchronously dispatch it to the one or more linked targets connected to it. Since our orders can come in from multiple threads, this block eliminates our need to manage a thread-safe queue. By default, BufferBlock<T> is unbounded, which means it can store as many data elements as we can throw at it. This is fine if you expect your system to consume orders faster than they arrive, but if consumption falls behind production, you could end up growing a large in-memory buffer, which will eventually put pressure on system memory. Buffering blocks take in an instance of DataflowBlockOptions that allows us to set the capacity of the internal queue. The caveat to setting a BoundedCapacity of, say, 50 is that if the buffer is at capacity, the next call to the Post method will immediately return false without adding the item to the buffer. This semantic is common to all target blocks in Dataflow and can cause unexpected behavior in your application if you are not aware of it. To guarantee that all items get into the buffer, even if the buffer is temporarily full, you can use the SendAsync method and either block (using the Wait method on the returned Task) or await it. We instantiate our buffer simply:

var orderBuffer = new BufferBlock<Order>();

In this example, we’re going to buffer our Order objects to be sent over to the shipping service to be given tracking numbers. The Order class is simple and just contains a list of OrderItem objects, each with a SKU. Since orders can be split into multiple shipments depending on the availability of the items on the order, the shipping services will actually assign a tracking number to each OrderItem. Because we are using the default settings, orderBuffer can hold an unlimited number of Order objects.
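If unbounded growth were a concern, a hedged variation would cap the buffer and switch the producer to SendAsync. The capacity of 50 and the incomingOrders producer sequence below are illustrative, not part of the demo:

// Inside an async method: a bounded buffer plus SendAsync gives backpressure.
var boundedBuffer = new BufferBlock<Order>(
    new DataflowBlockOptions { BoundedCapacity = 50 });

// incomingOrders is a hypothetical IEnumerable<Order> source.
foreach (var order in incomingOrders)
{
    // Await SendAsync rather than calling Post, which would return false
    // (dropping the order) whenever the buffer is at capacity.
    await boundedBuffer.SendAsync(order);
}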

BroadcastBlock<T>

BroadcastBlock<T> is another block in the Buffering category of the Dataflow library. It differs from BufferBlock<T> in one critical way: by default, BufferBlock<T> removes a message from its queue as soon as the first target in its list of targets accepts it. This means that if you need to send the same message to multiple targets, BufferBlock<T> won’t be a good candidate, because it sends each message to the first linked target that accepts it and then proceeds to the next message. BroadcastBlock<T>, by contrast, offers every message to all of its linked targets. Its constructor takes in a delegate that dictates how the data element will be cloned before it is passed to each target. This is important to consider, because passing in something as simple as mydata => mydata means that each target gets the same reference to the data element. If any of these components, which could be in different processing pipelines, were to modify the object, it could have unintended consequences in the other components and lead to strange behavior. Since we can have multiple carriers, we’ll want to broadcast each order to the appropriate carrier processing pipeline based on the carrier specified on the Order object. We create our BroadcastBlock and link it to our BufferBlock:

var broadcaster = new BroadcastBlock<Order>(order => order);
orderBuffer.LinkTo(broadcaster);

You’ll notice here that indeed I’ve told the BroadcastBlock<T> to just go ahead and send the same Order reference to each target that I might link to it going forward. In my case, this will be OK because I’m going to be filtering the messages that go to each target so that only the proper pipeline will be able to process each message. Read on to see how that’s done.
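If the downstream pipelines might mutate the orders, though, a cloning delegate is the safer choice. Here’s a minimal sketch; the Clone method is hypothetical, standing in for whatever deep-copy approach fits your model:

// Each target gets its own copy, so one pipeline's changes can't
// leak into another. (Order.Clone() is illustrative only.)
var safeBroadcaster = new BroadcastBlock<Order>(order => order.Clone());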

BatchBlock<T>

Our shipping service can take in more than a single order at a time, and since the service requires a network call, processing each order sequentially increases network traffic and potentially the latency of the system. The BatchBlock<T> block is one of the Grouping blocks offered by the Dataflow library and can aggregate data elements automatically from one or more sources until it reaches the specified size. Once this happens, it will flush the batch of items all at once to any linked target. We’ll put one BatchBlock in place for each carrier and for this example we’ll batch 5 orders at a time for each one. This means that as orders come in, we’ll be queuing batches of 5 orders separately to be processed by the UPS or FedEx endpoint of the service.

One caveat to keep in mind for the BatchBlock<T> block is that it will only flush its queue when the batch limit is reached OR when its Complete method has been called, which tells the BatchBlock<T> not to expect any more data elements. So if you happen to be developing a Windows service or other long-running process that uses a batch block to send orders in batches of 100, and the orders trickle in quite slowly, the time before the batch flushes is indeterminate. This could result in orders not being processed for shipping for quite some time because the batch limit was never reached. To counter this, it may be beneficial to create a timer that elapses after some predetermined period of inactivity and flushes the batch block using the TriggerBatch method, guaranteeing timely processing (see the timer sketch after the linking example below). Let’s create our BatchBlock instances and link them as targets to the BroadcastBlock<T> from above:

var upsBatcher = new BatchBlock<Order>(5);
var fedexBatcher = new BatchBlock<Order>(5);

broadcaster.LinkTo(upsBatcher, order => order.Carrier == CarrierType.Ups);
broadcaster.LinkTo(fedexBatcher, order => order.Carrier == CarrierType.Fedex);

In this example, we can see that each BatchBlock takes in the limit for how many items to group before flushing to its targets. We also link the BroadcastBlock to both BatchBlock instances using the LinkTo method. One thing to point out here is the lambda being passed to the LinkTo method for our BatchBlock instances. The LinkTo method supports filtering data messages from the source block to the target blocks by taking in a Predicate function. In our predicate above, we’re dictating that only orders with the Carrier property set to CarrierType.Ups will be sent to the upsBatcher instance. This gives us an easy way to control how data flows to the targets without having to put complex if/else logic or a factory pattern into each of the downstream execution blocks.

Remember that order matters when you link targets to a source block. With the BroadcastBlock<T> we’re sure that every target whose predicate passes will get a chance to process the same message, but with other Buffering blocks like the BufferBlock<T>, only the first accepting target receives a given message, so if the predicate isn’t specific enough you risk having the message sent to the wrong target. Also note that in our example above, if we add support in the future for DHL and fail to add another batcher to our target list for the broadcaster, DHL orders will be declined by both predicates and never processed. With a BufferBlock<T> source, an unmatched message stays at the head of the queue and blocks everything behind it, so orders back up and either deplete your memory resources or stall your order processing entirely.
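As promised above, here’s a minimal sketch of flushing a partial batch on a timer; the 30-second interval is arbitrary, and a production version would likely reset the timer as new orders arrive:

// Flush whatever is queued in the batcher after 30 seconds of inactivity,
// even if the batch of 5 hasn't filled yet.
var flushTimer = new System.Threading.Timer(
    _ => upsBatcher.TriggerBatch(),
    null,
    TimeSpan.FromSeconds(30),
    TimeSpan.FromSeconds(30));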

TransformManyBlock<TInput, TOutput>

Now that we have a way to buffer our orders and batch them per carrier, we need to handle sending the orders to the shipping service to have tracking numbers assigned to each item on those orders. The shipping service returns results for multiple orders as well, but we’ll want to update each order in the database one at a time. To accomplish this, we’ll use the TransformManyBlock<TInput, TOutput>. This block is in the Execution category, which means it can take data from one or more sources and execute a specified delegate on each object before dispatching the result to one or more targets. This particular block is very similar to the TransformBlock<TInput, TOutput> with one exception: the TransformManyBlock<TInput, TOutput> expects the return type of the delegate to be an IEnumerable<TOutput>, and when dispatching the output to the target blocks, it sends each item in the enumerable one at a time. What this means is that even though our shipping service sends back one response with tracking numbers covering items on multiple orders, our target component can just accept a single ItemTrackingDetail and the block itself handles the iterative dispatching of the enumerable results to the target.

As with our BatchBlock<T>, we’ll need one TransformManyBlock<TInput, TOutput> per carrier, as each carrier has a different endpoint on our shipping service. We wire everything up below:

var upsProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(orders => PostToCarrierAsync(CarrierType.Ups, orders));
var fedexProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(orders => PostToCarrierAsync(CarrierType.Fedex, orders));
upsBatcher.LinkTo(upsProcessor);
fedexBatcher.LinkTo(fedexProcessor);

// Simulates transformation of a list of Order objects into a service api model (ShipDetail).
private static List<ShipDetail> CreateShipDetails(IEnumerable<Order> orders)
{
    var shipDetails = orders.Select(order =>
          new ShipDetail
           {
              ShipId = order.OrderId,
              Items = order.Items.Select(item =>
                  new ShipItemInfo
                  {
                     Sku = item.Sku,
                  }).ToList()
           });

    return shipDetails.ToList();
}

// Sends orders to a shipping service endpoint dependent on the specified carrier.
private static async Task<IEnumerable<ItemTrackingDetail>> PostToCarrierAsync(CarrierType carrierType, Order[] orders)
{
    var shipDetails = CreateShipDetails(orders);

    var client = new System.Net.Http.HttpClient();
    var response = await client.PostAsync("http://localhost:9099/api/" + carrierType.ToString(), shipDetails, new JsonMediaTypeFormatter());
    if (!response.IsSuccessStatusCode)
         throw new ApplicationException("Exception in shipping processor: " + response.ReasonPhrase);

    var results = await response.Content.ReadAsAsync<List<ItemTrackingDetail>>();
    return results;
}

Our first task is to initialize the TransformManyBlock<TInput, TOutput> instances and designate the delegate we want each block to run. In our case, we can reuse the same method because we also pass the carrier as a parameter, which ensures we hit the appropriate endpoint on our shipping service. Notice that the PostToCarrierAsync method returns a Task<IEnumerable<ItemTrackingDetail>>. All execution blocks can perform synchronous or asynchronous execution depending on whether the delegate parameter returns an instance of TOutput or a Task<TOutput>. Since we’re using the asynchronous form of the delegate, the block releases its current thread back to the thread pool while we await the response from the service and resumes execution once the response is received. This allows the execution thread to be used for other task processing and results in a more responsive, efficient system. Best of all, the Dataflow library classes manage the asynchronous calls while still coordinating the data to and from the other components in your pipeline.
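To make the two delegate forms concrete, here’s a hedged side-by-side. PostToCarrierAsync is the method defined above; ProcessLocally is a hypothetical synchronous equivalent returning IEnumerable<ItemTrackingDetail>:

// Synchronous form: the delegate returns IEnumerable<TOutput> directly
// and occupies a pool thread for the duration of the work.
var syncProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(
    orders => ProcessLocally(orders));

// Asynchronous form: the delegate returns Task<IEnumerable<TOutput>>,
// releasing the thread while the awaited service call is in flight.
var asyncProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(
    orders => PostToCarrierAsync(CarrierType.Ups, orders));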

ActionBlock<T>

Our last task is to update the orders in the database once the items on those orders have been assigned tracking numbers. Since the persistence step is the last step in our flow, we’ll be using another block from the Execution category, the ActionBlock<T> class. ActionBlock<T> can really be thought of as the Parallel.ForEach of the Dataflow library. It is very similar in function, except that it also supports composability by linking to one or more source blocks. One thing that differentiates ActionBlock<T> from other execution blocks like the TransformBlock<TInput, TOutput> or TransformManyBlock<TInput, TOutput> is that ActionBlock<T> is a target-only block: it can be linked to, but it cannot be linked to other blocks. In a flowchart, it is a terminator node. For our example, since persistence to the database is independent of the carrier flow that obtains the tracking numbers, we’ll link our ActionBlock to both carrier processing flows. Let’s take a look at the code:

var asyncPersistAction = new Func<ItemTrackingDetail, Task>(PersistToDatabase);

var storageProcessor = new ActionBlock<ItemTrackingDetail>(asyncPersistAction, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

upsProcessor.LinkTo(storageProcessor);
fedexProcessor.LinkTo(storageProcessor);

// Simulates persistence of tracking numbers for each item to a database.
private static async Task PersistToDatabase(ItemTrackingDetail itemTrackingDetail)
{
    // ...  your DB code here
    //Simulate updating the order to the database.
    await Task.Delay(50); 
}

Notice that the input type of the ActionBlock<T> is ItemTrackingDetail. Because it is linked as a target of a TransformManyBlock<TInput, TOutput>, the enumerable output from the transform is iterated within that block and sent as individual objects to the ActionBlock<T>, reducing even further the amount of code we have to write to iterate the data ourselves. The other thing to notice is that the ActionBlock<T> constructor is being passed an instance of ExecutionDataflowBlockOptions that specifies a MaxDegreeOfParallelism of three. All execution block classes can be passed an instance of the ExecutionDataflowBlockOptions class, which lets you specify how the TPL will parallelize and schedule the delegate. In our case, we are specifying that at most three calls to the database persistence delegate can run in parallel at any given time. Like the TransformManyBlock<TInput, TOutput>, the persistence delegate uses the asynchronous delegate overload, which maximizes the productivity of the thread pool. The linking stage is straightforward as well, with both transform block instances simply linking to the new ActionBlock<T> instance.

You Complete Me

Since each Dataflow block component is technically running code on the thread pool at one time or another throughout the course of the pipeline, it’s necessary to let each component know that it can finish what it’s doing and release its resources for good. All Dataflow blocks have a Complete method that prevents additional data from being received. Once Complete is called, any data elements in the block’s buffer are processed and dispatched to any linked targets, after which the block ceases execution and releases any thread pool resources it was holding. Since all Dataflow blocks run asynchronously, they expose a Completion property that returns a Task object representing the execution state of the block instance. We can use this to set continuation delegates on all components in our Dataflow network so that they all complete at the proper time.

In our example, the BufferBlock<T> needs to call the Complete method of the BroadcastBlock<T> when it has finished processing. The BroadcastBlock<T> has to notify both BatchBlock<T> instances (one per carrier) when it’s done. At the tail end, the ActionBlock<T> has two sources, so it can’t have its Complete method called until both of those sources have completed. We can represent these continuations right after we instantiate and link our components:

orderBuffer.Completion.ContinueWith(t => broadcaster.Complete());
broadcaster.Completion.ContinueWith(t =>
{
   upsBatcher.Complete();
   fedexBatcher.Complete();
});

upsBatcher.Completion.ContinueWith(t => upsProcessor.Complete());
fedexBatcher.Completion.ContinueWith(t => fedexProcessor.Complete());

Action<Task> postOrderCompletion = t =>
{
   Task.WaitAll(upsProcessor.Completion, fedexProcessor.Completion);
   storageProcessor.Complete();
};

upsProcessor.Completion.ContinueWith(postOrderCompletion);
fedexProcessor.Completion.ContinueWith(postOrderCompletion);

By declaring your completions as continuations, your shutdown process simply needs to be two lines:

orderBuffer.Complete();
storageProcessor.Completion.Wait();

The first line tells the BufferBlock<T> that there will not be any more orders to process, and the second line waits on the completion of the ActionBlock<T> that performs the database persistence. You don’t have to use continuations; you can wait on each block in sequence instead. It really is just a matter of personal preference. Our flow can be managed sequentially by modifying our shutdown process to look as follows:

orderBuffer.Complete();
orderBuffer.Completion.Wait();

broadcaster.Complete();
broadcaster.Completion.Wait();

upsBatcher.Complete();
fedexBatcher.Complete();
Task.WaitAll(upsBatcher.Completion, fedexBatcher.Completion);

upsProcessor.Complete();
fedexProcessor.Complete();
Task.WaitAll(upsProcessor.Completion, fedexProcessor.Completion);

storageProcessor.Complete();
storageProcessor.Completion.Wait();

The Taskmatics Connection

The example requirement above could have been implemented as a recurring task that pulls in all new orders that were placed since the last time the task executed. Using the Dataflow library can help you simplify complex pipelines or execution logic into linked Dataflow blocks that can help keep resource usage down and significantly reduce the amount of code that needs to be written and tested when creating custom tasks for the Taskmatics Scheduler. The scheduler itself utilizes the Dataflow library internally to manage its own complex flows for features like Live Tracing and task status notifications.

Being a distributed system, the Taskmatics Scheduler relies on constant communication between the Agent processes running the tasks and the Coordinator component dispatching the tasks. The Coordinator relays all incoming messages both to the administration website clients that are connected as well as to a central data store for historical purposes. The ability to efficiently transmit these messages over the network between processes while maximizing the responsiveness of the application is critical to the overall performance of the system. To that end, we employ batching of messages over the network and asynchronous persistence to the data store using the BatchBlock<T> and ActionBlock<T> to simplify our logic and make it easier to maintain the code around that functionality.

Wrapping It Up

The classes of the TPL Dataflow library can greatly simplify the amount of code needed to create large and complex asynchronous processes, without the need to write all of the synchronization logic between the moving parts. They also reduce the amount of testing and debugging required, since there is far less complicated code controlling the flow of data from start to finish. With the compound time savings you get from using the Dataflow blocks, you might be able to get that complicated requirement done by lunchtime. Now go enjoy some coffee, you’ve earned it!

A fully working demonstration of the example pipeline above can be downloaded from here. It includes some basic instrumentation to measure processing time. Feel free to experiment with different parallelization and other settings on the various blocks to see how they improve or reduce the efficiency of the system as a whole. For more information on the Dataflow library and the Task-based Asynchronous Pattern, see the following links:

MSDN TPL Dataflow documentation
MSDN Task-based Asynchronous Pattern Documentation
My previous post showing the advantages of async/await to improve application performance

Black Friday Madness – Musings of an E-Commerce Developer

As I sit here typing this post, somewhere close to 30 million Americans are pushing, shoving and otherwise cramming themselves into retail stores after a day of gluttony to partake in the ritual of Black Friday. It’s the one magical day where many retailers experience the largest profits of the year and offer some great deals in order to get more shoppers into their stores for the frenzy of frivolity. Whether it’s brick and mortar stores or their online counterparts, the overarching goal is a common one: throughput. The premise here is quite simple in that the more people you can get into your store looking at merchandise and the more checkout lanes you have open, the more customers you can process in a given period of time and the more money you can potentially make.

For your local retail stores, this means many things. Product placement is critical, with highly popular products usually placed deeper in the store so that you are at increased odds for more impulse purchases of fantastic deals on items you normally wouldn’t buy at all. Also, it’s important for stores to have a flow that can accommodate a mass of customers desperate for retail therapy. Aisles that contain popular products are wide and a conduit that circumnavigates the store is kept clear of debris at all times. I personally think it would be an interesting and amusing application for the Ford-Fulkerson algorithm to maximize the flow of customers through a store, placing higher weights on aisles with more tempting goodies or doorbuster sales.

The Reckoning Approaches

Online stores take a different approach to throughput. For these virtual storefronts, more people hitting the site translates into more sales. That means they need to be able to support intense loads at key sale times. So how can developers for online retailers prepare their sites for the digital onslaught of the Black Friday/Cyber Monday one-two punch? In this article I will focus on the use of the new async and await keywords as a way to improve throughput on websites. The async and await keywords are part of the Task-based Asynchronous Pattern (TAP) support that was introduced with the release of C# 5.0. It’s not that async/await allows you to do things that weren’t possible before; asynchronous patterns in .NET have existed since framework version 2.0 with the Asynchronous Programming Model (APM), which brought forth methods like BeginInvoke/EndInvoke and the infamous IAsyncResult. The new keywords just make asynchrony much easier to implement and make the code involved much more maintainable as a result. I also want to make it clear that async/await is not a magic bullet; it is only part of the total solution. Other throughput-improving techniques like output caching and load balancing are still just as important in squeezing the most juice out of your servers.

Let’s analyze a typical ASP.NET web request. When a page request is received, a thread is dispatched to handle it. The thread is held while the server processes and builds the response, and once the response has been returned to the client, the thread is returned to the pool to be used for another request. It’s mind-numbingly simple, but there are some gotchas. A worker process has a finite number of threads that can be dispatched, because machine resources are limited, and at some point, as the thread count climbs, the cost of context switching begins to negatively affect performance as well. If more requests come in than there are threads to process them, users will not be able to get to the site and will quickly get frustrated, reducing overall sales. The problem is that some requests take longer to process because they simply do more stuff, and the stuff I’m talking about here is specifically non-CPU-bound work. Consider that you’re shopping at your favorite online store and you are in the process of checking out. After you fill in all of the pertinent information and hit the purchase button, you incur at least one database call, a service call for payment processing and maybe the sending of a confirmation email, all before the response is sent to the client. In a traditional ASP.NET MVC website, this can look something like the following:

    public class CheckoutController : Controller
    {
        private readonly IPaymentProcessor _paymentProcessor;
        private readonly IOrderRepository _orderRepository;
        private readonly IEmailGenerator _emailGenerator;

        public CheckoutController(IPaymentProcessor paymentProcessor, IOrderRepository orderRepository, IEmailGenerator emailGenerator)
        {
            _paymentProcessor = paymentProcessor;
            _orderRepository = orderRepository;
            _emailGenerator = emailGenerator;
        }

        public ActionResult ProcessPurchase(OrderViewModel orderData)
        {
            orderData.ValidateData();

            ProcessPaymentAndSave(orderData); //make various service calls (this can take some time)

            return View("Confirmation", orderData);
        }

        private void ProcessPaymentAndSave(OrderViewModel orderData)
        {
            //collect the payment information
            orderData.PaymentDetails.AuthorizationCode = _paymentProcessor.ProcessPayment(orderData.PaymentDetails);

            //save the order to the database
            orderData.OrderId = _orderRepository.Save(orderData);

            //generate and send a confirmation email.
            _emailGenerator.SendConfirmationEmail(orderData);
        }
    }

These kinds of instructions are I/O-bound and can take a fair amount of time to complete, and the thread processing the request has to wait for all of them to finish before it can be used to process another request.

Defending against the Horde

As I mentioned above, asynchronous patterns in .NET have been around for quite a while, and they center around the idea that calls can be broken into two parts: the beginning (call) part and the end (callback) portion. Using this model, the web server still dispatches a thread to process the incoming request, but as the asynchronous call is made, the thread is returned to the pool to process other incoming requests. Once the call has completed, ASP.NET is notified and the callback is queued onto the thread pool (most likely on a different thread than the call was sent on, I might add) to pick up where it left off. This allows the web server to be more efficient with thread management, and more requests can be handled simultaneously as a result. It must be noted that to get the benefits of true asynchronous calls, it’s not sufficient to simply make the entry point asynchronous; the underlying long-running I/O operations must also be refactored to use their asynchronous counterparts. As of .NET 4.5, many classes in the framework have been augmented to include asynchronous methods that return Task or Task<TResult> objects, as opposed to the earlier model of calling into the Begin/End methods provided on those classes. This means that if we are sending info to a payment gateway using HttpWebRequest, we use the GetResponseAsync method rather than the synchronous GetResponse or the BeginGetResponse and EndGetResponse combination. MVC 3 and MVC 4 have a convention for enabling asynchronous behavior, but it involves some significant refactoring and can be detrimental to readability and to following the flow of what’s actually happening because of all of the separation. Here’s our checkout process example written out in MVC 4:

    public class CheckoutController : AsyncController
    {
        private readonly IPaymentProcessorAsync _paymentProcessor;
        private readonly IOrderRepositoryAsync _orderRepository;
        private readonly IEmailGeneratorAsync _emailGenerator;

        public CheckoutController(IPaymentProcessorAsync paymentProcessor, IOrderRepositoryAsync orderRepository, IEmailGeneratorAsync emailGenerator)
        {
            _paymentProcessor = paymentProcessor;
            _orderRepository = orderRepository;
            _emailGenerator = emailGenerator;
        }

        public void ProcessPurchaseAsync(OrderViewModel orderData)
        {
            orderData.ValidateData();

            ProcessPaymentAndSaveAsync(orderData); //make various service calls (this can take some time)
        }

        public ActionResult ProcessPurchaseCompleted(OrderViewModel orderData)
        {
            return View("Confirmation", orderData);
        }

        private void ProcessPaymentAndSaveAsync(OrderViewModel orderData)
        {
            AsyncManager.OutstandingOperations.Increment(3);

            //collect the payment information
            _paymentProcessor.ProcessPaymentAsync(orderData.PaymentDetails)
                .ContinueWith(paymentCollectionResult => 
                    {
                        orderData.PaymentDetails.AuthorizationCode = paymentCollectionResult.Result;
                        AsyncManager.OutstandingOperations.Decrement();

                        //save the order to the database
                        _orderRepository.SaveAsync(orderData)
                        .ContinueWith(saveResult => 
                        {
                             orderData.OrderId = saveResult.Result;
                             AsyncManager.OutstandingOperations.Decrement();

                            //generate and send a confirmation email.
                            _emailGenerator.SendConfirmationEmailAsync(orderData)
                                .ContinueWith(emailResult => 
                                {
                                    AsyncManager.OutstandingOperations.Decrement();
                                });

                        });

                        AsyncManager.Parameters["orderData"] = orderData;
                    });
        }
    }

    internal class SampleOrderRepository : IOrderRepositoryAsync
    {
        public Task<object> SaveAsync(OrderViewModel orderData)
        {
            var connectionString = ConfigurationManager.ConnectionStrings["orderDatabase"].ConnectionString;

            // Note: a using block here would dispose the connection before the
            // asynchronous query finished, so disposal is deferred to a continuation.
            var connection = new SqlConnection(connectionString);
            var command = new SqlCommand("spInsertOrderData", connection);
            connection.Open();

            //...  building parameters from the orderData not shown here for simplicity

            return command.ExecuteScalarAsync().ContinueWith(t =>
            {
                command.Dispose();
                connection.Dispose();
                return t.Result;
            });
        }
    }

As you can see, the code is very different from its original form. MVC has a special controller type to enable asynchronous processing, and each action that will be called asynchronously has to be created with the Async and Completed naming conventions so that MVC knows how to handle the flow of the asynchronous call. The AsyncManager class is used to track the outputs of the long-running portions of code so that they can be passed into the callback once all of the data is aggregated. We’ve also converted all of our service interfaces to asynchronous versions of their calls. An example of that is shown in the SaveAsync method of the SampleOrderRepository class: instead of using the ExecuteScalar method, we call ExecuteScalarAsync, which returns a Task<object> instead of just object. The Task class serves as a representation of an asynchronous operation, but it also contains information about the operation taking place, like its current status and its result. The class also provides a set of continuation methods that allow you to specify logic that should run when the operation completes. The Task class sits on top of the Task Parallel Library and makes use of the new thread pool implementation from .NET 4.0 to efficiently schedule when the work takes place.

With the advent of the async and await keywords in combination with the Task class, the simplified syntax lets the compiler know that you want to create an asynchronous call with a callback method, and it does the heavy lifting on its own, which makes your code significantly simpler and easier to read and understand. Let’s take a look at how to accomplish the same asynchronous checkout process using the async/await keywords:

    public class CheckoutController : Controller
    {
        private readonly IPaymentProcessorAsync _paymentProcessor;
        private readonly IOrderRepositoryAsync _orderRepository;
        private readonly IEmailGeneratorAsync _emailGenerator;

        public CheckoutController(IPaymentProcessorAsync paymentProcessor, IOrderRepositoryAsync orderRepository, IEmailGeneratorAsync emailGenerator)
        {
            _paymentProcessor = paymentProcessor;
            _orderRepository = orderRepository;
            _emailGenerator = emailGenerator;
        }

        public async Task<ActionResult> ProcessPurchase(OrderViewModel orderData)
        {
            orderData.ValidateData();

            await ProcessPaymentAndSaveAsync(orderData); //make various service calls (this can take some time)

            return View("Confirmation", orderData);
        }

        private async Task ProcessPaymentAndSaveAsync(OrderViewModel orderData)
        {
                //collect the payment information
                orderData.PaymentDetails.AuthorizationCode = await _paymentProcessor.ProcessPaymentAsync(orderData.PaymentDetails);
                //save the order to the database
                orderData.OrderId = await _orderRepository.SaveAsync(orderData);
                //generate and send a confirmation email.
                await _emailGenerator.SendConfirmationEmailAsync(orderData);
        }
    }

    internal class SampleOrderRepository : IOrderRepositoryAsync
    {
        public async Task<object> SaveAsync(OrderViewModel orderData)
        {
            var connectionString = ConfigurationManager.ConnectionStrings["orderDatabase"].ConnectionString;
            using(var connection = new SqlConnection(connectionString))
            using(var command = new SqlCommand("spInsertOrderData", connection))
            {
                connection.Open();

                //...  building parameters from the orderData not shown here for simplicity

                // Await inside the using blocks so the connection stays open
                // until the query has actually completed.
                return await command.ExecuteScalarAsync();
            }
        }
    }

The first thing to notice here is that our MVC code doesn’t look so fragmented anymore; it reads much like the synchronous version. As before, we’re using the asynchronous implementations of the services that perform our long-running work. There are three additional callouts here that work together to make the call happen smoothly. First, the action method now returns Task<ActionResult> instead of just ActionResult. As mentioned before, the Task class represents an asynchronous operation that, in this case, promises to return an ActionResult. Second, the async keyword is added to the action method signature. Adding this keyword allows the await keyword to be used within the body of the method; it’s a way of telling the compiler that we will be making some asynchronous calls there. Finally, the await keyword separates the calling portion of the code from the callback. When the compiler encounters an await, the method returns to its caller at that point, and everything after the await and before the end of the method is treated as code that will run after the asynchronous call has completed. Think of everything below the await keyword as an in-line callback, because the compiler will create a callback delegate behind the scenes from that logic. Those simple changes allow our checkout process to be more scalable, process more orders and therefore bring in more money.

That’s A Wrap

The Taskmatics Scheduler makes judicious use of the new async and await keywords in the administration website. To ensure that the UI stays responsive when retrieving a lot of data, we make data retrieval calls asynchronously so that we can retrieve more data concurrently and display the screen to users as fast as possible. For us, using async and await translates into a better user experience when administering tasks through the website, and if you’re a developer for a major retailer’s online store this year, it could mean enjoying more delicious Thanksgiving leftovers knowing that your site has an upper hand against the throngs of shoppers looking to cash in on the savings.