An alternative to server-sent events

DEVELOPMENT - November 21, 2023

The Challenge

In my quest for an efficient way to stream data from a backend API endpoint to the frontend application, I sought an alternative to the widely-used WebSockets API.

I had heard of server-sent events (SSE) before, and my search led me back to them. They met most of my requirements, except for one crucial aspect. The SSE specification assumes that the request will always be a GET request, a constraint that is also evident in the EventSource API, which does not let you change the HTTP verb used for the request.

My use-case involved sending a complex object to the endpoint when initializing the stream, a task that isn't well-suited for a query string format, which the SSE spec unfortunately imposes.
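To make the mismatch a bit more concrete, here is a small sketch of what the frontend has to do to squeeze a nested object into a GET request, compared with sending it as a POST body. The shape of the filter object and the `filter` query parameter are made up for illustration; they are not part of the example built later in this post.

```javascript
// Hypothetical filter object; the shape is an assumption for illustration only.
const filter = {
  codes: ["GOOGL", "AMZN"],
  price: { min: 100, max: 250.5 },
};

// GET: the whole object has to be serialized and percent-encoded into the URL.
function toQueryUrl(path, value) {
  const params = new URLSearchParams({ filter: JSON.stringify(value) });
  return `${path}?${params.toString()}`;
}

// POST: the same object travels as a plain JSON body.
function toPostInit(value) {
  return {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(value),
  };
}

const url = toQueryUrl("/stocks/sse", filter);
const init = toPostInit(filter);
console.log(url);
console.log(init.body);
```

The encoded URL is longer and harder to read than the JSON body, and the server has to decode and validate it by hand instead of relying on regular model binding.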

Possible solutions

I could have probably solved this problem by:

  • Creating a separate endpoint that accepts the complex object, saves the state in the backend, and returns a reference ID.
  • Using this reference ID in a GET endpoint as a lookup key for the data. Once retrieved, the stream can be initialized.

However, this approach has its drawbacks:

  • Additional I/O operations would be required to provision a record in a database or cache for each user initiating a streaming request.
  • The necessity of having an expiry date on this record, as it represents temporary state data that we don't want to permanently store.
  • Increased complexity in the frontend, as it requires extra steps like calling an additional endpoint, saving the reference token, initiating the streaming endpoint, and cleaning up the reference token when the user stops consuming the stream.
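To give an idea of the extra moving parts, the sketch below models the temporary state such a two-step flow would need: a store that hands out reference IDs and forgets them after a time-to-live. It is written in JavaScript purely for illustration. The `createTicketStore` name, the in-memory `Map`, and the injectable clock are all assumptions; a real backend would more likely use a cache or a database.

```javascript
// Hypothetical in-memory ticket store; a sketch, not a production design.
function createTicketStore(ttlMs, now = () => Date.now()) {
  const tickets = new Map();
  let nextId = 1;

  return {
    // Step 1: the POST endpoint saves the complex object and returns a reference ID.
    save(payload) {
      const id = String(nextId++);
      tickets.set(id, { payload, expiresAt: now() + ttlMs });
      return id;
    },
    // Step 2: the GET streaming endpoint looks the payload up by ID.
    // Unknown or expired IDs yield undefined, and expired entries are removed.
    take(id) {
      const entry = tickets.get(id);
      if (!entry) return undefined;
      if (entry.expiresAt <= now()) {
        tickets.delete(id);
        return undefined;
      }
      return entry.payload;
    },
    // Cleanup when the user stops consuming the stream.
    remove(id) {
      return tickets.delete(id);
    },
  };
}

const store = createTicketStore(60_000);
const ticketId = store.save({ code: "GOOGL" });
console.log(ticketId, store.take(ticketId));
```

Even in this toy form, the store needs an expiry policy and an explicit cleanup path, which is exactly the overhead listed above.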

Due to these trade-offs, I decided to explore alternative implementations.

The alternative implementation

The implementation below is written in C# for the backend and vanilla JavaScript and HTML for the frontend.

Backend

The following implementation demonstrates a simple 'stock feed'. A user can filter a specific stock and receive a stream of randomized stock prices at random intervals. Although not a realistic use case, it offers an easy-to-understand example of the technical implementation.

The setup

We will be creating a few classes that define what kind of details we want to display regarding stocks.

public static class StockCodes
{
    public readonly static string[] All =
    {
        Google, Amazon, Nvidia, Meta, Tesla
    };

    public const string Google = "GOOGL";
    public const string Amazon = "AMZN";
    public const string Nvidia = "NVDA";
    public const string Meta = "META";
    public const string Tesla = "TSLA";
}


/// <summary>
/// Fictive stock price at a certain point in time
/// </summary>
/// <param name="Code">Stock NASDAQ Code</param>
/// <param name="Value">Stock price</param>
/// <param name="Date">Moment in time</param>
public sealed record Stock(string Code, decimal Value, DateTimeOffset Date);

Using the Stock record created above, we'll set up a feed that holds our randomly generated stock values. We will be using a bounded channel with a capacity of 5; when the channel is full, producers have to wait until there is room again.

public class StockChannel
{
    public Channel<Stock> Stocks { get; }

    public StockChannel()
    {
        var channelOptions = new BoundedChannelOptions(5) // arbitrary limit for testing purposes
        {
            FullMode = BoundedChannelFullMode.Wait
        };

        Stocks = Channel.CreateBounded<Stock>(channelOptions);
    }
}

Next, we create a producer that generates random stock values and puts them on our channel. Every second, it selects a random stock and generates a random date and a random price for it.

We are inheriting from BackgroundService to create a service that initiates at the startup of our application.

To get proper asynchronous support for generating the 'every second tick', we use PeriodicTimer, which was introduced in .NET 6.

public sealed class StockProducingService(StockChannel stockChannel) : BackgroundService
{
    private readonly Random _rnd = new();

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        var tick = TimeSpan.FromSeconds(1);
        using PeriodicTimer timer = new(tick);

        while (!stoppingToken.IsCancellationRequested)
        {
            var stock = GenerateStock(_rnd);

            await stockChannel.Stocks.Writer.WriteAsync(stock, stoppingToken);
            await timer.WaitForNextTickAsync(stoppingToken);
        }

        stockChannel.Stocks.Writer.Complete();
    }

    private static Stock GenerateStock(Random rnd)
    {
        var dateTimeOffset = RandomDate(rnd);
        var stockCode = RandomStockCode(rnd);
        var price = RandomStockPrice(rnd);

        return new Stock(stockCode, price, dateTimeOffset);
    }

    private static decimal RandomStockPrice(Random rnd)
    {
        var pre = rnd.Next(0, 1000);
        var post = rnd.Next(1, 100);
        // Build the value arithmetically; parsing "pre,post" depends on the current culture's decimal separator
        return pre + post / 100m;
    }

    private static string RandomStockCode(Random rnd) =>
        StockCodes.All[rnd.Next(0, StockCodes.All.Length)];

    private static DateTimeOffset RandomDate(Random rnd)
    {
        var now = DateTimeOffset.UtcNow;
        var yearAgo = DateTimeOffset.UtcNow.AddYears(-1);

        var timeSpan = now - yearAgo;
        var randomSpan = new TimeSpan(0, rnd.Next(0, (int)timeSpan.TotalMinutes), 0);

        // Add to the DateTimeOffset itself; going through .DateTime drops the UTC offset
        return yearAgo + randomSpan;
    }
}

Consuming the magic sauce

Having established the producing side, it's time to build the consuming side.

We'll start by creating a service that reads from our channel.

public sealed class StockFeedService(StockChannel stockChannel)
{
    public async IAsyncEnumerable<Stock> ListenToStockFilteredAsync(string code, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in stockChannel.Stocks.Reader.ReadAllAsync(cancellationToken))
        {
            if (string.Equals(code, item.Code, StringComparison.OrdinalIgnoreCase))
            {
                yield return item;
            }
        }
    }
}

The backbone of our streaming is IAsyncEnumerable, which yields items one at a time as they become available. Paired with a CancellationToken and the EnumeratorCancellation attribute, it allows us to easily stop consuming when the user aborts the request.

In our implementation we will be filtering on the code we've defined in our Stock record, and only yielding a result when the code matches the code of the record that has been generated.

Bootstrapping all our code

We register the services we've made above in our Program.cs

  • StockChannel as a Singleton, so that every consumer reads from the same channel instance. Keep in mind that a channel hands each item to a single reader, so concurrent consumers compete for items.
  • StockProducingService as a HostedService so that the producing of our stocks starts up automatically.
  • StockFeedService as Scoped so that every request on the endpoint we will be building can consume separately.

using System.Text.Json;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Net.Http.Headers;
using ServerSentEventsAlternative.Channels;
using ServerSentEventsAlternative.Mapping;
using ServerSentEventsAlternative.Models;
using ServerSentEventsAlternative.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddRazorPages();
builder.Services.AddSingleton<StockChannel>();
builder.Services.AddHostedService<StockProducingService>();
builder.Services.AddScoped<StockFeedService>();

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();


app.UseHttpsRedirection();
app.UseDefaultFiles();
app.UseStaticFiles();

app.UseRouting();

app.UseSwagger();
app.UseSwaggerUI();

app.Run();

The API layer

We'll be writing an endpoint that would technically work for server-sent events, if it had a GET method instead of a POST method.

To expose our stock, we will be creating a few DTOs.

// Forces our DTOs to have a definition of the Event they represent, this is required due to the spec
public interface IServerSentEventData
{
    public string EventName { get; }
}

// The actual SSE wrapper that takes in the EventName from the DTO
public record ServerSentEvent<T>(string Id, T Data) where T : IServerSentEventData
{
    public string Event { get; init; } = Data.EventName;
}

// The DTO which we will expose. The EventName is JsonIgnored as we do not want to expose it in our JSON.
public record StockDto(string Code, decimal Value, DateTimeOffset Date) : IServerSentEventData
{
    [JsonIgnore]
    public string EventName { get; } = "Stock";
}

// The filter the user uses to filter on the right stock
public sealed record StockFilter(string Code);

We follow the spec by setting a few headers:

  • Content-Type: 'text/event-stream'
  • Cache-Control: 'no-cache'
  • Connection: 'keep-alive'

Subsequently we aim to listen to our StockFeedService and map to our DTO with the ServerSentEvent wrapper around it. We are using this wrapper to match the Event stream format.
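For reference, the event stream format defined by the SSE specification is line-based: each event is a block of `field: value` lines (such as `id`, `event`, and `data`) terminated by a blank line. The helper below is a minimal JavaScript sketch of that framing. `formatSseEvent` is a hypothetical name used only for illustration, and it is not taken from the backend code in this post.

```javascript
// Hypothetical helper that frames one event in the text/event-stream format.
function formatSseEvent({ id, event, data }) {
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // A payload containing line breaks must be sent as one data line per line.
  for (const line of String(data).split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // The trailing blank line marks the end of the event.
  return lines.join("\n") + "\n\n";
}

const frame = formatSseEvent({
  id: "1",
  event: "Stock",
  data: JSON.stringify({ code: "GOOGL", value: 123.45 }),
});
console.log(frame);
```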

According to the spec, event streams are always encoded in UTF-8, and this is something C# will gladly do for us when we call the following JsonSerializer.SerializeAsync overload:

        public static Task SerializeAsync<TValue>(
            Stream utf8Json,
            TValue value,
            JsonSerializerOptions? options = null,
            CancellationToken cancellationToken = default)

app.MapPost("stocks/sse",
    static async (
        HttpContext ctx,
        [FromBody] StockFilter filter,
        StockFeedService stockFeedService,
        CancellationToken ct = default
    ) =>
    {
        // Indexer assignment avoids the exception Add throws when a header is already present
        ctx.Response.Headers[HeaderNames.ContentType] = "text/event-stream";
        ctx.Response.Headers[HeaderNames.CacheControl] = CacheControlHeaderValue.NoCacheString;
        ctx.Response.Headers[HeaderNames.Connection] = "keep-alive";

        await foreach (var stock in stockFeedService.ListenToStockFilteredAsync(filter.Code, ct))
        {
            // Pass the token along so pending writes stop when the client aborts the request
            await JsonSerializer.SerializeAsync(ctx.Response.Body, stock.MapToServerSentEvent(), cancellationToken: ct);
            // A blank line terminates each event; "\n\n" keeps this identical on every platform
            await ctx.Response.WriteAsync("\n\n", ct);
            await ctx.Response.Body.FlushAsync(ct);
        }
    });

The frontend

We create a simple index.html file in the wwwroot folder of our project. The aim is to build a no-frills, no styling, simple streaming example, calling our POST stocks/sse endpoint.

To do this we will create:

  • an input in which the user can filter on a specific stock by code
  • a button which allows the user to start and stop the stream
  • a div in which the streaming responses will be added
  • some accompanying JavaScript code to make it all work

<html lang="en">
<div class="container">
    <label for="code">
        <input type="text" id="code" name="code" value="GOOGL"/>
    </label>
    <button id="send">Send</button>
    <div class="response"></div>
</div>
<script>
    let controller = new AbortController();
    let signal = controller.signal;

    // execute our POST call using the correct headers
    async function fetchData() {
        let code = document.getElementById('code').value;
        let response = await fetch(`/stocks/sse`, {
            signal,
            method: 'POST',
            headers: {
                'Accept': 'text/event-stream',
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({code})
        });

        if (!response.ok) {
            throw Error(response.statusText);
        }

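        // create the decoder once; {stream: true} handles multi-byte characters split across chunks
        const decoder = new TextDecoder();

        // keep reading the stream until it's empty
        for (const reader = response.body.getReader(); ;) {
            const {value, done} = await reader.read();

            if (done) {
                break;
            }

            const chunk = decoder.decode(value, {stream: true});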
            // extract the data out of the event wrapper
            const subChunks = chunk.split(/(?<=})\n\ndata: (?={)/);

            for (const subChunk of subChunks) {
                // unwrap the chunk
                const payload = subChunk.replace(/^data: /, "");
                const parsed = JSON.parse(payload);

                // add the response to our DOM
                let listItem = document.createElement('li');
                listItem.textContent = JSON.stringify(parsed);
                document.querySelector('.response').appendChild(listItem);
            }
        }
    }

    // Stops the stream
    function stopFetch() {
        controller.abort();
        controller = new AbortController(); // re-instantiate AbortController for the next request
        signal = controller.signal;
        document.getElementById('send').textContent = 'Send';
        // swap the handlers back so the next click only starts a new stream
        document.getElementById('send').removeEventListener('click', stopFetch);
        document.getElementById('send').addEventListener('click', handleSend);
    }

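    // Start the stream
    async function handleSend() {
        document.getElementById('send').textContent = 'Stop';
        document.getElementById('send').removeEventListener('click', handleSend);
        document.getElementById('send').addEventListener('click', stopFetch);
        try {
            await fetchData();
        } catch (err) {
            // aborting rejects the pending fetch/read with an AbortError, which is expected here
            if (err.name !== 'AbortError') {
                throw err;
            }
        }
    }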

    document.getElementById('send').addEventListener('click', handleSend);
</script>
</html>

A few things to note here:

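  • We are not using the EventSource API, as it does not allow us to use the POST HTTP method.

    • This also comes with some disadvantages, as we lose features that EventSource provides out of the box, such as automatic reconnection and built-in parsing of the event stream.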

  • AbortController's signal allows us to easily abort our stream.
  • We do have some manual parsing work to do using a TextDecoder.
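One detail of that manual parsing is worth sketching. A chunk returned by `reader.read()` does not have to line up with event boundaries, so it can hold several events or only part of one. A common way to deal with this is to buffer the decoded text and only parse complete blocks. The `createEventBuffer` helper below is a hypothetical sketch, assuming events are separated by a blank line, optionally prefixed with `data: `, and carry a JSON payload.

```javascript
// Hypothetical helper: buffers decoded text and returns only complete events.
function createEventBuffer() {
  let buffer = "";

  // Feed decoded text in; get back the parsed payloads of all events completed so far.
  return function push(text) {
    buffer += text;
    const blocks = buffer.split(/\r?\n\r?\n/);
    // The last element is either "" or an incomplete event; keep it for the next call.
    buffer = blocks.pop();
    return blocks
      .map((block) => block.replace(/^data: /, "").trim())
      .filter((payload) => payload.length > 0)
      .map((payload) => JSON.parse(payload));
  };
}

// Simulate an event that is split across two chunks.
const push = createEventBuffer();
const first = push('data: {"code":"GOOGL"}\n\ndata: {"co');
const second = push('de":"AMZN"}\n\n');
console.log(first, second);
```

Inside the read loop, this would be fed with the output of a `TextDecoder` that is created once outside the loop and called with `{stream: true}`, so that both events and multi-byte characters survive being split across chunks.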

The result

After running our backend with dotnet run and pressing the Send button, we get to witness the streaming experience.

The conclusion

In our quest to find an effective approach for our streaming requirements, we've successfully crafted a solution that employs POST requests instead of limiting us to GET requests. While this approach steers away from the conventional SSE specification, it satisfies our unique requirements, especially when dealing with complex objects during the streaming initialization.

Even though our implementation does have some downsides, especially the lack of certain desirable features inherent in the EventSource API, it offers a streamlined and efficient solution for this specific use case. These downsides can be mitigated considerably through robust documentation and clear communication within one or more development teams.

In conclusion, this alternative SSE solution presents a viable option when faced with unique development constraints, even though it does require us to do 'the extra plumbing' to make it all work :)

You can find the source code on my GitHub repository.
