Go + Microservices = Go kit
Peter Bourgon
Gopher π
None of these are good definitions.
"A single programmer can design, implement, deploy, and maintain."
"Software that fits in your head."
"A microservice implements a single bounded context."
"A single logical database per service."
"Built & deployed independently. Stateless; state as backing services."
"Addressable through a service discovery system."
This is another architecture, which is incredibly interesting!
But not what we'll be talking about today.
Microservices: Real Architectural Patterns by Camille Fournier
Concerns for a single service, Sean Treadway, SoundCloud
2012
2014
In general, a way to create & maintain a shared context for building Go microservices, interoperable with other teams' work.
Go kit encourages you to write your programs with:
Towards established practices of software engineering:
Let's say we have some service, illustrated by an interface.
type ProfileService interface {
    PostProfile(p Profile) error
    GetProfile(id string) (Profile, error)
}
Here's a profile.
type Profile struct {
    ID   string
    Name string
}
Without Go kit stuff, for now.
type profileService struct {
    profiles map[string]Profile // in-memory data store
}

func (ps *profileService) PostProfile(p Profile) error {
    if _, ok := ps.profiles[p.ID]; ok {
        return errors.New("profile already exists")
    }
    ps.profiles[p.ID] = p
    return nil
}

func (ps *profileService) GetProfile(id string) (Profile, error) {
    p, ok := ps.profiles[id]
    if !ok {
        return Profile{}, errors.New("profile not found")
    }
    return p, nil
}
Defining a ServeHTTP method on the profileService turns it into an HTTP handler.
func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "POST":
        var p Profile
        if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        if err := ps.PostProfile(p); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "OK")

    case "GET":
        id := r.FormValue("id")
        if id == "" {
            http.Error(w, "no ID specified", http.StatusBadRequest)
            return
        }
        p, err := ps.GetProfile(id)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        json.NewEncoder(w).Encode(p)
    }
}
Now that the profileService is an HTTP handler, we can pass it to ListenAndServe.
package main
import (
"encoding/json"
"errors"
"fmt"
"net/http"
)
type ProfileService interface {
PostProfile(p Profile) error
GetProfile(id string) (Profile, error)
}
type Profile struct {
ID string
Name string
}
// START IMPL OMIT
type profileService struct {
profiles map[string]Profile // in-memory data store // HL
}
func (ps *profileService) PostProfile(p Profile) error {
if _, ok := ps.profiles[p.ID]; ok {
return errors.New("profile already exists")
}
ps.profiles[p.ID] = p
return nil
}
func (ps *profileService) GetProfile(id string) (Profile, error) {
p, ok := ps.profiles[id]
if !ok {
return Profile{}, errors.New("profile not found")
}
return p, nil
}
// END IMPL OMIT
// START POST OMIT
func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "POST":
var p Profile
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := ps.PostProfile(p); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "OK")
// END POST OMIT
// START GET OMIT
case "GET":
id := r.FormValue("id")
if id == "" {
http.Error(w, "no ID specified", http.StatusBadRequest)
return
}
p, err := ps.GetProfile(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(p)
}
}
// END GET OMIT
func main() {
    ps := &profileService{
        profiles: map[string]Profile{
            "john": Profile{ID: "john", Name: "John Edgar"},
        },
    }
    http.ListenAndServe(":8080", ps)
}
Here's how you might try adding logging...
func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "POST":
        var p Profile
        if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
            code := http.StatusBadRequest
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        if err := ps.PostProfile(p); err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        fmt.Fprintf(w, "OK")
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)

    case "GET":
        id := r.FormValue("id")
        if id == "" {
            code := http.StatusBadRequest
            http.Error(w, "no ID specified", code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        p, err := ps.GetProfile(id)
        if err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            return
        }
        json.NewEncoder(w).Encode(p)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
    }
}
package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
)
type ProfileService interface {
PostProfile(p Profile) error
GetProfile(id string) (Profile, error)
}
type Profile struct {
ID string
Name string
}
type profileService struct {
profiles map[string]Profile
}
func (ps *profileService) PostProfile(p Profile) error {
if _, ok := ps.profiles[p.ID]; ok {
return errors.New("profile already exists")
}
ps.profiles[p.ID] = p
return nil
}
func (ps *profileService) GetProfile(id string) (Profile, error) {
p, ok := ps.profiles[id]
if !ok {
return Profile{}, errors.New("profile not found")
}
return p, nil
}
// START POST OMIT
func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "POST":
var p Profile
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
code := http.StatusBadRequest
http.Error(w, err.Error(), code)
log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code) // HL
return
}
if err := ps.PostProfile(p); err != nil {
code := http.StatusInternalServerError
http.Error(w, err.Error(), code)
log.Printf("%s: %s%s : %d", r.RemoteAddr, r.Method, r.URL, code) // HL
return
}
fmt.Fprintf(w, "OK")
log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200) // HL
// END POST OMIT
// START GET OMIT
case "GET":
id := r.FormValue("id")
if id == "" {
code := http.StatusBadRequest
http.Error(w, "no ID specified", code)
log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code) // HL
return
}
p, err := ps.GetProfile(id)
if err != nil {
code := http.StatusInternalServerError
http.Error(w, err.Error(), code)
log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code) // HL
return
}
json.NewEncoder(w).Encode(p)
log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200) // HL
}
}
// END GET OMIT
func main() {
    ps := &profileService{
        profiles: map[string]Profile{
            "john": Profile{ID: "john", Name: "John Edgar"},
        },
    }
    log.Printf("listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", ps))
}
And here's how you might try adding some instrumentation...
var dur = prometheus.NewSummaryVec(prometheus.SummaryOpts{
    Namespace: "users_team",
    Subsystem: "profile_service",
    Name:      "http_request_duration_seconds",
    Help:      "Time spent serving HTTP requests.",
}, []string{"method", "status_code"})

func (ps *profileService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    begin := time.Now()
    switch r.Method {
    case "POST":
        var p Profile
        if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
            code := http.StatusBadRequest
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        if err := ps.PostProfile(p); err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        fmt.Fprintf(w, "OK")
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
        dur.WithLabelValues(r.Method, fmt.Sprint(200)).Observe(time.Since(begin).Seconds())

    case "GET":
        id := r.FormValue("id")
        if id == "" {
            code := http.StatusBadRequest
            http.Error(w, "no ID specified", code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        p, err := ps.GetProfile(id)
        if err != nil {
            code := http.StatusInternalServerError
            http.Error(w, err.Error(), code)
            log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
            dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
            return
        }
        json.NewEncoder(w).Encode(p)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
        dur.WithLabelValues(r.Method, fmt.Sprint(200)).Observe(time.Since(begin).Seconds())
    }
}
Our business logic is still clean, but now our transport code is full of mixed concerns.
begin := time.Now()
switch r.Method {
case "POST":
    var p Profile
    if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
        code := http.StatusBadRequest
        http.Error(w, err.Error(), code)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
        dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
        return
    }
    if err := ps.PostProfile(p); err != nil {
        code := http.StatusInternalServerError
        http.Error(w, err.Error(), code)
        log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, code)
        dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
        return
    }
    fmt.Fprintf(w, "OK")
    log.Printf("%s: %s %s: %d", r.RemoteAddr, r.Method, r.URL, 200)
    dur.WithLabelValues(r.Method, fmt.Sprint(200)).Observe(time.Since(begin).Seconds())
Go kit encourages you to separate all of these concerns.
It would be great to have some kind of single-purpose, composable middleware.
func foo(...) {
    // do the business logic
}

func log(...) {
    // proceed as normal, get the request and response
    log.Printf("%s: %s: %d", r.RemoteAddr, r.Method, code)
}

func instrument(...) {
    // proceed as normal, get the request and response
    dur.WithLabelValues(r.Method, fmt.Sprint(code)).Observe(time.Since(begin).Seconds())
}

func rateLimit(...) {
    if aboveThreshold {
        error
    }
    // proceed as normal
}
Let's generalize each operation as an RPC: request, response. We can model RPC methods as endpoints.
type Endpoint func(request) response
RPCs can fail, so we want some way to signal failure. And we'd like to have some way of passing request-scoped information between the layers of this onion. So the actual definition is a little bigger.
type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)
interface{} in, interface{} out :(
One way to do it is to change the profileService methods into endpoints.
func (ps *profileService) PostProfile(ctx context.Context, request interface{}) (response interface{}, err error) {
    p := request.(Profile)
    if _, ok := ps.profiles[p.ID]; ok {
        return nil, errors.New("profile already exists")
    }
    ps.profiles[p.ID] = p
    return nil, nil
}

func (ps *profileService) GetProfile(ctx context.Context, request interface{}) (response interface{}, err error) {
    id := request.(string)
    p, ok := ps.profiles[id]
    if !ok {
        return nil, errors.New("profile not found")
    }
    return p, nil
}
Another way is to write endpoint constructors which take the (unaltered) service.
func makePostProfileEndpoint(ps *profileService) Endpoint {
    return func(_ context.Context, request interface{}) (response interface{}, err error) {
        p := request.(Profile)
        err = ps.PostProfile(p)
        return nil, err
    }
}

func makeGetProfileEndpoint(ps *profileService) Endpoint {
    return func(_ context.Context, request interface{}) (response interface{}, err error) {
        id := request.(string)
        p, err := ps.GetProfile(id)
        return p, err
    }
}
This way is better: it leaves our ProfileService interface pure, without mixing in endpoint concerns.
In either case, now that we have a general form for our operations, we can layer in value-added stuff without knowing or caring about the underlying logic.
type Middleware func(Endpoint) Endpoint
(Also known as the Decorator pattern.)
Perhaps the simplest middleware is one that just logs the call.
func loggingMiddleware(next Endpoint) Endpoint {
    return func(ctx context.Context, request interface{}) (response interface{}, err error) {
        begin := time.Now()
        defer func() { log.Printf("request took %s", time.Since(begin)) }()
        return next(ctx, request)
    }
}
But which method is it? We need to provide some more information...
Add another layer of context.
func loggingMiddleware(method string) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, request interface{}) (response interface{}, err error) {
            begin := time.Now()
            defer func() { log.Printf("%s took %s", method, time.Since(begin)) }()
            return next(ctx, request)
        }
    }
}
And here's how you wire it in.
var e Endpoint
e = makePostProfileEndpoint(ps)
e = loggingMiddleware("PostProfile")(e)
e = instrumentingMiddleware(...)(e)
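If you prefer building the whole stack in a single expression, a small composition helper works, too. This is a minimal sketch: the name and signature of chain are illustrative, though go kit's endpoint package offers a similar combinator.

// chain composes middlewares so that the first argument becomes the
// outermost layer. A minimal sketch; the name is illustrative.
func chain(outer Middleware, others ...Middleware) Middleware {
    return func(next Endpoint) Endpoint {
        for i := len(others) - 1; i >= 0; i-- {
            next = others[i](next)
        }
        return outer(next)
    }
}

// Usage, equivalent to the manual wiring above:
//   e := chain(
//       instrumentingMiddleware(...),
//       loggingMiddleware("PostProfile"),
//   )(makePostProfileEndpoint(ps))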
Each middleware is exclusively concerned with one task.
You can avoid mixing different concerns in the same place, keeping each piece of functionality pure and distinct.
This makes your code much easier to maintain, refactor, and eventually delete.
func breaker(cb *CircuitBreaker) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, request interface{}) (response interface{}, err error) {
            if !cb.Allow() {
                return nil, ErrCircuitOpen
            }
            defer func(begin time.Time) {
                if err == nil {
                    cb.Success(time.Since(begin))
                } else {
                    cb.Failure(time.Since(begin))
                }
            }(time.Now())
            return next(ctx, request)
        }
    }
}
func limiter(tb *TokenBucket) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, request interface{}) (response interface{}, err error) {
            if tb.TakeAvailable(1) == 0 {
                return nil, ErrLimited
            }
            return next(ctx, request)
        }
    }
}
So far our middleware has operated on an endpoint.
type Middleware func(Endpoint) Endpoint
But remember we also have a service interface.
type ProfileService interface {
    PostProfile(p Profile) error
    GetProfile(id string) (Profile, error)
}
Which means we could define a service middleware, custom to our application.
type Middleware func(ProfileService) ProfileService
Service middlewares are a good way to implement single-responsibility functionality that interacts with your business domain.
Let's say you wanted to know when any Michaels registered.
type alertingMiddleware struct {
    ProfileService
    alert Alerter
}

func (mw alertingMiddleware) PostProfile(p Profile) error {
    if strings.Contains(p.Name, "Michael") {
        mw.alert.Alert("We've got a new Michael!!")
    }
    return mw.ProfileService.PostProfile(p)
}
And wire it in to the service declaration, similar to constructing endpoints.
var ps ProfileService
ps = newInmemProfileService()
ps = alertingMiddleware{ps, alerter}
(In reality, you'd probably use a service middleware to do logging, too.)
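A logging service middleware might look like the sketch below. The loggingServiceMiddleware type, its logger field, and the log format are illustrative assumptions, not part of go kit.

type loggingServiceMiddleware struct {
    ProfileService
    logger *log.Logger // any logger would do; assumed for this sketch
}

func (mw loggingServiceMiddleware) PostProfile(p Profile) (err error) {
    defer func(begin time.Time) {
        mw.logger.Printf("PostProfile id=%s err=%v took=%s", p.ID, err, time.Since(begin))
    }(time.Now())
    return mw.ProfileService.PostProfile(p)
}

func (mw loggingServiceMiddleware) GetProfile(id string) (p Profile, err error) {
    defer func(begin time.Time) {
        mw.logger.Printf("GetProfile id=%s err=%v took=%s", id, err, time.Since(begin))
    }(time.Now())
    return mw.ProfileService.GetProfile(id)
}

It wires in exactly like the alerting middleware: ps = loggingServiceMiddleware{ps, logger}.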
Go kit comes with a handy HTTP transport. It adapts an endpoint to an HTTP handler.
postProfile := httptransport.NewServer(
    context.Background(),
    makePostProfileEndpoint(ps),
    decodePostRequest,
    encodePostResponse,
)

getProfile := httptransport.NewServer(
    context.Background(),
    makeGetProfileEndpoint(ps),
    decodeGetRequest,
    encodeGetResponse,
)
Use it with the slightly more full-featured Gorilla mux router.
package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"github.com/go-kit/kit/endpoint"
httptransport "github.com/go-kit/kit/transport/http"
)
func main() {
var alerter Alerter
// START SERVICE OMIT
var ps ProfileService
ps = newInmemProfileService()
ps = alertingMiddleware{ps, alerter}
// END SERVICE OMIT
// START SERVER OMIT
postProfile := httptransport.NewServer(
context.Background(),
makePostProfileEndpoint(ps), // HL
decodePostRequest,
encodePostResponse,
)
getProfile := httptransport.NewServer(
context.Background(),
makeGetProfileEndpoint(ps), // HL
decodeGetRequest,
encodeGetResponse,
)
// END SERVER OMIT
r := mux.NewRouter()
r.Methods("POST").Handler(postProfile)
r.Methods("GET").Handler(getProfile)
log.Printf("listening on :8080")
log.Fatal(http.ListenAndServe(":8080", r))
}
func decodePostRequest(_ context.Context, r *http.Request) (interface{}, error) {
var p Profile
err := json.NewDecoder(r.Body).Decode(&p)
return p, err
}
func encodePostResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
fmt.Fprintf(w, "OK")
return nil
}
func decodeGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
return r.FormValue("id"), nil
}
func encodeGetResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
type ProfileService interface {
PostProfile(p Profile) error
GetProfile(id string) (Profile, error)
}
type Profile struct {
ID string
Name string
}
type profileService struct {
profiles map[string]Profile
}
func newInmemProfileService() *profileService {
return &profileService{profiles: map[string]Profile{}}
}
func (ps *profileService) PostProfile(p Profile) error {
if _, ok := ps.profiles[p.ID]; ok {
return errors.New("profile already exists")
}
ps.profiles[p.ID] = p
return nil
}
func (ps *profileService) GetProfile(id string) (Profile, error) {
p, ok := ps.profiles[id]
if !ok {
return Profile{}, errors.New("profile not found")
}
return p, nil
}
func makePostProfileEndpoint(ps ProfileService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
p := request.(Profile)
return nil, ps.PostProfile(p)
}
}
func makeGetProfileEndpoint(ps ProfileService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
id := request.(string)
return ps.GetProfile(id)
}
}
func loggingMiddleware(method string) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) { // HL
defer func(begin time.Time) {
log.Printf("%s: %s (%v)", method, time.Since(begin), err)
}(time.Now())
return next(ctx, request)
}
}
}
var dur = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "users_team",
Subsystem: "profile_service",
Name: "http_request_duration_seconds", // HL
Help: "Time spent serving HTTP requests.",
}, []string{"method", "status_code"}) // HL
type Alerter interface {
Alert(string)
}
// START SERVICEMIDDLEWARE OMIT
type alertingMiddleware struct {
ProfileService
alert Alerter
}
func (mw alertingMiddleware) PostProfile(p Profile) error {
if strings.Contains(p.Name, "Michael") {
mw.alert.Alert("We've got a new Michael!!")
}
return mw.ProfileService.PostProfile(p)
}
// END SERVICEMIDDLEWARE OMIT
var postEndpoint endpoint.Endpoint
postEndpoint = makePostProfileEndpoint(ps)
postEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(postEndpoint)
postEndpoint = logging("PostProfile")(postEndpoint)
postEndpoint = instrument(dur)(postEndpoint)

postProfile := httptransport.NewServer(
    context.Background(),
    postEndpoint,
    decodePostRequest,
    encodePostResponse,
)
(There's a similar stack for the GetProfile endpoint.)
package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/ratelimit"
httptransport "github.com/go-kit/kit/transport/http"
rl "github.com/juju/ratelimit"
)
func main() {
dur := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "users_team",
Subsystem: "profile_service",
Name: "http_request_duration_seconds", // HL
Help: "Time spent serving HTTP requests.",
}, []string{"method", "status_code"}) // HL
var alerter Alerter
var ps ProfileService
ps = newInmemProfileService()
ps = alertingMiddleware{ps, alerter}
// START MIDDLEWARE OMIT
var postEndpoint endpoint.Endpoint
postEndpoint = makePostProfileEndpoint(ps)
postEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(postEndpoint)
postEndpoint = logging("PostProfile")(postEndpoint)
postEndpoint = instrument(dur)(postEndpoint)
postProfile := httptransport.NewServer(
context.Background(),
postEndpoint,
decodePostRequest,
encodePostResponse,
)
// END MIDDLEWARE OMIT
var getEndpoint endpoint.Endpoint
getEndpoint = makeGetProfileEndpoint(ps)
getEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(getEndpoint)
getEndpoint = logging("GetProfile")(getEndpoint)
getEndpoint = instrument(dur)(getEndpoint)
getProfile := httptransport.NewServer(
context.Background(),
getEndpoint,
decodeGetRequest,
encodeGetResponse,
)
r := mux.NewRouter()
r.Methods("POST").Handler(postProfile)
r.Methods("GET").Handler(getProfile)

log.Printf("listening on :8080")
log.Fatal(http.ListenAndServe(":8080", r))
}
func decodePostRequest(_ context.Context, r *http.Request) (interface{}, error) {
var p Profile
err := json.NewDecoder(r.Body).Decode(&p)
return p, err
}
func encodePostResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
fmt.Fprintf(w, "OK")
return nil
}
func decodeGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
return r.FormValue("id"), nil
}
func encodeGetResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
func logging(method string) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
defer func(begin time.Time) {
if err == nil {
log.Printf("%s: %s (OK)", method, time.Since(begin))
} else {
log.Printf("%s: %s (%v)", method, time.Since(begin), err)
}
}(time.Now())
return next(ctx, request)
}
}
}
func instrument(dur *prometheus.SummaryVec) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
defer func(begin time.Time) {
method := "UNKNOWN"
if v := ctx.Value("method"); v != nil {
method = v.(string)
}
code := "UNKNOWN"
if v := ctx.Value("status_code"); v != nil {
code = v.(string)
}
dur.WithLabelValues(method, code).Observe(time.Since(begin).Seconds())
}(time.Now())
return next(ctx, request)
}
}
}
type ProfileService interface {
PostProfile(p Profile) error
GetProfile(id string) (Profile, error)
}
type Profile struct {
ID string
Name string
}
type profileService struct {
profiles map[string]Profile
}
func newInmemProfileService() *profileService {
return &profileService{
profiles: map[string]Profile{
"john": Profile{ID: "john", Name: "John Edgar"},
},
}
}
func (ps *profileService) PostProfile(p Profile) error {
if _, ok := ps.profiles[p.ID]; ok {
return errors.New("profile already exists")
}
ps.profiles[p.ID] = p
return nil
}
func (ps *profileService) GetProfile(id string) (Profile, error) {
p, ok := ps.profiles[id]
if !ok {
return Profile{}, errors.New("profile not found")
}
return p, nil
}
func makePostProfileEndpoint(ps ProfileService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
p := request.(Profile)
return nil, ps.PostProfile(p)
}
}
func makeGetProfileEndpoint(ps ProfileService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
id := request.(string)
return ps.GetProfile(id)
}
}
func loggingMiddleware(method string) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) { // HL
defer func(begin time.Time) {
log.Printf("%s: %s (%v)", method, time.Since(begin), err)
}(time.Now())
return next(ctx, request)
}
}
}
type Alerter interface {
Alert(string)
}
type alertingMiddleware struct {
ProfileService
alert Alerter
}
func (mw alertingMiddleware) PostProfile(p Profile) error {
if strings.Contains(p.Name, "Michael") {
mw.alert.Alert("We've got a new Michael!!")
}
return mw.ProfileService.PostProfile(p)
}
Can easily have the same service served on multiple transports simultaneously.
Shared middleware stacks, just different serialization.
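As a sketch of what that looks like with the HTTP transport alone (the paths and the second encoder are assumptions for illustration), the same decorated endpoint can back handlers that differ only in serialization; a gRPC or Thrift server would reuse getEndpoint in the same way.

// Same endpoint, same middleware stack, two serializations.
// encodeGetResponseText and the paths are illustrative.
getJSON := httptransport.NewServer(
    context.Background(),
    getEndpoint,
    decodeGetRequest,
    encodeGetResponse, // JSON, as before
)
getText := httptransport.NewServer(
    context.Background(),
    getEndpoint,
    decodeGetRequest,
    encodeGetResponseText, // e.g. plain text
)

r := mux.NewRouter()
r.Methods("GET").Path("/profile.json").Handler(getJSON)
r.Methods("GET").Path("/profile.txt").Handler(getText)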
Your organization has probably already decided on a transport layer.
And you've probably already built tooling, reporting, and culture around it.
Go kit wants to make it easy to adopt Go. We will always play nicely with your infrastructure, to the extent it's possible. You know best.
(Same goes for service discovery, logging infrastructure, orchestration, configuration...)
Whenever control passes into or out of a microservice, the service records timing and contextual annotations in a partial span. Spans are batched and emitted to a collector. The collector stitches related spans together into a trace.
One popular implementation is Zipkin. It produces a sequence diagram of all the requests between your microservices.
(I wish they used flame charts instead...)
Go kit has a helper to move IDs from e.g. HTTP headers to the context object.
func ToContext(ctx context.Context, r *http.Request) context.Context {
    // Copy trace and span IDs from the http.Request to the context.Context.
}
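A sketch of what such a helper might do, assuming Zipkin's B3 propagation headers; the context key types here are illustrative, not go kit's.

type traceIDKey struct{}
type spanIDKey struct{}

func toContext(ctx context.Context, r *http.Request) context.Context {
    // X-B3-TraceId and X-B3-SpanId are Zipkin's B3 propagation headers.
    if id := r.Header.Get("X-B3-TraceId"); id != "" {
        ctx = context.WithValue(ctx, traceIDKey{}, id)
    }
    if id := r.Header.Get("X-B3-SpanId"); id != "" {
        ctx = context.WithValue(ctx, spanIDKey{}, id)
    }
    return ctx
}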
And then middlewares to create the spans and emit them to the collectors.
func AnnotateServer(...) endpoint.Middleware {
    return func(next endpoint.Endpoint) endpoint.Endpoint {
        return func(ctx context.Context, request interface{}) (interface{}, error) {
            span := getFrom(ctx)
            defer span.Collect()
            span.AnnotateReceive()
            defer span.AnnotateSend()
            return next(ctx, request)
        }
    }
}
Now, if you want to support Zipkin, you just add a middleware to your stack.
var myEndpoint Endpoint
myEndpoint = makeEndpoint(myService)
myEndpoint = instrument(...)(myEndpoint)
myEndpoint = zipkin.AnnotateServer(...)(myEndpoint) // ←
myEndpoint = logging(...)(myEndpoint)
Imagine trying to do this without some structure?
Make microservices tractable, and make Go viable, for your organization.