Workers

Quando si ha una situazione di molti jobs indipendenti da compiere, il lavoro può essere suddiviso su un numero di goroutines, ciascuna delle quali esegue lo stesso codice.

Tali goroutines si chiamano workers.

Ogni worker ha un channel per ricever i jobs ed un altro per inviare i risultati.

Il channel dei jobs viene scritto in questo esempio da un unico produttore, ma letto da molti consumatori: deve essere bufferizzato o si verificano ritardi.

Il channel dei risultati ha molti produttori ma un unico consumatore e il suo svuotamento è determinato dalla velocità del consumatore. Se è bufferizzato si può causare un burst di lavoro, se non lo è diventa rate-limited. Va pianificato.

Esempio

(340worker-pools.go):

package main

import "fmt"
import "time"

// Funzione worker che riceve dati e comunica
// su due channel monodirezionali
func worker(id int, jobs <-chan int, results chan<- int) {
	// range consuma i job dal channel
	// bloccandosi se non ne trova
	for j := range jobs {
		fmt.Println("wk", id, ": processing job", j)
		// Attesa di un secondo
		time.Sleep(time.Second)
		// Risultato è il doppio nel numero del job
		results <- j * 2
	}
}

func main() {

	// Creazione di channel per i job e i risultati
	// Quello dei job deve essere bufferizzato con
	// sufficienti elementi o si rischia il blocco
	jobs := make(chan int, 100)
	// Quello dei risultati può anche non essere
	// bufferizzato poichè ha un solo consumatore
//	results := make(chan int)
	results := make(chan int, 100)

	// Un pool di 3 worker
	// inizialmente bloccati perchè non hanno ancora jobs
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}

	// Invio di 9 jobs
	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	// Chiusura del channel, termina i worker
	close(jobs)

	// Raccolta dei risultati
	for a := 1; a <= 9; a++ {
		res := <-results
		fmt.Println("main: result ", res)
	}
	// Sappiamo esattamente quanti sono i risultati
	// Se invece usiamo il codice seguente si blocca
//	for {
//		res := <-results
//		fmt.Println("main: result ", res)
//	}

}

Se non sappiamo a priori quanti sono i job da leggere, occorre un loop infinito e un meccanismo per informare il main di smettere di ascoltare il channel dei risultati.

Channel di compimento lavoro

Come al solito la soluzione è un channel extra in cui ogni worker che termina segnala il fatto con il proprio id.
Il main attende la terminazione di tutti i worker prima di finire.

(341-worker-stop.go):

package main

import (
	"fmt"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int, done chan<- int) {
	for j := range jobs {
		fmt.Println("wk", id, ": processing job", j)
		time.Sleep(time.Second)
		results <- j * 2
	}
	done <- id
	fmt.Printf("worker %d exiting\n", id)
}

func main() {
	const numworkers int = 3
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	done := make(chan int, numworkers)

	for w := 1; w <= numworkers; w++ {
		go worker(w, jobs, results, done)
	}

	for j := 1; j <= 9; j++ {
		jobs <- j
	}
	close(jobs)

	finished := 0
loop:
	for {
		select {
		case res := <-results:
			fmt.Println("main: result ", res)
		case n := <-done:
			fmt.Printf("worker %d has finished\n", n)
			finished++
			if finished >= numworkers {
				break loop
			}
		}
	}
	fmt.Println("main exiting")
}