Going with the Flow: Simplifying Producer/Consumer Processing with TPL Dataflow Structures
In programming, even simple-looking requirements can foreshadow some of the most complex code developers can create. Often the complexity is driven by factors that we, as developers, need to consider but that the business defining the requirements does not. These factors normally include performance metrics like throughput, latency and demand on system resources. This article will explore how quickly the complexity of a simple task can grow, and how the TPL Dataflow library helps tackle that complexity with an API that builds on the Task-based Asynchronous Pattern (TAP) introduced in .NET 4.5. To demonstrate the power of the Dataflow library, we’ll show how a completely custom asynchronous, parallelized workflow that would likely have been over 1,000 lines of code can be condensed to fewer than 150 lines.
A Tangled Web Woven
Your average day as a developer extraordinaire starts off with a glance at your task queue. Consider the following innocuous looking task:
Process any newly placed orders to our shipping services and update those orders with tracking numbers.
At first glance this seems pretty straightforward. We can just create a class that retrieves new orders as they are placed, loop over each order, sending it to the shipping services, and finally save the updated order information to the database. Well, it’s clear you’ve earned your paycheck today, and you take a coffee break to celebrate. As you sip your invigorating beverage, you begin to think about the performance implications of your design. Maybe it’s the coffee talking, but you see some clear issues with your original solution:
- The shipping service takes in multiple orders at once, so sending them one at a time is an inefficient use of the network, but sending them all at once will take some time, which will hold up resources until completed. We will need to batch the orders with a reasonable size. Also, if multiple shipping providers are supported, we may need to route the orders to more than one shipping service.
- Our class that takes in newly placed orders will have to employ some locking mechanism to make sure that the internal queue of orders is managed in a thread-safe manner. If there are a lot of orders, processing these one by one may also cause the queue to grow larger and larger, eventually putting pressure on memory resources.
- Once all of the orders have tracking numbers, the database has to be updated. This could take time, and there’s no reason that other orders shouldn’t be sent to the shipping services while these orders are being saved to the database. Idle hands are the devil’s playthings, after all.
Immediately our simple solution now involves batch queuing, concurrent processing and coordination of threads through synchronization objects. The full weight of how much code you’ll actually have to write is upon you. It’s not that any of these problems are insurmountable, but the amount of code that will be written, the time spent testing and the complex, error-prone nature of that code will end up determining how long it takes to get a solution like this into production. Looks like it will be a long day; better refill that coffee!
The Power of Dataflow
The TPL Dataflow library is essentially a set of components, called blocks, which serve a specific role in producer/consumer systems. What’s great about these blocks is that they are all built on top of the Task Parallel Library, which means that they support the new async/await pattern for asynchronous programming. This makes our process more efficient with its use of threads and system resources while making the code to accomplish asynchronous functionality easier to read and maintain. In addition to async support, the blocks can be linked to each other to create entire task workflows from simple pipelines to complex network flows. The blocks and the linkages between them expose settings that allow you to configure how each component will ingest, process and dispatch the data that it receives. This eliminates the need to write verbose code to synchronize the various stages of the workflow yourself. The blocks provided by the library fall into three main categories:
- Buffering Blocks: These blocks essentially receive and store data so that it can be dispatched to one or more other target blocks for processing. All of these blocks guarantee order in message delivery, with the exception of the WriteOnceBlock<T> which essentially only allows the first message received to be dispatched to its linked targets.
- Execution Blocks: Blocks that fall into this category take in a specified delegate that is run on each data object that is dispatched to it. The execution of the delegate is done asynchronously, which means that as data is received by these classes, the delegate is scheduled by the task scheduling engine of the TPL to be processed on a thread pool thread. All of these classes will by default process only one data element at a time, buffering the other data until the previous one completes, but each class has settings that allow you to specify the number of data objects that can be processed in parallel and the internals of the class handle the rest.
- Grouping Blocks: The classes that comprise this category tackle the need to aggregate data elements from one or more data sources. This can be done either by grouping data into sets to be processed as a batch, or synchronizing multiple outputs from other tasks running asynchronously to be processed by another component.
If one of the blocks provided by the library doesn’t quite fit the bill, you can build your own custom component. The Dataflow library exposes three main interfaces that can be implemented in order to make your custom component compatible and composable with the built-in blocks. ISourceBlock<TOutput> is for components that will buffer and/or dispatch data elements to one or more target blocks, ITargetBlock<TInput> is for components that will receive and process data, and IPropagatorBlock<TInput, TOutput> is for components that will act as both a source and a target. If you want to build a wrapper around an existing source and target pair, the DataflowBlock.Encapsulate<TInput, TOutput> method can generate a propagator for you to simplify your implementation. While we won’t be covering each of the blocks of the library in depth, we will touch on a few of them in more detail as we build our example.
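As a quick illustration (this helper is hypothetical and not part of the order example), DataflowBlock.Encapsulate can stitch an existing target and source pair into a single reusable propagator:

// Requires: using System.Linq; using System.Threading.Tasks.Dataflow;

// A hypothetical custom block that batches integers and emits the sum of each batch.
public static IPropagatorBlock<int, int> CreateBatchSumBlock(int batchSize)
{
    var batcher = new BatchBlock<int>(batchSize);
    var summer = new TransformBlock<int[], int>(batch => batch.Sum());

    // Link the internal blocks and propagate completion from one to the other.
    batcher.LinkTo(summer, new DataflowLinkOptions { PropagateCompletion = true });

    // Expose the pair to the outside world as a single propagator block.
    return DataflowBlock.Encapsulate(batcher, summer);
}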
One interesting side note about the TPL Dataflow library, which Microsoft makes very clear, is that it is not packaged as part of the .NET 4.5 framework. This means that you won’t have access to these classes without importing the library. This is easily done, however, with the NuGet Package Manager by either searching the online gallery for “TPL Dataflow” or typing “Install-Package Microsoft.Tpl.Dataflow” in the Package Manager Console.
Let’s Meet Our Contestants
Let’s go back to our requirement, now armed with knowledge of the Dataflow library, and compose a pipeline that gets orders updated with tracking numbers as quickly and efficiently as possible. All of the code fragments below are shown in the context of the block being explained, but you can download the fully working demo and play with it yourself. The full network of steps we’ll be implementing looks something like this:
The solution will have a single point of input, which can then be broadcast to one of two shipping pipelines corresponding to the UPS and FedEx supported carriers. Each shipping pipeline will consist of a batching stage to group a set of orders and then a processing stage to send those orders to the shipping service and receive the response. Finally, the responses will be threaded off to be saved to the database. Now let’s take a look at the Dataflow blocks that we will use to build this solution:
BufferBlock<T>
This block is part of the Buffering category and essentially serves as an asynchronous data store. Its sole task is to receive data and asynchronously dispatch it to one or more linked targets. Since our orders can be coming in from multiple threads, this block eliminates our need to manage a thread-safe queue. By default, BufferBlock<T> is unbounded, which means that it can store as many data elements as we can throw at it. This is fine if you expect that your system will be able to consume at a rate that outpaces the orders needing tracking numbers, but if consumption falls behind production, you could end up growing a large in-memory buffer, which will eventually put pressure on system memory. Buffering blocks take in an instance of DataflowBlockOptions that allows us to set the capacity of the internal queue. The caveat to setting a BoundedCapacity of, let’s say, 50 is that if the buffer is at capacity, the next call to the Post method will immediately return false without adding the item to the buffer. This semantic is common to all target blocks in Dataflow and can cause unexpected behavior in your application if you are not aware of it. To guarantee that all items get into the buffer, even if the buffer is temporarily full, you can use the SendAsync method and either block (using the Wait method) or await the returned Task. We instantiate our buffer simply:
var orderBuffer = new BufferBlock<Order>();
In this example, we’re going to buffer our Order objects to be sent over to the shipping service to be given tracking numbers. The Order class is simple and just contains a list of OrderItem objects, each of which contains a SKU. Since orders can be split into multiple shipments depending on the availability of the items on the order, the tracking number will actually be assigned to each OrderItem by the shipping services. Here we are using the default settings, which means that orderBuffer can hold an unlimited number of Order objects.
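If you do want to cap the buffer, here is a minimal sketch (hypothetical, not part of the downloadable demo) of a bounded buffer and the Post versus SendAsync trade-off:

// Requires: using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;

// A bounded buffer that will hold at most 50 orders at a time.
var boundedOrderBuffer = new BufferBlock<Order>(
    new DataflowBlockOptions { BoundedCapacity = 50 });

private static async Task EnqueueOrderAsync(BufferBlock<Order> buffer, Order order)
{
    // Post returns false immediately if the buffer is already at capacity.
    if (!buffer.Post(order))
    {
        // SendAsync waits (without blocking a thread) until the block
        // accepts or declines the message, so the order isn't dropped.
        await buffer.SendAsync(order);
    }
}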
BroadcastBlock<T>
BroadcastBlock<T> is another block in the Buffering category of the Dataflow library. It differs from BufferBlock<T> in one critical way. By default, BufferBlock<T> will remove a message from the queue when the first target in its list of targets accepts that message. This means that if you need to send the same message to multiple targets, BufferBlock<T> won’t be a good candidate because it will send the message to the first linked target and then remove that message from the queue and proceed to the next message. BroadcastBlock<T> takes in a delegate that dictates how the data element will be cloned before it is passed to each target. This is important to consider because passing in something as simple as mydata => mydata means that each target will get the same reference to the data element. If any of these components, which could be in different processing pipelines, were to modify the object, it could have unintended consequences in the other components and lead to strange behavior. Since we can have multiple carriers, we’ll want to broadcast each order to the appropriate carrier processing pipeline based on the carrier that was specified on the Order object. We create our BroadcastBlock and link it to our BufferBlock:
var broadcaster = new BroadcastBlock<Order>(order => order);
orderBuffer.LinkTo(broadcaster);
You’ll notice here that indeed I’ve told the BroadcastBlock<T> to just go ahead and send the same Order reference to each target that I might link to it going forward. In my case, this will be OK because I’m going to be filtering the messages that go to each target so that only the proper pipeline will be able to process each message. Read on to see how that’s done.
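If you did need each pipeline to work on its own copy, the cloning delegate is where that isolation would happen. A one-line sketch, assuming a hypothetical Order.Clone method that performs a deep copy:

// Each linked target now receives its own copy of the order (Clone is assumed to deep-copy).
var cloningBroadcaster = new BroadcastBlock<Order>(order => order.Clone());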
BatchBlock<T>
Our shipping service can take in more than a single order at a time, and since the service requires a network call, processing each order sequentially increases network traffic and potentially the latency of the system. The BatchBlock<T> block is one of the Grouping blocks offered by the Dataflow library and can aggregate data elements automatically from one or more sources until it reaches the specified size. Once this happens, it will flush the batch of items all at once to any linked target. We’ll put one BatchBlock in place for each carrier and for this example we’ll batch 5 orders at a time for each one. This means that as orders come in, we’ll be queuing batches of 5 orders separately to be processed by the UPS or FedEx endpoint of the service.
One caveat to keep in mind for the BatchBlock<T> block is that it will only flush its queue when the batch limit is reached OR when its Complete method has been called, which tells the BatchBlock<T> not to expect any more data elements. So if you happen to be developing a Windows service or other long-running process that uses a batch block to send orders in batches of 100, and the orders trickle in quite slowly, the time before the batch will flush is indeterminate. This could result in orders not being processed for shipping for quite some time because the batch limit was not reached. To counter this, it may be beneficial to create a timer that elapses after some predetermined period of inactivity and flushes the batch block using the TriggerBatch method, guaranteeing timely processing (a small sketch of this appears after the linking code below). Let’s create our BatchBlock instances and link them as targets to the BroadcastBlock<T> from above:
var upsBatcher = new BatchBlock<Order>(5);
var fedexBatcher = new BatchBlock<Order>(5);

broadcaster.LinkTo(upsBatcher, order => order.Carrier == CarrierType.Ups);
broadcaster.LinkTo(fedexBatcher, order => order.Carrier == CarrierType.Fedex);
In this example, we can see that each BatchBlock takes in the limit for how many items to group before flushing to its targets. Also, we link the BroadcastBlock to both BatchBlock instances using the LinkTo method. One thing to point out here is the lambda being passed to the LinkTo method for our BatchBlock instances. The LinkTo method supports filtering data messages from the source block to the target blocks by taking in a predicate function. In our predicate above, we’re dictating that only orders with the Carrier property set to CarrierType.Ups will be sent to the upsBatcher instance. This gives us an easy way to control how data flows to the targets without having to put complex if/else logic or a factory pattern into each of the downstream execution blocks.
Remember that order matters when you link targets to a source block. With the BroadcastBlock<T> we’re sure that every linked target whose predicate passes will get a chance to process the same message, but with other Buffering blocks like the BufferBlock<T>, only the first target to accept a message receives it, so if the predicates aren’t specific enough you risk having a message sent to the wrong target. Also note that in our example above, if we add support in the future for DHL and fail to add another batcher to our target list for the broadcaster, those orders will never be accepted by any target. With a BufferBlock<T> source that would stall the whole queue, since it must deliver messages in order; with BroadcastBlock<T> the unmatched order is simply overwritten by the next message and silently lost. Either way, make sure every carrier you support has a linked target.
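Here is the timer-based flush mentioned earlier, as a rough sketch (the 30-second interval is arbitrary and not part of the demo):

// Requires: using System; using System.Threading;

// Periodically push any partially filled batches downstream so slow-trickling
// orders don't sit in the batchers indefinitely. Dispose the timer on shutdown.
var flushTimer = new Timer(_ =>
{
    upsBatcher.TriggerBatch();
    fedexBatcher.TriggerBatch();
}, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));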
TransformManyBlock<TInput, TOutput>
Now that we have a way to buffer our orders and batch them per carrier, we need to handle sending the orders to the shipping service to have tracking numbers assigned to each item on those orders. The shipping service will return results for multiple orders as well, but we’ll want to update each order in the database one at a time. To accomplish this, we’ll use the TransformManyBlock<TInput, TOutput>. This block is in the Execution category, which means that it can take data from one or more sources and execute a specified delegate on each object before dispatching the result to one or more targets. This particular block is very similar to the TransformBlock<TInput, TOutput> with one key difference: the TransformManyBlock<TInput, TOutput> expects the return type of the delegate to be an IEnumerable<TOutput>, but when dispatching the output to the target blocks, it will send each item in the enumerable one at a time. What this means is that even though our shipping service sends back a single response with tracking numbers on items for multiple orders, our target component can just accept a single ItemTrackingDetail and the block itself will handle the iterative dispatching of the enumerable results to the target.
As with our BatchBlock<T>, we’ll need one TransformManyBlock<TInput, TOutput> per carrier, as each carrier has a different endpoint on our shipping service. We wire everything up below:
var upsProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(
    orders => PostToCarrierAsync(CarrierType.Ups, orders));
var fedexProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(
    orders => PostToCarrierAsync(CarrierType.Fedex, orders));

upsBatcher.LinkTo(upsProcessor);
fedexBatcher.LinkTo(fedexProcessor);

// Simulates transformation of a list of Order objects into a service API model (ShipDetail).
private static List<ShipDetail> CreateShipDetails(IEnumerable<Order> orders)
{
    var shipDetails = orders.Select(order => new ShipDetail
    {
        ShipId = order.OrderId,
        Items = order.Items.Select(item => new ShipItemInfo
        {
            Sku = item.Sku
        }).ToList()
    });

    return shipDetails.ToList();
}

// Sends orders to a shipping service endpoint dependent on the specified carrier.
private static async Task<IEnumerable<ItemTrackingDetail>> PostToCarrierAsync(CarrierType carrierType, Order[] orders)
{
    var shipDetails = CreateShipDetails(orders);

    var client = new System.Net.Http.HttpClient();
    var response = await client.PostAsync(
        "http://localhost:9099/api/" + carrierType.ToString(),
        shipDetails,
        new JsonMediaTypeFormatter());

    if (!response.IsSuccessStatusCode)
        throw new ApplicationException("Exception in shipping processor: " + response.ReasonPhrase);

    var results = await response.Content.ReadAsAsync<List<ItemTrackingDetail>>();
    return results;
}
Our first task is to initialize the TransformManyBlock<TInput, TOutput> instances and designate the delegate we want each block to run. In our case, we can reuse the same method because we’re also passing the carrier as a parameter, which ensures that we hit the appropriate endpoint on our shipping service. Notice that the PostToCarrierAsync method returns a Task<IEnumerable<ItemTrackingDetail>>. All execution blocks can perform synchronous or asynchronous execution depending on whether the delegate parameter returns an instance of TOutput or a Task<TOutput>. Since we’re using the asynchronous form of the delegate, the block will release its current thread back to the thread pool while we await the response from the service and then resume execution once the response is received. This allows the execution thread to be used for other task processing and results in a more responsive, efficient system. Best of all, the Dataflow library classes can manage the asynchronous calls while still coordinating the data to and from other components in your pipeline.
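To make the two delegate forms concrete, here is a small illustrative sketch (unrelated to the order pipeline) of the same block type built with a synchronous delegate and an asynchronous one:

// Requires: using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;

// Synchronous form: the delegate returns TOutput directly.
var syncTransform = new TransformBlock<int, string>(n => n.ToString());

// Asynchronous form: the delegate returns Task<TOutput>, so the block releases
// its thread while the awaited work is in flight.
var asyncTransform = new TransformBlock<int, string>(async n =>
{
    await Task.Delay(10); // simulate async I/O
    return n.ToString();
});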
ActionBlock<T>
Our last task is to update the orders in the database once the items on those orders have been assigned tracking numbers. Since the persistence step is the last step in our flow, we’ll be using another block from the Execution category, the ActionBlock<T> class. ActionBlock<T> can really be thought of as the Parallel.ForEach of the Dataflow library. It is very similar in function, except that it also supports the composability of linking to one or more source blocks. One thing that differentiates ActionBlock<T> from other execution blocks like the TransformBlock<TInput, TOutput> or TransformManyBlock<TInput, TOutput> is that ActionBlock<T> is a target-only block. It can only be linked to and does not support linking to other blocks; in a flowchart, it is a terminator node. For our example, since the persistence to the database is independent of the carrier flow that gets the tracking numbers, we’ll be linking our ActionBlock to both carrier processing flows. Let’s take a look at the code:
var asyncPersistAction = new Func<ItemTrackingDetail, Task>(PersistToDatabase);
var storageProcessor = new ActionBlock<ItemTrackingDetail>(
    asyncPersistAction,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

upsProcessor.LinkTo(storageProcessor);
fedexProcessor.LinkTo(storageProcessor);

// Simulates persistence of tracking numbers for each item to a database.
private static async Task PersistToDatabase(ItemTrackingDetail itemTrackingDetail)
{
    // ... your DB code here

    // Simulate updating the order to the database.
    await Task.Delay(50);
}
Notice that the input type of the ActionBlock<T> is ItemTrackingDetail. Because it is linked as a target of a TransformManyBlock<TInput, TOutput>, the enumerable output from the transform will be iterated from within that block and sent as individual objects to the ActionBlock<T>, reducing even further the amount of code we have to write to iterate the data ourselves. The other thing to notice is that the ActionBlock<T> constructor is being passed an instance of ExecutionDataflowBlockOptions that specifies a MaxDegreeOfParallelism of three. All execution block classes can be passed an instance of ExecutionDataflowBlockOptions, which lets you specify how the TPL will parallelize and schedule the delegate to run. In our case, we are specifying that at most three calls to the database persistence delegate can be run in parallel at any given time. Similar to the TransformManyBlock<TInput, TOutput>, the persistence delegate also uses the asynchronous delegate overload, which maximizes the productivity of the thread pool. The linking stage is straightforward as well, with both transform block instances simply linking to the new ActionBlock<T> instance.
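Beyond MaxDegreeOfParallelism, ExecutionDataflowBlockOptions also inherits settings like BoundedCapacity and CancellationToken from DataflowBlockOptions. A sketch of a more fully configured storage block (the values here are arbitrary, not taken from the demo) might look like this:

// Requires: using System.Threading; using System.Threading.Tasks.Dataflow;

var cancellationSource = new CancellationTokenSource();

var throttledStorageProcessor = new ActionBlock<ItemTrackingDetail>(
    PersistToDatabase,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 3,                  // at most 3 concurrent database writes
        BoundedCapacity = 100,                       // buffer at most 100 pending items
        CancellationToken = cancellationSource.Token // allows early teardown if needed
    });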
You Complete Me
Since each Dataflow block is technically running code on the thread pool at one time or another throughout the course of the pipeline, it’s necessary to let each component know that it can finish up what it’s doing and release its resources for good. All Dataflow blocks have a Complete method that prevents additional data from being received. Once Complete is called, any data elements in the block’s buffer are processed and dispatched to any linked targets, after which the block ceases execution and releases any thread pool resources it was holding. Since all Dataflow blocks run asynchronously, they expose a Completion property that returns a Task object representing the execution state of the block instance. We can use this to set continuation delegates on all components in our Dataflow network so that they all complete at the proper time.
In our example, the BufferBlock<T> needs to call the Complete method of the BroadcastBlock<T> when it has finished processing. The BroadcastBlock<T> has to notify both BatchBlock<T> instances (one per carrier) when it’s done. At the tail end, the ActionBlock<T> has two sources, so it can’t have its Complete method called until both of those sources have completed. We can represent these continuations right after we instantiate and link our components:
orderBuffer.Completion.ContinueWith(t => broadcaster.Complete());
broadcaster.Completion.ContinueWith(t =>
{
    upsBatcher.Complete();
    fedexBatcher.Complete();
});
upsBatcher.Completion.ContinueWith(t => upsProcessor.Complete());
fedexBatcher.Completion.ContinueWith(t => fedexProcessor.Complete());

Action<Task> postOrderCompletion = t =>
{
    Task.WaitAll(upsProcessor.Completion, fedexProcessor.Completion);
    storageProcessor.Complete();
};
upsProcessor.Completion.ContinueWith(postOrderCompletion);
fedexProcessor.Completion.ContinueWith(postOrderCompletion);
By declaring your completions as continuations, your shutdown process simply needs to be two lines:
orderBuffer.Complete();
storageProcessor.Completion.Wait();
The first line tells the BufferBlock<T> that there will not be any more orders to process, and the second line waits on the completion of the ActionBlock<T> that performs the database persistence. You don’t have to use continuations; you can wait on each block in sequence instead. It really is just a matter of personal preference. Our flow can be managed sequentially by modifying our shutdown process to look as follows:
orderBuffer.Complete();
orderBuffer.Completion.Wait();

broadcaster.Complete();
broadcaster.Completion.Wait();

upsBatcher.Complete();
fedexBatcher.Complete();
Task.WaitAll(upsBatcher.Completion, fedexBatcher.Completion);

upsProcessor.Complete();
fedexProcessor.Complete();
Task.WaitAll(upsProcessor.Completion, fedexProcessor.Completion);

storageProcessor.Complete();
storageProcessor.Completion.Wait();
The Taskmatics Connection
The example requirement above could have been implemented as a recurring task that pulls in all new orders placed since the last time the task executed. Using the Dataflow library lets you distill complex pipelines or execution logic into linked Dataflow blocks, keeping resource usage down and significantly reducing the amount of code that needs to be written and tested when creating custom tasks for the Taskmatics Scheduler. The scheduler itself utilizes the Dataflow library internally to manage its own complex flows for features like Live Tracing and task status notifications.
Being a distributed system, the Taskmatics Scheduler relies on constant communication between the Agent processes running the tasks and the Coordinator component dispatching the tasks. The Coordinator relays all incoming messages both to the administration website clients that are connected as well as to a central data store for historical purposes. The ability to efficiently transmit these messages over the network between processes while maximizing the responsiveness of the application is critical to the overall performance of the system. To that end, we employ batching of messages over the network and asynchronous persistence to the data store using the BatchBlock<T> and ActionBlock<T> to simplify our logic and make it easier to maintain the code around that functionality.
Wrapping It Up
The classes of the TPL Dataflow library can greatly reduce the amount of code needed to create large and complex asynchronous processes, without the need to write all of the synchronization logic between the moving parts. They also reduce the amount of testing and debugging that comes with writing lots of complicated code to control the flow of data from start to finish. With the compound time savings that you get from using the Dataflow blocks, you might be able to get that complicated requirement done by lunchtime. Now go enjoy some coffee, you’ve earned it!
A fully working demonstration of the example pipeline above can be downloaded from here. It includes some basic instrumentation to measure processing time. Feel free to experiment with different parallelization and other settings on the various blocks to see how they improve or reduce the efficiency of the system as a whole. For more information on the Dataflow library and the Task-based Asynchronous Pattern, see the following links:
MSDN TPL Dataflow documentation
MSDN Task-based Asynchronous Pattern Documentation
My previous post showing the advantages of async/await to improve application performance
Good article. It helps clarify what the TPL can do. One question that still confuses me, even though you touch on it at the end, is completion. You mention you could use Taskmatics, I’m assuming, to set up a recurring task to run the pipeline. But that means it would have to get the next orders to process, I’m guessing, not from the buffer block but from some other queuing mechanism, since sending Complete on the buffer block prevents it from accepting anything else. Is it possible to just not send Complete and have a long-running instance?
You are right that in the example I was highlighting how the pipeline can be executed by a recurring task, but you can also have the pipeline integrated into a Windows service where it can listen to or poll some source of data, like a queue, and only call Complete in the handler for service shutdown. There’s nothing dictating when you call Complete; it’s just good practice to do it when you want to end the pipeline gracefully.
Great article.
How do I implement committing (PersistToDatabase) multiple orders (batch of orders) rather than one order at a time using ActionBlock or CustomBlock?
Thanks
Thomas,
There are a number of ways this can be accomplished. Using our example from the article, the easiest way would likely be to change the upsProcessor and fedexProcessor from TransformManyBlock instances to TransformBlock instances, so the whole batch of results flows through as a single message. Then change the storageProcessor to an ActionBlock whose input type is the batch (for example, ItemTrackingDetail[]) and make a corresponding type change to the action delegate, and you’ll be able to persist the batch on a shared connection. Another way could be to introduce another BatchBlock, linked to from both the upsProcessor and fedexProcessor blocks, and use that to aggregate and feed the storageProcessor, which would still need to be modified to accept ItemTrackingDetail[]. I would personally go the first route, as it’s not ideal to reduce the batches to single elements just to aggregate them again later, unless for some reason you need a separate throttle for persistence versus your other work.
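A rough sketch of that first approach, reusing the types from the article's example (the actual persistence code is left out), might look like this:

// Requires: using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;

// The processor now emits the whole batch as one message instead of item by item.
var upsProcessor = new TransformBlock<Order[], ItemTrackingDetail[]>(
    async orders => (await PostToCarrierAsync(CarrierType.Ups, orders)).ToArray());

// The storage block accepts the batch and can persist it on a single connection/transaction.
var storageProcessor = new ActionBlock<ItemTrackingDetail[]>(
    async trackingDetails =>
    {
        // ... open one connection, persist all trackingDetails, commit ...
        await Task.Delay(50); // simulate the batched database write
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

upsProcessor.LinkTo(storageProcessor);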
Hope that helps!
Thanks Dave, yes it did help me.
I have implemented your first recommendation, and it worked. I have also implemented a timer with the TriggerBatch method to flush out the remaining orders from the BatchBlock; I must say I struggled a little bit.
May I suggest, Dave, that you update this blog post to cover persisting multiple orders and the TriggerBatch approach? It would be great for any new players, as this covers a decent percentage of real-world implementations.
Thanks heaps.
How do I integrate transactions into the system? What if I have to read source orders from some queue? Since a message, once read, will be lost if something bad happens down the processing pipeline (say the shipping service throws an exception, or the persist operation fails), how can I execute the complete process as a single transaction?