我正在try 使用Golang的两个通道来构建接收方和发送方模式.我正在执行一项任务(API调用),并接收回一个Response struct .我的目标是,当收到响应时,我希望将其发送到另一个通道(writeChan)进行额外处理.

我想在接收器通道(respChan)上连续阅读/收听,并处理通过的任何内容(例如Response).然后,我想启动一个线程,在另一个Goroutine中使用该响应执行进一步的操作.

我想了解如何将此模式链接在一起,以允许数据从API调用流出并并发写入(每个响应都将写入一个单独的文件目的地,由Write()函数处理).

从本质上讲,我目前的模式如下:

package main

import (
    "fmt"
    "sync"
)

func main() {

    var wg sync.WaitGroup
    respChan := make(chan Response) // Response is a struct that contains API response metadata
    defer close(respChan)
    // requests is just a slice of requests to be made to an API
    // This part is working well
    for _, req := range requests {
        wg.Add(1)
        go func(r Request) {
            defer wg.Done()
            resp, _ := r.Get() // Make the API call and receive back a Response struct
            respChan <- resp // Put the response into our channel
        }(req)
    }

    // Now, I want to extract the responses as they become available and send them to another function to do some processing. This I am unsure of how to handle properly
    writeChan := make(chan string)
    defer close(writeChan)
    select {
        case resp := <-respChan: // receive from response channel
            go func(response Response) {
                signal, _ := Write(response) // Separate func to write the response to a file. Not important here in this context.
                writeChan <- signal // Put the signal data into the channel which is a string file path of where the file was written (will be used for a later process)

            }(resp)
        case <-time.After(15 *time.Second):
            fmt.Println("15 seconds have passed without receiving anything...")

    }
    wg.Wait()
}

推荐答案

让我与您分享一个您可以从中受益的工作示例.首先,我将介绍代码,然后,我将向您介绍所有相关部分.

package main

import (
    "fmt"
    "net/http"
    "os"
    "strings"
    "time"
)

type Request struct {
    Url            string
    DelayInSeconds time.Duration
}

type Response struct {
    Url        string
    StatusCode int
}

func main() {
    requests := []Request{
        {"https://www.google.com", 0},
        {"https://stackoverflow.com", 1},
        {"https://www.wikipedia.com", 4},
    }

    respChan := make(chan Response)
    defer close(respChan)

    for _, req := range requests {
        go func(r Request) {
            fmt.Printf("%q - %v\n", r.Url, strings.Repeat("#", 30))
            // simulate heavy work
            time.Sleep(time.Second * r.DelayInSeconds)
            resp, _ := http.Get(r.Url)
            res := Response{r.Url, resp.StatusCode}
            fmt.Println(time.Now())
            respChan <- res
        }(req)
    }

    writeChan := make(chan struct{})
    defer close(writeChan)

    for i := 0; i < len(requests); i++ {
        select {
        case res := <-respChan:
            go func(r Response) {
                f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
                if err != nil {
                    panic(err)
                }
                defer f.Close()
                f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
                writeChan <- struct{}{}
            }(res)
        case <-time.After(time.Second * 2):
            fmt.Println("Timeout")
        }
    }
}

设好

首先,我定义了本例中将使用的两个 struct :RequestResponse.在前者中,我放了一个DelayInSeconds来模拟一些繁重和耗时的操作.然后,我定义了requests变量,它包含必须完成的所有请求.

写作部分

在这里,我的范围超过了requests变量.对于每个请求,我将向目标URL发出一个HTTP请求.time.Sleep模拟了沉重的负荷.然后,我将响应写入未缓冲的respChan通道.

阅读部分

这里的主要变化是将select struct 包装到for循环中.多亏了这一点,我们将确保迭代正确的时间(基于requests变量的长度).

结束语

First of all, bear in mind that the code is oversimplified just to show off the relevant parts. Due to this, a lot of error handling is missing and some inline functions could be extrapolated into named functions. You don't need to use sync.WaitGroup to achieve what you need, the usage of channels will be enough.
Feel free to play with delays and check which files are written!

如果这对你有帮助,请让我知道!

编辑

按照您的要求,我将根据您的需求为您提供更准确的解决方案.新的阅读部分将如下所示:

count := 0
for {
    // this check is need to exit the for loop and not wait indefinitely
    // it can be removed based on your needs
    if count == 3 {
        fmt.Println("all responses arrived...")
        return
    }
    res := <-respChan
    count++
    go func(r Response) {
        f, err := os.Create(fmt.Sprintf("%v.txt", strings.Replace(r.Url, "https://", "", 1)))
        if err != nil {
            panic(err)
        }
        defer f.Close()
        f.Write([]byte(fmt.Sprintf("%q OK with %d\n", r.Url, r.StatusCode)))
        writeChan <- struct{}{}
    }(res)
}

在这里,执行在for循环内无限期地等待.无论每个请求需要多长时间才能完成,它一到达就会被获取.我将if放在for循环的顶部,在它处理了我们需要的请求后退出.但是,您可以避免它,并让代码运行,直到收到取消信号(由您决定).

如果这更符合您的要求,请让我知道,谢谢!

Go相关问答推荐

Go GORM创建表,但不创建列

在Go中根据命名空间/字符串生成UUID

在GO中创建[]字符串类型的变量

是不是有什么原因导致`Strings.EqualFold`不先进行长度比较?

如何使用中间件更改http请求的响应代码?

可以';t从主机连接到ScyllaDB容器

如何将Golang测试用例的测试覆盖率值与特定阈值进行比较

Caddy服务器try 打开端口80而不是8090.

我的神经网络(从头开始)训练,让它离目标更远

Golang - POST 失败(NoSurf CSRF)

你如何在 Golang 代码中测试 filepath.Abs​​ 失败?

如何为导入的嵌入式 struct 文字提供值?

grpc-gateway:重定向与定义不匹配(原始文件)

如何在自定义验证函数中获取 struct 名称

GRPC 元数据未在 Go 中更新

自定义指标未显示在 prometheus web ui 中,grafana 中也是如此

无限期运行 Go routine (完成后重新启动)

gqlgen go,通过添加一个解析器来减少数据库调用

测试包外文件时的 Golang 测试覆盖率

Golang 与 Cassandra db 使用 docker-compose:cannot connect (gocql)