Implementando um worker pool

Agora que já falamos praticamente tudo que havia para ser falado sobre goroutines e channels, vamos utilizar esse conhecimento para implementar um worker pool.

Se você não conhece o termo, um worker pool é basicamente uma coleção de threads que ficam esperando tarefas serem atribuídas a elas. Quando a thread finaliza a tarefa que foi atribuída, se torna disponível novamente para execução de uma nova tarefa.

Antes de começar a meter a mão na massa, vou deixar aqui o link para os outros posts da série sobre goroutines e channels.

O worker pool que vamos implementar irá somar os dígitos passados e armazenar o resultado.

Para isso, vamos precisar de duas structs, uma para armazenar o job que será executado e outra para armazenar o resultado da execução.

type Job struct {  
    id  int
    num int
}

type Result struct {  
    job   Job
    total int
}

Com as structs criadas, vamos iniciar um buffered channel globalmente na nossa aplicação para cada uma das structs.

var (
    jobs = make(chan Job, 10)  
    results = make(chan Result, 10)
)

Na função de soma, vamos adicionar um pequeno sleep para que o programa simule uma grande carga de processamento.

func sum(number int) (total int) {  
    no := number
    for no != 0 {
        digit := no % 10
        total += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return
}

Agora, vamos implementar a função que irá criar nossos workers.

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, sum(job.num)}
        results <- output
    }
    wg.Done()
}

Com a função que cria nossos workers feita, vamos implementar uma outra função que será responsável por criar o pool de workers.

func createWorkerPool() {  
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

Como podemos ver, nessa função iniciamos um sync.WaitGroup, e como conhecemos o número total workers que serão criados, já adicionamos a quantidade de goroutines que ele deve aguardar.

Agora, vamos criar uma função para gerar 300 jobs e adicioná-los ao channel para que sejam executados.

func allocateJob() {  
    for i := 0; i &lt; 300; i++ {
        num := rand.Intn(999)
        job := Job{i, num}
        jobs <- job
    }
    close(jobs)
}

Antes de escrever nossa função principal, vamos implementar nossa última função auxiliar. Essa função será responsável por exibir o resultado de cada um dos jobs executados.

func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, número %d , soma dos dígitos %d\n", result.job.id, result.job.num, result.total)
    }
    done <- true
}

Por último, vamos implementar nossa função principal e colocar tudo isso para funcionar junto.

func main() {
    go allocateJob()
    done := make(chan bool)
    go result(done)
    createWorkerPool()
    <-done
}

Ao executar go run main.go, teremos um resultado similar a esse:

Job id 8, número 904 , soma dos dígitos 13
Job id 5, número 735 , soma dos dígitos 15
Job id 3, número 983 , soma dos dígitos 20
Job id 4, número 895 , soma dos dígitos 22
Job id 0, número 878 , soma dos dígitos 23
...

E pronto! Implementamos nosso primeiro worker pool.

Deixem suas dúvidas nos comentários.

Até a próxima!


Subscreva

Fique por dentro de tudo o que acontece no mundo Go.

Deixe uma resposta