SEQUENCE // Building The Blog

Adding Active Metrics using SSE

Author Thorn Hall

I wanted to display stats about the application in "semi-real time": every few seconds, my server sends your browser stats about its runtime, and your browser displays them. To do this I used Server-Sent Events (SSE).

SSE

SSE is well suited to one-way communication from the server to the client. It relies only on plain HTTP, so no extra infrastructure is required. Most, if not all, modern programming languages can serve it, and modern browsers support it natively through the EventSource API.

What is SSE under the hood? It's really just a long-lived HTTP response that the server keeps writing to, which is why it has these qualities.

How It's Implemented

An SSE endpoint is implemented almost the same way as a regular API handler, which I've covered in other articles. The main differences are:

  • Specific HTTP headers are used to mark the connection as a "long-lived" SSE connection
  • The connection remains open, with data being periodically "flushed" to the client

Let's go over the specific code.

First, we'll talk about the router.

func New(h *handler.Handler, log *slog.Logger, publicDir string) http.Handler {
	appMux := http.NewServeMux()
	appMux.HandleFunc("POST /api/likes/{slug}", h.HandleLike)
	appMux.HandleFunc("GET /api/stats/{slug}", h.HandleGetStats)
	appMux.HandleFunc("POST /api/views/{slug}", h.HandleView)

	fs := http.FileServer(http.Dir(publicDir))
	assetsFs := http.FileServer(http.Dir("./assets"))
	appMux.Handle("GET /assets/", http.StripPrefix("/assets/", assetsFs))
	appMux.Handle("GET /", fs)

	// Wrap all routes except SSE in middleware
	var appHandler http.Handler = appMux
	appHandler = middleware.WithLogger(appHandler, log)
	appHandler = middleware.WithRecover(appHandler, log)

	// SSE gets its own handler to avoid middleware which breaks it
	rootMux := http.NewServeMux()
	rootMux.HandleFunc("GET /api/streams/stats", h.HandleStreamStats)
	rootMux.Handle("/", appHandler)
	muxWithRecover := middleware.WithRecover(rootMux, log)

	return muxWithRecover
}

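The important thing to note here is that the SSE handler only gets specific middleware applied to it. In this case that is panic recovery, so a panic in the handler is caught and logged the same way as on every other route. Other middleware can interfere with the SSE connection, so we exclude it. A typical culprit is middleware that wraps the ResponseWriter in a type that can't flush, which makes streaming impossible.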

Now the handler itself:

func (h *Handler) HandleStreamStats(w http.ResponseWriter, r *http.Request) {
	// Mark the response as an SSE stream and discourage caching and proxy buffering.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("X-Accel-Buffering", "no")

	// Streaming requires a ResponseWriter that can flush.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Send the headers right away so the client sees the stream open.
	flusher.Flush()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// sendStats gathers runtime stats and writes them as one SSE event.
	sendStats := func() error {
		var m runtime.MemStats
		runtime.ReadMemStats(&m)

		var dbSizeStr string
		fileInfo, err := os.Stat("./blog.db")
		if err == nil {
			dbSizeStr = fmt.Sprintf("%.2f", float64(fileInfo.Size())/1024/1024)
		} else {
			dbSizeStr = "0.00"
		}

		stats := SysStats{
			Uptime:     time.Since(StartTime).Round(time.Second).String(),
			MemoryMB:   m.Alloc / 1024 / 1024,
			Goroutines: runtime.NumGoroutine(),
			DbSizeMB:   dbSizeStr,
		}

		data, err := json.Marshal(stats)
		if err != nil {
			return err
		}

		if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
			return err
		}

		flusher.Flush()
		return nil
	}

	// Send one event immediately instead of waiting for the first tick.
	if err := sendStats(); err != nil {
		return
	}

	for {
		select {
		case <-r.Context().Done():
			return
		case <-ticker.C:
			if err := sendStats(); err != nil {
				return
			}
		}
	}
}

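As I mentioned previously, we need to set specific headers to mark this as an SSE connection. Content-Type: text/event-stream is the one the browser requires. Cache-Control: no-cache keeps the stream from being cached, Access-Control-Allow-Origin: * lets pages on other origins read it, and X-Accel-Buffering: no asks an nginx proxy, if one sits in front of the server, not to buffer the response.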

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("X-Accel-Buffering", "no")

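Then we check that the ResponseWriter we were handed supports flushing, since not every implementation does (a wrapper added by middleware may not). This is a check on the server side of the connection, not on the client. Afterwards, we immediately flush so the response headers go out and the client sees the stream open.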

flusher, ok := w.(http.Flusher)
if !ok {
	http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
	return
}
flusher.Flush()

sendStats collects and formats the necessary data. The most important lines to remember in that function are the following:

_, err = fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()

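SSE expects each event in this format: a line starting with "data: " followed by the payload, then a blank line to mark the end of the event. Flushing then pushes the buffered bytes to the client immediately while leaving the response, and therefore the connection, open.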
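The event format can be pulled out into a small helper, sketched below. The `SysStats` struct is not shown in this post, so its field types and JSON tags here are assumptions inferred from how the handler fills it in, and `formatEvent` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SysStats mirrors the fields the handler populates. The field types and
// JSON tags are assumptions; the real struct is not shown in this post.
type SysStats struct {
	Uptime     string `json:"uptime"`
	MemoryMB   uint64 `json:"memory_mb"`
	Goroutines int    `json:"goroutines"`
	DbSizeMB   string `json:"db_size_mb"`
}

// formatEvent marshals v to JSON and returns it as a single SSE event:
// one "data:" line followed by a blank line.
func formatEvent(v any) (string, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return "", fmt.Errorf("marshal event: %w", err)
	}
	// json.Marshal escapes newlines inside strings, so the payload always
	// fits on one data line.
	return fmt.Sprintf("data: %s\n\n", data), nil
}
```

Unlike the inline version, this returns the marshalling error instead of discarding it, so the handler can stop the stream rather than send an empty event.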

Finally, we create the loop that sends data to the client on an interval:

for {
	select {
	case <-r.Context().Done():
		return
	case <-ticker.C:
		if err := sendStats(); err != nil {
			return
		}
	}
}

select will block until a value is received from one of the channels in its case clauses.

Our ticker will send a value on its channel at the interval we specify (every 5 seconds here), and each tick triggers one call to sendStats.

Server Changes

We need to change a little bit about how we instantiate our server when doing SSE.

First, we need to set the write timeout to either 0 or a large value. Otherwise the server cuts off the SSE response once the timeout elapses.

return &http.Server{
	Addr:              ":8080",
	Handler:           mux,
	ReadTimeout:       10 * time.Second,
	IdleTimeout:       120 * time.Second,
	WriteTimeout:      0,
	ReadHeaderTimeout: 5 * time.Second,
	BaseContext: func(l net.Listener) context.Context {
		return ctx
	},
}

Because we're creating a potentially infinite loop in our handler, we need a way of cancelling it. You may have noticed in the select statement, one of the cases is this:

case <-r.Context().Done():
	return

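This tells the handler to stop the loop when the request's context is cancelled, which happens when the client disconnects. On shutdown, though, nothing cancels that context by default: the server waits for active connections to finish, and with the write timeout disabled an open SSE stream never finishes on its own, so shutdown would stall until its deadline expires. IdleTimeout doesn't help here, since it only applies to keep-alive connections waiting for their next request.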

To gracefully close these connections on shutdown, we need to set the BaseContext of the HTTP server. Every request context is derived from it, so cancelling our own context cancels every in-flight request. To do this, we pass a custom context into the server creation function and return it from the BaseContext function of the http.Server.

func NewServer(ctx context.Context, publicDir, domain string) *http.Server {
	logger := logging.New(os.Stdout)
	database := db.New()
	rep := repo.New(database)
	hnd := handler.New(rep, logger, publicDir)
	mux := router.New(hnd, logger, publicDir)

	if _, err := database.Exec("PRAGMA journal_mode=WAL;"); err != nil {
		logger.Error("failed to enable WAL mode", "error", err)
	}

	if domain != "" {
		logger.Info("configuring production server (HTTPS)", "domain", domain)

		certManager := autocert.Manager{
			Prompt:     autocert.AcceptTOS,
			HostPolicy: autocert.HostWhitelist(domain, "www."+domain),
			Cache:      autocert.DirCache("certs"),
		}

		go func() {
			logger.Info("starting http redirect server", "addr", ":80")
			if err := http.ListenAndServe(":80", certManager.HTTPHandler(nil)); err != nil {
				logger.Error("redirect server failed", "error", err)
			}
		}()

		return &http.Server{
			Addr:    ":443",
			Handler: mux,
			TLSConfig: &tls.Config{
				GetCertificate: certManager.GetCertificate,
				MinVersion:     tls.VersionTLS12,
			},
			ReadTimeout:       10 * time.Second,
			IdleTimeout:       120 * time.Second,
			WriteTimeout:      0,
			ReadHeaderTimeout: 5 * time.Second,
			BaseContext: func(l net.Listener) context.Context {
				return ctx
			},
		}
	}

	logger.Info("configuring development server (HTTP)", "addr", ":8080")
	return &http.Server{
		Addr:              ":8080",
		Handler:           mux,
		ReadTimeout:       10 * time.Second,
		IdleTimeout:       120 * time.Second,
		WriteTimeout:      0,
		ReadHeaderTimeout: 5 * time.Second,
		BaseContext: func(l net.Listener) context.Context {
			return ctx
		},
	}
}

Now in our main function, we create a custom context which we pass down to the server creation:

engineCtx, cancelEngine := context.WithCancel(context.Background())
defer cancelEngine()

domain := os.Getenv("DOMAIN")
srv := NewServer(engineCtx, "./public", domain)

Now, when we receive a shutdown signal on our server, we cancel the engine context, which will propagate all the way down to the handler function.

shutDownChan := make(chan os.Signal, 1)
signal.Notify(shutDownChan, syscall.SIGINT, syscall.SIGTERM)
<-shutDownChan

shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cancelBackup() // defined elsewhere in main; not shown in this post
cancelEngine()

if err := srv.Shutdown(shutdownCtx); err != nil {
	log.Fatalf("unable to shutdown server gracefully: %v", err)
}

This allows our server to shut down gracefully even if there are open connections.
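To see the effect in isolation, here is a self-contained sketch that starts a server on a random local port, opens one never-ending stream, cancels the base context, and then shuts down. The names `stream` and `runAndShutdown` are hypothetical, and the handler is a stripped-down stand-in for the real one.

```go
package main

import (
	"context"
	"net"
	"net/http"
	"time"
)

// stream opens an SSE response and then blocks until the request context
// is cancelled, standing in for the ticker loop in the real handler.
func stream(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	if f, ok := w.(http.Flusher); ok {
		f.Flush()
	}
	<-r.Context().Done()
}

// runAndShutdown starts a server whose request contexts derive from a
// cancellable base context, opens one streaming connection, cancels the
// base context, and returns the result of Shutdown.
func runAndShutdown() error {
	baseCtx, cancelBase := context.WithCancel(context.Background())
	defer cancelBase()

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	srv := &http.Server{
		Handler:     http.HandlerFunc(stream),
		BaseContext: func(net.Listener) context.Context { return baseCtx },
	}
	go srv.Serve(ln)

	// http.Get returns once the headers arrive; the body stays open.
	resp, err := http.Get("http://" + ln.Addr().String())
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Cancelling the base context ends the handler, so Shutdown can finish.
	cancelBase()

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return srv.Shutdown(shutdownCtx)
}
```

If the `cancelBase()` call before Shutdown is removed, the handler keeps blocking and Shutdown should instead return a deadline-exceeded error after two seconds.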

Conclusion

Now we have active server metrics for users to see in near-real time. It was a blast implementing this and in the future I may implement more features with SSE.
