Veľká výhoda Go je, že bolo od začiatku navrhované na jednoduché súbežné programovanie. Súbežnosť nie je paralelizmus, ale je nutným predpokladom k paralelizmu, druhým je bežať program na viacjadrových procesoroch. Gorutiny sú základom súbežného programovania v Go. Od systémových threadov sa líšia tým, že ich inicializačný zásobník malý (typicky 4KB namiesto 1MB) a môže dynamicky rásť a zmenšovať sa. Gorutiny majú taktiež menšiu réžiu ako thready a sú obyčajne multiplexované na menší počet systémových threadov, teda nie 1:1. Go runtime plánovač optimálne rozhadzuje gorutiny čakajúce vo fronte do jednotlivých threadov bežiacich na viacerých jadrách procesorov a nie je také nutné riešiť počet threadov ako v threadpoole. To všetko znamená, že narozdiel od typicky 2 threadov na jedno jadro CPU, respektíve technicky niekoľko tisíc threadov (pamäťový limit), môžme v Go spustiť niekoľko miliónov gorutin, preto ak potrebujeme masívne súbežný systém Go je výborná voľba.
Druhá veľká výhoda, ktorú Go prináša je komunikácia medzi gorutinami. Tradičné viacvláknové modely napríklad v C++, Jave, C# alebo Pythone komunikujú medzi vláknami prostredníctvom zdieľanej pamäte. Dátové štruktúry sú pri zápise a čítaní chránené lockmy, semaformi a mutexami. Programátor si osvojuje situácie ako race condition, deadlock, undefined behavior. Go koncept hovorí “Don’t communicate by sharing memory, share memory by communicating” - namiesto správy zámkov chrániacich dáta, Go odporúča používať kanály na sprostredkovanie referencie na dáta medzi gorutinami. Kanály môžu byť asynchrónne (buffrované) alebo synchrónne zabezpečujú, že len jedna gorutina ma súčasne prístup k dátovej štruktúre. Taktiež ponúka detekciu race condition a deadlockov.
V tomto článku sa zameriame na vytvorenie web spiderbota, ktorý stiahne a zobrazí aktuálne trhové ceny vybraných akcií.
Na to použijeme web scraping a regulárne výrazy. Pri používaní web scrapingu (zber dát) a web crawlingu (sledovanie odkazov do určitej hĺbky) treba poznať technické a právne obmedzenia a dbať na ich dodržiavanie.
Prvá, sekvenčná verzia
Prvá verzia spiderbota nie je nijako úchvatná. Ak ju spustíme s parametrami akciových tickerov, sekvenčne zavolá yahoo finance server, stiahne HTML obsah, prvý regulárny výraz nájde dátový blok a druhý vyextrahuje dáta do equity
štruktúry. Ak s nič nepokazilo, tak funkcia na konci vypíše equity a čas a celkový čas trvania s počtom načítaných stránok.
Výstup:
$ go run . TSLA AMZN GOOG MSFT
reading TSLA
{TSLA Tesla, Inc. 439.67 USD} 1.6309993s
reading AMZN
{AMZN Amazon.com, Inc. 3272.71 USD} 456.997ms
reading GOOG
{GOOG Alphabet Inc. 1573.01 USD} 672.9979ms
reading MSFT
{MSFT Microsoft Corporation 219.66 USD} 478.9996ms
Total time: 3.2419967s , Total pages read: 4
Zdrojový kód:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"time"
)
type equity struct {
ticker string
name string
price float64
currency string
}
const (
site = "https://finance.yahoo.com/quote/"
matchPattern = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
groupPattern = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
defaultRefresh = 5
)
var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")
var counter = 0
func main() {
t0 := time.Now()
flag.Parse()
for _, a := range flag.Args() {
readSymbol(a)
}
fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
}
func readSymbol(symbol string) {
t0 := time.Now()
fmt.Println("reading " + symbol)
// nacitanie stranky
response, err := http.Get(site + symbol)
if err != nil {
fmt.Println(err)
return
}
// klient musi zavriet response body ked s nim skonci pracovat
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
fmt.Println(err)
return
}
// parsovanie obsahu
match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
if len(group) > 1 {
p, err := strconv.ParseFloat(group[3], 64)
if err != nil {
fmt.Println(err)
return
}
equ := equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
fmt.Println(equ, time.Now().Sub(t0))
counter++
}
}
|
Druhá, nainvná súbežná verzia
Povedali sme že gorutiny sú jednoduché, stačí pridať pred volanie funkcie go
však?
Nie tak celkom, pozrime sa ako by program zbehol ak by sme zmenili riadok 34 na:
33
34
35
|
for _, a := range flag.Args() {
go readSymbol(a)
}
|
Výstup:
Program síce zbehne a keďže ho nič neblokuje, skončí oveľa skôr ako stihne prvá gorutina načítať stránku:
$ go run . TSLA AMZN GOOG MSFT
reading TSLA
Total time: 0s , Total pages read: 0
Druhá, funkčná súbežná verzia
Pre čakanie na ukončenie gorutin môžme použiť wait group
z balíčka sync
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"sync"
"time"
)
type equity struct {
ticker string
name string
price float64
currency string
}
const (
site = "https://finance.yahoo.com/quote/"
matchPattern = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
groupPattern = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
defaultRefresh = 5
)
var wg sync.WaitGroup // wait group
var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")
var counter = 0
func main() {
t0 := time.Now()
flag.Parse()
for _, a := range flag.Args() {
wg.Add(1) // zvysit counter wait group
go readSymbol(a)
}
wg.Wait() // wait group counter, cakat na ukoncenie vsetkych gorutin
fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
}
func readSymbol(symbol string) {
defer wg.Done() // zavola sa po navrate z funkcie
t0 := time.Now()
fmt.Println("reading " + symbol)
// nacitanie stranky
response, err := http.Get(site + symbol)
if err != nil {
fmt.Println(err)
return
}
// klient musi zavriet response body ked s nim skonci pracovat
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
fmt.Println(err)
return
}
// parsovanie obsahu
match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
if len(group) > 1 {
p, err := strconv.ParseFloat(group[3], 64)
if err != nil {
fmt.Println(err)
return
}
equ := equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
fmt.Println(equ, time.Now().Sub(t0))
counter++
}
}
|
Výstup:
$ go run . TSLA AMZN GOOG MSFT
reading MSFT
reading GOOG
reading TSLA
reading AMZN
{MSFT Microsoft Corporation 214.22 USD} 1.470001s
{GOOG Alphabet Inc. 1534.61 USD} 1.5740001s
{TSLA Tesla, Inc. 430.83 USD} 1.6290002s
{AMZN Amazon.com, Inc. 3207.21 USD} 1.6549999s
Total time: 1.6549999s , Total pages read: 4
Táto verzia je síce funkčná, ale kód hlavnej gorutiny je blokovaný čakaním na všetky gorutiny. Čiastkové výstupy nemôžme nijako spracovávať a musíme udržiavať počítadlo gorutin. Buď všetko alebo nič.
Tretia, vylepšená súbežná verzia
Tu nastupuje potrebná komunikácia medzi gorutinami. Zmenené riadky sú označené a okomentované:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"sync"
"time"
)
type equity struct {
ticker string
name string
price float64
currency string
}
const (
site = "https://finance.yahoo.com/quote/"
matchPattern = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
groupPattern = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
defaultRefresh = 5
)
var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")
var counter = 0
var mux sync.Mutex // deklarovat mutex na uzamykanie countra
func main() {
t0 := time.Now()
c := make(chan int) // vytvorit kanal
flag.Parse()
for _, a := range flag.Args() {
go readSymbol(a, c) // spustit gorutiny
}
for range flag.Args() {
<-c // pockat na dokoncenie
}
fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
}
func readSymbol(symbol string, c chan<- int) {
t0 := time.Now()
fmt.Println("reading " + symbol)
// nacitanie stranky
response, err := http.Get(site + symbol)
if err != nil {
fmt.Println(err)
c <- 1 // komunikujem ze som skoncila
return
}
// klient musi zavriet response body ked s nim skonci pracovat
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
fmt.Println(err)
c <- 1 // komunikujem ze som skoncila
return
}
// parsovanie obsahu
match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
if len(group) > 1 {
p, err := strconv.ParseFloat(group[3], 64)
if err != nil {
fmt.Println(err)
c <- 1 // komunikujem ze som skoncila
return
}
equ := equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
fmt.Println(equ, time.Now().Sub(t0))
mux.Lock() // musim uzamknut pristup ku countru
counter++
mux.Unlock() // musim odomknut pristup ku countru
}
c <- 1 // komunikujem ze som skoncila
}
|
Výstup:
$ go run . TSLA AMZN GOOG MSFT
reading AMZN
reading MSFT
reading TSLA
reading GOOG
{AMZN Amazon.com, Inc. 3272.71 USD} 1.7000158s
{MSFT Microsoft Corporation 219.66 USD} 1.8280157s
{GOOG Alphabet Inc. 1573.01 USD} 2.0010173s
{TSLA Tesla, Inc. 439.67 USD} 2.0260156s
Total time: 2.0260156s , Total pages read: 4
Táto verzia sa mi však stále veľmi nepáči. Nechcem pamätať na uzavretie kanálu, dá sa elegantnejšie urobiť cez funkčný modifikátor defer
, ktorý sa vždy zavolá pri opustení funkcie (ak je ich definovaných viacero zavolajú sa v opačnom poradí, ako boli definované).
Tiež nechcem zdielať dátové štruktúry ak skutočne netreba, zamykať a odomykať counter. V náročnejšej aplikácii by to ľahko čoskoro spôsobilo veľmi ťažko odhaliteľné komplikácie. Preto chcem s countrom pracovať len v main gorutine a preradiť ho z globálnej premennej do scope main gorutiny.
Spracovávať (vypísať) dáta v readSymbol
funkcii taktiež nechcem, nie je to škálovateľný SOLID prístup, funkcia má dáta len načítať a spracovať ich chcem spoločne s ostatnými dátami v main gorutine. Na to poslúži kanál, ktorým sa presunie referencia.
Štvrtá finálna verzia
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"time"
)
type equity struct {
ticker string
name string
price float64
currency string
}
const (
site = "https://finance.yahoo.com/quote/"
matchPattern = "\"%s\":\\{\"sourceInterval\"(.*?)\"tradeable\":false\\}"
groupPattern = "\"shortName\":\"(.*?)\".*\"currency\":\"(.*?)\".*\"regularMarketPrice\":\\{\"raw\":(.*?)\\,.*\"symbol\":\"(.*?)\""
defaultRefresh = 5
)
var sleep = flag.Int("r", defaultRefresh, "refresh time in seconds")
func main() {
t0 := time.Now()
var counter = 0
chEquity := make(chan *equity) // vytvorit kanal
flag.Parse()
for _, a := range flag.Args() {
go readSymbol(a, chEquity) // spustit gorutiny
}
chTimeout := time.After(10 * time.Second) // timeout nastaveny na 10 sekund
for {
select { // spracuje case, ktory pride prvy
case <-chTimeout: // timeout
fmt.Println("Total time:", time.Now().Sub(t0), ", Total pages read:", counter)
return
case equ := <-chEquity: // nova equity referencia
if equ != nil {
fmt.Println(*equ)
counter++
}
}
}
}
func readSymbol(symbol string, c chan<- *equity) {
// zavolá sa po ukončení funkcie
defer func() {
time.Sleep(time.Duration(*sleep) * time.Second)
readSymbol(symbol, c) // dalsia iteracia po prebudeni
}()
fmt.Println("reading " + symbol)
// nacitanie stranky
response, err := http.Get(site + symbol)
if err != nil {
fmt.Println(err)
return
}
// klient musi zavriet response body ked s nim skonci pracovat
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
fmt.Println(err)
return
}
// parsovanie obsahu
match := regexp.MustCompile(fmt.Sprintf(matchPattern, symbol)).FindString(string(body))
group := regexp.MustCompile(groupPattern).FindStringSubmatch(match)
if len(group) > 1 {
p, err := strconv.ParseFloat(group[3], 64)
if err != nil {
fmt.Println(err)
return
}
c <- &equity{name: group[1], currency: group[2], price: p, ticker: group[4]}
}
}
|
Výstup:
$ go run . -r 3 TSLA AMZN GOOG MSFT
reading TSLA
reading GOOG
reading MSFT
reading AMZN
{TSLA Tesla, Inc. 439.67 USD}
{AMZN Amazon.com, Inc. 3272.71 USD}
{GOOG Alphabet Inc. 1573.01 USD}
{MSFT Microsoft Corporation 219.66 USD}
reading TSLA
reading AMZN
{TSLA Tesla, Inc. 439.67 USD}
{AMZN Amazon.com, Inc. 3272.71 USD}
reading GOOG
reading MSFT
{GOOG Alphabet Inc. 1573.01 USD}
{MSFT Microsoft Corporation 219.66 USD}
reading TSLA
reading AMZN
{TSLA Tesla, Inc. 439.67 USD}
reading GOOG
{AMZN Amazon.com, Inc. 3272.71 USD}
{GOOG Alphabet Inc. 1573.01 USD}
reading MSFT
{MSFT Microsoft Corporation 219.66 USD}
Total time: 10.0005958s , Total pages read: 12
Inicializácia konfigurácie
Niekedy potrebujeme inicializovať konfiguráciu alebo zdroje len raz počas behu programu. Môžme teoreticky použiť globálnu premennú, ktorú nastavíme a porovnáme cez ==nil alebo ==0, ale to nie bezpečné z pohľadu súbežnosti (race condition). Preto balíček ponúka sync.Once
mutex. Príklad demonštruje ako použiť bezpečné spustenie inicializácie zaručene len jediný raz:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package main
import (
"fmt"
"sync"
"time"
)
var initOnce sync.Once
func main() {
for i := 0; i < 5; i++ {
go process()
}
time.Sleep(1 * time.Second)
}
func process() {
initOnce.Do(func() {
fmt.Println("Loading...")
time.Sleep(500 * time.Millisecond)
})
fmt.Println("Processing")
}
|
Výstup:
1
2
3
4
5
6
7
|
$ go run .
Loading...
Processing
Processing
Processing
Processing
Processing
|
Zhrnutie
go
spustí funkciu v gorutine, gorutiny sú odľahčené vlákna ktoré bežia súbežne v jednom alebo viacerých systémových vláknach a sú efektívne riadené go runtime plánovačom
chan
je komunikačný kanál, ktorým gorutiny odovzdávajú informácie, referencia na dáta a synchronizujú beh programu. Kanály sú v Go “first-class values” a môžme definovať kanály kanálov, t.j. odovzdávať kanál cez iný kanál. Sú thread-safe a preferované všade kde je možné vyhnúť sa mutex lockom. Môžu byť synchrónne alebo asynchrónne (buffrované)
select
zablokuje gorutinu a čaká na prvú dostupnú hodnotu v prvom kanály, má podobný syntax ako switch
time.After()
čaká na uplynutie doby a pošle aktuálny čas do vráteného kanálu, používa sa pri timeoutoch, pozri go doc time.After
sync.WaitGroup
je blokujúce počítadlo gorutin, ktoré môžme použiť na čakanie kým gorutiny neskončia. Zvýšime ho funkciou Add(delta int)
, znížime ho Done()
a čakáme na dokončenie všetkých gorutin pomocou Wait()
sync.Mutex
je blokujúci “mutual exclusive lock”, s funkciami Lock() a Unlock(), ktorým sa uzamyká blok proti “race condition”. Môže však spôsobiť deadlock. Využíva sa pri ochrane zdrojov so sychrónnym prístupom (napr. súbory) alebo synchronizácii prístupu k dátovým štruktúram (slice, map, atď)
sync.Once
je objekt s mutexom a funkciou Do(f func()), ktorá sa zavolá bezpečne a zaručene len raz
Referencie
- Effective Go
- Introduction to programming in Go
- Building Concurrent Workflows in Go with Goroutines and Channels
Vaše otázky, návrhy a komentáre
Tento blog vznikol na podnet od vás. Verím, že vás tento návod inšpiroval a budem vďačný ak dáte spätnú väzbu a pomôže mi zamerať sa na to čo by vás zaujímalo.