Worker Pool Queue In Go

November 11, 2018

Today I want to talk about work queues in Go. Specifically, I want to talk about queues that not only process work, but also generate new work while processing it.

This topic is interesting to me because when I first implemented it in Go, I quickly hit a deadlock panic. My workers were consuming work and sending results while also generating new work to be processed. You quickly reach a point where no more work can be processed: the workers are blocked trying to send their output to the same goroutine that is itself blocked trying to send more work into the input channel.

There are several posts out there that discuss channels and how they can be used to create work queues and pipelines. This one from the Go blog is quite good: https://blog.golang.org/pipelines, but I couldn't find anything written on the web about how to create queues or pipelines that can generate more work as they process input work.

So, how do we solve this? Simple: use a scheduler. Most of the time you can get away with creating a pool of workers, sending them some work, and calling it done once they finish. However, when workers can generate new inputs, you should use a scheduler that orchestrates, or better yet "queues" up, the work and hands it to the worker pool as workers free up.

I created an example queue to show how to structure this in code.

Let's say we want to create a function that, given a number, iterates from 1 up to that number and prints each value. However, if any of the numbers we iterate over is divisible by 2, we want to do the same thing: iterate every number up to it and repeat. If any of those are divisible by two, we feed them back into the queue for iteration, as long as we haven't seen that number before. Yes, I know you don't need a pool of goroutines to do this, but for the sake of this exercise let's assume we do.
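Before adding any goroutines, it may help to see the same logic written sequentially. This is just a sketch to pin down the expected behavior; the expand and run helper names are mine, not part of the final code:

```go
package main

import "fmt"

// expand returns the numbers 1..n, mirroring what each worker will do.
func expand(n int) []int {
	var numbers []int
	for i := 1; i <= n; i++ {
		numbers = append(numbers, i)
	}
	return numbers
}

// run processes n and then every even number it produces, skipping
// numbers we have already seen, and returns everything printed in order.
func run(n int) []int {
	seen := make(map[int]bool)
	queue := []int{n}
	var printed []int
	for len(queue) > 0 {
		next := queue[0]
		queue = queue[1:]
		for _, m := range expand(next) {
			if !seen[m] && m%2 == 0 {
				queue = append(queue, m) // feed even numbers back in
			}
			printed = append(printed, m)
			seen[m] = true
		}
	}
	return printed
}

func main() {
	for _, n := range run(10) {
		fmt.Println(n)
	}
}
```

The queue-plus-seen-map loop here is exactly the bookkeeping that the scheduler and workers will end up splitting between them.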

Let's create our worker:

func worker(ctx context.Context, input chan int, output chan []int) {
    for {
        select {
        case <-ctx.Done():
            return
        case n := <-input:
            var numbers []int
            for i := 1; i <= n; i++ {
                numbers = append(numbers, i)
            }
            output <- numbers
        }
    }
}

This worker is simple enough: we loop forever; if the context tells us to finish, we return; otherwise we wait until we receive some work via the input channel. Once work is received, we iterate over all the numbers, append them to a slice, and send the slice via the output channel.
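To sanity-check the worker on its own, here is a small standalone driver (the runOnce helper is a name of my own; the worker is repeated from above) that starts a single worker, sends it one number, and reads back the result:

```go
package main

import (
	"context"
	"fmt"
)

// worker is the same loop as above: it expands each input n into the
// slice 1..n and sends it on the output channel.
func worker(ctx context.Context, input chan int, output chan []int) {
	for {
		select {
		case <-ctx.Done():
			return
		case n := <-input:
			var numbers []int
			for i := 1; i <= n; i++ {
				numbers = append(numbers, i)
			}
			output <- numbers
		}
	}
}

// runOnce drives a single worker through one piece of work.
func runOnce(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // shuts the worker down when we are done
	in := make(chan int)
	out := make(chan []int)
	go worker(ctx, in, out)
	in <- n
	return <-out
}

func main() {
	fmt.Println(runOnce(3)) // [1 2 3]
}
```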

Now let's create our main function that will create a worker pool using the above worker and give it some work:

package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
)

func main() {
    workCh := make(chan int)
    outputCh := make(chan []int)
    done := make(chan bool)

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for i := 0; i < runtime.NumCPU(); i++ {
        go worker(ctx, workCh, outputCh)
    }

    wg.Add(1)
    workCh <- 10

    go func() {
        wg.Wait()
        done <- true
    }()

    for {
        select {
        case <- done:
            return
        case nums := <-outputCh:
            for _, n := range nums {
                fmt.Println(n)
            }
            wg.Done()
        }
    }
}

The function above sends the number 10 to the pool and waits for results; as they come in, it prints them and eventually finishes. This code will only end up exercising one goroutine; the others will simply do nothing. After it finishes, the deferred cancel runs and all workers shut down. If you ran this, you would get the following output:

1
2
3
4
5
6
7
8
9
10

This is great, and it works really well for a straightforward worker pool that generates no extra work. But now let's add our requirement that we must also iterate over any output numbers that are divisible by 2.

We modify our main function to look like this:

func main() {
    workCh := make(chan int)
    outputCh := make(chan []int)
    done := make(chan bool)

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for i := 0; i < runtime.NumCPU(); i++ {
        go worker(ctx, workCh, outputCh)
    }

    wg.Add(1)
    workCh <- 10

    go func() {
        wg.Wait()
        done <- true
    }()

    seen := make(map[int]bool)

    for {
        select {
        case <- done:
            return
        case nums := <-outputCh:
            for _, n := range nums {
                if _, exists := seen[n]; !exists && n%2 == 0 {
                    wg.Add(1)
                    workCh <- n
                }
                fmt.Println(n)
                seen[n] = true
            }
            wg.Done()
        }
    }
}

Simple enough: for every number received, we check if it has been seen; after printing it, we add it to the seen map. If it hasn't been seen and it is divisible by 2, we send it back to the queue for iteration.

It should work right? Unfortunately no. If you ran the code above you would see the following output:

1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:

Uh oh, Go is telling us that we have a deadlock. No more work can be processed because the workers are blocked trying to send their results to the output channel, but main can't receive those results because it is itself blocked trying to send new work to the pool. In order to solve this problem, we must introduce a scheduler in front of the worker pool. The scheduler will keep track of all of the work sent to the queue and hand it to the workers as they become available.

Let's implement a scheduler to solve this problem:

func scheduler(ctx context.Context, input chan int, work chan int) {
    var queue []int
    for {
        if len(queue) == 0 {
            select {
            case <-ctx.Done():
                return
            case i := <-input:
                queue = append(queue, i)
            }
        } else {
            select {
            case <-ctx.Done():
                return
            case i := <-input:
                queue = append(queue, i)
            case work <- queue[0]:
                queue = queue[1:]
            }
        }
    }
}

Let's break this down. First we create a slice that holds input values waiting to be processed. Then we loop forever until the context tells us to stop. Inside the loop we check whether the queue is empty; if it is, we only listen for the context or for new input, since there is no work to send to the pool.
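To see that buffering behavior concretely, here is a small standalone driver (the demo helper is my own; the scheduler is repeated exactly as written above) that sends three inputs before anything reads from the work channel:

```go
package main

import (
	"context"
	"fmt"
)

// scheduler is the same function as above: it buffers inputs in a slice
// and only offers work to the pool when the queue is non-empty.
func scheduler(ctx context.Context, input chan int, work chan int) {
	var queue []int
	for {
		if len(queue) == 0 {
			select {
			case <-ctx.Done():
				return
			case i := <-input:
				queue = append(queue, i)
			}
		} else {
			select {
			case <-ctx.Done():
				return
			case i := <-input:
				queue = append(queue, i)
			case work <- queue[0]:
				queue = queue[1:]
			}
		}
	}
}

// demo sends three inputs before any worker is listening, then drains
// them: the scheduler absorbs the sends and hands them back in FIFO order.
func demo() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	input := make(chan int)
	work := make(chan int)
	go scheduler(ctx, input, work)

	for _, n := range []int{10, 20, 30} {
		input <- n // none of these block, even with no worker ready
	}
	return []int{<-work, <-work, <-work}
}

func main() {
	fmt.Println(demo()) // [10 20 30]
}
```

Without the scheduler in the middle, the second `input <- n` above would block forever, which is exactly the deadlock we saw.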

The else clause is where the magic is. We do the same as above: listen for the context, and append any input to the queue. But this is where I just love Go. Because we know we have available work (len(queue) > 0), we can also try to send some of it to the worker pool. If the pool is busy, no worries; the select statement simply waits for one of the three cases to become ready: we either quit, queue more work, or hand work off for processing. The fact that we can send inside a select statement is something I find really fascinating, and I believe not a lot of people know about it when they first start working with Go. It is a really powerful construct, so give it a try!
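Here is a tiny standalone sketch of a send used as a select case (trySend is a name of my own; note that it adds a default case so it gives up immediately, whereas the scheduler deliberately omits default so that it blocks until one of its cases is ready):

```go
package main

import "fmt"

// trySend treats a channel send as a select case: it sends v only if a
// receiver (or a free buffer slot) is ready, and reports whether it did.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)      // buffered: room for one value
	fmt.Println(trySend(ch, 42)) // true: the buffer slot is free
	fmt.Println(trySend(ch, 99)) // false: buffer full, nobody receiving
	fmt.Println(<-ch)            // 42
}
```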

In order to make this all come together we must now add the scheduler to our main function and make some small changes. It looks like this:

func main() {
    inputCh := make(chan int)
    workCh := make(chan int)
    outputCh := make(chan []int)
    done := make(chan bool)

    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go scheduler(ctx, inputCh, workCh)
    for i := 0; i < runtime.NumCPU(); i++ {
        go worker(ctx, workCh, outputCh)
    }

    wg.Add(1)
    inputCh <- 10

    go func() {
        wg.Wait()
        done <- true
    }()

    seen := make(map[int]bool)

    for {
        select {
        case <-done:
            return
        case nums := <-outputCh:
            for _, n := range nums {
                if _, exists := seen[n]; !exists && n%2 == 0 {
                    wg.Add(1)
                    inputCh <- n
                }
                fmt.Println(n)
                seen[n] = true
            }
            wg.Done()
        }
    }
}

So we added a new channel, inputCh, which is the channel we use to send work to the scheduler. Notice that workCh is now only fed by the scheduler, not by main; that's because only the scheduler knows when it's the right time to send work to the pool. We also start our scheduler in a goroutine and let it listen for input and manage our worker pool for us.

All in all, if you run this code you should now see the full output and no deadlock errors from Go. I hope this helps anyone implementing this pattern or facing this type of problem; I thought it was a fun exercise to blog about.

I have the final code available here: https://github.com/josebalius/workqueue in case you want to take a look at the final solution all in one file. Feel free to copy it, modify it or do as you wish.