From 0012c37d60c8b422047d44d980c69f3f4eb4b228 Mon Sep 17 00:00:00 2001 From: zhuxu Date: Thu, 20 Nov 2025 20:58:51 +0800 Subject: [PATCH] update files --- .gitignore | 3 +- config/postgres.go | 18 --- config/rabbit.go | 224 +---------------------------------- configs/postgres.json | 1 - data/common.go | 62 ++++++++++ data/influx/influx.go | 2 +- data/influx/ssu_point.go | 4 +- data/mongo/alarm.go | 22 +++- data/mongo/event.go | 6 +- data/mongo/mongo.go | 2 +- data/postgres/measurement.go | 39 ++++-- data/postgres/postgres.go | 8 +- data/publish_event.go | 53 +++++---- data/rabbit/consume.go | 4 +- data/rabbit/manage.go | 10 +- data/rabbit/publish.go | 17 +-- data/rabbit/rabbit.go | 21 +--- data/redis/redis.go | 2 +- data/update_phasor.go | 56 +++------ go.mod | 4 +- route/admin/command.go | 8 +- route/api/alarm.go | 17 ++- route/api/event.go | 63 ++++++++-- route/api/point.go | 39 ++++-- route/route.go | 2 +- 25 files changed, 285 insertions(+), 402 deletions(-) create mode 100644 data/common.go diff --git a/.gitignore b/.gitignore index 4d0bad4..220bfe0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .vscode -logs \ No newline at end of file +logs +.idea \ No newline at end of file diff --git a/config/postgres.go b/config/postgres.go index d20f2f2..9f88a48 100644 --- a/config/postgres.go +++ b/config/postgres.go @@ -6,7 +6,6 @@ type postgresConfig struct { User string `json:"user" yaml:"user"` Password string `json:"password" yaml:"password"` DBName string `json:"dbname" yaml:"dbname"` - SSLMode string `json:"sslmode" yaml:"sslmode"` TimeZone string `json:"timezone" yaml:"timezone"` } @@ -99,23 +98,6 @@ func (conf *postgresConfig) SetDBName(dbName string) *postgresConfig { return conf } -func (conf *postgresConfig) GetSSLMode() string { - if conf == nil { - panic("postgres config is nil") - } - - return conf.SSLMode -} - -func (conf *postgresConfig) SetSSLMode(sslMode string) *postgresConfig { - if conf == nil { - panic("postgres config is nil") - } - conf.SSLMode = sslMode - - return conf -} - func (conf *postgresConfig) GetTimeZone() string { if conf == nil { panic("postgres config is nil") diff --git a/config/rabbit.go b/config/rabbit.go index b6d1dd8..04b19f8 100644 --- a/config/rabbit.go +++ b/config/rabbit.go @@ -1,30 +1,9 @@ package config -import ( - "crypto/tls" - "crypto/x509" - "encoding/pem" - "errors" - "fmt" - "os" - - "github.com/youmark/pkcs8" -) - -type tlsConfig struct { - CAPath string `json:"capath" yaml:"capath"` - KeyPath string `json:"keypath" yaml:"keypath"` - CertPath string `json:"certpath" yaml:"certpath"` - Password string `json:"password" yaml:"password"` - SkipVerify bool `json:"skipverify" yaml:"skipverify"` - ServerName string `json:"servername" yaml:"servername"` -} - type rabbitConfig struct { - Broker string `json:"broker" yaml:"broker"` - Username string `json:"username" yaml:"username"` - Password string `json:"password" yaml:"password"` - TLS *tlsConfig `json:"tls" yaml:"tls"` + Broker string `json:"broker" yaml:"broker"` + Username string `json:"username" yaml:"username"` + Password string `json:"password" yaml:"password"` } func NewRabbitConfig() *rabbitConfig { @@ -99,203 +78,6 @@ func (conf *rabbitConfig) SetPassword(password string) *rabbitConfig { return conf } -func (conf *rabbitConfig) InitTLS() *rabbitConfig { - if conf == nil { - panic("rabbit config is nil") - } - conf.TLS = new(tlsConfig) - - return conf -} - -func (conf *rabbitConfig) GetTLS() *tlsConfig { - if conf == nil { - panic("rabbit config is nil") - } - - return conf.TLS -} - -func (conf *tlsConfig) GetCAPath() string { - if conf == nil { - panic("rabbit tls is nil") - } - - return conf.CAPath -} - -func (conf *tlsConfig) SetCAPath(caPath string) *tlsConfig { - if conf == nil { - panic("rabbit tls is nil") - } - conf.CAPath = caPath - - return conf -} - -func (conf *tlsConfig) GetKeyPath() string { - if conf == nil { - panic("rabbit tls is nil") - } - - return conf.KeyPath -} - -func (conf *tlsConfig) SetKeyPath(keyPath string) *tlsConfig { - if conf == nil { - panic("rabbit tls is nil") - } - conf.KeyPath = keyPath - - return conf -} - -func (conf *tlsConfig) GetCertPath() string { - if conf == nil { - panic("rabbit tls is nil") - } - - return conf.CertPath -} - -func (conf *tlsConfig) SetCertPath(certPath string) *tlsConfig { - if conf == nil { - panic("rabbit tls is nil") - } - - conf.CertPath = certPath - - return conf -} - -func (conf *tlsConfig) GetPassword() string { - if conf == nil { - panic("rabbit tls is nil") - } - - return conf.Password -} - -func (conf *tlsConfig) SetPassword(password string) *tlsConfig { - if conf == nil { - panic("rabbit tls is nil") - } - conf.Password = password - - return conf -} - -func (conf *tlsConfig) GetSkipVerify() bool { - if conf == nil { - panic("rabbit tls is nil") - } - - return conf.SkipVerify -} - -func (conf *tlsConfig) SetSkipVerify(skipVerify bool) *tlsConfig { - if conf == nil { - panic("rabbit tls is nil") - } - conf.SkipVerify = skipVerify - - return conf -} - -func (conf *tlsConfig) GetServerName() string { - if conf == nil { - panic("rabbit tls is nil") - } - - return conf.ServerName -} - -func (conf *tlsConfig) SetServerName(serverName string) *tlsConfig { - if conf == nil { - panic("rabbit tls is nil") - } - conf.ServerName = serverName - - return conf -} - -func (conf *tlsConfig) GenTLSConfig(tag string) (*tls.Config, error) { - if conf == nil { - return nil, nil - } - - if conf.GetCAPath() == "" || conf.GetCertPath() == "" || - conf.GetKeyPath() == "" { - return nil, errors.New("rabbit tls not valid") - } - - caPem, err := os.ReadFile(conf.GetCAPath()) - if err != nil { - return nil, err - } - certPool := x509.NewCertPool() - certPool.AppendCertsFromPEM(caPem) - - keyPem, err := os.ReadFile(conf.GetKeyPath()) - if err != nil { - return nil, err - } - certPem, err := os.ReadFile(conf.GetCertPath()) - if err != nil { - return nil, err - } - - pemBlock, err := parsePrivateKey(keyPem, []byte(conf.GetPassword())) - if err != nil { - return nil, err - } - - cliCert, err := tls.X509KeyPair(certPem, pem.EncodeToMemory(pemBlock)) - if err != nil { - return nil, err - } - - return &tls.Config{ - Certificates: []tls.Certificate{cliCert}, - RootCAs: certPool, - ServerName: conf.GetServerName(), - InsecureSkipVerify: conf.GetSkipVerify(), - }, nil -} - -func parsePrivateKey(key, password []byte) (*pem.Block, error) { - block, _ := pem.Decode(key) - if block == nil { - return nil, errors.New("no valid pem") - } - - var privateKey any - var err error - switch block.Type { - case "RSA PRIVATE KEY": - privateKey, err = x509.ParsePKCS1PrivateKey(block.Bytes) - case "PRIVATE KEY": - privateKey, err = x509.ParsePKCS8PrivateKey(block.Bytes) - case "ENCRYPTED PRIVATE KEY": - privateKey, err = pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) - default: - return nil, fmt.Errorf("unsupported key type: %s", block.Type) - } - if err != nil { - return nil, err - } - - pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) - if err != nil { - return nil, err - } - - return &pem.Block{ - Type: "PRIVATE KEY", - Bytes: pemBytes, - }, nil -} - func rabbitConfigName() string { return "rabbit.json" } diff --git a/configs/postgres.json b/configs/postgres.json index fda7e61..6b50ff4 100644 --- a/configs/postgres.json +++ b/configs/postgres.json @@ -5,7 +5,6 @@ "user":"postgres", "password":"123RTYjkl", "dbname":"metamodule", - "sslmode":"disable", "timezone":"Asia/Shanghai" } } \ No newline at end of file diff --git a/data/common.go b/data/common.go new file mode 100644 index 0000000..8ce7f6c --- /dev/null +++ b/data/common.go @@ -0,0 +1,62 @@ +package data + +import ( + "datart/data/influx" + "datart/data/postgres" + "strings" + + "github.com/redis/go-redis/v9" +) + +func GenPhasorFields(channel string) []string { + fields := make([]string, 0, 3) + + switch { + case strings.HasPrefix(channel, postgres.ChannelYCPrefix): + + fieldPrefix := strings.ToLower(channel) + fields = append(fields, + fieldPrefix+influx.FieldSuffixAMP, + fieldPrefix+influx.FieldSuffixPA, + fieldPrefix+influx.FieldSuffixRMS) + + case strings.HasPrefix(channel, postgres.ChannelYXPrefix): + + fields = append(fields, strings.ToLower(channel)) + + case strings.HasPrefix(channel, postgres.ChannelUPrefix): + + fieldUPrefix := strings.ToLower(channel) + fields = append(fields, + fieldUPrefix+influx.FieldSuffixAMP, + fieldUPrefix+influx.FieldSuffixPA, + fieldUPrefix+influx.FieldSuffixRMS) + + case channel == postgres.ChannelP, + channel == postgres.ChannelQ, + channel == postgres.ChannelS, + channel == postgres.ChannelPF, + channel == postgres.ChannelF, + channel == postgres.ChannelDF: + + fields = append(fields, strings.ToLower(channel)) + } + + return fields +} + +type zUnit struct { + Key string + Members []redis.Z +} + +func convertTVsToMenmbers(tvs []influx.TV) []redis.Z { + members := make([]redis.Z, len(tvs)) + + for i, tv := range tvs { + members[i].Member = tv.Time + members[i].Score = tv.Value + } + + return members +} diff --git a/data/influx/influx.go b/data/influx/influx.go index 2ee0d0d..e1500f5 100644 --- a/data/influx/influx.go +++ b/data/influx/influx.go @@ -39,7 +39,7 @@ func init() { client.org = influxConfig.GetOrg() } -func Close() { +func CloseDefault() { client.CloseIdleConnections() } diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go index 9571e68..ea2a8fa 100644 --- a/data/influx/ssu_point.go +++ b/data/influx/ssu_point.go @@ -36,12 +36,12 @@ const ( ) func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) { - req.Begin = time.Now().UnixMilli() - int64(limit*20+20) + req.Begin = time.Now().UnixMilli() - int64(limit*20+10000) return client.GetSSUPointLastLimit(ctx, req, limit) } func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) { - req.Begin = time.Now().UnixMilli() - int64(limit*20+20) + req.Begin = time.Now().UnixMilli() - int64(limit*20+10000) return client.GetSSUPointsLastLimit(ctx, req, limit) } diff --git a/data/mongo/alarm.go b/data/mongo/alarm.go index 540de1e..ab40a4f 100644 --- a/data/mongo/alarm.go +++ b/data/mongo/alarm.go @@ -71,25 +71,39 @@ func (a *Alarm) GetPriority() int { return -1 } -func (a *Alarm) ConvertToEvent(ip string) *Event { +func GenEvent(alarm []byte, ip string) (*Event, error) { + a, err := UnmarshallToAlarm(alarm) + if err != nil { + return nil, err + } + + return a.ConvertToEvent(ip) +} + +func (a *Alarm) ConvertToEvent(ip string) (*Event, error) { e := new(Event) + uid, err := uuid.NewV7() + if err != nil { + return nil, err + } + if a != nil { e.Event = a.GetName() - e.EventUUID = uuid.NewString() + e.EventUUID = uid.String() e.Type = a.GetType() e.Priority = a.GetPriority() e.Status = EventStatusHappen e.Timestamp = a.AlarmTime e.From = "station" e.Operations = append(e.Operations, &operation{ - Action: EventActionHappened, // TODO + Action: EventActionHappened, OP: ip, TS: a.AlarmTime, }) e.Alarm = a } - return e + return e, nil } func UnmarshallToAlarm(data []byte) (*Alarm, error) { diff --git a/data/mongo/event.go b/data/mongo/event.go index 6532b27..e4cd131 100644 --- a/data/mongo/event.go +++ b/data/mongo/event.go @@ -41,7 +41,7 @@ type Event struct { Timestamp int64 `bson:"timestamp" json:"timestamp"` From string `bson:"from" json:"from"` Operations []*operation `bson:"operations" json:"operations"` - // TODO complete + // others Alarm *Alarm `bson:"alarm" json:"alarm"` } @@ -49,7 +49,7 @@ func (e *Event) Marshall() ([]byte, error) { return json.Marshal(e) } -func InsertOneEvent(ctx context.Context, doc *Event) error { +func InsertOneEvent(ctx context.Context, doc any) error { _, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc) return err } @@ -140,6 +140,8 @@ func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T, opt := options.Find() if sort == 1 || sort == -1 { opt.SetSort(bson.D{{Key: "timestamp", Value: sort}}) + } else { + opt.SetSort(bson.D{{Key: "_id", Value: 1}}) } if page > 0 && limit > 0 { opt.SetSkip(limit * (page - 1)).SetLimit(limit) diff --git a/data/mongo/mongo.go b/data/mongo/mongo.go index b5ab0de..0de0696 100644 --- a/data/mongo/mongo.go +++ b/data/mongo/mongo.go @@ -41,7 +41,7 @@ func NewMongoClient(opts ...*options.ClientOptions) (*mongo.Client, error) { return mongo.Connect(opts...) } -func Disconnect(ctx context.Context) error { +func CloseDefault(ctx context.Context) error { return client.Disconnect(ctx) } diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go index 69fdefc..accffaf 100644 --- a/data/postgres/measurement.go +++ b/data/postgres/measurement.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "encoding/json" "errors" + "sync" "sync/atomic" ) @@ -49,7 +50,7 @@ type measurement struct { Tag string `gorm:"column:tag"` Size int `gorm:"column:size"` DataSource *dataSource `gorm:"column:data_source;type:jsonb"` - // mapping TODO + // others } type ChannelSize struct { @@ -61,6 +62,7 @@ type ChannelSize struct { // channel is original var SSU2ChannelSizes atomic.Value +var ChannelSizes sync.Map func init() { SSU2ChannelSizes.Store(map[string][]ChannelSize{}) @@ -133,6 +135,7 @@ func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error { id := int64(0) + ssu2Channel2Exist := make(map[string]map[string]struct{}) ssu2ChannelSizes := make(map[string][]ChannelSize) for { var records []*measurement @@ -153,9 +156,8 @@ func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error { continue } - addrType := record.DataSource.Type - addr := record.DataSource.Addr - if err := genMappingFromAddr(ssu2ChannelSizes, addrType, addr, record.Size); err != nil { + if err := genMappingFromAddr(ssu2ChannelSizes, ssu2Channel2Exist, record); err != nil { + return err } } @@ -168,10 +170,12 @@ func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error { return nil } -func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, addrType int, addr any, size int) error { - switch addrType { +func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, + ssu2Channel2Exist map[string]map[string]struct{}, record *measurement) error { + + switch record.DataSource.Type { case 1: - if rawAddr, ok := addr.(map[string]interface{}); ok { + if rawAddr, ok := record.DataSource.Addr.(map[string]interface{}); ok { station, ok := rawAddr["station"].(string) if !ok { return errors.New("invalid station") @@ -184,12 +188,27 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, addrType int, if !ok { return errors.New("invalid channel") } - ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], ChannelSize{ + + if _, ok := ssu2Channel2Exist[device]; !ok { + ssu2Channel2Exist[device] = make(map[string]struct{}) + } + + if _, ok := ssu2Channel2Exist[device][channel]; ok { + return nil + } else { + ssu2Channel2Exist[device][channel] = struct{}{} + } + + channelSize := ChannelSize{ Station: station, Device: device, Channel: channel, - Size: size, - }) + Size: record.Size, + } + + ChannelSizes.Store(record.Tag, channelSize) + + ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], channelSize) } else { return errors.New("invalid io_address") } diff --git a/data/postgres/postgres.go b/data/postgres/postgres.go index 02ffc98..fedf575 100644 --- a/data/postgres/postgres.go +++ b/data/postgres/postgres.go @@ -12,10 +12,10 @@ var client *gorm.DB func init() { postgresConfig := config.Conf().PostgresConf("default") - dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=%s TimeZone=%s", + dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d timezone=%s", postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(), - postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetSSLMode(), - postgresConfig.GetTimeZone()) + postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetTimeZone()) + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{SkipDefaultTransaction: true}) if err != nil { panic(err) @@ -24,7 +24,7 @@ func init() { } // close postgres default client -func Close() error { +func CloseDefault() error { db, err := client.DB() if err != nil { return err diff --git a/data/publish_event.go b/data/publish_event.go index b889d60..e192f0d 100644 --- a/data/publish_event.go +++ b/data/publish_event.go @@ -2,65 +2,70 @@ package data import ( "context" - "datart/data/mongo" "datart/data/rabbit" "datart/log" + "encoding/json" + "errors" "fmt" rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) -var eventXQK = rabbit.XQK{ - Exchange: "event_produce_exchange", - // Key: "event_produce_key", +var eventNotifyXQK = rabbit.XQK{ + Exchange: "event_notify_fanout", } -var eventPublisher *rmq.Publisher +var eventNotifyPublisher *rmq.Publisher func init() { - publisher, err := rabbit.NewPublisher(context.Background(), "default", &eventXQK) + ctx := context.Background() + + m := new(rabbit.Manage) + + rm := rabbit.DefaultManagement() + + rx := &rmq.FanOutExchangeSpecification{ + Name: eventNotifyXQK.Exchange, + } + + m.Init(ctx, rm, rx, nil, nil) + + m.DeclareExchange(ctx) + + publisher, err := rabbit.NewPublisher(ctx, "default", &eventNotifyXQK) if err != nil { panic(err) } - eventPublisher = publisher + eventNotifyPublisher = publisher } -func PublishEvent(ctx context.Context, event *mongo.Event) error { - data, err := event.Marshall() +func PublishEvent(ctx context.Context, event any) error { + data, err := json.Marshal(event) if err != nil { return err } - result, err := eventPublisher.Publish(ctx, rmq.NewMessage(data)) + result, err := eventNotifyPublisher.Publish(ctx, rmq.NewMessage(data)) if err != nil { return err } switch result.Outcome.(type) { case *rmq.StateAccepted: - // TODO: "Message accepted" + // "message accepted" case *rmq.StateReleased: - return fmt.Errorf("message not routed: %v", event) + // "message released" case *rmq.StateRejected: - return fmt.Errorf("message rejected: %v", event) + return errors.New("message rejected") default: - return fmt.Errorf("invalid message %v state: %v", event, result.Outcome) + return fmt.Errorf("invalid message state: %v", result.Outcome) } return nil } -func GenEvent(data []byte, ip string) (*mongo.Event, error) { - alarm, err := mongo.UnmarshallToAlarm(data) - if err != nil { - return nil, err - } - - return alarm.ConvertToEvent(ip), nil -} - func CloseEventPublisher(ctx context.Context) { - if err := eventPublisher.Close(ctx); err != nil { + if err := eventNotifyPublisher.Close(ctx); err != nil { log.Error(err) } } diff --git a/data/rabbit/consume.go b/data/rabbit/consume.go index b4e0c6f..7b5a0c7 100644 --- a/data/rabbit/consume.go +++ b/data/rabbit/consume.go @@ -25,12 +25,10 @@ func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byt deliCtx, err := consumer.Receive(ctx) if errors.Is(err, context.Canceled) { // The consumer was closed correctly - // TODO return } if err != nil { // An error occurred receiving the message - // TODO return } @@ -40,7 +38,7 @@ func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byt err = deliCtx.Accept(ctx) if err != nil { - // TODO + // accept error return } } diff --git a/data/rabbit/manage.go b/data/rabbit/manage.go index 291c5bc..d948863 100644 --- a/data/rabbit/manage.go +++ b/data/rabbit/manage.go @@ -26,16 +26,12 @@ func (m *Manage) Init(ctx context.Context, rm *rmq.AmqpManagement, m.b = rb } -func (m *Manage) DeclareExchangeQueue(ctx context.Context) error { - _, err := m.m.DeclareExchange(ctx, m.x) - if err != nil { - return err - } - - _, err = m.m.DeclareQueue(ctx, m.q) +func (m *Manage) DeclareExchange(ctx context.Context) error { + xinfo, err := m.m.DeclareExchange(ctx, m.x) if err != nil { return err } + m.xName = xinfo.Name() return nil } diff --git a/data/rabbit/publish.go b/data/rabbit/publish.go index 5ced8b4..c05c6ca 100644 --- a/data/rabbit/publish.go +++ b/data/rabbit/publish.go @@ -2,8 +2,6 @@ package rabbit import ( "context" - "fmt" - "time" rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) @@ -29,29 +27,26 @@ func Publishing(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan [] case msg := <-msgChan: result, err := publisher.Publish(ctx, rmq.NewMessage(msg)) if err != nil { - _ = err // TODO - time.Sleep(1 * time.Second) + _ = err // publish error continue } switch result.Outcome.(type) { case *rmq.StateAccepted: - // TODO: "Message accepted" + // "message accepted" case *rmq.StateReleased: - // TODO: "Message not routed" + // "message not routed" case *rmq.StateRejected: - // TODO: "Message rejected" + // "message rejected" default: // *rmp.StateModified // *rmq.StateReceived - // TODO: ("Message state: %v", publishResult.Outcome) } - case <-time.After(time.Second): - // TODO - fmt.Println("second passed") + case <-ctx.Done(): + return } } } diff --git a/data/rabbit/rabbit.go b/data/rabbit/rabbit.go index 04877c8..12edcbf 100644 --- a/data/rabbit/rabbit.go +++ b/data/rabbit/rabbit.go @@ -4,7 +4,6 @@ import ( "context" "datart/config" - "github.com/Azure/go-amqp" rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) @@ -32,30 +31,22 @@ func init() { client.conn = conn } -func Close(ctx context.Context) error { +func CloseDefault(ctx context.Context) error { return client.Close(ctx) } +func DefaultManagement() *rmq.AmqpManagement { + return client.Management() +} + func genEndpoints(tag string) ([]rmq.Endpoint, error) { confs := config.Conf().RabbitConf(tag) endpoints := make([]rmq.Endpoint, len(confs)) for i, conf := range confs { - tlsConfig, err := conf.GetTLS().GenTLSConfig(tag) - if err != nil { - return nil, err - } - var options *rmq.AmqpConnOptions - var tls bool - if tlsConfig != nil { - options = &rmq.AmqpConnOptions{ - SASLType: amqp.SASLTypeExternal(""), - TLSConfig: tlsConfig} - tls = true - } - endpoints[i].Address = conf.GenAddress(tls) + endpoints[i].Address = conf.GenAddress(false) endpoints[i].Options = options } diff --git a/data/redis/redis.go b/data/redis/redis.go index 5a5e16b..8757ba1 100644 --- a/data/redis/redis.go +++ b/data/redis/redis.go @@ -36,7 +36,7 @@ func init() { } // close redis client -func Close() error { +func CloseDefault() error { return client.Close() } diff --git a/data/update_phasor.go b/data/update_phasor.go index 6d99b16..15b7fd4 100644 --- a/data/update_phasor.go +++ b/data/update_phasor.go @@ -12,7 +12,7 @@ import ( ) const ( - duration time.Duration = time.Second * 5 + duration time.Duration = 5 * time.Second ) func updatingRedisPhasor(ctx context.Context) { @@ -65,15 +65,26 @@ func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) } func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) { - fields := genPhasorFields(channelSize.Channel) + fields := GenPhasorFields(channelSize.Channel) f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device, fields, channelSize.Size) if err != nil { log.Error(err) } - for field, tvs := range f2tvs { - key := genRedisPhasorKey(channelSize.Station, channelSize.Device, field) + // if len(f2tvs) <= 0 { + // log.Info(channelSize.Station, " ", channelSize.Device, " ", + // fields, " query none of ", channelSize.Size) + // } + + for f, tvs := range f2tvs { + sdf := strings.Split(f, ".") + if len(sdf) != 3 { + log.Error("invalid influx field") + return + } + + key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2]) members := convertTVsToMenmbers(tvs) ssuChan <- zUnit{ Key: key, @@ -109,43 +120,6 @@ func updateZUnitToRedis(ctx context.Context, unit zUnit) error { return redis.ZAtomicReplace(ctx, unit.Key, unit.Members) } -func genPhasorFields(channel string) []string { - fields := make([]string, 0, 3) - - switch { - case strings.HasPrefix(channel, postgres.ChannelYCPrefix): - - fieldPrefix := strings.ToLower(channel) - fields = append(fields, - fieldPrefix+influx.FieldSuffixAMP, - fieldPrefix+influx.FieldSuffixPA, - fieldPrefix+influx.FieldSuffixRMS) - - case strings.HasPrefix(channel, postgres.ChannelYXPrefix): - - fields = append(fields, strings.ToLower(channel)) - - case strings.HasPrefix(channel, postgres.ChannelUPrefix): - - fieldUPrefix := strings.ToLower(channel) - fields = append(fields, - fieldUPrefix+influx.FieldSuffixAMP, - fieldUPrefix+influx.FieldSuffixPA, - fieldUPrefix+influx.FieldSuffixRMS) - - case channel == postgres.ChannelP, - channel == postgres.ChannelQ, - channel == postgres.ChannelS, - channel == postgres.ChannelPF, - channel == postgres.ChannelF, - channel == postgres.ChannelDF: - - fields = append(fields, strings.ToLower(channel)) - } - - return fields -} - func genRedisPhasorKey(station, device, field string) string { return strings.Join([]string{station, device, "phasor", field}, ":") } diff --git a/go.mod b/go.mod index d299450..2f40a2d 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,10 @@ module datart go 1.24.0 require ( - github.com/Azure/go-amqp v1.5.0 github.com/gin-gonic/gin v1.11.0 github.com/google/uuid v1.6.0 github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0 github.com/redis/go-redis/v9 v9.14.0 - github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 go.mongodb.org/mongo-driver/v2 v2.3.0 go.uber.org/zap v1.27.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 @@ -17,6 +15,7 @@ require ( ) require ( + github.com/Azure/go-amqp v1.5.0 // indirect github.com/bytedance/sonic v1.14.0 // indirect github.com/bytedance/sonic/loader v0.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -51,6 +50,7 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.uber.org/mock v0.5.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/arch v0.20.0 // indirect diff --git a/route/admin/command.go b/route/admin/command.go index d114cab..e736662 100644 --- a/route/admin/command.go +++ b/route/admin/command.go @@ -10,9 +10,9 @@ import ( ) type command struct { - Function string `json:"function"` - Timeout int64 `json:"timeout"` - Args []any `json:"args"` + Command string `json:"command"` + Timeout int64 `json:"timeout"` + Args []any `json:"args"` } func (a *Admin) PostExecuteCommand(ctx *gin.Context) { @@ -49,7 +49,7 @@ func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, er return req, errors.New("invalid body param") } - if req.Function != "GenSSU2ChannelSizes" { + if req.Command != "GenSSU2ChannelSizes" { return nil, errors.New("invalid function") } diff --git a/route/api/alarm.go b/route/api/alarm.go index 328da88..a301461 100644 --- a/route/api/alarm.go +++ b/route/api/alarm.go @@ -24,14 +24,25 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) { return } - event := alarm.ConvertToEvent(ip) + event, err := alarm.ConvertToEvent(ip) + if err != nil { + + log.Error(err, fmt.Sprintf(" params: %v", alarm)) + + ctx.JSON(200, gin.H{ + "code": 2, + "msg": "convert error", + }) + return + } + err = mongo.InsertOneEvent(ctx.Request.Context(), event) if err != nil { log.Error(err, fmt.Sprintf(" params: %v", event)) ctx.JSON(200, gin.H{ - "code": 2, + "code": 3, "msg": "insert error", }) return @@ -43,7 +54,7 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) { log.Error(err, fmt.Sprintf(" params: %v", event)) ctx.JSON(200, gin.H{ - "code": 3, + "code": 4, "msg": "publish error", }) return diff --git a/route/api/event.go b/route/api/event.go index c385f72..a36df53 100644 --- a/route/api/event.go +++ b/route/api/event.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strconv" + "strings" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -13,7 +14,7 @@ import ( ) const ( - pageSizeLimit = 100 + pageSizeLimit = 500 ) func (a *Api) GetEvents(ctx *gin.Context) { @@ -49,7 +50,7 @@ func (a *Api) GetEvents(ctx *gin.Context) { } func (a *Api) PostUpsertEvents(ctx *gin.Context) { - filter, update, err := a.checkAndGenUpsertEventsRequest(ctx) + noUUID, eventUUID, upsert, err := a.checkAndGenUpsertEventsRequest(ctx) if err != nil { log.Error(err) ctx.JSON(200, gin.H{ @@ -59,10 +60,24 @@ func (a *Api) PostUpsertEvents(ctx *gin.Context) { return } - if err = mongo.UpsertOneEvent(ctx.Request.Context(), filter, update); err != nil { - log.Error(err, fmt.Sprintf(" params: %v, %v", filter, update)) + if noUUID { + if err = mongo.InsertOneEvent(ctx.Request.Context(), upsert); err != nil { + + log.Error(err, fmt.Sprintf(" params: %v", upsert)) + + ctx.JSON(200, gin.H{ + "code": 2, + "msg": err.Error(), + }) + return + } + } else if err = mongo.UpsertOneEvent(ctx.Request.Context(), bson.M{"event_uuid": eventUUID}, + bson.M{"$set": bson.M(upsert)}); err != nil { + + log.Error(err, fmt.Sprintf(" params: %v", upsert)) + ctx.JSON(200, gin.H{ - "code": 2, + "code": 3, "msg": err.Error(), }) return @@ -116,6 +131,20 @@ func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, filter["timestamp"] = bson.M{"$gte": begin, "$lte": end} } + statusStr := ctx.Query("status") + if len(statusStr) > 0 { + statusStrs := strings.Split(statusStr, ",") + statuss := make([]int, len(statusStrs)) + for i := range statusStrs { + s, err := strconv.Atoi(statusStrs[i]) + if err != nil { + return nil, 0, -1, -1, errors.New("invalid status") + } + statuss[i] = s + } + filter["status"] = bson.M{"$in": statuss} + } + var sort int sortStr := ctx.Query("sort") if len(sortStr) > 0 { @@ -156,25 +185,33 @@ func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, return filter, sort, int64(pageNo), int64(pageSize), nil } -func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (bson.M, bson.M, error) { +func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (bool, string, map[string]any, error) { e := map[string]any{} err := ctx.ShouldBindJSON(&e) if err != nil { - return nil, nil, errors.New("invalid body param") + return false, "", nil, errors.New("invalid body param") } eventUUID := "" + noUUID := true if eu, ok := e["event_uuid"]; ok { if eUUID, ok := eu.(string); ok { - if uuid.Validate(eUUID) != nil { - return nil, nil, errors.New("invalid param") - } else { + if uuid.Validate(eUUID) == nil { eventUUID = eUUID + noUUID = false } } - } else { - return nil, nil, errors.New("no uuid") } - return bson.M{"event_uuid": eventUUID}, bson.M{"$set": bson.M(e)}, nil + if noUUID { + noUUID = true + if uid, err := uuid.NewV7(); err != nil { + return false, "", nil, err + } else { + eventUUID = uid.String() + e["event_uuid"] = eventUUID + } + } + + return noUUID, eventUUID, e, nil } diff --git a/route/api/point.go b/route/api/point.go index 41249d3..c9afdc7 100644 --- a/route/api/point.go +++ b/route/api/point.go @@ -1,11 +1,14 @@ package api import ( + "datart/data" "datart/data/influx" + "datart/data/postgres" "datart/log" "datart/util" "errors" "fmt" + "strings" "github.com/gin-gonic/gin" ) @@ -63,21 +66,33 @@ func (a *Api) checkAndGenGetPointRequest(ctx *gin.Context) (*influx.Request, err return nil, errors.New("invalid type") } - // tag TODO + station, mainPos, subPos := "", "", "" - station := ctx.DefaultQuery("station", "") - if len(station) <= 0 { - return nil, errors.New("invalid station") - } + mtag := ctx.DefaultQuery("mtag", "") + v, ok := postgres.ChannelSizes.Load(mtag) + if ok { + if channelSize, ok := v.(postgres.ChannelSize); ok { + fields := data.GenPhasorFields(channelSize.Channel) - mainPos := ctx.DefaultQuery("main_pos", "") - if len(mainPos) <= 0 { - return nil, errors.New("invalid main_pos") - } + station = channelSize.Station + mainPos = channelSize.Device + subPos = strings.Join(fields, ",") + } + } else { + station = ctx.DefaultQuery("station", "") + if len(station) <= 0 { + return nil, errors.New("invalid station") + } - subPos := ctx.DefaultQuery("sub_pos", "") - if len(subPos) <= 0 { - return nil, errors.New("invalid sub_pos") + mainPos = ctx.DefaultQuery("main_pos", "") + if len(mainPos) <= 0 { + return nil, errors.New("invalid main_pos") + } + + subPos = ctx.DefaultQuery("sub_pos", "") + if len(subPos) <= 0 { + return nil, errors.New("invalid sub_pos") + } } beginStr := ctx.DefaultQuery("begin", "") diff --git a/route/route.go b/route/route.go index 996e057..ac6992c 100644 --- a/route/route.go +++ b/route/route.go @@ -8,7 +8,7 @@ import ( ) func LoadRoute(engine *gin.Engine) { - engine.Use(gin.Recovery()) // TODO + engine.Use(gin.Recovery()) a := new(api.Api) ga := engine.Group("api")