Async Processing Pipeline with Rx.Net

I was looking for the easiest way to process tasks asynchronously while keeping their order. Implementing something on top of BlockingCollection<T> seemed like overkill.

Simple async/await

Instead, the original implementation was similar to:

backgroundTasks.Add(LongProcessAsync());

Later on, I could await all or some of the tasks in the backgroundTasks list. But this simple approach has some disadvantages:

  • The completion order is not deterministic: tasks started later can finish before tasks started earlier
  • If you want to run further code right after a task finishes, you have to put it into LongProcessAsync itself, because anything else would be too complicated
  • Don't even get me started on error handling
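To make the first point concrete, here is a minimal sketch of the naive approach using only the standard library. It assumes a hypothetical, shortened LongProcessAsync (delays in 100 ms steps instead of seconds, so later jobs finish sooner) and records the order in which the jobs complete:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Records job numbers in the order in which the jobs finish.
var completionOrder = new ConcurrentQueue<int>();

// Hypothetical shortened LongProcessAsync: a higher n means a shorter delay.
async Task<int> LongProcessAsync(int n)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100 * (5 - n)));
    completionOrder.Enqueue(n);
    return n;
}

// Naive approach: each call starts its job immediately.
var backgroundTasks = new List<Task<int>>();
for (var i = 1; i <= 5; i++)
{
    backgroundTasks.Add(LongProcessAsync(i));
}

// WhenAll returns the results in list order...
int[] results = await Task.WhenAll(backgroundTasks);
Console.WriteLine(string.Join(", ", results)); // 1, 2, 3, 4, 5

// ...but the jobs themselves finished shortest-delay first.
Console.WriteLine(string.Join(", ", completionOrder));
```

Task.WhenAll returns the results in the order of the input list, which hides the fact that the jobs actually finished in reverse order.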

So the simplest option is clearly not a viable one. For UI development I use Angular with NgRx, so I'm familiar with RxJS, and I wondered whether Rx.Net would provide a nice solution. RxJS has a handy operator called concatMap, which should theoretically do exactly that. If you visit the ReactiveX documentation on those operators, you'll find mentions of SelectMany and ManySelect from Rx.Net, but those don't do quite the same thing. Instead, Rx.Net has the .Merge(...) and .Concat(...) operators, which flatten a nested observable (IObservable<IObservable<T>>) into a simple IObservable<T>.

Rx.Net Concat

.Merge(...) clearly doesn't preserve the order, but .Concat(...) should. So I tried it out:

var kindaInOrder = Observable.Range(1, 5)
    .Select(x => LongProcessAsync(x).ToObservable())
    .Concat()
    .Do(x => Console.WriteLine($"{x} concatenated"));

await kindaInOrder.RunAsync(CancellationToken.None);

Now, this works in the sense that the results arrive in order. What surprised me, though (it might be obvious to you), was that LongProcessAsync still ran fully in parallel. So if this is the implementation of LongProcessAsync:

static async Task<int> LongProcessAsync(int n)
{
    Console.WriteLine($"Job {n} started");
    await Task.Delay(TimeSpan.FromSeconds(5 - n));
    Console.WriteLine($"Job {n} done");

    return n;
}

Then, the output will be:

Job 1 started
Job 2 started
Job 3 started
Job 4 started
Job 5 started
Job 5 done
Job 4 done
Job 3 done
Job 2 done
Job 1 done
1 concatenated
2 concatenated
3 concatenated
4 concatenated
5 concatenated

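As you can see, all the jobs start at once and finish in reverse order. That's because LongProcessAsync(x) is called inside .Select(...) as soon as Observable.Range emits x, and calling an async method starts its work immediately; .Concat() only controls the order in which the results of the already running tasks are emitted. While the rest of the observable runs in the correct order, I also need LongProcessAsync itself to execute in order. Unfortunately, I haven't found any way to achieve this using only Rx.Net.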
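The same eager start can be reproduced without Rx.Net. In this stdlib-only sketch (with a hypothetical millisecond version of LongProcessAsync and a log queue added for illustration), LINQ's Select followed by awaiting the tasks one by one plays the role of .Select(...) followed by .Concat():

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Event log shared by all jobs (added for illustration).
var log = new ConcurrentQueue<string>();

// Hypothetical millisecond version of LongProcessAsync.
async Task<int> LongProcessAsync(int n)
{
    log.Enqueue($"Job {n} started");
    await Task.Delay(TimeSpan.FromMilliseconds(100 * (5 - n)));
    log.Enqueue($"Job {n} done");
    return n;
}

// ToList() runs the Select eagerly: all five jobs are started right here.
var tasks = Enumerable.Range(1, 5).Select(x => LongProcessAsync(x)).ToList();

// Awaiting the tasks one by one yields the results in order, like .Concat()...
foreach (var task in tasks)
{
    log.Enqueue($"{await task} concatenated");
}

// ...but the log shows that every job started before the first one finished.
foreach (var line in log)
{
    Console.WriteLine(line);
}
```

The log lists every "started" entry before the first "done" entry, while the "concatenated" entries still come out in order, mirroring the Rx.Net output above.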

Rx.Net Concat with Semaphore

The only solution I found was to wrap LongProcessAsync with a semaphore, like this:

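// Only one job may hold the semaphore at a time (initial count 1, max count 1).
static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

static async Task<int> OrderedLongProcessAsync(int n)
{
    // Wait asynchronously until the previous job has released the semaphore.
    await semaphore.WaitAsync();
    try
    {
        return await LongProcessAsync(n);
    }
    finally
    {
        // Always release, even if LongProcessAsync throws.
        semaphore.Release();
    }
}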

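Using a semaphore is not ideal. In fact, I'd have preferred a simple lock, but C# doesn't allow await inside a lock statement (a Monitor lock has to be released by the thread that acquired it, and holding a lock across an await is potentially dangerous). (See https://stackoverflow.com/a/7612714/621366 to find out more.) Note also that SemaphoreSlim doesn't document a guaranteed order (such as FIFO) in which waiting callers enter, so the ordering here relies on its observed behavior. But at least now I get the output I expect (it's exactly the same Rx.Net pipeline, just with LongProcessAsync replaced by OrderedLongProcessAsync):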

Job 1 started
Job 1 done
Job 2 started
1 concatenated
Job 2 done
2 concatenated
Job 3 started
Job 3 done
3 concatenated
Job 4 started
Job 4 done
Job 5 started
Job 5 done
4 concatenated
5 concatenated

Now Job 2 doesn't start until Job 1 is done, which is exactly what I was looking for. As a reminder, this is the Rx.Net pipeline:

var kindaInOrder = Observable.Range(1, 5)
    .Select(x => OrderedLongProcessAsync(x).ToObservable())
    .Concat()
    .Do(x => Console.WriteLine($"{x} concatenated"));

await kindaInOrder.RunAsync(CancellationToken.None);

You can extend the pipeline after .Concat() in any way you like, with or without further async tasks, and with or without a semaphore.
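As a sketch of such an extension, here is a two-step pipeline that uses neither the semaphore nor direct calls inside Select. It assumes the System.Reactive NuGet package, a hypothetical millisecond version of LongProcessAsync, and a hypothetical SaveAsync follow-up step. Each call is wrapped in Observable.FromAsync, which only invokes its factory when the resulting observable is subscribed to; since .Concat() subscribes to the next inner observable only after the previous one has completed, each step should process one item at a time. Treat this as a variant worth verifying in your own setup rather than a drop-in replacement for OrderedLongProcessAsync.

```csharp
// Assumes the System.Reactive NuGet package is referenced.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

var log = new ConcurrentQueue<string>();
var results = new List<string>();

// Hypothetical millisecond version of LongProcessAsync.
async Task<int> LongProcessAsync(int n)
{
    log.Enqueue($"Job {n} started");
    await Task.Delay(TimeSpan.FromMilliseconds(100 * (5 - n)));
    log.Enqueue($"Job {n} done");
    return n;
}

// Hypothetical follow-up step, e.g. persisting the result.
async Task<string> SaveAsync(int n)
{
    log.Enqueue($"Save {n} started");
    await Task.Delay(TimeSpan.FromMilliseconds(150));
    log.Enqueue($"Save {n} done");
    return $"saved {n}";
}

var pipeline = Observable.Range(1, 5)
    // FromAsync defers each call until Concat subscribes, so jobs run one at a time.
    .Select(x => Observable.FromAsync(() => LongProcessAsync(x)))
    .Concat()
    // The follow-up step is deferred and serialized in the same way.
    .Select(x => Observable.FromAsync(() => SaveAsync(x)))
    .Concat()
    .Do(s => results.Add(s));

await pipeline.RunAsync(CancellationToken.None);
Console.WriteLine(string.Join(", ", results));
```

Job n + 1 may still overlap with Save n, because the two Concat stages are independent; only items within the same stage are serialized.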

Bonus - Rx.Net with Merge

Just FYI, if you were wondering how to get the performance benefits of a simple backgroundTasks.Add(LongProcessAsync()); with a bit more structure using Rx.Net, you can use .Merge():

var outOfOrder = Observable.Range(1, 5)
    .Select(x => LongProcessAsync(x).ToObservable())
    .Merge()
    .Do(x => Console.WriteLine($"{x} concatenated"));

await outOfOrder.RunAsync(CancellationToken.None);

Now each result is emitted as soon as its task finishes, so the fastest tasks come through first:

Job 1 started
Job 2 started
Job 3 started
Job 4 started
Job 5 started
Job 5 done
5 concatenated
Job 4 done
4 concatenated
Job 3 done
3 concatenated
Job 2 done
2 concatenated
Job 1 done
1 concatenated
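For comparison, the behavior of .Merge() in this example (all jobs running at once, results handled in completion order) can be approximated without Rx.Net using Task.WhenAny. This is a minimal stdlib-only sketch with a hypothetical millisecond version of LongProcessAsync:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical millisecond version of LongProcessAsync.
async Task<int> LongProcessAsync(int n)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100 * (5 - n)));
    return n;
}

// All jobs start immediately, just like the tasks in the Merge pipeline.
var pending = Enumerable.Range(1, 5).Select(x => LongProcessAsync(x)).ToList();
var handledOrder = new List<int>();

// Handle whichever job finishes next, the way Merge forwards results as they arrive.
while (pending.Count > 0)
{
    Task<int> finished = await Task.WhenAny(pending);
    pending.Remove(finished);
    int value = await finished; // already completed; rethrows if the job failed
    handledOrder.Add(value);
    Console.WriteLine($"{value} merged");
}
```

Removing each finished task keeps every WhenAny call limited to the jobs that are still pending. For a handful of tasks this is fine, but since each call scans the whole list, the loop becomes quadratic for large numbers of tasks.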

A summary of all Rx.Net examples can be found on dotnetfiddle: https://dotnetfiddle.net/VSgT0J