我正在使用一些第三方API,每个API都有自己的速率限制.端点1的速率限制为每秒10次请求,端点2的速率限制为每秒20次请求.
我需要通过端点1处理我的数据,它将返回一个对象数组(在2-3000个对象之间).然后,我需要获取每个对象并将其中的一些数据发送到第二个端点,同时遵守第二个端点的速率限制.
我计划在 goroutine 中以每次10个请求的方式调用第一个端点,并确保即使这10个请求在不到1秒内全部完成,我也不会在这1秒的窗口内再发送更多请求.
最终,我希望能够限制每个端点同时发出的并发请求数量.尤其是当我必须为因服务器返回500及以上状态码等原因而失败的请求实现重试时.
出于提问的目的,我使用HTTPbin请求来模拟以下场景:
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
)
// HttpBinGetRequest describes a single GET request to be sent by get.
type HttpBinGetRequest struct {
	// url is the address the request is sent to.
	url string
}
// HttpBinGetResponse is the decoded reply to a GET request.
type HttpBinGetResponse struct {
	// Uuid is read from the "uuid" key of the JSON reply.
	Uuid string `json:"uuid"`
	// StatusCode is the HTTP status code; get copies it from the HTTP response.
	StatusCode int
}
// HttpBinPostRequest describes a single POST request to be sent by post.
type HttpBinPostRequest struct {
	// url is the address the request is sent to.
	url string
	// uuid is the item posted to the API; post sends it as the request body.
	uuid string
}
// HttpBinPostResponse is the decoded reply to a POST request.
type HttpBinPostResponse struct {
	// Data is read from the "data" key of the JSON reply.
	Data string `json:"data"`
	// StatusCode is the HTTP status code; post copies it from the HTTP response.
	StatusCode int
}
// main sends 500 GET requests to httpbin's /uuid endpoint and prints every
// result. At most 10 requests are in flight at once, and at most one new
// request is released per 100ms (10 per second).
func main() {
	const (
		getURL          = "https://httpbin.org/uuid"
		getRequestCount = 500
		getMaxInFlight  = 10
		getRateSlots    = 10
		getSlotInterval = 100 * time.Millisecond
	)

	// Prepare the GET requests.
	requests := make([]*HttpBinGetRequest, 0, getRequestCount)
	for i := 0; i < getRequestCount; i++ {
		requests = append(requests, &HttpBinGetRequest{url: getURL})
	}

	// getSemaphore caps the number of GET requests in flight.
	getSemaphore := make(chan struct{}, getMaxInFlight)

	// getRate is the rate limiter. It starts full; a worker has to push a
	// token before sending its request, which blocks while the channel is
	// full. The goroutine below removes one token per tick, so one waiting
	// worker is released per getSlotInterval.
	getRate := make(chan struct{}, getRateSlots)
	for i := 0; i < cap(getRate); i++ {
		getRate <- struct{}{}
	}

	// done tells the limiter goroutine to exit once the GET stage is over,
	// so it does not keep ticking during whatever runs afterwards.
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(getSlotInterval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
			}
			// Free one slot, or give up if the stage ended while waiting.
			select {
			case <-done:
				return
			case <-getRate:
			}
		}
	}()

	// Send the GET requests to obtain random UUIDs.
	var wg sync.WaitGroup
	for _, request := range requests {
		wg.Add(1)
		go func(r *HttpBinGetRequest) {
			defer wg.Done()

			// Wait for a rate-limiter slot.
			getRate <- struct{}{}

			// Take a concurrency slot and hand it back when finished.
			getSemaphore <- struct{}{}
			defer func() { <-getSemaphore }()

			resp, err := get(r)
			if err != nil {
				// Report the failure instead of printing a response that
				// was never (fully) received.
				fmt.Printf("GET %s failed: %v\n", r.url, err)
				return
			}
			fmt.Printf("%+v\n", resp)
		}(request)
	}
	wg.Wait()
	close(done)

	// I need to add code that obtains the response data from the above for loop
	// then sends the UUID it to its own go routines for a POST request, following a similar pattern above
	// To not violate the rate limit of the second endpoint which is 20 calls per second
	// postSemaphore := make(chan struct{}, 20)
	// postRate := make(chan struct{}, 20)
	// for i := 0; i < cap(postRate); i++ {
	// postRate <- struct{}{}
	// }
}
// get sends a GET request to hbgr.url and decodes the JSON reply.
//
// The returned response is never nil. Its StatusCode is filled in as soon as
// the server has answered, so it is also valid when a non-nil error reports
// that the body could not be read or decoded. Each failure is printed to
// standard output as well as returned, so it stays visible even if the caller
// discards the error.
func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
	// Bound the whole exchange so a stalled server cannot block the caller
	// forever.
	const requestTimeout = 30 * time.Second

	httpResp := &HttpBinGetResponse{}
	client := &http.Client{Timeout: requestTimeout}

	req, err := http.NewRequest(http.MethodGet, hbgr.url, nil)
	if err != nil {
		fmt.Println("error making request")
		return httpResp, fmt.Errorf("building GET request for %q: %w", hbgr.url, err)
	}
	req.Header.Set("Accept", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		fmt.Println("error getting response")
		return httpResp, fmt.Errorf("sending GET request: %w", err)
	}
	// The body must be closed, otherwise the underlying connection leaks.
	defer resp.Body.Close()
	httpResp.StatusCode = resp.StatusCode

	// Read Response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, fmt.Errorf("reading GET response body: %w", err)
	}

	// Decode into the struct itself, not into the address of the pointer:
	// with a pointer-to-pointer target a JSON null body would reset httpResp
	// to nil.
	if err := json.Unmarshal(body, httpResp); err != nil {
		fmt.Println("error decoding response body")
		return httpResp, fmt.Errorf("decoding GET response body (status %d): %w", resp.StatusCode, err)
	}
	return httpResp, nil
}
// post sends hbr.uuid as the body of a POST request to hbr.url and decodes
// the JSON reply.
//
// The returned response is never nil. Its StatusCode is filled in as soon as
// the server has answered, so it is also valid when a non-nil error reports
// that the body could not be read or decoded. Each failure is printed to
// standard output as well as returned, so it stays visible even if the caller
// discards the error.
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
	// Bound the whole exchange so a stalled server cannot block the caller
	// forever.
	const requestTimeout = 30 * time.Second

	httpResp := &HttpBinPostResponse{}
	client := &http.Client{Timeout: requestTimeout}

	req, err := http.NewRequest(http.MethodPost, hbr.url, bytes.NewBufferString(hbr.uuid))
	if err != nil {
		fmt.Println("error making request")
		return httpResp, fmt.Errorf("building POST request for %q: %w", hbr.url, err)
	}
	req.Header.Set("Accept", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("error getting response")
		return httpResp, fmt.Errorf("sending POST request: %w", err)
	}
	// The body must be closed, otherwise the underlying connection leaks.
	defer resp.Body.Close()
	httpResp.StatusCode = resp.StatusCode

	// When the server answers 429, print the Retry-After header it sent.
	if resp.StatusCode == http.StatusTooManyRequests {
		fmt.Println(resp.Header.Get("Retry-After"))
	}

	// Read Response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("error reading response body")
		return httpResp, fmt.Errorf("reading POST response body: %w", err)
	}

	// Decode into the struct itself, not into the address of the pointer:
	// with a pointer-to-pointer target a JSON null body would reset httpResp
	// to nil.
	if err := json.Unmarshal(body, httpResp); err != nil {
		fmt.Println("error decoding response body")
		return httpResp, fmt.Errorf("decoding POST response body (status %d): %w", resp.StatusCode, err)
	}
	fmt.Printf("%+v", httpResp)
	return httpResp, nil
}