Chapter 7. Concurrency patterns

In this chapter

  • Control the lifetime of programs
  • Manage a pool of resources that can be reused
  • Create a pool of goroutines that can process work

In chapter 6 you learned what concurrency is and how channels behave, and reviewed code that showed concurrency in action. In this chapter you’ll extend that knowledge by reviewing more code. We’ll review three packages that implement different concurrency patterns that you can use in your own projects. Each package provides a practical perspective on the use of concurrency and channels and how they can make concurrent programs easier to write and reason about.

7.1. Runner

The purpose of the runner package is to show how channels can be used to monitor the amount of time a program is running and terminate the program if it runs too long. This pattern is useful when developing a program that will be scheduled to run as a background task process. This could be a program that runs as a cron job, or in a worker-based cloud environment like Iron.io.

Let’s take a look at the runner.go code file from the runner package.

Listing 7.1. runner/runner.go
 01 // Example provided with help from Gabriel Aszalos.
 02 // Package runner manages the running and lifetime of a process.
 03 package runner
 04
 05 import (
 06     "errors"
 07     "os"
 08     "os/signal"
 09     "time"
 10 )
 11
 12 // Runner runs a set of tasks within a given timeout and can be
 13 // shut down on an operating system interrupt.
 14 type Runner struct {
 15     // interrupt channel reports a signal from the
 16     // operating system.
 17     interrupt chan os.Signal
 18
 19     // complete channel reports that processing is done.
 20     complete chan error
 21
 22     // timeout reports that time has run out.
 23     timeout <-chan time.Time
 24
 25     // tasks holds a set of functions that are executed
 26     // synchronously in index order.
 27     tasks []func(int)
 28 }
 29
 30 // ErrTimeout is returned when a value is received on the timeout.
 31 var ErrTimeout = errors.New("received timeout")
 32
 33 // ErrInterrupt is returned when an event from the OS is received.
 34 var ErrInterrupt = errors.New("received interrupt")
 35
 36 // New returns a new ready-to-use Runner.
 37 func New(d time.Duration) *Runner {
 38     return &Runner{
 39         interrupt: make(chan os.Signal, 1),
 40         complete:  make(chan error),
 41         timeout:   time.After(d),
 42     }
 43 }
 44
 45 // Add attaches tasks to the Runner. A task is a function that
 46 // takes an int ID.
 47 func (r *Runner) Add(tasks ...func(int)) {
 48     r.tasks = append(r.tasks, tasks...)
 49 }
 50
 51 // Start runs all tasks and monitors channel events.
 52 func (r *Runner) Start() error {
 53     // We want to receive all interrupt based signals.
 54     signal.Notify(r.interrupt, os.Interrupt)
 55
 56     // Run the different tasks on a different goroutine.
 57     go func() {
 58         r.complete <- r.run()
 59     }()
 60
 61     select {
 62     // Signaled when processing is done.
 63     case err := <-r.complete:
 64         return err
 65
 66     // Signaled when we run out of time.
 67     case <-r.timeout:
 68         return ErrTimeout
 69     }
 70 }
 71
 72 // run executes each registered task.
 73 func (r *Runner) run() error {
 74     for id, task := range r.tasks {
 75         // Check for an interrupt signal from the OS.
 76         if r.gotInterrupt() {
 77             return ErrInterrupt
 78         }
 79
 80         // Execute the registered task.
 81         task(id)
 82     }
 83
 84     return nil
 85 }
 86
 87 // gotInterrupt verifies if the interrupt signal has been issued.
 88 func (r *Runner) gotInterrupt() bool {
 89     select {
 90     // Signaled when an interrupt event is sent.
 91     case <-r.interrupt:
 92         // Stop receiving any further signals.
 93         signal.Stop(r.interrupt)
 94
 95         return true
 96
 97     // Continue running as normal.
 98     default:
 99         return false
100     }
101 }

The program in listing 7.1 shows a concurrency pattern for task-oriented programs that run unattended on a schedule. It’s designed with three possible termination points:

  • The program can finish its work within the allotted amount of time and terminate normally.
  • The program doesn’t finish in time and kills itself.
  • An operating system interrupt event is received and the program attempts to immediately shut down cleanly.

Let’s walk through the code and see how each point has been implemented.

Listing 7.2. runner/runner.go: lines 12–28
12 // Runner runs a set of tasks within a given timeout and can be
13 // shut down on an operating system interrupt.
14 type Runner struct {
15     // interrupt channel reports a signal from the
16     // operating system.
17     interrupt chan os.Signal
18
19     // complete channel reports that processing is done.
20     complete chan error
21
22     // timeout reports that time has run out.
23     timeout <-chan time.Time
24
25     // tasks holds a set of functions that are executed
26     // synchronously in index order.
27     tasks []func(int)
28 }

Listing 7.2 starts us off with the declaration of the struct named Runner on line 14. This type declares three channels that help manage the lifecycle of the program and a slice of functions that represent the different tasks to run in series.

The interrupt channel on line 17 sends and receives values of interface type os.Signal and is used to receive interrupt events from the host operating system.

Listing 7.3. golang.org/pkg/os/#Signal
// A Signal represents an operating system signal. The usual underlying
// implementation is operating system-dependent: on Unix it is
// syscall.Signal.
type Signal interface {
    String() string
    Signal() // to distinguish from other Stringers
}

The declaration of the os.Signal interface is presented in listing 7.3. This interface abstracts specific implementations for trapping and reporting events from different operating systems.

The second field is named complete and is a channel that sends and receives values of interface type error.

Listing 7.4. runner/runner.go: lines 19–20
19     // complete channel reports that processing is done.
20     complete chan error

This channel is called complete because it’s used by the goroutine running the tasks to signal that all the processing is done. If an error occurs, it’s reported back via an error interface value sent through the channel. If no error occurs, nil is sent as the error interface value.

The third field is named timeout and receives time.Time values.

Listing 7.5. runner/runner.go: lines 22–23
22     // timeout reports that time has run out.
23     timeout <-chan time.Time

This channel is used to manage the amount of time the process has to complete all its tasks. If a time.Time value is ever received on this channel, the program will attempt to shut itself down cleanly.

The final field is named tasks and is a slice of function values.

Listing 7.6. runner/runner.go: lines 25–27
25     // tasks holds a set of functions that are executed
26     // synchronously in index order.
27     tasks []func(int)

These function values represent functions that are run in series, one after the other. The execution of these functions happens on a single but separate goroutine from main.

With the Runner type declared, next we have two error interface variables.

Listing 7.7. runner/runner.go: lines 30–34
30 // ErrTimeout is returned when a value is received on the timeout.
31 var ErrTimeout = errors.New("received timeout")
32
33 // ErrInterrupt is returned when an event from the OS is received.
34 var ErrInterrupt = errors.New("received interrupt")

The first error interface variable is named ErrTimeout. This error value is returned by the Start method when a timeout event is received. The second error interface variable is named ErrInterrupt. This error value is returned by the Start method when an operating system event is received.

Now we can look at how users can create values of type Runner.

Listing 7.8. runner/runner.go: lines 36–43
36 // New returns a new ready-to-use Runner.
37 func New(d time.Duration) *Runner {
38     return &Runner{
39         interrupt: make(chan os.Signal, 1),
 40         complete:  make(chan error),
 41         timeout:   time.After(d),
42     }
43 }

Listing 7.8 shows a factory function called New, which accepts a value of type time.Duration and returns a pointer to a Runner value. The function creates a value of type Runner and initializes each of the channel fields. The tasks field is not explicitly initialized, since the zero value for this field is a nil slice. Each channel field is initialized differently, so let’s explore each one in more detail.

The interrupt channel is initialized as a buffered channel with a capacity of 1. This guarantees the runtime can deliver at least one os.Signal value to the channel even when the program isn’t ready to receive it. The runtime sends these events in a nonblocking way, and any event that arrives when the buffer is full is thrown away. As an example, if the user hits Ctrl+C repeatedly, the program will receive an event only when there’s room in the channel’s buffer, and all other events will be thrown away.
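The following minimal sketch, separate from the runner package, shows the behavior this buffer enables. It registers a buffered channel with signal.Notify and later peeks at it with a select statement; the program structure and messages here are made up purely for illustration.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"
)

func main() {
    // Room for exactly one event. signal.Notify never blocks, so without
    // this buffer an event arriving while we're busy would be dropped.
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    // Pretend to be busy. A Ctrl+C pressed now is held in the buffer.
    time.Sleep(2 * time.Second)

    select {
    case <-interrupt:
        fmt.Println("interrupt received")
    default:
        fmt.Println("no interrupt, keep working")
    }
}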

The complete channel is initialized as an unbuffered channel. When the goroutine running the tasks is finished, it sends an error value or nil error value on this channel. Then it waits for main to receive it. Once main receives the error value, it’s safe for the goroutine to terminate.

The final channel, timeout, is initialized using the After function from the time package. The After function returns a channel of type time.Time. The runtime will send a time.Time value on this channel after the specified duration has elapsed.

Now that you’ve seen how a Runner value is created and initialized, we can look at the methods associated with the Runner type. The first method, Add, is used to capture the task functions to be executed.

Listing 7.9. runner/runner.go: lines 45–49
45 // Add attaches tasks to the Runner. A task is a function that
46 // takes an int ID.
47 func (r *Runner) Add(tasks ...func(int)) {
48     r.tasks = append(r.tasks, tasks...)
49 }

Listing 7.9 shows the Add method, which is declared with a single variadic parameter named tasks. Variadic parameters can accept any number of values that are passed in. In this case, each value must be a function that accepts a single integer value and returns nothing. Inside the method, the tasks parameter becomes a slice of these function values.
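As a quick usage sketch (the task bodies and durations here are made up for illustration), a caller can pass any number of task functions in a single call to Add, or expand an existing slice with the ... operator.

package main

import (
    "log"
    "time"

    "github.com/goinaction/code/chapter7/patterns/runner"
)

func main() {
    r := runner.New(10 * time.Second)

    // Several tasks passed at once through the variadic parameter.
    r.Add(
        func(id int) { log.Printf("task %d: first step", id) },
        func(id int) { log.Printf("task %d: second step", id) },
    )

    // An existing slice of tasks expanded with the ... operator.
    more := []func(int){
        func(id int) { log.Printf("task %d: final step", id) },
    }
    r.Add(more...)

    log.Println(r.Start())
}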

Now let’s look at the run method.

Listing 7.10. runner/runner.go: lines 72–85
72 // run executes each registered task.
73 func (r *Runner) run() error {
74     for id, task := range r.tasks {
75         // Check for an interrupt signal from the OS.
76         if r.gotInterrupt() {
77             return ErrInterrupt
78         }
79
80         // Execute the registered task.
81         task(id)
82     }
83
84     return nil
85 }

The run method on line 73 in listing 7.10 iterates over the tasks slice and executes each function in order. Before any function is executed on line 81, the gotInterrupt method is called on line 76 to see if there are any events to receive from the operating system.

Listing 7.11. runner/runner.go: lines 87–101
 87 // gotInterrupt verifies if the interrupt signal has been issued.
 88 func (r *Runner) gotInterrupt() bool {
 89     select {
 90     // Signaled when an interrupt event is sent.
 91     case <-r.interrupt:
 92         // Stop receiving any further signals.
 93         signal.Stop(r.interrupt)
 94
 95         return true
 96
 97     // Continue running as normal.
 98     default:
 99         return false
100     }
101 }

The gotInterrupt method in listing 7.11 shows a classic use of the select statement with a default case. On line 91, the code attempts to receive on the interrupt channel. Normally that would block if there was nothing to receive, but we have a default case on line 98. The default case turns the attempt to receive on the interrupt channel into a nonblocking call. If there’s an interrupt to receive, then it’s received and processed. If there’s nothing to receive, the default case is then executed.

When an interrupt event is received, the code requests to stop receiving any future events by calling the Stop method on line 93. Then the function returns true. If no interrupt event is received, the method returns false on line 99. Essentially, the gotInterrupt method lets the goroutine peek for interrupt events and keep processing work if one has not been issued.

The final method in the package is called Start.

Listing 7.12. runner/runner.go: lines 51–70
51 // Start runs all tasks and monitors channel events.
52 func (r *Runner) Start() error {
53     // We want to receive all interrupt based signals.
54     signal.Notify(r.interrupt, os.Interrupt)
55
56     // Run the different tasks on a different goroutine.
57     go func() {
58         r.complete <- r.run()
59     }()
60
61     select {
62     // Signaled when processing is done.
63     case err := <-r.complete:
64         return err
65
66     // Signaled when we run out of time.
67     case <-r.timeout:
68         return ErrTimeout
69     }
70 }

The Start method implements the main workflow for the program. In listing 7.12 on line 54, Start registers the interrupt channel with the signal package so the gotInterrupt method can receive interrupt events from the operating system. On lines 56 through 59, an anonymous function is declared and launched as a goroutine. This is the goroutine that executes the set of assigned tasks for the program. On line 58, inside this goroutine, the run method is called and the returned error interface value is sent on the complete channel. Once that value is received by Start, the goroutine terminates.

With the goroutine created, Start enters into a select statement that blocks waiting for one of two events to occur. If an error interface value is received on the complete channel, then the goroutine either finished its work within the allotted amount of time or an operating system interrupt event was received. Regardless, the received error interface value is returned and the method terminates. If a time.Time value is received on the timeout channel, then the goroutine didn’t finish its work within the allotted amount of time. In this case, the program returns the ErrTimeout variable.

Now that you’ve seen the code for the runner package and learned how it works, let’s review the test program in the main.go source code file.

Listing 7.13. runner/main/main.go
01 // This sample program demonstrates how to use a channel to
02 // monitor the amount of time the program is running and terminate
03 // the program if it runs too long.
04 package main
05
06 import (
07     "log"
08     "os"
09     "time"
10     "github.com/goinaction/code/chapter7/patterns/runner"
11 )
12 // timeout is the number of seconds the program has to finish.
13 const timeout = 3 * time.Second
14
15 // main is the entry point for the program.
16 func main() {
17     log.Println("Starting work.")
18
19     // Create a new timer value for this run.
20     r := runner.New(timeout)
21
22     // Add the tasks to be run.
23     r.Add(createTask(), createTask(), createTask())
24
25     // Run the tasks and handle the result.
26     if err := r.Start(); err != nil {
27         switch err {
28         case runner.ErrTimeout:
29             log.Println("Terminating due to timeout.")
30             os.Exit(1)
31         case runner.ErrInterrupt:
32             log.Println("Terminating due to interrupt.")
33             os.Exit(2)
34         }
35     }
36
37     log.Println("Process ended.")
38 }
39
40 // createTask returns an example task that sleeps for the specified
41 // number of seconds based on the id.
42 func createTask() func(int) {
43     return func(id int) {
44         log.Printf("Processor - Task #%d.", id)
45         time.Sleep(time.Duration(id) * time.Second)
46     }
47 }

The main function can be found on line 16 in listing 7.13. On line 20 the timeout value is passed to the New function, and a pointer to a Runner value is returned. Then, on line 23, createTask is called several times and each returned task function is added to the Runner. The createTask function, declared on line 42, returns a function that just pretends to perform some work by sleeping for a number of seconds based on its ID. Once the functions have been added, the Start method is called on line 26 and the main function waits for Start to return.

When Start returns, the returned error interface value is checked. If an error occurred, the code uses the error interface variables to identify whether the method terminated because of a timeout event or an interrupt event. If there’s no error, then the tasks finished on time. On a timeout event, the program terminates with an error code of 1. On an interrupt event, the program terminates with an error code of 2. In all other cases, the program terminates normally with error code 0.

7.2. Pooling

The purpose of the pool package is to show how you can use a buffered channel to pool a set of resources that can be shared and individually used by any number of goroutines. This pattern is useful when you have a static set of resources to share, such as database connections or memory buffers. When a goroutine needs one of these resources from the pool, it can acquire the resource, use it, and then return it to the pool.

Let’s take a look at the pool.go code file from the pool package.

Listing 7.14. pool/pool.go
 01 // Example provided with help from Fatih Arslan and Gabriel Aszalos.
 02 // Package pool manages a user defined set of resources.
 03 package pool
 04
 05 import (
 06     "errors"
 07     "log"
 08     "io"
 09     "sync"
 10 )
 11
 12 // Pool manages a set of resources that can be shared safely by
 13 // multiple goroutines. The resource being managed must implement
 14 // the io.Closer interface.
 15 type Pool struct {
 16     m         sync.Mutex
 17     resources chan io.Closer
 18     factory   func() (io.Closer, error)
 19     closed    bool
 20 }
 21
 22 // ErrPoolClosed is returned when an Acquire returns on a
 23 // closed pool.
 24 var ErrPoolClosed = errors.New("Pool has been closed.")
 25
 26 // New creates a pool that manages resources. A pool requires a
 27 // function that can allocate a new resource and the size of
 28 // the pool.
 29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
 30     if size <= 0 {
 31         return nil, errors.New("Size value too small.")
 32     }
 33
 34     return &Pool{
 35         factory:   fn,
 36         resources: make(chan io.Closer, size),
 37     }, nil
 38 }
 39
 40 // Acquire retrieves a resource from the pool.
 41 func (p *Pool) Acquire() (io.Closer, error) {
 42     select {
 43     // Check for a free resource.
 44     case r, ok := <-p.resources:
 45         log.Println("Acquire:", "Shared Resource")
 46         if !ok {
 47             return nil, ErrPoolClosed
 48         }
 49         return r, nil
 50
 51     // Provide a new resource since there are none available.
 52     default:
 53         log.Println("Acquire:", "New Resource")
 54         return p.factory()
 55     }
 56 }
 57
 58 // Release places a new resource onto the pool.
 59 func (p *Pool) Release(r io.Closer) {
 60     // Secure this operation with the Close operation.
 61     p.m.Lock()
 62     defer p.m.Unlock()
 63
 64     // If the pool is closed, discard the resource.
 65     if p.closed {
 66         r.Close()
 67         return
 68     }
 69
 70     select {
 71     // Attempt to place the new resource on the queue.
 72     case p.resources <- r:
 73         log.Println("Release:", "In Queue")
 74
 75     // If the queue is already at capacity we close the resource.
 76     default:
 77         log.Println("Release:", "Closing")
 78         r.Close()
 79     }
 80 }
 81
 82 // Close will shutdown the pool and close all existing resources.
 83 func (p *Pool) Close() {
 84     // Secure this operation with the Release operation.
 85     p.m.Lock()
 86     defer p.m.Unlock()
 87
 88     // If the pool is already closed, don't do anything.
 89     if p.closed {
 90         return
 91     }
 92
 93     // Set the pool as closed.
 94     p.closed = true
 95
 96     // Close the channel before we drain the channel of its
 97     // resources. If we don't do this, we will have a deadlock.
 98     close(p.resources)
 99
100     // Close the resources
101     for r := range p.resources {
102         r.Close()
103     }
104 }

The code for the pool package in listing 7.14 declares a struct named Pool that allows the caller to create as many different pools as needed. Each pool can manage any type of resource as long as the type implements the io.Closer interface. Let’s take a look at the declaration of the Pool struct.

Listing 7.15. pool/pool.go: lines 12–20
12 // Pool manages a set of resources that can be shared safely by
13 // multiple goroutines. The resource being managed must implement
14 // the io.Closer interface.
15 type Pool struct {
16     m         sync.Mutex
17     resources chan io.Closer
18     factory   func() (io.Closer, error)
19     closed    bool
20 }

The Pool struct declares four fields, each of which helps manage the pool in a goroutine-safe way. On line 16 the struct starts off with a field of type sync.Mutex. This mutex is used to keep all the operations against a Pool value safe for multigoroutine access. The second field is named resources and is declared as a channel of interface type io.Closer. This channel will be created as a buffered channel and will contain the resources being shared. Because an interface type is used, the pool can manage any type of resource that implements the io.Closer interface.

The factory field is of a function type. Any function that takes no parameters and returns an io.Closer and an error interface value can be assigned to this field. The purpose of this function is to create a new resource when the pool requires one. This functionality is an implementation detail beyond the scope of the pool package and needs to be implemented and supplied by the user using this package.

The final field on line 19 is the closed field. This field is a flag that indicates the Pool is being shut down or is already shut down. Now that you’ve seen the declaration of the Pool struct, let’s look at the error interface variable that’s declared on line 24.

Listing 7.16. pool/pool.go: lines 22–24
22 // ErrPoolClosed is returned when an Acquire returns on a
23 // closed pool.
24 var ErrPoolClosed = errors.New("Pool has been closed.")

Creating error interface variables is a common practice in Go. This allows the caller to identify specific returned error values from any function or method within the package. The error interface variable in listing 7.16 has been declared to report when the user calls the Acquire method and the Pool has been closed. Since the Acquire method can return multiple different errors, returning this error variable when the Pool is closed allows the caller to identify this specific error over others.
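For example, a caller might branch on the variable like the sketch below. This is not part of the sample program; it assumes p is a *pool.Pool created elsewhere and that the log and pool packages are imported.

// useResource is a hypothetical caller that treats a closed pool
// differently from any other Acquire failure.
func useResource(p *pool.Pool) {
    conn, err := p.Acquire()
    if err != nil {
        if err == pool.ErrPoolClosed {
            log.Println("pool is closed; stop submitting work")
            return
        }

        // Any other error comes from the user-supplied factory function.
        log.Println("acquire failed:", err)
        return
    }
    defer p.Release(conn)

    // Use the resource here.
}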

With the Pool type and the error interface variable declared, we can start to look at the functions and methods that are declared in the pool package. Let’s start with the pool’s factory function, named New.

Listing 7.17. pool/pool.go: lines 26–38
26 // New creates a pool that manages resources. A pool requires a
27 // function that can allocate a new resource and the size of
28 // the pool.
29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
30     if size <= 0 {
31         return nil, errors.New("Size value too small.")
32     }
33
34     return &Pool{
35         factory:   fn,
36         resources: make(chan io.Closer, size),
37     }, nil
38 }

The New function in listing 7.17 accepts two parameters and returns two values. The first parameter, fn, is declared as a function type that accepts no parameters and returns an io.Closer and an error interface value. The function parameter represents a factory function that creates values of the resource being managed by the pool. The second parameter, size, represents the size of the buffered channel created to hold the resources.

On line 30 the value of size is checked to make sure it’s not less than or equal to 0. If it is, the code returns nil for the pool pointer and creates an error interface value on the fly for the error. Since this is the only error returned from this function, it’s not necessary to create and use an error interface variable for it. If the size value is good, a new Pool value is created and initialized. On line 35 the function parameter is assigned, and on line 36 the buffered channel is created using the size value. Everything can be created and initialized within the scope of the return statement, so a pointer to the new Pool value and a nil error interface value are returned.

With the ability to create and initialize a Pool value, next let’s look at the Acquire method. This method allows the caller to acquire a resource from the pool.

Listing 7.18. pool/pool.go: lines 40–56
40 // Acquire retrieves a resource from the pool.
41 func (p *Pool) Acquire() (io.Closer, error) {
42     select {
43     // Check for a free resource.
44     case r, ok := <-p.resources:
45         log.Println("Acquire:", "Shared Resource")
46         if !ok {
47             return nil, ErrPoolClosed
48         }
49         return r, nil
50
51     // Provide a new resource since there are none available.
52     default:
53         log.Println("Acquire:", "New Resource")
54         return p.factory()
55     }
56 }

Listing 7.18 contains the code for the Acquire method. This method returns a resource from the pool if one is available, or creates a new one for the call. This implementation is accomplished by using a select / case statement to check if there’s a resource in the buffered channel. If there is, it’s received and then returned to the caller. This can be seen on lines 44 and 49. If there’s no resource in the buffered channel to receive, then the default case is executed. In this case, on line 54 the user’s factory function is executed and a new resource is created and returned.

After a resource is acquired and no longer needed, it must be released back into the pool. This is where the Release method comes in. But to understand the mechanics behind the code in the Release method, we need to look at the Close method first.

Listing 7.19. pool/pool.go: lines 82–104
 82 // Close will shutdown the pool and close all existing resources.
 83 func (p *Pool) Close() {
 84     // Secure this operation with the Release operation.
 85     p.m.Lock()
 86     defer p.m.Unlock()
 87
 88     // If the pool is already closed, don't do anything.
 89     if p.closed {
 90         return
 91     }
 92
 93     // Set the pool as closed.
 94     p.closed = true
 95
 96     // Close the channel before we drain the channel of its
 97     // resources. If we don't do this, we will have a deadlock.
 98     close(p.resources)
 99
100     // Close the resources
101     for r := range p.resources {
102         r.Close()
103     }
104 }

Once the program is finished with the pool, it should call the Close method. The code for the Close method is shown in listing 7.19. The method closes and flushes the buffered channel on lines 98 and 101, closing any resources that exist until the channel is empty. All the code in this method must be executed by only one goroutine at a time. In fact, when this code is being executed, goroutines must also be prevented from executing code in the Release method. You’ll understand why this is important soon.

On lines 85 and 86, the mutex is locked and then scheduled to be unlocked when the function returns. On line 89 the closed flag is checked to see if the pool is already closed. If it is, the method returns immediately, which releases the lock. If this is the first time the method is called, then the flag is set to true and the resources channel is closed and flushed.

Now we can look at the Release method and see how it works in coordination with the Close method.

Listing 7.20. pool/pool.go: lines 58–80
58 // Release places a new resource onto the pool.
59 func (p *Pool) Release(r io.Closer) {
60     // Secure this operation with the Close operation.
61     p.m.Lock()
62     defer p.m.Unlock()
63
64     // If the pool is closed, discard the resource.
65     if p.closed {
66         r.Close()
67         return
68     }
69
70     select {
71     // Attempt to place the new resource on the queue.
72     case p.resources <- r:
73         log.Println("Release:", "In Queue")
74
75     // If the queue is already at capacity we close the resource.
76     default:
77         log.Println("Release:", "Closing")
78         r.Close()
79     }
80 }

The implementation of the Release method can be found in listing 7.20. The method starts out with the locking and unlocking of a mutex on lines 61 and 62. This is the same mutex used by the Close method, and it’s how both methods are prevented from being run at the same time by different goroutines. The use of the mutex serves two purposes. First, it protects the read of the closed flag on line 65 from happening at the same time as a write of this flag in the Close method. Second, we don’t want to attempt to send on a closed channel, because this would cause a panic. When the closed field is true, we know the resources channel has been closed.

On line 66, the Close method on the resource is called directly when the pool is closed. This is because there’s no way to release the resource back into the pool. At this point the pool has been both closed and flushed. The reads and writes to the closed flag must be synchronized or else goroutines could be misled into thinking the pool is open and attempt to perform an invalid operation on the channel.
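To see the failure mode the mutex is guarding against, here’s a tiny standalone illustration, unrelated to the pool code itself: a send on a channel that has already been closed always panics.

package main

func main() {
    ch := make(chan int, 1)
    close(ch)
    ch <- 1 // panic: send on closed channel
}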

Now that you’ve seen the pool code and learned how it works, let’s review the test program in the main.go source code file.

Listing 7.21. pool/main/main.go
01 // This sample program demonstrates how to use the pool package
02 // to share a simulated set of database connections.
03 package main
04
05 import (
06     "log"
07     "io"
08     "math/rand"
09     "sync"
10     "sync/atomic"
11     "time"
12
13     "github.com/goinaction/code/chapter7/patterns/pool"
14 )
15
16 const (
17     maxGoroutines   = 25 // the number of routines to use.
18     pooledResources = 2  // number of resources in the pool
19 )
20
21 // dbConnection simulates a resource to share.
22 type dbConnection struct {
23     ID int32
24 }
25
26 // Close implements the io.Closer interface so dbConnection
27 // can be managed by the pool. Close performs any resource
28 // release management.
29 func (dbConn *dbConnection) Close() error {
30     log.Println("Close: Connection", dbConn.ID)
31     return nil
32 }
33
34 // idCounter provides support for giving each connection a unique id.
35 var idCounter int32
36
37 // createConnection is a factory method that will be called by
38 // the pool when a new connection is needed.
 39 func createConnection() (io.Closer, error) {
 40     id := atomic.AddInt32(&idCounter, 1)
41     log.Println("Create: New Connection", id)
42
43     return &dbConnection{id}, nil
44 }
45
46 // main is the entry point for all Go programs.
47 func main() {
48     var wg sync.WaitGroup
49     wg.Add(maxGoroutines)
50
51     // Create the pool to manage our connections.
52     p, err := pool.New(createConnection, pooledResources)
53     if err != nil {
54         log.Println(err)
55     }
56
57     // Perform queries using connections from the pool.
58     for query := 0; query < maxGoroutines; query++ {
59         // Each goroutine needs its own copy of the query
60         // value else they will all be sharing the same query
61         // variable.
62         go func(q int) {
63             performQueries(q, p)
64             wg.Done()
65         }(query)
66     }
67
68     // Wait for the goroutines to finish.
69     wg.Wait()
70
71     // Close the pool.
72     log.Println("Shutdown Program.")
73     p.Close()
74 }
75
76 // performQueries tests the resource pool of connections.
77 func performQueries(query int, p *pool.Pool) {
78     // Acquire a connection from the pool.
79     conn, err := p.Acquire()
80     if err != nil {
81         log.Println(err)
82         return
83     }
84
85     // Release the connection back to the pool.
86     defer p.Release(conn)
87
88     // Wait to simulate a query response.
89     time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
 90     log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }

The code in main.go, shown in listing 7.21, uses the pool package to manage a simulated pool of database connections. The code starts out declaring two constants, maxGoroutines and pooledResources, to set the number of goroutines and the number of resources the program is going to use. The declaration of the resource and the implementation of the io.Closer interface follow.

Listing 7.22. pool/main/main.go: lines 21–32
21 // dbConnection simulates a resource to share.
22 type dbConnection struct {
23     ID int32
24 }
25
26 // Close implements the io.Closer interface so dbConnection
27 // can be managed by the pool. Close performs any resource
28 // release management.
29 func (dbConn *dbConnection) Close() error {
30     log.Println("Close: Connection", dbConn.ID)
31     return nil
32 }

Listing 7.22 shows the declaration of the dbConnection struct and its implementation of the io.Closer interface. The dbConnection type simulates a struct that’s managing a database connection and currently has one field, ID, that contains a unique ID for each connection. The Close method just reports that the connection is being closed and displays its ID.

Next we have the factory function that creates values of dbConnection.

Listing 7.23. pool/main/main.go: lines 34–44
34 // idCounter provides support for giving each connection a unique id.
35 var idCounter int32
36
37 // createConnection is a factory method that will be called by
38 // the pool when a new connection is needed.
39 func createConnection() (io.Closer, error) {
40     id := atomic.AddInt32(&idCounter, 1)
41     log.Println("Create: New Connection", id)
42
43     return &dbConnection{id}, nil
44 }

Listing 7.23 shows the implementation of the createConnection function. The function generates a new and unique ID for the connection, displays that the connection is being created, and returns a pointer to a value of type dbConnection with this unique ID. The generation of the unique ID is performed with the atomic.AddInt32 function. It’s used to safely increment the value of the package level variable idCounter. Now that we have our resource and the factory function, we can use it with the pool package.
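Stripped down to its essentials (with hypothetical names, and assuming the sync/atomic package is imported), that counter pattern looks like the sketch below: the increment and the read of the new value happen as one indivisible operation, so two goroutines asking for an ID at the same time can never receive the same value.

// counter is shared by every goroutine that asks for an ID.
var counter int32

// nextID increments the counter atomically and returns the new value.
func nextID() int32 {
    return atomic.AddInt32(&counter, 1)
}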

Next, let’s look at the code inside the main function.

Listing 7.24. pool/main/main.go: lines 48–55
48     var wg sync.WaitGroup
49     wg.Add(maxGoroutines)
50
51     // Create the pool to manage our connections.
52     p, err := pool.New(createConnection, pooledResources)
53     if err != nil {
54         log.Println(err)
55     }

The main function starts out with declaring a WaitGroup on line 48 and setting the value of the WaitGroup to match the number of goroutines that will be created. The new Pool is created using the New function from the pool package. The factory function and the number of resources to manage are passed in. This returns a pointer to the Pool value and any possible error is checked. Now that we have a Pool, we can create goroutines that can share resources being managed by the pool.

Listing 7.25. pool/main/main.go: lines 57–66
57     // Perform queries using connections from the pool.
58     for query := 0; query < maxGoroutines; query++ {
59         // Each goroutine needs its own copy of the query
60         // value else they will all be sharing the same query
61         // variable.
62         go func(q int) {
63             performQueries(q, p)
64             wg.Done()
65         }(query)
66     }

A for loop is used in listing 7.25 to create goroutines that will use the pool. Each goroutine calls the performQueries function once and then quits. The performQueries function is provided a unique ID for logging and the pointer to the Pool. Once all the goroutines are created, the main function then waits for the goroutines to complete.

Listing 7.26. pool/main/main.go: lines 68–73
68     // Wait for the goroutines to finish.
69     wg.Wait()
70
71     // Close the pool.
72     log.Println("Shutdown Program.")
73     p.Close()

In listing 7.26, the main function waits on the WaitGroup. Once all the goroutines report they’re done, the Pool is closed and the program terminates. Next, let’s look at the performQueries function, which uses the pool’s Acquire and Release methods.

Listing 7.27. pool/main/main.go: lines 76–91
76 // performQueries tests the resource pool of connections.
77 func performQueries(query int, p *pool.Pool) {
78     // Acquire a connection from the pool.
79     conn, err := p.Acquire()
80     if err != nil {
81         log.Println(err)
82         return
83     }
84
85     // Release the connection back to the pool.
86     defer p.Release(conn)
87
88     // Wait to simulate a query response.
89     time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
90     log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }

The implementation of the performQueries function in listing 7.27 shows the use of the pool’s Acquire and Release methods. The function starts out by calling the Acquire method to retrieve a dbConnection from the pool. The returned error interface value is checked, and then on line 86 a defer is used to release the dbConnection back into the pool once the function returns. On lines 89 and 90 a random amount of sleep occurs to simulate work time using the dbConnection.

7.3. Work

The purpose of the work package is to show how you can use an unbuffered channel to create a pool of goroutines that will perform and control the amount of work that gets done concurrently. This is a better approach than using a buffered channel of some arbitrary static size that acts as a queue of work and throwing a bunch of goroutines at it. Unbuffered channels provide a guarantee that data has been exchanged between two goroutines. This approach of using an unbuffered channel allows the user to know when the pool is performing the work, and the channel pushes back when it can’t accept any more work because it’s busy. No work is ever lost or stuck in a queue that has no guarantee it will ever be worked on.
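That guarantee can be seen in a few lines of standalone code, independent of the work package. A send on an unbuffered channel doesn’t complete until another goroutine receives the value, so the sender knows the work is in a worker’s hands the moment the send returns. The task names here are only placeholders.

package main

import (
    "fmt"
    "time"
)

func main() {
    work := make(chan string) // unbuffered: a send waits for a receiver
    done := make(chan struct{})

    go func() {
        for w := range work {
            fmt.Println("working on:", w)
            time.Sleep(time.Second) // simulate the task taking time
        }
        close(done)
    }()

    work <- "task one" // returns only once the goroutine has received it
    work <- "task two" // pushes back while the worker is still busy
    close(work)        // no more work; the range loop above will end

    <-done // wait for the worker to drain the channel and exit
}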

Let’s take a look at the work.go code file from the work package.

Listing 7.28. work/work.go
01 // Example provided with help from Jason Waldrip.
02 // Package work manages a pool of goroutines to perform work.
03 package work
04
05 import "sync"
06
07 // Worker must be implemented by types that want to use
08 // the work pool.
09 type Worker interface {
10     Task()
11 }
12
13 // Pool provides a pool of goroutines that can execute any Worker
14 // tasks that are submitted.
15 type Pool struct {
16     work chan Worker
17     wg   sync.WaitGroup
18 }
19
20 // New creates a new work pool.
21 func New(maxGoroutines int) *Pool {
22     p := Pool{
23         work: make(chan Worker),
24     }
25
26     p.wg.Add(maxGoroutines)
27     for i := 0; i < maxGoroutines; i++ {
28         go func() {
29             for w := range p.work {
30                 w.Task()
31             }
32             p.wg.Done()
33         }()
34     }
35
36     return &p
37 }
38
39 // Run submits work to the pool.
40 func (p *Pool) Run(w Worker) {
41     p.work <- w
42 }
43
44 // Shutdown waits for all the goroutines to shutdown.
45 func (p *Pool) Shutdown() {
46     close(p.work)
47     p.wg.Wait()
48 }

The work package in listing 7.28 starts off with the declaration of an interface named Worker and a struct named Pool.

Listing 7.29. work/work.go: lines 07–18
07 // Worker must be implemented by types that want to use
08 // the work pool.
09 type Worker interface {
10     Task()
11 }
12
13 // Pool provides a pool of goroutines that can execute any Worker
14 // tasks that are submitted.
15 type Pool struct {
16     work chan Worker
17     wg   sync.WaitGroup
18 }

The Worker interface on line 09 in listing 7.29 declares a single method called Task. On line 15 a struct named Pool is declared, which is the type that implements the pool of goroutines and will have methods that process the work. The type declares two fields, one named work, which is a channel of the Worker interface type, and a sync.WaitGroup named wg.

Next let’s look at the factory function for the work package.

Listing 7.30. work/work.go: lines 20–37
20 // New creates a new work pool.
21 func New(maxGoroutines int) *Pool {
22     p := Pool{
23         work: make(chan Worker),
24     }
25
26     p.wg.Add(maxGoroutines)
27     for i := 0; i < maxGoroutines; i++ {
28         go func() {
29             for w := range p.work {
30                 w.Task()
31             }
32             p.wg.Done()
33         }()
34     }
35
36     return &p
37 }

Listing 7.30 shows the New function, which creates a work pool configured with a fixed number of goroutines. The number of goroutines is passed in as a parameter to the New function. On line 22 a value of type Pool is created, and the work field is initialized with an unbuffered channel.

Then, on line 26, the WaitGroup count is set to the number of goroutines, and on lines 27 through 34 that many goroutines are created. The goroutines just receive interface values of type Worker from the work channel and call the Task method on those values.

Listing 7.31. work/work.go: lines 28–33
28         go func() {
29             for w := range p.work {
30                 w.Task()
31             }
32             p.wg.Done()
33         }()

The for range loop blocks until there’s a Worker interface value to receive on the work channel. When a value is received, the Task method is called on it. Once the work channel is closed, the for range loop ends, Done is called on the WaitGroup, and the goroutine terminates.

Now that we can create a pool of goroutines that can wait for and execute work, let’s look at how work is submitted into the pool.

Listing 7.32. work/work.go: lines 39–42
39 // Run submits work to the pool.
40 func (p *Pool) Run(w Worker) {
41     p.work <- w
42 }

Listing 7.32 shows the Run method. This method is used to submit work into the pool. It accepts an interface value of type Worker and sends that value through the work channel. Since the work channel is an unbuffered channel, the caller must wait for a goroutine from the pool to receive it. This is what we want, because the caller needs the guarantee that the work being submitted is being worked on once the call to Run returns.

At some point, the work pool needs to be shut down. This is where the Shutdown method comes in.

Listing 7.33. work/work.go: lines 44–48
44 // Shutdown waits for all the goroutines to shutdown.
45 func (p *Pool) Shutdown() {
46     close(p.work)
47     p.wg.Wait()
48 }

The Shutdown method in listing 7.33 does two things. First, it closes the work channel, which causes all of the goroutines in the pool to shut down and call the Done method on the WaitGroup. Then the Shutdown method calls the Wait method on the WaitGroup, which causes the Shutdown method to wait for all the goroutines to report they have terminated.

Now that you’ve seen the code for the work package and learned how it works, let’s review the test program in the main.go source code file.

Listing 7.34. work/main/main.go
01 // This sample program demonstrates how to use the work package
02 // to use a pool of goroutines to get work done.
03 package main
04
05 import (
06     "log"
07     "sync"
08     "time"
09
10     "github.com/goinaction/code/chapter7/patterns/work"
11 )
12
13 // names provides a set of names to display.
14 var names = []string{
15     "steve",
16     "bob",
17     "mary",
18     "therese",
19     "jason",
20 }
21
22 // namePrinter provides special support for printing names.
23 type namePrinter struct {
24     name string
25 }
26
27 // Task implements the Worker interface.
28 func (m *namePrinter) Task() {
29     log.Println(m.name)
30     time.Sleep(time.Second)
31 }
32
33 // main is the entry point for all Go programs.
34 func main() {
35     // Create a work pool with 2 goroutines.
36     p := work.New(2)
37
38     var wg sync.WaitGroup
39     wg.Add(100 * len(names))
40
41     for i := 0; i < 100; i++ {
42         // Iterate over the slice of names.
43         for _, name := range names {
44             // Create a namePrinter and provide the
45             // specific name.
46             np := namePrinter{
47                 name: name,
48             }
49
50             go func() {
51                 // Submit the task to be worked on. When Run
52                 // returns, we know it is being handled.
53                 p.Run(&np)
54                 wg.Done()
55             }()
56         }
57     }
58
59     wg.Wait()
60
61     // Shutdown the work pool and wait for all existing work
62     // to be completed.
63     p.Shutdown()
64 }

Listing 7.34 shows the test program that uses the work package to display names. The code starts out on line 14 with the declaration of a package-level variable named names, which is declared as a slice of strings. The slice is initialized with five names. Then a type named namePrinter is declared.

Listing 7.35. work/main/main.go: lines 22–31
22 // namePrinter provides special support for printing names.
23 type namePrinter struct {
24     name string
25 }
26
27 // Task implements the Worker interface.
28 func (m *namePrinter) Task() {
29     log.Println(m.name)
30     time.Sleep(time.Second)
31 }

On line 23 in listing 7.35, the namePrinter type is declared, and the implementation of the Worker interface follows. The purpose of the work is to display names to the screen. The type contains a single field, name, which holds the name to display. The implementation of the Worker interface uses the log.Println function to display the name and then waits a second before returning. The one-second wait is there just to slow the test program down so you can see the concurrency in action.

With the implementation of the Worker interface, we can look at the code inside the main function.

Listing 7.36. work/main/main.go: lines 33–64
33 // main is the entry point for all Go programs.
34 func main() {
35     // Create a work pool with 2 goroutines.
36     p := work.New(2)
37
38     var wg sync.WaitGroup
39     wg.Add(100 * len(names))
40
41     for i := 0; i < 100; i++ {
42         // Iterate over the slice of names.
43         for _, name := range names {
44             // Create a namePrinter and provide the
45             // specific name.
46             np := namePrinter{
47                 name: name,
48             }
49
50             go func() {
51                 // Submit the task to be worked on. When Run
52                 // returns, we know it is being handled.
53                 p.Run(&np)
54                 wg.Done()
55             }()
56         }
57     }
58
59     wg.Wait()
60
61     // Shutdown the work pool and wait for all existing work
62     // to be completed.
63     p.Shutdown()
64 }

On line 36 in listing 7.36, the New function from the work package is called to create the work pool. The number 2 is passed into the call, indicating the pool should contain only two goroutines. On lines 38 and 39 a WaitGroup is declared and its count is set to the number of goroutines that will be created. In this case, a goroutine will be created for each name in the names slice, 100 times over, which creates a lot of goroutines competing to submit work to the pool.

On lines 41 and 43 the outer and inner for loops are declared to create all the goroutines. Inside each iteration of the inner loop, a value of type namePrinter is created and provided with a name to print. Then, on line 50, an anonymous function is declared and launched as a goroutine. The goroutine calls the Run method against the work pool to submit the namePrinter value to the pool. Once a goroutine from the work pool receives the value, the call to Run returns. This in turn causes the goroutine to decrement the WaitGroup count and terminate.

Once all the goroutines are created, the main function calls Wait on the WaitGroup. Wait blocks until all the goroutines that were created have submitted their work. Once Wait returns, the work pool is shut down by calling the Shutdown method. This method won’t return until all the work is complete. In our case, there would be at most two outstanding pieces of work by this time.

7.4. Summary

  • You can use channels to control the lifetime of programs.
  • A select statement with a default case can be used to attempt a nonblocking send or receive on a channel.
  • Buffered channels can be used to manage a pool of resources that can be reused.
  • The coordination and synchronization of channels is taken care of by the runtime.
  • Create a pool of goroutines to perform work using unbuffered channels.
  • Any time an unbuffered channel can be used to exchange data between two goroutines, you have guarantees you can count on.