现在我们已经熟悉了并发和并行的概念,并且已经了解了如何使用 Go 的并发原语来实现它们,我们可以看到一些关于并发工作和并行执行的模式。在本章中,我们将看到以下模式:
快速查看三种模式的描述。它们都描述了某种及时同步执行的逻辑。记住这一点非常重要,我们现在正在使用前面章节中看到的所有工具和模式开发并发结构。对于创造模式,我们要处理的是创造对象。在结构模式中,我们学习如何构建惯用结构,在行为模式中,我们主要使用算法进行管理。现在,通过并发模式,我们将主要管理具有多个流的应用程序的定时执行和顺序执行。
我们将从障碍模式开始。它的目的很简单——设置一个屏障,以便在我们获得所有需要的结果之前,没有人通过,这在并发应用程序中非常常见。
想象一下,我们有一个微服务应用程序,其中一个服务需要通过合并另外三个微服务的响应来组合其响应。这就是屏障模式可以帮助我们的地方。
我们的屏障模式可能是一个服务,它将阻止其响应,直到它与一个或多个不同 goroutine(或服务)返回的结果组合在一起。我们有什么样的原始生物具有阻塞性?嗯,我们可以使用锁,但在 Go 中使用无缓冲通道更为惯用。
顾名思义,Barrier 模式试图停止执行,以便在准备完成之前不会完成。屏障模式的目标如下:
在我们的示例中,我们将在 microservices 应用程序中编写一个非常典型的情况,一个执行两个 HTTPGET
调用的应用程序,并在控制台上打印的单个响应中加入它们。
我们的小应用程序必须在不同的 Goroutine 中执行每个请求,如果两个响应都正确,则在控制台上打印结果。如果其中任何一个返回错误,那么我们只打印错误。
设计必须是并行的,允许我们利用多核 CPU 并行调用:
在上图中,实线表示调用,虚线表示通道。气球是 Goroutine,因此我们有两个 Goroutine 是通过main
函数启动的(也可以认为是 Goroutine)。这两个函数将使用它们在makeRequest
调用中创建时收到的公共通道与main
函数通信。
我们在此应用程序中的主要目标是获得两个不同呼叫的合并响应,因此我们可以这样描述我们的接受标准:
http://httpbin.org/headers
和http://httpbin.org/User-Agent
URL 的合并结果。这是两个公共端点,它们使用来自传入连接的数据进行响应。它们在测试中非常流行。你需要一个互联网连接来做这个练习。为并行设计编写单元测试或集成测试有时会很棘手,但这不会阻止我们编写令人敬畏的单元测试。我们将有一个barrier
方法,它接受一组定义为string
类型的端点。屏障将向每个端点发出GET
请求,并在打印结果之前合成结果。在这种情况下,我们将编写三个集成测试来简化代码,这样就不需要生成模拟响应:
package barrier
import (
"bytes"
"io"
"os"
"strings"
"testing"
)
func TestBarrier(t *testing.T) {
t.Run("Correct endpoints", func(t *testing.T) {
endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"
}
})
t.Run("One endpoint incorrect", func(t *testing.T) {
endpoints := []string{"http://malformed-url", "http://httpbin.org/User-Agent"}
})
t.Run("Very short timeout", func(t *testing.T) {
endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"}
})
}
我们有一个单独的测试,将执行三个子测试:
我们将有一个名为barrier
的函数,它将以字符串的形式接受数量不确定的端点。它的签名可以是这样的:
func barrier(endpoints ...string) {}
如您所见,barrier
函数不返回任何值,因为其结果将打印在控制台上。在此之前,我们已经编写了一个io.Writer
接口的实现,以模拟操作系统的stdout
库上的写入。为了稍微改变一下,我们将捕获stdout
库,而不是模拟它。一旦您了解了 Go 中的并发原语,捕获stdout
库的过程就不难了:
func captureBarrierOutput(endpoints ...string) string {
reader, writer, _ := os.Pipe()
os.Stdout = writer
out := make(chan string)
go func() {
var buf bytes.Buffer
io.Copy(&buf, reader)
out <- buf.String()
}()
barrier(endpoints...)
writer.Close()
temp := <-out
return temp
}
不要被这个代码吓倒;这真的很简单。首先,我们创建了一个管道;我们之前在第 3 章结构模式——适配器、桥接器和复合设计模式中讨论过适配器设计模式。回想一下,管道允许我们将io.Writer
接口连接到io.Reader
接口,这样读卡器输入就是Writer
输出。我们将os.Stdout
定义为作者。然后,为了捕获stdout
输出,我们将需要一个不同的 Goroutine,在我们向控制台写入时进行侦听。正如你所知,如果我们写作,我们就不会捕捉,如果我们捕捉,我们就不会写作。这里的关键词是while
;这是一个很好的经验法则,如果你在某个定义中找到这个词,你可能需要一个并发结构。因此,我们使用go
关键字启动一个不同的 Goroutine,它将读取器输入复制到字节缓冲区,然后再通过通道(我们之前应该创建)发送缓冲区的内容。
此时,我们有一个监听 Goroutine,但还没有打印任何内容,因此我们使用提供的端点调用(尚未编写)函数barrier
。接下来,我们必须关闭 writer,以向 Goroutine 发出信号,表示不再有输入。我们调用的通道阻塞执行,直到收到某个值(由启动的 Goroutine 发送的值)。最后一步是返回从控制台捕获的内容。
好的,我们有一个名为captureBarrierOutput
的函数,它将捕获stdout
中的输出,并将它们作为字符串返回。我们现在可以编写测试了:
t.Run("Correct endpoints", func(t *testing.T) {
endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"
}
result := captureBarrierOutput(endpoints...)
if !strings.Contains(result, "Accept-Encoding") || strings.Contains (result, "User-Agent")
{
t.Fail()
}
t.Log(result)
})
所有的测试都很容易实现。总之,captureBarrierOutput
函数调用barrier
函数。因此,我们传递端点并检查返回的结果。我们对的冷静回应 http://httpbin.org 必须在每个端点的响应中包含文本接受编码和用户代理。如果我们找不到那些文本,测试就会失败。出于调试目的,我们记录响应,以防我们想在 go 测试中使用-v
标志进行检查:
t.Run("One endpoint incorrect", func(t *testing.T) {
endpoints := []string
{
"http://malformed-url", "http://httpbin.org/User-Agent"}
result := captureBarrierOutput(endpoints...)
if !strings.Contains(result, "ERROR") {
t.Fail()
}
t.Log(result)
})
这次我们使用了一个不正确的端点 URL,因此响应必须返回以单词error为前缀的错误,我们将在barrier
函数中写入该错误。
最后一个函数将 HTTPGET
客户端的超时时间减少到最少 1 毫秒,因此我们强制超时:
t.Run("Very short timeout", func(t *testing.T) {
endpoints := []string
{
"http://httpbin.org/headers", "http://httpbin.org/User-Agent"}
timeoutMilliseconds = 1
result := captureBarrierOutput(endpoints...)
if !strings.Contains(result, "Timeout") {
t.Fail()
}
t.Log(result)
})
timeoutMilliseconds
变量将是一个包变量,我们将在稍后的实现过程中定义它。
我们需要定义一个名为timeoutMilliseconds
的包变量。让我们从这里开始:
package barrier
import (
"fmt"
"io/ioutil"
"net/http"
"time"
)
var timeoutMilliseconds int = 5000
初始超时延迟为 5 秒(5000 毫秒),我们需要在代码中包含这些包。
好的,我们需要一个函数为每个端点 URL 启动一个 Goroutine。你还记得我们是如何实现 Goroutines 之间的沟通的吗?没错,频道!因此,我们需要一个处理响应的通道和一个处理错误的通道。
但我们可以把它简化一点。我们将收到两个正确的答复、两个错误或一个答复和一个错误;在任何情况下,总是有两个响应,因此我们可以将错误和响应合并为一种类型:
type barrierResp struct {
Err error
Resp string
}
因此,每个 Goroutine 都将返回一个barrierResp
类型的值。该值将有一个用于Err
的值或一个用于Resp
字段的值。
过程很简单:我们创建一个大小为 2 的通道,该通道将接收barrierResp
类型的响应,我们启动两个请求并等待两个响应,然后检查是否有任何错误:
func barrier(endpoints ...string) {
requestNumber := len(endpoints)
in := make(chan barrierResp, requestNumber)
defer close(in)
responses := make([]barrierResp, requestNumber)
for _, endpoint := range endpoints {
go makeRequest(in, endpoint)
}
var hasError bool
for i := 0; i < requestNumber; i++ {
resp := <-in
if resp.Err != nil {
fmt.Println("ERROR: ", resp.Err)
hasError = true
}
responses[i] = resp
}
if !hasError {
for _, resp := range responses {
fmt.Println(resp.Resp)
}
}
}
按照前面的描述,我们创建了一个名为in
的缓冲通道,使其成为传入端点的大小,并延迟了通道关闭。然后,我们用每个端点和响应通道启动了一个名为makeRequest
的函数。
现在我们将循环两次,每个端点一次。在循环中,我们阻止等待来自in
通道的数据的执行。如果我们发现一个错误,我们会像我们在测试中预期的那样,将其打印为前缀为单词error,并将hasErrorvar
设置为 true。在两个响应之后,如果我们没有发现任何错误(hasError== false
,我们将打印每个响应,并且通道将关闭。
我们仍然缺少makeRequest
功能:
func makeRequest(out chan<- barrierResp, url string) {
res := barrierResp{}
client := http.Client{
Timeout: time.Duration(time.Duration(timeoutMilliseconds) * time.Millisecond),
}
resp, err := client.Get(url)
if err != nil {
res.Err = err
out <- res
return
}
byt, err := ioutil.ReadAll(resp.Body)
if err != nil {
res.Err = err
out <- res
return
}
res.Resp = string(byt)
out <- res
}
makeRequest
函数是一个非常简单的函数,它接受将barrierResp
值输出到的通道和请求的 URL。我们创建一个http.Client
并将其超时字段设置为timeoutMilliseconds
包变量的值。这就是我们如何在in
功能测试之前更改超时延迟的方法。然后,我们只需进行GET
调用,获取响应,将其解析为字节片,然后通过out
通道发送。
我们通过填充一个名为barrierResp
类型的res
变量来完成这一切。如果我们在执行GET
请求或解析结果体时发现错误,我们填充res.Err
字段,将其发送到out
通道(其另一侧连接到原始 Goroutine),然后退出该函数(因此我们不会错误地通过out
通道发送两个值)。
是时候运行测试了。请记住,您需要 Internet 连接,否则前两个测试将失败。我们将首先尝试具有两个正确端点的测试:
go test -run=TestBarrier/Correct_endpoints -v .
=== RUN TestBarrier
=== RUN TestBarrier/Correct_endpoints
--- PASS: TestBarrier (0.54s)
--- PASS: TestBarrier/Correct_endpoints (0.54s)
barrier_test.go:20: {
"headers": {
"Accept-Encoding": "gzip",
"Host": "httpbin.org",
"User-Agent": "Go-http-client/1.1"
}
}
{
"User-Agent": "Go-http-client/1.1"
}
ok
完美的我们有一个带有键headers
的 JSON 响应,还有一个带有键User-Agent
的 JSON 响应。在我们的集成测试中,我们正在寻找存在的字符串User-Agent
和Accept-Encoding
,因此测试成功通过。
现在,我们将运行端点不正确的测试:
go test -run=TestBarrier/One_endpoint_incorrect -v .
=== RUN TestBarrier
=== RUN TestBarrier/One_endpoint_incorrect
--- PASS: TestBarrier (0.27s)
--- PASS: TestBarrier/One_endpoint_incorrect (0.27s)
barrier_test.go:31: ERROR: Get http://malformed-url: dial tcp: lookup malformed-url: no such host
ok
我们可以看到我们有一个错误,http://malformed-url
返回了一个没有这样的主机错误。对该 URL 的请求必须返回一个前缀为ERROR:
的文本,正如我们在验收标准中所述,这就是为什么该测试是正确的(我们没有假阳性)。
在测试中,理解“假阳性”和“假阴性”测试的概念非常重要。假阳性测试大致描述为在不应该通过的情况下通过条件的测试(结果:全部通过),而假阴性正好相反(结果:测试失败)。例如,我们可以测试在执行请求时是否返回字符串,但返回的字符串可能完全为空!这将导致假阴性,这是一种即使在我们检查故意不正确的行为(对http://malformed-url
的请求)时也不会失败的测试。
上一次测试将超时时间减少到 1 毫秒:
go test -run=TestBarrier/Very_short_timeout -v .
=== RUN TestBarrier
=== RUN TestBarrier/Very_short_timeout
--- PASS: TestBarrier (0.00s)
--- PASS: TestBarrier/Very_short_timeout (0.00s)
barrier_test.go:43: ERROR: Get http://httpbin.org/User-Agent: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
ERROR: Get http://httpbin.org/headers: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
ok
同样,测试成功通过,我们有两个超时错误。URL 是正确的,但我们在不到一毫秒的时间内没有响应,因此客户端返回了超时错误。
屏障模式以其可组合性打开了微服务编程的大门。正如你所能想象的,它可以被视为一种结构模式。
屏障模式不仅适用于发出网络请求;我们还可以使用它将一些任务拆分为多个 goroutine。例如,一个昂贵的操作可以分成几个较小的操作,这些操作分布在不同的 goroutine 中,以最大限度地提高并行性并获得更好的性能。
Future 设计模式(也称为Promise是一种实现异步编程并发结构的快速简便的方法。我们将利用 Go 的一流功能开发期货。
简言之,我们将在不同的 goroutine 中执行操作之前定义每个可能的行为。Node.js 使用这种方法,默认情况下提供事件驱动编程。这里的想法是实现一个开火并忘记的动作,处理所有可能的结果。
为了更好地理解它,我们可以讨论在执行顺利或失败时嵌入行为的类型。
在上图中,main
函数在新 Goroutine 中启动未来。它不会等待任何东西,也不会收到任何未来的进展。它真的会开火并忘记它。
有趣的是,我们可以在一个未来中启动一个新的未来,并在同一个 Goroutine(或新的 Goroutine)中嵌入我们想要的任意多个未来。这个想法是利用一个未来的结果来启动下一个。例如:
在这里,我们有着同样的未来。在这种情况下,如果Execute
函数返回了正确的结果,则执行Success
函数,并且只有在这种情况下,我们才执行一个新的 Goroutine,其中包含另一个未来(甚至没有 Goroutine)。
这是一种懒惰的编程,未来可能会无限期地调用自己,或者直到满足某些规则。其想法是提前定义行为,让未来解决可能的解决方案。
使用未来模式,我们可以启动许多新的 goroutine,每个 goroutine 都有一个动作和自己的处理程序。这使我们能够执行以下操作:
我们将开发一个非常简单的示例,试图了解未来是如何运作的。在本例中,我们将有一个返回字符串或错误的方法,但我们希望同时执行它。我们已经学会了这样做的方法。使用通道,我们可以启动一个新的 Goroutine 并处理来自通道的传入结果。
但是在这种情况下,我们必须处理结果(字符串或错误),我们不希望这样。相反,我们将定义在成功的情况下该做什么,以及在错误和开火的情况下该做什么,并忘记 Goroutine。
我们没有此任务的功能需求。相反,我们将对其提出技术要求:
因此,正如我们所提到的,我们将使用第一类函数来实现此行为,我们将需要三种特定类型的函数:
type SuccessFunc func(string)
:如果一切顺利,SuccessFunc
功能将被执行。它的字符串参数将是操作的结果,因此该函数将由我们的 Goroutine 调用。type FailFunc func(error)
:FailFunc
函数处理相反的结果,即当出现问题时,正如您所看到的,它将返回一个错误。type ExecuteStringFunc func() (string, error)
:最后,ExecuteStringFunc
函数是定义我们要执行的操作的类型。也许它会返回一个字符串或一个错误。如果这一切看起来令人困惑,不要担心;以后会更清楚。因此,我们创建future
对象,定义成功行为,定义失败行为,并传递一个ExecuteStringFunc
类型以执行。在实现文件中,我们需要一个新类型:
type MaybeString struct {}
我们还将在_test.go
文件中创建两个测试:
package future
import (
"errors"
"testing"
"sync"
)
func TestStringOrError_Execute(t *testing.T) {
future := &MaybeString{}
t.Run("Success result", func(t *testing.T) {
...
})
t.Run("Error result", func(t *testing.T) {
...
})
}
我们将通过链接来定义函数,正如您通常在 Node.js 中看到的那样。像这样的代码很紧凑,不太难遵循:
t.Run("Success result", func(t *testing.T) {
future.Success(func(s string) {
t.Log(s)
}).Fail(func(e error) {
t.Fail()
})
future.Execute(func() (string, error) {
return "Hello World!", nil
})
})
必须在MaybeString
结构中定义future.Success
函数,以接受一个SuccessFunc
函数,如果一切正常,该函数将被执行,并返回指向future
对象的相同指针(因此我们可以继续链接)。Fail
函数也必须在MaybeString
结构中定义,并且必须接受FailFunc
函数以稍后返回指针。我们在这两种情况下都返回指针,这样我们就可以定义Fail
和Success
,反之亦然。
最后,我们使用Execute
方法传递ExecuteStringFunc
类型(一个不接受任何内容并返回字符串或错误的函数)。在本例中,我们返回一个字符串和 nil,因此我们希望执行SuccessFunc
函数,并将结果记录到控制台。如果执行了 fail 函数,则测试失败,因为返回的 nil 错误不应执行FailFunc
函数。
但我们仍然缺少一些东西。我们说过该函数必须在不同的 Goroutine 中异步执行,因此我们必须以某种方式同步该测试,以便它不会很快完成。同样,我们可以使用频道或sync.WaitGroup
:
t.Run("Success result", func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
future.Success(func(s string) {
t.Log(s)
wg.Done()
}).Fail(func(e error) {
t.Fail()
wg.Done()
})
future.Execute(func() (string, error) {
return "Hello World!", nil
})
wg.Wait()
})
我们以前在上一个频道中见过 WaitGroup。此 WaitGroup 配置为等待一个信号(wg.Add(1)
。Success
和Fail
方法将触发WaitGroup
的Done()
方法,以允许执行继续并完成测试(这就是Wait()
方法在末尾的原因)。请记住,每个Done()
方法将从 WaitGroup 中减去一个,我们只添加了一个,因此我们的Wait()
方法将只阻塞,直到执行一个Done()
方法。
使用我们所知道的进行Success
结果单元测试的方法,通过将t.Fail()
方法调用从错误切换到成功,很容易进行失败的结果单元测试,这样,如果调用成功,测试就会失败:
t.Run("Failed result", func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
future.Success(func(s string) {
t.Fail()
wg.Done()
}).Fail(func(e error) {
t.Log(e.Error())
wg.Done()
})
future.Execute(func() (string, error) {
return "", errors.New("Error ocurred")
})
wg.Wait()
})
如果您使用的是像我这样的 IDE,那么您的Success
、Fail
和Execute
方法调用必须是红色的。这是因为我们在实现文件中缺少方法声明:
package future
type SuccessFunc func(string)
type FailFunc func(error)
type ExecuteStringFunc func() (string, error)
type MaybeString struct {
...
}
func (s *MaybeString) Success(f SuccessFunc) *MaybeString {
return nil
}
func (s *MaybeString) Fail(f FailFunc) *MaybeString {
return nil
}
func (s *MaybeString) Execute(f ExecuteStringFunc) {
...
}
我们的测试似乎准备好执行了。让我们试一下:
go test -v .
=== RUN TestStringOrError_Execute
=== RUN TestStringOrError_Execute/Success_result
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200780c0, 0x5122e9, 0x19, 0x51d750, 0xc420041d30)
/usr/lib/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200780c0)
/usr/lib/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200780c0, 0xc420041e20)
/usr/lib/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x51d758, 0x5931e0, 0x1, 0x1, 0x50feb4)
/usr/lib/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc420041ee8, 0xc420014550)
/usr/lib/go/src/testing/testing.go:743 +0x85
main.main()
go-design-patterns/future/_test/_testmain.go:54 +0xc6
...continue
好测试失败了,是的。。。但不是以可控的方式。为什么会这样?我们还没有任何实现,所以也没有执行Success
或Fail
函数。我们的 WaitGroup 一直在等待对Done()
方法的调用,该调用永远不会到达,因此它无法继续并完成测试。这就是所有 Goroutine 都睡着了的意思——死锁!。在我们的具体示例中,这意味着没有人会调用 Done(),所以我们死定了!。
多亏了 Go 编译器和运行时执行器,我们可以轻松地检测死锁。想象一下,如果 Go 运行时无法检测到死锁,我们将被困在一个空白屏幕中,而不知道发生了什么。
那么我们如何解决这个问题呢?好的,一个简单的方法是在等待一段时间后调用Done()
方法的超时。对于这段代码,可以安全地等待 1 秒,因为它没有执行长时间运行的操作。
我们将在test
文件中声明一个timeout
函数,该函数等待一秒钟,然后打印一条消息,将测试设置为失败,并通过调用其Done()
方法让 WaitGroup 继续:
func timeout(t *testing.T, wg *sync.WaitGroup) {
time.Sleep(time.Second)
t.Log("Timeout!")
t.Fail()
wg.Done()
}
每个子测试的最终外观与我们前面的"Success result"
示例相似:
t.Run("Success result", func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
//Timeout!
go timeout(t, wg)
// ...
})
让我们看看当我们再次执行测试时会发生什么:
go test -v .
=== RUN TestStringOrError_Execute
=== RUN TestStringOrError_Execute/Success_result
=== RUN TestStringOrError_Execute/Failed_result
--- FAIL: TestStringOrError_Execute (2.00s)
--- FAIL: TestStringOrError_Execute/Success_result (1.00s)
future_test.go:64: Timeout!
--- FAIL: TestStringOrError_Execute/Failed_result (1.00s)
future_test.go:64: Timeout!
FAIL
exit status 1
FAIL
我们的测试失败了,但还是有控制的。看看FAIL
行的末尾——注意所经过的时间是 1 秒,因为它是由超时触发的,正如我们在日志消息中看到的那样。
是时候开始实施了。
根据我们的测试,实现必须在MaybeString
类型内以链式方式取SuccessFunc
、取FailFunc
、取ExecuteStringFunc
函数,并根据ExecuteStringFunc
函数返回的结果异步启动ExecuteStringFunc
函数调用SuccessFunc
或FailFunc
函数。
链是通过在类型中存储函数并返回指向该类型的指针来实现的。当然,我们讨论的是之前声明的类型方法:
type MaybeString struct {
successFunc SuccessFunc
failFunc FailFunc
}
func (s *MaybeString) Success(f SuccessFunc) *MaybeString {
s.successFunc = f
return s
}
func (s *MaybeString) Fail(f FailFunc) *MaybeString {
s.failFunc = f
return s
}
我们需要两个字段来存储SuccessFunc
和FailFunc
函数,分别命名为successFunc
和failFunc
字段。这样,对Success
和Fail
方法的调用只需将它们的传入函数存储到我们的新字段中。它们只是将指针返回到特定MaybeString
值的设置器。这些类型方法使用指向MaybeString
结构的指针,所以不要忘记在func
声明之后在MaybeString
上加上“*
”。
Execute 采用ExecuteStringFunc
方法并异步执行。这对于 Goroutine 来说似乎很简单,对吧?
func (s *MaybeString) Execute(f ExecuteStringFunc) {
go func(s *MaybeString) {
str, err := f()
if err != nil {
s.failFunc(err)
} else {
s.successFunc(str)
}
}(s)
}
看起来很简单,因为它很简单!我们启动 Goroutine,它执行f
方法(一个ExecuteStringFunc
,并获取其结果——可能是一个字符串,也可能是一个错误。如果存在错误,我们在MaybeString
结构中调用字段failFunc
。如果不存在错误,我们调用successFunc
字段。我们使用 Goroutine 来委托函数执行和错误处理,因此 Goroutine 不必这样做。
现在让我们运行单元测试:
go test -v .
=== RUN TestStringOrError_Execute
=== RUN TestStringOrError_Execute/Success_result
=== RUN TestStringOrError_Execute/Failed_result
--- PASS: TestStringOrError_Execute (0.00s)
--- PASS: TestStringOrError_Execute/Success_result (0.00s)
future_test.go:21: Hello World!
--- PASS: TestStringOrError_Execute/Failed_result (0.00s)
future_test.go:49: Error ocurred
PASS
ok
伟大的看看执行时间现在是如何接近零的,所以我们的超时没有被执行(实际上,它们已经被执行了,但是测试已经完成,并且它们的结果已经被声明)。
更重要的是,现在我们可以使用MaybeString
类型异步执行任何类型的函数,这些函数不接受任何内容并返回字符串或错误。一个不接受任何东西的函数似乎有点无用,对吗?但是我们可以使用闭包将上下文引入这种类型的函数。
让我们编写一个setContext
函数,将字符串作为参数,并返回一个ExecuteStringFunc
方法,该方法返回前一个后缀为Closure!
的参数:
func setContext(msg string) ExecuteStringFunc {
msg = fmt.Sprintf("%d Closure!\n", msg)
return func() (string, error){
return msg, nil
}
}
因此,我们可以编写一个使用此闭包的新测试:
t.Run("Closure Success result", func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
//Timeout!
go timeout(t, &wg)
future.Success(func(s string) {
t.Log(s)
wg.Done()
}).Fail(func(e error) {
t.Fail()
wg.Done()
})
future.Execute(setContext("Hello"))
wg.Wait()
})
setContext
函数返回一个ExecuteStringFunc
方法,它可以直接传递给Execute
函数。我们使用我们知道将返回的任意文本调用setContext
函数。
让我们再次执行测试。现在一切都要顺利!
go test -v .
=== RUN TestStringOrError_Execute
=== RUN TestStringOrError_Execute/Success_result
=== RUN TestStringOrError_Execute/Failed_result
=== RUN TestStringOrError_Execute/Closure_Success_result
--- PASS: TestStringOrError_Execute (0.00s)
--- PASS: TestStringOrError_Execute/Success_result (0.00s)
future_test.go:21: Hello World!
--- PASS: TestStringOrError_Execute/Failed_result (0.00s)
future_test.go:49: Error ocurred
--- PASS: TestStringOrError_Execute/Closure_Success_result (0.00s)
future_test.go:69: Hello Closure!
PASS
ok
它也给了我们一个肯定。闭包测试显示了我们之前解释的行为。通过获取一条消息"Hello"
并附加其他内容("Closure!"
,我们可以更改想要返回的文本的上下文。现在,将其扩展到 HTTPGET
调用、数据库调用或任何您可以想象的东西。它只需返回字符串或错误即可结束。但是,请记住,setContext
函数中除了我们返回的匿名函数之外的所有内容都不是并发的,并且在调用 execute 之前将异步执行,因此我们必须尝试在匿名函数中放入尽可能多的逻辑。
我们已经看到了使用函数类型系统实现异步编程的好方法。但是,我们可以在没有函数的情况下通过设置带有Success
、Fail
和Execute
方法以及满足它们的类型的接口,并使用模板模式异步执行它们,就像我们在本章前面看到的那样。这取决于你!
我们将在本章中看到的第三个也是最后一个模式是管道模式。您将在您的并发结构中大量使用这种模式,并且我们也可以认为它是最有用的。
我们已经知道什么是管道。每次我们编写任何执行某种逻辑的函数时,我们都在编写一个管道:如果这个那么那个,或者其他什么。通过使用几个相互调用的函数,管道模式可以变得更加复杂。他们甚至可以在执行过程中循环。
Go 中的管道模式以类似的方式工作,但管道中的每一步都将以不同的 Goroutine 和通信方式进行,并且将使用通道进行同步。
在创建管道时,我们主要考虑以下好处:
然而,仅仅因为我们在不同的 goroutine 中分解了一个算法,并不一定意味着它将以最快的速度执行。我们一直在谈论 CPU,因此理想情况下,算法必须是 CPU 密集型的,以利用并发结构。创建 goroutine 和 channel 的开销会使算法变得更小。
我们将为我们的例子做一些数学运算。我们将生成一个数字列表,从 1 开始,以任意数字 N 结束。然后我们将取每个数字,将其幂为 2,并将所得数字相加,得到一个唯一的结果。所以,如果N=3,我们的列表将是[1,2,3]。将其加至 2 后,我们的列表变为[1,4,9]。如果我们对结果列表求和,结果值是 14。
从功能上讲,我们的管道模式需要提高到每个数字 2 的幂,然后将它们相加。它将分为一个数字生成器和两个操作,因此:
我们将只创建一个功能来管理一切。我们将调用这个函数LaunchPipeline
来简化事情。它将使用一个整数作为参数,它将是我们的 N 个数,即列表中的项数。实现文件中的声明如下所示:
package pipelines
func LaunchPipeline(amount int) int {
return 0
}
在我们的测试文件中,我们将使用切片创建一个测试表:
package pipelines
import "testing"
func TestLaunchPipeline(t *testing.T) {
tableTest := [][]int{
{3, 14},
{5, 55},
}
// ...
}
我们的表是整数类型的切片。在每个切片上,第一个整数表示列表大小,第二个位置表示列表中的项目。它实际上是一个矩阵。当通过 3 时,它必须返回 14。当通过 5 时,它必须返回 55。然后我们必须迭代该表,并将每个数组的第一个索引传递给LaunchPipeline
函数:
// ...
var res int
for _, test := range tableTest {
res = LaunchPipeline(test[0])
if res != test[1] {
t.Fatal()
}
t.Logf("%d == %d\n", res, test[1])
}
}
使用range
,我们得到矩阵中的每一行。每一行都包含在一个名为test
的临时变量中。test[0]
代表N
和test[1]
预期结果。我们将预期结果与LaunchPipeline
函数的返回值进行比较。如果它们不相同,则测试失败:
go test -v .
=== RUN TestLaunchPipeline
--- FAIL: TestLaunchPipeline (0.00s)
pipeline_test.go:15:
FAIL
exit status 1
FAIL
我们实现的关键是在不同的 Goroutine 中分离每个操作,并将它们与通道连接起来。LaunchPipeline
函数是协调所有这些功能的函数,如下图所示:
该操作包括三个步骤:生成一个数字列表,将其提高到 2 的幂,然后将结果数字相加。
此管道模式中的每个步骤将具有以下结构:
func functionName(in <-chan int) (<-chan int){
out := make(chan bool, 100)
go func(){
for v := range in {
// Do something with v and send it to channel out
}
close(out)
}()
return out
}
此函数表示一个常见步骤。让我们按照 Go 调度器执行它可能需要的相同顺序来解析它:
functionName
函数通常会接收一个通道,从(in <-chan int
中获取值。我们称之为in
函数,如单词 incoming。我们无法在此函数范围内通过它发送值;这就是为什么箭头指向关键字chan
的out
。functionName
函数返回一个通道(<-chan in
,函数调用方只能从该通道获取值(同样,由指向关键字chan
的out
的箭头表示)。这也意味着通过该通道的任何值都必须在函数的范围内生成。out
的通道,它将作为函数的返回(此列表中的点 2。out
频道。out
后,Goroutine 执行。它将从in
通道获取值,直到关闭。因此,此函数的调用方负责关闭此通道,否则 Goroutine 将永远不会结束!in
通道关闭时,for 循环结束,我们关闭out
通道。自上次发送以来,使用此通道的任何 Goroutine 都不会收到任何新值。唯一不完全适合这种方法的步骤是接收数字的第一步,该数字表示列表上的上限阈值,而不是传入值的通道。因此,如果我们为管道中的每个步骤编写此操作,最终的图表更像这样:
虽然想法完全相同,但现在我们可以看到,正是函数LaunchPipeline
将接收通道并将其发送回管道中的下一步。使用此图,我们可以按照箭头的编号清楚地看到管道创建的流程。实心箭头表示函数调用,虚线箭头表示通道。
让我们更仔细地看一下代码。
操作的第一步是列表生成。列表从1
开始,我们将收到一个表示较高阈值的整数。我们必须将列表中的每个数字传递到下一步:
func generator(max int) <-chan int {
outChInt := make(chan int, 100)
go func() {
for i := 1; i <= max; i++ {
outChInt <- i
}
close(outChInt)
}()
return outChInt
}
正如我们前面提到的,这是我们将在每个步骤中遵循的模式:创建一个通道,启动 Goroutine 通过通道发送数据,然后立即返回通道。这个 Goroutine 将从 1 迭代到 max 参数,这是列表的较高阈值,并通过通道发送每个数字。在发送每个数字后,通道关闭,这样就不能通过它发送更多的数据,但可以检索已缓冲的数据。
第二步将从第一步的通道(从参数中获取)获取每个传入的数字,并将其提升为 2 的幂。必须使用新通道将每个结果发送到第三步:
func power(in <-chan int) <-chan int {
out := make(chan int, 100)
go func() {
for v := range in {
out <- v * v
}
close(out)
}()
return out
}
我们再次使用相同的模式:创建一个通道并启动 Goroutine,同时返回创建的通道。
for-range
循环会无限期地从通道获取值,直到通道关闭。
第三步也是最后一步接收来自第二步的每个数字,并不断将它们添加到本地值,直到连接通道关闭:
func sum(in <-chan int) <-chan int {
out := make(chan int, 100)
go func() {
var sum int
for v := range in {
sum += v
}
out <- sum
close(out)
}()
return out
}
函数 sum 还将通道作为参数(从步骤 2返回的通道)。它还遵循创建频道、启动 Goroutine 和返回频道的相同模式。Goroutine 不断向名为sum
的变量添加值,直到in
通道关闭。当in
通道关闭时,sum 的值被发送到out
通道,并立即关闭。
最后,我们可以实现LaunchPipeline
功能:
func LaunchPipeline(amount int) int {
firstCh := generator(amount)
secondCh := power(firstCh)
thirdCh := sum(secondCh)
result := <-thirdCh
return result
}
函数generator
首先返回传递给功率函数的通道。power
函数返回传递给sum
函数的第二个通道。函数sum
最终返回将接收唯一值的第一个通道,即结果。现在让我们尝试测试一下:
go test -v .
=== RUN TestLaunchPipeline
--- PASS: TestLaunchPipeline (0.00s)
pipeline_test.go:18: 14 == 14
pipeline_test.go:18: 55 == 55
PASS
ok
令人惊叹的值得一提的是,LaunchPipeline
函数不需要分配每个通道,可以这样重写:
func LaunchPipeline(amount int) int {
return <-sum(power(generator(amount)))
}
generator
函数的结果直接传递给power
函数,power
的结果传递给sum
函数。
使用管道模式,我们可以以非常简单的方式创建真正复杂的并发工作流。在我们的例子中,我们创建了一个线性工作流,但它也可以有条件、池以及扇入和扇出行为。我们将在下一章中看到其中一些。
并发设计模式在难度上向前迈进了一步,需要一些时间才能掌握。作为并发程序员,我们最大的错误是从并行性的角度思考(我怎样才能使它并行?或者我怎样才能在新线程中运行它?),而不是从并发结构的角度思考。
纯函数(总是产生相同输出(给定相同输入)而不影响其范围外的任何内容的函数)有助于此设计。
并发编程需要不断的实践。一旦你理解了基本的原语,Go 就很容易了。图表可以帮助您了解可能的数据流,但了解所有数据流的最佳方式只是实践。
在下一章中,我们将看到如何使用管道工作人员池来完成一些工作,而不是使用唯一的管道。此外,我们还将学习如何在并发结构中创建发布/订阅模式,并了解在使用并发进行构建时,同一模式会有多大的不同。