Golang: Example of Worker Pool Pattern
With buffered channels and goroutines, it is easy to create a pool of workers to do "work". Let's create a worker pool and use the workers to do compression and decompression.
Project setup — initializing the Go module
First, go to your project directory, e.g. <project>, and initialize the Go module:
go mod init poolworker
Initializing the module will later help the tooling find the poolworker package from our main program, especially if you are using the VS Code editor.
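If everything went well, go.mod simply declares the module path (the go version line depends on your installed toolchain), and the files we are about to create sit alongside it, with the main program in a child directory (named cmd/ here purely as an example):
<project>/
    go.mod                  // module poolworker
    work.go                 // structs and runOrder (next section)
    worker.go               // the worker pool
    compression_work.go     // the actual compression/decompression
    cmd/
        main.go             // the main program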
work.go — Set up input and output structs
In the project directory, create the work.go file. This is where we keep the structs passed over the channels and a runOrder function that converts a WorkOrder into a WorkResponse.
package poolworker

import "errors"

type op string

const (
    Compress   op = "compress"
    Decompress op = "decompress"
)

type WorkOrder struct {
    Op           op
    Payloadbytes []byte
}

type WorkResponse struct {
    Wo          WorkOrder
    ResultBytes []byte
    Err         error
}

func runOrder(wo WorkOrder) WorkResponse {
    switch wo.Op {
    case Compress:
        return compression(wo)
    case Decompress:
        return decompression(wo)
    default:
        return WorkResponse{Err: errors.New("unsupported operation")}
    }
}
The WorkOrder struct defines the operation to carry out, in this case a compression or decompression op, and encapsulates the payload bytes to be processed. The WorkResponse encapsulates the associated WorkOrder and the result as a byte slice. The error Err is mutually exclusive with ResultBytes, so check that Err is nil to know that you have a successful WorkResponse.

The runOrder function switches on the Op value to decide whether to compress or decompress. The actual compression and decompression work lives in another .go file which we will get to. Notice that if the Op code in the incoming WorkOrder does not match any of the switch cases, a WorkResponse carrying an error is returned.
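As a quick illustration of how these types fit together, here is a minimal sketch that calls runOrder directly. It has to live inside the poolworker package (e.g. in a test file), since runOrder is unexported, and it assumes the compression function shown later in compression_work.go:
wo := WorkOrder{Op: Compress, Payloadbytes: []byte("hello")}
resp := runOrder(wo)
if resp.Err != nil {
    // the order failed; ResultBytes carries nothing useful
    fmt.Println("order failed:", resp.Err)
} else {
    // success; resp.Wo still carries the original order
    fmt.Printf("compressed %d bytes into %d bytes\n",
        len(resp.Wo.Payloadbytes), len(resp.ResultBytes))
}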
worker.go — Creating the workers in the pool
In the same project directory, create another file, worker.go. The pool of worker goroutines is defined here.
package poolworker

import (
    "context"
    "fmt"
)

func DispatchWorkers(numberWorkers int) (context.CancelFunc, chan WorkOrder, chan WorkResponse) {
    in := make(chan WorkOrder, numberWorkers)
    out := make(chan WorkResponse, numberWorkers)
    ctx, cancel := context.WithCancel(context.Background())
    for i := 0; i < numberWorkers; i++ {
        go Worker(ctx, i+1, in, out)
    }
    return cancel, in, out
}

func Worker(ctx context.Context, id int, in chan WorkOrder, out chan WorkResponse) {
    for { // run indefinitely
        select {
        case <-ctx.Done():
            return
        case wo := <-in:
            fmt.Printf("Worker %d performing work %s\n", id, wo.Op)
            out <- runOrder(wo)
        }
    }
}
The DispatchWorkers function takes a number that decides how many workers to create in the pool. It returns a context cancel function and two buffered channels (each sized to the number of workers). The cancel function comes from context.WithCancel(); your program uses it to stop the worker goroutines, say when the program ends.
The Worker function is the goroutine itself. It runs an infinite for loop around a select statement: if the context is cancelled, the worker shuts down; otherwise it takes a WorkOrder from the in buffered channel and "runs the order", runOrder being the function defined in work.go.
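If you also want to know when every worker has actually exited after cancellation, one possible extension is to hand the workers a sync.WaitGroup. This is only a sketch of an alternative dispatcher, not part of the code above; it lives in the same poolworker package and additionally imports "sync":
// Variant of DispatchWorkers that also returns a *sync.WaitGroup so the
// caller can wait for every worker to finish after calling cancel().
func DispatchWorkersWait(numberWorkers int) (context.CancelFunc, *sync.WaitGroup, chan WorkOrder, chan WorkResponse) {
    in := make(chan WorkOrder, numberWorkers)
    out := make(chan WorkResponse, numberWorkers)
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
    for i := 0; i < numberWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            Worker(ctx, id, in, out) // returns once ctx is cancelled
        }(i + 1)
    }
    return cancel, &wg, in, out
}
The caller would then invoke cancel() followed by wg.Wait() to be sure no worker is still in the middle of an order.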
compression_work.go — Where your actual “work” should be done
We still haven't defined the compression and decompression functions referenced in work.go, so let's create them in compression_work.go:
package poolworker

import (
    "bytes"
    "compress/gzip"
    "io"
)

func compression(wo WorkOrder) WorkResponse {
    bufferWriter := new(bytes.Buffer)
    text := wo.Payloadbytes
    compressedWriter := gzip.NewWriter(bufferWriter)
    _, err := compressedWriter.Write(text)
    if err != nil {
        return WorkResponse{Err: err}
    }
    compressedWriter.Close() // to flush to the underlying writer
    return WorkResponse{
        Wo:          wo,
        ResultBytes: bufferWriter.Bytes()}
}

func decompression(wo WorkOrder) WorkResponse {
    bufferReader := bytes.NewReader(wo.Payloadbytes)
    decompressionReader, err := gzip.NewReader(bufferReader)
    if err != nil {
        return WorkResponse{Err: err}
    }
    defer decompressionReader.Close()
    uncompressedBytes := make([]byte, len(wo.Payloadbytes)*5) // give the vessel five times the length of the compressed bytes
    _, err = decompressionReader.Read(uncompressedBytes)
    if err != nil && err != io.EOF {
        return WorkResponse{Err: err}
    }
    return WorkResponse{Wo: wo, ResultBytes: bytes.TrimRight(uncompressedBytes, "\x00")} // trim away trailing \x00 bytes
}
We use the gzip package from the standard library to do the actual compression and decompression. The WorkOrder struct, and in particular its payload bytes, forms the input to both functions.
Noteworthy is how we decompress to bytes in the decompression function. We need a byte slice to read the uncompressed bytes into, and if that slice is too short, the uncompressed data will be truncated. The approach here is to allocate a slice five times the length of the compressed payload as the vessel. You could use less in a memory-constrained environment; for short text payloads like ours a factor of five is usually enough headroom, though gzip can achieve much higher ratios on highly repetitive data.
Any part of the slice that is left unfilled remains as zero bytes ("\x00"), so these are trimmed away using the bytes.TrimRight function with "\x00" as the cutset.
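If you would rather not guess the vessel size at all (and skip the trailing zero bytes entirely), a more robust variant could read until EOF with io.ReadAll. This is only a sketch of an alternative, not the function used above; it relies on the same bytes, compress/gzip and io imports:
func decompressionReadAll(wo WorkOrder) WorkResponse {
    decompressionReader, err := gzip.NewReader(bytes.NewReader(wo.Payloadbytes))
    if err != nil {
        return WorkResponse{Err: err}
    }
    defer decompressionReader.Close()
    // io.ReadAll grows the result slice as needed, so no fixed-size vessel
    // and no bytes.TrimRight are required
    uncompressedBytes, err := io.ReadAll(decompressionReader)
    if err != nil {
        return WorkResponse{Err: err}
    }
    return WorkResponse{Wo: wo, ResultBytes: uncompressedBytes}
}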
Main Program — the complete workflow
In the main.go file, which we create in a child directory of the project folder, we create WorkOrders from the string bytes we want to compress, run these orders to get WorkResponses, print them out, and then stuff the results back into WorkOrders to decompress. Finally, the decompressed strings are printed out to check that they match the original uncompressed strings.
package main

import (
    "fmt"
    "poolworker"
)

func main() {
    cancel, in, out := poolworker.DispatchWorkers(5)
    defer cancel()

    orders := []poolworker.WorkOrder{
        {Op: poolworker.Compress, Payloadbytes: []byte("A long time ago")},
        {Op: poolworker.Compress, Payloadbytes: []byte("in a galaxy far far away")},
        {Op: poolworker.Compress, Payloadbytes: []byte("there is a universe that is upside-down")},
        {Op: poolworker.Compress, Payloadbytes: []byte("Is there time for things to heal, I asked")},
    }

    // compression
    for i := 0; i < len(orders); i++ {
        in <- orders[i]
    }
    decompressOrders := make([]poolworker.WorkOrder, 0, len(orders))
    for i := 0; i < len(orders); i++ {
        res := <-out
        if res.Err != nil {
            fmt.Println(res.Err)
            continue
        }
        fmt.Printf("input: %s, compressed output: %q\n", res.Wo.Payloadbytes, string(res.ResultBytes))
        decompressOrders = append(decompressOrders, poolworker.WorkOrder{Op: poolworker.Decompress, Payloadbytes: res.ResultBytes})
    }

    // decompression
    for i := 0; i < len(decompressOrders); i++ {
        in <- decompressOrders[i]
    }
    for i := 0; i < len(decompressOrders); i++ {
        res := <-out
        if res.Err != nil {
            fmt.Println(res.Err)
            continue
        }
        fmt.Printf("input: %s, de-compressed output: %q\n", res.Wo.Payloadbytes, string(res.ResultBytes))
    }
}
Notice that defer cancel() is set up right after DispatchWorkers is called. This ensures that the worker goroutines are stopped when the main program ends.
Summary — Benefits of this infrastructure
The number of worker goroutines is bounded, and the workers can be reused for different operations. So as the number of operations scales up, you will not have a ballooning pool of workers, which would be a memory concern. You could, for example, set the number of worker goroutines to be no more than the number of cores or hardware threads on your machine to harness hyperthreading or multiple processors. And by extending the Op codes, you could process almost anything, e.g. computing hashes or checksums, or even storing data to databases.
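For instance, adding a checksum operation only needs a new op value, a new case in runOrder's switch, and a function to do the work. A minimal sketch, using the standard library's hash/crc32 and encoding/binary packages (the Checksum op and the checksum function are made up for this illustration, not part of the code above):
// In work.go: add the new op value and a case in runOrder's switch:
//     case Checksum:
//         return checksum(wo)
const Checksum op = "checksum"

// In a new file, e.g. checksum_work.go:
func checksum(wo WorkOrder) WorkResponse {
    sum := crc32.ChecksumIEEE(wo.Payloadbytes) // uint32 CRC-32 of the payload
    buf := make([]byte, 4)
    binary.BigEndian.PutUint32(buf, sum) // encode the checksum as 4 bytes
    return WorkResponse{Wo: wo, ResultBytes: buf}
}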