我正在使用一些第三方API,每个API都有自己的速率限制.终结点1的速率限制为10/s,终结点2的速率限制为20/s.

我需要通过端点1处理我的数据,它将返回一个对象数组(在2-3000个对象之间).然后,我需要获取每个对象并将其中的一些数据发送到第二个端点,同时遵守第二个端点的速率限制.

我计划在GO routine 中通过一次发送10个请求来发送第一个端点的请求,确保如果所有10个请求在<1秒内完成,我不会在这1秒窗口内发送更多请求.

最终,我希望能够限制每个端点一次发出的并发响应数量.尤其是如果我必须为由于服务器500+响应等而失败的请求构建重试.

出于提问的目的,我使用HTTPbin请求来模拟以下场景:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

type HttpBinGetRequest struct {
    url string
}

type HttpBinGetResponse struct {
    Uuid       string `json:"uuid"`
    StatusCode int
}

type HttpBinPostRequest struct {
    url  string
    uuid string // Item to post to API
}

type HttpBinPostResponse struct {
    Data       string `json:"data"`
    StatusCode int
}

func main() {

    // Prepare GET requests for 500 requests
    var requests []*HttpBinGetRequest
    for i := 0; i < 500; i++ {
        uri := "https://httpbin.org/uuid"
        request := &HttpBinGetRequest{
            url: uri,
        }
        requests = append(requests, request)
    }

    // Create semaphore and rate limit for the GET endpoint
    getSemaphore := make(chan struct{}, 10)
    getRate := make(chan struct{}, 10)
    for i := 0; i < cap(getRate); i++ {
        getRate <- struct{}{}
    }

    go func() {
        // ticker corresponding to 1/10th of a second
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for range ticker.C {
            _, ok := <-getRate
            if !ok {
                return
            }
        }
    }()

    // Send our GET requests to obtain a random UUID
    var wg sync.WaitGroup
    for _, request := range requests {
        wg.Add(1)
        // Go func to make request and receive the response
        go func(r *HttpBinGetRequest) {
            defer wg.Done()

            // Check the rate limiter and block if it is empty
            getRate <- struct{}{}

            // Add a token to the semaphore
            getSemaphore <- struct{}{}

            // Remove token when function is complete
            defer func() {
                <-getSemaphore
            }()
            resp, _ := get(r)
            fmt.Printf("%+v\n", resp)
        }(request)
    }
    wg.Wait()

    // I need to add code that obtains the response data from the above for loop
    // then sends the UUID it to its own go routines for a POST request, following a similar pattern above
    // To not violate the rate limit of the second endpoint which is 20 calls per second
    // postSemaphore := make(chan struct{}, 20)
    // postRate := make(chan struct{}, 20)
    // for i := 0; i < cap(postRate); i++ {
    //  postRate <- struct{}{}
    // }
}

func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {

    httpResp := &HttpBinGetResponse{}
    client := &http.Client{}
    req, err := http.NewRequest("GET", hbgr.url, nil)
    if err != nil {
        fmt.Println("error making request")
        return httpResp, err
    }

    req.Header = http.Header{
        "accept": {"application/json"},
    }

    resp, err := client.Do(req)
    if err != nil {
        fmt.Println(err)
        fmt.Println("error getting response")
        return httpResp, err
    }

    // Read Response
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("error reading response body")
        return httpResp, err
    }
    json.Unmarshal(body, &httpResp)
    httpResp.StatusCode = resp.StatusCode
    return httpResp, nil
}

// Method to post data to httpbin
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {

    httpResp := &HttpBinPostResponse{}
    client := &http.Client{}
    req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
    if err != nil {
        fmt.Println("error making request")
        return httpResp, err
    }

    req.Header = http.Header{
        "accept": {"application/json"},
    }

    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("error getting response")
        return httpResp, err
    }

    if resp.StatusCode == 429 {
        fmt.Println(resp.Header.Get("Retry-After"))
    }

    // Read Response
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("error reading response body")
        return httpResp, err
    }
    json.Unmarshal(body, &httpResp)
    httpResp.StatusCode = resp.StatusCode
    fmt.Printf("%+v", httpResp)
    return httpResp, nil
}

推荐答案

这是一种生产者/消费者模式.您可以使用CHAN将它们连接起来.

关于速率限制器,我会使用套餐golang.org/x/time/rate.

因为我们已经决定使用CHAN来连接生产者和消费者,所以很自然地将失败的任务发送给同一个CHAN,以便消费者可以再次try .

我已经将逻辑封装到类型Scheduler[T]中.请参阅下面的演示.请注意,演示是匆忙编写的,只是为了说明 idea .它没有经过彻底的测试.

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "math/rand"
    "net/http"
    "net/http/httptest"
    "sort"
    "sync"
    "time"

    "golang.org/x/time/rate"
)

type task[T any] struct {
    param       T
    failedCount int
}

type Scheduler[T any] struct {
    name     string
    limit    int
    maxTries int
    wg       sync.WaitGroup
    tasks    chan task[T]
    action   func(param T) error
}

// NewScheduler creates a scheduler that runs the action with the specified rate limit.
// It will retry the action if the action returns a non-nil error.
func NewScheduler[T any](name string, limit, maxTries, chanSize int, action func(param T) error) *Scheduler[T] {
    return &Scheduler[T]{
        name:     name,
        limit:    limit,
        maxTries: maxTries,
        tasks:    make(chan task[T], chanSize),
        action:   action,
    }
}

func (s *Scheduler[T]) AddTask(param T) {
    s.wg.Add(1)
    s.tasks <- task[T]{param: param}
}

func (s *Scheduler[T]) retryLater(t task[T]) {
    s.wg.Add(1)
    s.tasks <- t
}

func (s *Scheduler[T]) Run() {
    lim := rate.NewLimiter(rate.Limit(s.limit), 1)
    for t := range s.tasks {
        t := t
        if err := lim.Wait(context.Background()); err != nil {
            log.Fatalf("wait: %s", err)
            return
        }
        go func() {
            defer s.wg.Done()
            err := s.action(t.param)
            if err != nil {
                log.Printf("task %s, param %v failed: %v", s.name, t.param, err)
                t.failedCount++

                if t.failedCount == s.maxTries {
                    log.Printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxTries)
                    return
                }

                s.retryLater(t)
            }
        }()
    }
}

func (s *Scheduler[T]) Wait() {
    s.wg.Wait()
    close(s.tasks)
}

func main() {
    s := &server{}
    ts := httptest.NewServer(s)
    defer ts.Close()

    schedulerPost := NewScheduler("post", 20, 3, 1, func(param string) error {
        return post(fmt.Sprintf("%s/%s", ts.URL, param))
    })

    go schedulerPost.Run()

    schedulerGet := NewScheduler("get", 10, 3, 1, func(param int) error {
        id, err := get(fmt.Sprintf("%s/%d", ts.URL, param))
        if err != nil {
            return err
        }

        schedulerPost.AddTask(id)
        return nil
    })

    go schedulerGet.Run()

    for i := 0; i < 100; i++ {
        schedulerGet.AddTask(i)
    }

    schedulerGet.Wait()
    schedulerPost.Wait()

    s.printStats()
}

func get(url string) (string, error) {
    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    if resp.StatusCode != 200 {
        return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }

    return string(body), nil
}

func post(url string) error {
    resp, err := http.Post(url, "", nil)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != 200 {
        return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }

    return nil
}

type server struct {
    gMu  sync.Mutex
    gets []int64

    pMu   sync.Mutex
    posts []int64
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    log.Printf("%s: %s", r.Method, r.URL.Path)

    // collect request stats.
    if r.Method == http.MethodGet {
        s.gMu.Lock()
        s.gets = append(s.gets, time.Now().UnixMilli())
        s.gMu.Unlock()
    } else {
        s.pMu.Lock()
        s.posts = append(s.posts, time.Now().UnixMilli())
        s.pMu.Unlock()
    }

    n := rand.Intn(1000)
    // simulate latency.
    time.Sleep(time.Duration(n) * time.Millisecond)

    // simulate errors.
    if n%10 == 0 {
        w.WriteHeader(http.StatusInternalServerError)
        return
    }

    if r.Method == http.MethodGet {
        fmt.Fprintf(w, "%s", r.URL.Path[1:])
        return
    }
}

func (s *server) printStats() {
    log.Printf("GETS (total: %d):\n", len(s.gets))
    printStats(s.gets)
    log.Printf("POSTS (total: %d):\n", len(s.posts))
    printStats(s.posts)
}

func printStats(ts []int64) {
    sort.Slice(ts, func(i, j int) bool {
        return ts[i] < ts[j]
    })

    count := 0
    to := ts[0] + 1000
    for i := 0; i < len(ts); i++ {
        if ts[i] < to {
            count++
        } else {
            fmt.Printf("  %d: %d\n", to, count)
            i-- // push back the current item
            count = 0
            to += 1000
        }
    }
    if count > 0 {
        fmt.Printf("  %d: %d\n", to, count)
    }
}

输出如下所示:

...
2023/03/25 21:03:30 GETS (total: 112):
  1679749398998: 10
  1679749399998: 10
  1679749400998: 10
  1679749401998: 10
  1679749402998: 10
  1679749403998: 10
  1679749404998: 10
  1679749405998: 10
  1679749406998: 10
  1679749407998: 10
  1679749408998: 10
  1679749409998: 2
2023/03/25 21:03:30 POSTS (total: 111):
  1679749399079: 8
  1679749400079: 8
  1679749401079: 12
  1679749402079: 8
  1679749403079: 10
  1679749404079: 9
  1679749405079: 9
  1679749406079: 8
  1679749407079: 14
  1679749408079: 12
  1679749409079: 9
  1679749410079: 4

Go相关问答推荐

将Go程序导出到WASM—构建约束排除所有Go文件

具有GRPC的RBAC(基于角色的访问控制)-网关生成的REST风格的API

租户GUID X的租户不存在self 邮箱帐户的租户(我是唯一的成员)

如何使用工作区方法扩展克隆的Golang库

使用Digitorus/pdfsign在GO(Golang)中签署pdf文件

Go:如何在不加载正文的情况下创建 http 代理通行证

用 fork 替换 Go 依赖:...用于两个不同的模块路径

使用Cookie身份验证的Gorilla Golang Websocket优化

下载和合并时输出文件已损坏

无法使用带有 422 的 go-github 创建提交 - 更新不是快进

对 CSV 进行单元测试失败

在 Cloud Run 中找不到默认凭据

我突然无法再将我的 GoLang 应用程序部署到 Google AppEngine

如何从 Go 中的 `HijackedResponse` 中删除 Cursor Position ANSI 转义码?

是否可以从 golang 中的参数推断类型?

无法使用 Golang 扫描文件路径

如果 transaction.Commit 对带有 postgres 连接的 SQL 失败,您是否需要调用 transaction.RollBack

如何将多个切片打印为一个切片?

Go:用于 XML 解码的嵌套 struct 中的提升字段

如何解决在mac m1中运行gcc失败退出状态1?