
Workflow Elements Sample

The workflow control elements sample contains tasks that each explain one concept of the workflow capabilities. They can of course be combined to design workflows of arbitrary complexity.

Sequence Workflow

A sequence executes one sub-workflow after another. In this sample these are simple HTTP requests, but any other workflow element can be used as well. Coding sequences has been greatly simplified since the async/await pattern was introduced in C# and Visual Basic, but workflow sequences can also be coded with handlers. This is a sequence of three HTTP requests (without any payload):

public override async void StartWork()
{
    base.TaskResult = new SequenceTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    // Each awaited request has completed before the next one is created
    var firstRequest = await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
    var secondRequest = await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
    var thirdRequest = await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
}

Each request is awaited, and the subsequent request starts only after the previous one completes. The async/await mechanism is not built on TPL tasks; the workflow elements have a specialized awaiter that integrates them into the crawler engine the same way handlers do.
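
For comparison, here is a sketch of the same sequence coded with handlers instead of async/await. It uses the HttpRequestConfig form of the constructor shown in the parallel sample below; each Successful handler creates the next request:

public override void StartWork()
{
    base.TaskResult = new SequenceTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    var quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 };
    // The second request is created in the Successful handler of the first,
    // the third in the handler of the second: the handlers chain the requests into a sequence.
    new HttpRequest(new HttpRequestConfig
    {
        Url = new Uri("http://localhost:10080/"),
        Quota = quota,
        Successful = first =>
        {
            new HttpRequest(new HttpRequestConfig
            {
                Url = new Uri("http://localhost:10080/"),
                Quota = quota,
                Successful = second =>
                {
                    new HttpRequest(new HttpRequestConfig
                    {
                        Url = new Uri("http://localhost:10080/"),
                        Quota = quota,
                        Successful = third => { Console.WriteLine("  Handler-coded sequence is complete"); }
                    });
                }
            });
        }
    });
}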

Parallel Workflow

By default, everything in the crawler task workflows is executed in parallel. This sample shows how to start and process parallel HTTP requests in a workflow:

public override void StartWork()
{
    base.TaskResult = new ParallelTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    // To start multiple requests in parallel, we must not await them. Instead we provide a Successful handler to process each completed request.
    Console.WriteLine("  First request will be created");
    new HttpRequest(
        new HttpRequestConfig
            {
                Url = new Uri("http://localhost:10080/"),
                Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                Successful = request =>
                    {
                        Console.WriteLine("  First request is delivering content of type " + request.Response.ContentType);
                    }
            });
    Console.WriteLine("  Second request (delayed response) will be created");
    new HttpRequest(
        new HttpRequestConfig
        {
            Url = new Uri("http://localhost:10080/send-delayed"),
            Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
            Successful = request =>
            {
                Console.WriteLine("  Second request is delivering content of type " + request.Response.ContentType);
            }
        });
    Console.WriteLine("  Third request will be created");
    new HttpRequest(
        new HttpRequestConfig
        {
            Url = new Uri("http://localhost:10080/"),
            Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
            Successful = request =>
            {
                Console.WriteLine("  Third request is delivering content of type " + request.Response.ContentType);
            }
        });
    Console.WriteLine("  The StartWorkflow() is almost finished, but no request runs at the moment.n  They will start in parallel after this method has returned.n");
}

All requests are created, but none of them starts until the StartWork() method has finished. This is the output:

  First request will be created
  Second request (delayed response) will be created
  Third request will be created
  The StartWork() method is almost finished, but no request runs at the moment.
  They will start in parallel after this method has returned.
  Third request is delivering content of type text/html; charset=utf-8
  First request is delivering content of type text/html; charset=utf-8
  Second request is delivering content of type text/plain

The requests (you can even start thousands of them) are executed in parallel. The Successful handlers are called after the requests have finished. Creating workflow element objects means that they are executed in parallel after the handler that created them has finished.
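
Because the elements are simply created and picked up by the engine afterwards, a whole batch of parallel requests can be produced in a loop. A minimal sketch; the count and URL are placeholders:

public override void StartWork()
{
    base.TaskResult = new ParallelTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    for (int i = 1; i <= 1000; ++i)
    {
        int requestNumber = i; // capture a copy of the loop variable for the handler
        new HttpRequest(
            new HttpRequestConfig
            {
                Url = new Uri("http://localhost:10080/"),
                Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                Successful = request => { Console.WriteLine("  Request #" + requestNumber + " finished"); }
            });
    }
    // All 1000 requests start in parallel after StartWork() has returned
}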

Delay Element

Sometimes a subsequent operation should not start immediately, but only after a certain time has passed. This is where the Delay workflow element comes into play:

public override async void StartWork()
{
    base.TaskResult = new DelayTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    Console.WriteLine("  First request will be created");
    var firstRequest = await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 }); 
    
    Console.WriteLine("  First request is successful, starting 2 seconds delay");
    await new Delay(2000);
    Console.WriteLine("  Delay is over, second request will be created");
    var secondRequest = await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 }); 
    Console.WriteLine("  Second request is successful");
}

The Delay element doesn't block the worker threads of the crawler engine. Instead it suspends the workflow and resumes it in a very efficient manner, so it is no problem to have thousands of delays running in parallel. The internal structure is designed to serve this well.
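
Because a Delay can simply be awaited, it combines naturally with loops, for example to poll a resource at a fixed interval. A minimal sketch; the URL and the iteration count are placeholders:

public override async void StartWork()
{
    base.TaskResult = new DelayTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    for (int i = 0; i < 5; ++i)
    {
        // Poll the resource, then suspend the workflow for one second without blocking a worker thread
        await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
        await new Delay(1000);
    }
}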

Group Element

Sometimes we have to wait until a bunch of parallel operations have completed before we continue with a certain piece of work. This is what the Group workflow element does:

public override async void StartWork()
{
    base.TaskResult = new GroupTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    Console.WriteLine("  Group will be created");
    await new Group(
        group =>
            {
                Console.WriteLine();
                Console.WriteLine("    Requests inside group will be created");
                new HttpRequest(
                    new HttpRequestConfig
                        {
                            Url = new Uri("http://localhost:10080/"),
                            Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                            Successful = request => { Console.WriteLine("    Group request #1 is successful"); }
                        });
                new HttpRequest(
                    new HttpRequestConfig
                        {
                            // Delayed response; shows that the group waits until the last request completes and that the other requests execute in parallel
                            Url = new Uri("http://localhost:10080/send-delayed"),
                            Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                            Successful = request => { Console.WriteLine("    Group request #2 (delayed) is successful"); }
                        });
                new HttpRequest(
                    new HttpRequestConfig
                        {
                            Url = new Uri("http://localhost:10080/"),
                            Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                            Successful = request => { Console.WriteLine("    Group request #3 is successful"); }
                        });
            });
    Console.WriteLine();
    Console.WriteLine("  Group is successful, subsequent request will be created");
    var subsequentRequest = await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
    Console.WriteLine("  Subsequent request is successful");
}

We can simply await a Group and the workflow will wait until all the operations in the group are completed. There is no need for complicated synchronization mechanisms.
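
This makes fan-out/fan-in patterns straightforward, for example crawling a list of URLs in parallel and continuing only when all of them are done. A sketch; the URL list is a placeholder:

public override async void StartWork()
{
    base.TaskResult = new GroupTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    var urls = new[] { "http://localhost:10080/a", "http://localhost:10080/b", "http://localhost:10080/c" };
    await new Group(
        group =>
            {
                // All requests created here run in parallel; the Group completes when the last one is done
                foreach (var url in urls)
                {
                    new HttpRequest(
                        new HttpRequestConfig
                            {
                                Url = new Uri(url),
                                Quota = new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                                Successful = request => { Console.WriteLine("    " + url + " delivered " + request.Response.ContentType); }
                            });
                }
            });
    Console.WriteLine("  All URLs in the group are processed");
}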

Retry Element

Operations that occasionally fail should be retried until they complete. If an access path is used, such as a proxy, a datacenter, or some other kind of cluster node, the access path should be switched before the operation is performed again. The retry itself is structured with the Retry element. Access paths are managed with an AccessPathManager that can work together with the Retry element. This sample shows the basic construct:

public override async void StartWork()
{
    base.TaskResult = new RetryTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    Console.WriteLine("  Retry element will be created");
    await new Retry(
        10,
        async retry =>
            {
                Console.WriteLine();
                Console.WriteLine("    Unreliable request will be created. Retry count: " + retry.ProcessingInfo.RetryCount);
                // This request is called with AwaitProcessingEnum.All, so we can analyze the failed requests. 
                var request = await new HttpRequest(new Uri("http://localhost:10080/unreliable"),
                    new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                    AwaitProcessingEnum.All);
                if (request.Success)
                {
                    Console.WriteLine("    Unreliable request is successful");
                }
                else
                {
                    Console.WriteLine("    Unreliable request has failed with status code: " + request.Response.StatusCode); 
                }
            });
    Console.WriteLine();
    Console.WriteLine("  Retry is successful, subsequent request will be created");
    await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
    Console.WriteLine("  Subsequent request is successful");
}

This sample is very basic; the concept of retrying a failed operation is much more complex. For example, in the child workflow of the Retry element it can be decided whether the retry should be performed or not.
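
As an illustration of such a decision, the child workflow could inspect the failed response and give up early on a status code that a retry cannot fix. This is only a sketch: the Cancel() call on the retry element is a hypothetical method used to mark the decision point, the real API may differ.

public override async void StartWork()
{
    base.TaskResult = new RetryTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    await new Retry(
        10,
        async retry =>
            {
                var request = await new HttpRequest(new Uri("http://localhost:10080/unreliable"),
                    new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 },
                    AwaitProcessingEnum.All);
                if (!request.Success && request.Response.StatusCode == HttpStatusCode.NotFound)
                {
                    // A 404 will not turn into a success on retry, so stop retrying early.
                    // Cancel() is hypothetical here; it only illustrates the decision point.
                    retry.Cancel();
                }
            });
}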

Expense Element

Google especially, but also Facebook, Twitter and many others (even internal accounting) calculate costs for API calls or put cost limits on their APIs. The Expense workflow element is used to express the 'expenses' of operations in a workflow; in effect, this element declares the costs of the operations. These costs are aggregated and delivered in the OperationCosts property of the task result. This is an easy way to calculate costs depending on a concrete workflow and the status of the performed operations (successful/failed). The whole mechanism is completely extensible (IOperationCost), so you can implement your own costs beyond the predefined general OperationCostUnits.

public override async void StartWork()
{
    base.TaskResult = new ExpenseTaskResult();
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    // This expense books 100 units named 'TaskCost' and 50 general units on success or failure (the default is to book on success only).
    await new Expense(
        new OperationCostCollection { new OperationCostUnits("TaskCost", 100), new OperationCostUnits(50) }, BookCostsEnum.All,
        async outerExpense =>
            {
                // Anywhere in the workflow there can be additional expenses; here a request costs 100 units on success
                await new Expense(
                    new OperationCostCollection { new OperationCostUnits(100) },
                    async expense =>
                        {
                            await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
                        });
                // Anywhere in the workflow there can be additional expenses; here a request costs 200 units on failure but nothing on success
                await new Expense(
                    new OperationCostCollection { new OperationCostUnits(200) }, BookCostsEnum.Failed,
                    async expense =>
                        {
                            await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
                        });
            });
}

Beyond booking and aggregating costs for a certain workflow, the Expense element acts like a Group element.
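
After the task has completed, the aggregated costs can be read from the OperationCosts property of the task result, for example in the completion handler. A sketch; the exact item type inside OperationCosts is not shown in this sample, so printing the items via ToString() is an assumption:

this.TaskCompletedWorkHandler = taskBase =>
    {
        Console.WriteLine("  Task workflow is complete");
        // OperationCosts aggregates all costs booked by the Expense elements of this workflow
        foreach (var cost in this.TaskResult.OperationCosts)
        {
            Console.WriteLine("  Booked cost: " + cost);
        }
    };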

Limit Element

Many APIs have certain limits that should be respected. It is quite easy to violate these limits with the crawler engine, because the engine is capable of flooding a service with thousands of requests. The Limit element ensures that your workflow doesn't violate the limits of a service or flood a web server with too many requests.

public override void StartWork()
{
    // You must always assign TaskResult at the top of the StartWork() method, because without a TaskResult no exception can be returned in case of an error
    base.TaskResult = new LimitTaskResult();
    // The TaskCompletedWorkHandler is called when the workflow of the task is complete.
    this.TaskCompletedWorkHandler = taskBase => { Console.WriteLine("  Task workflow is complete"); };
    // The 'TestLimiter' is added to the CrawlerEngine and works globally, so all Limit workflow elements must pass this limiter as a gate.
    Console.WriteLine("  Creating Limits");
    for (int i = 0; i < 4; ++i)
    {
        // nextLimitedOperationNumber is a field of the task class (not shown in this snippet)
        int limitedOperationNumber = Interlocked.Increment(ref nextLimitedOperationNumber);
        Console.WriteLine("  Creating Limit for Operation #" + limitedOperationNumber);
        new Limit(new LimitConfig { 
            LimiterName = "TestLimiter",
            StartWork = async limited =>
            {
                Console.WriteLine("  Limited Work #" + limitedOperationNumber + " started");
                Thread.Sleep(16); // simulates a little synchronous work before the request is created
                var request = await new HttpRequest(new Uri("http://localhost:10080/"), new HttpRequestQuota { MaxDownloadSize = 100000, OperationTimeoutMilliseconds = 10000, ResponseTimeoutMilliseconds = 5000 });
                Console.WriteLine("  Limited Work #" + limitedOperationNumber + " ends");
            }, 
            Successful = limited =>
                {
                    Console.WriteLine("  Limited Work #" + limitedOperationNumber + " finished");
                } });
    }
    Console.WriteLine("  Creating Limits finished");
}

The Limit element works against a Limiter that is instantiated once per engine and added to it with the AddLimiter() method. Limiters can limit the degree of parallelism, the throughput, or even operation costs that are expressed with Expense elements.
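
For completeness, the limiter registration on the engine looks roughly like this. Only the AddLimiter() method and the name 'TestLimiter' come from this sample; ParallelOperationsLimiter is a hypothetical limiter type used for illustration:

// Register the limiter once per engine, before any task uses it.
// ParallelOperationsLimiter is hypothetical; the engine ships its own limiter implementations.
crawlerEngine.AddLimiter(new ParallelOperationsLimiter("TestLimiter", maxParallelOperations: 2));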