dataRT/data/influx/common.go

154 lines
3.3 KiB
Go

package influx
import (
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
)
// jsonResp is the top-level envelope that a JSON query response is decoded
// into. The query path in this file consumes a single measurement from it:
// the first series of the first result.
type jsonResp struct {
	// Results is decoded from the "results" array.
	Results []*result `json:"results"`
}
// result is one entry of the "results" array in a JSON query response.
type result struct {
	// StatementID is decoded from "statement_id".
	StatementID int `json:"statement_id"`
	// Series is decoded from the "series" array.
	Series []*fields `json:"series"`
}
// fields is one series of a JSON query result: the measurement name, the
// column names, and the value rows.
type fields struct {
	// Name is decoded from "name".
	Name string `json:"name"`
	// Column is decoded from "columns", the key the InfluxDB 1.x /query JSON
	// response uses for this array. The earlier "column" tag never matched, so
	// the slice was always left empty.
	Column []string `json:"columns"`
	// Values is decoded from "values" and holds one slice per row.
	Values [][]interface{} `json:"values"`
}
// getRespData issues a GET to client.url+"/query" with reqData encoded as the
// query string and converts the reply into TVs.
//
// respType selects how the body is decoded and must be "json" or "csv"; "csv"
// additionally sends "Accept: application/csv". Timestamp examples recorded
// for each format:
//
//	json_time: "2024-12-18T08:12:21.4735154Z"
//	csv_time:  "1734572793695885000"
//
// For "json" only the values of the first series of the first result are
// converted. For "csv" the first record (presumably the header row) is
// skipped and the remaining records are converted.
//
// An error is returned when respType is unsupported, the request cannot be
// built or sent, the status is not 200 OK, the body cannot be read or decoded,
// or the decoded body holds no data ("response has no data").
func (client *influxClient) getRespData(ctx context.Context, reqData url.Values,
	respType string) ([]*TV, error) {
	// Validate first so an unsupported format never costs a round trip.
	if respType != "json" && respType != "csv" {
		return nil, errors.New("unsupported response type")
	}

	request, err := http.NewRequestWithContext(ctx, http.MethodGet,
		client.url+"/query?"+reqData.Encode(), nil)
	if err != nil {
		return nil, err
	}
	request.Header.Set("Content-Type", "application/json")
	request.Header.Set("Authorization", "Token "+client.token)
	if respType == "csv" {
		request.Header.Set("Accept", "application/csv")
	}

	response, err := client.Do(request)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		// Best effort: attach a bounded prefix of the body, which may carry the
		// server's explanation. A read failure is ignored on purpose because
		// the status code is reported either way.
		detail, _ := io.ReadAll(io.LimitReader(response.Body, 1024))
		if msg := strings.TrimSpace(string(detail)); msg != "" {
			return nil, fmt.Errorf("unexpected status code: %d: %s", response.StatusCode, msg)
		}
		return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode)
	}

	switch respType {
	case "csv":
		// Parse straight from the body; no intermediate buffer or string copy.
		rows, err := csv.NewReader(response.Body).ReadAll()
		if err != nil {
			return nil, err
		}
		if len(rows) > 1 {
			return client.turnCsvRespDataToTVs(rows[1:])
		}
	case "json":
		respData, err := io.ReadAll(response.Body)
		if err != nil {
			return nil, err
		}
		resp := new(jsonResp)
		if err := json.Unmarshal(respData, resp); err != nil {
			return nil, err
		}
		// The slices hold pointers, so a JSON null element decodes to nil;
		// treat that as "no data" instead of dereferencing it.
		if len(resp.Results) > 0 && resp.Results[0] != nil {
			series := resp.Results[0].Series
			if len(series) > 0 && series[0] != nil {
				return client.turnJsonRespDataToTVs(series[0].Values)
			}
		}
	}
	return nil, errors.New("response has no data")
}
// turnJsonRespDataToTVs maps each JSON value row onto a TV, using element 0
// as Time and element 1 as Value without any conversion. Rows with fewer than
// two elements are dropped. The returned error is always nil.
func (client *influxClient) turnJsonRespDataToTVs(data [][]interface{}) ([]*TV, error) {
	tvs := make([]*TV, 0, len(data))
	for i := range data {
		if len(data[i]) < 2 {
			continue
		}
		tvs = append(tvs, &TV{Time: data[i][0], Value: data[i][1]})
	}
	return tvs, nil
}
// turnCsvRespDataToTVs converts CSV records into TVs. Column index 2 is
// parsed as a base-10 int64 and stored in Time (presumably an epoch timestamp
// in nanoseconds); column index 3 is parsed as a float64 and stored in Value.
// Records with fewer than four columns are skipped. The first parse failure
// aborts the conversion and is returned together with a nil slice.
func (client *influxClient) turnCsvRespDataToTVs(data [][]string) ([]*TV, error) {
	tvs := make([]*TV, 0, len(data))
	for _, record := range data {
		if len(record) < 4 {
			continue
		}
		stamp, err := strconv.ParseInt(record[2], 10, 64)
		if err != nil {
			return nil, err
		}
		value, err := strconv.ParseFloat(record[3], 64)
		if err != nil {
			return nil, err
		}
		tvs = append(tvs, &TV{Time: stamp, Value: value})
	}
	return tvs, nil
}
// setLineData POSTs line (line protocol) to client.url+"/write?db="+db as
// text/plain with the client's token in the Authorization header.
//
// Author's advice for callers: it is better to gzip the payload and to sort
// tags by key in lexicographic order.
//
// line is sent exactly as given. When gzip is true only the
// "Content-Encoding: gzip" header is added; nothing is compressed here, so the
// caller is presumably expected to pass already-compressed data (TODO confirm).
//
// NOTE(review): db is concatenated into the query string without escaping;
// confirm callers only pass names that are safe in a URL.
//
// It returns nil only when the server answers 204 No Content. Any other status
// yields an error that includes the status code and, when present, a bounded
// prefix of the response body so the server's rejection reason is not lost.
func (client *influxClient) setLineData(db, line string, gzip bool) error {
	request, err := http.NewRequest(http.MethodPost,
		client.url+"/write?db="+db, strings.NewReader(line))
	if err != nil {
		return err
	}
	request.Header.Set("Content-Type", "text/plain")
	request.Header.Set("Authorization", "Token "+client.token)
	if gzip {
		request.Header.Set("Content-Encoding", "gzip")
	}

	response, err := client.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode == http.StatusNoContent {
		return nil
	}

	// Best effort: a read failure is ignored on purpose because the status
	// code is reported either way.
	detail, _ := io.ReadAll(io.LimitReader(response.Body, 1024))
	if msg := strings.TrimSpace(string(detail)); msg != "" {
		return fmt.Errorf("unexpected status code: %d: %s", response.StatusCode, msg)
	}
	return fmt.Errorf("unexpected status code: %d", response.StatusCode)
}