Soluto’s technicians offer online support to customers. A customer can call about any issue they experience with one of their smart devices and get instant help. We develop tools that help our technicians run fast, high-quality support sessions. To achieve this, we need a communication channel between the two ends: the technician website and the customer’s phone.

A real-time channel between two ends on the internet can be tricky. Customers can be connected to any network, or hiding behind routers and firewalls. There are two approaches to implementing such a solution. One is a peer-to-peer connection between the device and the technician web client. This P2P solution seems ideal at first, but even the best P2P solutions succeed in linking the two ends only up to 90% of the time. To cover the full 100%, we chose the second approach, which relies on an external server that links both ends and streams the data between them: a Relay Server.

To make this kind of server reliable and cheap, each instance needs to be as light as possible on memory and CPU. It should handle as many sessions as possible, and its bottleneck should be network bandwidth.

Let’s start with a simple TCP Server:

while (true)
{
	var client = tcpListener.AcceptTcpClient();
	new Thread(HandleClient).Start(client);
}

A separate thread is allocated for each client. Threads consume resources, so the number of clients that can connect to one instance of the server is limited by the number of threads that instance can sustain. A much lighter solution is to use asynchronous programming:

while (true)
{
	var client = await tcpListener.AcceptTcpClientAsync();
	HandleClient(client);
}

Here HandleClient is an async function that is not awaited. This way threads are reused while I/O operations are awaited, and the server can handle many more concurrent clients.
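To see why the loop is not blocked by a handler that is not awaited, here is a minimal sketch without any sockets. It assumes C# 9 top-level statements; the HandleClient below is a hypothetical stand-in in which Task.Delay plays the role of awaited socket I/O.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new ConcurrentQueue<string>();

// Hypothetical stand-in for HandleClient: Task.Delay plays the role of awaited socket I/O.
async Task HandleClient(int clientId)
{
    log.Enqueue($"start {clientId}");
    await Task.Delay(100);               // the caller gets control back here
    log.Enqueue($"done {clientId}");
}

var pending = new List<Task>();
for (var clientId = 1; clientId <= 3; clientId++)
{
    pending.Add(HandleClient(clientId)); // not awaited, so the loop continues immediately
}
log.Enqueue("all accepted");

await Task.WhenAll(pending);             // only so that the sample waits before exiting
Console.WriteLine(string.Join(", ", log));
```

All three handlers start before any of them finishes, and no thread is held while they wait.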

We need some session identification so that we can pair two separate ends. Each client can connect with a Session Id as an HTTP URL parameter (http://myrelayserver.com?sessionId=[sessionId]). We can use WebSockets: a WebSocket connection starts as an HTTP request and is designed for long-lasting connections.

while (true)
{
	var context = await httpListener.GetContextAsync();
	var socketContext = await context.AcceptWebSocketAsync(null);
	HandleClient(context.Request.QueryString["sessionId"], socketContext.WebSocket);
}

When the first client connects with a session id, it should wait until the other client with the same session id connects. When the second client connects, the two streams should be linked together. We can use a map with the Session Id as key and the WebSocket itself as value. Let’s ignore error handling and concurrency issues for now.

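// Clients waiting for their peer, keyed by Session Id.
static readonly Dictionary<string, WebSocket> ConnectedClients = new Dictionary<string, WebSocket>();

async Task HandleClient(string sessionId, WebSocket webSocket)
{
	WebSocket otherClientWebsocket;
	var isOtherClientConnected = ConnectedClients.TryGetValue(sessionId, out otherClientWebsocket);
	if (!isOtherClientConnected)
	{
		// First client of the session: remember it and wait for the peer.
		ConnectedClients.Add(sessionId, webSocket);
		return;
	}

	// Second client: relay in both directions until either direction stops.
	await Task.WhenAny(Relay(webSocket, otherClientWebsocket), Relay(otherClientWebsocket, webSocket));
}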


The Relay function looks like this:

async Task Relay(WebSocket source, WebSocket destination)
{
	var bytes = new byte[1024];
	while (true)
	{
		var result = await source.ReceiveAsync(new ArraySegment<byte>(bytes), CancellationToken.None);
		// An empty read means the source socket was closed.
		if (result.Count == 0) break;
		// Forward only the bytes that were read, and keep the sender's message boundaries.
		await destination.SendAsync(new ArraySegment<byte>(bytes, 0, result.Count),
			WebSocketMessageType.Binary, result.EndOfMessage, CancellationToken.None);
	}
}

That works, and it’s actually a pretty lightweight solution. Now let’s try it with Rx.

Observable.FromAsync(httpListener.GetContextAsync)

Observable.FromAsync can be used when you want to start an observable from an async operation. The result of the async task will be emitted as an Observable item.

.SelectMany(async context => new
{
	SessionId = context.Request.QueryString["sessionId"],
	WebSocket = (await context.AcceptWebSocketAsync(null)).WebSocket
})

SelectMany (flatMap in other Rx implementations) maps every emitted item to an observable of zero or more items of the same type, and flattens all of these into one Observable. Another useful behavior of this operator is that it flattens the Task when you use an async function/lambda to create your observables. Without this behavior, the next item would have been a Task of the anonymous type we created.
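The difference between keeping the Task and flattening it can be seen in a small sketch. It assumes the System.Reactive package and C# 9 top-level statements; TimesTenAsync is a hypothetical async step standing in for AcceptWebSocketAsync.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical async step standing in for AcceptWebSocketAsync.
async Task<int> TimesTenAsync(int n)
{
    await Task.Delay(10);
    return n * 10;
}

// Select keeps the Task: each item is a Task<int> that still has to be awaited.
IObservable<Task<int>> wrapped = Observable.Range(1, 3).Select(n => TimesTenAsync(n));

// SelectMany unwraps it: each item is the int that the Task produced.
IObservable<int> flattened = Observable.Range(1, 3).SelectMany(n => TimesTenAsync(n));

// Awaiting an observable subscribes to it; ToList collects every item into one list.
// The tasks run concurrently, so the results are sorted before printing.
var results = (await flattened.ToList()).OrderBy(value => value).ToArray();
Console.WriteLine(string.Join(", ", results)); // prints 10, 20, 30
```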

The Rx code so far is similar to what we did in the non-Rx solution inside the while loop: accepting clients and extracting the Session Id from the requests. But we are still missing the loop.

.Repeat()

The Repeat operator resubscribes to its source each time the source completes. Here that means calling httpListener.GetContextAsync again after every accepted client, which allows the server to keep listening for more clients.
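A small sketch of this looping behavior, assuming the System.Reactive package and C# 9 top-level statements. AcceptNextAsync is a hypothetical stand-in for httpListener.GetContextAsync, and Take(3) is only there to end the otherwise endless loop.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

var calls = 0;

// Hypothetical stand-in for httpListener.GetContextAsync: each call "accepts" the next client.
async Task<int> AcceptNextAsync()
{
    await Task.Delay(10);
    return ++calls;
}

// Without Repeat the observable emits one client and completes.
var once = await Observable.FromAsync(() => AcceptNextAsync()).ToList();

// With Repeat the factory is invoked again after every completion.
var repeated = await Observable.FromAsync(() => AcceptNextAsync())
    .Repeat()
    .Take(3)
    .ToList();

Console.WriteLine($"{once.Count} item without Repeat, {repeated.Count} items with Repeat");
```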

.GroupBy(sessionSocket => sessionSocket.SessionId,
                    sessionSocket => sessionSocket.WebSocket)

Do you remember how we used a dictionary to wait for two clients with the same Session Id to connect? This simple GroupBy expression does the trick. The next item will be an observable of the WebSockets of one specific session! And there won’t be any concurrency issues.

.SelectMany(groupedSockets => groupedSockets.Take(2).ToArray())

The emitted groupedSockets item is actually an Observable of WebSockets. Whenever a new observable is created, we should always think about if and when it completes. Since we only need to wait for two clients, we can safely complete it after two clients are connected. That’s exactly what Take(2) does.

The ToArray operator takes all the elements of the completed observable and creates an aggregated array item that contains all the elements. All we need to do next is to Relay their streams.
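The pairing performed by GroupBy, Take(2) and ToArray can be tried with plain strings instead of sockets. This is a sketch under assumptions: it needs the System.Reactive package and C# 9 top-level statements, and the arrivals list is made-up sample data.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical arrivals: (session id, client name) in connection order.
var arrivals = new[]
{
    (SessionId: "a", Client: "technician-a"),
    (SessionId: "b", Client: "technician-b"),
    (SessionId: "a", Client: "phone-a"),
    (SessionId: "b", Client: "phone-b"),
}.ToObservable();

var pairs = await arrivals
    .GroupBy(arrival => arrival.SessionId, arrival => arrival.Client)
    // Each group completes after its second client and is emitted as one array.
    .SelectMany(group => group.Take(2).ToArray())
    .ToList();

foreach (var pair in pairs)
{
    Console.WriteLine(string.Join(" <-> ", pair));
}
```

Each session is emitted as soon as its second client arrives, regardless of how the sessions interleave.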

.SelectMany(sockets => Observable.Merge(
                    Relay(sockets[0], sockets[1]),
                    Relay(sockets[1], sockets[0])))

From the two sockets we create a merged observable of two Relay Observables. I will explain soon why the Relay function returns an observable. Before that, notice the Merge function, which combines two Observables that emit the same type of item into one observable.
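One detail worth seeing in isolation is that a merged observable completes only when both of its sources have completed. The sketch below assumes the System.Reactive package and C# 9 top-level statements; FakeRelay is a hypothetical stand-in for one Relay direction.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for one Relay direction: emits one Unit when its delay has elapsed.
IObservable<Unit> FakeRelay(int milliseconds) =>
    Observable.FromAsync(() => Task.Delay(milliseconds));

// The merged observable emits the items of both directions
// and completes only after the slower one has finished.
var items = await Observable.Merge(FakeRelay(20), FakeRelay(200)).ToList();

Console.WriteLine($"{items.Count} directions finished");
```

This differs from the Task.WhenAny call in the non-Rx version, which returns as soon as the first direction ends.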

Keep in mind that an Observable like ours won’t start emitting items until it is subscribed to. For every subscription we should ask ourselves when it completes, when we should dispose of it and how to handle its errors. Therefore, a best practice is to maintain as few subscriptions as possible. In our case a single subscription is enough.
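The fact that nothing runs before subscription can be checked with a counter. This is a sketch that assumes the System.Reactive package and C# 9 top-level statements; the async factory is a hypothetical stand-in for accepting a client.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

var factoryCalls = 0;

// Building the pipeline runs nothing: FromAsync only stores the factory.
var pipeline = Observable.FromAsync(async () =>
{
    factoryCalls++;
    await Task.Delay(10);
    return "client";
});
var callsBeforeSubscribe = factoryCalls;

// Subscribing starts the work (awaiting an observable subscribes to it).
var item = await pipeline;
var callsAfterSubscribe = factoryCalls;

Console.WriteLine($"before: {callsBeforeSubscribe}, after: {callsAfterSubscribe}"); // prints before: 0, after: 1
```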

Now let’s get back to the Relay function. Since we want a single subscription, Relay should return an Observable, which gets subscribed when the merged Observable is subscribed. The new Relay does the same as the old Relay function, but now returns an Observable.

This will be its signature:

IObservable<Unit> Relay(WebSocket source, WebSocket destination)

We use the Unit type when we don’t care about the item emitted by the observable.

var bytes = new byte[1024];
var arraySegment = new ArraySegment<byte>(bytes);
return Observable.FromAsync(() => source.ReceiveAsync(arraySegment, CancellationToken.None))
	.Repeat()
	// Stop once a read returns no bytes (socket closed).
	.TakeWhile(result => result.Count > 0)
	.SelectMany(async result =>
	{
		// ToArray copies the buffer, so the next receive cannot overwrite data that is still being sent.
		await destination.SendAsync(new ArraySegment<byte>(arraySegment.ToArray(), 0, result.Count),
			WebSocketMessageType.Binary, result.EndOfMessage, CancellationToken.None);
		return Unit.Default;
	});

We again use the very useful FromAsync function to start a ‘receiving’ observable, and the Repeat operator to do it in a loop. TakeWhile completes the observable when no bytes were read (socket closed). SelectMany is used to run the async task that sends the received data to the other end, and emits a Unit when it is done.
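The receive-until-closed loop can be tried without sockets. The sketch below uses Rx’s TakeWhile operator, which takes a per-item predicate, to end a repeated read. It assumes the System.Reactive package and C# 9 top-level statements; ReceiveAsync and the queue of byte counts are hypothetical stand-ins for the real socket reads.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical byte counts returned by successive reads; 0 marks a closed socket.
var reads = new Queue<int>(new[] { 5, 3, 0, 7 });

// Hypothetical stand-in for source.ReceiveAsync.
async Task<int> ReceiveAsync()
{
    await Task.Delay(5);
    return reads.Dequeue();
}

var forwarded = await Observable.FromAsync(() => ReceiveAsync())
    .Repeat()                        // read again after every completed read
    .TakeWhile(count => count > 0)   // complete on the first empty read
    .ToList();

Console.WriteLine(string.Join(", ", forwarded)); // prints 5, 3
```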

Final Source Code


var httpListener = new HttpListener();
httpListener.Prefixes.Add("http://*:2222/");
httpListener.Start();

Observable.FromAsync(httpListener.GetContextAsync)
	.SelectMany(async context => new
	{
		SessionId = context.Request.QueryString["sessionId"],
		WebSocket = (await context.AcceptWebSocketAsync(null)).WebSocket
	})
	.Repeat()
	.GroupBy(sessionSocket => sessionSocket.SessionId,
		sessionSocket => sessionSocket.WebSocket)
	.SelectMany(groupedSockets => groupedSockets.Take(2).ToArray())
	.SelectMany(sockets => Observable.Merge(Relay(sockets[0], sockets[1]),
		Relay(sockets[1], sockets[0])))
	.Subscribe();

static IObservable<Unit> Relay(WebSocket source, WebSocket destination)
{
	var bytes = new byte[1024];
	var arraySegment = new ArraySegment<byte>(bytes);
	return Observable.FromAsync(() => source.ReceiveAsync(arraySegment, CancellationToken.None))
		.Repeat()
		// Stop once a read returns no bytes (socket closed).
		.TakeWhile(result => result.Count > 0)
		.SelectMany(async result =>
		{
			// ToArray copies the buffer, so the next receive cannot overwrite data that is still being sent.
			await destination.SendAsync(new ArraySegment<byte>(arraySegment.ToArray(), 0, result.Count),
				WebSocketMessageType.Binary, result.EndOfMessage, CancellationToken.None);
			return Unit.Default;
		});
}

That is about 30 lines, which I find more elegant. The solution lacks error handling and logging (which would also be relatively easy to add with Rx), but I hope it gives you an idea of how Rx can be used to create easy and elegant solutions, and of how simple and efficient an asynchronous Relay server can be.
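As one possible direction for the missing error handling, the Catch operator can contain a failure inside a single session so that it never terminates the outer pipeline. This is only a sketch under assumptions, not part of the solution above: it needs the System.Reactive package and C# 9 top-level statements, FakeRelay is a hypothetical stand-in for Relay, and the errors list stands in for real logging.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

var errors = new List<string>();

// Hypothetical stand-in for Relay: session 2 fails, the others finish normally.
IObservable<Unit> FakeRelay(int sessionId) =>
    sessionId == 2
        ? Observable.Throw<Unit>(new InvalidOperationException("connection lost"))
        : Observable.Return(Unit.Default);

var finished = await Observable.Range(1, 3)
    .SelectMany(sessionId => FakeRelay(sessionId)
        // Handle the failure inside the session so it never reaches the outer pipeline.
        .Catch((Exception error) =>
        {
            errors.Add($"session {sessionId}: {error.Message}");
            return Observable.Empty<Unit>();
        }))
    .ToList();

Console.WriteLine($"{finished.Count} sessions finished, {errors.Count} failed");
```

Without the Catch, the error from session 2 would propagate to the single subscriber and end the whole stream.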