Súbežné programovanie v Go

Sunday, Oct 18, 2020 by Nelo
Go (golang) programovanie - navod na subeznost (concurrency), kanaly (channels), synchronizacia a mutexy.

Veľká výhoda Go je, že bolo od začiatku navrhované na jednoduché súbežné programovanie. Súbežnosť nie je paralelizmus, ale je nutným predpokladom k paralelizmu, druhým je bežať program na viacjadrových procesoroch. Gorutiny sú základom súbežného programovania v Go. Od systémových threadov sa líšia tým, že ich inicializačný zásobník malý (typicky 4KB namiesto 1MB) a môže dynamicky rásť a zmenšovať sa. Gorutiny majú taktiež menšiu réžiu ako thready a sú obyčajne multiplexované na menší počet systémových threadov, teda nie 1:1. Go runtime plánovač optimálne rozhadzuje gorutiny čakajúce vo fronte do jednotlivých threadov bežiacich na viacerých jadrách procesorov a nie je také nutné riešiť počet threadov ako v threadpoole. To všetko znamená, že narozdiel od typicky 2 threadov na jedno jadro CPU, respektíve technicky niekoľko tisíc threadov (pamäťový limit), môžme v Go spustiť niekoľko miliónov gorutin, preto ak potrebujeme masívne súbežný systém Go je výborná voľba.

Druhá veľká výhoda, ktorú Go prináša je komunikácia medzi gorutinami. Tradičné viacvláknové modely napríklad v C++, Jave, C# alebo Pythone komunikujú medzi vláknami prostredníctvom zdieľanej pamäte. Dátové štruktúry sú pri zápise a čítaní chránené lockmy, semaformi a mutexami. Programátor si osvojuje situácie ako race condition, deadlock, undefined behavior. Go koncept hovorí “Don’t communicate by sharing memory, share memory by communicating” - namiesto správy zámkov chrániacich dáta, Go odporúča používať kanály na sprostredkovanie referencie na dáta medzi gorutinami. Kanály môžu byť asynchrónne (buffrované) alebo synchrónne zabezpečujú, že len jedna gorutina ma súčasne prístup k dátovej štruktúre. Taktiež ponúka detekciu race condition a deadlockov.

V tomto článku sa zameriame na vytvorenie web spiderbota, ktorý stiahne a zobrazí aktuálne trhové ceny vybraných akcií. Na to použijeme web scraping a regulárne výrazy. Pri používaní web scrapingu (zber dát) a web crawlingu (sledovanie odkazov do určitej hĺbky) treba poznať technické a právne obmedzenia a dbať na ich dodržiavanie.

Prvá, sekvenčná verzia

Prvá verzia spiderbota nie je nijako úchvatná. Ak ju spustíme s parametrami akciových tickerov, sekvenčne zavolá yahoo finance server, stiahne HTML obsah, prvý regulárny výraz nájde dátový blok a druhý vyextrahuje dáta do equity štruktúry. Ak s nič nepokazilo, tak funkcia na konci vypíše equity a čas a celkový čas trvania s počtom načítaných stránok.

Výstup:

$ go run . TSLA AMZN GOOG MSFT
reading TSLA
{TSLA Tesla, Inc. 439.67 USD} 1.6309993s
reading AMZN
{AMZN Amazon.com, Inc. 3272.71 USD} 456.997ms
reading GOOG
{GOOG Alphabet Inc. 1573.01 USD} 672.9979ms
reading MSFT
{MSFT Microsoft Corporation 219.66 USD} 478.9996ms
Total time: 3.2419967s , Total pages read: 4

Zdrojový kód:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"regexp"
	"strconv"
	"time"
)

type equity struct {
	ticker   string
	name     string
	price    float64
	currency string
}

const (
	site           = "https://finance.yahoo.com/quote/"
	matchPattern   = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
	groupPattern   = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
	defaultRefresh = 5
)

var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")
var counter = 0

func main() {
	t0 := time.Now()
	flag.Parse()
	for _, a := range flag.Args() {
		readSymbol(a)
	}
	fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
}

func readSymbol(symbol string) {
	t0 := time.Now()
	fmt.Println("reading " + symbol)
	// nacitanie stranky
	response, err := http.Get(site + symbol)
	if err != nil {
		fmt.Println(err)
		return
	}
	// klient musi zavriet response body ked s nim skonci pracovat
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println(err)
		return
	}
	// parsovanie obsahu
	match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
	group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
	if len(group) > 1 {
		p, err := strconv.ParseFloat(group[3], 64)
		if err != nil {
			fmt.Println(err)
			return
		}
		equ := equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
		fmt.Println(equ, time.Now().Sub(t0))
		counter++
	}
}

Druhá, nainvná súbežná verzia

Povedali sme že gorutiny sú jednoduché, stačí pridať pred volanie funkcie go však? Nie tak celkom, pozrime sa ako by program zbehol ak by sme zmenili riadok 34 na:

33
34
35
    for _, a := range flag.Args() {
		go readSymbol(a)
    }

Výstup: Program síce zbehne a keďže ho nič neblokuje, skončí oveľa skôr ako stihne prvá gorutina načítať stránku:

$ go run . TSLA AMZN GOOG MSFT
reading TSLA
Total time: 0s , Total pages read: 0

Druhá, funkčná súbežná verzia

Pre čakanie na ukončenie gorutin môžme použiť wait group z balíčka sync.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"regexp"
	"strconv"
	"sync"
	"time"
)

type equity struct {
	ticker   string
	name     string
	price    float64
	currency string
}

const (
	site           = "https://finance.yahoo.com/quote/"
	matchPattern   = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
	groupPattern   = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
	defaultRefresh = 5
)

var wg sync.WaitGroup // wait group
var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")
var counter = 0

func main() {
	t0 := time.Now()
	flag.Parse()
	for _, a := range flag.Args() {
		wg.Add(1) // zvysit counter wait group
		go readSymbol(a)
	}
	wg.Wait() // wait group counter, cakat na ukoncenie vsetkych gorutin
	fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
}

func readSymbol(symbol string) {
	defer wg.Done() // zavola sa po navrate z funkcie
	t0 := time.Now()
	fmt.Println("reading " + symbol)
	// nacitanie stranky
	response, err := http.Get(site + symbol)
	if err != nil {
		fmt.Println(err)
		return
	}
	// klient musi zavriet response body ked s nim skonci pracovat
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println(err)
		return
	}
	// parsovanie obsahu
	match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
	group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
	if len(group) > 1 {
		p, err := strconv.ParseFloat(group[3], 64)
		if err != nil {
			fmt.Println(err)
			return
		}
		equ := equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
		fmt.Println(equ, time.Now().Sub(t0))
		counter++
	}
}

Výstup:

$ go run . TSLA AMZN GOOG MSFT
reading MSFT
reading GOOG
reading TSLA
reading AMZN
{MSFT Microsoft Corporation 214.22 USD} 1.470001s
{GOOG Alphabet Inc. 1534.61 USD} 1.5740001s
{TSLA Tesla, Inc. 430.83 USD} 1.6290002s
{AMZN Amazon.com, Inc. 3207.21 USD} 1.6549999s
Total time: 1.6549999s , Total pages read: 4

Táto verzia je síce funkčná, ale kód hlavnej gorutiny je blokovaný čakaním na všetky gorutiny. Čiastkové výstupy nemôžme nijako spracovávať a musíme udržiavať počítadlo gorutin. Buď všetko alebo nič.

Tretia, vylepšená súbežná verzia

Tu nastupuje potrebná komunikácia medzi gorutinami. Zmenené riadky sú označené a okomentované:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"regexp"
	"strconv"
	"sync"
	"time"
)

type equity struct {
	ticker   string
	name     string
	price    float64
	currency string
}

const (
	site           = "https://finance.yahoo.com/quote/"
	matchPattern   = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
	groupPattern   = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
	defaultRefresh = 5
)

var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")
var counter = 0
var mux sync.Mutex // deklarovat mutex na uzamykanie countra

func main() {
	t0 := time.Now()
	c := make(chan int) // vytvorit kanal
	flag.Parse()
	for _, a := range flag.Args() {
		go readSymbol(a, c) // spustit gorutiny
	}
	for range flag.Args() {
		<-c // pockat na dokoncenie
	}
	fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
}

func readSymbol(symbol string, c chan<- int) {
	t0 := time.Now()
	fmt.Println("reading " + symbol)
	// nacitanie stranky
	response, err := http.Get(site + symbol)
	if err != nil {
		fmt.Println(err)
		c <- 1 // komunikujem ze som skoncila
		return
	}
	// klient musi zavriet response body ked s nim skonci pracovat
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println(err)
		c <- 1 // komunikujem ze som skoncila
		return
	}
	// parsovanie obsahu
	match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
	group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
	if len(group) > 1 {
		p, err := strconv.ParseFloat(group[3], 64)
		if err != nil {
			fmt.Println(err)
			c <- 1 // komunikujem ze som skoncila
			return
		}
		equ := equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
		fmt.Println(equ, time.Now().Sub(t0))
		mux.Lock() // musim uzamknut pristup ku countru
		counter++
		mux.Unlock() // musim odomknut pristup ku countru
	}
	c <- 1 // komunikujem ze som skoncila
}

Výstup:

$ go run . TSLA AMZN GOOG MSFT
reading AMZN
reading MSFT
reading TSLA
reading GOOG
{AMZN Amazon.com, Inc. 3272.71 USD} 1.7000158s
{MSFT Microsoft Corporation 219.66 USD} 1.8280157s
{GOOG Alphabet Inc. 1573.01 USD} 2.0010173s
{TSLA Tesla, Inc. 439.67 USD} 2.0260156s
Total time: 2.0260156s , Total pages read: 4

Táto verzia sa mi však stále veľmi nepáči. Nechcem pamätať na uzavretie kanálu, dá sa elegantnejšie urobiť cez funkčný modifikátor defer, ktorý sa vždy zavolá pri opustení funkcie (ak je ich definovaných viacero zavolajú sa v opačnom poradí, ako boli definované).

Tiež nechcem zdielať dátové štruktúry ak skutočne netreba, zamykať a odomykať counter. V náročnejšej aplikácii by to ľahko čoskoro spôsobilo veľmi ťažko odhaliteľné komplikácie. Preto chcem s countrom pracovať len v main gorutine a preradiť ho z globálnej premennej do scope main gorutiny.

Spracovávať (vypísať) dáta v readSymbol funkcii taktiež nechcem, nie je to škálovateľný SOLID prístup, funkcia má dáta len načítať a spracovať ich chcem spoločne s ostatnými dátami v main gorutine. Na to poslúži kanál, ktorým sa presunie referencia.

Štvrtá finálna verzia

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"regexp"
	"strconv"
	"time"
)

type equity struct {
	ticker   string
	name     string
	price    float64
	currency string
}

const (
	site           = "https://finance.yahoo.com/quote/"
	matchPattern   = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
	groupPattern   = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
	defaultRefresh = 5
)

var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")

func main() {
	t0 := time.Now()
	var counter = 0
	chEquity := make(chan *equity) // vytvorit kanal
	flag.Parse()
	for _, a := range flag.Args() {
		go readSymbol(a, chEquity) // spustit gorutiny
	}
	chTimeout := time.After(10 * time.Second) // timeout nastaveny na 10 sekund
	for {
		select { // spracuje case, ktory pride prvy
		case <-chTimeout: // timeout
			fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
			return
		case equ := <-chEquity: // nova equity referencia
			if equ != nil {
				fmt.Println(*equ)
				counter++
			}
		}
	}
}

func readSymbol(symbol string, c chan<- *equity) {
	// zavolá sa po ukončení funkcie
	defer func() {
		time.Sleep(time.Duration(*sleep) * time.Second)
		readSymbol(symbol, c) // dalsia iteracia po prebudeni
	}()
	fmt.Println("reading " + symbol)
	// nacitanie stranky
	response, err := http.Get(site + symbol)
	if err != nil {
		fmt.Println(err)
		return
	}
	// klient musi zavriet response body ked s nim skonci pracovat
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println(err)
		return
	}
	// parsovanie obsahu
	match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
	group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
	if len(group) > 1 {
		p, err := strconv.ParseFloat(group[3], 64)
		if err != nil {
			fmt.Println(err)
			return
		}
		c <- &equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
	}
}

Výstup:

$ go run . -r 3 TSLA AMZN GOOG MSFT
reading TSLA
reading GOOG
reading MSFT
reading AMZN
{TSLA Tesla, Inc. 439.67 USD}
{AMZN Amazon.com, Inc. 3272.71 USD}
{GOOG Alphabet Inc. 1573.01 USD}
{MSFT Microsoft Corporation 219.66 USD}
reading TSLA
reading AMZN
{TSLA Tesla, Inc. 439.67 USD}
{AMZN Amazon.com, Inc. 3272.71 USD}
reading GOOG
reading MSFT
{GOOG Alphabet Inc. 1573.01 USD}
{MSFT Microsoft Corporation 219.66 USD}
reading TSLA
reading AMZN
{TSLA Tesla, Inc. 439.67 USD}
reading GOOG
{AMZN Amazon.com, Inc. 3272.71 USD}
{GOOG Alphabet Inc. 1573.01 USD}
reading MSFT
{MSFT Microsoft Corporation 219.66 USD}
Total time: 10.0005958s , Total pages read: 12

Inicializácia konfigurácie

Niekedy potrebujeme inicializovať konfiguráciu alebo zdroje len raz počas behu programu. Môžme teoreticky použiť globálnu premennú, ktorú nastavíme a porovnáme cez ==nil alebo ==0, ale to nie bezpečné z pohľadu súbežnosti (race condition). Preto balíček ponúka sync.Once mutex. Príklad demonštruje ako použiť bezpečné spustenie inicializácie zaručene len jediný raz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
	"fmt"
	"sync"
	"time"
)

var initOnce sync.Once

func main() {
	for i := 0; i < 5; i++ {
		go process()
	}
	time.Sleep(1 * time.Second)
}

func process() {
	initOnce.Do(func() {
		fmt.Println("Loading...")
		time.Sleep(500 * time.Millisecond)
	})
	fmt.Println("Processing")
}

Výstup:

1
2
3
4
5
6
7
$ go run .
Loading...
Processing
Processing
Processing
Processing
Processing

Zhrnutie

  1. go spustí funkciu v gorutine, gorutiny sú odľahčené vlákna ktoré bežia súbežne v jednom alebo viacerých systémových vláknach a sú efektívne riadené go runtime plánovačom
  2. chan je komunikačný kanál, ktorým gorutiny odovzdávajú informácie, referencia na dáta a synchronizujú beh programu. Kanály sú v Go “first-class values” a môžme definovať kanály kanálov, t.j. odovzdávať kanál cez iný kanál. Sú thread-safe a preferované všade kde je možné vyhnúť sa mutex lockom. Môžu byť synchrónne alebo asynchrónne (buffrované)
  3. select zablokuje gorutinu a čaká na prvú dostupnú hodnotu v prvom kanály, má podobný syntax ako switch
  4. time.After() čaká na uplynutie doby a pošle aktuálny čas do vráteného kanálu, používa sa pri timeoutoch, pozri go doc time.After
  5. sync.WaitGroup je blokujúce počítadlo gorutin, ktoré môžme použiť na čakanie kým gorutiny neskončia. Zvýšime ho funkciou Add(delta int), znížime ho Done() a čakáme na dokončenie všetkých gorutin pomocou Wait()
  6. sync.Mutex je blokujúci “mutual exclusive lock”, s funkciami Lock() a Unlock(), ktorým sa uzamyká blok proti “race condition”. Môže však spôsobiť deadlock. Využíva sa pri ochrane zdrojov so sychrónnym prístupom (napr. súbory) alebo synchronizácii prístupu k dátovým štruktúram (slice, map, atď)
  7. sync.Once je objekt s mutexom a funkciou Do(f func()), ktorá sa zavolá bezpečne a zaručene len raz

Referencie

  1. Effective Go
  2. Introduction to programming in Go
  3. Building Concurrent Workflows in Go with Goroutines and Channels

Vaše otázky, návrhy a komentáre

Tento blog vznikol na podnet od vás. Verím, že vás tento návod inšpiroval a budem vďačný ak dáte spätnú väzbu a pomôže mi zamerať sa na to čo by vás zaujímalo.

TAK ČO HOVORÍŠ ?

Kontaktuj nás ak potrebuješ pomoc, veľmi radi pomôžeme.

Kontaktuj nás