Go 并发模式|屏障、未来和管道设计模式详解

现在我们已经熟悉了并发和并行的概念,并且已经了解了如何使用 Go 的并发原语来实现它们,我们可以看到一些关于并发工作和并行执行的模式。在本章中,我们将看到以下模式:

快速查看三种模式的描述。它们都描述了某种及时同步执行的逻辑。记住这一点非常重要,我们现在正在使用前面章节中看到的所有工具和模式开发并发结构。对于创造模式,我们要处理的是创造对象。在结构模式中,我们学习如何构建惯用结构,在行为模式中,我们主要使用算法进行管理。现在,通过并发模式,我们将主要管理具有多个的应用程序的定时执行和顺序执行。

我们将从障碍模式开始。它的目的很简单——设置一个屏障,以便在我们获得所有需要的结果之前,没有人通过,这在并发应用程序中非常常见。

说明

想象一下,我们有一个微服务应用程序,其中一个服务需要通过合并另外三个微服务的响应来组合其响应。这就是屏障模式可以帮助我们的地方。

我们的屏障模式可能是一个服务,它将阻止其响应,直到它与一个或多个不同 goroutine(或服务)返回的结果组合在一起。我们有什么样的原始生物具有阻塞性?嗯,我们可以使用锁,但在 Go 中使用无缓冲通道更为惯用。

目标

顾名思义,Barrier 模式试图停止执行,以便在准备完成之前不会完成。屏障模式的目标如下:

  • 使用来自一个或多个 goroutine 的数据组合类型的值。
  • 控制任何传入数据管道的正确性,以便不会返回不一致的数据。我们不想要部分填充的结果,因为其中一个管道返回了错误。

HTTP GET 聚合器

在我们的示例中,我们将在 microservices 应用程序中编写一个非常典型的情况,一个执行两个 HTTPGET调用的应用程序,并在控制台上打印的单个响应中加入它们。

我们的小应用程序必须在不同的 Goroutine 中执行每个请求,如果两个响应都正确,则在控制台上打印结果。如果其中任何一个返回错误,那么我们只打印错误。

设计必须是并行的,允许我们利用多核 CPU 并行调用:

An HTTP GET aggregator

在上图中,实线表示调用,虚线表示通道。气球是 Goroutine,因此我们有两个 Goroutine 是通过main函数启动的(也可以认为是 Goroutine)。这两个函数将使用它们在makeRequest调用中创建时收到的公共通道main函数通信。

验收标准

我们在此应用程序中的主要目标是获得两个不同呼叫的合并响应,因此我们可以这样描述我们的接受标准:

  • 在控制台上打印两次调用http://httpbin.org/headershttp://httpbin.org/User-AgentURL 的合并结果。这是两个公共端点,它们使用来自传入连接的数据进行响应。它们在测试中非常流行。你需要一个互联网连接来做这个练习。
  • 如果任何调用失败,它不能仅打印错误消息(如果两个调用都失败,则打印错误消息)。
  • 当两个调用都完成时,输出必须作为合成结果打印。这意味着我们不能先打印一个调用的结果,然后打印另一个调用的结果。

单元测试-集成

为并行设计编写单元测试或集成测试有时会很棘手,但这不会阻止我们编写令人敬畏的单元测试。我们将有一个barrier方法,它接受一组定义为string类型的端点。屏障将向每个端点发出GET请求,并在打印结果之前合成结果。在这种情况下,我们将编写三个集成测试来简化代码,这样就不需要生成模拟响应:

package barrier 

import ( 
    "bytes" 
    "io" 
    "os" 
    "strings" 
    "testing" 
) 

func TestBarrier(t *testing.T) { 
  t.Run("Correct endpoints", func(t *testing.T) { 
    endpoints := []string{"http://httpbin.org/headers",  "http://httpbin.org/User-Agent"
    } 
  }) 

  t.Run("One endpoint incorrect", func(t *testing.T) { 
    endpoints := []string{"http://malformed-url",  "http://httpbin.org/User-Agent"} 
  }) 

  t.Run("Very short timeout", func(t *testing.T) { 
    endpoints := []string{"http://httpbin.org/headers",  "http://httpbin.org/User-Agent"} 
  }) 
} 

我们有一个单独的测试,将执行三个子测试:

  • 第一个测试对正确的端点进行两次调用
  • 第二个测试的端点不正确,因此必须返回错误
  • 最后一个测试将返回最大超时时间,以便我们可以强制超时错误

我们将有一个名为barrier的函数,它将以字符串的形式接受数量不确定的端点。它的签名可以是这样的:

func barrier(endpoints ...string) {} 

如您所见,barrier函数不返回任何值,因为其结果将打印在控制台上。在此之前,我们已经编写了一个io.Writer接口的实现,以模拟操作系统的stdout库上的写入。为了稍微改变一下,我们将捕获stdout库,而不是模拟它。一旦您了解了 Go 中的并发原语,捕获stdout库的过程就不难了:

func captureBarrierOutput(endpoints ...string) string { 
    reader, writer, _ := os.Pipe() 

    os.Stdout = writer 
    out := make(chan string) 

    go func() { 
      var buf bytes.Buffer 
      io.Copy(&buf, reader) 
      out <- buf.String() 
    }() 

    barrier(endpoints...) 

    writer.Close() 
    temp := <-out 

    return temp 
} 

不要被这个代码吓倒;这真的很简单。首先,我们创建了一个管道;我们之前在第 3 章结构模式——适配器、桥接器和复合设计模式中讨论过适配器设计模式。回想一下,管道允许我们将io.Writer接口连接到io.Reader接口,这样读卡器输入就是Writer输出。我们将os.Stdout定义为作者。然后,为了捕获stdout输出,我们将需要一个不同的 Goroutine,在我们向控制台写入时进行侦听。正如你所知,如果我们写作,我们就不会捕捉,如果我们捕捉,我们就不会写作。这里的关键词是while;这是一个很好的经验法则,如果你在某个定义中找到这个词,你可能需要一个并发结构。因此,我们使用go关键字启动一个不同的 Goroutine,它将读取器输入复制到字节缓冲区,然后再通过通道(我们之前应该创建)发送缓冲区的内容。

此时,我们有一个监听 Goroutine,但还没有打印任何内容,因此我们使用提供的端点调用(尚未编写)函数barrier。接下来,我们必须关闭 writer,以向 Goroutine 发出信号,表示不再有输入。我们调用的通道阻塞执行,直到收到某个值(由启动的 Goroutine 发送的值)。最后一步是返回从控制台捕获的内容。

好的,我们有一个名为captureBarrierOutput的函数,它将捕获stdout中的输出,并将它们作为字符串返回。我们现在可以编写测试了:

t.Run("Correct endpoints", func(t *testing.T) { 
    endpoints := []string{"http://httpbin.org/headers", "http://httpbin.org/User-Agent"
    } 

 result := captureBarrierOutput(endpoints...)
 if !strings.Contains(result, "Accept-Encoding") || strings.Contains (result, "User-Agent") 
  {
 t.Fail()
 }
 t.Log(result) 
}) 

所有的测试都很容易实现。总之,captureBarrierOutput函数调用barrier函数。因此,我们传递端点并检查返回的结果。我们对的冷静回应 http://httpbin.org 必须在每个端点的响应中包含文本接受编码用户代理。如果我们找不到那些文本,测试就会失败。出于调试目的,我们记录响应,以防我们想在 go 测试中使用-v标志进行检查:

t.Run("One endpoint incorrect", func(t *testing.T) { 
  endpoints := []string
  {
    "http://malformed-url", "http://httpbin.org/User-Agent"} 

 result := captureBarrierOutput(endpoints...)
 if !strings.Contains(result, "ERROR") {
 t.Fail()
 }
 t.Log(result) 
}) 

这次我们使用了一个不正确的端点 URL,因此响应必须返回以单词error为前缀的错误,我们将在barrier函数中写入该错误。

最后一个函数将 HTTPGET客户端的超时时间减少到最少 1 毫秒,因此我们强制超时:

t.Run("Very short timeout", func(t *testing.T) { 
  endpoints := []string
  {
    "http://httpbin.org/headers", "http://httpbin.org/User-Agent"} 
 timeoutMilliseconds = 1
 result := captureBarrierOutput(endpoints...)
 if !strings.Contains(result, "Timeout") {
 t.Fail()
 }
 t.Log(result) 
  }) 

timeoutMilliseconds变量将是一个包变量,我们将在稍后的实现过程中定义它。

实施

我们需要定义一个名为timeoutMilliseconds的包变量。让我们从这里开始:

package barrier 

import ( 
    "fmt" 
    "io/ioutil" 
    "net/http" 
    "time" 
) 

var timeoutMilliseconds int = 5000 

初始超时延迟为 5 秒(5000 毫秒),我们需要在代码中包含这些包。

好的,我们需要一个函数为每个端点 URL 启动一个 Goroutine。你还记得我们是如何实现 Goroutines 之间的沟通的吗?没错,频道!因此,我们需要一个处理响应的通道和一个处理错误的通道。

但我们可以把它简化一点。我们将收到两个正确的答复、两个错误或一个答复和一个错误;在任何情况下,总是有两个响应,因此我们可以将错误和响应合并为一种类型:

type barrierResp struct { 
    Err  error 
    Resp string 
} 

因此,每个 Goroutine 都将返回一个barrierResp类型的值。该值将有一个用于Err的值或一个用于Resp字段的值。

过程很简单:我们创建一个大小为 2 的通道,该通道将接收barrierResp类型的响应,我们启动两个请求并等待两个响应,然后检查是否有任何错误:

func barrier(endpoints ...string) { 
    requestNumber := len(endpoints) 

    in := make(chan barrierResp, requestNumber) 
    defer close(in) 

    responses := make([]barrierResp, requestNumber) 

    for _, endpoint := range endpoints { 
        go makeRequest(in, endpoint) 
    } 

    var hasError bool 
    for i := 0; i < requestNumber; i++ { 
        resp := <-in 
        if resp.Err != nil { 
            fmt.Println("ERROR: ", resp.Err) 
            hasError = true 
        } 
        responses[i] = resp 
    } 

    if !hasError { 
        for _, resp := range responses { 
            fmt.Println(resp.Resp) 
        } 
    } 
} 

按照前面的描述,我们创建了一个名为in的缓冲通道,使其成为传入端点的大小,并延迟了通道关闭。然后,我们用每个端点和响应通道启动了一个名为makeRequest的函数。

现在我们将循环两次,每个端点一次。在循环中,我们阻止等待来自in通道的数据的执行。如果我们发现一个错误,我们会像我们在测试中预期的那样,将其打印为前缀为单词error,并将hasErrorvar设置为 true。在两个响应之后,如果我们没有发现任何错误(hasError== false,我们将打印每个响应,并且通道将关闭。

我们仍然缺少makeRequest功能:

func makeRequest(out chan<- barrierResp, url string) { 
    res := barrierResp{} 
    client := http.Client{ 
        Timeout: time.Duration(time.Duration(timeoutMilliseconds) * time.Millisecond), 
    } 

    resp, err := client.Get(url) 
    if err != nil { 
        res.Err = err 
        out <- res 
        return 
    } 

    byt, err := ioutil.ReadAll(resp.Body) 
    if err != nil { 
        res.Err = err 
        out <- res 
        return 
    } 

    res.Resp = string(byt) 
    out <- res 
} 

makeRequest函数是一个非常简单的函数,它接受将barrierResp值输出到的通道和请求的 URL。我们创建一个http.Client并将其超时字段设置为timeoutMilliseconds包变量的值。这就是我们如何在in功能测试之前更改超时延迟的方法。然后,我们只需进行GET调用,获取响应,将其解析为字节片,然后通过out通道发送。

我们通过填充一个名为barrierResp类型的res变量来完成这一切。如果我们在执行GET请求或解析结果体时发现错误,我们填充res.Err字段,将其发送到out通道(其另一侧连接到原始 Goroutine),然后退出该函数(因此我们不会错误地通过out通道发送两个值)。

是时候运行测试了。请记住,您需要 Internet 连接,否则前两个测试将失败。我们将首先尝试具有两个正确端点的测试:

go test -run=TestBarrier/Correct_endpoints -v .
=== RUN   TestBarrier
=== RUN   TestBarrier/Correct_endpoints
--- PASS: TestBarrier (0.54s)
 --- PASS: TestBarrier/Correct_endpoints (0.54s)
 barrier_test.go:20: {
 "headers": {
 "Accept-Encoding": "gzip", 
"Host": "httpbin.org",
"User-Agent": "Go-http-client/1.1"
 }
 }
 {
 "User-Agent": "Go-http-client/1.1"
 } 
 ok

完美的我们有一个带有键headers的 JSON 响应,还有一个带有键User-Agent的 JSON 响应。在我们的集成测试中,我们正在寻找存在的字符串User-AgentAccept-Encoding,因此测试成功通过。

现在,我们将运行端点不正确的测试:

go test -run=TestBarrier/One_endpoint_incorrect -v .
=== RUN   TestBarrier
=== RUN   TestBarrier/One_endpoint_incorrect
--- PASS: TestBarrier (0.27s)
 --- PASS: TestBarrier/One_endpoint_incorrect (0.27s)
 barrier_test.go:31: ERROR:  Get http://malformed-url: dial tcp: lookup malformed-url: no such host
ok

我们可以看到我们有一个错误,http://malformed-url返回了一个没有这样的主机错误。对该 URL 的请求必须返回一个前缀为ERROR:的文本,正如我们在验收标准中所述,这就是为什么该测试是正确的(我们没有假阳性)。

在测试中,理解“假阳性”和“假阴性”测试的概念非常重要。假阳性测试大致描述为在不应该通过的情况下通过条件的测试(结果:全部通过),而假阴性正好相反(结果:测试失败)。例如,我们可以测试在执行请求时是否返回字符串,但返回的字符串可能完全为空!这将导致假阴性,这是一种即使在我们检查故意不正确的行为(对http://malformed-url的请求)时也不会失败的测试。

上一次测试将超时时间减少到 1 毫秒:

go test -run=TestBarrier/Very_short_timeout -v .     
=== RUN   TestBarrier 
=== RUN   TestBarrier/Very_short_timeout 
--- PASS: TestBarrier (0.00s) 
    --- PASS: TestBarrier/Very_short_timeout (0.00s) 
        barrier_test.go:43: ERROR:  Get http://httpbin.org/User-Agent: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers) 
        ERROR:  Get http://httpbin.org/headers: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers) 

ok

同样,测试成功通过,我们有两个超时错误。URL 是正确的,但我们在不到一毫秒的时间内没有响应,因此客户端返回了超时错误。

等待屏障设计模式的响应

屏障模式以其可组合性打开了微服务编程的大门。正如你所能想象的,它可以被视为一种结构模式。

屏障模式不仅适用于发出网络请求;我们还可以使用它将一些任务拆分为多个 goroutine。例如,一个昂贵的操作可以分成几个较小的操作,这些操作分布在不同的 goroutine 中,以最大限度地提高并行性并获得更好的性能。

Future 设计模式(也称为Promise是一种实现异步编程并发结构的快速简便的方法。我们将利用 Go 的一流功能开发期货

说明

简言之,我们将在不同的 goroutine 中执行操作之前定义每个可能的行为。Node.js 使用这种方法,默认情况下提供事件驱动编程。这里的想法是实现一个开火并忘记的动作,处理所有可能的结果。

为了更好地理解它,我们可以讨论在执行顺利或失败时嵌入行为的类型。

Description

在上图中,main函数在新 Goroutine 中启动未来。它不会等待任何东西,也不会收到任何未来的进展。它真的会开火并忘记它。

有趣的是,我们可以在一个未来中启动一个新的未来,并在同一个 Goroutine(或新的 Goroutine)中嵌入我们想要的任意多个未来。这个想法是利用一个未来的结果来启动下一个。例如:

Description

在这里,我们有着同样的未来。在这种情况下,如果Execute函数返回了正确的结果,则执行Success函数,并且只有在这种情况下,我们才执行一个新的 Goroutine,其中包含另一个未来(甚至没有 Goroutine)。

这是一种懒惰的编程,未来可能会无限期地调用自己,或者直到满足某些规则。其想法是提前定义行为,让未来解决可能的解决方案。

目标

使用未来模式,我们可以启动许多新的 goroutine,每个 goroutine 都有一个动作和自己的处理程序。这使我们能够执行以下操作:

  • 将操作处理程序委托给其他 Goroutine
  • 在它们之间堆叠许多异步调用(在结果中调用另一个异步调用的异步调用)

一个简单的异步请求程序

我们将开发一个非常简单的示例,试图了解未来是如何运作的。在本例中,我们将有一个返回字符串或错误的方法,但我们希望同时执行它。我们已经学会了这样做的方法。使用通道,我们可以启动一个新的 Goroutine 并处理来自通道的传入结果。

但是在这种情况下,我们必须处理结果(字符串或错误),我们不希望这样。相反,我们将定义在成功的情况下该做什么,以及在错误和开火的情况下该做什么,并忘记 Goroutine。

验收标准

我们没有此任务的功能需求。相反,我们将对其提出技术要求:

  • 将函数执行委托给其他 Goroutine
  • 函数将返回字符串(可能)或错误
  • 必须在执行函数之前定义处理程序
  • 设计必须是可重用的

单元测试

因此,正如我们所提到的,我们将使用第一类函数来实现此行为,我们将需要三种特定类型的函数:

  • type SuccessFunc func(string):如果一切顺利,SuccessFunc功能将被执行。它的字符串参数将是操作的结果,因此该函数将由我们的 Goroutine 调用。
  • type FailFunc func(error)FailFunc函数处理相反的结果,即当出现问题时,正如您所看到的,它将返回一个错误。
  • type ExecuteStringFunc func() (string, error):最后,ExecuteStringFunc函数是定义我们要执行的操作的类型。也许它会返回一个字符串或一个错误。如果这一切看起来令人困惑,不要担心;以后会更清楚。

因此,我们创建future对象,定义成功行为,定义失败行为,并传递一个ExecuteStringFunc类型以执行。在实现文件中,我们需要一个新类型:

type MaybeString struct {} 

我们还将在_test.go文件中创建两个测试:

package future 

import ( 
  "errors" 
  "testing" 
  "sync" 
) 

func TestStringOrError_Execute(t *testing.T) { 
  future := &MaybeString{} 
  t.Run("Success result", func(t *testing.T) { 
    ... 
  }) 
  t.Run("Error result", func(t *testing.T) { 
  ... 
  }) 
} 

我们将通过链接来定义函数,正如您通常在 Node.js 中看到的那样。像这样的代码很紧凑,不太难遵循:

t.Run("Success result", func(t *testing.T) { 
 future.Success(func(s string) {
 t.Log(s)
 }).Fail(func(e error) {
 t.Fail()
 })
 future.Execute(func() (string, error) {
 return "Hello World!", nil
 }) 
}) 

必须在MaybeString结构中定义future.Success函数,以接受一个SuccessFunc函数,如果一切正常,该函数将被执行,并返回指向future对象的相同指针(因此我们可以继续链接)。Fail函数也必须在MaybeString结构中定义,并且必须接受FailFunc函数以稍后返回指针。我们在这两种情况下都返回指针,这样我们就可以定义FailSuccess,反之亦然。

最后,我们使用Execute方法传递ExecuteStringFunc类型(一个不接受任何内容并返回字符串或错误的函数)。在本例中,我们返回一个字符串和 nil,因此我们希望执行SuccessFunc函数,并将结果记录到控制台。如果执行了 fail 函数,则测试失败,因为返回的 nil 错误不应执行FailFunc函数。

但我们仍然缺少一些东西。我们说过该函数必须在不同的 Goroutine 中异步执行,因此我们必须以某种方式同步该测试,以便它不会很快完成。同样,我们可以使用频道或sync.WaitGroup

t.Run("Success result", func(t *testing.T) { 
 var wg sync.WaitGroup
 wg.Add(1) 
    future.Success(func(s string) { 
      t.Log(s) 

 wg.Done() 
    }).Fail(func(e error) { 
      t.Fail() 

 wg.Done() 
    }) 

    future.Execute(func() (string, error) { 
      return "Hello World!", nil 
    }) 
 wg.Wait() 
  }) 

我们以前在上一个频道中见过 WaitGroup。此 WaitGroup 配置为等待一个信号(wg.Add(1)SuccessFail方法将触发WaitGroupDone()方法,以允许执行继续并完成测试(这就是Wait()方法在末尾的原因)。请记住,每个Done()方法将从 WaitGroup 中减去一个,我们只添加了一个,因此我们的Wait()方法将只阻塞,直到执行一个Done()方法。

使用我们所知道的进行Success结果单元测试的方法,通过将t.Fail()方法调用从错误切换到成功,很容易进行失败的结果单元测试,这样,如果调用成功,测试就会失败:

t.Run("Failed result", func(t *testing.T) { 
 var wg sync.WaitGroup
 wg.Add(1)
 future.Success(func(s string) {
 t.Fail()
 wg.Done()
 }).Fail(func(e error) {
 t.Log(e.Error())
 wg.Done()
 })
 future.Execute(func() (string, error) {
 return "", errors.New("Error ocurred")
 })
 wg.Wait() 
}) 

如果您使用的是像我这样的 IDE,那么您的SuccessFailExecute方法调用必须是红色的。这是因为我们在实现文件中缺少方法声明:

package future 

type SuccessFunc func(string) 
type FailFunc func(error) 
type ExecuteStringFunc func() (string, error) 

type MaybeString struct { 
  ... 
} 

func (s *MaybeString) Success(f SuccessFunc) *MaybeString { 
  return nil 
} 

func (s *MaybeString) Fail(f FailFunc) *MaybeString { 
  return nil 
} 

func (s *MaybeString) Execute(f ExecuteStringFunc) { 
  ... 
} 

我们的测试似乎准备好执行了。让我们试一下:

go test -v .
=== RUN   TestStringOrError_Execute
=== RUN   TestStringOrError_Execute/Success_result
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200780c0, 0x5122e9, 0x19, 0x51d750, 0xc420041d30)
 /usr/lib/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200780c0)
 /usr/lib/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200780c0, 0xc420041e20)
 /usr/lib/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x51d758, 0x5931e0, 0x1, 0x1, 0x50feb4)
 /usr/lib/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc420041ee8, 0xc420014550)
 /usr/lib/go/src/testing/testing.go:743 +0x85
main.main()
 go-design-patterns/future/_test/_testmain.go:54 +0xc6
...continue

好测试失败了,是的。。。但不是以可控的方式。为什么会这样?我们还没有任何实现,所以也没有执行SuccessFail函数。我们的 WaitGroup 一直在等待对Done()方法的调用,该调用永远不会到达,因此它无法继续并完成测试。这就是所有 Goroutine 都睡着了的意思——死锁!。在我们的具体示例中,这意味着没有人会调用 Done(),所以我们死定了!

多亏了 Go 编译器和运行时执行器,我们可以轻松地检测死锁。想象一下,如果 Go 运行时无法检测到死锁,我们将被困在一个空白屏幕中,而不知道发生了什么。

那么我们如何解决这个问题呢?好的,一个简单的方法是在等待一段时间后调用Done()方法的超时。对于这段代码,可以安全地等待 1 秒,因为它没有执行长时间运行的操作。

我们将在test文件中声明一个timeout函数,该函数等待一秒钟,然后打印一条消息,将测试设置为失败,并通过调用其Done()方法让 WaitGroup 继续:

func timeout(t *testing.T, wg *sync.WaitGroup) { 
  time.Sleep(time.Second) 
  t.Log("Timeout!") 

  t.Fail() 
  wg.Done() 
} 

每个子测试的最终外观与我们前面的"Success result"示例相似:

t.Run("Success result", func(t *testing.T) { 
  var wg sync.WaitGroup 
  wg.Add(1) 

  //Timeout! 
  go timeout(t, wg) 
  // ... 
}) 

让我们看看当我们再次执行测试时会发生什么:

go test -v .
=== RUN   TestStringOrError_Execute
=== RUN   TestStringOrError_Execute/Success_result
=== RUN   TestStringOrError_Execute/Failed_result
--- FAIL: TestStringOrError_Execute (2.00s)
 --- FAIL: TestStringOrError_Execute/Success_result (1.00s)
 future_test.go:64: Timeout!
 --- FAIL: TestStringOrError_Execute/Failed_result (1.00s)
 future_test.go:64: Timeout!
FAIL
exit status 1
FAIL

我们的测试失败了,但还是有控制的。看看FAIL行的末尾——注意所经过的时间是 1 秒,因为它是由超时触发的,正如我们在日志消息中看到的那样。

是时候开始实施了。

实施

根据我们的测试,实现必须在MaybeString类型内以链式方式取SuccessFunc、取FailFunc、取ExecuteStringFunc函数,并根据ExecuteStringFunc函数返回的结果异步启动ExecuteStringFunc函数调用SuccessFuncFailFunc函数。

链是通过在类型中存储函数并返回指向该类型的指针来实现的。当然,我们讨论的是之前声明的类型方法:

type MaybeString struct { 
  successFunc SuccessFunc 
  failFunc    FailFunc 
} 

func (s *MaybeString) Success(f SuccessFunc) *MaybeString { 
  s.successFunc = f 
  return s 
} 

func (s *MaybeString) Fail(f FailFunc) *MaybeString { 
  s.failFunc = f 
  return s 
} 

我们需要两个字段来存储SuccessFuncFailFunc函数,分别命名为successFuncfailFunc字段。这样,对SuccessFail方法的调用只需将它们的传入函数存储到我们的新字段中。它们只是将指针返回到特定MaybeString值的设置器。这些类型方法使用指向MaybeString结构的指针,所以不要忘记在func声明之后在MaybeString上加上“*”。

Execute 采用ExecuteStringFunc方法并异步执行。这对于 Goroutine 来说似乎很简单,对吧?

func (s *MaybeString) Execute(f ExecuteStringFunc) { 
  go func(s *MaybeString) { 
    str, err := f() 
    if err != nil { 
      s.failFunc(err) 
    } else { 
      s.successFunc(str) 
    } 
  }(s) 
} 

看起来很简单,因为它很简单!我们启动 Goroutine,它执行f方法(一个ExecuteStringFunc,并获取其结果——可能是一个字符串,也可能是一个错误。如果存在错误,我们在MaybeString结构中调用字段failFunc。如果不存在错误,我们调用successFunc字段。我们使用 Goroutine 来委托函数执行和错误处理,因此 Goroutine 不必这样做。

现在让我们运行单元测试:

go test -v .
=== RUN   TestStringOrError_Execute
=== RUN   TestStringOrError_Execute/Success_result
=== RUN   TestStringOrError_Execute/Failed_result
--- PASS: TestStringOrError_Execute (0.00s)
 --- PASS: TestStringOrError_Execute/Success_result (0.00s)
 future_test.go:21: Hello World!
 --- PASS: TestStringOrError_Execute/Failed_result (0.00s)
 future_test.go:49: Error ocurred
PASS
ok 

伟大的看看执行时间现在是如何接近零的,所以我们的超时没有被执行(实际上,它们已经被执行了,但是测试已经完成,并且它们的结果已经被声明)。

更重要的是,现在我们可以使用MaybeString类型异步执行任何类型的函数,这些函数不接受任何内容并返回字符串或错误。一个不接受任何东西的函数似乎有点无用,对吗?但是我们可以使用闭包将上下文引入这种类型的函数。

让我们编写一个setContext函数,将字符串作为参数,并返回一个ExecuteStringFunc方法,该方法返回前一个后缀为Closure!的参数:

func setContext(msg string) ExecuteStringFunc { 
  msg = fmt.Sprintf("%d Closure!\n", msg) 

  return func() (string, error){ 
    return msg, nil 
  } 
} 

因此,我们可以编写一个使用此闭包的新测试:

t.Run("Closure Success result", func(t *testing.T) { 
    var wg sync.WaitGroup 
    wg.Add(1) 
    //Timeout! 
    go timeout(t, &wg) 

    future.Success(func(s string) { 
      t.Log(s) 
      wg.Done() 
    }).Fail(func(e error) { 
      t.Fail() 
      wg.Done() 
    }) 
    future.Execute(setContext("Hello")) 
    wg.Wait() 
  }) 

setContext函数返回一个ExecuteStringFunc方法,它可以直接传递给Execute函数。我们使用我们知道将返回的任意文本调用setContext函数。

让我们再次执行测试。现在一切都要顺利!

go test -v .
=== RUN   TestStringOrError_Execute
=== RUN   TestStringOrError_Execute/Success_result
=== RUN   TestStringOrError_Execute/Failed_result
=== RUN   TestStringOrError_Execute/Closure_Success_result
--- PASS: TestStringOrError_Execute (0.00s)
 --- PASS: TestStringOrError_Execute/Success_result (0.00s)
 future_test.go:21: Hello World!
 --- PASS: TestStringOrError_Execute/Failed_result (0.00s)
 future_test.go:49: Error ocurred
 --- PASS: TestStringOrError_Execute/Closure_Success_result (0.00s)
 future_test.go:69: Hello Closure!
PASS
ok

它也给了我们一个肯定。闭包测试显示了我们之前解释的行为。通过获取一条消息"Hello"并附加其他内容("Closure!",我们可以更改想要返回的文本的上下文。现在,将其扩展到 HTTPGET调用、数据库调用或任何您可以想象的东西。它只需返回字符串或错误即可结束。但是,请记住,setContext函数中除了我们返回的匿名函数之外的所有内容都不是并发的,并且在调用 execute 之前将异步执行,因此我们必须尝试在匿名函数中放入尽可能多的逻辑。

拼凑未来

我们已经看到了使用函数类型系统实现异步编程的好方法。但是,我们可以在没有函数的情况下通过设置带有SuccessFailExecute方法以及满足它们的类型的接口,并使用模板模式异步执行它们,就像我们在本章前面看到的那样。这取决于你!

我们将在本章中看到的第三个也是最后一个模式是管道模式。您将在您的并发结构中大量使用这种模式,并且我们也可以认为它是最有用的。

说明

我们已经知道什么是管道。每次我们编写任何执行某种逻辑的函数时,我们都在编写一个管道:如果这个那么那个,或者其他什么。通过使用几个相互调用的函数,管道模式可以变得更加复杂。他们甚至可以在执行过程中循环。

Go 中的管道模式以类似的方式工作,但管道中的每一步都将以不同的 Goroutine 和通信方式进行,并且将使用通道进行同步。

目标

在创建管道时,我们主要考虑以下好处:

  • 我们可以创建一个多步骤算法的并发结构
  • 通过将算法分解为不同的 goroutine,我们可以利用多核机器的并行性

然而,仅仅因为我们在不同的 goroutine 中分解了一个算法,并不一定意味着它将以最快的速度执行。我们一直在谈论 CPU,因此理想情况下,算法必须是 CPU 密集型的,以利用并发结构。创建 goroutine 和 channel 的开销会使算法变得更小。

并发多操作

我们将为我们的例子做一些数学运算。我们将生成一个数字列表,从 1 开始,以任意数字 N 结束。然后我们将取每个数字,将其幂为 2,并将所得数字相加,得到一个唯一的结果。所以,如果N=3,我们的列表将是[1,2,3]。将其加至 2 后,我们的列表变为[1,4,9]。如果我们对结果列表求和,结果值是 14。

验收标准

从功能上讲,我们的管道模式需要提高到每个数字 2 的幂,然后将它们相加。它将分为一个数字生成器和两个操作,因此:

  1. 生成一个从 1 到 N 的列表,其中 N 可以是任意整数。
  2. 将生成的列表中的每个数字提高到 2 的幂。
  3. 将每个结果数字求和为最终结果并返回。

从测试开始

我们将只创建一个功能来管理一切。我们将调用这个函数LaunchPipeline来简化事情。它将使用一个整数作为参数,它将是我们的 N 个数,即列表中的项数。实现文件中的声明如下所示:

package pipelines 

func LaunchPipeline(amount int) int { 
  return 0 
} 

在我们的测试文件中,我们将使用切片创建一个测试表:

package pipelines 

import "testing" 

func TestLaunchPipeline(t *testing.T) { 
  tableTest := [][]int{ 
    {3, 14}, 
    {5, 55}, 
  } 
  // ... 
} 

我们的表是整数类型的切片。在每个切片上,第一个整数表示列表大小,第二个位置表示列表中的项目。它实际上是一个矩阵。当通过 3 时,它必须返回 14。当通过 5 时,它必须返回 55。然后我们必须迭代该表,并将每个数组的第一个索引传递给LaunchPipeline函数:

  // ... 

  var res int 
  for _, test := range tableTest { 
    res = LaunchPipeline(test[0]) 
    if res != test[1] { 
      t.Fatal() 
    } 

    t.Logf("%d == %d\n", res, test[1]) 
  } 
} 

使用range,我们得到矩阵中的每一行。每一行都包含在一个名为test的临时变量中。test[0]代表Ntest[1]预期结果。我们将预期结果与LaunchPipeline函数的返回值进行比较。如果它们不相同,则测试失败:

go test -v .
=== RUN   TestLaunchPipeline
--- FAIL: TestLaunchPipeline (0.00s)
 pipeline_test.go:15: 
FAIL
exit status 1
FAIL

实施

我们实现的关键是在不同的 Goroutine 中分离每个操作,并将它们与通道连接起来。LaunchPipeline函数是协调所有这些功能的函数,如下图所示:

Implementation

该操作包括三个步骤:生成一个数字列表,将其提高到 2 的幂,然后将结果数字相加。

此管道模式中的每个步骤将具有以下结构:

func functionName(in <-chan int) (<-chan int){ 
  out := make(chan bool, 100) 

  go func(){ 
    for v := range in { 
      // Do something with v and send it to channel out 
} 

close(out) 
   }() 

  return out 
} 

此函数表示一个常见步骤。让我们按照 Go 调度器执行它可能需要的相同顺序来解析它:

  1. functionName函数通常会接收一个通道,从(in <-chan int中获取值。我们称之为in函数,如单词 incoming。我们无法在此函数范围内通过它发送值;这就是为什么箭头指向关键字chanout
  2. functionName函数返回一个通道(<-chan in,函数调用方只能从该通道获取值(同样,由指向关键字chanout的箭头表示)。这也意味着通过该通道的任何值都必须在函数的范围内生成。
  3. 在函数的第一行中,我们创建了一个名为out的通道,它将作为函数的返回(此列表中的点 2
  4. 然后,我们将推出一个新的 Goroutine。它的作用域将在返回此函数后生效,所以让我们继续。
  5. 我们返回先前创建的out频道。
  6. 最终,在完成函数的执行并返回通道out后,Goroutine 执行。它将从in通道获取值,直到关闭。因此,此函数的调用方负责关闭此通道,否则 Goroutine 将永远不会结束!
  7. in通道关闭时,for 循环结束,我们关闭out通道。自上次发送以来,使用此通道的任何 Goroutine 都不会收到任何新值。

唯一不完全适合这种方法的步骤是接收数字的第一步,该数字表示列表上的上限阈值,而不是传入值的通道。因此,如果我们为管道中的每个步骤编写此操作,最终的图表更像这样:

Implementation

虽然想法完全相同,但现在我们可以看到,正是函数LaunchPipeline将接收通道并将其发送回管道中的下一步。使用此图,我们可以按照箭头的编号清楚地看到管道创建的流程。实心箭头表示函数调用,虚线箭头表示通道。

让我们更仔细地看一下代码。

列表生成器

操作的第一步是列表生成。列表从1开始,我们将收到一个表示较高阈值的整数。我们必须将列表中的每个数字传递到下一步:

func generator(max int) <-chan int { 
  outChInt := make(chan int, 100) 

  go func() { 
    for i := 1; i <= max; i++ { 
      outChInt <- i 
    } 

    close(outChInt) 
  }() 
  return outChInt 
} 

正如我们前面提到的,这是我们将在每个步骤中遵循的模式:创建一个通道,启动 Goroutine 通过通道发送数据,然后立即返回通道。这个 Goroutine 将从 1 迭代到 max 参数,这是列表的较高阈值,并通过通道发送每个数字。在发送每个数字后,通道关闭,这样就不能通过它发送更多的数据,但可以检索已缓冲的数据。

将数字提高到 2 的幂

第二步将从第一步的通道(从参数中获取)获取每个传入的数字,并将其提升为 2 的幂。必须使用新通道将每个结果发送到第三步:

func power(in <-chan int) <-chan int { 
  out := make(chan int, 100) 

  go func() { 
    for v := range in { 
      out <- v * v 
    } 
    close(out) 
  }() 
  return out 
} 

我们再次使用相同的模式:创建一个通道并启动 Goroutine,同时返回创建的通道。

for-range循环会无限期地从通道获取值,直到通道关闭。

最终还原操作

第三步也是最后一步接收来自第二步的每个数字,并不断将它们添加到本地值,直到连接通道关闭:

func sum(in <-chan int) <-chan int { 
  out := make(chan int, 100) 
  go func() { 
    var sum int 

    for v := range in { 
      sum += v 
    } 

    out <- sum 
    close(out) 
  }()

  return out 
} 

函数 sum 还将通道作为参数(从步骤 2返回的通道)。它还遵循创建频道、启动 Goroutine 和返回频道的相同模式。Goroutine 不断向名为sum的变量添加值,直到in通道关闭。当in通道关闭时,sum 的值被发送到out通道,并立即关闭。

启动管道模式

最后,我们可以实现LaunchPipeline功能:

func LaunchPipeline(amount int) int { 
  firstCh := generator(amount) 
  secondCh := power(firstCh) 
  thirdCh := sum(secondCh) 

  result := <-thirdCh 

  return result 
} 

函数generator首先返回传递给功率函数的通道。power函数返回传递给sum函数的第二个通道。函数sum最终返回将接收唯一值的第一个通道,即结果。现在让我们尝试测试一下:

go test -v .
=== RUN   TestLaunchPipeline
--- PASS: TestLaunchPipeline (0.00s)
 pipeline_test.go:18: 14 == 14
 pipeline_test.go:18: 55 == 55
PASS
ok

令人惊叹的值得一提的是,LaunchPipeline函数不需要分配每个通道,可以这样重写:

func LaunchPipeline(amount int) int { 
  return <-sum(power(generator(amount))) 
} 

generator函数的结果直接传递给power函数,power的结果传递给sum函数。

管道模式的最后一句话

使用管道模式,我们可以以非常简单的方式创建真正复杂的并发工作流。在我们的例子中,我们创建了一个线性工作流,但它也可以有条件、池以及扇入和扇出行为。我们将在下一章中看到其中一些。

并发设计模式在难度上向前迈进了一步,需要一些时间才能掌握。作为并发程序员,我们最大的错误是从并行性的角度思考(我怎样才能使它并行?或者我怎样才能在新线程中运行它?),而不是从并发结构的角度思考。

纯函数(总是产生相同输出(给定相同输入)而不影响其范围外的任何内容的函数)有助于此设计。

并发编程需要不断的实践。一旦你理解了基本的原语,Go 就很容易了。图表可以帮助您了解可能的数据流,但了解所有数据流的最佳方式只是实践。

在下一章中,我们将看到如何使用管道工作人员池来完成一些工作,而不是使用唯一的管道。此外,我们还将学习如何在并发结构中创建发布/订阅模式,并了解在使用并发进行构建时,同一模式会有多大的不同。

教程来源于Github,感谢apachecn大佬的无私奉献,致敬!

技术教程推荐

从0开发一款iOS App -〔朱德权〕

Java业务开发常见错误100例 -〔朱晔〕

Spring编程常见错误50例 -〔傅健〕

手把手带你写一门编程语言 -〔宫文学〕

零基础实战机器学习 -〔黄佳〕

深入剖析Java新特性 -〔范学雷〕

攻克视频技术 -〔李江〕

中间件核心技术与实战 -〔丁威〕

零基础学Python(2023版) -〔尹会生〕