Golang: Example of Worker Pool Pattern

go module init poolworker

work.go — Set up input and output structs

package poolworkerimport "errors"type op stringconst (
Compress op = "compress"
Decompress op = "decompress"
)
type WorkOrder struct {
Op op
Payloadbytes []byte
}
type WorkResponse struct {
Wo WorkOrder
ResultBytes []byte
Err error
}
func runOrder(wo WorkOrder) WorkResponse {
switch wo.Op {
case Compress:
return compression(wo)
case Decompress:
return decompression(wo)
default:
return WorkResponse{Err: errors.New("unsupported operation")}
}
}

worker.go — Creating the workers in the pool

package poolworkerimport (
"context"
"fmt"
)
func DispatchWorkers(numberWorkers int) (context.CancelFunc, chan WorkOrder, chan WorkResponse) {
in := make(chan WorkOrder, numberWorkers)
out := make(chan WorkResponse, numberWorkers)
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < numberWorkers; i++ {
go Worker(ctx, i+1, in, out)
}
return cancel, in, out
}func Worker(ctx context.Context, id int, in chan WorkOrder, out chan WorkResponse) {
for { //run indefinately
select {
case <-ctx.Done():
return case wo := <-in:
case wo := <-in:
fmt.Printf("Workder %d performing work %s\n", id, wo.Op)
out <- runOrder(wo)
}
}

compression_work.go — Where your actual “work” should be done

package poolworkerimport (
"bytes"
"compress/gzip"
"io"
)
func compression(wo WorkOrder) WorkResponse {
bufferWriter := new(bytes.Buffer)
text := wo.Payloadbytes
compressedWriter := gzip.NewWriter(bufferWriter)
_, err := compressedWriter.Write(text)
if err != nil {
return WorkResponse{
Err: err}
}
compressedWriter.Close() //to flush to underlying writer
return WorkResponse{
Wo: wo,
ResultBytes: bufferWriter.Bytes()}
}
func decompression(wo WorkOrder) WorkResponse {
bufferReader := bytes.NewReader(wo.Payloadbytes)
decompressionReader, err := gzip.NewReader(bufferReader)
if err != nil {
return WorkResponse{Err: err}
}
defer decompressionReader.Close()
uncompressedBytes := make([]byte, len(wo.Payloadbytes)*5) //at least have capacity five times of compressed bytes _, err = decompressionReader.Read(uncompressedBytes)
if err != nil && err != io.EOF {
return WorkResponse{Err: err}
}
return WorkResponse{Wo: wo, ResultBytes: bytes.TrimRight(uncompressedBytes, "\x00")} //trup right away /x00 bytes
}

Main Program — the complete workflow

package mainimport (
"fmt"
"poolworker"
)
func main() { cancel, in, out := poolworker.DispatchWorkers(5)
defer cancel()
orders := []poolworker.WorkOrder{
{Op: poolworker.Compress, Payloadbytes: []byte("A long time ago")},
{Op: poolworker.Compress, Payloadbytes: []byte("in a galaxy far far away")},
{Op: poolworker.Compress, Payloadbytes: []byte("there is a universe that is upside-down")},
{Op: poolworker.Compress, Payloadbytes: []byte("Is there time for things to heal, I asked")},
}
//compression
for i := 0; i < len(orders); i++ {
in <- orders[i]
}
decompressOrders := make([]poolworker.WorkOrder, 0, len(orders))
for i := 0; i < len(orders); i++ {
res := <-out
if res.Err != nil {
fmt.Println(res.Err)
continue
}
fmt.Printf("input: %s, compressed output: %q\n", res.Wo.Payloadbytes, string(res.ResultBytes))
decompressOrders = append(decompressOrders, poolworker.WorkOrder{Op: poolworker.Decompress, Payloadbytes: res.ResultBytes})
}
//decompression
for i := 0; i < len(decompressOrders); i++ {
in <- decompressOrders[i]
//fmt.Println(decompressOrders[i])
}
for i := 0; i < len(decompressOrders); i++ {
res := <-out
if res.Err != nil {
fmt.Println(res.Err)
continue
}
fmt.Printf("input: %s, de-compressed output: %q\n", res.Wo.Payloadbytes, string(res.ResultBytes))
}
}
Photo by Razvan Chisu on Unsplash

--

--

--

Writing to soothe the soul, programming to achieve flow

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Capturing and Editing Packets with PyDivert

Source: http://www.cs.cmu.edu/afs/cs/academic/class/15441-f01/www/assignments/P2/htmlproj2_split/node5.html

What Is an OutSystems MVP and How to Become One

Top 10 Best Free Windows Hidden Tips & Tricks

Structural Testing in a nutshell

Screen Capture — Quality without the Cost

Persistence Layer Library — Boosting DB Performance

The Right Tools in the Right Place: Drupal’s CKEditor

Protecting Greenplum databases

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Mipsmonsta

Mipsmonsta

Writing to soothe the soul, programming to achieve flow

More from Medium

Golang Interfaces

Unit testing in Go using Mockery

Connecting dockerized Golang services via Kafka

How Golang DB migration tool work?