modelRT/pool/concurrency_anchor_parse.go

101 lines
2.7 KiB
Go
Raw Normal View History

2024-12-18 16:25:49 +08:00
// Package pool define concurrency call function in ants
package pool
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"sync"
"time"
"modelRT/config"
"go.uber.org/zap"
)
var AnchorFunc = func(anchorConfig interface{}) {
logger := zap.L()
paramConfig, ok := anchorConfig.(config.AnchorParamConfig)
if !ok {
logger.Error("conversion model anchor param config type failed")
return
}
fmt.Println(paramConfig)
// TODO 解析 paramConfig 轮询 dataRT http 接口
}
type APIEndpoint struct {
URL string `json:"url"`
Method string `json:"method"` // HTTP 方法,如 "GET", "POST"
Headers map[string]string `json:"headers"`
QueryParams map[string]string `json:"query_params"`
Body string `json:"body"` // 对于 POST 请求等,可能需要一个请求体
Interval int `json:"interval"` // 轮询间隔时间(秒)
}
// fetchAPI 执行 HTTP 请求并返回响应体(作为字符串)或错误
func fetchAPI(endpoint APIEndpoint) (string, error) {
client := &http.Client{}
// 构建请求
req, err := http.NewRequest(endpoint.Method, endpoint.URL, nil)
if err != nil {
return "", err
}
// 设置请求头
for key, value := range endpoint.Headers {
req.Header.Set(key, value)
}
// 设置查询参数(如果需要)
q := req.URL.Query()
for key, value := range endpoint.QueryParams {
q.Set(key, value)
}
req.URL.RawQuery = q.Encode()
// 设置请求体(如果需要,例如 POST 请求)
if endpoint.Method == "POST" || endpoint.Method == "PUT" {
req.Body = ioutil.NopCloser(strings.NewReader(endpoint.Body))
req.Header.Set("Content-Type", "application/json") // 假设是 JSON 请求体
}
// 执行请求
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
// 读取响应体
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
// pollAPIEndpoints 轮询 API 端点列表,并根据指定的间隔时间执行请求
func pollAPIEndpoints(endpoints []APIEndpoint, interval int, wg *sync.WaitGroup, results chan<- string) {
defer wg.Done()
for _, endpoint := range endpoints {
for {
body, err := fetchAPI(endpoint)
if err != nil {
log.Printf("Error fetching from %s: %v", endpoint.URL, err)
} else {
results <- fmt.Sprintf("Response from %s: %s", endpoint.URL, body)
}
time.Sleep(time.Duration(interval) * time.Second)
// 注意:这里使用了 endpoint.Interval 而不是传入的 interval
// 但为了示例简单,我们统一使用传入的 interval。
// 如果要根据每个端点的不同间隔来轮询,应该使用 endpoint.Interval。
}
}
}