update files
This commit is contained in:
parent
71ebf6b938
commit
a003ba00a6
|
|
@ -1,2 +1,3 @@
|
||||||
.vscode
|
.vscode
|
||||||
logs
|
logs
|
||||||
|
.idea
|
||||||
|
|
@ -17,40 +17,40 @@ type config struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
var conf *config
|
var conf *config
|
||||||
var confPath string
|
var confDir string
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
flag.StringVar(&confPath, "conf_path", "./configs", "conf path")
|
flag.StringVar(&confDir, "conf_dir", "./configs", "conf dir")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
conf = new(config)
|
conf = new(config)
|
||||||
|
|
||||||
conf.serverConf = new(serverConfig)
|
conf.serverConf = new(serverConfig)
|
||||||
serverConf := confPath + string(os.PathSeparator) + serverConfigName()
|
serverConf := confDir + string(os.PathSeparator) + serverConfigName()
|
||||||
conf.unmarshalJsonFile(serverConf, conf.serverConf)
|
conf.unmarshalJsonFile(serverConf, conf.serverConf)
|
||||||
|
|
||||||
conf.logConf = new(logConfig)
|
conf.logConf = new(logConfig)
|
||||||
logConf := confPath + string(os.PathSeparator) + logConfigName()
|
logConf := confDir + string(os.PathSeparator) + logConfigName()
|
||||||
conf.unmarshalJsonFile(logConf, conf.logConf)
|
conf.unmarshalJsonFile(logConf, conf.logConf)
|
||||||
|
|
||||||
conf.postgresConf = make(map[string]*postgresConfig)
|
conf.postgresConf = make(map[string]*postgresConfig)
|
||||||
postgresConf := confPath + string(os.PathSeparator) + postgresConfigName()
|
postgresConf := confDir + string(os.PathSeparator) + postgresConfigName()
|
||||||
conf.unmarshalJsonFile(postgresConf, &conf.postgresConf)
|
conf.unmarshalJsonFile(postgresConf, &conf.postgresConf)
|
||||||
|
|
||||||
conf.influxConf = make(map[string]*influxConfig)
|
conf.influxConf = make(map[string]*influxConfig)
|
||||||
influxConf := confPath + string(os.PathSeparator) + influxConfigName()
|
influxConf := confDir + string(os.PathSeparator) + influxConfigName()
|
||||||
conf.unmarshalJsonFile(influxConf, &conf.influxConf)
|
conf.unmarshalJsonFile(influxConf, &conf.influxConf)
|
||||||
|
|
||||||
conf.redisConf = make(map[string]*redisConfig)
|
conf.redisConf = make(map[string]*redisConfig)
|
||||||
redisConf := confPath + string(os.PathSeparator) + redisConfigName()
|
redisConf := confDir + string(os.PathSeparator) + redisConfigName()
|
||||||
conf.unmarshalJsonFile(redisConf, &conf.redisConf)
|
conf.unmarshalJsonFile(redisConf, &conf.redisConf)
|
||||||
|
|
||||||
conf.mongoConf = make(map[string]*mongoConfig)
|
conf.mongoConf = make(map[string]*mongoConfig)
|
||||||
mongoConf := confPath + string(os.PathSeparator) + mongoConfigName()
|
mongoConf := confDir + string(os.PathSeparator) + mongoConfigName()
|
||||||
conf.unmarshalJsonFile(mongoConf, &conf.mongoConf)
|
conf.unmarshalJsonFile(mongoConf, &conf.mongoConf)
|
||||||
|
|
||||||
conf.rabbitConf = make(map[string][]*rabbitConfig)
|
conf.rabbitConf = make(map[string][]*rabbitConfig)
|
||||||
rabbitConf := confPath + string(os.PathSeparator) + rabbitConfigName()
|
rabbitConf := confDir + string(os.PathSeparator) + rabbitConfigName()
|
||||||
conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf)
|
conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ type postgresConfig struct {
|
||||||
User string `json:"user" yaml:"user"`
|
User string `json:"user" yaml:"user"`
|
||||||
Password string `json:"password" yaml:"password"`
|
Password string `json:"password" yaml:"password"`
|
||||||
DBName string `json:"dbname" yaml:"dbname"`
|
DBName string `json:"dbname" yaml:"dbname"`
|
||||||
SSLMode string `json:"sslmode" yaml:"sslmode"`
|
|
||||||
TimeZone string `json:"timezone" yaml:"timezone"`
|
TimeZone string `json:"timezone" yaml:"timezone"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -99,23 +98,6 @@ func (conf *postgresConfig) SetDBName(dbName string) *postgresConfig {
|
||||||
return conf
|
return conf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conf *postgresConfig) GetSSLMode() string {
|
|
||||||
if conf == nil {
|
|
||||||
panic("postgres config is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.SSLMode
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *postgresConfig) SetSSLMode(sslMode string) *postgresConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("postgres config is nil")
|
|
||||||
}
|
|
||||||
conf.SSLMode = sslMode
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *postgresConfig) GetTimeZone() string {
|
func (conf *postgresConfig) GetTimeZone() string {
|
||||||
if conf == nil {
|
if conf == nil {
|
||||||
panic("postgres config is nil")
|
panic("postgres config is nil")
|
||||||
|
|
|
||||||
227
config/rabbit.go
227
config/rabbit.go
|
|
@ -1,30 +1,9 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/tls"
|
|
||||||
"crypto/x509"
|
|
||||||
"encoding/pem"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/youmark/pkcs8"
|
|
||||||
)
|
|
||||||
|
|
||||||
type tlsConfig struct {
|
|
||||||
CAPath string `json:"capath" yaml:"capath"`
|
|
||||||
KeyPath string `json:"keypath" yaml:"keypath"`
|
|
||||||
CertPath string `json:"certpath" yaml:"certpath"`
|
|
||||||
Password string `json:"password" yaml:"password"`
|
|
||||||
SkipVerify bool `json:"skipverify" yaml:"skipverify"`
|
|
||||||
ServerName string `json:"servername" yaml:"servername"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type rabbitConfig struct {
|
type rabbitConfig struct {
|
||||||
Broker string `json:"broker" yaml:"broker"`
|
Broker string `json:"broker" yaml:"broker"`
|
||||||
Username string `json:"username" yaml:"username"`
|
Username string `json:"username" yaml:"username"`
|
||||||
Password string `json:"password" yaml:"password"`
|
Password string `json:"password" yaml:"password"`
|
||||||
TLS *tlsConfig `json:"tls" yaml:"tls"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRabbitConfig() *rabbitConfig {
|
func NewRabbitConfig() *rabbitConfig {
|
||||||
|
|
@ -37,9 +16,6 @@ func (conf *rabbitConfig) GenAddress(tls bool) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
address := "amqp://"
|
address := "amqp://"
|
||||||
if tls {
|
|
||||||
address = "amqps://"
|
|
||||||
}
|
|
||||||
if conf.GetUsername() != "" && conf.GetPassword() != "" {
|
if conf.GetUsername() != "" && conf.GetPassword() != "" {
|
||||||
address += conf.GetUsername() + ":" + conf.GetPassword() + "@"
|
address += conf.GetUsername() + ":" + conf.GetPassword() + "@"
|
||||||
}
|
}
|
||||||
|
|
@ -99,203 +75,6 @@ func (conf *rabbitConfig) SetPassword(password string) *rabbitConfig {
|
||||||
return conf
|
return conf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conf *rabbitConfig) InitTLS() *rabbitConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit config is nil")
|
|
||||||
}
|
|
||||||
conf.TLS = new(tlsConfig)
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *rabbitConfig) GetTLS() *tlsConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit config is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.TLS
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) GetCAPath() string {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.CAPath
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) SetCAPath(caPath string) *tlsConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
conf.CAPath = caPath
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) GetKeyPath() string {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.KeyPath
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) SetKeyPath(keyPath string) *tlsConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
conf.KeyPath = keyPath
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) GetCertPath() string {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.CertPath
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) SetCertPath(certPath string) *tlsConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
conf.CertPath = certPath
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) GetPassword() string {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.Password
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) SetPassword(password string) *tlsConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
conf.Password = password
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) GetSkipVerify() bool {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.SkipVerify
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) SetSkipVerify(skipVerify bool) *tlsConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
conf.SkipVerify = skipVerify
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) GetServerName() string {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
return conf.ServerName
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) SetServerName(serverName string) *tlsConfig {
|
|
||||||
if conf == nil {
|
|
||||||
panic("rabbit tls is nil")
|
|
||||||
}
|
|
||||||
conf.ServerName = serverName
|
|
||||||
|
|
||||||
return conf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conf *tlsConfig) GenTLSConfig(tag string) (*tls.Config, error) {
|
|
||||||
if conf == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if conf.GetCAPath() == "" || conf.GetCertPath() == "" ||
|
|
||||||
conf.GetKeyPath() == "" {
|
|
||||||
return nil, errors.New("rabbit tls not valid")
|
|
||||||
}
|
|
||||||
|
|
||||||
caPem, err := os.ReadFile(conf.GetCAPath())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
certPool := x509.NewCertPool()
|
|
||||||
certPool.AppendCertsFromPEM(caPem)
|
|
||||||
|
|
||||||
keyPem, err := os.ReadFile(conf.GetKeyPath())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
certPem, err := os.ReadFile(conf.GetCertPath())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pemBlock, err := parsePrivateKey(keyPem, []byte(conf.GetPassword()))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cliCert, err := tls.X509KeyPair(certPem, pem.EncodeToMemory(pemBlock))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &tls.Config{
|
|
||||||
Certificates: []tls.Certificate{cliCert},
|
|
||||||
RootCAs: certPool,
|
|
||||||
ServerName: conf.GetServerName(),
|
|
||||||
InsecureSkipVerify: conf.GetSkipVerify(),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parsePrivateKey(key, password []byte) (*pem.Block, error) {
|
|
||||||
block, _ := pem.Decode(key)
|
|
||||||
if block == nil {
|
|
||||||
return nil, errors.New("no valid pem")
|
|
||||||
}
|
|
||||||
|
|
||||||
var privateKey any
|
|
||||||
var err error
|
|
||||||
switch block.Type {
|
|
||||||
case "RSA PRIVATE KEY":
|
|
||||||
privateKey, err = x509.ParsePKCS1PrivateKey(block.Bytes)
|
|
||||||
case "PRIVATE KEY":
|
|
||||||
privateKey, err = x509.ParsePKCS8PrivateKey(block.Bytes)
|
|
||||||
case "ENCRYPTED PRIVATE KEY":
|
|
||||||
privateKey, err = pkcs8.ParsePKCS8PrivateKey(block.Bytes, password)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unsupported key type: %s", block.Type)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &pem.Block{
|
|
||||||
Type: "PRIVATE KEY",
|
|
||||||
Bytes: pemBytes,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func rabbitConfigName() string {
|
func rabbitConfigName() string {
|
||||||
return "rabbit.json"
|
return "rabbit.json"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"filename": "./logs/datart.log",
|
"filename": "./logs/datart.log",
|
||||||
"maxsize": 100,
|
"maxsize": 128,
|
||||||
"maxage": 7,
|
"maxage": 7,
|
||||||
"maxbackups": 20,
|
"maxbackups": 20,
|
||||||
"localtime": true,
|
"localtime": true,
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
{
|
{
|
||||||
"default":{
|
"default":{
|
||||||
"host":"192.168.46.100",
|
"host":"127.0.0.1",
|
||||||
"port":9432,
|
"port":5432,
|
||||||
"user":"postgres",
|
"user":"postgres",
|
||||||
"password":"123RTYjkl",
|
"password":"password",
|
||||||
"dbname":"metamodule",
|
"dbname":"postgres",
|
||||||
"sslmode":"disable",
|
|
||||||
"timezone":"Asia/Shanghai"
|
"timezone":"Asia/Shanghai"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,62 @@
|
||||||
|
package data
|
||||||
|
|
||||||
|
import (
|
||||||
|
"datart/data/influx"
|
||||||
|
"datart/data/postgres"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
func GenPhasorFields(channel string) []string {
|
||||||
|
fields := make([]string, 0, 3)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case strings.HasPrefix(channel, postgres.ChannelYCPrefix):
|
||||||
|
|
||||||
|
fieldPrefix := strings.ToLower(channel)
|
||||||
|
fields = append(fields,
|
||||||
|
fieldPrefix+influx.FieldSuffixAMP,
|
||||||
|
fieldPrefix+influx.FieldSuffixPA,
|
||||||
|
fieldPrefix+influx.FieldSuffixRMS)
|
||||||
|
|
||||||
|
case strings.HasPrefix(channel, postgres.ChannelYXPrefix):
|
||||||
|
|
||||||
|
fields = append(fields, strings.ToLower(channel))
|
||||||
|
|
||||||
|
case strings.HasPrefix(channel, postgres.ChannelUPrefix):
|
||||||
|
|
||||||
|
fieldUPrefix := strings.ToLower(channel)
|
||||||
|
fields = append(fields,
|
||||||
|
fieldUPrefix+influx.FieldSuffixAMP,
|
||||||
|
fieldUPrefix+influx.FieldSuffixPA,
|
||||||
|
fieldUPrefix+influx.FieldSuffixRMS)
|
||||||
|
|
||||||
|
case channel == postgres.ChannelP,
|
||||||
|
channel == postgres.ChannelQ,
|
||||||
|
channel == postgres.ChannelS,
|
||||||
|
channel == postgres.ChannelPF,
|
||||||
|
channel == postgres.ChannelF,
|
||||||
|
channel == postgres.ChannelDF:
|
||||||
|
|
||||||
|
fields = append(fields, strings.ToLower(channel))
|
||||||
|
}
|
||||||
|
|
||||||
|
return fields
|
||||||
|
}
|
||||||
|
|
||||||
|
type zUnit struct {
|
||||||
|
Key string
|
||||||
|
Members []redis.Z
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertTVsToMenmbers(tvs []influx.TV) []redis.Z {
|
||||||
|
members := make([]redis.Z, len(tvs))
|
||||||
|
|
||||||
|
for i, tv := range tvs {
|
||||||
|
members[i].Member = tv.Time
|
||||||
|
members[i].Score = tv.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
return members
|
||||||
|
}
|
||||||
46
data/data.go
46
data/data.go
|
|
@ -3,42 +3,40 @@ package data
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"datart/data/influx"
|
"datart/data/influx"
|
||||||
|
"datart/data/mongo"
|
||||||
"datart/data/postgres"
|
"datart/data/postgres"
|
||||||
|
"datart/data/rabbit"
|
||||||
"github.com/redis/go-redis/v9"
|
"datart/data/redis"
|
||||||
|
"datart/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Process struct {
|
type Processes struct {
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProcess() *Process {
|
func NewProcesses() *Processes {
|
||||||
return new(Process)
|
return new(Processes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Process) StartDataProcessing() {
|
func (p *Processes) StartDataProcessing() {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
p.cancel = cancel
|
p.cancel = cancel
|
||||||
postgres.GenSSU2ChannelSizes(ctx, 500)
|
|
||||||
|
if err := postgres.GenSSU2ChannelSizes(ctx, 500); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
updatingRedisPhasor(ctx)
|
updatingRedisPhasor(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Process) Cancel() {
|
func (p *Processes) Cancel(ctx context.Context) {
|
||||||
p.cancel()
|
p.cancel()
|
||||||
}
|
|
||||||
|
eventNotifyPublisher.Close(ctx)
|
||||||
type zUnit struct {
|
|
||||||
Key string
|
influx.CloseDefault()
|
||||||
Members []redis.Z
|
mongo.CloseDefault(ctx)
|
||||||
}
|
postgres.CloseDefault()
|
||||||
|
rabbit.CloseDefault(ctx)
|
||||||
func convertTVsToMenmbers(tvs []influx.TV) []redis.Z {
|
redis.CloseDefault()
|
||||||
members := make([]redis.Z, len(tvs))
|
|
||||||
|
|
||||||
for i, tv := range tvs {
|
|
||||||
members[i].Member = tv.Time
|
|
||||||
members[i].Score = tv.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
return members
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ func init() {
|
||||||
client.org = influxConfig.GetOrg()
|
client.org = influxConfig.GetOrg()
|
||||||
}
|
}
|
||||||
|
|
||||||
func Close() {
|
func CloseDefault() {
|
||||||
client.CloseIdleConnections()
|
client.CloseIdleConnections()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
dbphasor = "influxBucket"
|
dbphasor = "ssuBucket"
|
||||||
dbsample = "influxBucket"
|
dbsample = "ssuBucket"
|
||||||
)
|
)
|
||||||
|
|
||||||
// keep consistent with telegraf
|
// keep consistent with telegraf
|
||||||
|
|
@ -35,13 +35,15 @@ const (
|
||||||
FieldSuffixRMS = "_rms"
|
FieldSuffixRMS = "_rms"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const adaptedms = 1000
|
||||||
|
|
||||||
func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
|
func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
|
||||||
req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
|
req.Begin = time.Now().UnixMilli() - int64(limit*20+adaptedms)
|
||||||
return client.GetSSUPointLastLimit(ctx, req, limit)
|
return client.GetSSUPointLastLimit(ctx, req, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
|
func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
|
||||||
req.Begin = time.Now().UnixMilli() - int64(limit*20+20)
|
req.Begin = time.Now().UnixMilli() - int64(limit*20+adaptedms)
|
||||||
return client.GetSSUPointsLastLimit(ctx, req, limit)
|
return client.GetSSUPointsLastLimit(ctx, req, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,7 +70,7 @@ func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Reque
|
||||||
req.SubPos, req.SubPos, req.Table, req.Station, req.MainPos)
|
req.SubPos, req.SubPos, req.Table, req.Station, req.MainPos)
|
||||||
if limit > 1 {
|
if limit > 1 {
|
||||||
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;",
|
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;",
|
||||||
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20)
|
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
reqData := url.Values{
|
reqData := url.Values{
|
||||||
|
|
@ -90,7 +92,7 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ
|
||||||
strings.Join(fields, ","), req.Table, req.Station, req.MainPos)
|
strings.Join(fields, ","), req.Table, req.Station, req.MainPos)
|
||||||
} else {
|
} else {
|
||||||
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;",
|
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;",
|
||||||
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20)
|
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
reqData := url.Values{
|
reqData := url.Values{
|
||||||
|
|
@ -105,7 +107,7 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
ret := make(map[string][]TV, len(f2tvs))
|
||||||
for f, tvs := range f2tvs {
|
for f, tvs := range f2tvs {
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple
|
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret, nil
|
return ret, nil
|
||||||
|
|
@ -154,7 +156,7 @@ func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *R
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
ret := make(map[string][]TV, len(f2tvs))
|
||||||
for f, tvs := range f2tvs {
|
for f, tvs := range f2tvs {
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple
|
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret, nil
|
return ret, nil
|
||||||
|
|
@ -188,7 +190,7 @@ func (client *influxClient) GetSSUPointsAfterLimit(ctx context.Context, req *Req
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
ret := make(map[string][]TV, len(f2tvs))
|
||||||
for f, tvs := range f2tvs {
|
for f, tvs := range f2tvs {
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple
|
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret, nil
|
return ret, nil
|
||||||
|
|
@ -198,7 +200,7 @@ func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Re
|
||||||
reqData := url.Values{
|
reqData := url.Values{
|
||||||
"db": {req.DB},
|
"db": {req.DB},
|
||||||
"q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms order by time desc limit %d;",
|
"q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms order by time desc limit %d;",
|
||||||
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)}, // begin = req.End-20-20
|
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)},
|
||||||
}
|
}
|
||||||
|
|
||||||
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
|
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
|
||||||
|
|
@ -208,7 +210,7 @@ func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Re
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
ret := make(map[string][]TV, len(f2tvs))
|
||||||
for f, tvs := range f2tvs {
|
for f, tvs := range f2tvs {
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs // only req.SubPos support multiple
|
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret, nil
|
return ret, nil
|
||||||
|
|
|
||||||
|
|
@ -7,17 +7,17 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
_ = iota
|
_ = iota
|
||||||
almCodeCommmExcept // 通信异常
|
almCodeCommmExcept
|
||||||
almCodeADFault // AD故障
|
almCodeADFault
|
||||||
almCodePPSExcept // 同步秒脉冲异常
|
almCodePPSExcept
|
||||||
almCodeReserve1 // 备用
|
almCodeReserve1
|
||||||
almCodeUnitInit // 单元初始化
|
almCodeUnitInit
|
||||||
almCodeReadParamErr // 读参数错
|
almCodeReadParamErr
|
||||||
almCodeReserve2 // 备用
|
almCodeReserve2
|
||||||
almCodeStartSample // 启动采样-内部转换信号
|
almCodeStartSample
|
||||||
almCodeOverSample // 秒内采样点数过量
|
almCodeOverSample
|
||||||
almCodeUnderSample // 秒内采样点数欠量
|
almCodeUnderSample
|
||||||
)
|
)
|
||||||
|
|
||||||
type Alarm struct {
|
type Alarm struct {
|
||||||
|
|
@ -47,11 +47,20 @@ func (a *Alarm) GetName() string {
|
||||||
|
|
||||||
func (a *Alarm) GetType() int {
|
func (a *Alarm) GetType() int {
|
||||||
switch a.AlarmCode {
|
switch a.AlarmCode {
|
||||||
case almCodeReserve1, almCodeReserve2, almCodeUnitInit, almCodeStartSample:
|
case almCodeReserve1,
|
||||||
|
almCodeReserve2,
|
||||||
|
almCodeUnitInit,
|
||||||
|
almCodeStartSample:
|
||||||
return genEventType(0, 0)
|
return genEventType(0, 0)
|
||||||
case almCodeOverSample, almCodeUnderSample:
|
|
||||||
|
case almCodeOverSample,
|
||||||
|
almCodeUnderSample:
|
||||||
return genEventType(0, 1)
|
return genEventType(0, 1)
|
||||||
case almCodeCommmExcept, almCodeADFault, almCodePPSExcept, almCodeReadParamErr:
|
|
||||||
|
case almCodeCommmExcept,
|
||||||
|
almCodeADFault,
|
||||||
|
almCodePPSExcept,
|
||||||
|
almCodeReadParamErr:
|
||||||
return genEventType(0, 2)
|
return genEventType(0, 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -60,36 +69,59 @@ func (a *Alarm) GetType() int {
|
||||||
|
|
||||||
func (a *Alarm) GetPriority() int {
|
func (a *Alarm) GetPriority() int {
|
||||||
switch a.AlarmCode {
|
switch a.AlarmCode {
|
||||||
case almCodeReserve1, almCodeReserve2, almCodeUnitInit, almCodeStartSample:
|
case almCodeReserve1,
|
||||||
|
almCodeReserve2,
|
||||||
|
almCodeUnitInit,
|
||||||
|
almCodeStartSample:
|
||||||
return 1
|
return 1
|
||||||
case almCodeOverSample, almCodeUnderSample:
|
|
||||||
|
case almCodeOverSample,
|
||||||
|
almCodeUnderSample:
|
||||||
return 4
|
return 4
|
||||||
case almCodeCommmExcept, almCodeADFault, almCodePPSExcept, almCodeReadParamErr:
|
|
||||||
|
case almCodeCommmExcept,
|
||||||
|
almCodeADFault,
|
||||||
|
almCodePPSExcept,
|
||||||
|
almCodeReadParamErr:
|
||||||
return 7
|
return 7
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Alarm) ConvertToEvent(ip string) *Event {
|
func GenEvent(alarm []byte, ip string) (*Event, error) {
|
||||||
|
a, err := UnmarshallToAlarm(alarm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.ConvertToEvent(ip)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Alarm) ConvertToEvent(ip string) (*Event, error) {
|
||||||
e := new(Event)
|
e := new(Event)
|
||||||
|
uid, err := uuid.NewV7()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if a != nil {
|
if a != nil {
|
||||||
e.Event = a.GetName()
|
e.Event = a.GetName()
|
||||||
e.EventUUID = uuid.NewString()
|
e.EventUUID = uid.String()
|
||||||
e.Type = a.GetType()
|
e.Type = a.GetType()
|
||||||
e.Priority = a.GetPriority()
|
e.Priority = a.GetPriority()
|
||||||
e.Status = EventStatusHappen
|
e.Status = EventStatusHappen
|
||||||
e.Timestamp = a.AlarmTime
|
e.Timestamp = a.AlarmTime
|
||||||
e.From = "station"
|
e.From = "station"
|
||||||
e.Operations = append(e.Operations, &operation{
|
e.Operations = append(e.Operations, &operation{
|
||||||
Action: EventActionHappened, // TODO
|
Action: EventActionHappen,
|
||||||
OP: ip,
|
OP: ip,
|
||||||
TS: a.AlarmTime,
|
TS: a.AlarmTime,
|
||||||
})
|
})
|
||||||
e.Alarm = a
|
e.Alarm = a
|
||||||
}
|
}
|
||||||
|
|
||||||
return e
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func UnmarshallToAlarm(data []byte) (*Alarm, error) {
|
func UnmarshallToAlarm(data []byte) (*Alarm, error) {
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,12 @@ package mongo
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -23,15 +27,37 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EventActionHappened = "happened"
|
EventActionHappen = "happen"
|
||||||
|
EventActionDataAt = "data_attach"
|
||||||
|
EventActionReport = "report"
|
||||||
|
EventActionConfirm = "confirm"
|
||||||
|
EventActionPersist = "persist"
|
||||||
|
EventActionClose = "close"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var EventStatusAction = []string{
|
||||||
|
EventStatusHappen: EventActionHappen,
|
||||||
|
EventStatusDataAt: EventActionDataAt,
|
||||||
|
EventStatusReport: EventActionReport,
|
||||||
|
EventStatusConfirm: EventActionConfirm,
|
||||||
|
EventStatusPersist: EventActionPersist,
|
||||||
|
EventStatusClose: EventActionClose,
|
||||||
|
}
|
||||||
|
|
||||||
type operation struct {
|
type operation struct {
|
||||||
Action string `bson:"action" json:"action"`
|
Action string `bson:"action" json:"action"`
|
||||||
OP string `bson:"op" json:"op"`
|
OP string `bson:"op" json:"op"`
|
||||||
TS int64 `bson:"ts" json:"ts"`
|
TS int64 `bson:"ts" json:"ts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GenOperation(action, op string) operation {
|
||||||
|
return operation{
|
||||||
|
Action: action,
|
||||||
|
OP: op,
|
||||||
|
TS: time.Now().UnixMilli(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
Event string `bson:"event" json:"event"`
|
Event string `bson:"event" json:"event"`
|
||||||
EventUUID string `bson:"event_uuid" json:"event_uuid"`
|
EventUUID string `bson:"event_uuid" json:"event_uuid"`
|
||||||
|
|
@ -41,7 +67,7 @@ type Event struct {
|
||||||
Timestamp int64 `bson:"timestamp" json:"timestamp"`
|
Timestamp int64 `bson:"timestamp" json:"timestamp"`
|
||||||
From string `bson:"from" json:"from"`
|
From string `bson:"from" json:"from"`
|
||||||
Operations []*operation `bson:"operations" json:"operations"`
|
Operations []*operation `bson:"operations" json:"operations"`
|
||||||
// TODO complete
|
// others
|
||||||
Alarm *Alarm `bson:"alarm" json:"alarm"`
|
Alarm *Alarm `bson:"alarm" json:"alarm"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -49,98 +75,31 @@ func (e *Event) Marshall() ([]byte, error) {
|
||||||
return json.Marshal(e)
|
return json.Marshal(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func InsertOneEvent(ctx context.Context, doc *Event) error {
|
func InsertEvent(ctx context.Context, doc any) error {
|
||||||
_, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc)
|
_, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func InsertEvents(ctx context.Context, docs []*Event) error {
|
func DeleteEvent[T bson.M | bson.D](ctx context.Context, filter T) error {
|
||||||
_, err := getCollection(dbevent, tbevent).InsertMany(ctx, docs)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func DeleteOneEvent[T bson.M | bson.D](ctx context.Context, filter T) error {
|
|
||||||
_, err := getCollection(dbevent, tbevent).DeleteOne(ctx, filter)
|
_, err := getCollection(dbevent, tbevent).DeleteOne(ctx, filter)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteEvents[T bson.M | bson.D](ctx context.Context, filter T) error {
|
func UpdateEvent[T bson.M | bson.D](ctx context.Context, filter T, update T) error {
|
||||||
_, err := getCollection(dbevent, tbevent).DeleteMany(ctx, filter)
|
_, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func UpsertOneEvent[T bson.M | bson.D](ctx context.Context, filter T, update T) error {
|
|
||||||
opts := options.UpdateOne().SetUpsert(true)
|
|
||||||
_, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update, opts)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func UpsertEvents[T bson.M | bson.D](ctx context.Context, filter T, update T) error {
|
|
||||||
opts := options.UpdateMany().SetUpsert(true)
|
|
||||||
_, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update, opts)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func FindOneEvent[T bson.M | bson.D](ctx context.Context, filter T) (*Event, error) {
|
|
||||||
doc := new(Event)
|
|
||||||
err := getCollection(dbevent, tbevent).FindOne(ctx, filter).Decode(doc)
|
|
||||||
// if errors.Is(err, mongo.ErrNoDocuments) {
|
|
||||||
// return nil, nil
|
|
||||||
// }
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return doc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func FindEvents[T bson.M | bson.D](ctx context.Context, filter T) ([]*Event, error) {
|
|
||||||
cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer cursor.Close(ctx)
|
|
||||||
|
|
||||||
var docs []*Event
|
|
||||||
if err = cursor.All(ctx, &docs); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return docs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func FindEventsInBatch[T bson.M | bson.D](ctx context.Context, filter T,
|
|
||||||
batchSize int32) ([]*Event, error) {
|
|
||||||
|
|
||||||
opt := options.Find().SetBatchSize(batchSize)
|
|
||||||
cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer cursor.Close(ctx)
|
|
||||||
|
|
||||||
var docs []*Event
|
|
||||||
for cursor.Next(ctx) {
|
|
||||||
doc := new(Event)
|
|
||||||
if err = cursor.Decode(doc); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
docs = append(docs, doc)
|
|
||||||
}
|
|
||||||
if err := cursor.Err(); err != nil {
|
|
||||||
return docs, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return docs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T,
|
func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T,
|
||||||
sort int, page int64, limit int64) ([]*Event, error) {
|
sort int, page int64, limit int64) ([]*Event, error) {
|
||||||
|
|
||||||
opt := options.Find()
|
opt := options.Find()
|
||||||
if sort == 1 || sort == -1 {
|
if sort == 1 || sort == -1 {
|
||||||
opt.SetSort(bson.D{{Key: "timestamp", Value: sort}})
|
opt.SetSort(bson.D{{Key: "timestamp", Value: sort}})
|
||||||
|
} else {
|
||||||
|
opt.SetSort(bson.D{{Key: "_id", Value: 1}})
|
||||||
}
|
}
|
||||||
|
|
||||||
if page > 0 && limit > 0 {
|
if page > 0 && limit > 0 {
|
||||||
opt.SetSkip(limit * (page - 1)).SetLimit(limit)
|
opt.SetSkip(limit * (page - 1)).SetLimit(limit)
|
||||||
}
|
}
|
||||||
|
|
@ -166,9 +125,78 @@ func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T,
|
||||||
return docs, nil
|
return docs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sys: 0-hard/1-platform/2-application
|
func BulkWriteEventsWithUUID(ctx context.Context, curd byte, events []map[string]any) ([]string, error) {
|
||||||
//
|
length := len(events)
|
||||||
// level:0-info/1-warn/2-error
|
if length <= 0 {
|
||||||
|
return nil, errors.New("no event")
|
||||||
|
}
|
||||||
|
|
||||||
|
models := make([]mongo.WriteModel, 0, length)
|
||||||
|
idx2UUID := make(map[int]string, length)
|
||||||
|
for i, event := range events {
|
||||||
|
uuid, ok := event["event_uuid"].(string)
|
||||||
|
if !ok || uuid == "" {
|
||||||
|
return nil, fmt.Errorf("invalid uuid at index %d", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch curd {
|
||||||
|
case 'c':
|
||||||
|
models = append(models,
|
||||||
|
mongo.NewInsertOneModel().
|
||||||
|
SetDocument(event))
|
||||||
|
|
||||||
|
case 'u':
|
||||||
|
filter := bson.M{"event_uuid": uuid}
|
||||||
|
status, ok := event["status"].(int)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid status at index %d", i)
|
||||||
|
}
|
||||||
|
operation, ok := event["operation"].(operation)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid operation at index %d", i)
|
||||||
|
}
|
||||||
|
update := bson.M{"$set": bson.M{"status": status},
|
||||||
|
"$push": bson.M{"operations": operation}}
|
||||||
|
models = append(models,
|
||||||
|
mongo.NewUpdateOneModel().
|
||||||
|
SetFilter(filter).
|
||||||
|
SetUpdate(update))
|
||||||
|
|
||||||
|
case 'd':
|
||||||
|
filter := bson.M{"event_uuid": uuid}
|
||||||
|
models = append(models,
|
||||||
|
mongo.NewDeleteOneModel().
|
||||||
|
SetFilter(filter))
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid curd")
|
||||||
|
}
|
||||||
|
|
||||||
|
idx2UUID[i] = uuid
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := options.BulkWrite().SetOrdered(false)
|
||||||
|
_, err := getCollection(dbevent, tbevent).BulkWrite(ctx, models, opts)
|
||||||
|
|
||||||
|
sUUIDs := []string{}
|
||||||
|
if err != nil {
|
||||||
|
if bulkErr, ok := err.(mongo.BulkWriteException); ok {
|
||||||
|
idxExist := map[int]bool{}
|
||||||
|
for _, writeErr := range bulkErr.WriteErrors {
|
||||||
|
idxExist[writeErr.Index] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for idx, uuid := range idx2UUID {
|
||||||
|
if !idxExist[idx] {
|
||||||
|
sUUIDs = append(sUUIDs, uuid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sUUIDs, err
|
||||||
|
}
|
||||||
|
|
||||||
func genEventType(sys int, level int) int {
|
func genEventType(sys int, level int) int {
|
||||||
return sys + level*3
|
return sys + level*3
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,7 @@ func NewMongoClient(opts ...*options.ClientOptions) (*mongo.Client, error) {
|
||||||
return mongo.Connect(opts...)
|
return mongo.Connect(opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Disconnect(ctx context.Context) error {
|
func CloseDefault(ctx context.Context) error {
|
||||||
return client.Disconnect(ctx)
|
return client.Disconnect(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"database/sql/driver"
|
"database/sql/driver"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -49,7 +50,7 @@ type measurement struct {
|
||||||
Tag string `gorm:"column:tag"`
|
Tag string `gorm:"column:tag"`
|
||||||
Size int `gorm:"column:size"`
|
Size int `gorm:"column:size"`
|
||||||
DataSource *dataSource `gorm:"column:data_source;type:jsonb"`
|
DataSource *dataSource `gorm:"column:data_source;type:jsonb"`
|
||||||
// mapping TODO
|
// others
|
||||||
}
|
}
|
||||||
|
|
||||||
type ChannelSize struct {
|
type ChannelSize struct {
|
||||||
|
|
@ -61,6 +62,7 @@ type ChannelSize struct {
|
||||||
|
|
||||||
// channel is original
|
// channel is original
|
||||||
var SSU2ChannelSizes atomic.Value
|
var SSU2ChannelSizes atomic.Value
|
||||||
|
var ChannelSizes sync.Map
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
SSU2ChannelSizes.Store(map[string][]ChannelSize{})
|
SSU2ChannelSizes.Store(map[string][]ChannelSize{})
|
||||||
|
|
@ -133,6 +135,7 @@ func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error)
|
||||||
|
|
||||||
func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
|
func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
|
||||||
id := int64(0)
|
id := int64(0)
|
||||||
|
ssu2Channel2Exist := make(map[string]map[string]struct{})
|
||||||
ssu2ChannelSizes := make(map[string][]ChannelSize)
|
ssu2ChannelSizes := make(map[string][]ChannelSize)
|
||||||
for {
|
for {
|
||||||
var records []*measurement
|
var records []*measurement
|
||||||
|
|
@ -153,9 +156,8 @@ func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
addrType := record.DataSource.Type
|
if err := genMappingFromAddr(ssu2ChannelSizes, ssu2Channel2Exist, record); err != nil {
|
||||||
addr := record.DataSource.Addr
|
|
||||||
if err := genMappingFromAddr(ssu2ChannelSizes, addrType, addr, record.Size); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -168,10 +170,12 @@ func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, addrType int, addr any, size int) error {
|
func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
|
||||||
switch addrType {
|
ssu2Channel2Exist map[string]map[string]struct{}, record *measurement) error {
|
||||||
|
|
||||||
|
switch record.DataSource.Type {
|
||||||
case 1:
|
case 1:
|
||||||
if rawAddr, ok := addr.(map[string]interface{}); ok {
|
if rawAddr, ok := record.DataSource.Addr.(map[string]interface{}); ok {
|
||||||
station, ok := rawAddr["station"].(string)
|
station, ok := rawAddr["station"].(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("invalid station")
|
return errors.New("invalid station")
|
||||||
|
|
@ -184,16 +188,33 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize, addrType int,
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("invalid channel")
|
return errors.New("invalid channel")
|
||||||
}
|
}
|
||||||
ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], ChannelSize{
|
|
||||||
|
if _, ok := ssu2Channel2Exist[device]; !ok {
|
||||||
|
ssu2Channel2Exist[device] = make(map[string]struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := ssu2Channel2Exist[device][channel]; ok {
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
ssu2Channel2Exist[device][channel] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
channelSize := ChannelSize{
|
||||||
Station: station,
|
Station: station,
|
||||||
Device: device,
|
Device: device,
|
||||||
Channel: channel,
|
Channel: channel,
|
||||||
Size: size,
|
Size: record.Size,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
ChannelSizes.Store(record.Tag, channelSize)
|
||||||
|
|
||||||
|
ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], channelSize)
|
||||||
} else {
|
} else {
|
||||||
return errors.New("invalid io_address")
|
return errors.New("invalid io_address")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case 2:
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return errors.New("invalid data_source.type")
|
return errors.New("invalid data_source.type")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,17 +6,20 @@ import (
|
||||||
|
|
||||||
"gorm.io/driver/postgres"
|
"gorm.io/driver/postgres"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
"gorm.io/gorm/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
var client *gorm.DB
|
var client *gorm.DB
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
postgresConfig := config.Conf().PostgresConf("default")
|
postgresConfig := config.Conf().PostgresConf("default")
|
||||||
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=%s TimeZone=%s",
|
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d timezone=%s",
|
||||||
postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(),
|
postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(),
|
||||||
postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetSSLMode(),
|
postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetTimeZone())
|
||||||
postgresConfig.GetTimeZone())
|
|
||||||
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{SkipDefaultTransaction: true})
|
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
|
||||||
|
SkipDefaultTransaction: true,
|
||||||
|
Logger: logger.Default.LogMode(logger.Silent)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
@ -24,7 +27,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// close postgres default client
|
// close postgres default client
|
||||||
func Close() error {
|
func CloseDefault() error {
|
||||||
db, err := client.DB()
|
db, err := client.DB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -2,65 +2,72 @@ package data
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"datart/data/mongo"
|
|
||||||
"datart/data/rabbit"
|
"datart/data/rabbit"
|
||||||
"datart/log"
|
"datart/log"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
||||||
)
|
)
|
||||||
|
|
||||||
var eventXQK = rabbit.XQK{
|
var eventNotifyXQK = rabbit.XQK{
|
||||||
Exchange: "event_produce_exchange",
|
Exchange: "event_notify_fanout",
|
||||||
// Key: "event_produce_key",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var eventPublisher *rmq.Publisher
|
var eventNotifyPublisher *rmq.Publisher
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
publisher, err := rabbit.NewPublisher(context.Background(), "default", &eventXQK)
|
ctx := context.Background()
|
||||||
|
|
||||||
|
m := new(rabbit.Manage)
|
||||||
|
|
||||||
|
rm := rabbit.DefaultManagement()
|
||||||
|
|
||||||
|
rx := &rmq.FanOutExchangeSpecification{
|
||||||
|
Name: eventNotifyXQK.Exchange,
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Init(ctx, rm, rx, nil, nil)
|
||||||
|
|
||||||
|
if err := m.DeclareExchange(ctx); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
publisher, err := rabbit.NewPublisher(ctx, "default", &eventNotifyXQK)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
eventPublisher = publisher
|
eventNotifyPublisher = publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func PublishEvent(ctx context.Context, event *mongo.Event) error {
|
func PublishEvent(ctx context.Context, event any) error {
|
||||||
data, err := event.Marshall()
|
data, err := json.Marshal(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := eventPublisher.Publish(ctx, rmq.NewMessage(data))
|
result, err := eventNotifyPublisher.Publish(ctx, rmq.NewMessage(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch result.Outcome.(type) {
|
switch result.Outcome.(type) {
|
||||||
case *rmq.StateAccepted:
|
case *rmq.StateAccepted:
|
||||||
// TODO: "Message accepted"
|
// "message accepted"
|
||||||
case *rmq.StateReleased:
|
case *rmq.StateReleased:
|
||||||
return fmt.Errorf("message not routed: %v", event)
|
// "message released"
|
||||||
case *rmq.StateRejected:
|
case *rmq.StateRejected:
|
||||||
return fmt.Errorf("message rejected: %v", event)
|
return errors.New("message rejected")
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("invalid message %v state: %v", event, result.Outcome)
|
return fmt.Errorf("invalid message state: %v", result.Outcome)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GenEvent(data []byte, ip string) (*mongo.Event, error) {
|
|
||||||
alarm, err := mongo.UnmarshallToAlarm(data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return alarm.ConvertToEvent(ip), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func CloseEventPublisher(ctx context.Context) {
|
func CloseEventPublisher(ctx context.Context) {
|
||||||
if err := eventPublisher.Close(ctx); err != nil {
|
if err := eventNotifyPublisher.Close(ctx); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,12 +25,10 @@ func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byt
|
||||||
deliCtx, err := consumer.Receive(ctx)
|
deliCtx, err := consumer.Receive(ctx)
|
||||||
if errors.Is(err, context.Canceled) {
|
if errors.Is(err, context.Canceled) {
|
||||||
// The consumer was closed correctly
|
// The consumer was closed correctly
|
||||||
// TODO
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// An error occurred receiving the message
|
// An error occurred receiving the message
|
||||||
// TODO
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -40,7 +38,7 @@ func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byt
|
||||||
|
|
||||||
err = deliCtx.Accept(ctx)
|
err = deliCtx.Accept(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO
|
// accept error
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,16 +26,12 @@ func (m *Manage) Init(ctx context.Context, rm *rmq.AmqpManagement,
|
||||||
m.b = rb
|
m.b = rb
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manage) DeclareExchangeQueue(ctx context.Context) error {
|
func (m *Manage) DeclareExchange(ctx context.Context) error {
|
||||||
_, err := m.m.DeclareExchange(ctx, m.x)
|
xinfo, err := m.m.DeclareExchange(ctx, m.x)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = m.m.DeclareQueue(ctx, m.q)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
m.xName = xinfo.Name()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,6 @@ package rabbit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
||||||
)
|
)
|
||||||
|
|
@ -29,29 +27,26 @@ func Publishing(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan []
|
||||||
case msg := <-msgChan:
|
case msg := <-msgChan:
|
||||||
result, err := publisher.Publish(ctx, rmq.NewMessage(msg))
|
result, err := publisher.Publish(ctx, rmq.NewMessage(msg))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = err // TODO
|
_ = err // publish error
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
switch result.Outcome.(type) {
|
switch result.Outcome.(type) {
|
||||||
case *rmq.StateAccepted:
|
case *rmq.StateAccepted:
|
||||||
// TODO: "Message accepted"
|
// "message accepted"
|
||||||
|
|
||||||
case *rmq.StateReleased:
|
case *rmq.StateReleased:
|
||||||
// TODO: "Message not routed"
|
// "message not routed"
|
||||||
|
|
||||||
case *rmq.StateRejected:
|
case *rmq.StateRejected:
|
||||||
// TODO: "Message rejected"
|
// "message rejected"
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// *rmp.StateModified
|
// *rmp.StateModified
|
||||||
// *rmq.StateReceived
|
// *rmq.StateReceived
|
||||||
// TODO: ("Message state: %v", publishResult.Outcome)
|
|
||||||
}
|
}
|
||||||
case <-time.After(time.Second):
|
case <-ctx.Done():
|
||||||
// TODO
|
return
|
||||||
fmt.Println("second passed")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"datart/config"
|
"datart/config"
|
||||||
|
|
||||||
"github.com/Azure/go-amqp"
|
|
||||||
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -32,30 +31,22 @@ func init() {
|
||||||
client.conn = conn
|
client.conn = conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func Close(ctx context.Context) error {
|
func CloseDefault(ctx context.Context) error {
|
||||||
return client.Close(ctx)
|
return client.Close(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DefaultManagement() *rmq.AmqpManagement {
|
||||||
|
return client.Management()
|
||||||
|
}
|
||||||
|
|
||||||
func genEndpoints(tag string) ([]rmq.Endpoint, error) {
|
func genEndpoints(tag string) ([]rmq.Endpoint, error) {
|
||||||
confs := config.Conf().RabbitConf(tag)
|
confs := config.Conf().RabbitConf(tag)
|
||||||
endpoints := make([]rmq.Endpoint, len(confs))
|
endpoints := make([]rmq.Endpoint, len(confs))
|
||||||
|
|
||||||
for i, conf := range confs {
|
for i, conf := range confs {
|
||||||
tlsConfig, err := conf.GetTLS().GenTLSConfig(tag)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var options *rmq.AmqpConnOptions
|
var options *rmq.AmqpConnOptions
|
||||||
var tls bool
|
|
||||||
if tlsConfig != nil {
|
|
||||||
options = &rmq.AmqpConnOptions{
|
|
||||||
SASLType: amqp.SASLTypeExternal(""),
|
|
||||||
TLSConfig: tlsConfig}
|
|
||||||
tls = true
|
|
||||||
}
|
|
||||||
|
|
||||||
endpoints[i].Address = conf.GenAddress(tls)
|
endpoints[i].Address = conf.GenAddress(false)
|
||||||
endpoints[i].Options = options
|
endpoints[i].Options = options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// close redis client
|
// close redis client
|
||||||
func Close() error {
|
func CloseDefault() error {
|
||||||
return client.Close()
|
return client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
duration time.Duration = time.Second * 5
|
updatePhasorDuration time.Duration = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
func updatingRedisPhasor(ctx context.Context) {
|
func updatingRedisPhasor(ctx context.Context) {
|
||||||
|
|
@ -30,7 +30,7 @@ func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit)
|
||||||
ssuType := config.Conf().ServerConf().GetSSUType()
|
ssuType := config.Conf().ServerConf().GetSSUType()
|
||||||
for ssu := range ssuType {
|
for ssu := range ssuType {
|
||||||
go func(ssu string) {
|
go func(ssu string) {
|
||||||
timer := time.Tick(duration)
|
timer := time.Tick(updatePhasorDuration)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timer:
|
case <-timer:
|
||||||
|
|
@ -65,15 +65,26 @@ func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit)
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
|
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
|
||||||
fields := genPhasorFields(channelSize.Channel)
|
fields := GenPhasorFields(channelSize.Channel)
|
||||||
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
|
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
|
||||||
fields, channelSize.Size)
|
fields, channelSize.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for field, tvs := range f2tvs {
|
// if len(f2tvs) <= 0 {
|
||||||
key := genRedisPhasorKey(channelSize.Station, channelSize.Device, field)
|
// log.Debug(channelSize.Station, " ", channelSize.Device, " ",
|
||||||
|
// fields, " query none of ", channelSize.Size)
|
||||||
|
// }
|
||||||
|
|
||||||
|
for f, tvs := range f2tvs {
|
||||||
|
sdf := strings.Split(f, ".")
|
||||||
|
if len(sdf) != 3 {
|
||||||
|
log.Error("invalid influx field")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2])
|
||||||
members := convertTVsToMenmbers(tvs)
|
members := convertTVsToMenmbers(tvs)
|
||||||
ssuChan <- zUnit{
|
ssuChan <- zUnit{
|
||||||
Key: key,
|
Key: key,
|
||||||
|
|
@ -109,43 +120,6 @@ func updateZUnitToRedis(ctx context.Context, unit zUnit) error {
|
||||||
return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
|
return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
|
||||||
}
|
}
|
||||||
|
|
||||||
func genPhasorFields(channel string) []string {
|
|
||||||
fields := make([]string, 0, 3)
|
|
||||||
|
|
||||||
switch {
|
|
||||||
case strings.HasPrefix(channel, postgres.ChannelYCPrefix):
|
|
||||||
|
|
||||||
fieldPrefix := strings.ToLower(channel)
|
|
||||||
fields = append(fields,
|
|
||||||
fieldPrefix+influx.FieldSuffixAMP,
|
|
||||||
fieldPrefix+influx.FieldSuffixPA,
|
|
||||||
fieldPrefix+influx.FieldSuffixRMS)
|
|
||||||
|
|
||||||
case strings.HasPrefix(channel, postgres.ChannelYXPrefix):
|
|
||||||
|
|
||||||
fields = append(fields, strings.ToLower(channel))
|
|
||||||
|
|
||||||
case strings.HasPrefix(channel, postgres.ChannelUPrefix):
|
|
||||||
|
|
||||||
fieldUPrefix := strings.ToLower(channel)
|
|
||||||
fields = append(fields,
|
|
||||||
fieldUPrefix+influx.FieldSuffixAMP,
|
|
||||||
fieldUPrefix+influx.FieldSuffixPA,
|
|
||||||
fieldUPrefix+influx.FieldSuffixRMS)
|
|
||||||
|
|
||||||
case channel == postgres.ChannelP,
|
|
||||||
channel == postgres.ChannelQ,
|
|
||||||
channel == postgres.ChannelS,
|
|
||||||
channel == postgres.ChannelPF,
|
|
||||||
channel == postgres.ChannelF,
|
|
||||||
channel == postgres.ChannelDF:
|
|
||||||
|
|
||||||
fields = append(fields, strings.ToLower(channel))
|
|
||||||
}
|
|
||||||
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
||||||
func genRedisPhasorKey(station, device, field string) string {
|
func genRedisPhasorKey(station, device, field string) string {
|
||||||
return strings.Join([]string{station, device, "phasor", field}, ":")
|
return strings.Join([]string{station, device, "phasor", field}, ":")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
4
go.mod
4
go.mod
|
|
@ -3,12 +3,10 @@ module datart
|
||||||
go 1.24.0
|
go 1.24.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Azure/go-amqp v1.5.0
|
|
||||||
github.com/gin-gonic/gin v1.11.0
|
github.com/gin-gonic/gin v1.11.0
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0
|
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0
|
||||||
github.com/redis/go-redis/v9 v9.14.0
|
github.com/redis/go-redis/v9 v9.14.0
|
||||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
|
|
||||||
go.mongodb.org/mongo-driver/v2 v2.3.0
|
go.mongodb.org/mongo-driver/v2 v2.3.0
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||||
|
|
@ -17,6 +15,7 @@ require (
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/Azure/go-amqp v1.5.0 // indirect
|
||||||
github.com/bytedance/sonic v1.14.0 // indirect
|
github.com/bytedance/sonic v1.14.0 // indirect
|
||||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
|
@ -51,6 +50,7 @@ require (
|
||||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||||
github.com/xdg-go/scram v1.1.2 // indirect
|
github.com/xdg-go/scram v1.1.2 // indirect
|
||||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||||
|
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
|
||||||
go.uber.org/mock v0.5.0 // indirect
|
go.uber.org/mock v0.5.0 // indirect
|
||||||
go.uber.org/multierr v1.10.0 // indirect
|
go.uber.org/multierr v1.10.0 // indirect
|
||||||
golang.org/x/arch v0.20.0 // indirect
|
golang.org/x/arch v0.20.0 // indirect
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ func init() {
|
||||||
// ErrorLevel, 2, logs are high-priority. If an application is running smoothly,
|
// ErrorLevel, 2, logs are high-priority. If an application is running smoothly,
|
||||||
// it shouldn't generate any error-level logs.
|
// it shouldn't generate any error-level logs.
|
||||||
|
|
||||||
// DPanicLevel, 3, logs are particularly important errors. In development the
|
// PanicLevel, 3, logs are particularly important errors. In development the
|
||||||
// logger panics after writing the message.
|
// logger panics after writing the message.
|
||||||
|
|
||||||
// PanicLevel, 4, logs a message, then panics.
|
// PanicLevel, 4, logs a message, then panics.
|
||||||
|
|
|
||||||
35
main.go
35
main.go
|
|
@ -1,25 +1,44 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"datart/config"
|
"datart/config"
|
||||||
"datart/data"
|
"datart/data"
|
||||||
"datart/route"
|
"datart/route"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
gin.SetMode(gin.ReleaseMode)
|
signalChan := make(chan os.Signal, 1)
|
||||||
engine := gin.New()
|
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
|
// gin.SetMode(gin.ReleaseMode)
|
||||||
|
|
||||||
|
engine := gin.Default()
|
||||||
|
// engine := gin.New()
|
||||||
|
|
||||||
route.LoadRoute(engine)
|
route.LoadRoute(engine)
|
||||||
|
|
||||||
process := data.NewProcess()
|
processes := data.NewProcesses()
|
||||||
process.StartDataProcessing()
|
processes.StartDataProcessing()
|
||||||
|
|
||||||
port := strconv.Itoa(config.Conf().ServerConf().GetPort())
|
go func() {
|
||||||
if err := engine.Run(":" + port); err != nil {
|
port := strconv.Itoa(config.Conf().ServerConf().GetPort())
|
||||||
panic(err)
|
if err := engine.Run(":" + port); err != nil {
|
||||||
}
|
panic(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-signalChan
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
processes.Cancel(ctx)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,9 +10,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type command struct {
|
type command struct {
|
||||||
Function string `json:"function"`
|
Command string `json:"command"`
|
||||||
Timeout int64 `json:"timeout"`
|
Timeout int64 `json:"timeout"`
|
||||||
Args []any `json:"args"`
|
Args []any `json:"args"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Admin) PostExecuteCommand(ctx *gin.Context) {
|
func (a *Admin) PostExecuteCommand(ctx *gin.Context) {
|
||||||
|
|
@ -49,8 +49,8 @@ func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, er
|
||||||
return req, errors.New("invalid body param")
|
return req, errors.New("invalid body param")
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.Function != "GenSSU2ChannelSizes" {
|
if req.Command != "GenSSU2ChannelSizes" {
|
||||||
return nil, errors.New("invalid function")
|
return nil, errors.New("invalid command")
|
||||||
}
|
}
|
||||||
|
|
||||||
return req, nil
|
return req, nil
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"datart/data"
|
"datart/data"
|
||||||
"datart/data/mongo"
|
"datart/data/mongo"
|
||||||
"datart/log"
|
"datart/log"
|
||||||
|
|
@ -11,8 +12,8 @@ import (
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (a *Api) PostInsertAlarm(ctx *gin.Context) {
|
func (a *Api) PostUploadAlarm(ctx *gin.Context) {
|
||||||
alarm, ip, err := a.checkAndGenInsertAlarmRequest(ctx)
|
alarm, ip, err := a.checkAndGenUploadAlarmRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
|
@ -24,38 +25,44 @@ func (a *Api) PostInsertAlarm(ctx *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
event := alarm.ConvertToEvent(ip)
|
event, err := alarm.ConvertToEvent(ip)
|
||||||
err = mongo.InsertOneEvent(ctx.Request.Context(), event)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
log.Error(err, fmt.Sprintf(" params: %v", event))
|
log.Error(err, fmt.Sprintf(" params: %v", alarm))
|
||||||
|
|
||||||
ctx.JSON(200, gin.H{
|
ctx.JSON(200, gin.H{
|
||||||
"code": 2,
|
"code": 2,
|
||||||
"msg": "insert error",
|
"msg": "convert error",
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = data.PublishEvent(ctx.Request.Context(), event)
|
err = mongo.InsertEvent(ctx.Request.Context(), event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
log.Error(err, fmt.Sprintf(" params: %v", event))
|
log.Error(err, fmt.Sprintf(" params: %v", event))
|
||||||
|
|
||||||
ctx.JSON(200, gin.H{
|
ctx.JSON(200, gin.H{
|
||||||
"code": 3,
|
"code": 3,
|
||||||
"msg": "publish error",
|
"msg": "insert error",
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go func(e *mongo.Event) {
|
||||||
|
if err := data.PublishEvent(context.Background(), e); err != nil {
|
||||||
|
|
||||||
|
log.Error(err, fmt.Sprintf(" params: %v", e))
|
||||||
|
}
|
||||||
|
}(event)
|
||||||
|
|
||||||
ctx.JSON(200, gin.H{
|
ctx.JSON(200, gin.H{
|
||||||
"code": 0,
|
"code": 0,
|
||||||
"msg": "success",
|
"msg": "success",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Api) checkAndGenInsertAlarmRequest(ctx *gin.Context) (*mongo.Alarm, string, error) {
|
func (a *Api) checkAndGenUploadAlarmRequest(ctx *gin.Context) (*mongo.Alarm, string, error) {
|
||||||
alarm := new(mongo.Alarm)
|
alarm := new(mongo.Alarm)
|
||||||
|
|
||||||
err := ctx.ShouldBindJSON(alarm)
|
err := ctx.ShouldBindJSON(alarm)
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,15 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"datart/data"
|
||||||
"datart/data/mongo"
|
"datart/data/mongo"
|
||||||
"datart/log"
|
"datart/log"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
|
@ -13,7 +17,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
pageSizeLimit = 100
|
pageSizeLimit = 500
|
||||||
)
|
)
|
||||||
|
|
||||||
func (a *Api) GetEvents(ctx *gin.Context) {
|
func (a *Api) GetEvents(ctx *gin.Context) {
|
||||||
|
|
@ -49,29 +53,68 @@ func (a *Api) GetEvents(ctx *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Api) PostUpsertEvents(ctx *gin.Context) {
|
func (a *Api) PostUpsertEvents(ctx *gin.Context) {
|
||||||
filter, update, err := a.checkAndGenUpsertEventsRequest(ctx)
|
curd, uuids, events, err := a.checkAndGenUpsertEventsRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
ctx.JSON(200, gin.H{
|
ctx.JSON(200, gin.H{
|
||||||
"code": 1,
|
"code": 1,
|
||||||
"msg": err.Error(),
|
"msg": err.Error(),
|
||||||
|
"data": map[string]any{
|
||||||
|
"success_uuids": nil,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = mongo.UpsertOneEvent(ctx.Request.Context(), filter, update); err != nil {
|
suuids, err := mongo.BulkWriteEventsWithUUID(ctx.Request.Context(), curd, events)
|
||||||
log.Error(err, fmt.Sprintf(" params: %v, %v", filter, update))
|
if err != nil {
|
||||||
|
log.Error(err, fmt.Sprintf(" params:%v", events))
|
||||||
ctx.JSON(200, gin.H{
|
ctx.JSON(200, gin.H{
|
||||||
"code": 2,
|
"code": 2,
|
||||||
"msg": err.Error(),
|
"msg": err.Error(),
|
||||||
|
"data": map[string]any{
|
||||||
|
"success_uuids": suuids,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
suuids = uuids
|
||||||
|
ctx.JSON(200, gin.H{
|
||||||
|
"code": 0,
|
||||||
|
"msg": "success",
|
||||||
|
"data": map[string]any{
|
||||||
|
"success_uuids": uuids,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.JSON(200, gin.H{
|
uuid2Event := map[string]any{}
|
||||||
"code": 0,
|
for i, event := range events {
|
||||||
"msg": "success",
|
uuid2Event[uuids[i]] = event
|
||||||
})
|
}
|
||||||
|
|
||||||
|
go func(suuids []string, uuid2Event map[string]any) {
|
||||||
|
workers := 5
|
||||||
|
ch := make(chan any, len(suuids))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for range workers {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for e := range ch {
|
||||||
|
if perr := data.PublishEvent(context.Background(), e); perr != nil {
|
||||||
|
log.Error(perr, fmt.Sprintf("publish event failed: %v", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
for _, suuid := range suuids {
|
||||||
|
if e, ok := uuid2Event[suuid]; ok {
|
||||||
|
ch <- e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
}(suuids, uuid2Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) {
|
func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) {
|
||||||
|
|
@ -116,6 +159,20 @@ func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64,
|
||||||
filter["timestamp"] = bson.M{"$gte": begin, "$lte": end}
|
filter["timestamp"] = bson.M{"$gte": begin, "$lte": end}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
statusStr := ctx.Query("status")
|
||||||
|
if len(statusStr) > 0 {
|
||||||
|
statusStrs := strings.Split(statusStr, ",")
|
||||||
|
statuss := make([]int, len(statusStrs))
|
||||||
|
for i := range statusStrs {
|
||||||
|
s, err := strconv.Atoi(statusStrs[i])
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, -1, -1, errors.New("invalid status")
|
||||||
|
}
|
||||||
|
statuss[i] = s
|
||||||
|
}
|
||||||
|
filter["status"] = bson.M{"$in": statuss}
|
||||||
|
}
|
||||||
|
|
||||||
var sort int
|
var sort int
|
||||||
sortStr := ctx.Query("sort")
|
sortStr := ctx.Query("sort")
|
||||||
if len(sortStr) > 0 {
|
if len(sortStr) > 0 {
|
||||||
|
|
@ -149,32 +206,86 @@ func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64,
|
||||||
}
|
}
|
||||||
|
|
||||||
if pageSize > pageSizeLimit {
|
if pageSize > pageSizeLimit {
|
||||||
pageSize = pageSizeLimit
|
return nil, 0, -1, -1, fmt.Errorf("too many events, max %d", pageSizeLimit)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return filter, sort, int64(pageNo), int64(pageSize), nil
|
return filter, sort, int64(pageNo), int64(pageSize), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (bson.M, bson.M, error) {
|
func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (byte, []string, []map[string]any, error) {
|
||||||
e := map[string]any{}
|
insert := true
|
||||||
err := ctx.ShouldBindJSON(&e)
|
var status int
|
||||||
if err != nil {
|
var err error
|
||||||
return nil, nil, errors.New("invalid body param")
|
statusStr := ctx.Query("status")
|
||||||
|
if len(statusStr) > 0 {
|
||||||
|
insert = false
|
||||||
|
status, err = strconv.Atoi(statusStr)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
eventUUID := ""
|
events := []map[string]any{}
|
||||||
if eu, ok := e["event_uuid"]; ok {
|
uuids := []string{}
|
||||||
if eUUID, ok := eu.(string); ok {
|
|
||||||
if uuid.Validate(eUUID) != nil {
|
if !insert {
|
||||||
return nil, nil, errors.New("invalid param")
|
err := ctx.ShouldBindJSON(&uuids)
|
||||||
} else {
|
if err != nil {
|
||||||
eventUUID = eUUID
|
return 0, nil, nil, err
|
||||||
|
}
|
||||||
|
if len(uuids) == 0 {
|
||||||
|
return 0, nil, nil, errors.New("no uuid")
|
||||||
|
}
|
||||||
|
if len(uuids) > pageSizeLimit {
|
||||||
|
return 0, nil, nil, fmt.Errorf("too many uuids, max %d", pageSizeLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
operation := mongo.GenOperation(mongo.EventStatusAction[status], ctx.RemoteIP())
|
||||||
|
for _, uid := range uuids {
|
||||||
|
if uuid.Validate(uid) != nil {
|
||||||
|
return 0, nil, nil, errors.New("invalid uuid")
|
||||||
|
}
|
||||||
|
|
||||||
|
events = append(events,
|
||||||
|
map[string]any{"event_uuid": uid,
|
||||||
|
"status": status,
|
||||||
|
"operation": operation})
|
||||||
|
}
|
||||||
|
|
||||||
|
return 'u', uuids, events, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ctx.ShouldBindJSON(&events)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, nil, err
|
||||||
|
}
|
||||||
|
if len(events) == 0 {
|
||||||
|
return 0, nil, nil, errors.New("no event")
|
||||||
|
}
|
||||||
|
if len(events) > pageSizeLimit {
|
||||||
|
return 0, nil, nil, fmt.Errorf("too many events, max %d", pageSizeLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
noUUID := true
|
||||||
|
if eu, ok := event["event_uuid"]; ok {
|
||||||
|
if eID, ok := eu.(string); ok {
|
||||||
|
if uuid.Validate(eID) == nil {
|
||||||
|
uuids = append(uuids, eID)
|
||||||
|
noUUID = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return nil, nil, errors.New("no uuid")
|
if noUUID {
|
||||||
|
return 0, nil, nil, errors.New("invalid uuid")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(event) < 2 {
|
||||||
|
return 0, nil, nil, errors.New("invalid event")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return bson.M{"event_uuid": eventUUID}, bson.M{"$set": bson.M(e)}, nil
|
return 'c', uuids, events, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,20 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"datart/data"
|
||||||
"datart/data/influx"
|
"datart/data/influx"
|
||||||
|
"datart/data/postgres"
|
||||||
"datart/log"
|
"datart/log"
|
||||||
"datart/util"
|
"datart/util"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (a *Api) GetPointData(ctx *gin.Context) {
|
func (a *Api) GetPoints(ctx *gin.Context) {
|
||||||
request, err := a.checkAndGenGetPointRequest(ctx)
|
request, err := a.checkAndGenGetPointsRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
|
@ -56,28 +59,40 @@ func (a *Api) GetPointData(ctx *gin.Context) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Api) checkAndGenGetPointRequest(ctx *gin.Context) (*influx.Request, error) {
|
func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, error) {
|
||||||
|
|
||||||
typeStr := ctx.DefaultQuery("type", "")
|
typeStr := ctx.DefaultQuery("type", "")
|
||||||
if len(typeStr) <= 0 {
|
if len(typeStr) <= 0 {
|
||||||
return nil, errors.New("invalid type")
|
return nil, errors.New("invalid type")
|
||||||
}
|
}
|
||||||
|
|
||||||
// tag TODO
|
station, mainPos, subPos := "", "", ""
|
||||||
|
|
||||||
station := ctx.DefaultQuery("station", "")
|
mtag := ctx.DefaultQuery("mtag", "")
|
||||||
if len(station) <= 0 {
|
v, ok := postgres.ChannelSizes.Load(mtag)
|
||||||
return nil, errors.New("invalid station")
|
if ok {
|
||||||
}
|
if channelSize, ok := v.(postgres.ChannelSize); ok {
|
||||||
|
fields := data.GenPhasorFields(channelSize.Channel)
|
||||||
|
|
||||||
mainPos := ctx.DefaultQuery("main_pos", "")
|
station = channelSize.Station
|
||||||
if len(mainPos) <= 0 {
|
mainPos = channelSize.Device
|
||||||
return nil, errors.New("invalid main_pos")
|
subPos = strings.Join(fields, ",")
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
station = ctx.DefaultQuery("station", "")
|
||||||
|
if len(station) <= 0 {
|
||||||
|
return nil, errors.New("invalid station")
|
||||||
|
}
|
||||||
|
|
||||||
subPos := ctx.DefaultQuery("sub_pos", "")
|
mainPos = ctx.DefaultQuery("main_pos", "")
|
||||||
if len(subPos) <= 0 {
|
if len(mainPos) <= 0 {
|
||||||
return nil, errors.New("invalid sub_pos")
|
return nil, errors.New("invalid main_pos")
|
||||||
|
}
|
||||||
|
|
||||||
|
subPos = ctx.DefaultQuery("sub_pos", "")
|
||||||
|
if len(subPos) <= 0 {
|
||||||
|
return nil, errors.New("invalid sub_pos")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
beginStr := ctx.DefaultQuery("begin", "")
|
beginStr := ctx.DefaultQuery("begin", "")
|
||||||
|
|
|
||||||
|
|
@ -8,12 +8,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func LoadRoute(engine *gin.Engine) {
|
func LoadRoute(engine *gin.Engine) {
|
||||||
engine.Use(gin.Recovery()) // TODO
|
engine.Use(gin.Recovery())
|
||||||
|
|
||||||
a := new(api.Api)
|
a := new(api.Api)
|
||||||
ga := engine.Group("api")
|
ga := engine.Group("api")
|
||||||
ga.POST("/alarm", a.PostInsertAlarm)
|
ga.POST("/alarm", a.PostUploadAlarm)
|
||||||
ga.GET("/points", a.GetPointData)
|
ga.GET("/points", a.GetPoints)
|
||||||
ga.GET("/events", a.GetEvents)
|
ga.GET("/events", a.GetEvents)
|
||||||
ga.POST("/events", a.PostUpsertEvents)
|
ga.POST("/events", a.PostUpsertEvents)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue