Concurrent HTTP Requests with System.Threading.Channels
The .NET Core team quietly landed the System.Threading.Channels namespace in corefx. Channels are quite similar to their counterparts in Go: they allow one or more writers to send multiple results to one or more readers.
Recently I had to send out multiple HTTP requests and process their responses as they came in, in whatever order they completed. Channels work quite nicely here. My previous solution was mapping the List<Task<T>> to a List<TaskCompletionSource<T>> and using a helper method that completes the TaskCompletionSources in order, like the Interleaved method described by the .NET team.
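For comparison, here is a minimal sketch of what such an interleaving helper can look like. It is modeled on the commonly cited pattern, not copied from my old code or from the .NET team's post, so treat the exact shape of Interleaved as an assumption. Each input task hands itself to the next free TaskCompletionSource when it finishes, so the returned tasks complete in completion order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Returns one "bucket" task per input task. Bucket 0 completes with whichever
// input task finishes first, bucket 1 with the second one, and so on.
Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToList();
    var sources = new TaskCompletionSource<Task<T>>[inputTasks.Count];
    var buckets = new Task<Task<T>>[sources.Length];
    for (var i = 0; i < sources.Length; i++)
    {
        sources[i] = new TaskCompletionSource<Task<T>>();
        buckets[i] = sources[i].Task;
    }

    var nextIndex = -1;
    foreach (var inputTask in inputTasks)
    {
        // When an input task finishes, it claims the next free bucket
        inputTask.ContinueWith(completed =>
        {
            var source = sources[Interlocked.Increment(ref nextIndex)];
            source.TrySetResult(completed);
        }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }
    return buckets;
}

// Usage: complete the tasks out of order and observe them in completion order
var first = new TaskCompletionSource<string>();
var second = new TaskCompletionSource<string>();
var interleaved = Interleaved(new[] { first.Task, second.Task });
second.SetResult("second");
first.SetResult("first");

var order = new List<string>();
foreach (var bucket in interleaved)
{
    var finished = await bucket; // the input task that completed next
    order.Add(await finished);
}
Console.WriteLine(string.Join(", ", order)); // prints "second, first"
```

It works, but it needs a fair amount of plumbing for something a channel gives you out of the box.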
As an example, let's say we want a map of New Zealand's North Island. Using a tile server, we can download the 4 tiles that make up the North Island at zoom level 6 and combine them with a drawing library. I'll use the built-in HttpClient for downloading the tiles, and SkiaSharp for drawing them.
The 4 tiles are the ones in columns 62 and 63, rows 38 and 39, at zoom level 6 (see the URLs below).
The complete demo solution is available on GitLab. You must add the NuGet packages SkiaSharp and System.Threading.Channels. The latter is already included in the ASP.NET Core SDKs, but console apps based on the regular .NET Core SDK don't come with that package by default.
Let's begin by writing a class that holds our tile metadata:
public class Tile
{
    public int X { get; }
    public int Y { get; }
    public string Url { get; }

    public Tile(int x, int y, string url)
    {
        X = x;
        Y = y;
        Url = url;
    }

    public override string ToString() => $"({X}/{Y})";
}
and setting up an array of Tiles we need to download and draw:
var tiles = new[]
{
    new Tile(0, 0, "https://a.tile.openstreetmap.org/6/62/38.png"),
    new Tile(1, 0, "https://a.tile.openstreetmap.org/6/63/38.png"),
    new Tile(0, 1, "https://a.tile.openstreetmap.org/6/62/39.png"),
    new Tile(1, 1, "https://a.tile.openstreetmap.org/6/63/39.png")
};
We need the X and Y properties to draw the tile in the correct position. The origin is the top left corner in SkiaSharp.
Next, we'll create an HTTP client and a channel:
var httpClient = new HttpClient(new HttpClientHandler { MaxConnectionsPerServer = 2 });
var channel = Channel.CreateUnbounded<(Tile, SKImage)>(new UnboundedChannelOptions
{
    // Both options default to false
    SingleReader = true,
    SingleWriter = false
});
We need both the image and the coordinates of the image. That's why the channel is of type (Tile, SKImage). Setting the SingleReader and SingleWriter options properly can unlock some optimizations in the Channel implementation.
Now let's queue up the tile downloads:
foreach (var tile in tiles)
{
    DownloadTile(channel.Writer, httpClient, tile);
}
I'll show the implementation of the DownloadTile method later; we're going to stay on the reading side for now.
We'll create a surface to draw on, wait for the tiles to come in, and draw them:
var surface = SKSurface.Create(new SKImageInfo(512, 512));
for (var i = 0; i < tiles.Length; i++)
{
    // Wait for the next tile
    var (tile, image) = await channel.Reader.ReadAsync();
    if (image == null)
    {
        // DownloadTile sends null when the download fails
        Console.WriteLine($"Skipping {tile}");
        continue;
    }
    Console.WriteLine($"Drawing {tile}");
    surface.Canvas.DrawImage(image, tile.X * 256, tile.Y * 256);
    image.Dispose();
}
Each tile from the OSM tile servers is 256x256 pixels. We want to draw a 2x2 grid of them, so the entire image is 512x512 pixels. Since we enqueued tiles.Length tile downloads, we loop until we have received tiles.Length items from the channel.
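Counting items works because the number of downloads is known up front. When it isn't, an alternative is to complete the writer once all producers are done and let the reader drain the channel. The sketch below assumes .NET Core 3.0 or later (for ReadAllAsync and await foreach) and uses a hypothetical ProduceAsync helper with plain strings instead of tiles, so it runs without SkiaSharp. Note that the producers return Task here, which is what makes Task.WhenAll possible.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>(new UnboundedChannelOptions { SingleReader = true });

// Hypothetical stand-in for a download: waits a bit, then reports its name
async Task ProduceAsync(ChannelWriter<string> writer, string name, int delayMs)
{
    await Task.Delay(delayMs);
    await writer.WriteAsync(name);
}

var producers = new[]
{
    ProduceAsync(channel.Writer, "a", 30),
    ProduceAsync(channel.Writer, "b", 10),
    ProduceAsync(channel.Writer, "c", 20)
};

// Mark the channel as complete once every producer has finished.
// A producer failure is passed along and surfaces on the reading side.
_ = Task.WhenAll(producers).ContinueWith(t => channel.Writer.TryComplete(t.Exception));

// The loop ends when the channel is complete and empty
var received = new List<string>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    received.Add(item);
}
Console.WriteLine($"Received {received.Count} items"); // prints "Received 3 items"
```

The order of the received items depends on timing, but the reader no longer needs to know how many to expect.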
Let's save the final image:
using (var snapshot = surface.Snapshot())
using (var data = snapshot.Encode())
// File.Create truncates an existing file; File.OpenWrite would leave stale trailing bytes
using (var file = File.Create("NorthIsland.png"))
{
    data.SaveTo(file);
}
Now the writer side, the DownloadTile method:
async void DownloadTile(ChannelWriter<(Tile, SKImage)> writer, HttpClient httpClient, Tile tile)
{
    Console.WriteLine($"Starting download of {tile}");
    // We're in an async void method, so we'd better not let an exception escape.
    try
    {
        var bytes = await httpClient.GetByteArrayAsync(tile.Url);
        var image = SKImage.FromEncodedData(bytes);
        Console.WriteLine($"Downloaded {tile}");
        // Send the tile to the reader
        await writer.WriteAsync((tile, image));
    }
    catch
    {
        // Exception handling is left as an exercise to the reader
        await writer.WriteAsync((tile, null));
    }
}
If you wanted to download only one tile, you would make the method an async Task<(Tile, SKImage)> and replace the await writer.WriteAsync() call with a simple return. In my example, the Channel replaces a Task as the way the result is reported to the caller.
Also, WriteAsync only completes once there is space in the channel's internal buffer to accept a new item; until then it waits asynchronously rather than blocking a thread. Since I created an unbounded channel (with Channel.CreateUnbounded), that buffer will never be full, WriteAsync will always complete immediately, and I could replace it with writer.TryWrite().
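To make the difference concrete, here is a small sketch comparing an unbounded channel with a bounded one of capacity 1. It is illustrative only: the bounded channel is not part of the demo, and it relies on the default full mode of a bounded channel, which makes writers wait.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var unbounded = Channel.CreateUnbounded<int>();
var bounded = Channel.CreateBounded<int>(1); // capacity 1, writers wait when full

// An unbounded channel accepts every item (unless it has been completed)
var unboundedFirst = unbounded.Writer.TryWrite(1);
var unboundedSecond = unbounded.Writer.TryWrite(2);
Console.WriteLine($"Unbounded: {unboundedFirst}, {unboundedSecond}"); // Unbounded: True, True

// A bounded channel rejects TryWrite once its buffer is full
var boundedFirst = bounded.Writer.TryWrite(1);
var boundedSecond = bounded.Writer.TryWrite(2);
Console.WriteLine($"Bounded: {boundedFirst}, {boundedSecond}"); // Bounded: True, False

// WriteAsync on the full channel returns a pending ValueTask instead
ValueTask pendingWrite = bounded.Writer.WriteAsync(2);
var wasWaiting = !pendingWrite.IsCompleted;

// Reading one item frees the slot, which lets the pending write finish
bounded.Reader.TryRead(out var firstItem);
await pendingWrite;
bounded.Reader.TryRead(out var secondItem);
Console.WriteLine($"Waited: {wasWaiting}, read {firstItem} then {secondItem}"); // Waited: True, read 1 then 2
```

With a bounded channel, the await on WriteAsync is what gives you backpressure. With an unbounded one, TryWrite does the same job without the async machinery.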
Also also, since this is an async void method, unhandled exceptions will crash the entire process. In practice, you'd probably want to send the exception into the channel and let the caller handle it.
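One way to do that is to widen the channel's item type so each message carries either a result or an exception. The sketch below is an assumption about how this could look, not part of the demo solution: Produce is a hypothetical stand-in for DownloadTile, and it sends a string length instead of an image so that it runs without SkiaSharp or network access.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<(string Name, int Length, Exception Error)>();

// Hypothetical stand-in for DownloadTile: fails for empty names
async void Produce(ChannelWriter<(string Name, int Length, Exception Error)> writer, string input)
{
    try
    {
        await Task.Yield(); // simulate asynchronous work
        if (input.Length == 0)
        {
            throw new ArgumentException("Name must not be empty");
        }
        writer.TryWrite((input, input.Length, null));
    }
    catch (Exception ex)
    {
        // Report the failure to the reader instead of crashing the process
        writer.TryWrite((input, 0, ex));
    }
}

var inputs = new[] { "tile", "" };
foreach (var input in inputs)
{
    Produce(channel.Writer, input);
}

var successes = new List<string>();
var failures = new List<Exception>();
for (var i = 0; i < inputs.Length; i++)
{
    var (name, length, error) = await channel.Reader.ReadAsync();
    if (error != null)
    {
        failures.Add(error);
        continue;
    }
    successes.Add($"{name}:{length}");
}
Console.WriteLine($"{successes.Count} succeeded, {failures.Count} failed"); // prints "1 succeeded, 1 failed"
```

The reader still receives exactly one message per request, so the counting loop keeps working, and it can decide per item whether to retry, skip, or abort.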
The program outputs this (order may vary):
Starting download of (0/0)
Starting download of (1/0)
Starting download of (0/1)
Starting download of (1/1)
Downloaded (0/0)
Downloaded (1/0)
Drawing (0/0)
Drawing (1/0)
Downloaded (0/1)
Drawing (0/1)
Downloaded (1/1)
Drawing (1/1)
For more information, see the documentation or another blog post.