Adding Active Metrics using SSE
I wanted to display stats about the application in "semi-real time." That is, every few seconds, my server sends your browser stats about its runtime, and your browser displays them. To do this, I used Server-Sent Events (SSE).
SSE
SSE is extremely useful for one-way communication from the server to the client. It relies only on plain HTTP, so no extra infrastructure is required. Most, if not all, modern programming languages support it in some way, and modern browsers support it natively through the EventSource API.
What is SSE under the hood? It's really just a long-lived HTTP response that the server keeps writing to, which is why it has these qualities.
How It's Implemented
An SSE endpoint is implemented almost the same way as a regular API handler, which I've covered in other articles. The main differences are:
- Specific HTTP headers mark the response as a long-lived SSE stream
- The connection stays open, with data periodically "flushed" to the client
Let's go over the specific code.
First, we'll talk about the router.
func New(h *handler.Handler, log *slog.Logger, publicDir string) http.Handler {
	appMux := http.NewServeMux()
	appMux.HandleFunc("POST /api/likes/{slug}", h.HandleLike)
	appMux.HandleFunc("GET /api/stats/{slug}", h.HandleGetStats)
	appMux.HandleFunc("POST /api/views/{slug}", h.HandleView)

	fs := http.FileServer(http.Dir(publicDir))
	assetsFs := http.FileServer(http.Dir("./assets"))
	appMux.Handle("GET /assets/", http.StripPrefix("/assets/", assetsFs))
	appMux.Handle("GET /", fs)

	// Wrap all routes except SSE in middleware
	var appHandler http.Handler = appMux
	appHandler = middleware.WithLogger(appHandler, log)
	appHandler = middleware.WithRecover(appHandler, log)

	// SSE gets its own handler to avoid middleware which breaks it
	rootMux := http.NewServeMux()
	rootMux.HandleFunc("GET /api/streams/stats", h.HandleStreamStats)
	rootMux.Handle("/", appHandler)
	muxWithRecover := middleware.WithRecover(rootMux, log)

	return muxWithRecover
}
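The important thing to note here is that the SSE handler gets only specific middleware applied to it: in this case, panic recovery. Strictly speaking, net/http already recovers a panicking handler (it logs the stack trace and closes that connection), so the process wouldn't crash, but WithRecover keeps panic handling consistent with the rest of the app. Other middleware can interfere with the SSE connection. A wrapper around http.ResponseWriter that doesn't implement Flush makes streaming impossible, and one that buffers or compresses the response holds events back. So we exclude it.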
Now the handler itself:
func (h *Handler) HandleStreamStats(w http.ResponseWriter, r *http.Request) {
	// Mark the response as an event stream and ask caches/proxies not to hold it.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("X-Accel-Buffering", "no")

	// Streaming requires a ResponseWriter that can flush mid-response.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Send the headers now so the client sees the stream open immediately.
	flusher.Flush()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// sendStats gathers a snapshot of runtime stats and writes it as one SSE event.
	sendStats := func() error {
		var m runtime.MemStats
		runtime.ReadMemStats(&m)

		// Database file size in MB; report "0.00" if the file can't be stat'ed.
		var dbSizeStr string
		fileInfo, err := os.Stat("./blog.db")
		if err == nil {
			dbSizeStr = fmt.Sprintf("%.2f", float64(fileInfo.Size())/1024/1024)
		} else {
			dbSizeStr = "0.00"
		}

		stats := SysStats{
			Uptime:     time.Since(StartTime).Round(time.Second).String(),
			MemoryMB:   m.Alloc / 1024 / 1024, // live heap allocations, whole MB
			Goroutines: runtime.NumGoroutine(),
			DbSizeMB:   dbSizeStr,
		}

		data, err := json.Marshal(stats)
		if err != nil {
			return err
		}

		// "data: <payload>\n\n" is one complete SSE event.
		_, err = fmt.Fprintf(w, "data: %s\n\n", data)
		if err != nil {
			return err // usually means the client has gone away
		}

		flusher.Flush()
		return nil
	}

	// Send one snapshot right away instead of waiting for the first tick.
	if err := sendStats(); err != nil {
		return
	}

	for {
		select {
		case <-r.Context().Done():
			return
		case <-ticker.C:
			if err := sendStats(); err != nil {
				return
			}
		}
	}
}
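As I mentioned previously, we need to set specific headers to mark this as an SSE connection. Content-Type: text/event-stream is what makes it an event stream; Cache-Control: no-cache keeps caches from storing it, Access-Control-Allow-Origin: * lets pages on other origins connect, and X-Accel-Buffering: no tells nginx (if it sits in front of the app) not to buffer the response.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("X-Accel-Buffering", "no")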
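Then we check that the ResponseWriter supports flushing, i.e. that it implements http.Flusher. This is a property of the server side, not the client: Go's built-in HTTP/1.1 and HTTP/2 writers both support it, but a middleware wrapper may hide it. Afterwards, we flush immediately, which sends the response headers so the client knows the stream is open.
flusher, ok := w.(http.Flusher)
if !ok {
	http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
	return
}
flusher.Flush()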
sendStats collects and formats the stats. The most important lines to remember in that function are these:
_, err = fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
Each SSE event is one or more lines of the form "data: <payload>", terminated by a blank line; that's what the trailing \n\n is for. Flush then pushes the buffered bytes to the client immediately while keeping the response open for the next event.
Finally, we create the loop that sends data to the client on an interval:
for {
	select {
	case <-r.Context().Done():
		return
	case <-ticker.C:
		if err := sendStats(); err != nil {
			return
		}
	}
}
select blocks until one of its cases is ready: either the request's context is done, or the ticker fires.
Our ticker sends a value on its channel every 5 seconds. If the receiver falls behind, the ticker drops ticks rather than queuing them up.
Server Changes
We also need to change a few things about how we create our server for SSE.
First, we need to set WriteTimeout to either 0 (no timeout) or a large value. WriteTimeout bounds how long writing the entire response may take, so any nonzero value eventually cuts off a stream that is meant to stay open.
return &http.Server{
	Addr:              ":8080",
	Handler:           mux,
	ReadTimeout:       10 * time.Second,
	IdleTimeout:       120 * time.Second,
	WriteTimeout:      0, // no write deadline, so long-lived SSE responses aren't cut off
	ReadHeaderTimeout: 5 * time.Second,
	BaseContext: func(l net.Listener) context.Context {
		return ctx
	},
}
Because we're creating a potentially infinite loop in our handler, we need a way of cancelling it.
You may have noticed in the select statement, one of the cases is this:
case <-r.Context().Done():
	return
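This makes the handler return as soon as the request's context is cancelled, which net/http does when the client disconnects. However, srv.Shutdown doesn't cancel the contexts of in-flight requests: it closes the listeners and idle connections, then waits for active ones to finish. We just removed the write timeout, and IdleTimeout only applies to keep-alive connections waiting for their next request, so an open SSE stream is never "idle" and would keep Shutdown waiting until its own deadline runs out.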
To close these connections gracefully on shutdown, we set the BaseContext field of the HTTP server. BaseContext supplies the base context for incoming requests, and every request's context is derived from it, so cancelling our own context cancels r.Context() in every running handler. To do this, we pass our custom context into the server creation function and set BaseContext to a function that returns it.
func NewServer(ctx context.Context, publicDir, domain string) *http.Server {
	logger := logging.New(os.Stdout)
	database := db.New()
	rep := repo.New(database)
	hnd := handler.New(rep, logger, publicDir)
	mux := router.New(hnd, logger, publicDir)

	if _, err := database.Exec("PRAGMA journal_mode=WAL;"); err != nil {
		logger.Error("failed to enable WAL mode", "error", err)
	}

	if domain != "" {
		logger.Info("configuring production server (HTTPS)", "domain", domain)

		certManager := autocert.Manager{
			Prompt:     autocert.AcceptTOS,
			HostPolicy: autocert.HostWhitelist(domain, "www."+domain),
			Cache:      autocert.DirCache("certs"),
		}

		go func() {
			logger.Info("starting http redirect server", "addr", ":80")
			if err := http.ListenAndServe(":80", certManager.HTTPHandler(nil)); err != nil {
				logger.Error("redirect server failed", "error", err)
			}
		}()

		return &http.Server{
			Addr:    ":443",
			Handler: mux,
			TLSConfig: &tls.Config{
				GetCertificate: certManager.GetCertificate,
				MinVersion:     tls.VersionTLS12,
			},
			ReadTimeout:       10 * time.Second,
			IdleTimeout:       120 * time.Second,
			WriteTimeout:      0,
			ReadHeaderTimeout: 5 * time.Second,
			BaseContext: func(l net.Listener) context.Context {
				return ctx
			},
		}
	}

	logger.Info("configuring development server (HTTP)", "addr", ":8080")
	return &http.Server{
		Addr:              ":8080",
		Handler:           mux,
		ReadTimeout:       10 * time.Second,
		IdleTimeout:       120 * time.Second,
		WriteTimeout:      0,
		ReadHeaderTimeout: 5 * time.Second,
		BaseContext: func(l net.Listener) context.Context {
			return ctx
		},
	}
}
Now, in our main function, we create a cancellable context and pass it down to NewServer:
engineCtx, cancelEngine := context.WithCancel(context.Background())
defer cancelEngine()

domain := os.Getenv("DOMAIN")
srv := NewServer(engineCtx, "./public", domain)
Now, when our server receives a shutdown signal, we cancel the engine context before calling Shutdown. The cancellation propagates all the way down to each handler's r.Context().
shutDownChan := make(chan os.Signal, 1)
signal.Notify(shutDownChan, syscall.SIGINT, syscall.SIGTERM)
<-shutDownChan

shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cancelBackup() // comes from code not shown in this article
cancelEngine() // cancels BaseContext, so every open SSE handler returns

// With the streams closed, Shutdown only has to wait for ordinary requests.
if err := srv.Shutdown(shutdownCtx); err != nil {
	log.Fatalf("unable to shutdown server gracefully: %v", err)
}
This allows our server to shut down gracefully even while SSE connections are open.
Conclusion
Now we have live server metrics that users can see in near-real time. It was a blast to implement, and I may build more features with SSE in the future.