
Resilient Server-sent Events (SSE)

Kristof Kovacs
Software Architect & DevOps Consultant

Hello, I’m Kristof, a human being like you, and an easy to work with, friendly guy.

I've been a programmer, a consultant, CIO in startups, head of software development in government, and built two software companies.

Some days I’m coding Golang in the guts of a system and other days I'm wearing a suit to help clients with their DevOps practices.

I'm a big fan of Server-sent Events (SSE), which are similar to WebSockets. The main differences are:

  • SSE works only in one direction: from the server to the client. (AJAX calls can easily provide for the other direction.)
  • SSE is much simpler. You don't need a complex library on the server side, nor on the client side (I'm looking at you, Socket.io). You could do SSE in CGI with Perl or bash scripts, if you had to.
  • SSE is safer with proxies. It is plain HTTP, whereas WebSockets use their own protocol (ws:), which is still not always supported by proxies and reverse proxies.
  • SSE is more resilient. It has built-in support for basic reconnection (and we will improve on it in this article).
  • SSE has the drawback of occupying one of the browser's six connections per domain (if you're using HTTP/1.x) or one of roughly a hundred concurrent streams (if you are using HTTP/2 or higher, where the limit is negotiated and defaults to 100).

Ably has a very good article comparing SSE and Websockets.

In a recent project, I had to create a very resilient stream that:

  1. keeps trying to reconnect indefinitely when disconnected (even on errors), and
  2. on reconnection, remembers the time of the last message and queries the server for any updates since.

UPDATE: While Turbo.js, for which this article was written, is a great technology, nowadays I'd recommend going with HTMX instead, unless you are working in Ruby, which is Turbo.js's native territory. HTMX also has its own implementation of the resiliency described here.

(In this particular project, I was using SSE as a delivery mechanism for Turbo Streams, so some lines in the following examples call Turbo or use Content-Type: vnd.turbo-stream.html, but you don't have to. I'm marking these with XXX.)

Wire protocol #

Here's what an SSE connection looks like over telnet, receiving three messages:

The HTTP request:

GET /app/sse HTTP/1.1
Host: example.com

The HTTP response:

HTTP/1.1 200
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked

data: first message

data: another message
data: with two lines

data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

You can do extra things, like naming events with event:. There is also a built-in follow-up mechanism: when the server tags events with id:, the browser automatically sends a Last-Event-ID header on reconnect.
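
As a rough sketch of how those optional fields sit on the wire, the following Go snippet serializes one event with event:, id: and multi-line data:. The Event type and its Encode method are hypothetical helpers written for this illustration, not part of any library.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a hypothetical container for the fields of one SSE event.
type Event struct {
	ID   string // optional, sent as "id:"
	Name string // optional, sent as "event:"
	Data string // payload, may span several lines
}

// Encode renders the event in the text/event-stream format.
func (e Event) Encode() string {
	var b strings.Builder
	if e.Name != "" {
		fmt.Fprintf(&b, "event: %s\n", e.Name)
	}
	if e.ID != "" {
		fmt.Fprintf(&b, "id: %s\n", e.ID)
	}
	// Every line of the payload gets its own "data:" prefix.
	for _, line := range strings.Split(e.Data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	// An empty line terminates the event.
	b.WriteString("\n")
	return b.String()
}

func main() {
	ev := Event{ID: "42", Name: "chat", Data: "another message\nwith two lines"}
	fmt.Print(ev.Encode())
}
```

On the client, an event named chat is delivered to listeners registered for "chat" rather than to the default "message" listener.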

For more information on SSE protocol message format, read this.

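Our approach to reconnection differs in only two ways:

  1. Ours keeps reconnecting even if the client received an error from the server. SSE doesn't do this on its own: EventSource retries after a dropped connection, but typically gives up when the server answers with an error status.
  2. Ours is a time-based follow-up, not an event-ID-based one.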

Client-side javascript #

This is the client-side JavaScript:

// Installs the handler for the Turbo Stream Source.
let eventsource;
function InstallTurboStreamSource() {
    // Avoid double installation
    if (eventsource != null) {
        return;
    }

    // Place to save the time of the last message
    // NOTE: Instead of client-side JS, this initial value is even better when
    // generated server-side.
    let inactivitysince = Math.floor(new Date() / 1000);

    // Create our event source
    function NewEventsource() {
        eventsource = new EventSource("/app/sse/");
        fetch("/app/since/" + inactivitysince + "/")
            .then((r) => r.text())
            .then((html) => Turbo.renderStreamMessage(html)) // XXX Do your own stuff here
            .catch(() => {}); // Still unreachable: the "error" handler below schedules the next retry

        // Update the "latest time we got something" on every message
        eventsource.addEventListener("message", (e) => {
            inactivitysince = Math.floor(new Date() / 1000);
        });
        // If it ever disconnects, start a retry
        eventsource.addEventListener("error", (e) => {
            eventsource.close();
            // Connect us again after a small wait
            setTimeout(NewEventsource, 2000);
        });
        // Connect the EventSource to Turbo
        Turbo.session.connectStreamSource(eventsource); // XXX Do your own stuff here
    }
    // Close SSE on page unload (registered once, not again on every reconnect)
    window.addEventListener("beforeunload", (e) => {
        eventsource.close();
    });
    NewEventsource();
}

Server-side golang #

Here is the Go handler for the popular gin-gonic web framework that sends the SSE events (no SSE library required, just plain Go):

router.GET("/app/sse/", func(c *gin.Context) {
    c.Writer.Header().Set("Content-Type", "text/event-stream")
    c.Writer.Header().Set("Cache-Control", "no-cache")
    c.Writer.Header().Set("Connection", "keep-alive")
    c.Writer.Header().Set("Transfer-Encoding", "chunked")
    c.Writer.Flush()

    log.Println("Client connected to SSE")

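    recv := make(chan string)
    Subscribe(recv)         // Subscribe to your data source
    defer Unsubscribe(recv) // Runs on every return path, including write errors

    // CloseNotify is deprecated; the request context is cancelled when the client disconnects
    clientGone := c.Request.Context().Done()
    for {
        select {
        case <-clientGone:
            log.Println("Client gone from SSE")
            return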
        case message := <-recv:
            log.Printf("Sending SSE stream to client for message %s", message)

            // Get data from DB
            data := [...]

            // Render template
            // NOTE: A bit hack-ish way to capture the output of the function that expects a ResponseWriter
            w := httptest.NewRecorder()
            err := router.HTMLRender.Instance("app-stream.html", gin.H{
                "data": data,
            }).Render(w)
            if err != nil {
                c.AbortWithError(http.StatusBadRequest, err)
                return
            }

            // Format it as SSE data: every line, including the first, needs the "data: " prefix
            ssedata := append([]byte("data: "), bytes.ReplaceAll(w.Body.Bytes(), []byte("\n"), []byte("\ndata: "))...)

            // Send response
            _, err = c.Writer.Write(ssedata)
            if err != nil {
                c.AbortWithError(http.StatusBadRequest, err)
                return
            }
            _, err = c.Writer.Write([]byte("\n\n"))
            if err != nil {
                c.AbortWithError(http.StatusBadRequest, err)
                return
            }
            c.Writer.Flush()
        }
    }
})
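
The handler above relies on Subscribe and Unsubscribe, which are specific to your data source and are not shown in this article. As a minimal sketch of one possible shape, assuming a simple in-process fan-out (the Publish function and the package-level map are inventions for this example):

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu   sync.Mutex
	subs = map[chan string]struct{}{} // currently connected listeners
)

// Subscribe registers a channel that will receive published messages.
func Subscribe(ch chan string) {
	mu.Lock()
	defer mu.Unlock()
	subs[ch] = struct{}{}
}

// Unsubscribe removes a previously registered channel.
func Unsubscribe(ch chan string) {
	mu.Lock()
	defer mu.Unlock()
	delete(subs, ch)
}

// Publish offers msg to every subscriber and reports how many accepted it.
// The send is non-blocking, so one slow client cannot stall the others.
func Publish(msg string) int {
	mu.Lock()
	defer mu.Unlock()
	delivered := 0
	for ch := range subs {
		select {
		case ch <- msg:
			delivered++
		default:
			// Receiver not ready: the message is skipped for this client.
		}
	}
	return delivered
}

func main() {
	ch := make(chan string, 1) // buffered, so a busy handler misses fewer messages
	Subscribe(ch)
	fmt.Println("delivered:", Publish("row 17 changed"))
	fmt.Println("received:", <-ch)
	Unsubscribe(ch)
	fmt.Println("delivered after unsubscribe:", Publish("row 18 changed"))
}
```

With a non-blocking send, a message can be skipped while a handler is busy rendering, so a buffered channel is the safer choice here.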

And while it's fairly easy, for reference here is the Go handler for the "catch me up on the events since..." call:

// Event stream by changed time
router.GET("/app/since/:unixtime/", func(c *gin.Context) {
    // Get filters
    unixtime_str := c.Param("unixtime")
    unixtime, err := strconv.Atoi(unixtime_str)
    if err != nil {
        c.AbortWithError(http.StatusBadRequest, err)
        return
    }

    // Load data
    data := ...

    // Send response
    c.HTML(http.StatusOK, "app-stream.html", gin.H{
        "data": data,
    })
})

Auth0 also has a very detailed article on SSE using React on the client and NodeJS on the server.

Germano has another good article using Python.

