update files

parent 71ebf6b938
commit 0012c37d60
@@ -1,2 +1,3 @@
 .vscode
 logs
+.idea
@@ -6,7 +6,6 @@ type postgresConfig struct {
     User     string `json:"user" yaml:"user"`
     Password string `json:"password" yaml:"password"`
     DBName   string `json:"dbname" yaml:"dbname"`
-    SSLMode  string `json:"sslmode" yaml:"sslmode"`
     TimeZone string `json:"timezone" yaml:"timezone"`
 }
@@ -99,23 +98,6 @@ func (conf *postgresConfig) SetDBName(dbName string) *postgresConfig {
     return conf
 }

-func (conf *postgresConfig) GetSSLMode() string {
-    if conf == nil {
-        panic("postgres config is nil")
-    }
-
-    return conf.SSLMode
-}
-
-func (conf *postgresConfig) SetSSLMode(sslMode string) *postgresConfig {
-    if conf == nil {
-        panic("postgres config is nil")
-    }
-    conf.SSLMode = sslMode
-
-    return conf
-}
-
 func (conf *postgresConfig) GetTimeZone() string {
     if conf == nil {
         panic("postgres config is nil")
224  config/rabbit.go
@@ -1,30 +1,9 @@
 package config

-import (
-    "crypto/tls"
-    "crypto/x509"
-    "encoding/pem"
-    "errors"
-    "fmt"
-    "os"
-
-    "github.com/youmark/pkcs8"
-)
-
-type tlsConfig struct {
-    CAPath     string `json:"capath" yaml:"capath"`
-    KeyPath    string `json:"keypath" yaml:"keypath"`
-    CertPath   string `json:"certpath" yaml:"certpath"`
-    Password   string `json:"password" yaml:"password"`
-    SkipVerify bool   `json:"skipverify" yaml:"skipverify"`
-    ServerName string `json:"servername" yaml:"servername"`
-}
-
 type rabbitConfig struct {
-    Broker   string     `json:"broker" yaml:"broker"`
-    Username string     `json:"username" yaml:"username"`
-    Password string     `json:"password" yaml:"password"`
-    TLS      *tlsConfig `json:"tls" yaml:"tls"`
+    Broker   string `json:"broker" yaml:"broker"`
+    Username string `json:"username" yaml:"username"`
+    Password string `json:"password" yaml:"password"`
 }

 func NewRabbitConfig() *rabbitConfig {
@@ -99,203 +78,6 @@ func (conf *rabbitConfig) SetPassword(password string) *rabbitConfig {
     return conf
 }

-func (conf *rabbitConfig) InitTLS() *rabbitConfig {
-    if conf == nil {
-        panic("rabbit config is nil")
-    }
-    conf.TLS = new(tlsConfig)
-
-    return conf
-}
-
-func (conf *rabbitConfig) GetTLS() *tlsConfig {
-    if conf == nil {
-        panic("rabbit config is nil")
-    }
-
-    return conf.TLS
-}
-
-func (conf *tlsConfig) GetCAPath() string {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-
-    return conf.CAPath
-}
-
-func (conf *tlsConfig) SetCAPath(caPath string) *tlsConfig {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-    conf.CAPath = caPath
-
-    return conf
-}
-
-func (conf *tlsConfig) GetKeyPath() string {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-
-    return conf.KeyPath
-}
-
-func (conf *tlsConfig) SetKeyPath(keyPath string) *tlsConfig {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-    conf.KeyPath = keyPath
-
-    return conf
-}
-
-func (conf *tlsConfig) GetCertPath() string {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-
-    return conf.CertPath
-}
-
-func (conf *tlsConfig) SetCertPath(certPath string) *tlsConfig {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-
-    conf.CertPath = certPath
-
-    return conf
-}
-
-func (conf *tlsConfig) GetPassword() string {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-
-    return conf.Password
-}
-
-func (conf *tlsConfig) SetPassword(password string) *tlsConfig {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-    conf.Password = password
-
-    return conf
-}
-
-func (conf *tlsConfig) GetSkipVerify() bool {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-
-    return conf.SkipVerify
-}
-
-func (conf *tlsConfig) SetSkipVerify(skipVerify bool) *tlsConfig {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-    conf.SkipVerify = skipVerify
-
-    return conf
-}
-
-func (conf *tlsConfig) GetServerName() string {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-
-    return conf.ServerName
-}
-
-func (conf *tlsConfig) SetServerName(serverName string) *tlsConfig {
-    if conf == nil {
-        panic("rabbit tls is nil")
-    }
-    conf.ServerName = serverName
-
-    return conf
-}
-
-func (conf *tlsConfig) GenTLSConfig(tag string) (*tls.Config, error) {
-    if conf == nil {
-        return nil, nil
-    }
-
-    if conf.GetCAPath() == "" || conf.GetCertPath() == "" ||
-        conf.GetKeyPath() == "" {
-        return nil, errors.New("rabbit tls not valid")
-    }
-
-    caPem, err := os.ReadFile(conf.GetCAPath())
-    if err != nil {
-        return nil, err
-    }
-    certPool := x509.NewCertPool()
-    certPool.AppendCertsFromPEM(caPem)
-
-    keyPem, err := os.ReadFile(conf.GetKeyPath())
-    if err != nil {
-        return nil, err
-    }
-    certPem, err := os.ReadFile(conf.GetCertPath())
-    if err != nil {
-        return nil, err
-    }
-
-    pemBlock, err := parsePrivateKey(keyPem, []byte(conf.GetPassword()))
-    if err != nil {
-        return nil, err
-    }
-
-    cliCert, err := tls.X509KeyPair(certPem, pem.EncodeToMemory(pemBlock))
-    if err != nil {
-        return nil, err
-    }
-
-    return &tls.Config{
-        Certificates:       []tls.Certificate{cliCert},
-        RootCAs:            certPool,
-        ServerName:         conf.GetServerName(),
-        InsecureSkipVerify: conf.GetSkipVerify(),
-    }, nil
-}
-
-func parsePrivateKey(key, password []byte) (*pem.Block, error) {
-    block, _ := pem.Decode(key)
-    if block == nil {
-        return nil, errors.New("no valid pem")
-    }
-
-    var privateKey any
-    var err error
-    switch block.Type {
-    case "RSA PRIVATE KEY":
-        privateKey, err = x509.ParsePKCS1PrivateKey(block.Bytes)
-    case "PRIVATE KEY":
-        privateKey, err = x509.ParsePKCS8PrivateKey(block.Bytes)
-    case "ENCRYPTED PRIVATE KEY":
-        privateKey, err = pkcs8.ParsePKCS8PrivateKey(block.Bytes, password)
-    default:
-        return nil, fmt.Errorf("unsupported key type: %s", block.Type)
-    }
-    if err != nil {
-        return nil, err
-    }
-
-    pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
-    if err != nil {
-        return nil, err
-    }
-
-    return &pem.Block{
-        Type:  "PRIVATE KEY",
-        Bytes: pemBytes,
-    }, nil
-}
-
 func rabbitConfigName() string {
     return "rabbit.json"
 }
@@ -5,7 +5,6 @@
   "user":"postgres",
   "password":"123RTYjkl",
   "dbname":"metamodule",
-  "sslmode":"disable",
   "timezone":"Asia/Shanghai"
  }
 }
@@ -0,0 +1,62 @@
+package data
+
+import (
+    "datart/data/influx"
+    "datart/data/postgres"
+    "strings"
+
+    "github.com/redis/go-redis/v9"
+)
+
+func GenPhasorFields(channel string) []string {
+    fields := make([]string, 0, 3)
+
+    switch {
+    case strings.HasPrefix(channel, postgres.ChannelYCPrefix):
+
+        fieldPrefix := strings.ToLower(channel)
+        fields = append(fields,
+            fieldPrefix+influx.FieldSuffixAMP,
+            fieldPrefix+influx.FieldSuffixPA,
+            fieldPrefix+influx.FieldSuffixRMS)
+
+    case strings.HasPrefix(channel, postgres.ChannelYXPrefix):
+
+        fields = append(fields, strings.ToLower(channel))
+
+    case strings.HasPrefix(channel, postgres.ChannelUPrefix):
+
+        fieldUPrefix := strings.ToLower(channel)
+        fields = append(fields,
+            fieldUPrefix+influx.FieldSuffixAMP,
+            fieldUPrefix+influx.FieldSuffixPA,
+            fieldUPrefix+influx.FieldSuffixRMS)
+
+    case channel == postgres.ChannelP,
+        channel == postgres.ChannelQ,
+        channel == postgres.ChannelS,
+        channel == postgres.ChannelPF,
+        channel == postgres.ChannelF,
+        channel == postgres.ChannelDF:
+
+        fields = append(fields, strings.ToLower(channel))
+    }
+
+    return fields
+}
+
+type zUnit struct {
+    Key     string
+    Members []redis.Z
+}
+
+func convertTVsToMenmbers(tvs []influx.TV) []redis.Z {
+    members := make([]redis.Z, len(tvs))
+
+    for i, tv := range tvs {
+        members[i].Member = tv.Time
+        members[i].Score = tv.Value
+    }
+
+    return members
+}
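Note: a rough usage sketch of the new helper, assuming the channel-prefix and field-suffix constants are plain strings such as "YC" and "_amp"/"_pa"/"_rms" (the real values live in the postgres and influx packages):

    // Illustrative values only.
    fields := data.GenPhasorFields("YC01")
    // With the assumed constants this yields []string{"yc01_amp", "yc01_pa", "yc01_rms"},
    // one influx field per phasor component of an analog (YC) channel.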
@@ -39,7 +39,7 @@ func init() {
     client.org = influxConfig.GetOrg()
 }

-func Close() {
+func CloseDefault() {
     client.CloseIdleConnections()
 }

@@ -36,12 +36,12 @@ const (
 )

 func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
-    req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
+    req.Begin = time.Now().UnixMilli() - int64(limit*20+10000)
     return client.GetSSUPointLastLimit(ctx, req, limit)
 }

 func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
-    req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
+    req.Begin = time.Now().UnixMilli() - int64(limit*20+10000)
     return client.GetSSUPointsLastLimit(ctx, req, limit)
 }

@@ -71,25 +71,39 @@ func (a *Alarm) GetPriority() int {
     return -1
 }

-func (a *Alarm) ConvertToEvent(ip string) *Event {
+func GenEvent(alarm []byte, ip string) (*Event, error) {
+    a, err := UnmarshallToAlarm(alarm)
+    if err != nil {
+        return nil, err
+    }
+
+    return a.ConvertToEvent(ip)
+}
+
+func (a *Alarm) ConvertToEvent(ip string) (*Event, error) {
     e := new(Event)
+    uid, err := uuid.NewV7()
+    if err != nil {
+        return nil, err
+    }
+
     if a != nil {
         e.Event = a.GetName()
-        e.EventUUID = uuid.NewString()
+        e.EventUUID = uid.String()
         e.Type = a.GetType()
         e.Priority = a.GetPriority()
         e.Status = EventStatusHappen
         e.Timestamp = a.AlarmTime
         e.From = "station"
         e.Operations = append(e.Operations, &operation{
-            Action: EventActionHappened, // TODO
+            Action: EventActionHappened,
             OP:     ip,
             TS:     a.AlarmTime,
         })
         e.Alarm = a
     }

-    return e
+    return e, nil
 }

 func UnmarshallToAlarm(data []byte) (*Alarm, error) {
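Note: a minimal sketch of the new call path, assuming these alarm/event helpers live in the mongo package as the other hunks suggest; raw stands for the alarm JSON received from the broker or the HTTP API:

    event, err := mongo.GenEvent(raw, clientIP) // unmarshal + convert in one step
    if err != nil {
        log.Error(err)
        return
    }
    // event.EventUUID is now a UUIDv7, so identifiers sort roughly by creation time.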
@@ -41,7 +41,7 @@ type Event struct {
     Timestamp  int64        `bson:"timestamp" json:"timestamp"`
     From       string       `bson:"from" json:"from"`
     Operations []*operation `bson:"operations" json:"operations"`
-    // TODO complete
+    // others
     Alarm *Alarm `bson:"alarm" json:"alarm"`
 }

@@ -49,7 +49,7 @@ func (e *Event) Marshall() ([]byte, error) {
     return json.Marshal(e)
 }

-func InsertOneEvent(ctx context.Context, doc *Event) error {
+func InsertOneEvent(ctx context.Context, doc any) error {
     _, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc)
     return err
 }
@@ -140,6 +140,8 @@ func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T,
     opt := options.Find()
+    if sort == 1 || sort == -1 {
+        opt.SetSort(bson.D{{Key: "timestamp", Value: sort}})
+    } else {
         opt.SetSort(bson.D{{Key: "_id", Value: 1}})
+    }
     if page > 0 && limit > 0 {
         opt.SetSkip(limit * (page - 1)).SetLimit(limit)
@@ -41,7 +41,7 @@ func NewMongoClient(opts ...*options.ClientOptions) (*mongo.Client, error) {
     return mongo.Connect(opts...)
 }

-func Disconnect(ctx context.Context) error {
+func CloseDefault(ctx context.Context) error {
     return client.Disconnect(ctx)
 }

@@ -5,6 +5,7 @@ import (
     "database/sql/driver"
     "encoding/json"
     "errors"
+    "sync"
     "sync/atomic"
 )

@@ -49,7 +50,7 @@ type measurement struct {
     Tag        string      `gorm:"column:tag"`
     Size       int         `gorm:"column:size"`
     DataSource *dataSource `gorm:"column:data_source;type:jsonb"`
-    // mapping TODO
+    // others
 }

 type ChannelSize struct {
@@ -61,6 +62,7 @@ type ChannelSize struct {

 // channel is original
 var SSU2ChannelSizes atomic.Value
+var ChannelSizes sync.Map

 func init() {
     SSU2ChannelSizes.Store(map[string][]ChannelSize{})
@@ -133,6 +135,7 @@ func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) {

 func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
     id := int64(0)
+    ssu2Channel2Exist := make(map[string]map[string]struct{})
     ssu2ChannelSizes := make(map[string][]ChannelSize)
     for {
         var records []*measurement
@@ -153,9 +156,8 @@ func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
             continue
         }

-        addrType := record.DataSource.Type
-        addr := record.DataSource.Addr
-        if err := genMappingFromAddr(ssu2ChannelSizes, addrType, addr, record.Size); err != nil {
+        if err := genMappingFromAddr(ssu2ChannelSizes, ssu2Channel2Exist, record); err != nil {
+
             return err
         }
     }
@@ -168,10 +170,12 @@ func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
     return nil
 }

-func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, addrType int, addr any, size int) error {
-    switch addrType {
+func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
+    ssu2Channel2Exist map[string]map[string]struct{}, record *measurement) error {
+
+    switch record.DataSource.Type {
     case 1:
-        if rawAddr, ok := addr.(map[string]interface{}); ok {
+        if rawAddr, ok := record.DataSource.Addr.(map[string]interface{}); ok {
             station, ok := rawAddr["station"].(string)
             if !ok {
                 return errors.New("invalid station")
@@ -184,12 +188,27 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, addrType int,
             if !ok {
                 return errors.New("invalid channel")
             }
-            ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], ChannelSize{
+
+            if _, ok := ssu2Channel2Exist[device]; !ok {
+                ssu2Channel2Exist[device] = make(map[string]struct{})
+            }
+
+            if _, ok := ssu2Channel2Exist[device][channel]; ok {
+                return nil
+            } else {
+                ssu2Channel2Exist[device][channel] = struct{}{}
+            }
+
+            channelSize := ChannelSize{
                 Station: station,
                 Device:  device,
                 Channel: channel,
-                Size:    size,
-            })
+                Size:    record.Size,
+            }
+
+            ChannelSizes.Store(record.Tag, channelSize)
+
+            ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], channelSize)
         } else {
             return errors.New("invalid io_address")
         }
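Note: the added ssu2Channel2Exist map is the usual nested-set idiom for dropping duplicate (device, channel) pairs before appending; a self-contained sketch:

    seen := make(map[string]map[string]struct{}) // device -> set of channels
    add := func(device, channel string) bool {
        if _, ok := seen[device]; !ok {
            seen[device] = make(map[string]struct{})
        }
        if _, dup := seen[device][channel]; dup {
            return false // already recorded, skip
        }
        seen[device][channel] = struct{}{}
        return true
    }
    // add("ssu1", "YC01") returns true the first time and false afterwards.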
@@ -12,10 +12,10 @@ var client *gorm.DB

 func init() {
     postgresConfig := config.Conf().PostgresConf("default")
-    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=%s TimeZone=%s",
+    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d timezone=%s",
         postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(),
-        postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetSSLMode(),
-        postgresConfig.GetTimeZone())
+        postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetTimeZone())

     db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{SkipDefaultTransaction: true})
     if err != nil {
         panic(err)
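Note: with sslmode dropped from the config, the DSN carries only the timezone; with placeholder values the format string above renders roughly as:

    // Illustrative values only.
    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d timezone=%s",
        "127.0.0.1", "postgres", "secret", "metamodule", 5432, "Asia/Shanghai")
    // "host=127.0.0.1 user=postgres password=secret dbname=metamodule port=5432 timezone=Asia/Shanghai"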
@@ -24,7 +24,7 @@ func init() {
 }

 // close postgres default client
-func Close() error {
+func CloseDefault() error {
     db, err := client.DB()
     if err != nil {
         return err
@@ -2,65 +2,70 @@ package data

 import (
     "context"
-    "datart/data/mongo"
     "datart/data/rabbit"
     "datart/log"
+    "encoding/json"
+    "errors"
     "fmt"

     rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
 )

-var eventXQK = rabbit.XQK{
-    Exchange: "event_produce_exchange",
-    // Key: "event_produce_key",
+var eventNotifyXQK = rabbit.XQK{
+    Exchange: "event_notify_fanout",
 }

-var eventPublisher *rmq.Publisher
+var eventNotifyPublisher *rmq.Publisher

 func init() {
-    publisher, err := rabbit.NewPublisher(context.Background(), "default", &eventXQK)
+    ctx := context.Background()
+
+    m := new(rabbit.Manage)
+
+    rm := rabbit.DefaultManagement()
+
+    rx := &rmq.FanOutExchangeSpecification{
+        Name: eventNotifyXQK.Exchange,
+    }
+
+    m.Init(ctx, rm, rx, nil, nil)
+
+    m.DeclareExchange(ctx)
+
+    publisher, err := rabbit.NewPublisher(ctx, "default", &eventNotifyXQK)
     if err != nil {
         panic(err)
     }
-    eventPublisher = publisher
+    eventNotifyPublisher = publisher
 }

-func PublishEvent(ctx context.Context, event *mongo.Event) error {
-    data, err := event.Marshall()
+func PublishEvent(ctx context.Context, event any) error {
+    data, err := json.Marshal(event)
     if err != nil {
         return err
     }

-    result, err := eventPublisher.Publish(ctx, rmq.NewMessage(data))
+    result, err := eventNotifyPublisher.Publish(ctx, rmq.NewMessage(data))
     if err != nil {
         return err
     }

     switch result.Outcome.(type) {
     case *rmq.StateAccepted:
-        // TODO: "Message accepted"
+        // "message accepted"
     case *rmq.StateReleased:
-        return fmt.Errorf("message not routed: %v", event)
+        // "message released"
     case *rmq.StateRejected:
-        return fmt.Errorf("message rejected: %v", event)
+        return errors.New("message rejected")
     default:
-        return fmt.Errorf("invalid message %v state: %v", event, result.Outcome)
+        return fmt.Errorf("invalid message state: %v", result.Outcome)
     }

     return nil
 }

-func GenEvent(data []byte, ip string) (*mongo.Event, error) {
-    alarm, err := mongo.UnmarshallToAlarm(data)
-    if err != nil {
-        return nil, err
-    }
-
-    return alarm.ConvertToEvent(ip), nil
-}
-
 func CloseEventPublisher(ctx context.Context) {
-    if err := eventPublisher.Close(ctx); err != nil {
+    if err := eventNotifyPublisher.Close(ctx); err != nil {
         log.Error(err)
     }
 }
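Note: since PublishEvent now accepts any JSON-marshallable value and publishes to a fanout exchange, a caller sketch looks like this (the payload shape is illustrative only):

    payload := map[string]any{"event": "overvoltage", "status": 1}
    if err := data.PublishEvent(ctx, payload); err != nil {
        log.Error(err)
    }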
@@ -25,12 +25,10 @@ func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) {
         deliCtx, err := consumer.Receive(ctx)
         if errors.Is(err, context.Canceled) {
             // The consumer was closed correctly
-            // TODO
             return
         }
         if err != nil {
             // An error occurred receiving the message
-            // TODO
             return
         }

@@ -40,7 +38,7 @@ func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) {

         err = deliCtx.Accept(ctx)
         if err != nil {
-            // TODO
+            // accept error
             return
         }
     }
@@ -26,16 +26,12 @@ func (m *Manage) Init(ctx context.Context, rm *rmq.AmqpManagement,
     m.b = rb
 }

-func (m *Manage) DeclareExchangeQueue(ctx context.Context) error {
-    _, err := m.m.DeclareExchange(ctx, m.x)
-    if err != nil {
-        return err
-    }
-
-    _, err = m.m.DeclareQueue(ctx, m.q)
+func (m *Manage) DeclareExchange(ctx context.Context) error {
+    xinfo, err := m.m.DeclareExchange(ctx, m.x)
     if err != nil {
         return err
     }
+    m.xName = xinfo.Name()

     return nil
 }
@@ -2,8 +2,6 @@ package rabbit

 import (
     "context"
-    "fmt"
-    "time"

     rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
 )
@@ -29,29 +27,26 @@ func Publishing(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan []byte) {
         case msg := <-msgChan:
             result, err := publisher.Publish(ctx, rmq.NewMessage(msg))
             if err != nil {
-                _ = err // TODO
-                time.Sleep(1 * time.Second)
+                _ = err // publish error
                 continue
             }

             switch result.Outcome.(type) {
             case *rmq.StateAccepted:
-                // TODO: "Message accepted"
+                // "message accepted"

             case *rmq.StateReleased:
-                // TODO: "Message not routed"
+                // "message not routed"

             case *rmq.StateRejected:
-                // TODO: "Message rejected"
+                // "message rejected"

             default:
                 // *rmp.StateModified
                 // *rmq.StateReceived
                 // TODO: ("Message state: %v", publishResult.Outcome)
             }
-        case <-time.After(time.Second):
-            // TODO
-            fmt.Println("second passed")
         case <-ctx.Done():
             return
         }
     }
 }
@@ -4,7 +4,6 @@ import (
     "context"
     "datart/config"

-    "github.com/Azure/go-amqp"
     rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
 )

@@ -32,30 +31,22 @@ func init() {
     client.conn = conn
 }

-func Close(ctx context.Context) error {
+func CloseDefault(ctx context.Context) error {
     return client.Close(ctx)
 }

+func DefaultManagement() *rmq.AmqpManagement {
+    return client.Management()
+}
+
 func genEndpoints(tag string) ([]rmq.Endpoint, error) {
     confs := config.Conf().RabbitConf(tag)
     endpoints := make([]rmq.Endpoint, len(confs))

     for i, conf := range confs {
-        tlsConfig, err := conf.GetTLS().GenTLSConfig(tag)
-        if err != nil {
-            return nil, err
-        }
-
-        var options *rmq.AmqpConnOptions
-        var tls bool
-        if tlsConfig != nil {
-            options = &rmq.AmqpConnOptions{
-                SASLType:  amqp.SASLTypeExternal(""),
-                TLSConfig: tlsConfig}
-            tls = true
-        }
-
-        endpoints[i].Address = conf.GenAddress(tls)
+        endpoints[i].Address = conf.GenAddress(false)
         endpoints[i].Options = options
     }

@@ -36,7 +36,7 @@ func init() {
 }

 // close redis client
-func Close() error {
+func CloseDefault() error {
     return client.Close()
 }

@@ -12,7 +12,7 @@ import (
 )

 const (
-    duration time.Duration = time.Second * 5
+    duration time.Duration = 5 * time.Second
 )

 func updatingRedisPhasor(ctx context.Context) {
@@ -65,15 +65,26 @@ func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) {
 }

 func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
-    fields := genPhasorFields(channelSize.Channel)
+    fields := GenPhasorFields(channelSize.Channel)
     f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
         fields, channelSize.Size)
     if err != nil {
         log.Error(err)
     }

-    for field, tvs := range f2tvs {
-        key := genRedisPhasorKey(channelSize.Station, channelSize.Device, field)
+    // if len(f2tvs) <= 0 {
+    //     log.Info(channelSize.Station, " ", channelSize.Device, " ",
+    //         fields, " query none of ", channelSize.Size)
+    // }
+
+    for f, tvs := range f2tvs {
+        sdf := strings.Split(f, ".")
+        if len(sdf) != 3 {
+            log.Error("invalid influx field")
+            return
+        }
+
+        key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2])
         members := convertTVsToMenmbers(tvs)
         ssuChan <- zUnit{
             Key: key,
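Note: the loop now derives the redis key from the influx field name itself, so a field is expected to look like station.device.channel; sketch with made-up names:

    // "ssu1.dev2.yc01_amp" -> station "ssu1", device "dev2", field "yc01_amp"
    parts := strings.Split("ssu1.dev2.yc01_amp", ".")
    key := strings.Join([]string{parts[0], parts[1], "phasor", parts[2]}, ":")
    // key == "ssu1:dev2:phasor:yc01_amp"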
@@ -109,43 +120,6 @@ func updateZUnitToRedis(ctx context.Context, unit zUnit) error {
     return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
 }

-func genPhasorFields(channel string) []string {
-    fields := make([]string, 0, 3)
-
-    switch {
-    case strings.HasPrefix(channel, postgres.ChannelYCPrefix):
-
-        fieldPrefix := strings.ToLower(channel)
-        fields = append(fields,
-            fieldPrefix+influx.FieldSuffixAMP,
-            fieldPrefix+influx.FieldSuffixPA,
-            fieldPrefix+influx.FieldSuffixRMS)
-
-    case strings.HasPrefix(channel, postgres.ChannelYXPrefix):
-
-        fields = append(fields, strings.ToLower(channel))
-
-    case strings.HasPrefix(channel, postgres.ChannelUPrefix):
-
-        fieldUPrefix := strings.ToLower(channel)
-        fields = append(fields,
-            fieldUPrefix+influx.FieldSuffixAMP,
-            fieldUPrefix+influx.FieldSuffixPA,
-            fieldUPrefix+influx.FieldSuffixRMS)
-
-    case channel == postgres.ChannelP,
-        channel == postgres.ChannelQ,
-        channel == postgres.ChannelS,
-        channel == postgres.ChannelPF,
-        channel == postgres.ChannelF,
-        channel == postgres.ChannelDF:
-
-        fields = append(fields, strings.ToLower(channel))
-    }
-
-    return fields
-}
-
 func genRedisPhasorKey(station, device, field string) string {
     return strings.Join([]string{station, device, "phasor", field}, ":")
 }

4  go.mod
@@ -3,12 +3,10 @@ module datart
 go 1.24.0

 require (
-    github.com/Azure/go-amqp v1.5.0
     github.com/gin-gonic/gin v1.11.0
     github.com/google/uuid v1.6.0
     github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0
     github.com/redis/go-redis/v9 v9.14.0
-    github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
     go.mongodb.org/mongo-driver/v2 v2.3.0
     go.uber.org/zap v1.27.0
     gopkg.in/natefinch/lumberjack.v2 v2.2.1
@@ -17,6 +15,7 @@ require (
 )

 require (
+    github.com/Azure/go-amqp v1.5.0 // indirect
     github.com/bytedance/sonic v1.14.0 // indirect
     github.com/bytedance/sonic/loader v0.3.0 // indirect
     github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -51,6 +50,7 @@ require (
     github.com/xdg-go/pbkdf2 v1.0.0 // indirect
     github.com/xdg-go/scram v1.1.2 // indirect
     github.com/xdg-go/stringprep v1.0.4 // indirect
+    github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
     go.uber.org/mock v0.5.0 // indirect
     go.uber.org/multierr v1.10.0 // indirect
     golang.org/x/arch v0.20.0 // indirect
@@ -10,9 +10,9 @@ import (
 )

 type command struct {
-    Function string `json:"function"`
-    Timeout  int64  `json:"timeout"`
-    Args     []any  `json:"args"`
+    Command string `json:"command"`
+    Timeout int64  `json:"timeout"`
+    Args    []any  `json:"args"`
 }

 func (a *Admin) PostExecuteCommand(ctx *gin.Context) {
@@ -49,7 +49,7 @@ func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, error) {
         return req, errors.New("invalid body param")
     }

-    if req.Function != "GenSSU2ChannelSizes" {
+    if req.Command != "GenSSU2ChannelSizes" {
         return nil, errors.New("invalid function")
     }

@@ -24,14 +24,25 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) {
         return
     }

-    event := alarm.ConvertToEvent(ip)
+    event, err := alarm.ConvertToEvent(ip)
+    if err != nil {
+
+        log.Error(err, fmt.Sprintf(" params: %v", alarm))
+
+        ctx.JSON(200, gin.H{
+            "code": 2,
+            "msg":  "convert error",
+        })
+        return
+    }
+
     err = mongo.InsertOneEvent(ctx.Request.Context(), event)
     if err != nil {

         log.Error(err, fmt.Sprintf(" params: %v", event))

         ctx.JSON(200, gin.H{
-            "code": 2,
+            "code": 3,
             "msg":  "insert error",
         })
         return
@@ -43,7 +54,7 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) {
         log.Error(err, fmt.Sprintf(" params: %v", event))

         ctx.JSON(200, gin.H{
-            "code": 3,
+            "code": 4,
             "msg":  "publish error",
         })
         return
@@ -6,6 +6,7 @@ import (
     "errors"
     "fmt"
+    "strconv"
     "strings"

     "github.com/gin-gonic/gin"
     "github.com/google/uuid"
@@ -13,7 +14,7 @@ import (
 )

 const (
-    pageSizeLimit = 100
+    pageSizeLimit = 500
 )

 func (a *Api) GetEvents(ctx *gin.Context) {
@@ -49,7 +50,7 @@ func (a *Api) GetEvents(ctx *gin.Context) {
 }

 func (a *Api) PostUpsertEvents(ctx *gin.Context) {
-    filter, update, err := a.checkAndGenUpsertEventsRequest(ctx)
+    noUUID, eventUUID, upsert, err := a.checkAndGenUpsertEventsRequest(ctx)
     if err != nil {
         log.Error(err)
         ctx.JSON(200, gin.H{
@@ -59,10 +60,24 @@ func (a *Api) PostUpsertEvents(ctx *gin.Context) {
         return
     }

-    if err = mongo.UpsertOneEvent(ctx.Request.Context(), filter, update); err != nil {
-        log.Error(err, fmt.Sprintf(" params: %v, %v", filter, update))
+    if noUUID {
+        if err = mongo.InsertOneEvent(ctx.Request.Context(), upsert); err != nil {
+
+            log.Error(err, fmt.Sprintf(" params: %v", upsert))
+
+            ctx.JSON(200, gin.H{
+                "code": 2,
+                "msg":  err.Error(),
+            })
+            return
+        }
+    } else if err = mongo.UpsertOneEvent(ctx.Request.Context(), bson.M{"event_uuid": eventUUID},
+        bson.M{"$set": bson.M(upsert)}); err != nil {
+
+        log.Error(err, fmt.Sprintf(" params: %v", upsert))
+
         ctx.JSON(200, gin.H{
-            "code": 2,
+            "code": 3,
             "msg":  err.Error(),
         })
         return
@@ -116,6 +131,20 @@ func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) {
         filter["timestamp"] = bson.M{"$gte": begin, "$lte": end}
     }

+    statusStr := ctx.Query("status")
+    if len(statusStr) > 0 {
+        statusStrs := strings.Split(statusStr, ",")
+        statuss := make([]int, len(statusStrs))
+        for i := range statusStrs {
+            s, err := strconv.Atoi(statusStrs[i])
+            if err != nil {
+                return nil, 0, -1, -1, errors.New("invalid status")
+            }
+            statuss[i] = s
+        }
+        filter["status"] = bson.M{"$in": statuss}
+    }
+
     var sort int
     sortStr := ctx.Query("sort")
     if len(sortStr) > 0 {
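Note: as a quick illustration of the added status filter (the query-string shape is the only assumption), a request carrying status=1,3 ends up as:

    // statuss == []int{1, 3}
    // filter["status"] = bson.M{"$in": statuss}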
@@ -156,25 +185,33 @@ func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) {
     return filter, sort, int64(pageNo), int64(pageSize), nil
 }

-func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (bson.M, bson.M, error) {
+func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (bool, string, map[string]any, error) {
     e := map[string]any{}
     err := ctx.ShouldBindJSON(&e)
     if err != nil {
-        return nil, nil, errors.New("invalid body param")
+        return false, "", nil, errors.New("invalid body param")
     }

     eventUUID := ""
+    noUUID := true
     if eu, ok := e["event_uuid"]; ok {
         if eUUID, ok := eu.(string); ok {
-            if uuid.Validate(eUUID) != nil {
-                return nil, nil, errors.New("invalid param")
-            } else {
+            if uuid.Validate(eUUID) == nil {
                 eventUUID = eUUID
+                noUUID = false
             }
         }
-    } else {
-        return nil, nil, errors.New("no uuid")
     }

-    return bson.M{"event_uuid": eventUUID}, bson.M{"$set": bson.M(e)}, nil
+    if noUUID {
+        noUUID = true
+        if uid, err := uuid.NewV7(); err != nil {
+            return false, "", nil, err
+        } else {
+            eventUUID = uid.String()
+            e["event_uuid"] = eventUUID
+        }
+    }
+
+    return noUUID, eventUUID, e, nil
 }

@@ -1,11 +1,14 @@
 package api

 import (
+    "datart/data"
     "datart/data/influx"
+    "datart/data/postgres"
     "datart/log"
     "datart/util"
     "errors"
     "fmt"
+    "strings"

     "github.com/gin-gonic/gin"
 )
@@ -63,21 +66,33 @@ func (a *Api) checkAndGenGetPointRequest(ctx *gin.Context) (*influx.Request, error) {
         return nil, errors.New("invalid type")
     }

-    // tag TODO
-    station := ctx.DefaultQuery("station", "")
-    if len(station) <= 0 {
-        return nil, errors.New("invalid station")
-    }
+    station, mainPos, subPos := "", "", ""

-    mainPos := ctx.DefaultQuery("main_pos", "")
-    if len(mainPos) <= 0 {
-        return nil, errors.New("invalid main_pos")
-    }
+    mtag := ctx.DefaultQuery("mtag", "")
+    v, ok := postgres.ChannelSizes.Load(mtag)
+    if ok {
+        if channelSize, ok := v.(postgres.ChannelSize); ok {
+            fields := data.GenPhasorFields(channelSize.Channel)

-    subPos := ctx.DefaultQuery("sub_pos", "")
-    if len(subPos) <= 0 {
-        return nil, errors.New("invalid sub_pos")
+            station = channelSize.Station
+            mainPos = channelSize.Device
+            subPos = strings.Join(fields, ",")
+        }
+    } else {
+        station = ctx.DefaultQuery("station", "")
+        if len(station) <= 0 {
+            return nil, errors.New("invalid station")
+        }
+
+        mainPos = ctx.DefaultQuery("main_pos", "")
+        if len(mainPos) <= 0 {
+            return nil, errors.New("invalid main_pos")
+        }
+
+        subPos = ctx.DefaultQuery("sub_pos", "")
+        if len(subPos) <= 0 {
+            return nil, errors.New("invalid sub_pos")
+        }
     }

     beginStr := ctx.DefaultQuery("begin", "")
@@ -8,7 +8,7 @@ import (
 )

 func LoadRoute(engine *gin.Engine) {
-    engine.Use(gin.Recovery()) // TODO
+    engine.Use(gin.Recovery())

     a := new(api.Api)
     ga := engine.Group("api")