From 7a9bd01b37698a0c68c836b42baf2e92d1245fe5 Mon Sep 17 00:00:00 2001 From: zhuxu Date: Thu, 23 Oct 2025 18:02:29 +0800 Subject: [PATCH] process data and api --- .drone.yml | 12 +++ data/data.go | 41 ++++++++++ data/influx/common.go | 44 ++++++----- data/influx/influx.go | 61 ++++++++++++--- data/influx/ssu_point.go | 66 +++++++++++----- data/mongo/alarm.go | 6 +- data/mongo/event.go | 44 +++++++++-- data/postgres/measurement.go | 130 ++++++++++++++++++++++++-------- data/publish_event.go | 70 +++++++++++++++++ data/update_phasor.go | 141 +++++++++++++++++++++++++++++++++++ go.mod | 36 ++++++++- go.sum | 89 ++++++++++++++++++---- main.go | 24 ++++++ route/api/alarm.go | 56 ++++++++++++++ route/api/api.go | 3 + route/api/event.go | 1 + route/api/point.go | 1 + route/route.go | 15 ++++ util/util.go | 13 ++++ 19 files changed, 745 insertions(+), 108 deletions(-) create mode 100644 .drone.yml create mode 100644 route/api/alarm.go create mode 100644 route/api/api.go create mode 100644 route/api/event.go create mode 100644 route/api/point.go create mode 100644 route/route.go create mode 100644 util/util.go diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..c92434d --- /dev/null +++ b/.drone.yml @@ -0,0 +1,12 @@ +kind: pipeline +type: docker +name: default + +steps: +- name: build + image: golang:latest + environment: + GO111MODULE: on + GOPROXY: https://goproxy.cn,direct + commands: + - go build \ No newline at end of file diff --git a/data/data.go b/data/data.go index 0ad59c2..424bf61 100644 --- a/data/data.go +++ b/data/data.go @@ -1 +1,42 @@ package data + +import ( + "context" + "datart/data/influx" + + "github.com/redis/go-redis/v9" +) + +type Process struct { + cancel context.CancelFunc +} + +func NewProcess() *Process { + return new(Process) +} + +func (p *Process) StartDataProcessing() { + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + updatingRedisPhasor(ctx) +} + +func (p *Process) Cancel() { + p.cancel() +} + +type zUnit struct { + Key string + Members []redis.Z +} + +func convertTVsToMenmbers(tvs []influx.TV) []redis.Z { + members := make([]redis.Z, len(tvs)) + + for i, tv := range tvs { + members[i].Member = tv.Time + members[i].Score = tv.Value + } + + return members +} diff --git a/data/influx/common.go b/data/influx/common.go index 304c0b4..e930c31 100644 --- a/data/influx/common.go +++ b/data/influx/common.go @@ -16,22 +16,6 @@ import ( "time" ) -// for influx json response data -type jsonResp struct { - Results []*result `json:"results"` -} - -type result struct { - StatementID int `json:"statement_id"` - Series []*fields `json:"series"` -} - -type fields struct { - Name string `json:"name"` - Column []string `json:"columns"` - Values [][]any `json:"values"` -} - // line protocol, better to gzip and sort tags by key in lexicographic order func (client *influxClient) writeLinesData(ctx context.Context, db string, data []byte, compress bool) error { @@ -77,6 +61,22 @@ func (client *influxClient) writeLinesData(ctx context.Context, db string, return nil } +// for influx json response data +type jsonResp struct { + Results []*result `json:"results"` +} + +type result struct { + StatementID int `json:"statement_id"` + Series []*fields `json:"series"` +} + +type fields struct { + Name string `json:"name"` + Column []string `json:"columns"` + Values [][]any `json:"values"` +} + // respType json/csv // json_time:"2024-12-18T08:12:21.4735154Z" // csv_time:"1734572793695885000" @@ -167,9 +167,11 @@ func respDataToTVs(respData []byte, respType string) ([]TV, error) { if len(rows) > 1 { return convertCsvToTVs(rows[1:]) } + default: + return nil, errors.New("invalid response type") } - return nil, errors.New("unsupported response type") + return []TV{}, nil } func respDataToF2TVs(respData []byte, respType string) (map[string][]TV, error) { @@ -193,9 +195,11 @@ func respDataToF2TVs(respData []byte, respType string) (map[string][]TV, error) if len(rows) > 1 { return convertCsvToF2TVs(rows) } + default: + return nil, errors.New("invalid response type") } - return nil, errors.New("unsupported response type") + return map[string][]TV{}, nil } // measure at different times @@ -208,7 +212,7 @@ func convertJsonToTVs(data [][]any) ([]TV, error) { if !ok { return nil, errors.New("not expected data type") } - t, err := time.Parse("2006-01-02T15:04:05.99Z", tstr) + t, err := time.Parse("2006-01-02T15:04:05.999999Z", tstr) if err != nil { return nil, err } @@ -236,7 +240,7 @@ func convertJsonToF2TVs(cols []string, data [][]any) (map[string][]TV, error) { if !ok { return nil, errors.New("not expected data type") } - t, err := time.Parse("2006-01-02T15:04:05.99Z", tstr) + t, err := time.Parse("2006-01-02T15:04:05.999999Z", tstr) if err != nil { return nil, err } diff --git a/data/influx/influx.go b/data/influx/influx.go index c662ff9..736fc49 100644 --- a/data/influx/influx.go +++ b/data/influx/influx.go @@ -3,6 +3,7 @@ package influx import ( "context" "datart/config" + "errors" "net" "net/http" "time" @@ -17,6 +18,7 @@ type influxClient struct { type Request struct { RespType string + Bucket string Measure string Station string MainPos string @@ -28,6 +30,11 @@ type Request struct { Default string } +const ( + PhasorBucket = "influxBucket" + SampleBucket = "influxBucket" +) + var client *influxClient func init() { @@ -64,8 +71,39 @@ func NewInfluxClient(cli *http.Client, url, org, token string) *influxClient { } } -func WriteLinesData(ctx context.Context, data []byte) error { - return client.WriteLinesData(ctx, data) +func GetBucket(tp string) (string, error) { + switch tp { + case "phasor": + return PhasorBucket, nil + case "sample": + return SampleBucket, nil + } + + return "", errors.New("invalid type") +} + +// serverConf +func GetMeasurement(tp string, mainPos string) (string, error) { + switch tp { + case "phasor": + ssu2Type := config.Conf().ServerConf().GetSSUType() + switch ssu2Type[mainPos] { + case 1: + return "current", nil + case 2: + return "voltage", nil + default: + return "", errors.New("invalid main_pos") + } + case "sample": + return "sample", nil + } + + return "", errors.New("invalid type") +} + +func WriteLinesData(ctx context.Context, bucket string, data []byte) error { + return client.WriteLinesData(ctx, bucket, data) } type TV struct { @@ -83,15 +121,20 @@ func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[st return client.GetSSUPointsLastLimit(ctx, req, limit) } -func GetSSUPointData(ctx context.Context, req *Request) ([]TV, error) { - return client.GetSSUPointData(ctx, req) +func GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) { + return client.GetSSUPointDurationData(ctx, req) } -func GetSSUPointAfterOne(ctx context.Context, req *Request) ([]TV, error) { - return client.GetSSUPointAfterOne(ctx, req) +func GetSSUPointsDurationData(ctx context.Context, req *Request) (map[string][]TV, error) { + return client.GetSSUPointsDurationData(ctx, req) } -func GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]TV, error) { - req.Begin = req.End - 20 - 20 - return client.GetSSUPointBeforeOne(ctx, req) +func GetSSUPointsAfterLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { + req.End = req.Begin + int64(limit*20+20) + return client.GetSSUPointsAfterLimit(ctx, req, limit) +} + +func GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { + req.Begin = req.End - int64(limit*20+20) + return client.GetSSUPointsBeforeLimit(ctx, req, limit) } diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go index 8afdfdf..75d8681 100644 --- a/data/influx/ssu_point.go +++ b/data/influx/ssu_point.go @@ -7,10 +7,6 @@ import ( "strings" ) -const ( - bucket string = "influxBucket" -) - const ( FieldCPrifix string = "c" FieldIPrefix string = "i" @@ -41,7 +37,7 @@ func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Reque } reqData := url.Values{ - "db": {bucket}, + "db": {req.Bucket}, "q": {sql}, } @@ -63,14 +59,14 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ } reqData := url.Values{ - "db": {bucket}, + "db": {req.Bucket}, "q": {sql}, } return client.getF2TVsResp(ctx, reqData, req.RespType) } -func (client *influxClient) GetSSUPointData(ctx context.Context, req *Request) ([]TV, error) { +func (client *influxClient) GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) { sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;", req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End) @@ -80,35 +76,69 @@ func (client *influxClient) GetSSUPointData(ctx context.Context, req *Request) ( } reqData := url.Values{ - "db": {bucket}, + "db": {req.Bucket}, "q": {sql}, } return client.getTVsResp(ctx, reqData, req.RespType) } -func (client *influxClient) GetSSUPointAfterOne(ctx context.Context, req *Request) ([]TV, error) { - sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms limit 1;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin) +func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *Request) (map[string][]TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End) + + if req.Operate != "" && req.Step != "" && req.Default != "" { + subPoss := strings.Split(req.SubPos, ",") + selectSections := make([]string, len(subPoss)) + for i, subPos := range subPoss { + selectSections[i] = req.Operate + "(" + subPos + ")" + " as " + subPos + } + sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms group by time(%s) fill(%s);", + strings.Join(selectSections, ", "), req.Measure, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default) + } reqData := url.Values{ - "db": {bucket}, + "db": {req.Bucket}, + "q": {sql}, + } + + return client.getF2TVsResp(ctx, reqData, req.RespType) +} + +func (client *influxClient) GetSSUPointAfterLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms limit %d;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, limit) + + reqData := url.Values{ + "db": {req.Bucket}, "q": {sql}, } return client.getTVsResp(ctx, reqData, req.RespType) } -func (client *influxClient) GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]TV, error) { +func (client *influxClient) GetSSUPointsAfterLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms limit %d;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, limit) + reqData := url.Values{ - "db": {bucket}, - "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms and time<=%dms order by time desc limit 1;", - req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End)}, // begin = req.End-20-20 + "db": {req.Bucket}, + "q": {sql}, } - return client.getTVsResp(ctx, reqData, req.RespType) + return client.getF2TVsResp(ctx, reqData, req.RespType) } -func (client *influxClient) WriteLinesData(ctx context.Context, data []byte) error { +func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { + reqData := url.Values{ + "db": {req.Bucket}, + "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms and time<=%dms order by time desc limit %d;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, limit)}, // begin = req.End-20-20 + } + + return client.getF2TVsResp(ctx, reqData, req.RespType) +} + +func (client *influxClient) WriteLinesData(ctx context.Context, bucket string, data []byte) error { return client.writeLinesData(ctx, bucket, data, true) } diff --git a/data/mongo/alarm.go b/data/mongo/alarm.go index 17a3770..73035a0 100644 --- a/data/mongo/alarm.go +++ b/data/mongo/alarm.go @@ -25,7 +25,7 @@ type Alarm struct { DeviceNo string `bson:"device_no" json:"device_no"` AlarmCode int `bson:"alarm_code" json:"alarm_code"` AlarmTime int64 `bson:"alarm_time" json:"alarm_time"` // ms - AlarmStatus int `bson:"alarm_status" josn:"alarm_status"` // 0 "复位", 1 "动作/产生/告警" + AlarmStatus int `bson:"alarm_status" json:"alarm_status"` // 0 "复位", 1 "动作/产生/告警" } var almCode2Name = map[int]string{ @@ -52,11 +52,11 @@ func (a *Alarm) ConvertToEvent(ip string) *Event { e.EventUUID = uuid.NewString() e.Type = genEventType(0, 2) e.Priority = 5 - e.Status = eventStatusHappen + e.Status = EventStatusHappen e.Timestamp = a.AlarmTime / 1e3 // TODO ms ? e.From = "station" e.Operations = append(e.Operations, &operation{ - Action: eventActionHappened, // TODO + Action: EventActionHappened, // TODO OP: ip, TS: a.AlarmTime / 1e3, }) diff --git a/data/mongo/event.go b/data/mongo/event.go index 5682f28..33263f3 100644 --- a/data/mongo/event.go +++ b/data/mongo/event.go @@ -14,16 +14,16 @@ const ( ) const ( - eventStatusHappen = iota - eventStatusDataAt - eventStatusReport - eventStatusConfirm - eventStatusPersist - eventStatusClose + EventStatusHappen = iota + EventStatusDataAt + EventStatusReport + EventStatusConfirm + EventStatusPersist + EventStatusClose ) const ( - eventActionHappened = "happened" + EventActionHappened = "happened" ) type operation struct { @@ -69,12 +69,14 @@ func DeleteEvents[T bson.M | *bson.D](ctx context.Context, filter T) error { return err } +// insert if not update func UpdateOneEvent[T bson.M | *bson.D](ctx context.Context, filter T, update T) error { opts := options.UpdateOne().SetUpsert(true) _, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update, opts) return err } +// insert if not update func UpdateEvents[T bson.M | *bson.D](ctx context.Context, filter T, update T) error { opts := options.UpdateMany().SetUpsert(true) _, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update, opts) @@ -109,7 +111,9 @@ func FindEvents[T bson.M | *bson.D](ctx context.Context, filter T) ([]*Event, er return docs, nil } -func FindEventsInBatch[T bson.M | *bson.D](ctx context.Context, filter T, batchSize int32) ([]*Event, error) { +func FindEventsInBatch[T bson.M | *bson.D](ctx context.Context, filter T, + batchSize int32) ([]*Event, error) { + opt := options.Find().SetBatchSize(batchSize) cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt) if err != nil { @@ -129,6 +133,30 @@ func FindEventsInBatch[T bson.M | *bson.D](ctx context.Context, filter T, batchS return docs, nil } +func FindEventsWithPageLimit[T bson.M | *bson.D](ctx context.Context, filter T, + sort int, page int64, limit int64) ([]*Event, error) { + // TODO + opt := options.Find().SetSort(bson.D{{Key: "timestamp", Value: sort}}). + SetSkip(limit * (page - 1)).SetLimit(limit) + + cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + var docs []*Event + for cursor.Next(ctx) { + doc := new(Event) + if err := cursor.Decode(doc); err != nil { + return nil, err + } + docs = append(docs, doc) + } + + return docs, nil +} + // sys: 0-hard/1-platform/2-application // // level:1-info/2-warn/3-error diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go index 663d1dd..c5ae795 100644 --- a/data/postgres/measurement.go +++ b/data/postgres/measurement.go @@ -2,7 +2,10 @@ package postgres import ( "context" + "database/sql/driver" + "encoding/json" "errors" + "sync/atomic" ) const ( @@ -10,8 +13,8 @@ const ( ) const ( - ChannelCPrefix string = "Telemetry" - ChannelIPrefix string = "Telesignal" + ChannelCPrefix string = "TM" + ChannelIPrefix string = "TS" ChannelP string = "P" ChannelQ string = "Q" ChannelS string = "S" @@ -21,32 +24,87 @@ const ( ChannelUPrefix string = "U" ) -type addrSSU struct { - Station string `json:"station"` - Device string `json:"device"` - Channel string `json:"channel"` -} - type dataSource struct { Type int `json:"type"` Addr any `json:"io_address"` } +func (ds *dataSource) Scan(value any) error { + if value == nil { + return nil + } + bytes, ok := value.([]byte) + if !ok { + return errors.New("type assertion to []byte failed") + } + return json.Unmarshal(bytes, ds) +} + +func (ds *dataSource) Value() (driver.Value, error) { + return json.Marshal(ds) +} + type measurement struct { ID int64 `gorm:"colunmn:id"` Tag string `gorm:"column:tag"` Size int `gorm:"column:size"` - DataSource *dataSource `gorm:"type:jsonb;column:data_source"` + DataSource *dataSource `gorm:"column:data_source;type:jsonb"` // mapping TODO } -type channelSize struct { - addrSSU - Size int +type ChannelSize struct { + Station string + Device string + Channel string + Size int } // channel is original -var SSU2ChannelSizes map[string][]channelSize +var SSU2ChannelSizes atomic.Value + +func init() { + SSU2ChannelSizes.Store(map[string][]ChannelSize{}) +} + +func LoadSSU2ChannelSizes() map[string][]ChannelSize { + v := SSU2ChannelSizes.Load() + if v == nil { + return nil + } + return v.(map[string][]ChannelSize) +} + +func StoreSSU2ChannelSizes(m map[string][]ChannelSize) { + SSU2ChannelSizes.Store(m) +} + +func GetSSU2ChannelSizesCopy() map[string][]ChannelSize { + src := LoadSSU2ChannelSizes() + if src == nil { + return nil + } + out := make(map[string][]ChannelSize, len(src)) + for k, v := range src { + cp := make([]ChannelSize, len(v)) + copy(cp, v) + out[k] = cp + } + return out +} + +func GetSSU2ChannelSizesFor(ssu string) []ChannelSize { + src := LoadSSU2ChannelSizes() + if src == nil { + return nil + } + v, ok := src[ssu] + if !ok { + return nil + } + cp := make([]ChannelSize, len(v)) + copy(cp, v) + return cp +} func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) { var totalRecords []*measurement @@ -72,8 +130,9 @@ func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) return totalRecords, nil } -func GenSSU2ChannelSize(ctx context.Context, batchSize int) error { +func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error { id := int64(0) + ssu2ChannelSizes := make(map[string][]ChannelSize) for { var records []*measurement result := client.WithContext(ctx).Table(tbmeasurement). @@ -93,7 +152,7 @@ func GenSSU2ChannelSize(ctx context.Context, batchSize int) error { addrType := record.DataSource.Type addr := record.DataSource.Addr - if err := genMappingFromAddr(addrType, addr, record.Size); err != nil { + if err := genMappingFromAddr(ssu2ChannelSizes, addrType, addr, record.Size); err != nil { return err } } @@ -101,33 +160,40 @@ func GenSSU2ChannelSize(ctx context.Context, batchSize int) error { id = records[length-1].ID } + StoreSSU2ChannelSizes(ssu2ChannelSizes) + return nil } -func genMappingFromAddr(addrType int, addr any, size int) error { +func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, addrType int, addr any, size int) error { switch addrType { case 1: - if ins, ok := addr.(*addrSSU); ok { - channelSize := genChannelSize(ins, size) - SSU2ChannelSizes[ins.Device] = append(SSU2ChannelSizes[ins.Device], channelSize) + if rawAddr, ok := addr.(map[string]interface{}); ok { + station, ok := rawAddr["station"].(string) + if !ok { + return errors.New("invalid station") + } + device, ok := rawAddr["device"].(string) + if !ok { + return errors.New("invalid device") + } + channel, ok := rawAddr["channel"].(string) + if !ok { + return errors.New("invalid channel") + } + ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], ChannelSize{ + Station: station, + Device: device, + Channel: channel, + Size: size, + }) } else { - return errors.New("io_address not valid") + return errors.New("invalid io_address") } default: - return errors.New("data_source.type not valid") + return errors.New("invalid data_source.type") } return nil } - -func genChannelSize(addr *addrSSU, size int) channelSize { - return channelSize{ - addrSSU{ - Station: addr.Station, - Device: addr.Device, - Channel: addr.Channel, - }, - size, - } -} diff --git a/data/publish_event.go b/data/publish_event.go index 0ad59c2..ec5c5bb 100644 --- a/data/publish_event.go +++ b/data/publish_event.go @@ -1 +1,71 @@ package data + +import ( + "context" + "datart/data/mongo" + "datart/data/rabbit" + "datart/log" + "fmt" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +var eventXQR = rabbit.XQR{ + ExchangeName: "event_produce_exchange", + QueueName: "event_produce_queue", + RoutingKey: "event_produce_route", +} + +var eventPublisher *rmq.Publisher + +func init() { + publisher, err := rabbit.NewPublisher(context.Background(), "default", &eventXQR) + if err != nil { + panic(err) + } + eventPublisher = publisher +} + +func PublishEvent(ctx context.Context, event *mongo.Event) error { + if err := mongo.InsertOneEvent(ctx, event); err != nil { + return err + } + + data, err := event.Marshall() + if err != nil { + return err + } + + result, err := eventPublisher.Publish(ctx, rmq.NewMessage(data)) + if err != nil { + return err + } + + switch result.Outcome.(type) { + case *rmq.StateAccepted: + // TODO: "Message accepted" + case *rmq.StateReleased: + return fmt.Errorf("message not routed: %v", event) + case *rmq.StateRejected: + return fmt.Errorf("message rejected: %v", event) + default: + return fmt.Errorf("invalid message %v state: %v", event, result.Outcome) + } + + return nil +} + +func GenEvent(data []byte, ip string) (*mongo.Event, error) { + alarm, err := mongo.UnmarshallToAlarm(data) + if err != nil { + return nil, err + } + + return alarm.ConvertToEvent(ip), nil +} + +func CloseEventPublisher(ctx context.Context) { + if err := eventPublisher.Close(ctx); err != nil { + log.Error(err) + } +} diff --git a/data/update_phasor.go b/data/update_phasor.go index 0ad59c2..547a93f 100644 --- a/data/update_phasor.go +++ b/data/update_phasor.go @@ -1 +1,142 @@ package data + +import ( + "context" + "datart/config" + "datart/data/influx" + "datart/data/postgres" + "datart/data/redis" + "datart/log" + "strings" + "time" +) + +const ( + duration time.Duration = time.Second * 5 +) + +func updatingRedisPhasor(ctx context.Context) { + ssuType := config.Conf().ServerConf().GetSSUType() + ssuChans := make(map[string]chan zUnit, len(ssuType)) + for ssu := range ssuType { + ssuChans[ssu] = make(chan zUnit, 32) + } + + go queringSSUInfluxPhasor(ctx, ssuChans) + go updatingSSURedisZUnit(ctx, ssuChans) +} + +func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) { + ssuType := config.Conf().ServerConf().GetSSUType() + for ssu := range ssuType { + go func(ssu string) { + timer := time.Tick(duration) + for { + select { + case <-timer: + channelSizes := postgres.GetSSU2ChannelSizesFor(ssu) + for _, channelSize := range channelSizes { + sendSSUZUnit(ctx, channelSize, ssuChans[ssu]) + } + case <-ctx.Done(): + return + } + } + }(ssu) + } +} + +func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) { + ssuType := config.Conf().ServerConf().GetSSUType() + for ssu := range ssuType { + go func(ssu string) { + for { + select { + case unit := <-ssuChans[ssu]: + if err := updateZUnitToRedis(ctx, unit); err != nil { + log.Error(err) + } + case <-ctx.Done(): + return + } + } + }(ssu) + } +} + +func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) { + fields := genPhasorFields(channelSize.Channel) + f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device, + fields, channelSize.Size) + if err != nil { + log.Error(err) + } + + for field, tvs := range f2tvs { + key := genRedisPhasorKey(channelSize.Station, channelSize.Device, field) + members := convertTVsToMenmbers(tvs) + ssuChan <- zUnit{ + Key: key, + Members: members, + } + } +} + +func queryInfluxPhasor(ctx context.Context, station string, device string, + fileds []string, size int) (map[string][]influx.TV, error) { + measure, err := influx.GetMeasurement("phasor", device) + if err != nil { + return nil, err + } + bucket, err := influx.GetBucket("phasor") + if err != nil { + return nil, err + } + + req := &influx.Request{ + RespType: "csv", + Bucket: bucket, + Measure: measure, + Station: station, + MainPos: device, + SubPos: strings.Join(fileds, ","), + } + return influx.GetSSUPointsLastLimit(ctx, req, size) +} + +func updateZUnitToRedis(ctx context.Context, unit zUnit) error { + return redis.ZAtomicReplace(ctx, unit.Key, unit.Members) +} + +func genPhasorFields(channel string) []string { + fields := make([]string, 0, 3) + switch { + case strings.HasPrefix(channel, postgres.ChannelCPrefix): + if after, ok := strings.CutPrefix(channel, postgres.ChannelCPrefix); ok { + fields = append(fields, + influx.FieldCPrifix+after+influx.FieldSuffixAMP, + influx.FieldCPrifix+after+influx.FieldSuffixPA, + influx.FieldCPrifix+after+influx.FieldSuffixRMS) + } + case strings.HasPrefix(channel, postgres.ChannelIPrefix): + if after, ok := strings.CutPrefix(channel, postgres.ChannelCPrefix); ok { + fields = append(fields, influx.FieldIPrefix+after) + } + case strings.HasPrefix(channel, postgres.ChannelUPrefix): + fieldUPrefix := strings.ToLower(channel) + fields = append(fields, fieldUPrefix+influx.FieldSuffixAMP, + fieldUPrefix+influx.FieldSuffixPA, fieldUPrefix+influx.FieldSuffixRMS) + case channel == postgres.ChannelDF: + fields = append(fields, influx.FieldDF) + case channel == postgres.ChannelP, channel == postgres.ChannelQ, + channel == postgres.ChannelS, channel == postgres.ChannelPF, + channel == postgres.ChannelF: + fields = append(fields, strings.ToLower(channel)) + } + + return fields +} + +func genRedisPhasorKey(station, device, field string) string { + return strings.Join([]string{station, device, "phasor", field}, ":") +} diff --git a/go.mod b/go.mod index 7f9581e..d299450 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.24.0 require ( github.com/Azure/go-amqp v1.5.0 + github.com/gin-gonic/gin v1.11.0 github.com/google/uuid v1.6.0 github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0 github.com/redis/go-redis/v9 v9.14.0 @@ -16,8 +17,18 @@ require ( ) require ( + github.com/bytedance/sonic v1.14.0 // indirect + github.com/bytedance/sonic/loader v0.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gabriel-vasile/mimetype v1.4.8 // indirect + github.com/gin-contrib/sse v1.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.27.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/goccy/go-yaml v1.18.0 // indirect github.com/golang/snappy v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect @@ -25,13 +36,30 @@ require ( github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.16.7 // indirect - github.com/stretchr/testify v1.9.0 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/quic-go/qpack v0.5.1 // indirect + github.com/quic-go/quic-go v0.54.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.3.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect + go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/crypto v0.33.0 // indirect - golang.org/x/sync v0.15.0 // indirect - golang.org/x/text v0.26.0 // indirect + golang.org/x/arch v0.20.0 // indirect + golang.org/x/crypto v0.40.0 // indirect + golang.org/x/mod v0.25.0 // indirect + golang.org/x/net v0.42.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.27.0 // indirect + golang.org/x/tools v0.34.0 // indirect + google.golang.org/protobuf v1.36.9 // indirect ) diff --git a/go.sum b/go.sum index 1388790..4ca6138 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,14 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= +github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= +github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= +github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -13,16 +19,35 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= +github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= +github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w= +github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= +github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk= +github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4= +github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= +github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8= github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -39,23 +64,50 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus= github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8= github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY= github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= +github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= +github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= +github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0 h1:q0zPF8/7Bdm+XwjWevFynB8fNiuE65x4q2vmFxU2cjM= github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0/go.mod h1:t5oaK/4mJjw9dNpDzwvH6bE7p9XtM1JyObEHszFu3lU= github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE= github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA= +github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -71,45 +123,54 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c= +golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= +golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= +golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= -golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= +golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= -golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc= -golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI= +golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= +golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= +google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= diff --git a/main.go b/main.go index 06ab7d0..0deb7e8 100644 --- a/main.go +++ b/main.go @@ -1 +1,25 @@ package main + +import ( + "datart/config" + "datart/data" + "datart/route" + "strconv" + + "github.com/gin-gonic/gin" +) + +func main() { + gin.SetMode(gin.ReleaseMode) + engine := gin.New() + + route.LoadRoute(engine) + + process := data.NewProcess() + process.StartDataProcessing() + + port := strconv.Itoa(config.Conf().ServerConf().GetPort()) + if err := engine.Run(":" + port); err != nil { + panic(err) + } +} diff --git a/route/api/alarm.go b/route/api/alarm.go new file mode 100644 index 0000000..931648f --- /dev/null +++ b/route/api/alarm.go @@ -0,0 +1,56 @@ +package api + +import ( + "datart/data/mongo" + "datart/log" + "errors" + "regexp" + + "github.com/gin-gonic/gin" +) + +func (a *Api) PostInsertAlarm(ctx *gin.Context) { + alarm, ip, err := a.checkAndGenInsertAlarmRequest(ctx) + if err != nil { + log.Error(err) + ctx.JSON(200, gin.H{ + "code": 1, + "msg": "invalid param", + }) + return + } + + _ = alarm + _ = ip + + ctx.JSON(200, gin.H{ + "code": 0, + "msg": "success", + }) +} + +func (a *Api) checkAndGenInsertAlarmRequest(ctx *gin.Context) (*mongo.Alarm, string, error) { + alarm := new(mongo.Alarm) + err := ctx.ShouldBindJSON(alarm) + if err != nil { + return nil, "", err + } + + ok, err := regexp.MatchString(`ssu\d{3}`, alarm.DeviceNo) + if err != nil { + return nil, "", err + } + if !ok { + return nil, "", errors.New("invalid device_no") + } + + if alarm.AlarmCode < 1 || alarm.AlarmCode > 10 { + return nil, "", errors.New("invalid alarm_code") + } + + if alarm.AlarmStatus < 0 || alarm.AlarmStatus > 1 { + return nil, "", errors.New("invalid alarm_status") + } + + return alarm, ctx.RemoteIP(), nil +} diff --git a/route/api/api.go b/route/api/api.go new file mode 100644 index 0000000..41aaa1e --- /dev/null +++ b/route/api/api.go @@ -0,0 +1,3 @@ +package api + +type Api struct{} diff --git a/route/api/event.go b/route/api/event.go new file mode 100644 index 0000000..778f64e --- /dev/null +++ b/route/api/event.go @@ -0,0 +1 @@ +package api diff --git a/route/api/point.go b/route/api/point.go new file mode 100644 index 0000000..778f64e --- /dev/null +++ b/route/api/point.go @@ -0,0 +1 @@ +package api diff --git a/route/route.go b/route/route.go new file mode 100644 index 0000000..5055d46 --- /dev/null +++ b/route/route.go @@ -0,0 +1,15 @@ +package route + +import ( + "datart/route/api" + + "github.com/gin-gonic/gin" +) + +func LoadRoute(engine *gin.Engine) { + engine.Use(gin.Recovery()) // TODO + + a := new(api.Api) + ga := engine.Group("api") + ga.POST("/alarm", a.PostInsertAlarm) +} diff --git a/util/util.go b/util/util.go new file mode 100644 index 0000000..834b3a7 --- /dev/null +++ b/util/util.go @@ -0,0 +1,13 @@ +package util + +import ( + "strconv" +) + +func ConvertToTimestampDefault(tsStr string, defaultTS int64) int64 { + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + return defaultTS + } + return ts +}