Go Do
Peter Bourgon
SoundCloud
"Software engineering is concerned with developing and maintaining software systems that behave reliably and efficiently, are affordable to develop and maintain, and satisfy all the requirements that customers have defined for them." (ACM)
I think of this as a process. I look at a system as it exists today, and compare it against how it should exist in the future. The better I am at what I do, the better I can perform that transformation.
I think Go is a really excellent tool.
It provides a small set of carefully considered primitives that are orthogonal to each other and can be easily combined to develop solutions to a large class of problems. Those solutions tend to be decomposable, testable, and maintainable.
package main

import "fmt"

func main() {
	var s1 string = "Jón Þór Birgisson" // Variables may be explicitly typed,
	s2 := "Jónsi"                       // or implicitly typed.
	fmt.Printf("%s, or '%s' for short\n", s1, s2)
}
package main

import "fmt"

type Thing struct {
	Name  string
	Value int
}

type Celsius float32
type Fahrenheit float32

func main() {
	c := Celsius(-40.0)
	f := Fahrenheit(-40.0)
	if c == f { // compiler error: mismatched types Celsius and Fahrenheit
		fmt.Println("It's cold!")
	}
}
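To make the comparison compile, one side has to be converted explicitly. A minimal sketch, assuming we want a physically meaningful conversion rather than a bare type cast; the ToCelsius method is hypothetical and not part of the original example. Conveniently, -40 is the one temperature where both scales agree.

```go
package main

import "fmt"

type Celsius float32
type Fahrenheit float32

// ToCelsius is a hypothetical helper using C = (F - 32) * 5 / 9.
// The explicit Celsius(...) conversion is required: Go never converts
// between distinct named types implicitly.
func (f Fahrenheit) ToCelsius() Celsius {
	return Celsius((f - 32) * 5 / 9)
}

func main() {
	c := Celsius(-40.0)
	f := Fahrenheit(-40.0)
	if c == f.ToCelsius() { // compiles: both operands are Celsius
		fmt.Println("It's cold!")
	}
}
```

Writing `c == Celsius(f)` would also compile, since both types share the underlying type float32, but it would silently equate unrelated units. The compiler error is what forces that decision into the open.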
An interface is a named set of method signatures. It plays a role similar to an abstract class in other languages, but carries no implementation.
type Runner interface {
	Run(int) error
}
Concrete types implement interfaces.
type Runbot9000 struct {
	// ...
}

func (b Runbot9000) Run(distance int) error {
	return nil // Runbot 9000 is not programmed to fail
}
Crucially, there is no explicit declaration of intent: Runbot9000 never states that it implements Runner. Having a Run(int) error method is enough.
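To illustrate implicit satisfaction, here is a small sketch with a second, hypothetical type (Tortoise) that also never mentions Runner. The `var _ Runner = ...` lines are a common optional idiom for a compile-time check; they are not required by the language and do not declare intent in the Java sense.

```go
package main

import (
	"errors"
	"fmt"
)

type Runner interface {
	Run(int) error
}

type Runbot9000 struct{}

func (b Runbot9000) Run(distance int) error { return nil }

// Tortoise is hypothetical. It satisfies Runner purely because it has a
// method with the matching signature.
type Tortoise struct{ MaxDistance int }

func (t Tortoise) Run(distance int) error {
	if distance > t.MaxDistance {
		return errors.New("too far for a tortoise")
	}
	return nil
}

// Optional compile-time checks: the build fails if either type stops
// satisfying Runner.
var (
	_ Runner = Runbot9000{}
	_ Runner = Tortoise{}
)

func main() {
	runners := []Runner{Runbot9000{}, Tortoise{MaxDistance: 10}}
	for i, r := range runners {
		fmt.Println(i, r.Run(50))
	}
}
```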
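Interfaces are ordinary types, and interface values can be passed around like any other value. Many standard library functions operate exclusively on interfaces, such as io.Reader and io.Writer.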
func Race(distance int, runners ...Runner) {
	for i, r := range runners {
		err := r.Run(distance)
		if err == nil {
			fmt.Printf("Runner %d finished, hooray!\n", i)
		} else {
			fmt.Printf("Runner %d didn't finish: %s\n", i, err)
		}
	}
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Runner interface {
	Run(int) error
}

type Runbot9000 struct {
	// ...
}

func (b Runbot9000) Run(distance int) error {
	return nil // Runbot 9000 is not programmed to fail
}

type Baby struct{}

func (x Baby) Run(distance int) error {
	return fmt.Errorf("babies can't run")
}

type Developer struct {
	Clumsiness float32 // 0..1
}

func (d Developer) Run(distance int) error {
	if rand.Float32() < d.Clumsiness {
		return fmt.Errorf("tripped over shoelaces")
	}
	return nil
}

func init() {
	rand.Seed(time.Now().UnixNano())
}

func Race(distance int, runners ...Runner) {
	for i, r := range runners {
		err := r.Run(distance)
		if err == nil {
			fmt.Printf("Runner %d finished, hooray!\n", i)
		} else {
			fmt.Printf("Runner %d didn't finish: %s\n", i, err)
		}
	}
}

func main() {
	Race(50, Developer{Clumsiness: 0.5}, Baby{}, Runbot9000{})
}
Goroutines are independently executing functions, inspired by the processes of Tony Hoare's Communicating Sequential Processes (CSP). Think of them as very lightweight threads, multiplexed by the Go runtime onto OS threads.
Launch any function call in a new goroutine with the go keyword. It begins executing concurrently, "in the background".
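A minimal sketch of the go keyword in action. Since the caller gets no handle to join on, this example uses sync.WaitGroup from the standard library to wait; that is one common option, not the only one (channels, introduced next, are another). The names square and squareAll are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

func square(n int) int { return n * n }

// squareAll launches one goroutine per input. Each goroutine writes only
// to its own slot in out, so no locking is needed.
func squareAll(nums []int) []int {
	out := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1)
		go func(i, n int) { // runs concurrently, "in the background"
			defer wg.Done()
			out[i] = square(n)
		}(i, n)
	}
	wg.Wait() // block until every goroutine has called Done
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // [1 4 9 16]
}
```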
You don't get handles or any explicit goroutine management. Because...
Communication between goroutines is idiomatically accomplished with channels, which are typed, synchronized, and optionally buffered pipes for data.
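The "optionally buffered" part matters: a send on a buffered channel only blocks when the buffer is full. A short sketch of that behaviour (fillBuffered is a hypothetical helper):

```go
package main

import "fmt"

// fillBuffered sends n values into a channel with capacity n. Because the
// buffer has room, none of the sends block, even with no receiver running.
func fillBuffered(n int) chan int {
	c := make(chan int, n)
	for i := 0; i < n; i++ {
		c <- i
	}
	close(c)
	return c
}

func main() {
	c := fillBuffered(3)
	fmt.Println(len(c), cap(c)) // 3 3
	for v := range c {
		fmt.Println(v) // values come out in FIFO order: 0, 1, 2
	}
	// With make(chan int) (unbuffered), the first send in fillBuffered would
	// block; with no other goroutine to receive, the runtime reports a deadlock.
}
```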
Channels are first-class values and may be passed around like anything else. You can have a channel of channels (of channels...).
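One practical use of a channel of channels is request/reply: each request is itself the channel on which the caller wants its answer. In this sketch (counter and nextID are illustrative names), a single goroutine owns the counter state, so no mutex is needed.

```go
package main

import "fmt"

// counter hands out increasing IDs. Each incoming value is a reply
// channel, so the request channel has type chan chan int.
func counter(requests chan chan int) {
	next := 0 // owned exclusively by this goroutine
	for reply := range requests {
		next++
		reply <- next
	}
}

// nextID sends a fresh reply channel and waits for the answer on it.
func nextID(requests chan chan int) int {
	reply := make(chan int)
	requests <- reply
	return <-reply
}

func main() {
	requests := make(chan chan int)
	go counter(requests)
	fmt.Println(nextID(requests), nextID(requests), nextID(requests)) // 1 2 3
	close(requests) // lets counter's range loop end
}
```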
package main

import "fmt"

func main() {
	c := make(chan int)
	go produce(c)
	consume(c)
}

func produce(c chan int) {
	c <- 1 // put data onto channel
	c <- 2
	c <- 3
	close(c)
}

func consume(c chan int) {
	for i := range c {
		fmt.Println(i)
	}
}
Sameer Ajmani, Google Tech Talk
April 25, 2012
Example respectfully repurposed.
Let's say a backend can perform a query and return a string.
type Backend interface {
	Query(q string) string
}
type MyBackend string

func (b MyBackend) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("%s/%s", b, q)
}
type Skrillex struct{}

func (s Skrillex) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return "wub-wub-wub"
}
We want to broadcast a query to a set of backends, and aggregate the responses.
What's the most naïve implementation?
func QueryAll(q string, backends ...Backend) []string {
	results := []string{}
	for _, backend := range backends {
		r := backend.Query(q)
		results = append(results, r)
	}
	return results
}
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"time"
)

func init() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
}

type Backend interface {
	Query(q string) string
}

type Skrillex struct{}

func (s Skrillex) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return "wub-wub-wub"
}

type MyBackend string

func (b MyBackend) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("%s/%s", b, q)
}

func QueryAll(q string, backends ...Backend) []string {
	results := []string{}
	for _, backend := range backends {
		r := backend.Query(q)
		results = append(results, r)
	}
	return results
}

func main() {
	b1 := MyBackend("server-1")
	b2 := MyBackend("server-2")
	b3 := Skrillex{}

	began := time.Now()
	results := QueryAll("dubstep", b1, b2, b3)
	fmt.Println(results)
	fmt.Println(time.Since(began))
}
We can do better. Let's fire queries concurrently.
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"time"
)

func init() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
}

type Backend interface {
	Query(q string) string
}

type Skrillex struct{}

func (s Skrillex) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return "wub-wub-wub"
}

type MyBackend string

func (b MyBackend) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("%s/%s", b, q)
}

func QueryAll(q string, backends ...Backend) []string {
	// query
	c := make(chan string, len(backends)) // buffered chan
	for _, backend := range backends {
		go func(b Backend) { c <- b.Query(q) }(backend)
	}

	// aggregate
	results := []string{}
	for i := 0; i < cap(c); i++ {
		results = append(results, <-c)
	}
	return results
}

func main() {
	b1 := MyBackend("server-1")
	b2 := MyBackend("server-2")
	b3 := Skrillex{}

	began := time.Now()
	results := QueryAll("dubstep", b1, b2, b3)
	fmt.Println(results)
	fmt.Println(time.Since(began))
}
Note that the QueryAll function signature didn't change. We still call it synchronously, and it does the concurrent work internally.
We can do even better. Replicate backends!
type Replicas []Backend // Backends with same content

func (r Replicas) Query(q string) string {
	c := make(chan string, len(r))
	for _, backend := range r {
		go func(b Backend) { c <- b.Query(q) }(backend)
	}
	return <-c
}
Then query the replicas.
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"time"
)

var (
	method = flag.String("method", "sync", "which query method to use")
)

func init() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
}

type Backend interface {
	Query(q string) string
}

type Skrillex struct{}

func (s Skrillex) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return "wub-wub-wub"
}

type MyBackend string

func (b MyBackend) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("%s/%s", b, q)
}

type Replicas []Backend // Backends with same content

func (r Replicas) Query(q string) string {
	c := make(chan string, len(r))
	for _, backend := range r {
		go func(b Backend) { c <- b.Query(q) }(backend)
	}
	return <-c
}

func QueryAll(q string, backends ...Backend) []string {
	// query
	c := make(chan string, len(backends)) // buffered chan
	for _, backend := range backends {
		go func(b Backend) { c <- b.Query(q) }(backend)
	}

	// aggregate
	results := []string{}
	for i := 0; i < cap(c); i++ {
		results = append(results, <-c)
	}
	return results
}

func main() {
	r1 := Replicas{MyBackend("foo1"), MyBackend("foo2"), MyBackend("foo3")}
	r2 := Replicas{MyBackend("bar1"), MyBackend("bar2")}
	r3 := Replicas{Skrillex{}, Skrillex{}, Skrillex{}, Skrillex{}, Skrillex{}}

	began := time.Now()
	results := QueryAll("dubstep", r1, r2, r3)
	fmt.Println(results)
	fmt.Println(time.Since(began))
}
We're subscribing to an event publisher, converting messages to enriched data models, and feeding them into a data store.
Model each stage as a function.
Here, our Listen function simulates an infinite stream of messages, pushing them down an output channel.
func Listen(out chan Msg) {
	for {
		time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
		if rand.Intn(10) < 6 {
			out <- "foo"
		} else {
			out <- "bar"
		}
	}
}
The Enrich stage repeatedly reads a message from the input channel, processes it, and pushes the result down the output channel.
func Enrich(in, out chan Msg) {
	for {
		msg := <-in
		msg = "☆ " + msg + " ☆"
		out <- msg
	}
}
Note: no explicit synchronization, no condition variables, no timed waits.
Everything falls out of the goroutine/channel model.
The Store stage simulates writing the message somewhere.
func Store(in chan Msg) {
	for {
		msg := <-in
		fmt.Println(msg) // store to stdout
	}
}
Wire the stages together, and launch each stage as a goroutine.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

type Msg string

func Listen(out chan Msg) {
	for {
		time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
		if rand.Intn(10) < 6 {
			out <- "foo"
		} else {
			out <- "bar"
		}
	}
}

func Enrich(in, out chan Msg) {
	for {
		msg := <-in
		msg = "☆ " + msg + " ☆"
		out <- msg
	}
}

func Store(in chan Msg) {
	for {
		msg := <-in
		fmt.Println(msg) // store to stdout
	}
}

func main() {
	// build the infrastructure
	toEnricher := make(chan Msg)
	toStore := make(chan Msg)

	// launch the actors
	go Listen(toEnricher)
	go Enrich(toEnricher, toStore)
	go Store(toStore)

	time.Sleep(1 * time.Second)
}
Using channels to pass ownership of a message between stages makes the program naturally concurrent. It also cleanly separates the business logic from transport semantics: total separation of concerns.
Note that because the channels are unbuffered, you get automatic backpressure, which (in my experience) is generally what you want.
Let's add a filtering stage!
func Filter(in, out chan Msg) {
	for {
		msg := <-in
		if msg == "bar" {
			continue // drop
		}
		out <- msg
	}
}
Think in terms of actors doing the work and the pipes that transport that work. It's safe and easy to abort processing of a message at any stage: the stage simply doesn't pass it on.
Wire up the Filter stage...
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

type Msg string

func Listen(out chan Msg) {
	for {
		time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
		if rand.Intn(10) < 6 {
			out <- "foo"
		} else {
			out <- "bar"
		}
	}
}

func Filter(in, out chan Msg) {
	for {
		msg := <-in
		if msg == "bar" {
			continue // drop
		}
		out <- msg
	}
}

func Enrich(in, out chan Msg) {
	for {
		msg := <-in
		msg = "☆ " + msg + " ☆"
		out <- msg
	}
}

func Store(in chan Msg) {
	for {
		msg := <-in
		fmt.Println(msg) // mock storage
	}
}

func main() {
	toFilter := make(chan Msg)
	toEnricher := make(chan Msg)
	toStore := make(chan Msg)

	go Listen(toFilter)
	go Filter(toFilter, toEnricher)
	go Enrich(toEnricher, toStore)
	go Store(toStore)

	time.Sleep(1 * time.Second)
}
There's no complex abstraction to get lost in. You can look at this function and immediately understand what it does and how it works.
"The code does what it says on the page."
Running more actors for a stage increases the concurrency of the program.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

type Msg string

func Listen(out chan Msg) {
	for {
		time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
		if rand.Intn(10) < 6 {
			out <- "foo"
		} else {
			out <- "bar"
		}
	}
}

func Filter(in, out chan Msg) {
	for {
		msg := <-in
		if msg == "bar" {
			continue // drop
		}
		out <- msg
	}
}

func Enrich(in, out chan Msg) {
	for {
		msg := <-in
		msg = "☆ " + msg + " ☆"
		out <- msg
	}
}

func Store(in chan Msg) {
	for {
		msg := <-in
		fmt.Println(msg) // mock storage
	}
}

func main() {
	toFilter := make(chan Msg)
	toEnricher := make(chan Msg)
	toStore := make(chan Msg)

	go Listen(toFilter)
	go Filter(toFilter, toEnricher)
	go Enrich(toEnricher, toStore)
	go Enrich(toEnricher, toStore)
	go Enrich(toEnricher, toStore)
	go Store(toStore)

	time.Sleep(1 * time.Second)
}
Channel operations are a synchronization point across goroutines, so multiple goroutines may safely read from (or write to) the same channel. Each message will be delivered to exactly one receiver.
What if our event source isn't a message queue, but instead an HTTP server?
What if every message is an HTTP request?
We can just change the Msg type to hold the relevant information...
type Msg struct {
	Data string    // 'data' parameter extracted from form values
	Done chan bool // signal channel to request handler
}
And modify Listen to start an HTTP server that generates those Msgs.
(We're passing pointers now, because the stages can modify the message.)
func Listen(out chan *Msg) {
	h := func(w http.ResponseWriter, r *http.Request) {
		msg := &Msg{
			Data: r.FormValue("data"),
			Done: make(chan bool),
		}
		out <- msg

		success := <-msg.Done // wait for done signal
		if !success {
			w.Write([]byte(fmt.Sprintf("aborted: %s", msg.Data)))
			return
		}
		w.Write([]byte(fmt.Sprintf("OK: %s", msg.Data)))
	}
	http.HandleFunc("/incoming", h)
	fmt.Println("listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil)) // blocks; returns only on error
}
Also, whenever a message leaves the pipeline, whether it was stored or dropped, we need to signal the HTTP handler so it can write a response to the client and finish the request.
func Filter(in, out chan *Msg) {
	for {
		msg := <-in
		if msg.Data == "bar" {
			msg.Done <- false
			continue
		}
		out <- msg
	}
}
func Store(in chan *Msg) {
	for {
		msg := <-in
		fmt.Println(msg.Data)
		msg.Done <- true
	}
}
Otherwise, everything is identical.
package main

import (
	"fmt"
	"log"
	"net/http" // production-grade HTTP server
)

type Msg struct {
	Data string    // 'data' parameter extracted from form values
	Done chan bool // signal channel to request handler
}

func Listen(out chan *Msg) {
	h := func(w http.ResponseWriter, r *http.Request) {
		msg := &Msg{
			Data: r.FormValue("data"),
			Done: make(chan bool),
		}
		out <- msg

		success := <-msg.Done // wait for done signal
		if !success {
			w.Write([]byte(fmt.Sprintf("aborted: %s", msg.Data)))
			return
		}
		w.Write([]byte(fmt.Sprintf("OK: %s", msg.Data)))
	}
	http.HandleFunc("/incoming", h)
	fmt.Println("listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil)) // blocks; returns only on error
}

func Filter(in, out chan *Msg) {
	for {
		msg := <-in
		if msg.Data == "bar" {
			msg.Done <- false
			continue
		}
		out <- msg
	}
}

func Enrich(in, out chan *Msg) {
	for {
		msg := <-in
		msg.Data = "☆ " + msg.Data + " ☆"
		out <- msg
	}
}

func Store(in chan *Msg) {
	for {
		msg := <-in
		fmt.Println(msg.Data)
		msg.Done <- true
	}
}

func main() {
	toFilter := make(chan *Msg)
	toEnricher := make(chan *Msg)
	toStore := make(chan *Msg)

	go Listen(toFilter)
	go Filter(toFilter, toEnricher)
	go Enrich(toEnricher, toStore)
	go Store(toStore)

	select {} // block forever without spinning the CPU
}
Go helps make the process of engineering software more pleasant.