[Golang] Tìm hiểu về worker pool – cùng anh Touliver đưa beat đến cho các Rapper

Một phần của series “Đại ca Phong học Golang

— Anh Tèoooo!

— Ủa Hằng. Làm gì mà giờ nãy vẫn còn ở công ty vậy?

— Em đang đọc tiếp về worker pool trong Golang, mà chưa hiểu lắm. Anh Tèo có biết về nó ko, chỉ cho em với , hiu hiu.

— À, anh cũng mới tìm hiểu. Để anh ngồi xích lại kể cho Hằng nghe nhé.

Tóm tắt kiến thức

  • Buffrered channel có từ 1 khoảng trống trở lên để chứa dữ liệu, không yêu cầu cả 2 goroutines gửi và nhận cùng phải sẵn sàng cùng lúc. Luồng xử lý chỉ bị block nếu: goroutines gửi bị block nếu không còn khoảng trống trong channel; goroutines nhận bị block nếu trong channel không có dữ liệu.
  • WaitGroup: Dùng để đợi 1 tập hợp các goroutines hoàn thành xong công việc. Syntax:
    • Khai báo: wg := &sync.WaitGroup{}
    • wg.Add(1)
    • wg.Wait()
    • wg.Done()
  • Worker pool là gì: là 1 tập hợp các goroutines chờ task để xử lý
  • Tại sao dùng worker pool: quản lý được goroutines dễ dàng, tiết kiệm thời gian cấp phát goroutines .
  • Worker pool ứng dụng vào việc gì: Có n công việc cần thực hiện, có thể thực hiện tối đa m việc cùng lúc.
  • Yếu tố quan trọng worker pool: Kiểm soát size, không sẽ bị exhausted resource.

Chém

— Trước tiên, anh sẽ nói với em các khái niệm, công cụ sử dụng trước khi đưa vào ví dụ cho em dễ hiểu

— Dạ

— Đầu tiên là buffered channel: Buffrered channel có từ 1 khoảng trống trở lên để chứa dữ liệu, không yêu cầu cả 2 goroutines gửi và nhận cùng phải sẵn sàng cùng lúc. Luồng xử lý chỉ bị block nếu: goroutines gửi bị block nếu không còn khoảng trống trong channel; goroutines nhận bị block nếu trong channel không có dữ liệu.

— Cái này hôm trước anh Khấc có nói với em rồi.

— Ừa. Nếu quên thì em mở lại bài trước để đọc nha. Tiếp theo là đến wait group. Bình thường ở bài trước, anh thấy em dùng sleep để goroutines ở hàm main đợi như này

package main
import (
"fmt"
"time"
)
func read(data <-< span>chan string) {
fmt.Println(<-< span>data)
}
func write(data chan<-< span> string) {
data <-< span> "Có làm thì mới có ăn"
}
func main() {
unbuffer := make(chan string)
go write(unbuffer)
go read(unbuffer)
time.Sleep(1 * time.Second)
}

view rawcolam_thimoi_coan_v4.go hosted with ❤ by GitHub

— Việc dùng sleep sẽ không chuẩn vì thời gian sleep là cố định, trong khi thời gian mà goroutines xử lý (hàm write) là không cố định đúng không anh

— Ừa đúng dồi. Thế nên Wait Group ra đời để khắc phục nhược điểm trên của sleep. Anh viết lại ví dụ của Hằng, sử dụng wait group như sau:

package main
import (
"fmt"
"sync"
)
func read(wg *sync.WaitGroup, data <-< span>chan string) {
fmt.Println(<-< span>data)
wg.Done()
}
func write(data chan<-< span> string) {
data <-< span> "Có làm thì mới có ăn"
}
func main() {
wg := &sync.WaitGroup{}
wg.Add(1)
unbuffer := make(chan string)
go write(unbuffer)
go read(wg, unbuffer)
wg.Wait()
//time.Sleep(1 * time.Second)
}

view rawcolam_thimoi_coan_v5.go hosted with ❤ by GitHub

— Ở dòng 18 và 19, anh khai báo wait group và bảo nó đợi 1 tín hiệu done:

1
2
wg := &sync.WaitGroup{}
wg.Add(1)

— Chỗ wg.Add(1), nếu em sửa thành wg.Add(2) thì sao?

— Thì wait group sẽ đợi 2 tín hiệu done mới chạy tiếp. Em để ý dòng 10, ở hàm read anh có gọi wg.Done() để báo hiệu goroutines đã chạy xong.

— Tức là nếu add vào 1 thì phải có tương ứng 1 cái done.

— Đúng vậy. Và dòng 25 anh có gọi wg.Wait() để block hàm main lại. Khi nào có đủ số lượng tín hiệu done thì mới cho chạy tiếp.

— À à. Em cũng hiểu mang máng rồi.

— Để Hằng hiểu rõ hơn, anh lấy thêm 1 ví dụ này nhé:

Để chuẩn bị cho chương trình Rap Việt, anh Hoàng Touliver cần làm 100 beat và đưa tới cho các rapper. Vì chỉ có 10 phòng thu, nên khi nào có phòng trống thì anh mới vào làm beat được.

Khi 1 beat được làm xong, các rapper sẽ pick ngay beat để viết bài. Rapper nào viết xong thì có thể pick beat tiếp theo nếu vẫn còn. Cứ như vậy cho tới hết.

Trong bài này, chúng ta sẽ vận dụng worker pool để điều phối beat tới các rapper một cách hợp lí.
Em thử suy luận 1 chút để làm xem sao.

— Hm… đầu tiên là chỉ có 10 phòng thu, ta sẽ tạo ra 1 buffered channel có size là 10

1
beatChannel := make(chan string, 10)

Tiếp đó thì cần thêm 1 vài goroutines: 1 để tạo ra beat (produceBeat) và mỗi rapper tương ứng 1 goroutine để viết rap (writeSong)

func main() {
rappers := []string{"Dế Choắt", "GDucky", "MCK", "TLinh", "Rtee"}
numBeat := 100
wg := &sync.WaitGroup{}
beatChannel := make(chan string, 10)
// produceBeat
go produceBeat(numBeat, beatChannel)
// Write rap song
for _, rapper := range rappers {
wg.Add(1)
go writeSong(wg, rapper, beatChannel)
}
wg.Wait()
}

view rawwrite_rap_song_main_func.go hosted with ❤ by GitHub

Trong vòng for, mỗi rapper em tạo ra 1 goroutine riêng. Mỗi goroutine cần add 1 đơn vị chờ wg.Add(1)

Sau đó để waitGroup block hàm main lại cho tới khi nhận đủ tín hiệu done

1
wg.Wait()

Để giả lập quá trình tạo ra beat và viết bài hát cần có thời gian khác nhau, em sẽ tạo ra hàm wait, sẽ sleep ngẫu nhiên từ 1 đến 2s.

func wait() {
time.Sleep(time.Second * time.Duration(rand.Intn(2)))
}

view rawwrite_rap_song_wait.go hosted with ❤ by GitHub

Ở hàm produceBeat, em sẽ tạo ra beat và đưa vào channel

func produceBeat(numBeat int, beatChannel chan string) {
for i := 1; i <=< span> numBeat; i++ {
wait()
fmt.Println("Tạo ra beat số: ", i)
beatChannel <-< span> fmt.Sprintf("Beat %d", i)
}
close(beatChannel)
}

view rawwrite_rap_song_produce_beat.go hosted with ❤ by GitHub

Ở hàm writeSong, em lấy beat từ channel ra và thực hiện viết rap

func writeSong(wg *sync.WaitGroup, rapper string, beatChannel chan string) {
defer wg.Done()
for {
beat, ok := <-< span>beatChannel
if !ok {
fmt.Printf("Hết beat. %s ra khỏi phòng thu\n", rapper)
return
}
fmt.Printf("Rapper %s sử dụng beat %s\n", rapper, beat)
wait()
}
}

view rawwrite_rap_song_write_song.go hosted with ❤ by GitHub

Sau khi ghép lại thì em được đoạn code như này

package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rappers := []string{"Dế Choắt", "GDucky", "MCK", "TLinh", "Rtee"}
numBeat := 100
wg := &sync.WaitGroup{}
beatChannel := make(chan string, 10)
// produceBeat
go produceBeat(numBeat, beatChannel)
// Write rap song
for _, rapper := range rappers {
wg.Add(1)
go writeSong(wg, rapper, beatChannel)
}
wg.Wait()
}
func writeSong(wg *sync.WaitGroup, rapper string, beatChannel chan string) {
defer wg.Done()
for {
beat, ok := <-< span>beatChannel
if !ok {
fmt.Printf("Hết beat. %s ra khỏi phòng thu\n", rapper)
return
}
fmt.Printf("Rapper %s sử dụng beat %s\n", rapper, beat)
wait()
}
}
func produceBeat(numBeat int, beatChannel chan string) {
for i := 1; i <=< span> numBeat; i++ {
wait()
fmt.Println("Tạo ra beat số: ", i)
beatChannel <-< span> fmt.Sprintf("Beat %d", i)
}
close(beatChannel)
}
func wait() {
time.Sleep(time.Second * time.Duration(rand.Intn(2)))
}

view rawwrite_rap_song.go hosted with ❤ by GitHub

— Ok. Đoạn code của em chạy đúng rồi. Anh chỉ muốn nói thêm vài common mistake mà mọi người hay mắc phải nữa thôi.

Đầu tiên wait group truyền vào trong các hàm cần ở dạng con trỏ

— Nếu không ở dạng con trỏ thì sao ạ?

— Thì khi em gọi wg.Done() ở trong hàm này, waitgroup phía ngoài sẽ không được update giá trị

— Do nếu truyền ở dạng thường (không phải con trỏ), Go sẽ pass by value phải không ạ?

— Đúng là như vậy.

— Còn mistake thứ hai là gì?

— Anh chưa nghĩ ra. Hihi.

Lời cảm ơn

Kết thúc bài ở đây thì hơi ngắn, nhưng mà tớ buồn ngủ quá. Nên tạm thời dừng ở đây nha. Hị hị

Cảm ơn bạn đã bỏ thời gian ra đọc bài. Nếu có gì sai hoặc có thể improve được tốt hơn, bạn hãy comment cho tớ biết nhé ^^

Have a nice day!