我们刚刚完成了面向对象编程语言中常用的四人组合设计模式。在过去几十年中,它们被广泛使用(甚至在书中明确定义之前)。
在本章中,我们将看到 Go 语言中的并发性。我们将了解到,使用多核和多进程,应用程序可以帮助我们实现更好的性能和无限的可能性。我们将研究如何以同时安全的方式使用一些已知的模式。
当我们谈论围棋的并发性时,不可能不谈论历史。在过去的几十年中,我们看到了 CPU 速度的提高,直到我们达到了当前硬件材料、设计和体系结构所施加的硬件限制。当我们达到这一点,我们开始玩第一个多核计算机,第一个双 CPU 主板,然后单 CPU 与多个核心在他们的心脏。
不幸的是,我们使用的语言仍然是我们拥有单核 CPU(如 java 或 C++)时创建的语言。虽然它们是非常优秀的系统语言,但在设计上缺乏适当的并发支持。您可以通过使用第三方工具或开发自己的工具(这不是一项很容易的任务),以项目中使用的两种语言开发并发应用程序。
Go 的并发性设计时考虑了这些警告。创建者希望使用新手熟悉的垃圾收集和过程化语言,但同时可以轻松地编写并发应用程序,而不会影响语言的核心。
我们在早期章节中已经经历了这一点。我们已经开发了 20 多个设计模式,对并发性只字不提。这清楚地表明,作为核心语言的一部分,Go 语言的并发特性与核心语言完全分离,这是抽象和封装的完美示例。
计算机科学中有许多并发模型,最著名的是存在于语言中的 actor 模型,如Erlang或Scala。另一方面,Go 使用通信顺序进程(CSP),这有一种不同的并发方法。
许多人误解了两者的区别,甚至认为它们是一样的。Go 的创建者之一 Rob Pike 有一个流行的演讲,并发性不是并行性,这一点我非常同意。作为谈话的简要总结,我们可以摘录以下内容:
并发通过设计正确的并发工作结构来实现并行。
例如,我们可以想到自行车的机械装置。当我们踩踏板时,我们通常会踩下踏板以产生力(这种推动会使我们的另一条腿在另一个踏板上抬起)。我们不能同时用两条腿推,因为曲柄不允许我们推。但这种设计允许建造一辆平行自行车,通常称为串联自行车。双人自行车是两个人可以同时骑的自行车;他们都踩踏板,并对自行车施力。
在自行车示例中,并发是一种自行车的设计,具有两条腿(Goroutines),您可以自行产生动力来移动自行车。设计是并行的和正确的。如果我们使用一辆串联自行车和两个人(两个核心),那么解决方案是并行的、正确的和并行的。但关键是,对于并行设计,我们不必担心并行性;如果我们的并行设计是正确的,我们可以把它看作是一个额外的特性。事实上,我们可以使用双人自行车,但同时设计自行车的腿、踏板、链条和车轮仍然是正确的。
对于并发,在左边,我们有一个设计和一个结构,由同一个 CPU 核心顺序执行。一旦我们有了这种设计和结构,就可以通过在不同的线程上简单地重复这种结构来实现并行性。
这就是 Go 简化并发和并行程序推理的方法,它简单地不太担心并行执行,而更多地关注并发设计和结构。将一个大任务分解成可以并发运行的小任务通常在单核计算机中提供更好的性能,但是,如果这种设计也可以并行运行,我们可以获得更高的吞吐量(或者不,取决于设计)。
事实上,我们可以通过将环境变量GOMAXPROCS
设置为我们想要的核心数量来设置 Go 应用程序中使用的核心数量。这不仅在使用调度器(如ApacheMesos)时有用,而且还让我们能够更好地控制 Go 应用程序的工作和执行方式。
所以,总而言之,记住并发性是关于结构的,并行性是关于执行的,这一点非常重要。我们必须考虑以更好的方式使我们的程序并发,将它们分解成更小的工作,如果可能并且允许的话,Go 的调度程序将尝试使它们并行。
考虑并发性的最常见、也许也是最直观的方式是与 actor 模型的工作方式相近的方式。
在 actor 模型中,如果actor 1想要与actor 2通信,那么actor 1必须先知道actor 2;例如,它必须有自己的进程 ID(可能来自创建步骤),并将消息放在其收件箱队列中。放置消息后,如果参与者 2无法立即处理消息,则参与者 1可以继续其任务而不会被阻止。
另一方面,CSP 在方程通道中引入了一个新实体。通道是进程之间通信的方式,因为它们是完全匿名的(与参与者不同,我们需要知道它们的进程 ID)。在 CSP 的情况下,我们没有用于通信的进程 ID。相反,我们必须创建到进程的通道,以允许传入和传出通信。在这种情况下,我们知道接收器是它用来接收数据的通道:
在这个图中,我们可以看到进程是匿名的,但是我们有一个 ID 为 1 的通道,即通道 1,它将它们连接在一起。这个抽象并没有告诉我们在通道的每一侧有多少个进程;它只是将它们连接起来,并允许使用通道在进程之间进行通信。
这里的关键是通道隔离了两个极端,以便进程 A 可以通过一个通道发送数据,该通道将由一个或多个对 A 透明的进程处理。它也可以反过来工作;进程 B 可以一次一个地从多个通道接收数据。
在 Go 中,我们通过使用 Goroutines 来实现并发性。它们就像在计算机中同时运行应用程序的进程;事实上,围棋的主循环也可以被认为是一个 Goroutine。Goroutines 用于我们将使用演员的地方。它们执行一些逻辑并死亡(或者在必要时保持循环)。
但 goroutine 不是线程。我们可以同时启动数千个 goroutine,甚至数百万个。它们非常便宜,增长速度很小。我们将使用 Goroutines 来执行我们希望并发工作的代码。例如,对三个服务的三个调用组成一个响应,可以与三个 Goroutine 同时设计,以潜在地并行执行服务调用,并与第四个 Goroutine 一起接收它们并组成响应。这有什么意义?如果我们有一台有四个核心的计算机,我们可能会并行运行这个服务调用,但是如果我们使用一台单核心计算机,设计仍然是正确的,调用将只在一个核心中并发执行。通过设计并发应用程序,我们不需要担心并行执行。
回到自行车的类比,我们用两条腿推着自行车的踏板。这是两个同时踩踏板的 Goroutines。当我们使用串联时,我们总共有四个 goroutine,可能并行工作。但我们也有两只手来操纵前后制动器。我们的双线程自行车总共有八个 Goroutines。实际上,我们刹车时不踩踏板,我们踩踏板时也不刹车;这是一个正确的并行设计。我们的神经系统传输有关何时停止踏板和何时开始制动的信息。在围棋中,我们的神经系统由通道组成;我们会先和戈罗季斯玩一会儿再看他们。
现在解释够了。让我们把手弄脏。对于我们的第一个 Goroutine,我们将在 Goroutine 中打印消息Hello World!
。让我们从我们到目前为止一直在做的事情开始:
package main
func main() {
helloWorld()
}
func helloWorld(){
println("Hello World!")
}
运行这段代码只需在控制台中输出Hello World!
:
$ go run main.go
Hello World!
一点也不令人印象深刻。要在新的 Goroutine 中运行它,我们只需要在函数调用的开头添加关键字go
:
package main
func main() {
go helloWorld()
}
func helloWorld(){
println("Hello World!")
}
用这个简单的词,我们告诉 Go 启动一个新的 Goroutine,运行helloWorld
函数的内容。
那么,让我们运行它:
$ go run main.go
$
什么它什么也没印!为什么呢?当您开始处理并发应用程序时,事情会变得复杂。问题在于main
函数在helloWorld
函数执行之前完成。让我们一步一步地分析一下。main
函数启动并调度一个新的 Goroutine,该 Goroutine 将执行helloWorld
函数,但当函数完成时,该函数不会执行——它仍在调度过程中。
因此,我们的main
问题是main
函数必须等待 Goroutine 执行后才能完成。因此,让我们暂停一下,给 Goroutine 留出一些空间:
package main
import "time"
func main() {
go helloWorld()
time.Sleep(time.Second)
}
func helloWorld(){
println("Hello World!")
}
time.Sleep
函数在继续(和退出)之前有效地使主 Goroutine 休眠一秒钟。如果现在运行此操作,则必须获得以下消息:
$ go run main.go
Hello World!
我想你现在一定已经注意到了一个很小的时间间隔,在这个时间间隔内,程序在完成之前就冻结了。这是睡眠的功能。如果你正在做很多任务,你可能想把等待时间提高到你想要的任何程度。请记住,在任何应用程序中,main
函数都不能在其余 goroutine 之前完成。
我们已经定义了helloWorld
函数,以便可以使用不同的 Goroutine 启动它。这并不是绝对必要的,因为您可以直接在函数的作用域中启动代码片段:
package main
import "time"
func main() {
go func() {
println("Hello World")
}()
time.Sleep(time.Second)
}
这也是有效的。我们使用了一个匿名函数,并在一个新的 Goroutine 中使用go
关键字启动了它。仔细看一看函数的右大括号,它们后面是表示函数执行的左括号和右括号。
我们还可以将数据传递给匿名函数:
package main
import "time"
func main() {
go func(msg string) {
println(msg)
}("Hello World")
time.Sleep(time.Second)
}
这也是有效的。我们定义了一个匿名函数,该函数接收一个字符串,然后打印接收到的字符串。当我们在另一个 Goroutine 中调用函数时,我们传递了想要打印的消息。从这个意义上讲,以下示例也是有效的:
package main
import "time"
func main() {
messagePrinter := func(msg string) {
println(msg)
}
go messagePrinter("Hello World")
go messagePrinter("Hello goroutine")
time.Sleep(time.Second)
}
在本例中,我们在main
函数的范围内定义了一个函数,并将其存储在名为messagePrinter
的变量中。现在我们可以使用messagePrinter(string)
签名同时打印任意数量的消息:
$ go run main.go
Hello World
Hello goroutine
我们刚刚触及了 Go 中并发编程的表面,但我们已经看到它可以非常强大。但我们肯定要对那段睡眠时间做些什么。WaitGroup 可以帮助我们解决这个问题。
WaitGroup 包含在同步包(即sync
包)中,以帮助我们同步许多并发 goroutine。它的工作非常简单——每次我们必须等待一个 Goroutine 完成时,我们将1
添加到组中,一旦所有 Goroutine 都被添加,我们就要求组等待。当 Goroutine 完成时,它会显示Done
,WaitGroup 将从组中选择一个:
package main
import (
"sync"
"fmt"
)
func main() {
var wait sync.WaitGroup
wait.Add(1)
go func(){
fmt.Println("Hello World!")
wait.Done()
}()
wait.Wait()
}
这是 WaitGroup 最简单的示例。首先,我们创建了一个变量来保存它,称为wait
变量。接下来,在启动新的 Goroutine 之前,我们使用wait.Add(1)
方法对 WaitGrouphey, you'll have to wait for one thing to finish
说。现在我们可以启动 WaitGroup 必须等待的1
,在本例中,这是之前的 Goroutine,它打印Hello World
并在 Goroutine 末尾显示Done
(使用wait.Done()
方法)。最后,我们指示 WaitGroup 等待。我们必须记住,函数wait.Wait()
可能是在 Goroutine 之前执行的。
让我们再次运行代码:
$ go run main.go
Hello World!
现在,它只是等待必要的时间,而不是在退出应用程序之前再等待一毫秒。记住,当我们使用Add(value)
方法时,我们向 WaitGroup 添加实体,而当我们使用Done()
方法时,我们减去一个实体。
实际上,Add
函数取一个 delta 值,所以下面的代码相当于前面的代码:
package main
import (
"sync"
"fmt"
)
func main() {
var wait sync.WaitGroup
wait.Add(1)
go func(){
fmt.Println("Hello World!")
wait.Add(-1)
}()
wait.Wait()
}
在本例中,我们在启动 Goroutine 之前添加了1
,并在其末尾添加了-1
(减去 1)。如果我们事先知道我们将要启动多少个 goroutine,我们也可以只调用一次Add
方法:
package main
import (
"fmt"
"sync"
)
func main() {
var wait sync.WaitGroup
goRoutines := 5
wait.Add(goRoutines)
for i := 0; i < goRoutines; i++ {
go func(goRoutineID int) {
fmt.Printf("ID:%d: Hello goroutines!\n", goRoutineID)
wait.Done()
}(i)
}
wait.Wait()
}
在本例中,我们将创建五个 goroutine(如goroutines
变量中所述)。我们事先知道,所以我们只需将它们全部添加到 WaitGroup。然后,我们将通过使用for
循环来启动相同数量的goroutine
变量。每次一个 Goroutine 完成时,它调用 WaitGroup 的Done()
方法,该方法实际上在主循环的末尾等待。
同样,在这种情况下,代码在启动所有 goroutine(如果有)之前到达main
函数的末尾,WaitGroup 使主流的执行等待,直到调用所有Done
消息。让我们运行这个小程序:
$ go run main.go
ID:4: Hello goroutines!
ID:0: Hello goroutines!
ID:1: Hello goroutines!
ID:2: Hello goroutines!
ID:3: Hello goroutines!
我们之前没有提到过,但是我们已经将迭代索引作为参数GoroutineID
传递给每个 Goroutine,以打印消息Hello goroutines!
,您可能还注意到 Goroutine 没有按顺序执行。当然我们正在处理一个调度程序,它不能保证 Goroutines 的执行顺序。这是在编写并发应用程序时要记住的。事实上,如果我们再次执行它,我们不一定会得到相同的输出顺序:
$ go run main.go
ID:4: Hello goroutines!
ID:2: Hello goroutines!
ID:1: Hello goroutines!
ID:3: Hello goroutines!
ID:0: Hello goroutines!
既然我们知道了如何使用 WaitGroups,我们还可以引入回调的概念。如果您曾经使用过像 JavaScript 这样广泛使用它们的语言,那么您将熟悉本节。回调是一个匿名函数,将在不同函数的上下文中执行。
例如,我们希望编写一个函数,将字符串转换为大写,并使其异步。我们如何编写这个函数以便处理回调?有一个小技巧,我们可以有一个函数,它接受一个字符串并返回一个字符串:
func toUpperSync(word string) string {
//Code will go here
}
因此,将此函数的返回类型(字符串)作为匿名函数的第二个参数,如下所示:
func toUpperSync(word string, f func(string)) {
//Code will go here
}
现在,toUpperSync
函数不返回任何内容,但同时也接受了一个函数,碰巧它也接受了一个字符串。我们可以使用通常返回的结果执行此函数。
func toUpperSync(word string, f func(string)) {
f(strings.ToUpper(word))
}
我们执行f
函数,结果是使用提供的单词调用strings.ToUpper
方法(返回大写的单词parameter
。我们也来编写main
函数:
package main
import (
"fmt"
"strings"
)
func main() {
toUpperSync("Hello Callbacks!", func(v string) {
fmt.Printf("Callback: %s\n", v) })
}
func toUpperSync(word string, f func(string)) {
f(strings.ToUpper(word))
}
在主代码中,我们定义了回调。如您所见,我们通过了将其转换为大写的测试Hello Callbacks!
。接下来,我们传递要执行的回调,其结果是将字符串传递为大写。在这种情况下,我们只需在控制台中打印文本,前面有文本Callback
。当我们执行此代码时,我们得到以下结果:
$ go run main.go
Callback: HELLO CALLBACKS!
严格来说,这是一个同步回调。为了使其异步化,我们必须引入一些并发处理:
package main
import (
"fmt"
"strings"
"sync"
)
var wait sync.WaitGroup
func main() {
wait.Add(1)
toUpperAsync("Hello Callbacks!", func(v string) {
fmt.Printf("Callback: %s\n", v)
wait.Done()
})
println("Waiting async response...")
wait.Wait()
}
func toUpperAsync(word string, f func(string)) {
go func(){
f(strings.ToUpper(word))
}()
}
这是异步执行的相同代码。我们使用 WaitGroups 来处理并发性(稍后我们将看到通道也可以用于此)。现在,我们的函数toUpperAsync
顾名思义是异步的。我们在调用回调时使用关键字go
在不同的 Goroutine 中启动回调。我们编写了一条小消息,以更精确地显示并发执行的顺序性质。我们等待回调信号,直到它完成,我们可以安全地退出程序。执行此操作时,我们得到以下结果:
$ go run main.go
Waiting async response...
Callback: HELLO CALLBACKS!
如您所见,程序在执行toUpperAsync
函数中的回调之前到达main
函数的末尾。这种模式带来了许多可能性,但让我们面临一个称为回调地狱的大问题。
术语回调地狱通常用于指多个回调相互堆叠的情况。这使得他们很难在成长太多的时候进行推理和处理。例如,使用与前面相同的代码,我们可以将另一个异步调用与之前打印到控制台的内容堆叠在一起:
func main() {
wait.Add(1)
toUpperAsync("Hello Callbacks!", func(v string) {
toUpperAsync(fmt.Sprintf("Callback: %s\n", v), func(v string) {
fmt.Printf("Callback within %s", v)
wait.Done()
})
})
println("Waiting async response...")
wait.Wait()
}
(我们省略了导入、包名和toUpperAsync
函数,因为它们没有改变。)现在我们在toUpperAsync
函数中有了toUpperAsync
函数,如果需要,我们可以嵌入更多。在本例中,我们再次传递之前在控制台上打印的文本,以便在下面的回调中使用它。内部回调最终在控制台上打印它,并给出以下输出:
$ go run main.go
Waiting async response...
Callback within CALLBACK: HELLO CALLBACKS!
在这种情况下,我们可以假设外部回调将在内部回调之前执行。这就是为什么我们不需要在 WaitGroup 中再添加一个。
这里的要点是,我们在使用回调时必须小心。在非常复杂的系统中,太多的回调很难推理,也很难处理。但只要谨慎和理性,它们是强有力的工具。
如果使用并发应用程序,则必须处理多个可能访问某个内存位置的资源。这通常被称为竞赛条件。
简单地说,比赛条件类似于两个人试图在同一时间得到最后一块比萨饼的时刻——他们的手碰撞。用变量替换比萨饼,用 Goroutines 替换他们的手,我们就有了一个完美的类比。
餐桌上有一个角色来解决这个问题——父亲或母亲。他们把比萨饼放在了另一张桌子上,我们在吃比萨饼之前必须征得他们的允许才能站起来。如果所有的孩子都同时问,那没关系——他们只允许一个孩子站着。
互斥体就像我们的父母。他们将控制谁可以访问比萨饼——我的意思是,一个变量——他们不允许任何其他人访问它。
要使用互斥锁,我们必须主动锁定它;如果它已经被锁定(另一个 Goroutine 正在使用它),我们将不得不等到它再次被解锁。一旦我们访问互斥锁,我们可以再次锁定它,做任何需要的修改,然后再次解锁它。我们将用一个例子来说明这一点。
互斥锁广泛应用于并发编程中。也许 Go 没有这么多,因为它在使用频道时有一种更惯用的并发编程方式,但值得一看的是,在频道不太适合的情况下,它们是如何工作的。
对于我们的示例,我们将开发一个小型并发计数器。此计数器将向Counter
类型的整数字段中添加一个。这应该以并行安全的方式进行。
我们的Counter
结构定义如下:
type Counter struct {
sync.Mutex
value int
}
Counter
结构有一个int
类型的字段,用于存储计数的当前值。它还嵌入了sync
包中的Mutex
类型。嵌入此字段将允许我们在不主动调用特定字段的情况下锁定和解锁整个结构。
我们的main
函数将启动 10 个 goroutine,尝试向Counter
结构的字段值添加一个 goroutine。所有这些都是同时进行的:
package main
import (
"sync"
"time"
)
func main() {
counter := Counter{}
for i := 0; i < 10; i++ {
go func(i int) {
counter.Lock()
counter.value++
defer counter.Unlock()
}(i)
}
time.Sleep(time.Second)
counter.Lock()
defer counter.Unlock()
println(counter.value)
}
我们已经创建了一个名为Counter
的类型。使用for
循环,我们总共启动了 10 个 Goroutines,正如我们在匿名函数作为新 Goroutines部分所看到的。但是在每个 Goroutine 中,我们锁定计数器,这样就没有更多的 Goroutine 可以访问它,在字段值中添加一个,然后再次解锁它,以便其他人可以访问它。
最后,我们将打印计数器持有的值。它一定是 10,因为我们已经启动了 10 个 goroutine。
但是我们怎么知道这个程序是线程安全的呢?嗯,Go 附带了一个非常方便的内置功能,称为“种族检测器”。
我们已经知道什么是比赛条件。概括地说,当两个进程试图同时访问同一资源时,会使用它,此时会涉及一个或多个写入操作(两个进程都在写入,或者一个进程在写入,而另一个进程在读取)。
Go 有一个非常方便的工具来帮助诊断竞争条件,您可以直接在测试或主应用程序中运行。因此,让我们重新使用刚才为互斥体部分编写的示例,并使用种族检测器运行它。这非常简单,只需将-race
命令行标志添加到程序的命令执行中即可:
$ go run -race main.go
10
嗯,不是很令人印象深刻,是吗?但事实上,它告诉我们,在这个程序的代码中,它没有检测到潜在的竞争条件。在我们修改之前,让-race
标志的检测器通过不锁定counter
来警告我们可能的竞争条件:
for i := 0; i < 10; i++ {
go func(i int) {
//counter.Lock()
counter.value++
//counter.Unlock()
}(i)
}
在for
循环中,在向字段值添加1
之前和之后注释Lock
和Unlock
调用。这将引入竞争条件。让我们再次运行相同的程序,并激活竞赛标志:
$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c42007a068 by goroutine 6:
main.main.func1()
[some_path]/concurrency/locks/main.go:19 +0x44
Previous write at 0x00c42007a068 by goroutine 5:
main.main.func1()
[some_path]/concurrency/locks/main.go:19 +0x60
Goroutine 6 (running) created at:
main.main()
[some_path]/concurrency/locks/main.go:21 +0xb6
Goroutine 5 (finished) created at:
main.main()
[some_path]/concurrency/locks/main.go:21 +0xb6
==================
10
Found 1 data race(s)
exit status 66
我已经减少了一点输出,以便看得更清楚。我们可以看到一条大写的大消息,上面写着WARNING: DATA RACE
。但这种输出很容易推理。首先,它告诉我们,main.go
文件中由行 19表示的某个内存位置正在读取某个变量。但是在同一个文件的行 19中还有一个写操作!
这是因为“++
”操作需要读取当前值并写入以添加一个值。这就是为什么竞态条件在同一行中,因为每次执行竞态条件时,它都会读取和写入Counter
结构中的字段。
但是让我们记住,种族检测器在运行时工作。它不会静态地分析我们的代码!这是什么意思?这意味着在我们的设计中,我们可以有一个潜在的竞争条件,而竞争检测器不会检测到。例如:
package main
import "sync"
type Counter struct {
sync.Mutex
value int
}
func main() {
counter := Counter{}
for i := 0; i < 1; i++ {
go func(i int) {
counter.value++
}(i)
}
}
我们将保留前面示例中所示的代码。我们将从代码中获取所有锁定和解锁,并启动一个 Goroutine 来更新value
字段:
$ go run -race main.go
$
没有警告,因此代码是正确的。好吧,我们知道,这不是故意的。我们可以将执行的 goroutine 数量增加到两个,看看会发生什么:
for i := 0; i < 2; i++ {
go func(i int) {
counter.value++
}(i)
}
让我们再次执行该程序:
$ go run -race main.go
WARNING: DATA RACE
Read at 0x00c42007a008 by goroutine 6:
main.main.func1()
[some_path]concurrency/race_detector/main.go:15 +0x44
Previous write at 0x00c42007a008 by goroutine 5:
main.main.func1()
[some_path]/concurrency/race_detector/main.go:15 +0x60
Goroutine 6 (running) created at:
main.main()
[some_path]/concurrency/race_detector/main.go:16 +0xad
Goroutine 5 (finished) created at:
main.main()
[some_path]/concurrency/race_detector/main.go:16 +0xad
==================
Found 1 data race(s)
exit status 66
现在是,检测到竞争条件。但是如果我们将使用的处理器数量减少到一个呢?我们也会有比赛条件吗?
$ GOMAXPROCS=1 go run -race main.go
$
似乎没有检测到竞争条件。这是因为调度程序先执行一个 Goroutine,然后再执行另一个 Goroutine,因此,最后,竞争条件没有发生。但是随着 Goroutine 数量的增加,它也会警告我们关于竞争的情况,即使只使用一个核心。
因此,竞赛检测器可以帮助我们检测代码中正在发生的竞赛条件,但它不能保护我们免受不立即执行竞赛条件的糟糕设计的影响。这是一个非常有用的功能,可以让我们免于很多麻烦。
通道是语言中允许我们编写并发应用程序的第二个原语。我们已经在通信顺序过程部分讨论了一些通道。
渠道是我们在流程之间进行沟通的方式。我们可以共享一个内存位置,并使用互斥来控制进程的访问。但是,通道为我们提供了一种更自然的方式来处理并发应用程序,从而在我们的程序中产生更好的并发设计。
如果我们不能在许多 goroutine 之间创建一些同步,那么使用它们似乎相当困难。一旦同步,执行顺序可能就无关紧要了。通道是在 Go 中编写并发应用程序的第二个关键特性。
现实生活中的电视频道是将发射(从工作室)连接到数百万电视(接收器)的东西。Go 中的频道也以类似的方式工作。一个或多个 Goroutine 可以用作发射器,而一个或多个 Goroutine 可以用作接收器。
默认情况下,还有一个东西通道会阻止 Goroutines 的执行,直到收到某个东西。就好像我们最喜欢的电视节目延迟发射,直到我们打开电视,这样我们就不会错过任何东西。
围棋是怎么做到的?
package main
import "fmt"
func main() {
channel := make(chan string)
go func() {
channel <- "Hello World!"
}()
message := <-channel
fmt.Println(message)
}
要在 Go 中创建通道,我们使用与创建切片相同的语法。make
关键字用于创建频道,我们必须传递关键字chan
和频道将传输的类型,在本例中是字符串。这样,我们就有了一个名为channel
的阻塞通道。接下来,我们启动一个 Goroutines,将消息Hello World!
发送到频道。这由显示流的直观箭头指示,Hello World!
文本进入(<-
频道。这类似于变量中的赋值,因此我们只能通过先写入通道,然后写入箭头,最后写入要传递的值来将某些内容传递给通道。我们不能写"Hello World!" -> channel
。
正如我们前面提到的,在收到消息之前,该通道会阻止 Gorountines 的执行。在这种情况下,main
功能的执行将停止,直到来自已启动 Goroutines 的消息到达线路message := <-channel
中信道的另一端。在这种情况下,箭头指向相同的方向,但它位于通道之前,表示数据正从通道中提取并分配给名为message
的新变量(使用新分配“:=
运算符)。
在这种情况下,我们不需要使用 WaitGroup 将main
函数与创建的 goroutine 同步,因为通道的默认性质是在收到数据之前阻塞。但它是否反过来起作用?如果 Goroutine 发送消息时没有接收器,它是否继续?让我们编辑此示例以了解以下内容:
package main
import (
"fmt"
"time"
)
func main() {
channel := make(chan string)
var waitGroup sync.WaitGroup
waitGroup.Add(1)
go func() {
channel <- "Hello World!"
println("Finishing goroutine")
waitGroup.Done()
}()
time.Sleep(time.Second)
message := <-channel
fmt.Println(message)
waitGroup.Wait()
}
我们将再次使用Sleep
功能。在本例中,我们在 Goroutine 完成时打印一条消息。最大的区别在于main
功能。现在,我们等待一秒钟,然后再收听通道中的数据:
$ go run main.go
Finishing goroutine
Hello World!
输出可能会有所不同,因为执行顺序同样没有保证,但现在我们可以看到,直到一秒钟过去,才会打印消息。在初始延迟之后,我们开始监听通道,获取数据并打印它。因此,发射器还必须等待来自通道另一侧的提示才能继续执行。
总而言之,通道是通过一端发送数据,另一端接收数据(如管道)在 goroutine 之间进行通信的方式。在默认状态下,发射器 Goroutine 将阻止其执行,直到接收器 Goroutine 获取数据。接收器 Goroutine 也是如此,它将阻塞,直到某个发射器通过通道发送数据。因此,您可以使用被动侦听器(等待数据)或被动发射器(等待侦听器)。
缓冲通道的工作方式与默认的无缓冲通道类似。您还可以使用箭头传递和获取它们的值,但是,与无缓冲通道不同,发送者不需要等到某个 Goroutine 拾取它们正在发送的数据:
package main
import (
"fmt"
"time"
)
func main() {
channel := make(chan string, 1)
go func() {
channel <- "Hello World!"
println("Finishing goroutine")
}()
time.Sleep(time.Second)
message := <-channel
fmt.Println(message)
}
这个示例与我们用于通道的第一个示例类似,但现在我们在make
语句中将通道的容量设置为 1。这样,我们告诉编译器,在被阻塞之前,该通道的容量为一个字符串。因此,第一个字符串不会阻止发射器,但第二个字符串会阻止发射器。让我们运行这个示例:
$ go run main.go
Finishing goroutine
Hello World!
现在我们可以任意多次运行这个小程序——输出的顺序总是一样的。这一次,我们启动了并发函数并等待了一秒钟。在此之前,匿名函数将不会继续,直到第二个已经过去,有人可以选择发送的数据。在这种情况下,对于缓冲通道,数据保存在通道中,并释放 Goroutine 以继续执行。在这种情况下,Goroutine 总是在等待时间过去之前完成。
此新通道的大小为 1,因此第二条消息将阻止 Goroutine 执行:
package main
import (
"fmt"
"time"
)
func main() {
channel := make(chan string, 1)
go func() {
channel <- "Hello World! 1"
channel <- "Hello World! 2"
println("Finishing goroutine")
}()
time.Sleep(time.Second)
message := <-channel
fmt.Println(message)
}
在这里,我们添加第二条Hello world! 2
消息,并为其提供索引。在这种情况下,该程序的输出可能如下所示:
$ go run main.go
Hello World! 1
表示我们刚刚从通道缓冲区中获取了一条消息,我们已经打印了它,main
函数在启动的 Goroutine 完成之前完成。Goroutine 在发送第二条消息时被阻止,在另一端接收第一条消息之前无法继续。然后它会快速打印,以至于没有时间打印消息来显示 Goroutine 的结束。如果您继续在控制台上执行程序,调度程序迟早会在主线程之前完成 Goroutine 执行。
Go 频道的一个很酷的特性是,当我们使用它们作为参数时,我们可以限制它们的方向性,以便它们只能用于发送或接收。如果在受限方向上使用通道,编译器将进行投诉。此功能将静态键入应用到 Go 应用程序的新级别,并使代码更易于理解和可读。
我们将举一个有关频道的简单示例:
package main
import (
"fmt"
"time"
)
func main() {
channel := make(chan string, 1)
go func(ch chan<- string) {
ch <- "Hello World!"
println("Finishing goroutine")
}(channel)
time.Sleep(time.Second)
message := <-channel
fmt.Println(message)
}
我们启动新 Goroutinego func(ch chan<- string)
的那一行指出,传递给该函数的通道只能用作输入通道,您不能收听它。
我们还可以传递一个仅用作接收器通道的通道:
func receivingCh(ch <-chan string) {
msg := <-ch
println(msg)
}
如您所见,箭头位于关键字chan
的另一侧,表示从频道进行提取操作。请记住,通道箭头始终指向左侧,要指示接收通道,它必须位于左侧;要指示插入通道,它必须位于右侧。
如果我们试图通过此仅接收通道发送值,编译器将对此进行投诉:
func receivingCh(ch <-chan string) {
msg := <-ch
println(msg)
ch <- "hello"
}
此函数有一个仅接收通道,我们将尝试使用该通道发送消息hello
。让我们看看编译器是怎么说的:
$ go run main.go
./main.go:20: invalid operation: ch <- "hello2" (send to receive-only type <-chan string)
它不喜欢它,并要求我们纠正它。现在代码更具可读性和安全性,我们刚刚在chan
参数的前面或后面放置了一个箭头。
select 语句也是 Go 的一个关键特性。它用于处理 Goroutine 中的多个通道输入。事实上,它开启了许多可能性,我们将在接下来的章节中广泛使用它。
在select
结构中,我们要求程序在一个或多个通道之间进行选择以接收数据。我们可以将此数据保存在变量中,并在完成选择之前使用它制作一些东西。select
结构只执行一次;不管它是否正在侦听更多通道,它只会执行一次,代码将继续执行。如果我们想让它多次处理相同的通道,我们必须将它放入for
循环中。
我们将制作一个小应用程序,将消息hello
和消息goodbye
发送到同一个 Goroutine,该 Goroutine 将打印它们,如果在五秒钟内没有收到任何其他内容,则退出。
首先,我们将创建一个通过通道发送字符串的通用函数:
func sendString(ch chan<- string, s string) {
ch <- s
}
现在我们可以通过调用sendString
方法通过通道发送字符串。该接电话了。接收器将从两个通道获取消息——发送hello
消息的通道和发送goodbye
消息的通道。您也可以在前面的图表中看到这一点:
func receiver(helloCh, goodbyeCh <-chan string, quitCh chan<- bool) {
for {
select {
case msg := <-helloCh:
println(msg)
case msg := <-goodbyeCh:
println(msg)
case <-time.After(time.Second * 2):
println("Nothing received in 2 seconds. Exiting")
quitCh <- true
break
}
}
}
让我们从论点开始。这个函数需要三个通道——两个接收通道和一个通过它发送东西的通道。然后,它开始一个带有for
关键字的无限循环。这样我们就可以一直收听两个频道。
在select
块的范围内,我们必须为要处理的每个通道使用一个 case(您是否意识到它与switch
语句有多么相似?)。让我们一步一步地看三个案例:
helloCh
参数获取传入数据,并将其保存在名为msg
的变量中。然后它打印这个变量的内容。goodbyeCh
参数获取传入数据,并将其保存在名为msg
的变量中。然后它还打印这个变量的内容。time
函数。之后,如果我们检查它的签名,它接受一个时间和持续时间值并返回一个接收通道。该接收通道将在指定的持续时间过后接收一个时间值time
。在我们的示例中,我们使用它返回的通道作为超时。由于 select 在每个句柄之后都会重新启动,因此计时器也会重新启动。这是一种非常简单的方法,可以为等待一个或多个通道响应的 Goroutine 设置计时器。main
功能已准备就绪:
package main
import "time"
func main() {
helloCh := make(chan string, 1)
goodbyeCh := make(chan string, 1)
quitCh := make(chan bool)
go receiver(helloCh, goodbyeCh, quitCh)
go sendString(helloCh, "hello!")
time.Sleep(time.Second)
go sendString(goodbyeCh, "goodbye!")
<-quitCh
}
同样,我们一步一步地创建了本练习中需要的三个通道。然后,我们在另一个 Goroutine 中启动了receiver
函数。这个 Goroutine 由 Go 的调度器处理,我们的程序继续。我们启动了一个新的 Goroutine,将消息hello
发送到helloCh
参数。同样,当 Go 的调度程序决定时,最终会发生这种情况。
我们的程序再次继续并等待一秒钟。在此休息期间,Go 的调度程序将有时间执行接收器和第一条消息(如果尚未执行),因此在休息期间,hello!
消息将出现在控制台上。
新消息通过goodbye
通道发送,带有新 Goroutine 中的goodbye!
文本,我们的程序再次继续到quitCh
参数中等待传入消息的行。
我们已经启动了三个 goroutine——它仍在运行的接收器,第一条消息在select
语句处理消息时已经完成,第二条消息几乎立即打印出来,也已经完成。因此,此时接收器正在运行,如果在接下来的两秒钟内没有收到任何其他消息,它将处理来自time
结构的传入消息。在channel
输入后,打印一条消息说它正在退出,向quitCh
发送一条true
,并在循环的地方打破无限循环。
让我们运行这个小应用程序:
$ go run main.go
hello!
goodbye!
Nothing received in 2 seconds. Exiting
结果可能不是很令人印象深刻,但概念是明确的。通过使用 select 语句,我们可以在同一 Goroutine 中处理多个传入通道。
我们将看到的关于通道的最后一个特性是在通道上进行测距。我们正在讨论 range 关键字。我们已经广泛使用它来覆盖列表,我们也可以使用它覆盖频道:
package main
import "time"
func main() {
ch := make(chan int)
go func() {
ch <- 1
time.Sleep(time.Second)
ch <- 2
close(ch)
}()
for v := range ch {
println(v)
}
}
在本例中,我们创建了一个无缓冲通道,但它也可以使用缓冲通道。我们在一个新的 Goroutine 中启动了一个函数,它通过一个通道发送数字“1”,等待一秒钟,发送数字“2”,然后关闭通道。
最后一步是在通道上进行测距。语法非常类似于列表范围。我们将来自通道的输入数据存储在变量v
中,并将该变量打印到控制台。该范围不断迭代,直到通道关闭,从通道获取数据。
你能猜出这个小程序的输出吗?
$ go run main.go
1
2
再一次,不是很令人印象深刻。它打印数字“1”,然后等待一秒钟,打印数字“2”,然后退出应用程序。
根据这个并发应用程序的设计,范围是迭代来自
频道
直到并发 Goroutine 关闭此通道。此时,射程结束,应用程序可以退出。
范围在从通道获取数据时非常有用,它通常用于扇入模式,其中许多不同的 goroutin 将数据发送到同一通道。
现在我们知道了如何创建 goroutine 和 channels,我们将把所有知识放在一个包中。回想前几章,当我们解释单例模式时,它是某种结构或变量,在代码中只能存在一次。对该结构的所有访问都应该使用所描述的模式来完成,但事实上,它不是并发安全的。
现在,我们将在编写时考虑并发性。我们将编写一个并发计数器,就像我们在互斥体部分中编写的一样,但这次我们将使用通道解决它。
为了限制对singleton
实例的并发访问,只有一个 Goroutine 能够访问它。我们将使用通道访问它——第一个通道添加一个,第二个通道获取当前计数,第三个通道停止 Goroutine。
我们将使用从两个不同的singleton
实例启动的 10000 个不同 goroutine 添加 10000 次。然后,我们将引入一个循环来检查singleton
的计数,直到它是 5000,但我们将在开始循环之前写入计数的多少。
一旦计数达到 5000,循环将退出并退出正在运行的 Goroutine。测试代码如下所示:
package channel_singleton
import (
"testing"
"time"
"fmt"
)
func TestStartInstance(t *testing.T) {
singleton := GetInstance()
singleton2 := GetInstance()
n := 5000
for i := 0; i < n; i++ {
go singleton.AddOne()
go singleton2.AddOne()
}
fmt.Printf("Before loop, current count is %d\n", singleton.GetCount())
var val int
for val != n*2 {
val = singleton.GetCount()
time.Sleep(10 * time.Millisecond)
}
singleton.Stop()
}
在这里,我们可以看到我们将使用的完整测试。在创建了两个singleton
实例之后,我们创建了一个for
循环,从每个实例启动AddOne
方法 5000 次。这还没有发生;他们正在计划,并将最终执行。我们正在打印singleton
实例的计数,以清楚地看到这种可能性;根据计算机的不同,它将打印一些大于 0 小于 10000 的数字。
停止保存计数的 Goroutine 之前的最后一步是进入一个循环,该循环检查计数的值,如果该值不是预期值(10000),则等待 10 毫秒。一旦达到这个值,循环将退出,我们可以停止singleton
实例。
我们将直接跳到实现,因为需求非常简单。
首先,我们将创建保存计数的 Goroutine:
var addCh chan bool = make(chan bool)
var getCountCh chan chan int = make(chan chan int)
var quitCh chan bool = make(chan bool)
func init() {
var count int
go func(addCh <-chan bool, getCountCh <-chan chan int, quitCh <-chan bool) {
for {
select {
case <-addCh:
count++
case ch := <-getCountCh:
ch <- count
case <-quitCh:
return
}
}
}(addCh, getCountCh, quitCh)
}
如前所述,我们创建了三个渠道:
addCh
通道用于与计数加一的动作进行通信,并接收一个bool
类型,仅用于信号“加一”(我们不需要发送号码,尽管我们可以)。getCountCh
通道将返回一个接收当前计数值的通道。花点时间来解释一下getCountCh
通道,它是一个接收整数类型的通道。这听起来有点复杂,但当我们完成示例时,它会更有意义,不要担心。quitCh
通道将与 Goroutine 通信,它应该结束其无限循环并完成自身。现在我们有了执行所需操作所需的通道。接下来,我们启动 Goroutine,将通道作为参数传递。如您所见,我们正在限制通道的方向,以提供更多类型的安全性。在这个 Goroutine 中,我们创建了一个无限for
循环。在循环中执行中断之前,该循环不会停止。
最后,如果您还记得的话,select
语句是一种同时从不同通道接收数据的方法。我们有三种情况,所以我们听作为参数输入的三个传入通道:
addCh
案例将增加一个计数。请记住,每次迭代只能执行一个案例,因此在我们完成添加一个案例之前,任何 Goroutine 都无法访问当前计数。getCountCh
通道接收一个接收整数的通道,因此我们捕获这个新通道并通过它将当前值发送到另一端。quitCh
通道中断for
循环,因此 Goroutine 结束。最后一件事。任何包中的init()
函数都会在程序执行时执行,因此我们不必担心专门从代码中执行此函数。
现在,我们将创建测试所期望的类型。我们将看到,在这种类型中,所有的魔术和逻辑对最终用户都是隐藏的(正如我们在测试代码中看到的):
type singleton struct {}
var instance singleton
func GetInstance() *singleton {
return &instance
}
singleton
类型的工作方式与第 2 章中的创作模式【Singleton、Builder、Factory、Prototype 和 Abstract Factory】中的工作方式类似,但这次它不会保留计数值。我们为它创建了一个名为instance
的本地值,并在调用GetInstance()
方法时返回指向该实例的指针。严格来说,没有必要这样做,但我们不需要每次访问 count 变量时都分配一个新的singleton
类型的实例。
首先,AddOne()
方法必须在当前计数上加一。怎样通过发送true
到addCh
通道。这很简单:
func (s *singleton) AddOne() {
addCh <- true
}
这个小片段将依次触发我们 Goroutine 中的addCh
案例。addCh
案例简单执行count++
并完成,让init
功能上执行的select
通道控制流执行下一条指令:
func (s *singleton) GetCount() int {
resCh := make(chan int)
defer close(resCh)
getCountCh <- resCh
return <-resCh
}
GetCount
方法每次调用时都会创建一个通道,并延迟在函数结束时关闭通道的操作。正如我们在本章前面所看到的,此通道是无缓冲的。无缓冲通道会阻止执行,直到它接收到一些数据。因此,我们将此通道发送到getCountCh
,这也是一个通道,实际上,我们希望chan int
类型通过它发送回当前计数值。直到count
变量的值到达resCh
通道,GetCount()
方法才会返回。
您可能会想,为什么我们不在两个方向上使用相同的通道来接收计数值?这样我们将避免分配。好的,如果我们在GetCount()
方法中使用相同的通道,我们将在这个通道中有两个侦听器——一个在select
语句中,在init
函数的文件开头,另一个在那里,因此它可以在返回值时解析为其中任何一个:
func (s *singleton) Stop() {
quitCh <- true
close(addCh)
close(getCountCh)
close(quitCh)
}
最后,我们必须在某个时刻停止这场狂欢。Stop
方法将该值发送到singleton
类型的 Goroutine,从而触发quitCh
案例并中断for
循环。下一步是关闭所有通道,以便不再通过它们发送数据。当你知道你将不再使用某些频道时,这是非常方便的。
执行测试并查看的时间:
$ go test -v .
=== RUN TestStartInstance
Before loop, current count is 4911
--- PASS: TestStartInstance (0.03s)
PASS
ok
代码输出很少,但一切都按预期进行。在测试中,我们在进入循环之前打印了 count 的值,循环一直迭代到值 10000。正如我们前面看到的,Go 调度程序将尝试使用您通过使用GOMAXPROCS
配置配置的尽可能多的 OS 线程来运行 Goroutines 的内容。在我的电脑中,它被设置为4
,因为我的电脑有四个核心。但关键是,我们可以看到,在启动 Goroutine(或 10000)和下一个执行行之后,很多事情都可能发生。
但它对互斥的使用又如何呢?
type singleton struct {
count int
sync.RWMutex
}
var instance singleton
func GetInstance() *singleton {
return &instance
}
func (s *singleton) AddOne() {
s.Lock()
defer s.Unlock()
s.count++
}
func (s *singleton) GetCount()int {
s.RLock()
defer s.RUnlock()
return s.count
}
在这种情况下,代码更加精简。如前所述,我们可以将互斥体嵌入到singleton
结构中。计数也保存在count
字段中,AddOne()
和GetCount()
方法锁定和解锁该值以同时确保安全。
还有一件事。在这个singleton
实例中,我们使用的是RWMutex
类型,而不是已知的sync.Mutex
类型。这里的主要区别在于RWMutex
类型有两种类型的锁——读锁和写锁。通过调用RLock
方法执行的读锁仅在写锁当前处于活动状态时等待。同时,它只阻塞一个写锁,因此许多读操作可以并行完成。这很有道理;我们不希望仅仅因为另一个 Goroutine 也在读取它不会更改的值,就阻止一个 Goroutine 读取一个值。sync.RWMutex
类型帮助我们在代码中实现这种逻辑。
我们已经了解了如何使用互斥体和通道编写并发单例。虽然通道示例更为复杂,但它也展示了 Go 并发的核心功能,因为您可以通过简单地使用通道来实现复杂级别的事件驱动体系结构。
请记住,如果您过去没有编写过并发代码,那么以一种舒适的方式开始并发思考可能需要一些时间。但没有什么是实践不能解决的。
我们已经看到了设计并发应用程序以实现程序并行性的重要性。我们已经处理了 Go 编写并发应用程序的大部分原语,现在我们可以编写通用的并发设计模式。