Go Do

Peter Bourgon

SoundCloud

Software Engineering

is concerned with developing and maintaining software systems that behave reliably and efficiently, are affordable to develop and maintain, and satisfy all the requirements that customers have defined for them.
ACM

Software Engineering as Process

I think of this as a process. I look at a system as it exists today, and compare it against how it should exist in the future. The better I am at what I do, the better I can perform that transformation.

Why Go?

I think Go is a really excellent tool.

It provides a small set of carefully considered primitives which are orthogonal to each other, and can be easily combined to develop solutions to a large class of problems. Those solutions tend to be decomposable, testable, and maintainable.

Go in 5 minutes

Facts about Go

Syntax

package main

import "fmt"

func main() {
    var s1 string = "Jón Þór Birgisson" // Variables may be explicitly typed,
    s2 := "Jónsi"                       // or implicitly typed.

    fmt.Printf("%s, or '%s' for short\n", s1, s2)
}

Types

package main

import "fmt"

type Thing struct {
    Name  string
    Value int
}

type Celsius float32
type Fahrenheit float32

func main() {
    c := Celsius(-40.0)
    f := Fahrenheit(-40.0)

    if c == f { // compiler error
        fmt.Println("It's cold!")
    }
}
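
To compare the two values, the conversion has to be explicit; here's a minimal tweak to main, shown as a sketch:

    if c == Celsius(f) { // explicit conversion: this compiles, and -40 is where the scales meet
        fmt.Println("It's cold!")
    }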

Interfaces

An interface defines a set of method signatures, similar to an abstract class in other languages.

type Runner interface {
    Run(int) error
}

Concrete types implement interfaces.

type Runbot9000 struct {
    // ...
}
func (b Runbot9000) Run(distance int) error {
    return nil // Runbot 9000 is not programmed to fail
}

Crucially: there is no explicit declaration of intent.
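
If you want to state the relationship anyway, a common idiom (purely an optional compile-time check) is:

    var _ Runner = Runbot9000{} // fails to compile if Runbot9000 stops satisfying Runner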

Interfaces

Interfaces are first-class objects: many stdlib functions operate exclusively on interfaces.

func Race(distance int, runners ...Runner) {
    for i, r := range runners {
        err := r.Run(distance)
        if err == nil {
            fmt.Printf("Runner %d finished, hooray!\n", i)
        } else {
            fmt.Printf("Runner %d didn't finish: %s\n", i, err)
        }
    }
}
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Runner interface {
	Run(int) error
}

type Runbot9000 struct {
	// ...
}

func (b Runbot9000) Run(distance int) error {
	return nil // Runbot 9000 is not programmed to fail
}

type Baby struct{}

func (x Baby) Run(distance int) error {
	return fmt.Errorf("babies can't run")
}

type Developer struct {
	Clumsiness float32 // 0..1
}

func (d Developer) Run(distance int) error {
	if rand.Float32() < d.Clumsiness {
		return fmt.Errorf("tripped over shoelaces")
	}
	return nil
}

func init() {
	rand.Seed(time.Now().UnixNano())
}

func Race(distance int, runners ...Runner) {
	for i, r := range runners {
		err := r.Run(distance)
		if err == nil {
			fmt.Printf("Runner %d finished, hooray!\n", i)
		} else {
			fmt.Printf("Runner %d didn't finish: %s\n", i, err)
		}
	}
}

func main() {
    Race(50, Developer{Clumsiness: 0.5}, Baby{}, Runbot9000{})
}

Goroutines

Goroutines are essentially coroutines, in the spirit of Tony Hoare's Communicating Sequential Processes: very lightweight threads, multiplexed onto OS threads.

Launch any function call in a new goroutine with the go keyword. It begins executing concurrently, "in the background".
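
A minimal sketch (the say helper and the final Sleep are only there for illustration):

    package main

    import (
        "fmt"
        "time"
    )

    func say(s string) {
        fmt.Println(s)
    }

    func main() {
        go say("in the background") // returns immediately; say runs concurrently
        say("in the foreground")
        time.Sleep(100 * time.Millisecond) // crude: give the goroutine time to finish
    }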

You don't get handles, or any explicit goroutine management. Because...

Channels

Communication between goroutines is idiomatically accomplished with channels, which are typed, synchronized, and optionally buffered pipes for data.

Channels are first-class objects, and may be passed around like anything else. You can have a channel of channels (of channels...)
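
For instance, here's a small sketch of a channel of channels used for request/reply (the names are illustrative):

    package main

    import "fmt"

    func serve(requests chan chan string) {
        reply := <-requests // receive a private reply channel...
        reply <- "hello"    // ...and answer on it
    }

    func main() {
        requests := make(chan chan string) // a channel of channels
        go serve(requests)

        reply := make(chan string)
        requests <- reply
        fmt.Println(<-reply)
    }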

Channels

package main

import "fmt"

func main() {
    c := make(chan int)
    go produce(c)
    consume(c)
}

func produce(c chan int) {
    c <- 1 // put data onto channel
    c <- 2
    c <- 3
    close(c)
}

func consume(c chan int) {
    for i := range c {
        fmt.Println(i)
    }
}

Concepts

Software Engineering as Process

Example: Collect data from multiple backends

"Let's Go Further: Building Concurrent Software With Go"

Sameer Ajmani, Google Tech Talk
April 25, 2012

Example respectfully repurposed.

Collect data from multiple backends

Let's say a backend can perform queries, and return a string.

type Backend interface {
    Query(q string) string
}
type MyBackend string

func (b MyBackend) Query(q string) string {
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    return fmt.Sprintf("%s/%s", b, q)
}
type Skrillex struct{}

func (s Skrillex) Query(q string) string {
    time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    return "wub-wub-wub"
}

Collect data from multiple backends

We want to broadcast a query to a set of backends, and aggregate the responses.
What's the most naïve implementation?

func QueryAll(q string, backends ...Backend) []string {
    results := []string{}
    for _, backend := range backends {
        r := backend.Query(q)
        results = append(results, r)
    }
    return results
}
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"time"
)

func init() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
}

type Backend interface {
	Query(q string) string
}

type Skrillex struct{}

func (s Skrillex) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return "wub-wub-wub"
}

type MyBackend string

func (b MyBackend) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("%s/%s", b, q)
}

func QueryAll(q string, backends ...Backend) []string {
	results := []string{}
	for _, backend := range backends {
		r := backend.Query(q)
		results = append(results, r)
	}
	return results
}

func main() {
    b1 := MyBackend("server-1")
    b2 := MyBackend("server-2")
    b3 := Skrillex{}

    began := time.Now()
    results := QueryAll("dubstep", b1, b2, b3)
    fmt.Println(results)
    fmt.Println(time.Since(began))
}

Collect data from multiple backends

We can do better. Let's fire queries concurrently.

package main

import (
	"flag"
	"fmt"
	"math/rand"
	"time"
)

func init() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
}

type Backend interface {
	Query(q string) string
}

type Skrillex struct{}

func (s Skrillex) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return "wub-wub-wub"
}

type MyBackend string

func (b MyBackend) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("%s/%s", b, q)
}

func QueryAll(q string, backends ...Backend) []string {
    // query
    c := make(chan string, len(backends)) // buffered chan
    for _, backend := range backends {
        go func(b Backend) { c <- b.Query(q) }(backend)
    }

    // aggregate
    results := []string{}
    for i := 0; i < cap(c); i++ {
        results = append(results, <-c)
    }
    return results
}
func main() {
	b1 := MyBackend("server-1")
	b2 := MyBackend("server-2")
	b3 := Skrillex{}

	began := time.Now()
	results := QueryAll("dubstep", b1, b2, b3)
	fmt.Println(results)
	fmt.Println(time.Since(began))
}

Note that the QueryAll function signature didn't change. We still call it synchronously; the concurrency is entirely internal.

Collect data from multiple backends

We can do even better. Replicate backends!

type Replicas []Backend // Backends with same content

func (r Replicas) Query(q string) string {
    c := make(chan string, len(r))
    for _, backend := range r {
        go func(b Backend) { c <- b.Query(q) }(backend)
    }
    return <-c
}

Collect data from multiple backends

Then query the replicas.

package main

import (
	"flag"
	"fmt"
	"math/rand"
	"time"
)

var (
	method = flag.String("method", "sync", "which query method to use")
)

func init() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
}

type Backend interface {
	Query(q string) string
}

type Skrillex struct{}

func (s Skrillex) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return "wub-wub-wub"
}

type MyBackend string

func (b MyBackend) Query(q string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("%s/%s", b, q)
}

type Replicas []Backend // Backends with same content

func (r Replicas) Query(q string) string {
	c := make(chan string, len(r))
	for _, backend := range r {
		go func(b Backend) { c <- b.Query(q) }(backend)
	}
	return <-c // first response wins; the buffer lets the slower replicas finish without blocking
}

func QueryAll(q string, backends ...Backend) []string {
	// query
	c := make(chan string, len(backends)) // buffered chan
	for _, backend := range backends {
		go func(b Backend) { c <- b.Query(q) }(backend) // one goroutine per backend
	}

	// aggregate
	results := []string{}
	for i := 0; i < cap(c); i++ {
		results = append(results, <-c)
	}
	return results
}

func main() {
    r1 := Replicas{MyBackend("foo1"), MyBackend("foo2"), MyBackend("foo3")}
    r2 := Replicas{MyBackend("bar1"), MyBackend("bar2")}
    r3 := Replicas{Skrillex{}, Skrillex{}, Skrillex{}, Skrillex{}, Skrillex{}}

    began := time.Now()
    results := QueryAll("dubstep", r1, r2, r3)
    fmt.Println(results)
    fmt.Println(time.Since(began))
}

Example: Pipelined data processing

Pipelined data processing

We're subscribing to an event publisher, converting messages to enriched data models, and feeding them into a data store.

Pipelined data processing: Listen

Model each stage as a function.

Here, our Listen function simulates an infinite stream of messages, pushing them down an output channel.

func Listen(out chan Msg) {
    for {
        time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
        if rand.Intn(10) < 6 {
            out <- "foo"
        } else {
            out <- "bar"
        }
    }
}

Pipelined data processing: Enrich

The Enrich stage reads a single message from the input channel, processes it, and pushes the result down the output channel.

func Enrich(in, out chan Msg) {
    for {
        msg := <-in
        msg = "☆ " + msg + " ☆"
        out <- msg
    }
}

Note: no explicit synchronization, no condition variables, no timed waits.
Everything falls out of the goroutine/channel model.

Pipelined data processing: Store

The Store stage simulates writing the message somewhere.

func Store(in chan Msg) {
    for {
        msg := <-in
        fmt.Println(msg) // store to stdout
    }
}

Pipelined data processing: main

Wire the stages together, and launch each stage as a goroutine.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

type Msg string

func Listen(out chan Msg) {
	for {
		time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
		if rand.Intn(10) < 6 {
			out <- "foo"
		} else {
			out <- "bar"
		}
	}
}

func Enrich(in, out chan Msg) {
	for {
		msg := <-in
		msg = "☆ " + msg + " ☆"
		out <- msg
	}
}

func Store(in chan Msg) {
	for {
		msg := <-in
		fmt.Println(msg) // store to stdout
	}
}

func main() {
    // build the infrastructure
    toEnricher := make(chan Msg)
    toStore := make(chan Msg)

    // launch the actors
    go Listen(toEnricher)
    go Enrich(toEnricher, toStore)
    go Store(toStore)

    time.Sleep(1 * time.Second)
}

Using channels to pass ownership of a message between stages makes the program naturally concurrent. It also cleanly separates the business logic from transport semantics: total separation of concerns.

Note that because the channels are unbuffered, you get automatic backpressure, which (in my experience) is generally what you want.
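
Here's a minimal sketch of that backpressure (the timings are arbitrary): each unbuffered send blocks until the slow consumer is ready, so the producer can never run ahead.

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        c := make(chan int) // unbuffered

        go func() {
            for i := 0; i < 3; i++ {
                c <- i // blocks until the consumer takes the value
            }
            close(c)
        }()

        for i := range c {
            time.Sleep(200 * time.Millisecond) // slow consumer throttles the producer
            fmt.Println("processed", i)
        }
    }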

Pipelined data processing: Filter

Let's add a Filtering stage!

func Filter(in, out chan Msg) {
    for {
        msg := <-in
        if msg == "bar" {
            continue // drop
        }
        out <- msg
    }
}

Think in terms of actors doing the work, and the pipes used to transport that work. It's safe and easy to abort the pipeline at any stage.

Pipelined data processing: Filter

Wire up the Filter stage...

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

type Msg string

func Listen(out chan Msg) {
	for {
		time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
		if rand.Intn(10) < 6 {
			out <- "foo"
		} else {
			out <- "bar"
		}
	}
}

func Filter(in, out chan Msg) {
	for {
		msg := <-in
		if msg == "bar" {
			continue // drop
		}
		out <- msg
	}
}

func Enrich(in, out chan Msg) {
	for {
		msg := <-in
		msg = "☆ " + msg + " ☆"
		out <- msg
	}
}

func Store(in chan Msg) {
	for {
		msg := <-in
		fmt.Println(msg) // mock storage
	}
}

func main() {
    toFilter := make(chan Msg)
    toEnricher := make(chan Msg)
    toStore := make(chan Msg)

    go Listen(toFilter)            
    go Filter(toFilter, toEnricher)
    go Enrich(toEnricher, toStore)
    go Store(toStore)

    time.Sleep(1 * time.Second)
}

There's no complex abstraction to get lost in. You can look at this function and immediately understand what it does and how it works.

"The code does what it says on the page."

Pipelined data processing: concurrency

Scaling the actors for a stage increases the concurrency of the program.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

type Msg string

func Listen(out chan Msg) {
	for {
		time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
		if rand.Intn(10) < 6 {
			out <- "foo"
		} else {
			out <- "bar"
		}
	}
}

func Filter(in, out chan Msg) {
	for {
		msg := <-in
		if msg == "bar" {
			continue // drop
		}
		out <- msg
	}
}

func Enrich(in, out chan Msg) {
	for {
		msg := <-in
		msg = "☆ " + msg + " ☆"
		out <- msg
	}
}

func Store(in chan Msg) {
	for {
		msg := <-in
		fmt.Println(msg) // mock storage
	}
}

func main() {
    toFilter := make(chan Msg)
    toEnricher := make(chan Msg)
    toStore := make(chan Msg)

    go Listen(toFilter)
    go Filter(toFilter, toEnricher)
    go Enrich(toEnricher, toStore)
    go Enrich(toEnricher, toStore)
    go Enrich(toEnricher, toStore)
    go Store(toStore)

    time.Sleep(1 * time.Second)
}

Channel operations are a synchronization point across goroutines, so multiple goroutines may safely read from (or write to) the same channel. Each message will be delivered to exactly one receiver.
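
Here's a small sketch of that behavior, with several consumers competing for values on one channel (the worker and message counts are arbitrary):

    package main

    import "fmt"

    func worker(id int, work chan int, done chan bool) {
        for n := range work {
            fmt.Printf("worker %d got %d\n", id, n) // each value reaches exactly one worker
        }
        done <- true
    }

    func main() {
        work := make(chan int)
        done := make(chan bool)

        for w := 0; w < 3; w++ {
            go worker(w, work, done)
        }
        for n := 0; n < 9; n++ {
            work <- n
        }
        close(work)

        for w := 0; w < 3; w++ { // wait for every worker to drain and exit
            <-done
        }
    }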

Pipelined data processing: HTTP

What if our event source isn't a message queue, but instead an HTTP server?
What if every message is an HTTP request?

We can just change the Msg type, to hold the relevant information...

type Msg struct {
    Data string    // 'data' parameter extracted from form values
    Done chan bool // signal channel to request handler
}

Pipelined data processing: HTTP

And modify the Listener to start an HTTP server, to generate those Msgs.
(We're passing pointers now, because the stages modify the message in place.)

func Listen(out chan *Msg) {
    h := func(w http.ResponseWriter, r *http.Request) {
        msg := &Msg{
            Data: r.FormValue("data"),
            Done: make(chan bool),
        }
        out <- msg

        success := <-msg.Done // wait for done signal
        if !success {
            w.Write([]byte(fmt.Sprintf("aborted: %s", msg.Data)))
            return
        }
        w.Write([]byte(fmt.Sprintf("OK: %s", msg.Data)))
    }

    http.HandleFunc("/incoming", h)
    fmt.Println("listening on :8080")
    http.ListenAndServe(":8080", nil) // blocks
}

Pipelined data processing: HTTP

Also, whenever the pipeline finishes with a message, successfully or not, we need to signal the HTTP handler so it can write a response to the client and complete the request.

func Filter(in, out chan *Msg) {
    for {
        msg := <-in
        if msg.Data == "bar" {
            msg.Done <- false
            continue
        }
        out <- msg
    }
}
func Store(in chan *Msg) {
    for {
        msg := <-in
        fmt.Println(msg.Data)
        msg.Done <- true
    }
}

Pipelined data processing: HTTP

Otherwise, everything is identical.

package main

import (
	"fmt"
	"net/http" // production-grade HTTP server
)

type Msg struct {
	Data string    // 'data' parameter extracted from form values
	Done chan bool // signal channel to request handler
}

func Listen(out chan *Msg) {
	h := func(w http.ResponseWriter, r *http.Request) {
		msg := &Msg{
			Data: r.FormValue("data"),
			Done: make(chan bool),
		}
		out <- msg

		success := <-msg.Done // wait for done signal
		if !success {
			w.Write([]byte(fmt.Sprintf("aborted: %s", msg.Data)))
			return
		}
		w.Write([]byte(fmt.Sprintf("OK: %s", msg.Data)))
	}

	http.HandleFunc("/incoming", h)
	fmt.Println("listening on :8080")
	http.ListenAndServe(":8080", nil) // blocks
}

func Filter(in, out chan *Msg) {
	for {
		msg := <-in
		if msg.Data == "bar" {
			msg.Done <- false // signal the handler: aborted
			continue
		}
		out <- msg
	}
}

func Enrich(in, out chan *Msg) {
	for {
		msg := <-in
		msg.Data = "☆ " + msg.Data + " ☆"
		out <- msg
	}
}

func Store(in chan *Msg) {
	for {
		msg := <-in
		fmt.Println(msg.Data)
		msg.Done <- true // signal the handler: stored successfully
	}
}

func main() {
    toFilter := make(chan *Msg)
    toEnricher := make(chan *Msg)
    toStore := make(chan *Msg)

    go Listen(toFilter)
    go Filter(toFilter, toEnricher)
    go Enrich(toEnricher, toStore)
    go Store(toStore)

    select {} // block forever without spinning the CPU
}

Recap

Go makes the process of engineering software more pleasant.

Thank you

Peter Bourgon

SoundCloud