Workers
Quando si ha una situazione di molti jobs indipendenti da compiere, il lavoro può essere suddiviso su un numero di goroutines, ciascuna delle quali esegue lo stesso codice.
Tali goroutines si chiamano workers.
Ogni worker ha un channel per ricever i jobs ed un altro per inviare i risultati.
Il channel dei jobs viene scritto in questo esempio da un unico produttore, ma letto da molti consumatori: deve essere bufferizzato o si verificano ritardi.
Il channel dei risultati ha molti produttori ma un unico consumatore e il suo svuotamento è determinato dalla velocità del consumatore. Se è bufferizzato si può causare un burst di lavoro, se non lo è diventa rate-limited. Va pianificato.
Esempio
(340worker-pools.go):
package main
import "fmt"
import "time"
// Funzione worker che riceve dati e comunica
// su due channel monodirezionali
func worker(id int, jobs <-chan int, results chan<- int) {
// range consuma i job dal channel
// bloccandosi se non ne trova
for j := range jobs {
fmt.Println("wk", id, ": processing job", j)
// Attesa di un secondo
time.Sleep(time.Second)
// Risultato è il doppio nel numero del job
results <- j * 2
}
}
func main() {
// Creazione di channel per i job e i risultati
// Quello dei job deve essere bufferizzato con
// sufficienti elementi o si rischia il blocco
jobs := make(chan int, 100)
// Quello dei risultati può anche non essere
// bufferizzato poichè ha un solo consumatore
// results := make(chan int)
results := make(chan int, 100)
// Un pool di 3 worker
// inizialmente bloccati perchè non hanno ancora jobs
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// Invio di 9 jobs
for j := 1; j <= 9; j++ {
jobs <- j
}
// Chiusura del channel, termina i worker
close(jobs)
// Raccolta dei risultati
for a := 1; a <= 9; a++ {
res := <-results
fmt.Println("main: result ", res)
}
// Sappiamo esattamente quanti sono i risultati
// Se invece usiamo il codice seguente si blocca
// for {
// res := <-results
// fmt.Println("main: result ", res)
// }
}
Se non sappiamo a priori quanti sono i job da leggere, occorre un loop infinito e un meccanismo per informare il main di smettere di ascoltare il channel dei risultati.
Channel di compimento lavoro
Come al solito la soluzione è un channel extra in cui ogni worker che termina segnala il fatto con il proprio id.
Il main attende la terminazione di tutti i worker prima di finire.
(341-worker-stop.go):
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, done chan<- int) {
for j := range jobs {
fmt.Println("wk", id, ": processing job", j)
time.Sleep(time.Second)
results <- j * 2
}
done <- id
fmt.Printf("worker %d exiting\n", id)
}
func main() {
const numworkers int = 3
jobs := make(chan int, 100)
results := make(chan int, 100)
done := make(chan int, numworkers)
for w := 1; w <= numworkers; w++ {
go worker(w, jobs, results, done)
}
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
finished := 0
loop:
for {
select {
case res := <-results:
fmt.Println("main: result ", res)
case n := <-done:
fmt.Printf("worker %d has finished\n", n)
finished++
if finished >= numworkers {
break loop
}
}
}
fmt.Println("main exiting")
}