add log and process data
This commit is contained in:
parent
affbebe806
commit
352fa80e26
|
|
@ -2,8 +2,8 @@
|
||||||
"default": [
|
"default": [
|
||||||
{
|
{
|
||||||
"broker": "127.0.0.1:5672",
|
"broker": "127.0.0.1:5672",
|
||||||
"username": "",
|
"username": "rabbitmq",
|
||||||
"password": ""
|
"password": "password"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
@ -1,3 +1 @@
|
||||||
package data
|
package data
|
||||||
|
|
||||||
// something basic
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// for influx data, one measurement
|
// for influx json response data
|
||||||
type jsonResp struct {
|
type jsonResp struct {
|
||||||
Results []*result `json:"results"`
|
Results []*result `json:"results"`
|
||||||
}
|
}
|
||||||
|
|
@ -33,7 +33,9 @@ type fields struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// line protocol, better to gzip and sort tags by key in lexicographic order
|
// line protocol, better to gzip and sort tags by key in lexicographic order
|
||||||
func (client *influxClient) writeLinesData(ctx context.Context, db string, data []byte, compress bool) error {
|
func (client *influxClient) writeLinesData(ctx context.Context, db string,
|
||||||
|
data []byte, compress bool) error {
|
||||||
|
|
||||||
if compress {
|
if compress {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
gz := gzip.NewWriter(&buf)
|
gz := gzip.NewWriter(&buf)
|
||||||
|
|
@ -79,7 +81,7 @@ func (client *influxClient) writeLinesData(ctx context.Context, db string, data
|
||||||
// json_time:"2024-12-18T08:12:21.4735154Z"
|
// json_time:"2024-12-18T08:12:21.4735154Z"
|
||||||
// csv_time:"1734572793695885000"
|
// csv_time:"1734572793695885000"
|
||||||
func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values,
|
func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values,
|
||||||
respType string) ([]*TV, error) {
|
respType string) ([]TV, error) {
|
||||||
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||||
client.url+"/query?"+reqData.Encode(), nil)
|
client.url+"/query?"+reqData.Encode(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -114,7 +116,7 @@ func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values,
|
||||||
// json_time:"2024-12-18T08:12:21.4735154Z"
|
// json_time:"2024-12-18T08:12:21.4735154Z"
|
||||||
// csv_time:"1734572793695885000"
|
// csv_time:"1734572793695885000"
|
||||||
func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values,
|
func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values,
|
||||||
respType string) (map[string][]*TV, error) {
|
respType string) (map[string][]TV, error) {
|
||||||
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||||
client.url+"/query?"+reqData.Encode(), nil)
|
client.url+"/query?"+reqData.Encode(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -145,7 +147,7 @@ func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values
|
||||||
return respDataToF2TVs(respData, respType)
|
return respDataToF2TVs(respData, respType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func respDataToTVs(respData []byte, respType string) ([]*TV, error) {
|
func respDataToTVs(respData []byte, respType string) ([]TV, error) {
|
||||||
switch respType {
|
switch respType {
|
||||||
case "json":
|
case "json":
|
||||||
resp := new(jsonResp)
|
resp := new(jsonResp)
|
||||||
|
|
@ -155,7 +157,7 @@ func respDataToTVs(respData []byte, respType string) ([]*TV, error) {
|
||||||
}
|
}
|
||||||
if len(resp.Results) > 0 &&
|
if len(resp.Results) > 0 &&
|
||||||
len(resp.Results[0].Series) > 0 {
|
len(resp.Results[0].Series) > 0 {
|
||||||
return turnJsonToTVs(resp.Results[0].Series[0].Values)
|
return convertJsonToTVs(resp.Results[0].Series[0].Values)
|
||||||
}
|
}
|
||||||
case "csv":
|
case "csv":
|
||||||
rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll()
|
rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll()
|
||||||
|
|
@ -163,14 +165,14 @@ func respDataToTVs(respData []byte, respType string) ([]*TV, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(rows) > 1 {
|
if len(rows) > 1 {
|
||||||
return turnCsvToTVs(rows[1:])
|
return convertCsvToTVs(rows[1:])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.New("unsupported response type")
|
return nil, errors.New("unsupported response type")
|
||||||
}
|
}
|
||||||
|
|
||||||
func respDataToF2TVs(respData []byte, respType string) (map[string][]*TV, error) {
|
func respDataToF2TVs(respData []byte, respType string) (map[string][]TV, error) {
|
||||||
switch respType {
|
switch respType {
|
||||||
case "json":
|
case "json":
|
||||||
resp := new(jsonResp)
|
resp := new(jsonResp)
|
||||||
|
|
@ -180,7 +182,7 @@ func respDataToF2TVs(respData []byte, respType string) (map[string][]*TV, error)
|
||||||
}
|
}
|
||||||
if len(resp.Results) > 0 &&
|
if len(resp.Results) > 0 &&
|
||||||
len(resp.Results[0].Series) > 0 {
|
len(resp.Results[0].Series) > 0 {
|
||||||
return turnJsonToF2TVs(resp.Results[0].Series[0].Column,
|
return convertJsonToF2TVs(resp.Results[0].Series[0].Column,
|
||||||
resp.Results[0].Series[0].Values)
|
resp.Results[0].Series[0].Values)
|
||||||
}
|
}
|
||||||
case "csv":
|
case "csv":
|
||||||
|
|
@ -189,7 +191,7 @@ func respDataToF2TVs(respData []byte, respType string) (map[string][]*TV, error)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(rows) > 1 {
|
if len(rows) > 1 {
|
||||||
return turnCsvToF2TVs(rows)
|
return convertCsvToF2TVs(rows)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -197,8 +199,8 @@ func respDataToF2TVs(respData []byte, respType string) (map[string][]*TV, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// measure at different times
|
// measure at different times
|
||||||
func turnJsonToTVs(data [][]interface{}) ([]*TV, error) {
|
func convertJsonToTVs(data [][]any) ([]TV, error) {
|
||||||
ret := make([]*TV, 0, len(data))
|
ret := make([]TV, 0, len(data))
|
||||||
|
|
||||||
for _, row := range data {
|
for _, row := range data {
|
||||||
if len(row) > 1 {
|
if len(row) > 1 {
|
||||||
|
|
@ -214,7 +216,7 @@ func turnJsonToTVs(data [][]interface{}) ([]*TV, error) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("not expected data type")
|
return nil, errors.New("not expected data type")
|
||||||
}
|
}
|
||||||
ret = append(ret, &TV{
|
ret = append(ret, TV{
|
||||||
Time: t.UnixNano(),
|
Time: t.UnixNano(),
|
||||||
Value: v,
|
Value: v,
|
||||||
})
|
})
|
||||||
|
|
@ -225,8 +227,8 @@ func turnJsonToTVs(data [][]interface{}) ([]*TV, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// different measures at different times
|
// different measures at different times
|
||||||
func turnJsonToF2TVs(cols []string, data [][]interface{}) (map[string][]*TV, error) {
|
func convertJsonToF2TVs(cols []string, data [][]any) (map[string][]TV, error) {
|
||||||
f2tvs := make(map[string][]*TV)
|
f2tvs := make(map[string][]TV)
|
||||||
|
|
||||||
for _, row := range data {
|
for _, row := range data {
|
||||||
if len(row) > 1 {
|
if len(row) > 1 {
|
||||||
|
|
@ -243,7 +245,7 @@ func turnJsonToF2TVs(cols []string, data [][]interface{}) (map[string][]*TV, err
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("not expected data type")
|
return nil, errors.New("not expected data type")
|
||||||
}
|
}
|
||||||
f2tvs[cols[i]] = append(f2tvs[cols[i]], &TV{
|
f2tvs[cols[i]] = append(f2tvs[cols[i]], TV{
|
||||||
Time: t.UnixNano(),
|
Time: t.UnixNano(),
|
||||||
Value: v,
|
Value: v,
|
||||||
})
|
})
|
||||||
|
|
@ -255,8 +257,8 @@ func turnJsonToF2TVs(cols []string, data [][]interface{}) (map[string][]*TV, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// measure at different times
|
// measure at different times
|
||||||
func turnCsvToTVs(data [][]string) ([]*TV, error) {
|
func convertCsvToTVs(data [][]string) ([]TV, error) {
|
||||||
ret := make([]*TV, 0, len(data))
|
ret := make([]TV, 0, len(data))
|
||||||
|
|
||||||
for _, row := range data {
|
for _, row := range data {
|
||||||
if len(row) > 3 {
|
if len(row) > 3 {
|
||||||
|
|
@ -268,7 +270,7 @@ func turnCsvToTVs(data [][]string) ([]*TV, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ret = append(ret, &TV{
|
ret = append(ret, TV{
|
||||||
Time: ns,
|
Time: ns,
|
||||||
Value: v,
|
Value: v,
|
||||||
})
|
})
|
||||||
|
|
@ -279,8 +281,8 @@ func turnCsvToTVs(data [][]string) ([]*TV, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// different measures at different times
|
// different measures at different times
|
||||||
func turnCsvToF2TVs(data [][]string) (map[string][]*TV, error) {
|
func convertCsvToF2TVs(data [][]string) (map[string][]TV, error) {
|
||||||
f2tvs := make(map[string][]*TV)
|
f2tvs := make(map[string][]TV)
|
||||||
|
|
||||||
for _, row := range data {
|
for _, row := range data {
|
||||||
if len(row) > 3 {
|
if len(row) > 3 {
|
||||||
|
|
@ -293,7 +295,7 @@ func turnCsvToF2TVs(data [][]string) (map[string][]*TV, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
f2tvs[data[0][i]] = append(f2tvs[data[0][i]], &TV{
|
f2tvs[data[0][i]] = append(f2tvs[data[0][i]], TV{
|
||||||
Time: ns,
|
Time: ns,
|
||||||
Value: v,
|
Value: v,
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -73,25 +73,25 @@ type TV struct {
|
||||||
Value float64 `json:"value"`
|
Value float64 `json:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]*TV, error) {
|
func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
|
||||||
req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
|
req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
|
||||||
return client.GetSSUPointLastLimit(ctx, req, limit)
|
return client.GetSSUPointLastLimit(ctx, req, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]*TV, error) {
|
func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
|
||||||
req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
|
req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
|
||||||
return client.GetSSUPointsLastLimit(ctx, req, limit)
|
return client.GetSSUPointsLastLimit(ctx, req, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetSSUPointData(ctx context.Context, req *Request) ([]*TV, error) {
|
func GetSSUPointData(ctx context.Context, req *Request) ([]TV, error) {
|
||||||
return client.GetSSUPointData(ctx, req)
|
return client.GetSSUPointData(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetSSUPointAfterOne(ctx context.Context, req *Request) ([]*TV, error) {
|
func GetSSUPointAfterOne(ctx context.Context, req *Request) ([]TV, error) {
|
||||||
return client.GetSSUPointAfterOne(ctx, req)
|
return client.GetSSUPointAfterOne(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]*TV, error) {
|
func GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]TV, error) {
|
||||||
req.Begin = req.End - 20 - 20
|
req.Begin = req.End - 20 - 20
|
||||||
return client.GetSSUPointBeforeOne(ctx, req)
|
return client.GetSSUPointBeforeOne(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,28 @@ const (
|
||||||
bucket string = "influxBucket"
|
bucket string = "influxBucket"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]*TV, error) {
|
const (
|
||||||
|
FieldCPrifix string = "c"
|
||||||
|
FieldIPrefix string = "i"
|
||||||
|
|
||||||
|
// FieldP string = "p"
|
||||||
|
// FieldQ string = "q"
|
||||||
|
// FieldS string = "s"
|
||||||
|
// FieldPF string = "pf"
|
||||||
|
|
||||||
|
// FieldF string = "f"
|
||||||
|
FieldDF string = "df"
|
||||||
|
|
||||||
|
// FieldUABPrefix string = "uab"
|
||||||
|
// FieldUBCPrefix string = "ubc"
|
||||||
|
// FieldUCAPrefix string = "uca"
|
||||||
|
|
||||||
|
FieldSuffixAMP = "_amp"
|
||||||
|
FieldSuffixPA = "_pa"
|
||||||
|
FieldSuffixRMS = "_rms"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
|
||||||
sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and device='%s';",
|
sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and device='%s';",
|
||||||
req.SubPos, req.SubPos, req.Measure, req.Station, req.MainPos)
|
req.SubPos, req.SubPos, req.Measure, req.Station, req.MainPos)
|
||||||
if limit > 1 {
|
if limit > 1 {
|
||||||
|
|
@ -27,7 +48,7 @@ func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Reque
|
||||||
return client.getTVsResp(ctx, reqData, req.RespType)
|
return client.getTVsResp(ctx, reqData, req.RespType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]*TV, error) {
|
func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
|
||||||
sql := ""
|
sql := ""
|
||||||
if limit == 1 {
|
if limit == 1 {
|
||||||
fields := strings.Split(req.SubPos, ",")
|
fields := strings.Split(req.SubPos, ",")
|
||||||
|
|
@ -49,7 +70,7 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ
|
||||||
return client.getF2TVsResp(ctx, reqData, req.RespType)
|
return client.getF2TVsResp(ctx, reqData, req.RespType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointData(ctx context.Context, req *Request) ([]*TV, error) {
|
func (client *influxClient) GetSSUPointData(ctx context.Context, req *Request) ([]TV, error) {
|
||||||
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;",
|
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;",
|
||||||
req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End)
|
req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End)
|
||||||
|
|
||||||
|
|
@ -66,7 +87,7 @@ func (client *influxClient) GetSSUPointData(ctx context.Context, req *Request) (
|
||||||
return client.getTVsResp(ctx, reqData, req.RespType)
|
return client.getTVsResp(ctx, reqData, req.RespType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointAfterOne(ctx context.Context, req *Request) ([]*TV, error) {
|
func (client *influxClient) GetSSUPointAfterOne(ctx context.Context, req *Request) ([]TV, error) {
|
||||||
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms limit 1;",
|
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms limit 1;",
|
||||||
req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin)
|
req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin)
|
||||||
|
|
||||||
|
|
@ -78,7 +99,7 @@ func (client *influxClient) GetSSUPointAfterOne(ctx context.Context, req *Reques
|
||||||
return client.getTVsResp(ctx, reqData, req.RespType)
|
return client.getTVsResp(ctx, reqData, req.RespType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]*TV, error) {
|
func (client *influxClient) GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]TV, error) {
|
||||||
reqData := url.Values{
|
reqData := url.Values{
|
||||||
"db": {bucket},
|
"db": {bucket},
|
||||||
"q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms and time<=%dms order by time desc limit 1;",
|
"q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms and time<=%dms order by time desc limit 1;",
|
||||||
|
|
|
||||||
|
|
@ -20,11 +20,6 @@ const (
|
||||||
almCodeUnderSample // 秒内采样点数欠量
|
almCodeUnderSample // 秒内采样点数欠量
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
almStatusReset = iota
|
|
||||||
almStatusAction
|
|
||||||
)
|
|
||||||
|
|
||||||
type Alarm struct {
|
type Alarm struct {
|
||||||
DriverName string `bson:"driver_name" json:"driver_name"`
|
DriverName string `bson:"driver_name" json:"driver_name"`
|
||||||
DeviceNo string `bson:"device_no" json:"device_no"`
|
DeviceNo string `bson:"device_no" json:"device_no"`
|
||||||
|
|
@ -58,11 +53,12 @@ func (a *Alarm) ConvertToEvent(ip string) *Event {
|
||||||
e.Type = genEventType(0, 2)
|
e.Type = genEventType(0, 2)
|
||||||
e.Priority = 5
|
e.Priority = 5
|
||||||
e.Status = eventStatusHappen
|
e.Status = eventStatusHappen
|
||||||
|
e.Timestamp = a.AlarmTime / 1e3 // TODO ms ?
|
||||||
e.From = "station"
|
e.From = "station"
|
||||||
e.Operations = append(e.Operations, &operation{
|
e.Operations = append(e.Operations, &operation{
|
||||||
Action: eventActionHappened, // TODO
|
Action: eventActionHappened, // TODO
|
||||||
OP: ip,
|
OP: ip,
|
||||||
TS: a.AlarmTime,
|
TS: a.AlarmTime / 1e3,
|
||||||
})
|
})
|
||||||
e.Alarm = a
|
e.Alarm = a
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package mongo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||||
|
|
@ -37,12 +38,17 @@ type Event struct {
|
||||||
Type int `bson:"type" json:"type"`
|
Type int `bson:"type" json:"type"`
|
||||||
Priority int `bson:"priority" json:"priority"` // 0~9
|
Priority int `bson:"priority" json:"priority"` // 0~9
|
||||||
Status int `bson:"status" json:"status"`
|
Status int `bson:"status" json:"status"`
|
||||||
|
Timestamp int64 `bson:"timestamp" json:"timestamp"`
|
||||||
From string `bson:"from" json:"from"`
|
From string `bson:"from" json:"from"`
|
||||||
Operations []*operation `bson:"operations" json:"operations"`
|
Operations []*operation `bson:"operations" json:"operations"`
|
||||||
// TODO complete
|
// TODO complete
|
||||||
Alarm *Alarm `bson:"alarm" json:"alarm"`
|
Alarm *Alarm `bson:"alarm" json:"alarm"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Event) Marshall() ([]byte, error) {
|
||||||
|
return json.Marshal(e)
|
||||||
|
}
|
||||||
|
|
||||||
func InsertOneEvent(ctx context.Context, doc *Event) error {
|
func InsertOneEvent(ctx context.Context, doc *Event) error {
|
||||||
_, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc)
|
_, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc)
|
||||||
return err
|
return err
|
||||||
|
|
@ -53,31 +59,34 @@ func InsertEvents(ctx context.Context, docs []*Event) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteOneEvent(ctx context.Context, filter *bson.D) error {
|
func DeleteOneEvent[T bson.M | *bson.D](ctx context.Context, filter T) error {
|
||||||
_, err := getCollection(dbevent, tbevent).DeleteOne(ctx, filter)
|
_, err := getCollection(dbevent, tbevent).DeleteOne(ctx, filter)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteEvents(ctx context.Context, filter *bson.D) error {
|
func DeleteEvents[T bson.M | *bson.D](ctx context.Context, filter T) error {
|
||||||
_, err := getCollection(dbevent, tbevent).DeleteMany(ctx, filter)
|
_, err := getCollection(dbevent, tbevent).DeleteMany(ctx, filter)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func UpdateOneEvent(ctx context.Context, filter *bson.D, update bson.D) error {
|
func UpdateOneEvent[T bson.M | *bson.D](ctx context.Context, filter T, update T) error {
|
||||||
opts := options.UpdateOne().SetUpsert(true)
|
opts := options.UpdateOne().SetUpsert(true)
|
||||||
_, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update, opts)
|
_, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update, opts)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func UpdateEvents(ctx context.Context, filter *bson.D, update bson.D) error {
|
func UpdateEvents[T bson.M | *bson.D](ctx context.Context, filter T, update T) error {
|
||||||
opts := options.UpdateMany().SetUpsert(true)
|
opts := options.UpdateMany().SetUpsert(true)
|
||||||
_, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update, opts)
|
_, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update, opts)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func FindOneEvent(ctx context.Context, filter *bson.D) (*Event, error) {
|
func FindOneEvent[T bson.M | *bson.D](ctx context.Context, filter T) (*Event, error) {
|
||||||
doc := new(Event)
|
doc := new(Event)
|
||||||
err := getCollection(dbevent, tbevent).FindOne(ctx, filter).Decode(doc)
|
err := getCollection(dbevent, tbevent).FindOne(ctx, filter).Decode(doc)
|
||||||
|
// if errors.Is(err, mongo.ErrNoDocuments) {
|
||||||
|
// return nil, nil
|
||||||
|
// }
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -85,7 +94,7 @@ func FindOneEvent(ctx context.Context, filter *bson.D) (*Event, error) {
|
||||||
return doc, nil
|
return doc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func FindEvents(ctx context.Context, filter *bson.D) ([]*Event, error) {
|
func FindEvents[T bson.M | *bson.D](ctx context.Context, filter T) ([]*Event, error) {
|
||||||
cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter)
|
cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -100,7 +109,7 @@ func FindEvents(ctx context.Context, filter *bson.D) ([]*Event, error) {
|
||||||
return docs, nil
|
return docs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func FindEventsInBatch(ctx context.Context, filter *bson.D, batchSize int32) ([]*Event, error) {
|
func FindEventsInBatch[T bson.M | *bson.D](ctx context.Context, filter T, batchSize int32) ([]*Event, error) {
|
||||||
opt := options.Find().SetBatchSize(batchSize)
|
opt := options.Find().SetBatchSize(batchSize)
|
||||||
cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt)
|
cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,8 @@ func init() {
|
||||||
cliOpts := options.Client().
|
cliOpts := options.Client().
|
||||||
ApplyURI(uri).SetTimeout(1 * time.Second).
|
ApplyURI(uri).SetTimeout(1 * time.Second).
|
||||||
SetAuth(options.Credential{
|
SetAuth(options.Credential{
|
||||||
AuthMechanism: "SCRAM-SHA-256",
|
AuthMechanism: conf.GetAuthMechanism(),
|
||||||
AuthSource: "events",
|
AuthSource: conf.GetAuthSource(),
|
||||||
Username: conf.GetUsername(),
|
Username: conf.GetUsername(),
|
||||||
Password: conf.GetPassword(),
|
Password: conf.GetPassword(),
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,25 @@
|
||||||
package postgres
|
package postgres
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
tbmeasurement string = "public.measurement"
|
tbmeasurement string = "public.measurement"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ()
|
const (
|
||||||
|
ChannelCPrefix string = "Telemetry"
|
||||||
|
ChannelIPrefix string = "Telesignal"
|
||||||
|
ChannelP string = "P"
|
||||||
|
ChannelQ string = "Q"
|
||||||
|
ChannelS string = "S"
|
||||||
|
ChannelPF string = "PF"
|
||||||
|
ChannelF string = "F"
|
||||||
|
ChannelDF string = "deltaF"
|
||||||
|
ChannelUPrefix string = "U"
|
||||||
|
)
|
||||||
|
|
||||||
type addrSSU struct {
|
type addrSSU struct {
|
||||||
Station string `json:"station"`
|
Station string `json:"station"`
|
||||||
|
|
@ -27,10 +40,18 @@ type measurement struct {
|
||||||
// mapping TODO
|
// mapping TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type channelSize struct {
|
||||||
|
addrSSU
|
||||||
|
Size int
|
||||||
|
}
|
||||||
|
|
||||||
|
// channel is original
|
||||||
|
var SSU2ChannelSizes map[string][]channelSize
|
||||||
|
|
||||||
func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) {
|
func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) {
|
||||||
var totalRecords []*measurement
|
var totalRecords []*measurement
|
||||||
|
|
||||||
id := 0
|
id := int64(0)
|
||||||
for {
|
for {
|
||||||
var records []*measurement
|
var records []*measurement
|
||||||
result := client.WithContext(ctx).Table(tbmeasurement).Where("id > ?", id).
|
result := client.WithContext(ctx).Table(tbmeasurement).Where("id > ?", id).
|
||||||
|
|
@ -43,7 +64,7 @@ func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error)
|
||||||
if length <= 0 {
|
if length <= 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
id += length
|
id = records[length-1].ID
|
||||||
|
|
||||||
totalRecords = append(totalRecords, records...)
|
totalRecords = append(totalRecords, records...)
|
||||||
}
|
}
|
||||||
|
|
@ -51,26 +72,62 @@ func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error)
|
||||||
return totalRecords, nil
|
return totalRecords, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetStationMeasurements(ctx context.Context, batchSize int, station string) ([]*measurement, error) {
|
func GenSSU2ChannelSize(ctx context.Context, batchSize int) error {
|
||||||
var totalRecords []*measurement
|
id := int64(0)
|
||||||
|
|
||||||
id := 0
|
|
||||||
for {
|
for {
|
||||||
var records []*measurement
|
var records []*measurement
|
||||||
result := client.WithContext(ctx).Table(tbmeasurement).Where("station = ?", station).
|
result := client.WithContext(ctx).Table(tbmeasurement).
|
||||||
Where("id > ?", id).Order("id ASC").Limit(batchSize).Find(&records)
|
Where("id > ?", id).Order("id ASC").Limit(batchSize).Find(&records)
|
||||||
if result.Error != nil {
|
if result.Error != nil {
|
||||||
return totalRecords, result.Error
|
return result.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
length := len(records)
|
length := len(records)
|
||||||
if length <= 0 {
|
if length <= 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
id += length
|
|
||||||
|
|
||||||
totalRecords = append(totalRecords, records...)
|
for _, record := range records {
|
||||||
|
if record == nil || record.DataSource == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
addrType := record.DataSource.Type
|
||||||
|
addr := record.DataSource.Addr
|
||||||
|
if err := genMappingFromAddr(addrType, addr, record.Size); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
id = records[length-1].ID
|
||||||
}
|
}
|
||||||
|
|
||||||
return totalRecords, nil
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func genMappingFromAddr(addrType int, addr any, size int) error {
|
||||||
|
switch addrType {
|
||||||
|
case 1:
|
||||||
|
if ins, ok := addr.(*addrSSU); ok {
|
||||||
|
channelSize := genChannelSize(ins, size)
|
||||||
|
SSU2ChannelSizes[ins.Device] = append(SSU2ChannelSizes[ins.Device], channelSize)
|
||||||
|
} else {
|
||||||
|
return errors.New("io_address not valid")
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return errors.New("data_source.type not valid")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func genChannelSize(addr *addrSSU, size int) channelSize {
|
||||||
|
return channelSize{
|
||||||
|
addrSSU{
|
||||||
|
Station: addr.Station,
|
||||||
|
Device: addr.Device,
|
||||||
|
Channel: addr.Channel,
|
||||||
|
},
|
||||||
|
size,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,10 +13,7 @@ type station struct {
|
||||||
ZoneID int64 `gorm:"colunmn:zone_id"`
|
ZoneID int64 `gorm:"colunmn:zone_id"`
|
||||||
TagName string `gorm:"column:tagname"`
|
TagName string `gorm:"column:tagname"`
|
||||||
Name string `gorm:"colunmn:name"`
|
Name string `gorm:"colunmn:name"`
|
||||||
// Description string `gorm:"colunmn:description"`
|
IsLocal bool `gorm:"colunmn:is_local"`
|
||||||
IsLocal bool `gorm:"colunmn:is_local"`
|
|
||||||
// OP int `gorm:"colunmn:op"`
|
|
||||||
// TS time.Time `gorm:"colunmn:ts"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetStations(ctx context.Context) ([]*station, error) {
|
func GetStations(ctx context.Context) ([]*station, error) {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
package data
|
||||||
|
|
@ -23,7 +23,7 @@ func NewConsumer(ctx context.Context, tag string, xqr *XQR) (*rmq.Consumer, erro
|
||||||
return cli.conn.NewConsumer(ctx, xqr.QueueName, nil)
|
return cli.conn.NewConsumer(ctx, xqr.QueueName, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Consume(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) {
|
func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) {
|
||||||
for {
|
for {
|
||||||
deliCtx, err := consumer.Receive(ctx)
|
deliCtx, err := consumer.Receive(ctx)
|
||||||
if errors.Is(err, context.Canceled) {
|
if errors.Is(err, context.Canceled) {
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,20 @@ func (m *Management) Init(ctx context.Context, rm *rmq.AmqpManagement,
|
||||||
m.b = rb
|
m.b = rb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Management) DeclareExchangeQueue(ctx context.Context) error {
|
||||||
|
_, err := m.m.DeclareExchange(ctx, m.x)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = m.m.DeclareQueue(ctx, m.q)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Management) DeclareAndBind(ctx context.Context) error {
|
func (m *Management) DeclareAndBind(ctx context.Context) error {
|
||||||
xinfo, err := m.m.DeclareExchange(ctx, m.x)
|
xinfo, err := m.m.DeclareExchange(ctx, m.x)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ func NewPublisher(ctx context.Context, tag string, xqr *XQR) (*rmq.Publisher, er
|
||||||
}, nil)
|
}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Publish(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan []byte) {
|
func Publishing(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan []byte) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-msgChan:
|
case msg := <-msgChan:
|
||||||
|
|
@ -40,11 +40,16 @@ func Publish(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan []byt
|
||||||
switch result.Outcome.(type) {
|
switch result.Outcome.(type) {
|
||||||
case *rmq.StateAccepted:
|
case *rmq.StateAccepted:
|
||||||
// TODO: "Message accepted"
|
// TODO: "Message accepted"
|
||||||
|
|
||||||
case *rmq.StateReleased:
|
case *rmq.StateReleased:
|
||||||
// TODO: "Message was not routed"
|
// TODO: "Message not routed"
|
||||||
|
|
||||||
case *rmq.StateRejected:
|
case *rmq.StateRejected:
|
||||||
// TODO: stateType := publishResult.Outcome.(*rmq.StateRejected)
|
// TODO: "Message rejected"
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
// *rmp.StateModified
|
||||||
|
// *rmq.StateReceived
|
||||||
// TODO: ("Message state: %v", publishResult.Outcome)
|
// TODO: ("Message state: %v", publishResult.Outcome)
|
||||||
}
|
}
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
|
|
|
||||||
|
|
@ -69,3 +69,7 @@ func Keys(ctx context.Context, pattern string) ([]string, error) {
|
||||||
|
|
||||||
return keys, nil
|
return keys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewPipeline() redis.Pipeliner {
|
||||||
|
return client.Pipeline()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
package data
|
||||||
|
|
@ -0,0 +1,92 @@
|
||||||
|
package log
|
||||||
|
|
||||||
|
import (
|
||||||
|
"datart/config"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log *zap.SugaredLogger
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
logConfig := zap.NewProductionEncoderConfig()
|
||||||
|
logConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||||
|
fileEncoder := zapcore.NewJSONEncoder(logConfig)
|
||||||
|
core := zapcore.NewCore(
|
||||||
|
fileEncoder,
|
||||||
|
zapcore.AddSync(config.Conf().LogConf()), // conf is not null
|
||||||
|
// DebugLevel, -1, logs are typically voluminous, and are usually disabled in
|
||||||
|
// production.
|
||||||
|
|
||||||
|
// InfoLevel, 0, is the default logging priority.
|
||||||
|
|
||||||
|
// WarnLevel, 1, logs are more important than Info, but don't need individual
|
||||||
|
// human review.
|
||||||
|
|
||||||
|
// ErrorLevel, 2, logs are high-priority. If an application is running smoothly,
|
||||||
|
// it shouldn't generate any error-level logs.
|
||||||
|
|
||||||
|
// DPanicLevel, 3, logs are particularly important errors. In development the
|
||||||
|
// logger panics after writing the message.
|
||||||
|
|
||||||
|
// PanicLevel, 4, logs a message, then panics.
|
||||||
|
|
||||||
|
// FatalLevel, 5, logs a message, then calls os.Exit(1).
|
||||||
|
config.Conf().LogConf().GetLogLevel(),
|
||||||
|
)
|
||||||
|
|
||||||
|
log = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1)).Sugar()
|
||||||
|
}
|
||||||
|
|
||||||
|
func Sync() {
|
||||||
|
log.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
|
func Log(lvl zapcore.Level, args ...interface{}) {
|
||||||
|
log.Log(lvl, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Debug(args ...interface{}) {
|
||||||
|
log.Debug(args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Info(args ...interface{}) {
|
||||||
|
log.Info(args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Warn(args ...interface{}) {
|
||||||
|
log.Warn(args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Error(args ...interface{}) {
|
||||||
|
log.Error(args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Panic(args ...interface{}) {
|
||||||
|
log.Panic(args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Logf(lvl zapcore.Level, template string, args ...interface{}) {
|
||||||
|
log.Logf(lvl, template, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Debugf(template string, args ...interface{}) {
|
||||||
|
log.Debugf(template, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Infof(template string, args ...interface{}) {
|
||||||
|
log.Infof(template, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Warnf(template string, args ...interface{}) {
|
||||||
|
log.Warnf(template, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Errorf(template string, args ...interface{}) {
|
||||||
|
log.Errorf(template, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Panicf(template string, args ...interface{}) {
|
||||||
|
log.Panicf(template, args...)
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue