Golang: Concurrency Pattern — Pipelined Workers

Mipsmonsta
5 min read · Jan 11, 2022

In my earlier article, I explored how you can have a pool of workers ready to do processing. In that example, you have to write the pipelining code in the main program. What if you want a fixed set of pipeline tasks that you write once and reuse from a package?

In this article, I will guide you through a different variant of concurrency: pipelined workers.

worker.go — Setup of Module

go mod init pipelineworker

It’s always good practice to create your module first, in this case pipelineworker. The benefit is that VS Code will pick up the cue and recognize the package, so that your imports within VS Code will work fine and dandy.

Once that is done, we create the Worker in the worker.go file, using a struct. The struct holds an In channel and an Out channel, both carrying strings; whether they are buffered is decided wherever the channels are created.

type Worker struct {
	In  chan string
	Out chan string
}

We also define the operation strings so that our worker can be discerning and perform different stages of work as instructed.

type Op string

const (
	Print     Op = "print"
	Hash      Op = "hash"
	Composite Op = "composite"
)
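These constants are not used directly in the snippets that follow, so as an aside, here is one way they could drive dispatch. The stageFor helper below is my own illustration, not part of the article's worker.go; in the real file, each case would start the matching worker method:

```go
package main

import "fmt"

type Op string

const (
	Print     Op = "print"
	Hash      Op = "hash"
	Composite Op = "composite"
)

// stageFor shows how the Op constants can select a stage by name.
// In the full worker.go, each case would launch the matching method.
func stageFor(op Op) (string, error) {
	switch op {
	case Print:
		return "printing stage", nil
	case Hash:
		return "hashing stage", nil
	case Composite:
		return "compositing stage", nil
	default:
		return "", fmt.Errorf("unknown op %q", op)
	}
}

func main() {
	s, _ := stageFor(Hash)
	fmt.Println(s) // hashing stage
}
```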

Next, since our worker is a struct, we define the job methods with the worker as their receiver.

func (wk *Worker) Print(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case payload := <-wk.In:
			fmt.Println(payload)
			wk.Out <- payload
		}
	}
}

// Hash computes the SHA-512/256 hash of each incoming string.
func (wk *Worker) Hash(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case payload := <-wk.In:
			checksum := sha512.Sum512_256([]byte(payload))
			// hex-encode the checksum (via "encoding/hex") so the
			// ";" delimiter can never occur inside the hash itself
			wk.Out <- payload + ";" + hex.EncodeToString(checksum[:])
		}
	}
}

// Composite recombines the original string, its hash and its length.
func (wk *Worker) Composite(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case payload := <-wk.In:
			stringSlices := strings.Split(payload, ";")
			stringLen := len(stringSlices[0])
			outload := fmt.Sprintf("payload: %q has hash %q and string len: %d\n", stringSlices[0], stringSlices[1], stringLen)
			wk.Out <- outload
		}
	}
}

In the above, we have the Hash, Composite and Print methods. In Hash, we take a string as input from the channel and compute its SHA-512/256 hash. After hashing, we combine the input string and the hash with a semicolon delimiter and send this new string into the out channel.

In the Composite method, we recover the original string and compute its length, before compositing the string, its hash and its length into a formatted string. The latter is once again sent to the out channel.

The Print method takes each input string from the in channel and prints it. After printing, the same string is pushed into the out channel.

pipeline.go — Write Your Pipelining Once

In pipeline.go, we write a function whose aim is to create a new pipeline. The NewPipeline function encapsulates the code that performs the three tasks (hashing, compositing and printing) in sequence. Each task is carried out by workers, and the number of workers for each stage is passed in as an argument.

package pipelineworker

import "context"

func NewPipeline(ctx context.Context, numberOfHashWorkers int, numberOfCompositeWorkers int, numberOfPrintWorkers int) (in chan string, out chan string) {
	inHash := make(chan string, numberOfHashWorkers)
	inComposite := make(chan string, numberOfCompositeWorkers)
	inPrint := make(chan string, numberOfPrintWorkers)
	outFinal := make(chan string, numberOfPrintWorkers)

	for i := 0; i < numberOfHashWorkers; i++ {
		w := Worker{
			In:  inHash,
			Out: inComposite,
		}
		go w.Hash(ctx)
	}
	for i := 0; i < numberOfCompositeWorkers; i++ {
		w := Worker{
			In:  inComposite,
			Out: inPrint,
		}
		go w.Composite(ctx)
	}
	for i := 0; i < numberOfPrintWorkers; i++ {
		w := Worker{
			In:  inPrint,
			Out: outFinal,
		}
		go w.Print(ctx)
	}
	return inHash, outFinal
}

In the code above, three for loops run one after another, and within each loop we launch the hash, composite or print workers for that stage. The loops ensure we launch the requested number of goroutine workers for each task. These workers run for the lifespan of the main program and do work whenever their input channel has payloads.

It is important to note that we are using buffered channels, and the depth of each in channel equals the number of workers for the task reading from it. The idea is to have just enough buffer slots that, when the channel is filled to the brim, every queued payload has a worker available to process it. With more workers than buffer slots, the excess workers would simply idle whenever the channel is full.
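As a quick illustration of buffered-channel behaviour (the sizes here are arbitrary), sends succeed without a receiver until the buffer is full:

```go
package main

import "fmt"

func main() {
	// A channel buffered to the worker count lets producers queue
	// that many payloads before any worker has to receive one.
	in := make(chan string, 3) // e.g. three workers for this stage
	in <- "a"
	in <- "b" // neither send blocks: the buffer has capacity 3
	fmt.Println(len(in), cap(in)) // 2 3
}
```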

The outputs of NewPipeline are the in channel and the out channel. In our main program, as we will see next, we use the in channel to submit work to the pipeline.

Our main program — production ready!

package main

import (
	"context"
	"pipelineworker/pipelineworker"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in, out := pipelineworker.NewPipeline(ctx, 5, 2, 2)

	setOfStrings := []string{
		"Today is a good day, so where do you want to go",
		"Sun is rising from the east, and set in the West",
		"Time to say goodbye, it's being a good ride",
		"A good way to live is to live everyday like it's the last",
		"Putting this issue aside, your are simply rude",
	}

	go func() {
		for i := 0; i < len(setOfStrings); i++ {
			in <- setOfStrings[i]
		}
	}()

	for i := 0; i < len(setOfStrings); i++ {
		<-out
	}
}

In the main program, we create a new pipeline and feed the strings we want processed into the in channel returned by NewPipeline. This is done in a goroutine so that the main goroutine can simultaneously read from the out channel. That last loop pulls the outputs through, which drives every stage to execute on each input in sequence. Without it, the final out channel would fill up, the print workers would block on their sends, and the resulting back-pressure would stall the whole pipeline.
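That drain step can be factored into a small helper; drain below is my own naming, a sketch of the receive loop at the end of main:

```go
package main

import "fmt"

// drain receives exactly n values from out, so the pipeline's final
// stage never blocks on a full channel, and returns them in order.
func drain(out <-chan string, n int) []string {
	results := make([]string, 0, n)
	for i := 0; i < n; i++ {
		results = append(results, <-out)
	}
	return results
}

func main() {
	out := make(chan string, 2)
	out <- "x"
	out <- "y"
	fmt.Println(drain(out, 2)) // [x y]
}
```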
