Taskmatics Scheduler 2.0 Released

We've just released version 2.0 of Taskmatics Scheduler, a release focused primarily on creating a more intuitive, more powerful user interface. We also packed in a number of bug fixes, performance tweaks and stability improvements.

How to get it

The downloads page has been updated with a link to the 2.0.1 installer. Users who need an older installer, for example to install agents for a previous version, will also find those installers on the downloads page.

Featured Enhancements

This release brings with it some big enhancements, so we wanted to give a little bit more information about those:

New Improved Administrative Interface

From version 1.0, our goal was to give users a simple yet powerful administrative interface that could be accessed from a web browser. We received a good amount of feedback about areas of the UI that failed to meet that goal, so in 2.0 we ironed out many of those pain points and streamlined the most common workflows. You spend less time in setup, and the information you're looking for is easier to find. Furthermore, because the new admin interface is written in plain HTML/JavaScript, you can access it from a wide range of browsers.

Job Creation Wizard

One of the biggest difficulties our users reported in the past was the complexity of the job creation process. To create a job in previous versions of the platform, you had to start at the bottom and work your way up by:

  • Creating a fileset that contains your executable code.
  • Creating a definition that points to the fileset and serves as a configurable template for your jobs.
  • Creating the job itself on top of the definition.

While all of these pieces are part of what makes Taskmatics Scheduler so versatile, users had to navigate to different areas to do each step, which made job creation more involved and time consuming.

In version 2.0, we have introduced a job creation wizard, which centralizes and simplifies the job creation process while continuing to provide the same level of versatility from the older versions. The goal of the wizard is to direct the user through the components of job creation in a straightforward way.

Configurable Runtime Users

You can now configure jobs to run under a specified set of credentials. In previous versions, you could specify a runtime user for the Agent process, and that account was used to execute all jobs created in the system. In the new version, the administration website has a dedicated area for specifying runtime users, which can then be assigned to a job the same way an agent is assigned to a job. Jobs with a specified runtime user will execute under that user's credentials, while those without one will execute under the Agent's runtime user credentials.

And Much More!

You can find the full release notes with all of the new features, bug fixes and improvements here.

Upgrade Instructions

As of now, we don't have an upgradeable installer. To upgrade an older installation of Taskmatics Scheduler, first uninstall the previous version, then run the new 2.0.1 installer and point it at the existing database used by the previous install. Note: Uninstalling Taskmatics Scheduler will NEVER remove the data in your database. We suggest following these steps to install the new version on your system:

  1. Stop all coordinator and agent Windows services (done from the Services snap-in).
  2. Back up your existing database and the filesets in your fileset root folder. This is a precaution in case anything unexpected happens during the upgrade.
  3. Take note of the following information from your current installation:
    • Your serial number, which is in the email you received when you purchased Taskmatics Scheduler. You can also find it in the license.xml file in the ProgramData\Taskmatics\Scheduler directory.
    • Your current database server, database name and the runtime credentials. These can be found in the ProgramData\Taskmatics\Scheduler\Taskmatics.Scheduler.Coordination.ServiceHost.exe.config file.
    • The runtime users for both the coordinator and agent Windows services, which you can see from the services snap-in.
    • The root filesets path and the root working folders path, which are found in the .config files located in the ProgramData\Taskmatics\Scheduler directory.
  4. Uninstall the existing Taskmatics Scheduler components (done from the Programs and Features screen in Windows).
  5. Run the new Taskmatics.Scheduler installer. You will need to re-enter the information you collected in step 3 to make sure that all the original permissions and mappings are re-used.

Taskmatics Scheduler 1.1 Released

We've released version 1.1.0.0 today, which contains many performance improvements as well as some new features requested by the community. If you download the installer today you'll get the new version, and we've also published an updated Taskmatics Scheduler package to the NuGet repository. The primary focus of this build was system-wide performance. In installations with a significant number of tasks running at high frequencies we had seen some slowness, and with these changes we're seeing times improve by 80% to 90%. This isn't limited to the administration UI screens; it applies to the reports as well.

How to get it

To get the 1.1.0.0 installer, go here!

What’s Inside

The full list of changes in this release is as follows:

  • Added the ability to set exclusion dates on the calendar trigger; the scheduled task will not run on those dates.
  • Fixed an intermittent trigger loading error when adding a trigger to an existing scheduled task.
  • Added performance improvements to all reports.
  • Added performance improvements to dashboard loading.
  • Added performance improvements to task dispatching.
  • Added performance improvements to task history search.
  • Database schema changes to help improve overall system performance.
  • Fixed an intermittent out-of-memory exception when querying resource usage.
  • Added a filter to the Dashboard screen to allow users to find scheduled tasks that contain the entered term (similar to the filter on other listing screens).
  • Added additional error handling and update messaging to the installer process.

Breaking Changes

While it's always our goal to be fully backwards compatible, this release does contain one breaking change that may require a rebuild of one or more tasks. In the past, each task instance was identified with a Globally Unique Identifier (GUID). These identifiers are great at ensuring that billions of tasks never collide with each other, but the downside to that guarantee of uniqueness is that they are not as performant as sequential integers. As part of the new release, we've re-mapped all task instances to use 64-bit integers, which gives us the same practical uniqueness guarantee but performs much better under search.

You should update all of your task libraries to reference the Taskmatics.Scheduler.Core 1.1.0.0 NuGet package. If any of your tasks reference TaskBase.Context.TaskInstanceId, you will need to rebuild and re-deploy those tasks, as that property is now represented as an Int64 instead of a Guid.
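To make the change concrete, here is a minimal sketch of a task that reads the identifier after upgrading (the class and namespace names are hypothetical; the TaskBase, Context and Logger members follow the usage shown elsewhere on this blog):

using Taskmatics.Scheduler.Core;

namespace MyCompany.Tasks
{
    public class LogInstanceIdTask : TaskBase
    {
        protected override void Execute()
        {
            // Prior to 1.1.0.0 this property was a Guid; it is now an Int64 (long).
            long instanceId = Context.TaskInstanceId;
            Context.Logger.Log("Running task instance {0}.", instanceId);
        }
    }
}

Any code that stored the identifier in a Guid variable or persisted it as a Guid will need a similar adjustment.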

Upgrade Instructions

As of now, we don't have an upgradeable installer. To upgrade a 1.0.0.0 installation of Taskmatics Scheduler, first uninstall the previous version, then run the new 1.1.0.0 installer and point it at the existing database that Taskmatics Scheduler was originally set up to use. Note: Uninstalling Taskmatics Scheduler will NEVER remove the data in your database. We suggest following these steps to install the new version on your system:

  1. Stop all coordinator and agent Windows services (done from the Services snap-in).
  2. Back up your existing database. This is a precaution in case anything unexpected happens during the upgrade.
  3. Take note of the following information from your current installation:
    • Your serial number, which is in the email you received when you purchased Taskmatics Scheduler. You can also find it in the license.xml file in the ProgramData\Taskmatics\Scheduler directory.
    • Your current database server, database name and the runtime credentials. These can be found in the ProgramData\Taskmatics\Scheduler\Taskmatics.Scheduler.Coordination.ServiceHost.exe.config file.
    • The runtime users for both the coordinator and agent Windows services, which you can see from the services snap-in.
    • The root filesets path and the root working folders path, which are found in the .config files located in the ProgramData\Taskmatics\Scheduler directory.
  4. Uninstall the existing Taskmatics Scheduler components (done from the Programs and Features screen in Windows).
  5. Run the new Taskmatics.Scheduler installer. You will need to re-enter the information you collected in step 3 to make sure that all the original permissions and mappings are re-used.

Due to the nature of the performance improvements and the significant number of database changes in this update, installation of the new version could take upwards of 30 minutes depending on the size of your database (for example, if you have millions of task instances). Making the upgrade path smoother for our users is at the top of our roadmap, so we don't expect this to be the norm going forward.

More to Come

This is just the beginning of the changes that we have in store. Thanks again for using Taskmatics Scheduler, and we hope you enjoy working with the new version even more. Let us know what features you’d like to see in subsequent versions of Taskmatics Scheduler as we’re always looking for ideas from the community.

Where was that download link again?

In case you missed it at the top, to get the 1.1.0.0 installer, go here!

How to Host ASP.NET in a Windows Service

Today I'll be showing how you can finally host ASP.NET in a Windows service. Yes, with ASP.NET 5 it is possible. This article builds on a previous one, which shows you how to run a DNX (.NET Execution Environment) application in a Windows service. It's a quick read, so go check it out now.

Project Dependencies

Once you’ve got the shell project set up using the previous article, you’ll need to add in some dependencies.

Open up project.json and add in a dependencies property with the following entries:

  • "Microsoft.AspNet.Hosting": "1.0.0-beta7" – Bootstraps the web server
  • "Microsoft.AspNet.Server.Kestrel": "1.0.0-beta7" – Web server implementation
  • "Microsoft.AspNet.StaticFiles": "1.0.0-beta7" – Hosts static files
  • "Microsoft.AspNet.Mvc": "6.0.0-beta7" – Includes all the MVC packages

Your project.json should now look like this:

{
    "version": "1.0.0-*",
    "description": "MyDnxService Console Application",
    "commands": {
        "run": "MyDnxService"
    },
    "frameworks": {
        "dnx451": {
            "frameworkAssemblies": {
                "System.ServiceProcess": "4.0.0.0"
            }
        }
    },
    "dependencies": {
        "Microsoft.AspNet.Hosting": "1.0.0-beta7",
        "Microsoft.AspNet.Server.Kestrel": "1.0.0-beta7",
        "Microsoft.AspNet.StaticFiles": "1.0.0-beta7",
        "Microsoft.AspNet.Mvc": "6.0.0-beta7"
    }
}

Run in Visual Studio or Command Line

It would really be a pain to develop and debug your application while running as a Windows service. That’s not to say you can’t, but if you are currently developing an application, you probably want to run it from Visual Studio or the command line. To do this, you need some sort of switch in the Main method that will either call ServiceBase.Run or just call directly into OnStart and OnStop. Let’s do this by writing our Main method as follows:

public void Main(string[] args)
{
    if (args.Contains("--windows-service"))
    {
        Run(this);
        return;
    }

    OnStart(null);
    Console.ReadLine();
    OnStop();
}

Simply check for the presence of the --windows-service argument, call ServiceBase.Run and you are good to go. Now you can just run and debug your application from Visual Studio by hitting F5. When you want to install the application as a Windows service, don’t forget to pass --windows-service at the end of the binPath= argument.

sc.exe create <service-name> binPath= "\"<dnx-exe-path>\" \"<project-path>\" run --windows-service"

Configure and Start the Server and ASP.NET

Let’s add some namespaces:

using Microsoft.AspNet.Builder;
using Microsoft.AspNet.Hosting;
using Microsoft.AspNet.Hosting.Internal;
using Microsoft.Framework.Configuration;
using Microsoft.Framework.Configuration.Memory;
using Microsoft.Framework.DependencyInjection;
using System;
using System.Diagnostics;
using System.Linq;
using System.ServiceProcess;

Before we start the server, we need to set up a few fields and a constructor on the Program class. We need to store the hosting engine, its shutdown function and a service provider that we will use soon. The IServiceProvider instance will be injected by the DNX runtime.

private readonly IServiceProvider _serviceProvider;
private IHostingEngine _hostingEngine;
private IDisposable _shutdownServerDisposable;

public Program(IServiceProvider serviceProvider)
{
    _serviceProvider = serviceProvider;
}

To get the server up and running, we will use the WebHostBuilder class. Your Program.OnStart method should look as follows:

protected override void OnStart(string[] args)
{
    var configSource = new MemoryConfigurationSource();
    configSource.Add("server.urls", "http://localhost:5000");

    var config = new ConfigurationBuilder(configSource).Build();
    var builder = new WebHostBuilder(_serviceProvider, config);
    builder.UseServer("Microsoft.AspNet.Server.Kestrel");
    builder.UseServices(services => services.AddMvc());
    builder.UseStartup(appBuilder =>
    {
        appBuilder.UseDefaultFiles();
        appBuilder.UseStaticFiles();
        appBuilder.UseMvc();
    });

    _hostingEngine = builder.Build();
    _shutdownServerDisposable = _hostingEngine.Start();
}

There are several steps involved here:

  1. Create and populate the server configuration (the MemoryConfigurationSource and ConfigurationBuilder calls).
  2. Create the WebHostBuilder and tell it which server implementation to use.
  3. Configure services using the built-in dependency injection support (UseServices).
  4. Configure the ASP.NET middleware pipeline (UseStartup).
  5. Build and start the server.

The previous code is oversimplified for the purposes of this article. We are hardcoding the server URL into an in-memory config store, but you can set this up in other ways. See the ASP.NET Configuration repo on GitHub for other options.

To gracefully shut down the server, implement Program.OnStop as follows:

protected override void OnStop()
{
    if (_shutdownServerDisposable != null)
        _shutdownServerDisposable.Dispose();
}

Getting Down to Business

Now that we have a server set up and running ASP.NET with static files and MVC, let’s add some content and a controller.

First, create an index.html file and add “Hello world” as the content. Then create a controller file TimeController.cs with the file contents as follows:

using Microsoft.AspNet.Mvc;
using System;

namespace MyDnxService
{
    public class TimeController
    {
        [Route("time")]
        public DateTime GetTime()
        {
            return DateTime.Now;
        }
    }
}

That’s all there is to it. Now we can pull up http://localhost:5000 to see “Hello world” and http://localhost:5000/time to see the current time.

The Runtime User and Going to Production

When you install a Windows service, you have to specify a user under which to run the process. If you don’t, “Local System” is the default. Why is this important?

In the previous article we simply ran our Windows service as the default (Local System) user. It turns out we got lucky since we didn’t reference any package dependencies. If we try to do the same thing in this case, the service will quickly fail to start because “Local System” won’t be able to resolve any of the package dependencies we just added.

As you add package references to the project.json file, Visual Studio quietly runs dnu restore in the background and downloads the packages to your user profile (c:\users\<username>\.dnx\packages). When you run dnx.exe, it will resolve any dependencies from your user profile. (You’ll see how to override this in a bit.)

Since “Local System” doesn’t have Visual Studio helping it out, we need to somehow get those packages installed someplace that it can see. Running dnu restore as “Local System” will download the packages to c:\Windows\SysWOW64\config\systemprofile\.dnx\packages (on the 64-bit OS) since there’s no user profile for that account. How can you run dnu restore as “Local System”? This can be done by running psexec.exe -s cmd.exe from Sysinternals (runs a cmd.exe console as “Local System”) and then running dnu restore from the directory of your project. You can imagine that having to do this each time you want to deploy is a gigantic inconvenience.

While you’re IN DEVELOPMENT, you should run the Windows service under your own account. This allows you to install the service once and not have to manually run dnu restore under another account each time you modify your config (remember, Visual Studio is helping you out here). The service will be able to resolve the dependencies since it’s running under your account as well.

IN PRODUCTION, we can publish our project by running the following command (from the project’s root folder) and point the sc.exe create binPath= to its output:

dnu publish --out ..\publish-output --runtime active --no-source

This command builds and packages your code and copies it and all of its dependencies into an output folder. It will also add a global.json file in a sub-folder within the output directory that tells DNX to load the packages from the local folder and not the user’s profile. If that’s not enough, the command also packages the “active” DNX runtime. This means that the target machine doesn’t require DNX to be installed and doesn’t require you to run dnu restore on the target machine either. With this method, once you copy the published folder to the production machine, you can run the service under the account of your choosing and point to the run.cmd file within the root of the output folder.

sc.exe create <service-name> obj= <machine|domain>\<username> binPath= "\"<output-folder-path>\run.cmd\" --windows-service"

Now you have what you need to run ASP.NET in a Windows service in both development and production environments.

Get the Source

The source code and installation scripts for this article can be found in the aspnet-windows-service GitHub repo.

In Closing…

We can finally run ASP.NET in our Windows services. This is a big win for us as we host an admin site for our scheduler out of a Windows service. We currently do this using a custom-built static file server and WCF for web services.

There was a lot of information covered in this article, and there is still a lot more that could be covered. If you liked this article or have any questions, please feel free to leave a comment below. Thanks, and I hope you have enjoyed this post.

How to Run DNX Applications in a Windows Service

Today I'm going to show you how to run a DNX application in a Windows service. We recently needed to do this, and while there are many posts out there asking how, the answers would lead you to believe it's impossible. For this example we're going to be using the beta6 version of ASP.NET 5. If you don't yet have DNX, you should go here to get started.

Creating the Application

For demonstration purposes, we'll be creating a very simple DNX console application that writes to the event log when the service is started or stopped. To begin, create a new DNX Console Application (Package) in Visual Studio. You can find this template in the "Web" category, for some reason. Next we need to make a few tweaks to the project.json file:

  • First, remove the DNX Core reference; this is a Windows service after all, so there's no point in trying to be cross-platform today.
  • Under frameworks:dnx451, add a frameworkAssemblies reference to "System.ServiceProcess": "4.0.0.0"
  • Change your command key to something simple like “run” or “runservice”. You don’t have to do this, but it makes it more clear later what we’re doing.

When you’re all done, your project.json should look something like this:

{
    "version": "1.0.0-*",
    "description": "MyDnxService Console Application",
    "commands": {
        "run": "MyDnxService"
    },
    "frameworks": {
        "dnx451": {
            "frameworkAssemblies": {
                "System.ServiceProcess": "4.0.0.0"
            }
        }
    }
}

Next, make the Program class inherit from System.ServiceProcess.ServiceBase. This lets us override the OnStart and OnStop service methods, which we'll use to simply log messages to the event log. Finally, in the Main(string[] args) method of the Program class we add Run(this); in order to bootstrap the Windows service. Here is our program.cs file in its entirety:

using System.Diagnostics;
using System.ServiceProcess;

namespace MyDnxService
{
    public class Program : ServiceBase
    {
        private readonly EventLog _log = 
            new EventLog("Application") { Source = "Application" };

        public void Main(string[] args)
        {
            _log.WriteEntry("Test from MyDnxService.", EventLogEntryType.Information, 1);
            Run(this);
        }

        protected override void OnStart(string[] args)
        {
            _log.WriteEntry("MyDnxService started.");
        }

        protected override void OnStop()
        {
            _log.WriteEntry("MyDnxService stopped.");
        }
    }
}

Installing the Service

Once our application is written and building successfully we can install it as a service. To do this, open a command prompt in administrator mode and enter the following command:

sc.exe create <service-name> binPath= "\"<dnx-exe-path>\" \"<project-path>\" run"

UPDATE
If you're targeting beta8 or beyond, the way we tell dnx.exe where to find our project has changed to include a --project (-p for short) argument. So the above command changes to:
sc.exe create <service-name> binPath= "\"<dnx-exe-path>\" -p \"<project-path>\" run"

sc.exe is a built-in tool that lets us perform a number of operations on a Windows service. Here we're using it to create a new service. The very first argument that the create operation needs is the service name. This is the name that will show up in the Services snap-in. You can set this to any value, but since our example application is called MyDnxService we put our adventurous nature aside and simply used that.

The binPath= parameter is the only other parameter we’re specifying to sc.exe, even though it looks like three separate parameters. We’re basically telling sc.exe what command to execute when starting the service, which is DNX. The other parameters after that are actually parameters to dnx.exe, not sc.exe. Because there are spaces in the argument value, we are wrapping the entire argument in quotes (I’ll explain the escaped quotes in a minute). One gotcha to keep in mind when working with sc.exe is that the “=” you see after “binPath” is actually part of the parameter name for sc.exe, so that space you see between binPath= and the value is necessary as it tells sc.exe that we’re moving from the parameter name to the value. Now let’s look at the three components in that binPath= argument:

  • Since dnx.exe is the application that runs a DNX application, we're going to need to point to it via its full path. If you install DNX via DNVM, the default install directory (at least on our machines) is c:\users\<username>\.dnx\runtimes\<runtime-version>\bin\dnx.exe, but if you aren't sure where it is on your machine, just open a command prompt and run where.exe dnx.exe and it should tell you where to find it. Since the path to DNX could have spaces, we're wrapping that parameter in quotes too, but because these quotes are inside the quotes we specified for the binPath= parameter to sc.exe, we need to escape them.
  • Next we need to tell DNX where to find our application. If you use dnu publish to publish your application it will generate a .cmd file for you that you normally would use to launch your application. You cannot use that .cmd file when running your application in a Windows service but if you open that .cmd file you’ll see a path for --appbase and that’s the one we want. Again, escape the quotes around this path for safety.
  • Finally we tell DNX what command within our application to run. Remember above in the project.json file we named our command key "run"? That's what this value specifies. So if you named your command key something else, just replace the "run" parameter to DNX with whatever command key name you chose in your project.json.

So putting all of that together, the complete sc.exe command we used for the application above (without the generic tokens) was:
sc.exe create MyDnxService binpath= "\"C:\Users\dave\.dnx\runtimes\dnx-clr-win-x86.1.0.0-beta6\bin\dnx.exe\" \"C:\Users\dave\Desktop\DnxWindowsService\src\MyDnxService\" run"

UPDATE
As mentioned above, if you're targeting beta8 or beyond, the way we tell dnx.exe where to find our project has changed to include a --project (-p for short) argument. So the above command changes to:
sc.exe create MyDnxService binpath= "\"C:\Users\dave\.dnx\runtimes\dnx-clr-win-x86.1.0.0-beta8\bin\dnx.exe\" -p \"C:\Users\dave\Desktop\DnxWindowsService\src\MyDnxService\" run"

Running the Service

Now that our application has been installed as a service, it’s time to revel in the fruits of our labor. To run the service, open the Services MMC Snap-in, find the service we installed by its name and start it. Open Event Viewer, go to the Application log and you should see the text that we output from our OnStart() override in our application. Stopping the service and refreshing the event log will show the stop message. As I said earlier, this is a super basic demonstration of functionality, but it’s the basis for running any DNX application in the context of a Windows service.

Why Do This?

Taskmatics Scheduler currently hosts the administration website from inside one of the Windows services that are installed. In its current implementation we couldn’t support ASP.NET applications because only IIS would provide the environment necessary to properly run each request through the pipeline. With ASP.NET 5, the game changes, and you can host ASP.NET applications from a console application without being dependent on IIS. After some headbanging we are successfully running our MVC 6 web application inside of a Windows service, which you can now read about here.

Update!

Check out Erez's follow-up post showing you how to host a fully functional ASP.NET site from a Windows service, which has just been posted!

Monitoring Flights and Sending SMS with Taskmatics Scheduler and Enzo Unified

Software developers need to build solutions quickly so that businesses remain competitive and agile. This blog post shows you how Taskmatics Scheduler and Enzo Unified can help developers build and deploy solutions very quickly by removing two significant pain points: the learning curve of new APIs, and orchestrating Internet services.

Sample Solution

Let's build a solution that checks incoming flights in Miami, Florida, and sends a text message via SMS to one or more phone numbers when new flights arrive. To track flight arrivals, we will use the FlightAware service, which provides a REST API for retrieving flight information. To send SMS messages, we will use Twilio's service, which likewise provides an API for sending messages.

To remove the learning curve of these APIs, we used Enzo Unified, a Backend as a Service (BaaS) platform that enables the consumption of services through native SQL statements. Enzo Unified abstracts communication and simplifies development against a large number of internal systems and Internet services. In this example, Enzo Unified is hosted on the Microsoft Azure platform for scalability and operational efficiency.

To orchestrate and schedule the solution, we used the Taskmatics Scheduler platform. Taskmatics calls into your custom code written in .NET on a schedule that you specify, which is configured to connect to Enzo Unified in the cloud. The call to Enzo Unified is made using ADO.NET by sending native SQL statements to pull information from FlightAware, and send an SMS message through Twilio. At a high level, the solution looks like this:

High level call sequence between Taskmatics Scheduler and Enzo Unified

How To Call FlightAware and Twilio with Enzo Unified

Developers can call Enzo Unified using a REST interface, or a native SQL interface. In this example, the developer uses the SQL interface, leveraging ADO.NET. The following code connects to Enzo Unified as a database endpoint using the SqlConnection class, and sends a command to fetch flights from a specific airport code using an SqlCommand object. Fetching FlightAware data is as simple as calling the “Arrived” stored procedure against the “flightaware” database schema.

var results = new List<ArrivedFlightInfo>();

// Connect to Enzo Unified using SqlConnection
using (var connection = new SqlConnection(parameters.EnzoConnectionString))
  // Prepare call to FlightAware's Arrived procedure 
  using (var command = new SqlCommand("flightaware.arrived", connection))
  {
    connection.Open();
    command.CommandType = System.Data.CommandType.StoredProcedure;
    command.Parameters.Add(new SqlParameter("airport", airportCode));
    command.Parameters.Add(new SqlParameter("count", 10));
    command.Parameters.Add(new SqlParameter("type", "airline"));

    // Call FlightAware's Arrived procedure 
    using (var reader = command.ExecuteReader())
      while (reader.Read())
        results.Add(new ArrivedFlightInfo
        {
          Ident = (String)reader["ident"],
          AircraftType = (String)reader["aircrafttype"],
          OriginICAO = (String)reader["origin"],
          OriginName = (String)reader["originName"],
          DestinationName = (String)reader["destinationName"],
          DestinationCity = (String)reader["destinationCity"]
          // ... additional code removed for clarity...
        });
    }

Calling Twilio is just as easy. A simple ADO.NET call to the SendSMS stored procedure in the “Twilio” schema is all that’s needed (the code is simplified to show the relevant part of the call).

// Establish a connection to Enzo Unified
using (var connection = new SqlConnection(parameters.EnzoConnectionString))
  using (var command = new SqlCommand("twilio.sendsms", connection))
  {
    connection.Open();
    command.CommandType = System.Data.CommandType.StoredProcedure;
    command.Parameters.Add(new SqlParameter("phones", phoneNumbers));
    command.Parameters.Add(new SqlParameter("message", smsMessage));

    // Call Twilio’s SendSMS method
    command.ExecuteReader();
  }

If you inspect the above code carefully, you will notice that it does not reference the APIs of FlightAware or Twilio. Indeed, both FlightAware and Twilio were called using ADO.NET against Enzo Unified. Because Enzo Unified behaves like a native database server (without the need to install special ODBC drivers), authenticating, making the actual API calls, and interpreting the REST results were entirely abstracted away from the developer and replaced by a SQL interface, which dramatically increases developer productivity. Database developers can call Enzo Unified directly to test FlightAware and Twilio using SQL Server Management Studio (SSMS). The following picture shows the results of calling Enzo Unified from SSMS to retrieve arrived flights from FlightAware.

Calling the FlightAware service using simple SQL syntax in SQL Server Management Studio

Sending an SMS text message through Twilio is just as simple from SSMS:

Calling the Twilio service using simple SQL syntax in SQL Server Management Studio

How To Schedule The Call With Taskmatics Scheduler

In order to run and schedule this code, we are using Taskmatics Scheduler, which provides an enterprise grade scheduling and monitoring platform. When a class written in .NET inherits from the Taskmatics.Scheduler.Core.TaskBase class, it becomes automatically available as a custom task inside the Taskmatics Scheduler user interface. This means that a .NET library can easily be scheduled without writing additional code. Furthermore, marking the custom class with the InputParameters attribute provides a simple way to specify input parameters (such as the airport code to monitor, and the phone numbers to call) for your task through the Taskmatics user interface.

The following simplified code shows how a custom task class is created so that it can be hosted inside the Taskmatics Scheduler platform. Calling Context.Logger.Log gives developers the ability to log information directly to Taskmatics Scheduler for troubleshooting purposes.

namespace Taskmatics.EnzoUnified.FlightTracker
{
    // Mark this class so it is visible in the Taskmatics interface
    [InputParameters(typeof(FlightNotificationParameters))]
    public class FlightNotificationTask : TaskBase
    {
        // Override the Execute method called by Taskmatics on a schedule
        protected override void Execute()
        {
            // Retrieve parameters as specified inside Taskmatics
            var parameters = (FlightNotificationParameters)Context.Parameters;

            // Invoke method that calls FlightAware through Enzo Unified
            var arrivedFlights = GetArrivedFlights(parameters);

            // do more work here… such as identify new arrivals
            var newFlights = FlightCache.FilterNewArrivals(arrivedFlights);

            // Do we have new arrivals since last call?
            if (newFlights.Count > 0)
            {
                // Invoke method that calls Twilio through Enzo Unified
                var results = SendArrivedFlightsViaSMS(newFlights, parameters);

                // Update cache so these flights won't be sent through SMS again
                FlightCache.SaveFlightsToCache(newFlights);
            }
            else
                Context.Logger.Log("SMS phase skipped due to no new arrivals.");

            Context.Logger.Log("Job execution complete.");
        }
    }
}

Installing the task into the Taskmatics Scheduler platform is very straightforward. Log into the user interface and create a definition for the flight tracker task. This step allows you to import your library into the system to serve as a template for the new scheduled task that we will create next.

Import your custom task as a definition

Schedule your custom task to run on the days and times you specify.

Once you have created your definition, go to the “Scheduled Tasks” section of the user interface, and create the task by selecting the definition that you just created from the Task dropdown. This is also where you will schedule the time and frequency that the task will run as well as configure the input parameters for the task.

Configure the parameters for the scheduled task.

Finally, from the Dashboard screen, you can run your task manually and watch the output live, or look at a past execution of the task to see the outcome and logs from that run. In the image below, you can see the execution of the Flight Tracking task where we monitored recent arrivals into the Miami International Airport (KMIA).

Review and analyze previous task executions or watch your tasks live as they run.

Conclusion

This blog post shows how developers can easily build integrated solutions using simple SQL statements, without having to learn complex APIs, thanks to Enzo Unified's BaaS platform. In addition, developers can easily orchestrate and schedule their libraries using the Taskmatics Scheduler platform. By combining the strengths of Enzo Unified and Taskmatics, organizations can reap the following benefits:

  • Rapid application development by removing the learning curve associated with APIs
  • Reduced testing and simple deployment by leveraging already tested services
  • Service orchestration spanning Internet services and on-premises systems
  • Enterprise grade scheduling and monitoring

About Blue Syntax Consulting

Our mission is to make your business successful through the technologies we build, create innovative solutions that are relevant to the technical community, and help your company adopt cloud computing where it makes sense. We are now making APIs irrelevant with Enzo® Unified. For more information about Enzo Unified and how developers can access services easily using SQL statements or a simple REST interface, visit http://www.enzounified.com or contact Blue Syntax Consulting at info@bluesyntaxconsulting.com.

About Taskmatics

Taskmatics was founded by a group of developers looking to improve the productivity of their peers. Their flagship application, Taskmatics Scheduler, aims to boost developer productivity and reduce the effort involved in creating consistent and scalable tasks while providing a centralized user interface to manage all aspects of your task automation. For more information and a free 90-day trial, visit us or email us at info@taskmatics.com.

Not Just .NET: Run node.js Scripts In a Task – Part 1

Taskmatics Scheduler is known for being a powerful tool that .NET developers can use to simplify their task automation. By providing an easy to use API, it allows developers to leverage the power of the scheduling platform to run custom .NET tasks. What might not be well known is that it’s also super easy to run code outside the .NET framework within a custom task. In this three part series, we’re going to walk through how simple it is to do this by creating a task that runs node.js scripts. In the end, you’ll come away with a custom task that you can use as a basis for scheduling tasks in a bunch of different languages.

Why Run node.js Scripts From Taskmatics Scheduler?

Without Taskmatics Scheduler, managing task automation usually means overseeing a growing number of executables or scripts that are scheduled using Windows Task Scheduler or Cron. The end result is usually a cacophony of code where one task may fail and write some error to a database table or file somewhere on the system while another task just fails and doesn’t indicate the underlying reason. Also, trying to keep track of which jobs successfully ran or are in the process of running can be a nightmare. The beauty of Taskmatics Scheduler is that task scheduling, execution and reporting can be managed from one place:  the administration website (‘admin’). Furthermore, logging and reporting for every task is done for you in a centralized and consistent manner.

Taskmatics Scheduler makes it possible to extend the benefits we just covered to other languages and frameworks as well. Since each task instance is spawned off as its own process, you can create your own child processes in every task without having to worry about affecting the overall ecosystem of the Scheduler. This means that any code that can be run from a command line can be run by Taskmatics Scheduler, and you get all the same features and benefits that standard .NET tasks receive.

There are, of course, some prerequisites before you can run a node.js script (or any other code for that matter) from Taskmatics Scheduler. Node.js isn't installed as part of the Taskmatics Scheduler installation process, so it's important that node.js be installed on the computer that has Taskmatics Scheduler installed. The same applies to all other languages: the runtimes must be available to execute the code or it simply won't work. Once that's out of the way, we can use the Taskmatics Scheduler API to write a task template that can be used to create not only our node.js task, but any other scripting tasks we want to create as well (think Python, PowerShell and the like):

public abstract class ExecuteProcessTask : TaskBase
{
    protected override void Execute()
    {
        var info = new ProcessStartInfo(GetProcessFileName());
        info.Arguments = GetArguments();
        info.RedirectStandardOutput = true;
        info.RedirectStandardError = true;
        info.UseShellExecute = false;
        info.CreateNoWindow = true;

        var process = new Process();
        process.StartInfo = info;
        process.Start();
        process.OutputDataReceived += (s, e) => Log("INFO", e.Data);
        process.ErrorDataReceived += (s, e) => Log("ERROR", e.Data);
        process.BeginOutputReadLine();
        process.BeginErrorReadLine();
        process.WaitForExit();

        HandleExitCode(process.ExitCode);
    }

    protected abstract string GetProcessFileName();
    protected abstract string GetArguments();

    protected virtual void Log(string type, string message)
    {
        if (message == null)
            return;

        Context.Logger.Log("{0}: {1}", type, message);
    }

    protected virtual void HandleExitCode(int exitCode)
    {
        if (exitCode == 0)
            return;

        throw new ApplicationException("The process failed with exit code " + exitCode + ".");
    }
}

Analyzing the Code

Tasks in Taskmatics Scheduler inherit from TaskBase and must override the Execute method. This task handles that by using the Process and ProcessStartInfo classes of the .NET framework to create a child process and execute it, redirecting all standard and error output to Taskmatics Scheduler’s centralized logging infrastructure to store and stream all output to the user in the admin, just like any other task. Being an abstract class, it provides the GetProcessFileName method for determining the file system path to the executable that will run our script. The GetArguments method is where logic around creating the arguments to that script is handled.

The Log method bridges the output of the child process to the Taskmatics Scheduler real-time logger out of the gate, but the method is left virtual, which allows logging to be customized if additional logging infrastructure is needed. Finally, the HandleExitCode method shown above simply throws an exception if the exit code is nonzero, which the Taskmatics Scheduler system will treat as a failed task status. Again, since the method is virtual, it remains flexible for those who need more complex behavior when a process completes.

Another key feature of Taskmatics Scheduler is its extensibility. It’s a snap to use custom parameter objects in these tasks, which makes extending ExecuteProcessTask into a single task that can run any node.js script a piece of cake.
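For example, here is a minimal sketch of what such an extension might look like. The NodeScriptTask and NodeScriptParameters names are hypothetical, the sketch assumes node.exe is available on the machine's PATH, and the InputParameters attribute and Context.Parameters usage follow the pattern shown in the Enzo Unified post above:

// Hypothetical parameters class; the property names are illustrative only.
public class NodeScriptParameters
{
    public string ScriptPath { get; set; }
    public string ScriptArguments { get; set; }
}

// Hypothetical node.js task built on the ExecuteProcessTask base class above.
[InputParameters(typeof(NodeScriptParameters))]
public class NodeScriptTask : ExecuteProcessTask
{
    // Assumes node.exe can be resolved from the PATH of the machine running the task.
    protected override string GetProcessFileName()
    {
        return "node.exe";
    }

    protected override string GetArguments()
    {
        var parameters = (NodeScriptParameters)Context.Parameters;
        return string.Format("\"{0}\" {1}", parameters.ScriptPath, parameters.ScriptArguments);
    }
}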

Next Up…

With a simple task, we've provided a basis for running code written in any language within Taskmatics Scheduler. This allows you to centralize all of your task automation under one roof, regardless of the language. You also get a consistent execution and logging pattern that can make maintenance of a large number of disparate tasks much easier than using Windows Task Scheduler or Cron. In the next article, we'll create a simple node.js script and wrap it in a task using the ExecuteProcessTask class from this article, and we'll finish up the series by demonstrating how easy it is to schedule our new node.js task in the admin and see it run.

How Asynchronous Operations Can Reduce Performance

It's well known that asynchronous programming can improve application performance. Developers writing Windows applications try to keep the UI thread responsive while work is performed in the background, and website and web API developers attempt to maximize thread availability for processing new requests. Like any technique, however, asynchronous programming can produce results that run contrary to expectations. Without some forethought and care about how work is offloaded onto threads, developers can unwittingly put areas of application performance at risk. In this post I want to talk about the different types of asynchronous work, how the operating system schedules this work, and how to determine when new threads should be created in your application when initiating asynchronous operations.

Types of Asynchronous Operations

Most of the functionality we code into our applications falls into one of two main categories. Compute-bound operations are those in which time-consuming calculations are performed. Examples include sorting a large array or calculating the first 1000 digits of pi. In these cases the CPU is responsible for completing the work, but with nearly every modern machine supporting multiple cores, it benefits the application to keep the main thread processing while these calculations take place on another core. For these scenarios, it's worthwhile to offload the work to a thread pool thread to improve the application's performance. We can do this by using the Task class to schedule the work on the thread pool, as in the example below:

private static Task<double> DoHeavyCalculationsAsync(int length)
{
    return Task.Run(() =>
    {
        // Do the heavy calculations on a thread pool thread and return the result
        // (placeholder computation so the lambda actually returns a double).
        double result = 0;
        for (var i = 0; i < length; i++) result += i;
        return result;
    });
}

Most of the other operations we think of as time consuming, such as reading and writing files, database operations and calls made over the network, are classified as I/O-bound operations. At first blush, I/O operations seem like perfect candidates for offloading to a new thread pool thread: the time they take to complete can be unpredictable, so they will likely cause the application to lose responsiveness. By moving this logic to another thread, the main thread can continue to perform its work and all is right with the world, or so it seems. In reality, because of the way the thread pool schedules work, performance can actually be worsened rather than improved. Let's look at a simple example:

private static Task<byte[]> ReadFileAsync(string path)
{
    return Task.Run(() =>
    {
        Byte[] bytes;
 
        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
        {
            bytes = new byte[stream.Length];
            stream.Read(bytes, 0, (int)stream.Length);
        }
 
        return bytes;
     });
}

Thread Management under the Hood

While it's true that this code will not block the calling thread, it's not doing the application any favors in terms of overall performance. All .NET applications are allocated two sets of threads onto which they can queue work, and as of .NET 4 the number of threads allocated to the process varies based on a number of factors, including the resources available on the machine running the application. The first set contains worker threads, which can execute work in parallel with the main thread of the application. This is the only group of threads, outside of the main application thread, onto which the Task class in .NET 4 can queue work. The second set of threads are known as I/O completion threads. These are threads requested by the CLR when the OS has completed asynchronous I/O operations like reading file contents, making driver calls or making calls over a network. When an I/O operation is complete, the completion port bound to that operation notifies the CLR, which obtains a thread from this set on which to execute the callback of the asynchronous operation.

The problem with offloading I/O operations to another thread using Task.Run is that it swaps one thread for another when the OS can perform asynchronous I/O without blocking a thread at all. In the example above, if you had a loop reading the contents of all files within a fairly large folder tree, you could easily have 100 or more threads waiting for their respective files to finish being read. This will tax the application because the thread pool will have to create and destroy a large number of worker threads to handle the volume of files being processed, and eventually it can cause resource contention on the machine since each thread takes a minimum of one megabyte of RAM. About the only thing going to plan is that the UI will stay responsive, at least until the system starts to slide into resource starvation. So how can we make use of the I/O completion threads in our applications if the .NET thread pool is exclusively tied to queuing operations on worker threads?

Looking Inward

While it is possible for .NET developers to queue work onto these completion ports using some rather obscure methods on the ThreadPool class and native overlapped structures, it's quite unlikely you'll ever really need to. Luckily, the framework is already chock full of methods that implement this I/O pattern for you: any method that implements the Asynchronous Programming Model (APM), characterized by Begin/End method pairs, or the more recent Task-based Asynchronous Pattern (TAP). These methods have been written to use the native calls that schedule work to run on the I/O completion threads. The result is that no thread is held up waiting for the I/O to complete, and overall application performance is better for it. Here is our example from above, rewritten to use built-in TAP methods that use the I/O completion threads:

private static async Task<byte[]> ReadFileAsync(string path)
{
    var bytesRead = 0;
    byte[] fileBytes;

    using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
    {
        fileBytes = new byte[stream.Length];
        bytesRead = await stream.ReadAsync(fileBytes, 0, (int)stream.Length).ConfigureAwait(false);
    }
 
    return fileBytes;
}

In the above example, we aren’t creating a new thread to offload the work onto, but simply calling the ReadAsync method of the Stream class. When the await statement is hit, the current execution is returned to the caller and the work is queued on the OS, freeing the current thread to perform other work. When the file read is complete, a thread from the pool is re-assigned to handle the remaining work after the await.
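As a quick illustration of the benefit, here is a hypothetical caller (not part of the original sample, and assuming the usual System.Linq, System.Collections.Generic and System.Threading.Tasks usings) that reads several files concurrently without tying up worker threads while the I/O is in flight:

private static async Task<long> ReadAllFilesAsync(IEnumerable<string> paths)
{
    // Kick off every read; each await frees the calling thread while the OS performs the I/O.
    var readTasks = paths.Select(ReadFileAsync).ToArray();
    var fileContents = await Task.WhenAll(readTasks).ConfigureAwait(false);

    // Return the total number of bytes read across all files.
    return fileContents.Sum(bytes => (long)bytes.Length);
}

The same files are processed, but no pool thread sits blocked on a read; threads are only borrowed briefly to run the continuations once each read completes.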

Summary

By differentiating between CPU-bound and I/O-bound operations in your application, and with a little digging into the classes surrounding I/O operations, you can determine when it is truly beneficial to offload work to the thread pool and when to let the framework's asynchronous methods do the heavy lifting instead.

Understanding TPL Dataflow – Conceptual Overview

TPL Dataflow is a fairly recent and awesome addition to the .NET framework. It provides developers with a high-level approach to dealing with asynchronous programming. Asynchronous programming can be daunting, especially when synchronizing threads and protecting shared memory. TPL Dataflow abstracts away a lot of that tedious and error-prone code.
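To give a sense of what that looks like in code, here is a generic sketch (not code from the video, and assuming the System.Threading.Tasks.Dataflow package is referenced) of a small pipeline composed from linked blocks, with no explicit locking or thread synchronization:

using System;
using System.Threading.Tasks.Dataflow;

class PipelineSample
{
    static void Main()
    {
        // Transform each input on thread pool threads, then print the results.
        var square = new TransformBlock<int, int>(x => x * x);
        var print = new ActionBlock<int>(x => Console.WriteLine(x));

        // Link the blocks and flow completion from the first block to the second.
        square.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 5; i++)
            square.Post(i);

        square.Complete();
        print.Completion.Wait();
    }
}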

This series provides an in-depth look at TPL Dataflow and how it can benefit you as a developer. In this video I'll cover the conceptual overview and describe the core concepts of TPL Dataflow. Enjoy.


Taskmatics Scheduler and Windows Azure Scheduler: Redux

There has been a lot of coverage lately about the Windows Azure Scheduler offering currently in preview. After getting familiar with the product, I thought it might be a good idea to talk about what it brings to the table and how it differs from Taskmatics Scheduler, which is currently in beta. We’ll also talk a little bit about how you can use the two platforms together to get robust automation for your environment.

What is Windows Azure Scheduler?

Windows Azure Scheduler is a service that exposes a REST API for scheduling jobs to be run either in a web or worker role hosted within Azure or on a remote server outside the cloud. These jobs can be one-time or recurring, and the API lets you group them into a job collection, which is a logical grouping of jobs that share settings such as the region from which the job should be triggered. Currently, Windows Azure Scheduler offers two distinct ways of executing scheduled jobs:

  • The simplest way to execute jobs is by providing an HTTP or HTTPS endpoint address which the service will call at the scheduled time(s). This could be any HTTP endpoint that will execute the job when it receives a request from the scheduler. While simple, this method is better suited to small jobs with a duration of 30 seconds or less. The scheduler currently records the response of the HTTP/HTTPS call to determine whether or not the job succeeded, and the default timeout for the call is 30 seconds, which means that longer jobs will be recorded as failures if this method is used.
  • The second method for job execution is for the scheduled job to post a message to an Azure Storage Queue. This will require you to set up and configure Azure Storage Queues on your account but it provides a way to trigger job execution by watching a queue from your own process. How the message is processed is determined completely by whatever application is listening to the queue, and the success or failure of the scheduled job is simply whether the queue receives the posted message successfully.

The Windows Azure Scheduler API also has methods to track and recall job status for all jobs and to get success/failure history for one or more jobs, though again note that success or failure reflects not the job execution itself, but simply the posting of the message to the endpoint or queue. Since the service is currently in preview, I'm sure even more functionality will be added over time. There are a few shortcomings of the new scheduler offering:

  • While scheduled jobs are triggered in the cloud and are therefore reliable, the endpoints or queue subscribers are not necessarily cloud based, and the reliability and scale of the applications that actually perform the work are still a burden placed on the developer. Running web/worker roles to host endpoints that can process messages can get costly, since they'll need to be listening (and thus active) 24/7.
  • Reporting and statistics for jobs are currently very basic, and there is no way to extend the collected information for richer reporting.
  • Job execution cannot be controlled once the job has started running. Jobs cannot be paused/terminated.

What is Taskmatics Scheduler?

Taskmatics Scheduler is a task scheduling platform that combines a fully extensible .NET API for creating and executing jobs with an administration site that can manage and provide reporting on the jobs created with the API. Where Windows Azure Scheduler focuses on ways to trigger job execution, Taskmatics Scheduler covers the triggering of jobs, but also manages the distribution and execution of those jobs and provides health monitoring and detailed reporting during execution and beyond. Taskmatics Scheduler has the edge on Azure Scheduler when your automation needs extend beyond just a few small nightly tasks because it can handle distribution of the job load internally, without relying on the developer to ensure load balancing and availability of resources to run the desired job. The true power of Taskmatics Scheduler, however, is the extensibility it provides for developers. There are basically three main components to the job scheduling API:

  • Triggers define how job execution will be started. Like Windows Azure Scheduler, a calendar based trigger is provided out of the box, but you can create custom triggers that can fire based on domain specific criteria.
  • Tasks are the central unit of work in the system. They represent the work to be automated, and are executed by one or more triggers. Tasks can be paired with custom configuration to allow reusability, and they contain built-in controls that allow users to pause or terminate running tasks.
  • Event Listeners define code that will execute when one or more target events are raised by a task during execution. Custom event listeners can be created that can be used for real time error notifications or integration with line of business applications such as Enterprise ERP and CRM systems.

Taskmatics Scheduler also provides a web based administration console where customized tasks can be loaded, scheduled and managed. The console also provides detailed reports and execution history for all tasks. If your job automation landscape is fairly complex or involves many long running tasks, Taskmatics Scheduler might be a better fit than using the Windows Azure Scheduler service.

The Best of Both Worlds

On one hand you have a cloud based scheduler that can reliably fire off messages, and on the other you have a fully customizable system that is designed to distribute and execute jobs. Can they be used together? The short answer is yes. If you want to benefit from scheduling jobs in the cloud, you can create a custom trigger for Taskmatics Scheduler that listens on a given HTTP address and port as a receiver for the cloud based scheduler. Another option is a custom trigger that subscribes to the Azure Storage Queue and fires one or more tasks within Taskmatics Scheduler whenever a message is posted. If you are drawn to the potential of Windows Azure Scheduler as a reliable, cloud based scheduling tool, I encourage you to put Taskmatics Scheduler to the test for that same reliability and much more.

Going with the Flow: Simplifying Producer/Consumer Processing with TPL Dataflow Structures

In programming, even simple looking requirements can foreshadow some of the most complex code developers can create. Often the complexity is driven by factors that we, as developers, need to consider but that the business defining the requirements does not. These factors normally include performance metrics like throughput, latency and demand on system resources. This article will explore how quickly the complexity of a simple task can grow, and how the TPL Dataflow library helps to tackle that complexity with an API that builds on the Task-based Asynchronous Pattern (TAP) introduced in .NET 4.5. To demonstrate the power of the Dataflow library, we’ll show how a completely custom asynchronous, parallelized workflow that would likely have been over 1,000 lines of code can be condensed to less than 150 lines.

A Tangled Web Woven

Your average day as a developer extraordinaire starts off with a glance at your task queue. Consider the following innocuous looking task:

Process any newly placed orders to our shipping services and update those orders with tracking numbers.

At first glance this seems pretty straightforward. We can just create a class that retrieves new orders as they are placed, loop over each order and send it to the shipping services, and finally save the updated order information to the database. Well, it’s clear you’ve earned your paycheck today, and you take a coffee break to celebrate. As you sip your invigorating beverage, you begin to think about the performance implications of your design. Maybe it’s the coffee talking, but you see some clear issues with your original solution:

  • The shipping service takes in multiple orders at once, so sending them in one at a time is an inefficient use of the network, but sending them all at once will take some time, which will hold up resources until completed. We will need to batch the orders with a reasonable size. Also, if multiple shipping providers are supported, we may need to route the orders to more than one shipping service.
  • Our class that takes in newly placed orders will have to employ some locking mechanism to make sure that the internal queue of orders is managed in a thread safe manner. If there are a lot of orders, processing these one by one may also cause the queue to grow larger and larger, eventually putting pressure on memory resources.
  • Once all of the orders have tracking numbers, the database has to be updated. This could take time, and there’s no reason that other orders shouldn’t be sent to the shipping services while these orders are being saved to the database. Idle hands are the devil’s playthings, after all.

Immediately our simple solution now involves batch queuing, concurrent processing and coordination of threads through synchronization objects. The full weight of how much code you’ll actually have to write is upon you. It’s not that any of these problems are insurmountable, but the amount of code to be written, the time spent testing and the complex, error prone nature of that code will end up determining how long it takes to get a solution like this into production. Looks like it will be a long day, better refill that coffee!

The Power of Dataflow

The TPL Dataflow library is essentially a set of components, called blocks, which serve a specific role in producer/consumer systems. What’s great about these blocks is that they are all built on top of the Task Parallel Library, which means that they support the new async/await pattern for asynchronous programming. This makes our process more efficient with its use of threads and system resources while making the code to accomplish asynchronous functionality easier to read and maintain. In addition to async support, the blocks can be linked to each other to create entire task workflows from simple pipelines to complex network flows. The blocks and the linkages between them expose settings that allow you to configure how each component will ingest, process and dispatch the data that it receives. This eliminates the need to write verbose code to synchronize the various stages of the workflow yourself. The blocks provided by the library fall into three main categories:

  • Buffering Blocks: These blocks essentially receive and store data so that it can be dispatched to one or more other target blocks for processing. All of these blocks guarantee order in message delivery, with the exception of the WriteOnceBlock<T> which essentially only allows the first message received to be dispatched to its linked targets.
  • Execution Blocks: Blocks that fall into this category take in a specified delegate that is run on each data object that is dispatched to it. The execution of the delegate is done asynchronously, which means that as data is received by these classes, the delegate is scheduled by the task scheduling engine of the TPL to be processed on a thread pool thread. All of these classes will by default process only one data element at a time, buffering the other data until the previous one completes, but each class has settings that allow you to specify the number of data objects that can be processed in parallel and the internals of the class handle the rest.
  • Grouping Blocks: The classes that comprise this category tackle the need to aggregate data elements from one or more data sources. This can be done either by grouping data into sets to be processed as a batch, or by synchronizing multiple outputs from other tasks running asynchronously to be processed by another component. (A short sketch after this list wires one block from each category together.)
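
To make the categories concrete, here is a minimal sketch with a purely illustrative integer payload: a BufferBlock<int> (Buffering) feeds a BatchBlock<int> (Grouping), which feeds an ActionBlock<int[]> (Execution).

using System;
using System.Threading.Tasks.Dataflow;

class CategoryDemo
{
    static void Main()
    {
        var buffer = new BufferBlock<int>();               // Buffering: stores and dispatches data
        var batcher = new BatchBlock<int>(3);              // Grouping: aggregates items into arrays of 3
        var printer = new ActionBlock<int[]>(batch =>      // Execution: runs a delegate per batch
            Console.WriteLine("Received a batch of " + batch.Length));

        buffer.LinkTo(batcher);
        batcher.LinkTo(printer);

        // Complete each block once its upstream source has finished.
        buffer.Completion.ContinueWith(t => batcher.Complete());
        batcher.Completion.ContinueWith(t => printer.Complete());

        for (var i = 0; i < 9; i++)
            buffer.Post(i);

        buffer.Complete();
        printer.Completion.Wait();
    }
}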

If one of the blocks provided by the library doesn’t quite fit the bill, you can build your own custom component. The Dataflow library exposes three main interfaces that can be implemented to make your custom component compatible and composable with the built-in blocks. ISourceBlock<TOutput> is for components that will buffer and/or dispatch data elements to one or more target blocks, ITargetBlock<TInput> is for components that receive and process data elements, and IPropagatorBlock<TInput, TOutput> is for components that act as both a source and a target. If you want to build a wrapper around an existing source and target pair, the DataflowBlock.Encapsulate<TInput, TOutput> method can generate a propagator for you to simplify your implementation. While we won’t be covering each of the blocks of the library in depth, we will touch on a few of them in more detail as we build our example.
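
As a quick illustration of Encapsulate, the sketch below packages a BatchBlock<int> and a TransformBlock<int[], int> into a single propagator that accepts individual integers and emits the sum of every batch of five. The block choices and batch size here are arbitrary:

using System.Threading.Tasks.Dataflow;

static class BlockFactory
{
    // A minimal sketch: expose a batch-then-sum pair as one reusable block.
    public static IPropagatorBlock<int, int> CreateBatchSummer()
    {
        var batcher = new BatchBlock<int>(5);
        var summer = new TransformBlock<int[], int>(batch =>
        {
            var total = 0;
            foreach (var value in batch)
                total += value;
            return total;
        });

        batcher.LinkTo(summer);
        batcher.Completion.ContinueWith(t => summer.Complete());

        // Encapsulate exposes the pair as a single block: the target side is the
        // batcher, the source side is the summer.
        return DataflowBlock.Encapsulate(batcher, summer);
    }
}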

One interesting side note about the TPL Dataflow library that Microsoft makes very clear is that it is not packaged as part of the .NET 4.5 framework. This means that you won’t have access to these classes without importing the library. This is easily done, however, with the NuGet Package Manager by either searching the online gallery for “TPL Dataflow” or typing “Install-Package Microsoft.Tpl.Dataflow” in the Package Manager Console.

Let’s Meet Our Contestants

Let’s go back to our requirement, now armed with knowledge of the Dataflow library, and compose a pipeline that represents how we can get orders updated with tracking numbers as quickly and efficiently as possible. Each of the code fragments below is presented in the context of the block being explained, but you can download the fully working demo and play with it yourself. The full network of steps we’ll be implementing will look something like this:

[Diagram: the order processing Dataflow network, from order buffering through carrier batching and processing to database persistence]

The solution will have a single point of input, which can then be broadcast to one of two shipping pipelines corresponding to the UPS and FedEx supported carriers. Each shipping pipeline will consist of a batching stage to group a set of orders and then a processing stage to send those orders to the shipping service and receive the response. Finally, the responses will be threaded off to be saved to the database. Now let’s take a look at the Dataflow blocks that we will use to build this solution:

BufferBlock<T>

This block is part of the Buffering category and essentially serves as an asynchronous data store. Its sole task is to receive data and asynchronously dispatch it to one or more linked targets. Since our orders can be coming in from multiple threads, this block eliminates our need to manage a thread safe queue. By default, BufferBlock<T> is unbounded, which means that it can store as many data elements as we can throw at it. This is fine if you expect that your system will be able to consume at a rate that outpaces the orders needing tracking numbers, but if consumption falls behind production, you could end up growing a large in memory buffer, which will eventually put pressure on system memory. Buffering blocks take in an instance of DataflowBlockOptions that allows us to set the capacity of the internal queue. The caveat to setting a BoundedCapacity of, let’s say, 50 is that if the buffer is at capacity, the next call to the Post method will immediately return false without adding the item to the buffer. This semantic is common to all target blocks in Dataflow and can cause unexpected behavior in your application if you are not aware of it. To guarantee that all items get into the buffer, even if the buffer is temporarily full, you can use the SendAsync method and either block (using the Wait method) or await the returned Task. We instantiate our buffer simply:

var orderBuffer = new BufferBlock<Order>();

In this example, we’re going to buffer our Order objects to be sent over to the shipping service to be given tracking numbers. The Order class is simple and just contains a list of OrderItem objects that each contain a SKU. Since orders can be split into multiple shipments depending on the availability of the items on the order, the tracking number will actually be assigned to each OrderItem by the shipping services. Here we are using the default settings, which means that orderBuffer can hold an unlimited number of Order objects.
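
If we did want the hypothetical cap of 50 orders described above, the bounded version would look something like the fragment below (assumed to run inside an async method with an Order instance named order in scope), with SendAsync used so producers wait for space instead of having Post return false:

// A bounded alternative (sketch): the buffer holds at most 50 orders at a time.
var boundedBuffer = new BufferBlock<Order>(
    new DataflowBlockOptions { BoundedCapacity = 50 });

// Post returns false immediately if the buffer is currently full...
var accepted = boundedBuffer.Post(order);

// ...while SendAsync hands back a Task that completes once the buffer has room,
// so awaiting it guarantees the order is eventually accepted.
var queued = await boundedBuffer.SendAsync(order);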

BroadcastBlock<T>

BroadcastBlock<T> is another block in the Buffering category of the Dataflow library. It differs from BufferBlock<T> in one critical way. By default, BufferBlock<T> will remove a message from its queue when the first target in its list of targets accepts that message. This means that if you need to send the same message to multiple targets, BufferBlock<T> won’t be a good candidate, because it will send the message to the first linked target that accepts it, remove that message from the queue and proceed to the next message. BroadcastBlock<T>, by contrast, offers each message to all of its linked targets. It takes in a delegate that dictates how the data element will be cloned before it is passed to each target. This is important to consider because passing in something as simple as mydata => mydata means that each target will get the same reference to the data element. If any of these components, which could be in different processing pipelines, were to modify the object, it could have unintended consequences in the other components and lead to strange behavior. Since we can have multiple carriers, we’ll want to broadcast each order to the appropriate carrier processing pipeline based on the carrier that was specified on the Order object. We create our BroadcastBlock and link it to our BufferBlock:

var broadcaster = new BroadcastBlock<Order>(order => order);
orderBuffer.LinkTo(broadcaster);

You’ll notice here that indeed I’ve told the BroadcastBlock<T> to just go ahead and send the same Order reference to each target that I might link to it going forward. In my case, this will be OK because I’m going to be filtering the messages that go to each target so that only the proper pipeline will be able to process each message. Read on to see how that’s done.
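
If the downstream pipelines did mutate the orders, a safer cloning delegate would hand each target its own copy. A sketch, assuming the demo's Order and OrderItem classes expose settable OrderId, Carrier, Items and Sku properties (and with System.Linq imported for Select):

// Sketch: give each linked target its own copy of the order so the pipelines
// cannot interfere with one another through a shared reference.
var safeBroadcaster = new BroadcastBlock<Order>(order => new Order
{
    OrderId = order.OrderId,
    Carrier = order.Carrier,
    Items = order.Items.Select(item => new OrderItem { Sku = item.Sku }).ToList()
});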

BatchBlock<T>

Our shipping service can take in more than a single order at a time, and since the service requires a network call, processing each order sequentially increases network traffic and potentially the latency of the system. The BatchBlock<T> block is one of the Grouping blocks offered by the Dataflow library and can aggregate data elements automatically from one or more sources until it reaches the specified size. Once this happens, it will flush the batch of items all at once to any linked target. We’ll put one BatchBlock in place for each carrier and for this example we’ll batch 5 orders at a time for each one. This means that as orders come in, we’ll be queuing batches of 5 orders separately to be processed by the UPS or FedEx endpoint of the service.

One caveat to keep in mind for the BatchBlock<T> block is that it will only flush its queue when the batch limit is reached OR when its Complete method has been called, which tells the BatchBlock<T> not to expect any more data elements. So if you happen to be developing a Windows service or other long running process that uses a batch block to send orders in batches of 100, and the orders trickle in quite slowly, the time before the batch flushes is indeterminate. This could leave orders unprocessed for shipping for quite some time because the batch limit was never reached. To counter this, it may be beneficial to create a timer that, after some predetermined period of inactivity, flushes the batch block using the TriggerBatch method to guarantee timely processing (a sketch follows the next code listing). Let’s create our BatchBlock instances and link them as targets to the BroadcastBlock<T> from above:

var upsBatcher = new BatchBlock<Order>(5);
var fedexBatcher = new BatchBlock<Order>(5);

broadcaster.LinkTo(upsBatcher, order => order.Carrier == CarrierType.Ups);
broadcaster.LinkTo(fedexBatcher, order => order.Carrier == CarrierType.Fedex);

In this example, we can see that each BatchBlock takes in the limit for how many items to group before flushing to its targets. Also, we link the BroadcastBlock to both BatchBlock instances using the LinkTo method. One thing to point out here is the lambda being passed to the LinkTo method for our BatchBlock instances. The LinkTo method supports filtering data messages from the source block to the target blocks by taking in a predicate function. In our predicate above, we’re dictating that only orders with the Carrier property set to CarrierType.Ups will be sent to the upsBatcher instance. This gives us an easy way to control how data flows to the targets without having to put complex if/else logic or a factory pattern into each of the downstream execution blocks.
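
As noted earlier, a slow trickle of orders can leave a partially filled batch sitting indefinitely. Here is a minimal sketch of a timer-based flush for the UPS batcher (a simple periodic flush rather than a true inactivity timer, and the 30-second interval is arbitrary):

// Sketch: flush whatever is sitting in the UPS batch every 30 seconds, even if
// fewer than 5 orders have arrived. TriggerBatch emits nothing when the batch
// is empty, so firing on an idle system should be harmless.
var upsFlushTimer = new System.Threading.Timer(
    _ => upsBatcher.TriggerBatch(),
    null,
    TimeSpan.FromSeconds(30),
    TimeSpan.FromSeconds(30));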

Remember that order matters when you link targets to a source block. With the BroadcastBlock<T> we’re sure that all targets will get a chance to process the same message if it passes their predicates, but with other Buffering blocks like the BufferBlock<T>, this is not the case, so if the predicate isn’t specific enough, you risk having the message sent to the wrong target. Also, note that in our example above, if we add support in the future for DHL and fail to link another batcher to the broadcaster, those orders will never be accepted by any target: because BroadcastBlock<T> only holds the most recent message, each unmatched DHL order will simply be overwritten by the next order and silently lost. Either way, a message without a matching target means orders that never get processed, so make sure every message you broadcast has a target that will accept it.

TransformManyBlock<TInput, TOutput>

Now that we have a way to buffer our orders and batch them per carrier, we need to handle sending the orders to the shipping service to have tracking numbers assigned to each item on those orders. The shipping service returns results for multiple orders at once, but we’ll want to update each order in the database one at a time. To accomplish this, we’ll use the TransformManyBlock<TInput, TOutput>. This block is in the Execution category, which means that it can take data from one or more sources and execute a specified delegate on each data element before dispatching the result to one or more targets. This particular block is very similar to the TransformBlock<TInput, TOutput>, with one exception: the TransformManyBlock<TInput, TOutput> expects the delegate’s return type to be an IEnumerable<TOutput>, and when dispatching output to the target blocks, it sends each item in that enumerable one at a time. This means that even though our shipping service sends back a single response containing tracking numbers for items across multiple orders, our target component can simply accept a single ItemTrackingDetail and the block itself will handle dispatching the enumerable results to the target one by one.

As with our BatchBlock<T>, we’ll need one TransformManyBlock<TInput, TOutput> per carrier, as each carrier has a different endpoint on our shipping service. We wire everything up below:

var upsProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(orders => PostToCarrierAsync(CarrierType.Ups, orders));
var fedexProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(orders => PostToCarrierAsync(CarrierType.Fedex, orders));
upsBatcher.LinkTo(upsProcessor);
fedexBatcher.LinkTo(fedexProcessor);

// Simulates transformation of a list of Order objects into a service api model (ShipDetail).
private static List<ShipDetail> CreateShipDetails(IEnumerable<Order> orders)
{
    var shipDetails = orders.Select(order =>
          new ShipDetail
           {
              ShipId = order.OrderId,
              Items = order.Items.Select(item =>
                  new ShipItemInfo
                  {
                     Sku = item.Sku,
                  }).ToList()
           });

    return shipDetails.ToList();
}

// Sends orders to a shipping service endpoint dependent on the specified carrier.
private static async Task<IEnumerable<ItemTrackingDetail>> PostToCarrierAsync(CarrierType carrierType, Order[] orders)
{
    var shipDetails = CreateShipDetails(orders);

    var client = new System.Net.Http.HttpClient();
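    // Note: the PostAsync<T> overload and ReadAsAsync<T> used below are extension methods
    // from System.Net.Http.Formatting (the Microsoft.AspNet.WebApi.Client NuGet package).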
    var response = await client.PostAsync("http://localhost:9099/api/" + carrierType.ToString(), shipDetails, new JsonMediaTypeFormatter());
    if (!response.IsSuccessStatusCode)
         throw new ApplicationException("Exception in shipping processor: " + response.ReasonPhrase);

    var results = await response.Content.ReadAsAsync<List<ItemTrackingDetail>>();
    return results;
}

Our first task is to initialize the TransformManyBlock<TInput, TOutput> instances and designate the delegate we want each block to run. In our case, we can reuse the same method because we also pass the carrier as a parameter, which ensures that we hit the appropriate endpoint on our shipping service. Notice that the PostToCarrierAsync method returns a Task<IEnumerable<ItemTrackingDetail>>. All execution blocks can perform synchronous or asynchronous execution depending on whether the delegate returns an instance of TOutput or a Task<TOutput>. Since we’re using the asynchronous form of the delegate, the block will release its current thread back to the thread pool while we await the response from the service and then resume execution once the response is received. This allows the execution thread to be used for other task processing and results in a more responsive, efficient system. Best of all, the Dataflow library classes manage the asynchronous calls while still coordinating the data to and from the other components in your pipeline.
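
If the shipping service can tolerate concurrent batches, the carrier processors can also take their own ExecutionDataflowBlockOptions. A sketch with arbitrary limits, shown for the UPS processor only (it would replace the constructor call above):

// Sketch: allow up to two in-flight service calls for this carrier and cap the
// number of queued batches so upstream blocks feel back-pressure.
var processorOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 10
};

var upsProcessor = new TransformManyBlock<Order[], ItemTrackingDetail>(
    orders => PostToCarrierAsync(CarrierType.Ups, orders),
    processorOptions);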

ActionBlock<T>

Our last task is to update the orders in the database once the items on those orders have been assigned tracking numbers. Since the persistence step is the last step in our flow, we’ll be using another block from the Execution category, the ActionBlock<T> class. ActionBlock<T> can really be thought of as the Parallel.ForEach of the Dataflow library. It is very similar in function, except that it also supports the composability of linking to one or more source blocks. One thing that differentiates ActionBlock<T> from other execution blocks like the TransformBlock<TInput, TOutput> or TransformManyBlock<TInput, TOutput> is that ActionBlock<T> is a target-only block. It can be linked to, but it does not link to other blocks. In a flowchart, it is a terminator node. For our example, since persistence to the database is independent of the carrier flow that gets the tracking numbers, we’ll link our ActionBlock to both carrier processing flows. Let’s take a look at the code:

var asyncPersistAction = new Func<ItemTrackingDetail, Task>(PersistToDatabase);

var storageProcessor = new ActionBlock<ItemTrackingDetail>(asyncPersistAction, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

upsProcessor.LinkTo(storageProcessor);
fedexProcessor.LinkTo(storageProcessor);

// Simulates persistence of tracking numbers for each item to a database.
private static async Task PersistToDatabase(ItemTrackingDetail itemTrackingDetail)
{
    // ...  your DB code here
    //Simulate updating the order to the database.
    await Task.Delay(50); 
}

Notice that the input type of the ActionBlock<T> is ItemTrackingDetail. Because it is linked as a target of a TransformManyBlock<TInput, TOutput>, the enumerable output from the transform will be iterated within that block and sent as individual objects to the ActionBlock<T>, reducing even further the amount of code we have to write to iterate the data ourselves. The other thing to notice is that the ActionBlock<T> constructor is being passed an instance of ExecutionDataflowBlockOptions that specifies a MaxDegreeOfParallelism of three. All execution block classes can be passed an instance of the ExecutionDataflowBlockOptions class, which lets you specify how the TPL will parallelize and schedule the delegate to run. In our case, we are specifying that at most three calls to the database persistence delegate can run in parallel at any given time. Similar to the TransformManyBlock<TInput, TOutput>, the persistence delegate also uses the asynchronous delegate overload, which maximizes the productivity of the thread pool. The linking stage is straightforward as well, with both transform block instances simply linking to the new ActionBlock<T> instance.

You Complete Me

Since each Dataflow block component is technically running code on the thread pool at one time or another throughout the course of the pipeline, it’s necessary to let each component know that it can finish up what it’s doing and release its resources for good. All Dataflow blocks have a Complete method that will prevent additional data from being received. Once Complete is called, any data elements in the block’s buffer are processed and dispatched to any linked targets, after which the block ceases execution and releases any thread pool resources it was holding. Since all Dataflow blocks run asynchronously, they expose a Completion property that returns a Task object representing the execution state of the block instance. We can use this to set continuation delegates on all components in our Dataflow network such that they can all complete at the proper time.

In our example, the BufferBlock<T> needs to call the Complete method of the BroadcastBlock<T> when it has finished processing. The BroadcastBlock<T> has to notify both BatchBlock<T> instances (one per carrier) when it’s done. At the tail end, the ActionBlock<T> has two sources, so it can’t have its Complete method called until both of those sources have completed. We can represent these continuations right after we instantiate and link our components:

orderBuffer.Completion.ContinueWith(t => broadcaster.Complete());
broadcaster.Completion.ContinueWith(t =>
{
   upsBatcher.Complete();
   fedexBatcher.Complete();
});

upsBatcher.Completion.ContinueWith(t => upsProcessor.Complete());
fedexBatcher.Completion.ContinueWith(t => fedexProcessor.Complete());

Action<Task> postOrderCompletion = t =>
{
   Task.WaitAll(upsProcessor.Completion, fedexProcessor.Completion);
   storageProcessor.Complete();
};

upsProcessor.Completion.ContinueWith(postOrderCompletion);
fedexProcessor.Completion.ContinueWith(postOrderCompletion);

By declaring your completions as continuations, your shutdown process simply needs to be two lines:

orderBuffer.Complete();
storageProcessor.Completion.Wait();

The first line tells the BufferBlock<T> that there will not be any more orders to process, and the second line waits on the completion of the ActionBlock<T> that performs the database persistence. You don’t have to use continuations; you can wait on each block in sequence instead, as it really is just a matter of personal preference. Our flow can be managed sequentially by modifying our shutdown process to look as follows:

orderBuffer.Complete();
orderBuffer.Completion.Wait();

broadcaster.Complete();
broadcaster.Completion.Wait();

upsBatcher.Complete();
fedexBatcher.Complete();
Task.WaitAll(upsBatcher.Completion, fedexBatcher.Completion);

upsProcessor.Complete();
fedexProcessor.Complete();
Task.WaitAll(upsProcessor.Completion, fedexProcessor.Completion);

storageProcessor.Complete();
storageProcessor.Completion.Wait();
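
A third option the library offers is to pass DataflowLinkOptions with PropagateCompletion set to true when linking, so completion flows along the links automatically. One caveat in a network like ours: a target with more than one source completes as soon as the first propagating source completes, so automatic propagation is best reserved for the links into single-source targets. The storageProcessor is the only multi-source target here, so a sketch of rewired links might look like this:

// Sketch: completion flows automatically along every link except the two that
// feed the storageProcessor, which has more than one source.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

orderBuffer.LinkTo(broadcaster, linkOptions);
broadcaster.LinkTo(upsBatcher, linkOptions, order => order.Carrier == CarrierType.Ups);
broadcaster.LinkTo(fedexBatcher, linkOptions, order => order.Carrier == CarrierType.Fedex);
upsBatcher.LinkTo(upsProcessor, linkOptions);
fedexBatcher.LinkTo(fedexProcessor, linkOptions);

upsProcessor.LinkTo(storageProcessor);
fedexProcessor.LinkTo(storageProcessor);

// The storageProcessor still needs coordinated completion because it has two sources.
Task.WhenAll(upsProcessor.Completion, fedexProcessor.Completion)
    .ContinueWith(t => storageProcessor.Complete());

With that wiring in place, the two-line shutdown shown earlier still works unchanged.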

The Taskmatics Connection

The example requirement above could have been implemented as a recurring task that pulls in all new orders placed since the last time the task executed. The Dataflow library can help you turn complex pipelines or execution logic into linked Dataflow blocks that keep resource usage down and significantly reduce the amount of code that needs to be written and tested when creating custom tasks for Taskmatics Scheduler. The scheduler itself uses the Dataflow library internally to manage its own complex flows for features like Live Tracing and task status notifications.

Being a distributed system, the Taskmatics Scheduler relies on constant communication between the Agent processes running the tasks and the Coordinator component dispatching the tasks. The Coordinator relays all incoming messages both to the administration website clients that are connected as well as to a central data store for historical purposes. The ability to efficiently transmit these messages over the network between processes while maximizing the responsiveness of the application is critical to the overall performance of the system. To that end, we employ batching of messages over the network and asynchronous persistence to the data store using the BatchBlock<T> and ActionBlock<T> to simplify our logic and make it easier to maintain the code around that functionality.
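
The shape of that internal pipeline is roughly the sketch below; TraceMessage, the batch size and SendBatchAsync are hypothetical stand-ins rather than the scheduler's actual types:

// Hypothetical sketch of the pattern described above: trace messages are batched
// to cut down on network chatter, then sent and persisted asynchronously.
var messageBatcher = new BatchBlock<TraceMessage>(100);

var messageSender = new ActionBlock<TraceMessage[]>(
    batch => SendBatchAsync(batch),   // async network send and persistence
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

messageBatcher.LinkTo(messageSender);
messageBatcher.Completion.ContinueWith(t => messageSender.Complete());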

Wrapping It Up

The classes of the TPL Dataflow library can greatly reduce the amount of code needed to create large and complex asynchronous processes, without requiring you to write all of the synchronization logic between the moving parts. They also reduce the amount of testing and debugging required, since there is far less hand-written code controlling the flow of data from start to finish. With the compound time savings that you get from using the Dataflow blocks, you might be able to get that complicated requirement done by lunchtime. Now go enjoy some coffee, you’ve earned it!

A fully working demonstration of the example pipeline above can be downloaded from here. It includes some basic instrumentation to measure processing time. Feel free to experiment with different parallelization and other settings on the various blocks to see how they improve or reduce the efficiency of the system as a whole. For more information on the Dataflow library and the Task-based Asynchronous Pattern, see the following links:

MSDN TPL Dataflow documentation
MSDN Task-based Asynchronous Pattern Documentation
My previous post showing the advantages of async/await to improve application performance