Go + Microservices = Go kit

Peter Bourgon

Gopher 🏌

Outline

Hang on

Microservice?

None of these are good definitions.

Size definitions

"A single programmer can design, implement, deploy, and maintain."

"Software that fits in your head."

Data definitions

"A microservice implements a single bounded context."

"A single logical database per service."

Operational definitions

"Built & deployed independently. Stateless; state as backing services."

"Addressable through a service discovery system."

Architecture

Did you mean...

This is another architecture, and an incredibly interesting one!

But not what we'll be talking about today.

Important

Cost v. benefit

Problems solved

Problems caused

From one to many

Concerns for a single service, Sean Treadway, SoundCloud

Think twice

Go

Go, IMO

Go, IMO

Go at SoundCloud

2012

2014

Go was missing something

In general, a way to create & maintain a shared context for building Go microservices, interoperable with other teams' work.

Go lost

Enter Go kit

The pitch

NOT a framework

Compare to

Philosophy

Go kit encourages you to write your programs with:

Towards established practices of software engineering:

How does it taste?

Profile service

Let's say we have some service, illustrated by an interface.

type ProfileService interface {
    PostProfile(p Profile) error
    GetProfile(id string) (Profile, error)
}

Here's a profile.

type Profile struct {
    ID   string
    Name string
}

Naïve implementation

Without Go kit stuff, for now.

type profileService struct {
    profiles map[string]Profile // in-memory data store
}

func (ps *profileService) PostProfile(p Profile) error {
    if _, ok := ps.profiles[p.ID]; ok {
        return errors.New("profile already exists")
    }
    ps.profiles[p.ID] = p
    return nil
}

func (ps *profileService) GetProfile(id string) (Profile, error) {
    p, ok := ps.profiles[id]
    if !ok {
        return Profile{}, errors.New("profile not found")
    }
    return p, nil
}
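One caveat before putting this behind a handler: net/http serves each request on its own goroutine, so the bare map above would be read and written concurrently. The following is a minimal sketch of one way to guard it, assuming a sync.RWMutex is acceptable; safeProfileService and newSafeProfileService are hypothetical names that do not appear in the talk.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Profile struct {
	ID   string
	Name string
}

// safeProfileService is a hypothetical variant of profileService
// whose map is protected by a read/write mutex.
type safeProfileService struct {
	mtx      sync.RWMutex
	profiles map[string]Profile
}

func newSafeProfileService() *safeProfileService {
	return &safeProfileService{profiles: map[string]Profile{}}
}

func (ps *safeProfileService) PostProfile(p Profile) error {
	ps.mtx.Lock() // writers take the exclusive lock
	defer ps.mtx.Unlock()
	if _, ok := ps.profiles[p.ID]; ok {
		return errors.New("profile already exists")
	}
	ps.profiles[p.ID] = p
	return nil
}

func (ps *safeProfileService) GetProfile(id string) (Profile, error) {
	ps.mtx.RLock() // readers may proceed in parallel
	defer ps.mtx.RUnlock()
	p, ok := ps.profiles[id]
	if !ok {
		return Profile{}, errors.New("profile not found")
	}
	return p, nil
}

func main() {
	ps := newSafeProfileService()

	// Post ten distinct profiles from ten goroutines.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("user-%d", i)
			_ = ps.PostProfile(Profile{ID: id, Name: id})
		}(i)
	}
	wg.Wait()

	_, err := ps.GetProfile("user-3")
	fmt.Println("lookup error:", err)
}
```

The method set is unchanged, so this type satisfies the same ProfileService interface as the naïve version.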

Transport

Defining a ServeHTTP method on the profileService turns it into an HTTP handler.

func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "POST":
        var p Profile
        if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        if err := ps.PostProfile(p); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "OK")

Transport

    case "GET":
        id := r.FormValue("id")
        if id == "" {
            http.Error(w, "no ID specified", http.StatusBadRequest)
            return
        }
        p, err := ps.GetProfile(id)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        json.NewEncoder(w).Encode(p)
    }
}
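A handler written this way can be exercised without opening a socket. The sketch below repeats the service and handler from the slides and drives them through net/http/httptest; the do helper is a hypothetical convenience, not something from the talk.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

type Profile struct {
	ID   string
	Name string
}

type profileService struct {
	profiles map[string]Profile
}

func (ps *profileService) PostProfile(p Profile) error {
	if _, ok := ps.profiles[p.ID]; ok {
		return errors.New("profile already exists")
	}
	ps.profiles[p.ID] = p
	return nil
}

func (ps *profileService) GetProfile(id string) (Profile, error) {
	p, ok := ps.profiles[id]
	if !ok {
		return Profile{}, errors.New("profile not found")
	}
	return p, nil
}

func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case "POST":
		var p Profile
		if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if err := ps.PostProfile(p); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintf(w, "OK")
	case "GET":
		id := r.FormValue("id")
		if id == "" {
			http.Error(w, "no ID specified", http.StatusBadRequest)
			return
		}
		p, err := ps.GetProfile(id)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		json.NewEncoder(w).Encode(p)
	}
}

// do is a hypothetical helper: it sends one in-memory request to h
// and returns the recorded status code and body.
func do(h http.Handler, method, target, body string) (int, string) {
	req := httptest.NewRequest(method, target, strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	ps := &profileService{profiles: map[string]Profile{}}

	code, body := do(ps, "POST", "/", `{"ID":"john","Name":"John Edgar"}`)
	fmt.Println(code, body)

	code, body = do(ps, "GET", "/?id=john", "")
	fmt.Println(code, strings.TrimSpace(body))
}
```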

main

Now that the profileService is an HTTP handler, we can pass it to ListenAndServe.

func main() {
    ps := &profileService{
        profiles: map[string]Profile{
            "john": Profile{ID: "john", Name: "John Edgar"},
        },
    }
    http.ListenAndServe(":8080", ps)
}

Hooray

Logging

Here's how you might try adding logging...

func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "POST":
        var p Profile
        if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
            code := http.StatusBadRequest
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        if err := ps.PostProfile(p); err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        fmt.Fprintf(w, "OK")
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)

Logging

    case "GET":
        id := r.FormValue("id")
        if id == "" {
            code := http.StatusBadRequest
            http.Error(w, "no ID specified", code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        p, err := ps.GetProfile(id)
        if err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        json.NewEncoder(w).Encode(p)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
    }
}

Logging

func main() {
    ps := &profileService{
        profiles: map[string]Profile{
            "john": Profile{ID: "john", Name: "John Edgar"},
        },
    }
    log.Printf("listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", ps))
}

Instrumentation

And here's how you might try adding some instrumentation...

var dur = prometheus.NewSummaryVec(prometheus.SummaryOpts{
    Namespace: "users_team",
    Subsystem: "profile_service",
    Name:      "http_request_duration_seconds",
    Help:      "Time spent serving HTTP requests.",
}, []string{"method", "status_code"})

Instrumentation

func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    begin := time.Now()
    switch r.Method {
    case "POST":
        var p Profile
        if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
            code := http.StatusBadRequest
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        if err := ps.PostProfile(p); err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        fmt.Fprintf(w, "OK")
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
        dur.WithLabelValues(r.Method, fmt.Sprint(200)).Observe(time.Since(begin).Seconds())

Instrumentation

    case "GET":
        id := r.FormValue("id")
        if id == "" {
            code := http.StatusBadRequest
            http.Error(w, "no ID specified", code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        p, err := ps.GetProfile(id)
        if err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        json.NewEncoder(w).Encode(p)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
        dur.WithLabelValues(r.Method, fmt.Sprint(200)).Observe(time.Since(begin).Seconds())
    }
}

Mixed concerns

Our business logic is still clean, but now our transport code is full of mixed concerns.

begin := time.Now()
switch r.Method {
case "POST":
    var p Profile
    if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
        code := http.StatusBadRequest
        http.Error(w, err.Error(), code)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
        dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
        return
    }
    if err := ps.PostProfile(p); err != nil {
        code := http.StatusInternalServerError
        http.Error(w, err.Error(), code)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
        dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
        return
    }
    fmt.Fprintf(w, "OK")
    log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
    dur.WithLabelValues(r.Method, fmt.Sprint(200)).Observe(time.Since(begin).Seconds())

Separating concerns

Go kit encourages you to separate all of these concerns.

Middleware

Middleware

It would be great to have some kind of single-purpose, composable middleware.

func foo(...) {
    // do the business logic
}

func log(...) {
    // proceed as normal, get the request and response
    log.Printf("%s: %s: %d", r.RemoteAddr, r.Method, code)
}

func instrument(...) {
    // proceed as normal, get the request and response
    dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
}

func rateLimit(...) {
    if aboveThreshold {
        error
    }
    // proceed as normal
}

Endpoint

Let's generalize each operation as an RPC: request, response. We can model RPC methods as endpoints.

type Endpoint func(request) response

RPCs can fail, so we want some way to signal failure. And we'd like to have some way of passing request-scoped information between the layers of this onion. So the actual definition is a little bigger.

type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)

interface{} ? interface{} :(

Profile service endpoints

One way to do it is to change the profileService methods into endpoints.

func (ps *profileService) PostProfile(ctx context.Context, request interface{}) (response interface{}, err error) {
    p := request.(Profile)
    if _, ok := ps.profiles[p.ID]; ok {
        return nil, errors.New("profile already exists")
    }
    ps.profiles[p.ID] = p
    return nil, nil
}

func (ps *profileService) GetProfile(ctx context.Context, request interface{}) (response interface{}, err error) {
    id := request.(string)
    p, ok := ps.profiles[id]
    if !ok {
        return nil, errors.New("profile not found")
    }
    return p, nil
}

Profile service endpoints

Another way is to write endpoint constructors which take the (unaltered) service.

func makePostProfileEndpoint(ps *profileService) Endpoint {
    return func(_ context.Context, request interface{}) (response interface{}, err error) {
        p := request.(Profile)
        return nil, ps.PostProfile(p)
    }
}
func makeGetProfileEndpoint(ps *profileService) Endpoint {
    return func(_ context.Context, request interface{}) (response interface{}, err error) {
        id := request.(string)
        p, err := ps.GetProfile(id)
        return p, err
    }
}

This way is better. It allows us to leave our ProfileService interface pure, avoiding mixing in the endpoint concerns.

Endpoint middleware

In either case, now that we have a general form for our operations, we can layer in value-added stuff without knowing or caring about the underlying logic.

type Middleware func(Endpoint) Endpoint

(Also known as the Decorator pattern.)

Logging middleware

Perhaps the simplest middleware is one that just logs the call.

func loggingMiddleware(next Endpoint) Endpoint {
    return func(ctx context.Context, request interface{}) (response interface{}, err error) {

        begin := time.Now()
        defer func() {
            log.Printf("request took %s", time.Since(begin))
        }()
        return next(ctx, request)

    }
}

But which method is it? We need to provide some more information...

Logging middleware

Add another layer of context.

func loggingMiddleware(method string) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, request interface{}) (response interface{}, err error) {

            begin := time.Now()
            defer func() {
                log.Printf("%s took %s", method, time.Since(begin))
            }()
            return next(ctx, request)

        }
    }
}

And here's how you wire it in.

var e Endpoint
e = makePostProfileEndpoint(ps)
e = loggingMiddleware("PostProfile")(e)
e = instrumentingMiddleware(...)(e)

Middlewares are great

Each middleware is exclusively concerned with one task.

You can avoid mixing different concerns in the same place, keeping each piece of functionality pure and distinct.

This makes your code much easier to maintain, refactor, and eventually delete.

Other endpoint middlewares

Circuit breaker

func breaker(cb *CircuitBreaker) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, request interface{}) (response interface{}, err error) {

            if !cb.Allow() {
                return nil, ErrCircuitOpen
            }

            defer func(begin time.Time) {
                if err == nil {
                    cb.Success(time.Since(begin))
                } else {
                    cb.Failure(time.Since(begin))
                }
            }(time.Now())

            return next(ctx, request)

        }
    }
}

Rate limiter

func limiter(tb *TokenBucket) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, request interface{}) (response interface{}, err error) {

            if tb.TakeAvailable(1) == 0 {
                return nil, ErrLimited
            }

            return next(ctx, request)

        }
    }
}

Service middlewares

Service middlewares

So far our middleware has operated on an endpoint.

type Middleware func(Endpoint) Endpoint

But remember we also have a service interface.

type ProfileService interface {
    PostProfile(p Profile) error
    GetProfile(id string) (Profile, error)
}

Which means we could define a service middleware, custom to our application.

type Middleware func(ProfileService) ProfileService

Service concerns

Service middlewares are a good way to implement single-responsibility functionality that interacts with your business domain.

Let's say you wanted to know when any Michaels registered.

type alertingMiddleware struct {
    ProfileService
    alert Alerter
}

func (mw alertingMiddleware) PostProfile(p Profile) error {
    if strings.Contains(p.Name, "Michael") {
        mw.alert.Alert("We've got a new Michael!!")
    }
    return mw.ProfileService.PostProfile(p)
}

Service construction

Then wire it into the service construction, much like wrapping endpoints.

    var ps ProfileService
    ps = newInmemProfileService()
    ps = alertingMiddleware{ps, alerter}

(In reality, you'd probably use a service middleware to do logging, too.)

Binding to a transport

HTTP transport

Go kit comes with a handy HTTP transport. It adapts an endpoint to an HTTP handler.

    postProfile := httptransport.NewServer(
        context.Background(),
        makePostProfileEndpoint(ps),
        decodePostRequest,
        encodePostResponse,
    )
    getProfile := httptransport.NewServer(
        context.Background(),
        makeGetProfileEndpoint(ps),
        decodeGetRequest,
        encodeGetResponse,
    )

Use it with the slightly more fully-featured Gorilla muxer.

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/net/context"

	"github.com/go-kit/kit/endpoint"
	httptransport "github.com/go-kit/kit/transport/http"
)

func main() {
	// NOTE: alerter is nil in this listing; a real program must supply an implementation.
	var alerter Alerter

	var ps ProfileService
	ps = newInmemProfileService()
	ps = alertingMiddleware{ps, alerter}

	postProfile := httptransport.NewServer(
		context.Background(),
		makePostProfileEndpoint(ps),
		decodePostRequest,
		encodePostResponse,
	)
	getProfile := httptransport.NewServer(
		context.Background(),
		makeGetProfileEndpoint(ps),
		decodeGetRequest,
		encodeGetResponse,
	)

	r := mux.NewRouter()
	r.Methods("POST").Handler(postProfile)
	r.Methods("GET").Handler(getProfile)

	log.Printf("listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", r))
}

func decodePostRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var p Profile
	err := json.NewDecoder(r.Body).Decode(&p)
	return p, err
}

func encodePostResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
	fmt.Fprintf(w, "OK")
	return nil
}

func decodeGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
	return r.FormValue("id"), nil
}

func encodeGetResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
	return json.NewEncoder(w).Encode(response)
}

type ProfileService interface {
	PostProfile(p Profile) error
	GetProfile(id string) (Profile, error)
}

type Profile struct {
	ID   string
	Name string
}

type profileService struct {
	profiles map[string]Profile
}

func newInmemProfileService() *profileService {
	return &profileService{profiles: map[string]Profile{}}
}

func (ps *profileService) PostProfile(p Profile) error {
	if _, ok := ps.profiles[p.ID]; ok {
		return errors.New("profile already exists")
	}
	ps.profiles[p.ID] = p
	return nil
}

func (ps *profileService) GetProfile(id string) (Profile, error) {
	p, ok := ps.profiles[id]
	if !ok {
		return Profile{}, errors.New("profile not found")
	}
	return p, nil
}

func makePostProfileEndpoint(ps ProfileService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		p := request.(Profile)
		return nil, ps.PostProfile(p)
	}
}

func makeGetProfileEndpoint(ps ProfileService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		id := request.(string)
		return ps.GetProfile(id)
	}
}

func loggingMiddleware(method string) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {
			defer func(begin time.Time) {
				log.Printf("%s: %s (%v)", method, time.Since(begin), err)
			}(time.Now())
			return next(ctx, request)
		}
	}
}

var dur = prometheus.NewSummaryVec(prometheus.SummaryOpts{
	Namespace: "users_team",
	Subsystem: "profile_service",
	Name:      "http_request_duration_seconds",
	Help:      "Time spent serving HTTP requests.",
}, []string{"method", "status_code"})

type Alerter interface {
	Alert(string)
}

type alertingMiddleware struct {
	ProfileService
	alert Alerter
}

func (mw alertingMiddleware) PostProfile(p Profile) error {
	if strings.Contains(p.Name, "Michael") {
		mw.alert.Alert("We've got a new Michael!!")
	}
	return mw.ProfileService.PostProfile(p)
}


All the middlewares

    var postEndpoint endpoint.Endpoint
    postEndpoint = makePostProfileEndpoint(ps)
    postEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(postEndpoint)
    postEndpoint = logging("PostProfile")(postEndpoint)
    postEndpoint = instrument(dur)(postEndpoint)

    postProfile := httptransport.NewServer(
        context.Background(),
        postEndpoint,
        decodePostRequest,
        encodePostResponse,
    )

(There's a similar stack for the GetProfile endpoint.)

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/net/context"

	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/ratelimit"
	httptransport "github.com/go-kit/kit/transport/http"
	rl "github.com/juju/ratelimit"
)

func main() {
	dur := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace: "users_team",
		Subsystem: "profile_service",
		Name:      "http_request_duration_seconds",
		Help:      "Time spent serving HTTP requests.",
	}, []string{"method", "status_code"})

	var alerter Alerter

	var ps ProfileService
	ps = newInmemProfileService()
	ps = alertingMiddleware{ps, alerter}

	var postEndpoint endpoint.Endpoint
	postEndpoint = makePostProfileEndpoint(ps)
	postEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(postEndpoint)
	postEndpoint = logging("PostProfile")(postEndpoint)
	postEndpoint = instrument(dur)(postEndpoint)

	postProfile := httptransport.NewServer(
		context.Background(),
		postEndpoint,
		decodePostRequest,
		encodePostResponse,
	)

	var getEndpoint endpoint.Endpoint
	getEndpoint = makeGetProfileEndpoint(ps)
	getEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(getEndpoint)
	getEndpoint = logging("GetProfile")(getEndpoint)
	getEndpoint = instrument(dur)(getEndpoint)
	getProfile := httptransport.NewServer(
		context.Background(),
		getEndpoint,
		decodeGetRequest,
		encodeGetResponse,
	)

	r := mux.NewRouter()
	r.Methods("POST").Handler(postProfile)
	r.Methods("GET").Handler(getProfile)

	log.Printf("listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", r))
}

func decodePostRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var p Profile
	err := json.NewDecoder(r.Body).Decode(&p)
	return p, err
}

func encodePostResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
	fmt.Fprintf(w, "OK")
	return nil
}

func decodeGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
	return r.FormValue("id"), nil
}

func encodeGetResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
	return json.NewEncoder(w).Encode(response)
}

func logging(method string) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {

			defer func(begin time.Time) {
				if err == nil {
					log.Printf("%s: %s (OK)", method, time.Since(begin))
				} else {
					log.Printf("%s: %s (%v)", method, time.Since(begin), err)
				}
			}(time.Now())

			return next(ctx, request)

		}
	}
}

func instrument(dur *prometheus.SummaryVec) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {

			defer func(begin time.Time) {
				method := "UNKNOWN"
				if v := ctx.Value("method"); v != nil {
					method = v.(string)
				}
				code := "UNKNOWN"
				if v := ctx.Value("status_code"); v != nil {
					code = fmt.Sprint(v.(string))
				}
				dur.WithLabelValues(method, code).Observe(time.Since(begin).Seconds())
			}(time.Now())

			return next(ctx, request)

		}
	}
}

type ProfileService interface {
	PostProfile(p Profile) error
	GetProfile(id string) (Profile, error)
}

type Profile struct {
	ID   string
	Name string
}

type profileService struct {
	profiles map[string]Profile
}

func newInmemProfileService() *profileService {
	return &profileService{
		profiles: map[string]Profile{
			"john": Profile{ID: "john", Name: "John Edgar"},
		},
	}
}

func (ps *profileService) PostProfile(p Profile) error {
	if _, ok := ps.profiles[p.ID]; ok {
		return errors.New("profile already exists")
	}
	ps.profiles[p.ID] = p
	return nil
}

func (ps *profileService) GetProfile(id string) (Profile, error) {
	p, ok := ps.profiles[id]
	if !ok {
		return Profile{}, errors.New("profile not found")
	}
	return p, nil
}

func makePostProfileEndpoint(ps ProfileService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		p := request.(Profile)
		return nil, ps.PostProfile(p)
	}
}

func makeGetProfileEndpoint(ps ProfileService) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		id := request.(string)
		return ps.GetProfile(id)
	}
}

func loggingMiddleware(method string) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {
			defer func(begin time.Time) {
				log.Printf("%s: %s (%v)", method, time.Since(begin), err)
			}(time.Now())
			return next(ctx, request)
		}
	}
}

type Alerter interface {
	Alert(string)
}

type alertingMiddleware struct {
	ProfileService
	alert Alerter
}

func (mw alertingMiddleware) PostProfile(p Profile) error {
	if strings.Contains(p.Name, "Michael") {
		mw.alert.Alert("We've got a new Michael!!")
	}
	return mw.ProfileService.PostProfile(p)
}

Supported transports

You can easily serve the same service on multiple transports simultaneously.

Shared middleware stacks, just different serialization.

Why bother?

Your organization has probably already decided on a transport layer.

And you've probably already built tooling, reporting, and culture around it.

Go kit wants to make it easy to adopt Go. We will always play nicely with your infrastructure, to the extent it's possible. You know best.

(Same goes for service discovery, logging infrastructure, orchestration, configuration...)

Distributed tracing

In general

Whenever control passes in to or out of a microservice, that service notes timing, annotation, and contextual data into a partial span. Spans are batched and emitted to a collector. The collector stitches together multiple related spans into a trace.

Zipkin

One popular implementation is Zipkin. It produces a sequence diagram of all the requests between your microservices.

(I wish they used flame charts instead...)

Go kit helpers

Go kit has a helper to move IDs from e.g. HTTP headers to the context object.

func ToContext(ctx context.Context, r *http.Request) context.Context {
    // Copy trace and span IDs from the http.Request to the context.Context
}

And then middlewares to create the spans and emit them to the collectors.

func AnnotateServer(...) endpoint.Middleware {
    return func(next endpoint.Endpoint) endpoint.Endpoint {
        return func(ctx context.Context, request interface{}) (interface{}, error) {

            span := getFrom(ctx)
            defer span.Collect()
            span.AnnotateReceive()
            defer span.AnnotateSend()
            return next(ctx, request)

        }
    }
}

Wire it in

Now, if you want to support Zipkin, you just add a middleware to your stack.

var myEndpoint Endpoint
myEndpoint = makeEndpoint(myService)
myEndpoint = instrument(...)(myEndpoint)
myEndpoint = zipkin.AnnotateServer(...)(myEndpoint) // ☜
myEndpoint = logging(...)(myEndpoint)

Imagine trying to do this without some structure!

A look back

Microservices are a pain in the ass

Go is a great language for microservices

Go kit fills in the gaps

Make microservices tractable, and make Go viable, for your organization.

Thank you

Peter Bourgon

Gopher 🏌