From affbebe806490846886bb19ba2f607da10e1dc68 Mon Sep 17 00:00:00 2001 From: zhuxu Date: Fri, 19 Sep 2025 16:17:46 +0800 Subject: [PATCH] add data and update config --- config/config.go | 30 +--- config/influx.go | 20 ++- config/kafka.go | 213 ------------------------ config/mongo.go | 33 ++-- config/postgres.go | 35 +++- config/rabbit.go | 255 +++++++++++++++++------------ config/redis.go | 53 ++++-- config/server.go | 23 ++- config/url.go | 72 --------- configs/kafka.json | 17 -- configs/mongo.json | 2 +- configs/rabbit.json | 15 +- configs/redis.json | 2 +- configs/server.json | 8 +- configs/url.json | 8 - data/data.go | 3 + data/influx/common.go | 305 +++++++++++++++++++++++++++++++++++ data/influx/influx.go | 97 +++++++++++ data/influx/ssu_point.go | 93 +++++++++++ data/mongo/alarm.go | 77 +++++++++ data/mongo/event.go | 128 +++++++++++++++ data/mongo/mongo.go | 50 ++++++ data/postgres/measurement.go | 76 +++++++++ data/postgres/postgres.go | 33 ++++ data/postgres/station.go | 40 +++++ data/rabbit/client.go | 46 ++++++ data/rabbit/consume.go | 50 ++++++ data/rabbit/management.go | 73 +++++++++ data/rabbit/publish.go | 55 +++++++ data/rabbit/rabbit.go | 63 ++++++++ data/redis/hash.go | 33 ++++ data/redis/redis.go | 71 ++++++++ data/redis/string.go | 23 +++ data/redis/zset.go | 39 +++++ go.mod | 24 +++ go.sum | 106 ++++++++++++ 36 files changed, 1773 insertions(+), 498 deletions(-) delete mode 100644 config/kafka.go delete mode 100644 config/url.go delete mode 100644 configs/kafka.json delete mode 100644 configs/url.json create mode 100644 data/data.go create mode 100644 data/influx/common.go create mode 100644 data/influx/influx.go create mode 100644 data/influx/ssu_point.go create mode 100644 data/mongo/alarm.go create mode 100644 data/mongo/event.go create mode 100644 data/mongo/mongo.go create mode 100644 data/postgres/measurement.go create mode 100644 data/postgres/postgres.go create mode 100644 data/postgres/station.go create mode 100644 data/rabbit/client.go create mode 100644 data/rabbit/consume.go create mode 100644 data/rabbit/management.go create mode 100644 data/rabbit/publish.go create mode 100644 data/rabbit/rabbit.go create mode 100644 data/redis/hash.go create mode 100644 data/redis/redis.go create mode 100644 data/redis/string.go create mode 100644 data/redis/zset.go diff --git a/config/config.go b/config/config.go index 2e613f7..d2ea6c5 100644 --- a/config/config.go +++ b/config/config.go @@ -13,9 +13,7 @@ type config struct { influxConf map[string]*influxConfig redisConf map[string]*redisConfig mongoConf map[string]*mongoConfig - kafkaConf map[string]*kafkaConfig - rabbitConf map[string]*rabbitConfig - urlConf map[string]*urlConfig + rabbitConf map[string][]*rabbitConfig } var conf *config @@ -51,17 +49,9 @@ func init() { mongoConf := confPath + string(os.PathSeparator) + mongoConfigName() conf.unmarshalJsonFile(mongoConf, &conf.mongoConf) - conf.kafkaConf = make(map[string]*kafkaConfig) - kafkaConf := confPath + string(os.PathSeparator) + kafkaConfigName() - conf.unmarshalJsonFile(kafkaConf, &conf.kafkaConf) - - conf.rabbitConf = make(map[string]*rabbitConfig) + conf.rabbitConf = make(map[string][]*rabbitConfig) rabbitConf := confPath + string(os.PathSeparator) + rabbitConfigName() conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf) - - conf.urlConf = make(map[string]*urlConfig) - urlConf := confPath + string(os.PathSeparator) + urlConfigName() - conf.unmarshalJsonFile(urlConf, &conf.urlConf) } func Conf() *config { @@ -110,27 +100,13 @@ func (c *config) MongoConf(tag string) *mongoConfig { return c.mongoConf[tag] } -func (c *config) KafkaConf(tag string) *kafkaConfig { - if c == nil || c.kafkaConf == nil { - panic("kafka config is nil") - } - return c.kafkaConf[tag] -} - -func (c *config) RabbitConf(tag string) *rabbitConfig { +func (c *config) RabbitConf(tag string) []*rabbitConfig { if c == nil || c.rabbitConf == nil { panic("rabbit config is nil") } return c.rabbitConf[tag] } -func (c *config) URLConf(tag string) *urlConfig { - if c == nil || c.urlConf == nil { - panic("modelrt config is nil") - } - return c.urlConf[tag] -} - func (c *config) unmarshalJsonFile(file string, dest any) { if filejson, err := os.ReadFile(file); err != nil { panic(err.Error()) diff --git a/config/influx.go b/config/influx.go index 28f6f7b..7dc0f32 100644 --- a/config/influx.go +++ b/config/influx.go @@ -15,56 +15,68 @@ func (conf *influxConfig) GetURL() string { if conf == nil { panic("influx config is nil") } + return conf.URL } -func (conf *influxConfig) SetURL(url string) { +func (conf *influxConfig) SetURL(url string) *influxConfig { if conf == nil { panic("influx config is nil") } conf.URL = url + + return conf } func (conf *influxConfig) GetToken() string { if conf == nil { panic("influx config is nil") } + return conf.Token } -func (conf *influxConfig) SetToken(token string) { +func (conf *influxConfig) SetToken(token string) *influxConfig { if conf == nil { panic("influx config is nil") } conf.Token = token + + return conf } func (conf *influxConfig) GetOrg() string { if conf == nil { panic("influx config is nil") } + return conf.Org } -func (conf *influxConfig) SetOrg(org string) { +func (conf *influxConfig) SetOrg(org string) *influxConfig { if conf == nil { panic("influx config is nil") } conf.Org = org + + return conf } func (conf *influxConfig) GetTimeout() int { if conf == nil { panic("influx config is nil") } + return conf.Timeout } -func (conf *influxConfig) SetTimeout(timeout int) { +func (conf *influxConfig) SetTimeout(timeout int) *influxConfig { if conf == nil { panic("influx config is nil") } conf.Timeout = timeout + + return conf } func influxConfigName() string { diff --git a/config/kafka.go b/config/kafka.go deleted file mode 100644 index 7d470c5..0000000 --- a/config/kafka.go +++ /dev/null @@ -1,213 +0,0 @@ -package config - -type kafkaConsumer struct { - GroupID string `json:"groupid" yaml:"groupid"` - // OffsetNewest, -1, stands for the log head offset, i.e. the offset that will be - // assigned to the next message that will be produced to the partition. You - // can send this to a client's GetOffset method to get this offset, or when - // calling ConsumePartition to start consuming new messages. - // - // OffsetOldest, -2, stands for the oldest offset available on the broker for a - // partition. You can send this to a client's GetOffset method to get this - // offset, or when calling ConsumePartition to start consuming from the - // oldest offset that is still available on the broker. - InitialOffset int64 `json:"initialoffset" yaml:"initialoffset"` -} - -type kafkaProducer struct { - // NoResponse, 0, doesn't send any response, the TCP ACK is all you get. - // - // WaitForLocal, 1, waits for only the local commit to succeed before responding. - // - // WaitForAll, -1, waits for all in-sync replicas to commit before responding. - // The minimum number of in-sync replicas is configured on the broker via - // the `min.insync.replicas` configuration key. - RequiredAcks int16 `json:"requiredacks" yaml:"requiredacks"` - // manual/random/roundrobin/customhash/hash/referencehash/consistentcrchash - Partitioner string `json:"partitioner" yaml:"partitioner"` - ReturnSuccesses bool `json:"returnsuccesses" yaml:"returnsuccesses"` - ReturnErrors bool `json:"returnerrors" yaml:"returnerrors"` - // CompressionNone, 0, no compression - // CompressionGZIP, 1, compression using GZIP - // CompressionSnappy, 2, compression using snappy - // CompressionLZ4, 3, compression using LZ4 - // CompressionZSTD, 4, compression using ZSTD - Compression int8 `json:"compression" yaml:"compression"` -} - -type kafkaConfig struct { - Brokers []string `json:"brokers" yaml:"brokers"` - Topics []string `json:"topics" yaml:"topics"` - Consumer *kafkaConsumer `json:"consumer" yaml:"consumer"` - Producer *kafkaProducer `json:"producer" yaml:"producer"` -} - -func NewKafkaConfig() *kafkaConfig { - return new(kafkaConfig) -} - -func (conf *kafkaConfig) GetBrokers() []string { - if conf == nil { - panic("kafka config is nil") - } - return conf.Brokers -} - -func (conf *kafkaConfig) SetBrokers(brokers []string) { - if conf == nil { - panic("kafka config is nil") - } - conf.Brokers = brokers -} - -func (conf *kafkaConfig) GetTopics() []string { - if conf == nil { - panic("kafka config is nil") - } - return conf.Topics -} - -func (conf *kafkaConfig) SetTopics(topics []string) { - if conf == nil { - panic("kafka config is nil") - } - conf.Topics = topics -} - -func (conf *kafkaConfig) GetConsumer() *kafkaConsumer { - if conf == nil { - panic("kafka config is nil") - } - return conf.Consumer -} - -func (conf *kafkaConfig) InitConsumer() *kafkaConsumer { - if conf == nil { - panic("kafka config is nil") - } - conf.Consumer = new(kafkaConsumer) - return conf.Consumer -} - -func (conf *kafkaConsumer) GetGroupID() string { - if conf == nil { - panic("kafka consumer is nil") - } - return conf.GroupID -} - -func (conf *kafkaConsumer) SetGroupID(groupID string) { - if conf == nil { - panic("kafka consumer is nil") - } - conf.GroupID = groupID -} - -func (conf *kafkaConsumer) GetInitialOffset() int64 { - if conf == nil { - panic("kafka consumer is nil") - } - return conf.InitialOffset -} - -func (conf *kafkaConsumer) SetInitialOffset(initialOffset int64) { - if conf == nil { - panic("kafka consumer is nil") - } - if initialOffset != -1 && initialOffset != -2 { - panic("initialOffset is invalid") - } - conf.InitialOffset = initialOffset -} - -func (conf *kafkaConfig) GetProducer() *kafkaProducer { - if conf == nil { - panic("kafka config is nil") - } - return conf.Producer -} - -func (conf *kafkaConfig) InitProducer() *kafkaProducer { - if conf == nil { - panic("kafka config is nil") - } - conf.Producer = new(kafkaProducer) - return conf.Producer -} - -func (conf *kafkaProducer) GetRequiredAcks() int16 { - if conf == nil { - panic("kafka producer is nil") - } - return conf.RequiredAcks -} - -func (conf *kafkaProducer) SetRequiredAcks(requiredAcks int16) { - if conf == nil { - panic("kafka producer is nil") - } - if requiredAcks < -1 || requiredAcks > 1 { - panic("requiredAcks is invalid") - } - conf.RequiredAcks = requiredAcks -} - -func (conf *kafkaProducer) GetPartitioner() string { - if conf == nil { - panic("kafka producer is nil") - } - return conf.Partitioner -} - -func (conf *kafkaProducer) SetPartitioner(partitioner string) { - if conf == nil { - panic("kafka producer is nil") - } - conf.Partitioner = partitioner -} - -func (conf *kafkaProducer) GetReturnSuccesses() bool { - if conf == nil { - panic("kafka producer is nil") - } - return conf.ReturnSuccesses -} - -func (conf *kafkaProducer) SetReturnSuccesses(returnSuccesses bool) { - if conf == nil { - panic("kafka producer is nil") - } - conf.ReturnSuccesses = returnSuccesses -} - -func (conf *kafkaProducer) GetReturnErrors() bool { - if conf == nil { - panic("kafka producer is nil") - } - return conf.ReturnErrors -} - -func (conf *kafkaProducer) SetReturnErrors(returnErrors bool) { - if conf == nil { - panic("kafka producer is nil") - } - conf.ReturnErrors = returnErrors -} - -func (conf *kafkaProducer) GetCompression() int8 { - if conf == nil { - panic("kafka producer is nil") - } - return conf.Compression -} - -func (conf *kafkaProducer) SetCompression(compression int8) { - if conf == nil { - panic("kafka producer is nil") - } - conf.Compression = compression -} - -func kafkaConfigName() string { - return "kafka.json" -} diff --git a/config/mongo.go b/config/mongo.go index 4619b9a..42c8f70 100644 --- a/config/mongo.go +++ b/config/mongo.go @@ -1,7 +1,7 @@ package config type mongoConfig struct { - Hosts []string `json:"hosts" yaml:"hosts"` + Addrs []string `json:"addrs" yaml:"addrs"` Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` AuthSource string `josn:"authsource" yaml:"authsource"` @@ -12,74 +12,89 @@ func NewMongoConfig() *mongoConfig { return new(mongoConfig) } -func (conf *mongoConfig) GetHosts() []string { +func (conf *mongoConfig) GetAddrs() []string { if conf == nil { panic("mongo config is nil") } - return conf.Hosts + + return conf.Addrs } -func (conf *mongoConfig) SetHosts(hosts []string) { +func (conf *mongoConfig) SetAddrs(addrs []string) *mongoConfig { if conf == nil { panic("mongo config is nil") } - conf.Hosts = hosts + conf.Addrs = addrs + + return conf } func (conf *mongoConfig) GetUsername() string { if conf == nil { panic("mongo config is nil") } + return conf.Username } -func (conf *mongoConfig) SetUsername(username string) { +func (conf *mongoConfig) SetUsername(username string) *mongoConfig { if conf == nil { panic("mongo config is nil") } conf.Username = username + + return conf } func (conf *mongoConfig) GetPassword() string { if conf == nil { panic("mongo config is nil") } + return conf.Password } -func (conf *mongoConfig) SetPassword(password string) { +func (conf *mongoConfig) SetPassword(password string) *mongoConfig { if conf == nil { panic("mongo config is nil") } conf.Password = password + + return conf } func (conf *mongoConfig) GetAuthSource() string { if conf == nil { panic("mongo config is nil") } + return conf.AuthSource } -func (conf *mongoConfig) SetAuthSource(authSource string) { +func (conf *mongoConfig) SetAuthSource(authSource string) *mongoConfig { if conf == nil { panic("mongo config is nil") } conf.AuthSource = authSource + + return conf } func (conf *mongoConfig) GetAuthMechanism() string { if conf == nil { panic("mongo config is nil") } + return conf.AuthMechanism } -func (conf *mongoConfig) SetAuthMechanism(authMechanism string) { +func (conf *mongoConfig) SetAuthMechanism(authMechanism string) *mongoConfig { if conf == nil { panic("mongo config is nil") } conf.AuthMechanism = authMechanism + + return conf } func mongoConfigName() string { diff --git a/config/postgres.go b/config/postgres.go index 589edfc..d20f2f2 100644 --- a/config/postgres.go +++ b/config/postgres.go @@ -18,98 +18,119 @@ func (conf *postgresConfig) GetHost() string { if conf == nil { panic("postgres config is nil") } + return conf.Host } -func (conf *postgresConfig) SetHost(host string) { +func (conf *postgresConfig) SetHost(host string) *postgresConfig { if conf == nil { panic("postgres config is nil") } conf.Host = host + + return conf } func (conf *postgresConfig) GetPort() int { if conf == nil { panic("postgres config is nil") } + return conf.Port } -func (conf *postgresConfig) SetPort(port int) { +func (conf *postgresConfig) SetPort(port int) *postgresConfig { if conf == nil { panic("postgres config is nil") } conf.Port = port + + return conf } func (conf *postgresConfig) GetUser() string { if conf == nil { panic("postgres config is nil") } + return conf.User } -func (conf *postgresConfig) SetUser(user string) { +func (conf *postgresConfig) SetUser(user string) *postgresConfig { if conf == nil { panic("postgres config is nil") } conf.User = user + + return conf } func (conf *postgresConfig) GetPassword() string { if conf == nil { panic("postgres config is nil") } + return conf.Password } -func (conf *postgresConfig) SetPassword(password string) { +func (conf *postgresConfig) SetPassword(password string) *postgresConfig { if conf == nil { panic("postgres config is nil") } conf.Password = password + + return conf } func (conf *postgresConfig) GetDBName() string { if conf == nil { panic("postgres config is nil") } + return conf.DBName } -func (conf *postgresConfig) SetDBName(dbName string) { +func (conf *postgresConfig) SetDBName(dbName string) *postgresConfig { if conf == nil { panic("postgres config is nil") } conf.DBName = dbName + + return conf } func (conf *postgresConfig) GetSSLMode() string { if conf == nil { panic("postgres config is nil") } + return conf.SSLMode } -func (conf *postgresConfig) SetSSLMode(sslMode string) { +func (conf *postgresConfig) SetSSLMode(sslMode string) *postgresConfig { if conf == nil { panic("postgres config is nil") } conf.SSLMode = sslMode + + return conf } func (conf *postgresConfig) GetTimeZone() string { if conf == nil { panic("postgres config is nil") } + return conf.TimeZone } -func (conf *postgresConfig) SetTimeZone(timeZone string) { +func (conf *postgresConfig) SetTimeZone(timeZone string) *postgresConfig { if conf == nil { panic("postgres config is nil") } conf.TimeZone = timeZone + + return conf } func postgresConfigName() string { diff --git a/config/rabbit.go b/config/rabbit.go index 28979f7..b6d1dd8 100644 --- a/config/rabbit.go +++ b/config/rabbit.go @@ -1,11 +1,15 @@ package config -type xqr struct { - ExchangeName string `json:"exchangename" yaml:"exchangename"` - QueueName string `json:"queuename" yaml:"queuename"` - RoutingKey string `json:"routingkey" yaml:"routingkey"` - QueueLength int64 `json:"queuelength" yaml:"queuelength"` -} +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "os" + + "github.com/youmark/pkcs8" +) type tlsConfig struct { CAPath string `json:"capath" yaml:"capath"` @@ -17,11 +21,9 @@ type tlsConfig struct { } type rabbitConfig struct { - Hosts []string `json:"hosts" yaml:"hosts"` + Broker string `json:"broker" yaml:"broker"` Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` - NXQR *xqr `json:"nxqr" yaml:"nxqr"` - DXQR *xqr `json:"dxqr" yaml:"dxqr"` TLS *tlsConfig `json:"tls" yaml:"tls"` } @@ -29,143 +31,88 @@ func NewRabbitConfig() *rabbitConfig { return new(rabbitConfig) } -func (conf *rabbitConfig) GetHosts() []string { +func (conf *rabbitConfig) GenAddress(tls bool) string { if conf == nil { panic("rabbit config is nil") } - return conf.Hosts + + address := "amqp://" + if tls { + address = "amqps://" + } + if conf.GetUsername() != "" && conf.GetPassword() != "" { + address += conf.GetUsername() + ":" + conf.GetPassword() + "@" + } + address += conf.GetBroker() + "/" + + return address } -func (conf *rabbitConfig) SetHosts(hosts []string) { +func (conf *rabbitConfig) GetBroker() string { if conf == nil { panic("rabbit config is nil") } - conf.Hosts = hosts + + return conf.Broker +} + +func (conf *rabbitConfig) SetBroker(broker string) *rabbitConfig { + if conf == nil { + panic("rabbit config is nil") + } + conf.Broker = broker + + return conf } func (conf *rabbitConfig) GetUsername() string { if conf == nil { panic("rabbit config is nil") } + return conf.Username } -func (conf *rabbitConfig) SetUsername(username string) { +func (conf *rabbitConfig) SetUsername(username string) *rabbitConfig { if conf == nil { panic("rabbit config is nil") } conf.Username = username + + return conf } func (conf *rabbitConfig) GetPassword() string { if conf == nil { panic("rabbit config is nil") } + return conf.Password } -func (conf *rabbitConfig) SetPassword(password string) { +func (conf *rabbitConfig) SetPassword(password string) *rabbitConfig { if conf == nil { panic("rabbit config is nil") } conf.Password = password + + return conf } -func (conf *rabbitConfig) InitNXQR() { - if conf == nil { - panic("rabbit config is nil") - } - conf.NXQR = new(xqr) -} - -func (conf *rabbitConfig) GetNXQR() *xqr { - if conf == nil { - panic("rabbit config is nil") - } - return conf.NXQR -} - -func (conf *rabbitConfig) InitDXQR() { - if conf == nil { - panic("rabbit config is nil") - } - conf.DXQR = new(xqr) -} - -func (conf *rabbitConfig) GetDXQR() *xqr { - if conf == nil { - panic("rabbit config is nil") - } - return conf.DXQR -} - -func (conf *xqr) GetExchangeName() string { - if conf == nil { - panic("rabbit xqr is nil") - } - return conf.ExchangeName -} - -func (conf *xqr) SetExchangeName(exchangeName string) { - if conf == nil { - panic("rabbit xqr is nil") - } - conf.ExchangeName = exchangeName -} - -func (conf *xqr) GetQueueName() string { - if conf == nil { - panic("rabbit xqr is nil") - } - return conf.QueueName -} - -func (conf *xqr) SetQueueName(queueName string) { - if conf == nil { - panic("rabbit xqr is nil") - } - conf.QueueName = queueName -} - -func (conf *xqr) GetRoutingKey() string { - if conf == nil { - panic("rabbit xqr is nil") - } - return conf.RoutingKey -} - -func (conf *xqr) SetRoutingKey(routingKey string) { - if conf == nil { - panic("rabbit xqr is nil") - } - conf.RoutingKey = routingKey -} - -func (conf *xqr) GetQueueLength() int64 { - if conf == nil { - panic("rabbit xqr is nil") - } - return conf.QueueLength -} - -func (conf *xqr) SetQueueLength(queueLength int64) { - if conf == nil { - panic("rabbit xqr is nil") - } - conf.QueueLength = queueLength -} - -func (conf *rabbitConfig) InitTLS() { +func (conf *rabbitConfig) InitTLS() *rabbitConfig { if conf == nil { panic("rabbit config is nil") } conf.TLS = new(tlsConfig) + + return conf } func (conf *rabbitConfig) GetTLS() *tlsConfig { if conf == nil { panic("rabbit config is nil") } + return conf.TLS } @@ -173,84 +120,180 @@ func (conf *tlsConfig) GetCAPath() string { if conf == nil { panic("rabbit tls is nil") } + return conf.CAPath } -func (conf *tlsConfig) SetCAPath(caPath string) { +func (conf *tlsConfig) SetCAPath(caPath string) *tlsConfig { if conf == nil { panic("rabbit tls is nil") } conf.CAPath = caPath + + return conf } func (conf *tlsConfig) GetKeyPath() string { if conf == nil { panic("rabbit tls is nil") } + return conf.KeyPath } -func (conf *tlsConfig) SetKeyPath(keyPath string) { +func (conf *tlsConfig) SetKeyPath(keyPath string) *tlsConfig { if conf == nil { panic("rabbit tls is nil") } conf.KeyPath = keyPath + + return conf } func (conf *tlsConfig) GetCertPath() string { if conf == nil { panic("rabbit tls is nil") } + return conf.CertPath } -func (conf *tlsConfig) SetCertPath(certPath string) { +func (conf *tlsConfig) SetCertPath(certPath string) *tlsConfig { if conf == nil { panic("rabbit tls is nil") } + conf.CertPath = certPath + + return conf } func (conf *tlsConfig) GetPassword() string { if conf == nil { panic("rabbit tls is nil") } + return conf.Password } -func (conf *tlsConfig) SetPassword(password string) { +func (conf *tlsConfig) SetPassword(password string) *tlsConfig { if conf == nil { panic("rabbit tls is nil") } conf.Password = password + + return conf } func (conf *tlsConfig) GetSkipVerify() bool { if conf == nil { panic("rabbit tls is nil") } + return conf.SkipVerify } -func (conf *tlsConfig) SetSkipVerify(skipVerify bool) { +func (conf *tlsConfig) SetSkipVerify(skipVerify bool) *tlsConfig { if conf == nil { panic("rabbit tls is nil") } conf.SkipVerify = skipVerify + + return conf } func (conf *tlsConfig) GetServerName() string { if conf == nil { panic("rabbit tls is nil") } + return conf.ServerName } -func (conf *tlsConfig) SetServerName(serverName string) { +func (conf *tlsConfig) SetServerName(serverName string) *tlsConfig { if conf == nil { panic("rabbit tls is nil") } conf.ServerName = serverName + + return conf +} + +func (conf *tlsConfig) GenTLSConfig(tag string) (*tls.Config, error) { + if conf == nil { + return nil, nil + } + + if conf.GetCAPath() == "" || conf.GetCertPath() == "" || + conf.GetKeyPath() == "" { + return nil, errors.New("rabbit tls not valid") + } + + caPem, err := os.ReadFile(conf.GetCAPath()) + if err != nil { + return nil, err + } + certPool := x509.NewCertPool() + certPool.AppendCertsFromPEM(caPem) + + keyPem, err := os.ReadFile(conf.GetKeyPath()) + if err != nil { + return nil, err + } + certPem, err := os.ReadFile(conf.GetCertPath()) + if err != nil { + return nil, err + } + + pemBlock, err := parsePrivateKey(keyPem, []byte(conf.GetPassword())) + if err != nil { + return nil, err + } + + cliCert, err := tls.X509KeyPair(certPem, pem.EncodeToMemory(pemBlock)) + if err != nil { + return nil, err + } + + return &tls.Config{ + Certificates: []tls.Certificate{cliCert}, + RootCAs: certPool, + ServerName: conf.GetServerName(), + InsecureSkipVerify: conf.GetSkipVerify(), + }, nil +} + +func parsePrivateKey(key, password []byte) (*pem.Block, error) { + block, _ := pem.Decode(key) + if block == nil { + return nil, errors.New("no valid pem") + } + + var privateKey any + var err error + switch block.Type { + case "RSA PRIVATE KEY": + privateKey, err = x509.ParsePKCS1PrivateKey(block.Bytes) + case "PRIVATE KEY": + privateKey, err = x509.ParsePKCS8PrivateKey(block.Bytes) + case "ENCRYPTED PRIVATE KEY": + privateKey, err = pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) + default: + return nil, fmt.Errorf("unsupported key type: %s", block.Type) + } + if err != nil { + return nil, err + } + + pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + return nil, err + } + + return &pem.Block{ + Type: "PRIVATE KEY", + Bytes: pemBytes, + }, nil } func rabbitConfigName() string { diff --git a/config/redis.go b/config/redis.go index 4a8ef2d..706a625 100644 --- a/config/redis.go +++ b/config/redis.go @@ -5,7 +5,7 @@ type redisConfig struct { Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` DB int `json:"db" yaml:"db"` - RESP int `json:"resp" yaml:"resp"` + Protocol int `json:"protocol" yaml:"protocol"` DialTimeout int `json:"dialtimeout" yaml:"dialtimeout"` ReadTimeout int `json:"readtimeout" yaml:"readtimeout"` WriteTimeout int `json:"writetimeout" yaml:"writetimeout"` @@ -20,126 +20,153 @@ func (conf *redisConfig) GetAddr() string { if conf == nil { panic("redis config is nil") } + return conf.Addr } -func (conf *redisConfig) SetAddr(addr string) { +func (conf *redisConfig) SetAddr(addr string) *redisConfig { if conf == nil { panic("redis config is nil") } conf.Addr = addr + + return conf } func (conf *redisConfig) GetUsername() string { if conf == nil { panic("redis config is nil") } + return conf.Username } -func (conf *redisConfig) SetUsername(username string) { +func (conf *redisConfig) SetUsername(username string) *redisConfig { if conf == nil { panic("redis config is nil") } conf.Username = username + + return conf } func (conf *redisConfig) GetPassword() string { if conf == nil { panic("redis config is nil") } + return conf.Password } -func (conf *redisConfig) SetPassword(password string) { +func (conf *redisConfig) SetPassword(password string) *redisConfig { if conf == nil { panic("redis config is nil") } conf.Password = password + + return conf } func (conf *redisConfig) GetDB() int { if conf == nil { panic("redis config is nil") } + return conf.DB } -func (conf *redisConfig) SetDB(db int) { +func (conf *redisConfig) SetDB(db int) *redisConfig { if conf == nil { panic("redis config is nil") } conf.DB = db + + return conf } -func (conf *redisConfig) GetRESP() int { +func (conf *redisConfig) GetProtocol() int { if conf == nil { panic("redis config is nil") } - return conf.RESP + + return conf.Protocol } -func (conf *redisConfig) SetRESP(resp int) { +func (conf *redisConfig) SetProtocol(protocol int) *redisConfig { if conf == nil { panic("redis config is nil") } - conf.RESP = resp + conf.Protocol = protocol + + return conf } func (conf *redisConfig) GetDialTimeout() int { if conf == nil { panic("redis config is nil") } + return conf.DialTimeout } -func (conf *redisConfig) SetDialTimeout(dialTimeout int) { +func (conf *redisConfig) SetDialTimeout(dialTimeout int) *redisConfig { if conf == nil { panic("redis config is nil") } conf.DialTimeout = dialTimeout + + return conf } func (conf *redisConfig) GetReadTimeout() int { if conf == nil { panic("redis config is nil") } + return conf.ReadTimeout } -func (conf *redisConfig) SetReadTimeout(readTimeout int) { +func (conf *redisConfig) SetReadTimeout(readTimeout int) *redisConfig { if conf == nil { panic("redis config is nil") } conf.ReadTimeout = readTimeout + + return conf } func (conf *redisConfig) GetWriteTimeout() int { if conf == nil { panic("redis config is nil") } + return conf.WriteTimeout } -func (conf *redisConfig) SetWriteTimeout(writeTimeout int) { +func (conf *redisConfig) SetWriteTimeout(writeTimeout int) *redisConfig { if conf == nil { panic("redis config is nil") } conf.WriteTimeout = writeTimeout + + return conf } func (conf *redisConfig) GetPoolSize() int { if conf == nil { panic("redis config is nil") } + return conf.PoolSize } -func (conf *redisConfig) SetPoolSIze(poolSize int) { +func (conf *redisConfig) SetPoolSIze(poolSize int) *redisConfig { if conf == nil { panic("redis config is nil") } conf.PoolSize = poolSize + + return conf } func redisConfigName() string { diff --git a/config/server.go b/config/server.go index ba53c66..8c5305a 100644 --- a/config/server.go +++ b/config/server.go @@ -3,7 +3,7 @@ package config import "maps" type serverConfig struct { - Host string `json:"host" yaml:"host"` + Name string `json:"name" yaml:"name"` Port int `json:"port" yaml:"port"` SSUType map[string]uint8 `json:"ssutype" yaml:"ssutype"` } @@ -12,42 +12,49 @@ func NewServerConfig() *serverConfig { return new(serverConfig) } -func (conf *serverConfig) GetHost() string { +func (conf *serverConfig) GetName() string { if conf == nil { panic("server config is nil") } - return conf.Host + + return conf.Name } -func (conf *serverConfig) SetHost(host string) { +func (conf *serverConfig) SetName(name string) *serverConfig { if conf == nil { panic("server config is nil") } - conf.Host = host + conf.Name = name + + return conf } func (conf *serverConfig) GetPort() int { if conf == nil { panic("server config is nil") } + return conf.Port } -func (conf *serverConfig) SetPort(port int) { +func (conf *serverConfig) SetPort(port int) *serverConfig { if conf == nil { panic("server config is nil") } conf.Port = port + + return conf } func (conf *serverConfig) GetSSUType() map[string]uint8 { if conf == nil { panic("server config is nil") } + return conf.SSUType } -func (conf *serverConfig) SetSSUType(ssuType map[string]uint8) { +func (conf *serverConfig) SetSSUType(ssuType map[string]uint8) *serverConfig { if conf == nil { panic("server config is nil") } @@ -56,6 +63,8 @@ func (conf *serverConfig) SetSSUType(ssuType map[string]uint8) { } else { maps.Copy(conf.SSUType, ssuType) } + + return conf } func serverConfigName() string { diff --git a/config/url.go b/config/url.go deleted file mode 100644 index 4f2b42c..0000000 --- a/config/url.go +++ /dev/null @@ -1,72 +0,0 @@ -package config - -type urlConfig struct { - Scheme string `json:"scheme" yaml:"scheme"` - Host string `json:"host" yaml:"host"` // host or host:port - Path string `json:"path" yaml:"path"` - Timeout int `json:"timeout" yaml:"timeout"` -} - -func NewURLConfig() *urlConfig { - return new(urlConfig) -} - -func (conf *urlConfig) GetScheme() string { - if conf == nil { - panic("url config is nil") - } - return conf.Scheme -} - -func (conf *urlConfig) SetScheme(scheme string) { - if conf == nil { - panic("url config is nil") - } - conf.Scheme = scheme -} - -func (conf *urlConfig) GetHost() string { - if conf == nil { - panic("url config is nil") - } - return conf.Host -} - -func (conf *urlConfig) SetHost(host string) { - if conf == nil { - panic("url config is nil") - } - conf.Host = host -} - -func (conf *urlConfig) GetPath() string { - if conf == nil { - panic("url config is nil") - } - return conf.Path -} - -func (conf *urlConfig) SetPath(path string) { - if conf == nil { - panic("url config is nil") - } - conf.Path = path -} - -func (conf *urlConfig) GetTimeout() int { - if conf == nil { - panic("url config is nil") - } - return conf.Timeout -} - -func (conf *urlConfig) SetTimeout(timeout int) { - if conf == nil { - panic("url config is nil") - } - conf.Timeout = timeout -} - -func urlConfigName() string { - return "url.json" -} diff --git a/configs/kafka.json b/configs/kafka.json deleted file mode 100644 index 40a710d..0000000 --- a/configs/kafka.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "default": { - "brokers": ["localhost:9092"], - "topics": ["ssu000"], - "consumer": { - "groupid": "datart_sample", - "initialoffset": -1 - }, - "producer": { - "requiredacks":-1, - "partitioner":"hash", - "returnsuccesses":true, - "returnerrors":true, - "compression":4 - } - } -} \ No newline at end of file diff --git a/configs/mongo.json b/configs/mongo.json index 0678e59..fb758a4 100644 --- a/configs/mongo.json +++ b/configs/mongo.json @@ -1,6 +1,6 @@ { "default":{ - "hosts":["192.168.46.100:27017"], + "addrs":["192.168.46.100:27017"], "username":"mongo", "password":"123RTYjkl", "authsource":"events", diff --git a/configs/rabbit.json b/configs/rabbit.json index cfb51e6..3220508 100644 --- a/configs/rabbit.json +++ b/configs/rabbit.json @@ -1,12 +1,9 @@ { - "default": { - "hosts": [""], - "username": "", - "password": "", - "nxqr": { - "exchangename": "", - "queuename": "", - "routingkey": "" + "default": [ + { + "broker": "127.0.0.1:5672", + "username": "", + "password": "" } - } + ] } \ No newline at end of file diff --git a/configs/redis.json b/configs/redis.json index cd92762..2d326ed 100644 --- a/configs/redis.json +++ b/configs/redis.json @@ -4,7 +4,7 @@ "username":"", "password":"", "db":0, - "resp":3, + "protocol":3, "dialtimeout":50, "readtimeout":200, "writetimeout":200, diff --git a/configs/server.json b/configs/server.json index 5c0b12f..28ec565 100644 --- a/configs/server.json +++ b/configs/server.json @@ -1,7 +1,7 @@ { - "host": "", - "port": 8888, - "ssutype": { - "ssu000": 0 + "name":"datart", + "port":8888, + "ssutype":{ + "ssu000":1 } } \ No newline at end of file diff --git a/configs/url.json b/configs/url.json deleted file mode 100644 index d42487c..0000000 --- a/configs/url.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "default":{ - "schema":"https", - "host":"www.baidu.com", - "path":"/", - "timeout":1000 - } -} \ No newline at end of file diff --git a/data/data.go b/data/data.go new file mode 100644 index 0000000..1967a30 --- /dev/null +++ b/data/data.go @@ -0,0 +1,3 @@ +package data + +// something basic diff --git a/data/influx/common.go b/data/influx/common.go new file mode 100644 index 0000000..e4c8928 --- /dev/null +++ b/data/influx/common.go @@ -0,0 +1,305 @@ +package influx + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/csv" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +// for influx data, one measurement +type jsonResp struct { + Results []*result `json:"results"` +} + +type result struct { + StatementID int `json:"statement_id"` + Series []*fields `json:"series"` +} + +type fields struct { + Name string `json:"name"` + Column []string `json:"columns"` + Values [][]any `json:"values"` +} + +// line protocol, better to gzip and sort tags by key in lexicographic order +func (client *influxClient) writeLinesData(ctx context.Context, db string, data []byte, compress bool) error { + if compress { + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + if _, err := gz.Write(data); err != nil { + return err + } + if err := gz.Close(); err != nil { + return err + } + data = buf.Bytes() + } + + request, err := http.NewRequest(http.MethodPost, + client.url+"/write?db="+db, bytes.NewReader(data)) + if err != nil { + return err + } + + request.Header.Set("Content-Type", "text/plain; charset=utf-8") + request.Header.Set("Authorization", "Token "+client.token) + request.Header.Set("Accept", "application/json") + if compress { + request.Header.Set("Content-Encoding", "gzip") + } + + response, err := client.Do(request.WithContext(ctx)) + if err != nil { + return err + } + defer response.Body.Close() + + // http.StatusNoContent is the expected response, + // 200,201,202,206,207,208 + // but if we get these we should still accept it as delivered. + if response.StatusCode != http.StatusNoContent { + return fmt.Errorf("unexpected status code: %d", response.StatusCode) + } + + return nil +} + +// respType json/csv +// json_time:"2024-12-18T08:12:21.4735154Z" +// csv_time:"1734572793695885000" +func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values, + respType string) ([]*TV, error) { + request, err := http.NewRequestWithContext(ctx, http.MethodGet, + client.url+"/query?"+reqData.Encode(), nil) + if err != nil { + return nil, err + } + + request.Header.Set("Content-Type", "application/json") + request.Header.Set("Authorization", "Token "+client.token) + if respType == "csv" { + request.Header.Set("Accept", "application/csv") + } + + response, err := client.Do(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode) + } + + respData, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + return respDataToTVs(respData, respType) +} + +// respType json/csv +// json_time:"2024-12-18T08:12:21.4735154Z" +// csv_time:"1734572793695885000" +func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values, + respType string) (map[string][]*TV, error) { + request, err := http.NewRequestWithContext(ctx, http.MethodGet, + client.url+"/query?"+reqData.Encode(), nil) + if err != nil { + return nil, err + } + + request.Header.Set("Content-Type", "application/json") + request.Header.Set("Authorization", "Token "+client.token) + if respType == "csv" { + request.Header.Set("Accept", "application/csv") + } + + response, err := client.Do(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode) + } + + respData, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + return respDataToF2TVs(respData, respType) +} + +func respDataToTVs(respData []byte, respType string) ([]*TV, error) { + switch respType { + case "json": + resp := new(jsonResp) + err := json.Unmarshal(respData, resp) + if err != nil { + return nil, err + } + if len(resp.Results) > 0 && + len(resp.Results[0].Series) > 0 { + return turnJsonToTVs(resp.Results[0].Series[0].Values) + } + case "csv": + rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll() + if err != nil { + return nil, err + } + if len(rows) > 1 { + return turnCsvToTVs(rows[1:]) + } + } + + return nil, errors.New("unsupported response type") +} + +func respDataToF2TVs(respData []byte, respType string) (map[string][]*TV, error) { + switch respType { + case "json": + resp := new(jsonResp) + err := json.Unmarshal(respData, resp) + if err != nil { + return nil, err + } + if len(resp.Results) > 0 && + len(resp.Results[0].Series) > 0 { + return turnJsonToF2TVs(resp.Results[0].Series[0].Column, + resp.Results[0].Series[0].Values) + } + case "csv": + rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll() + if err != nil { + return nil, err + } + if len(rows) > 1 { + return turnCsvToF2TVs(rows) + } + } + + return nil, errors.New("unsupported response type") +} + +// measure at different times +func turnJsonToTVs(data [][]interface{}) ([]*TV, error) { + ret := make([]*TV, 0, len(data)) + + for _, row := range data { + if len(row) > 1 { + tstr, ok := (row[0]).(string) + if !ok { + return nil, errors.New("not expected data type") + } + t, err := time.Parse("2006-01-02T15:04:05.99Z", tstr) + if err != nil { + return nil, err + } + v, ok := (row[1]).(float64) + if !ok { + return nil, errors.New("not expected data type") + } + ret = append(ret, &TV{ + Time: t.UnixNano(), + Value: v, + }) + } + } + + return ret, nil +} + +// different measures at different times +func turnJsonToF2TVs(cols []string, data [][]interface{}) (map[string][]*TV, error) { + f2tvs := make(map[string][]*TV) + + for _, row := range data { + if len(row) > 1 { + tstr, ok := (row[0]).(string) + if !ok { + return nil, errors.New("not expected data type") + } + t, err := time.Parse("2006-01-02T15:04:05.99Z", tstr) + if err != nil { + return nil, err + } + for i := 1; i < len(row); i++ { + v, ok := (row[i]).(float64) + if !ok { + return nil, errors.New("not expected data type") + } + f2tvs[cols[i]] = append(f2tvs[cols[i]], &TV{ + Time: t.UnixNano(), + Value: v, + }) + } + } + } + + return f2tvs, nil +} + +// measure at different times +func turnCsvToTVs(data [][]string) ([]*TV, error) { + ret := make([]*TV, 0, len(data)) + + for _, row := range data { + if len(row) > 3 { + ns, err := strconv.ParseInt(row[2], 10, 64) + if err != nil { + return nil, err + } + v, err := strconv.ParseFloat(row[3], 64) + if err != nil { + return nil, err + } + ret = append(ret, &TV{ + Time: ns, + Value: v, + }) + } + } + + return ret, nil +} + +// different measures at different times +func turnCsvToF2TVs(data [][]string) (map[string][]*TV, error) { + f2tvs := make(map[string][]*TV) + + for _, row := range data { + if len(row) > 3 { + ns, err := strconv.ParseInt(row[2], 10, 64) + if err != nil { + return nil, err + } + for i := 3; i < len(row); i++ { + v, err := strconv.ParseFloat(row[i], 64) + if err != nil { + return nil, err + } + f2tvs[data[0][i]] = append(f2tvs[data[0][i]], &TV{ + Time: ns, + Value: v, + }) + } + } + } + + return f2tvs, nil +} diff --git a/data/influx/influx.go b/data/influx/influx.go new file mode 100644 index 0000000..969b100 --- /dev/null +++ b/data/influx/influx.go @@ -0,0 +1,97 @@ +package influx + +import ( + "context" + "datart/config" + "net" + "net/http" + "time" +) + +type influxClient struct { + *http.Client + url string + token string + org string +} + +type Request struct { + RespType string + Measure string + Station string + MainPos string + SubPos string // separate whith ',' + Begin int64 + End int64 + Operate string + Step string + Default string +} + +var client *influxClient + +func init() { + client = new(influxClient) + + influxConfig := config.Conf().InfluxConf("default") + client.Client = &http.Client{ + Timeout: time.Duration(influxConfig.GetTimeout()) * time.Millisecond, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + }, + } + + client.url = influxConfig.GetURL() + client.token = influxConfig.GetToken() + client.org = influxConfig.GetOrg() +} + +func Close() { + client.CloseIdleConnections() +} + +func NewInfluxClient(cli *http.Client, url, org, token string) *influxClient { + return &influxClient{ + Client: cli, + url: url, + org: org, + token: token, + } +} + +func WriteLinesData(ctx context.Context, data []byte) error { + return client.WriteLinesData(ctx, data) +} + +type TV struct { + Time int64 `json:"time"` + Value float64 `json:"value"` +} + +func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]*TV, error) { + req.Begin = time.Now().UnixMilli() - int64(limit*20+20) + return client.GetSSUPointLastLimit(ctx, req, limit) +} + +func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]*TV, error) { + req.Begin = time.Now().UnixMilli() - int64(limit*20+20) + return client.GetSSUPointsLastLimit(ctx, req, limit) +} + +func GetSSUPointData(ctx context.Context, req *Request) ([]*TV, error) { + return client.GetSSUPointData(ctx, req) +} + +func GetSSUPointAfterOne(ctx context.Context, req *Request) ([]*TV, error) { + return client.GetSSUPointAfterOne(ctx, req) +} + +func GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]*TV, error) { + req.Begin = req.End - 20 - 20 + return client.GetSSUPointBeforeOne(ctx, req) +} diff --git a/data/influx/ssu_point.go b/data/influx/ssu_point.go new file mode 100644 index 0000000..890c302 --- /dev/null +++ b/data/influx/ssu_point.go @@ -0,0 +1,93 @@ +package influx + +import ( + "context" + "fmt" + "net/url" + "strings" +) + +const ( + bucket string = "influxBucket" +) + +func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]*TV, error) { + sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and device='%s';", + req.SubPos, req.SubPos, req.Measure, req.Station, req.MainPos) + if limit > 1 { + sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms order by time desc limit %d;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) + } + + reqData := url.Values{ + "db": {bucket}, + "q": {sql}, + } + + return client.getTVsResp(ctx, reqData, req.RespType) +} + +func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]*TV, error) { + sql := "" + if limit == 1 { + fields := strings.Split(req.SubPos, ",") + for i, field := range fields { + fields[i] = "last(" + field + ") as " + field + } + sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s';", + strings.Join(fields, ","), req.Measure, req.Station, req.MainPos) + } else { + sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms order by time desc limit %d;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, limit) // begin = time.Now().UnixMilli()-int64(limit*20+20) + } + + reqData := url.Values{ + "db": {bucket}, + "q": {sql}, + } + + return client.getF2TVsResp(ctx, reqData, req.RespType) +} + +func (client *influxClient) GetSSUPointData(ctx context.Context, req *Request) ([]*TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End) + + if req.Operate != "" && req.Step != "" && req.Default != "" { + sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms group by time(%s) fill(%s);", + req.Operate, req.SubPos, req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default) + } + + reqData := url.Values{ + "db": {bucket}, + "q": {sql}, + } + + return client.getTVsResp(ctx, reqData, req.RespType) +} + +func (client *influxClient) GetSSUPointAfterOne(ctx context.Context, req *Request) ([]*TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms limit 1;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin) + + reqData := url.Values{ + "db": {bucket}, + "q": {sql}, + } + + return client.getTVsResp(ctx, reqData, req.RespType) +} + +func (client *influxClient) GetSSUPointBeforeOne(ctx context.Context, req *Request) ([]*TV, error) { + reqData := url.Values{ + "db": {bucket}, + "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>%dms and time<=%dms order by time desc limit 1;", + req.SubPos, req.Measure, req.Station, req.MainPos, req.Begin, req.End)}, // begin = req.End-20-20 + } + + return client.getTVsResp(ctx, reqData, req.RespType) +} + +func (client *influxClient) WriteLinesData(ctx context.Context, data []byte) error { + return client.writeLinesData(ctx, bucket, data, true) +} diff --git a/data/mongo/alarm.go b/data/mongo/alarm.go new file mode 100644 index 0000000..fc117ca --- /dev/null +++ b/data/mongo/alarm.go @@ -0,0 +1,77 @@ +package mongo + +import ( + "encoding/json" + + "github.com/google/uuid" +) + +const ( + _ = iota + almCodeCommmExcept // 通信异常 + almCodeADFault // AD故障 + almCodePPSExcept // 同步秒脉冲异常 + almCodeBackup // 备用 + almCodeUnitInit // 单元初始化 + almCodeReadParamErr // 读参数错 + almCodeReserve // 备用 + almCodeStartSample // 启动采样-内部转换信号 + almCodeOverSample // 秒内采样点数过量 + almCodeUnderSample // 秒内采样点数欠量 +) + +const ( + almStatusReset = iota + almStatusAction +) + +type Alarm struct { + DriverName string `bson:"driver_name" json:"driver_name"` + DeviceNo string `bson:"device_no" json:"device_no"` + AlarmCode int `bson:"alarm_code" json:"alarm_code"` + AlarmTime int64 `bson:"alarm_time" json:"alarm_time"` // ms + AlarmStatus int `bson:"alarm_status" josn:"alarm_status"` // 0 "复位", 1 "动作/产生/告警" +} + +var almCode2Name = map[int]string{ + almCodeCommmExcept: "通信异常", + almCodeADFault: "AD故障", + almCodePPSExcept: "同步秒脉冲异常", + almCodeBackup: "备用", + almCodeUnitInit: "单元初始化", + almCodeReadParamErr: "读参数错", + almCodeReserve: "备用", + almCodeStartSample: "启动采样-内部转换信号", + almCodeOverSample: "秒内采样点数过量", + almCodeUnderSample: "秒内采样点数欠量", +} + +func (a *Alarm) GetName() string { + return almCode2Name[a.AlarmCode] +} + +func (a *Alarm) ConvertToEvent(ip string) *Event { + e := new(Event) + if a != nil { + e.Event = a.GetName() + e.EventUUID = uuid.NewString() + e.Type = genEventType(0, 2) + e.Priority = 5 + e.Status = eventStatusHappen + e.From = "station" + e.Operations = append(e.Operations, &operation{ + Action: eventActionHappened, // TODO + OP: ip, + TS: a.AlarmTime, + }) + e.Alarm = a + } + + return e +} + +func UnmarshallToAlarm(data []byte) (*Alarm, error) { + alm := new(Alarm) + err := json.Unmarshal(data, alm) + return alm, err +} diff --git a/data/mongo/event.go b/data/mongo/event.go new file mode 100644 index 0000000..6715b23 --- /dev/null +++ b/data/mongo/event.go @@ -0,0 +1,128 @@ +package mongo + +import ( + "context" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +const ( + dbevent string = "event" + tbevent string = "event" +) + +const ( + eventStatusHappen = iota + eventStatusDataAt + eventStatusReport + eventStatusConfirm + eventStatusPersist + eventStatusClose +) + +const ( + eventActionHappened = "happened" +) + +type operation struct { + Action string `bson:"action" json:"action"` + OP string `bson:"op" json:"op"` + TS int64 `bson:"ts" json:"ts"` +} + +type Event struct { + Event string `bson:"event" json:"event"` + EventUUID string `bson:"event_uuid" json:"event_uuid"` + Type int `bson:"type" json:"type"` + Priority int `bson:"priority" json:"priority"` // 0~9 + Status int `bson:"status" json:"status"` + From string `bson:"from" json:"from"` + Operations []*operation `bson:"operations" json:"operations"` + // TODO complete + Alarm *Alarm `bson:"alarm" json:"alarm"` +} + +func InsertOneEvent(ctx context.Context, doc *Event) error { + _, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc) + return err +} + +func InsertEvents(ctx context.Context, docs []*Event) error { + _, err := getCollection(dbevent, tbevent).InsertMany(ctx, docs) + return err +} + +func DeleteOneEvent(ctx context.Context, filter *bson.D) error { + _, err := getCollection(dbevent, tbevent).DeleteOne(ctx, filter) + return err +} + +func DeleteEvents(ctx context.Context, filter *bson.D) error { + _, err := getCollection(dbevent, tbevent).DeleteMany(ctx, filter) + return err +} + +func UpdateOneEvent(ctx context.Context, filter *bson.D, update bson.D) error { + opts := options.UpdateOne().SetUpsert(true) + _, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update, opts) + return err +} + +func UpdateEvents(ctx context.Context, filter *bson.D, update bson.D) error { + opts := options.UpdateMany().SetUpsert(true) + _, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update, opts) + return err +} + +func FindOneEvent(ctx context.Context, filter *bson.D) (*Event, error) { + doc := new(Event) + err := getCollection(dbevent, tbevent).FindOne(ctx, filter).Decode(doc) + if err != nil { + return nil, err + } + + return doc, nil +} + +func FindEvents(ctx context.Context, filter *bson.D) ([]*Event, error) { + cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + var docs []*Event + if err = cursor.All(ctx, &docs); err != nil { + return nil, err + } + + return docs, nil +} + +func FindEventsInBatch(ctx context.Context, filter *bson.D, batchSize int32) ([]*Event, error) { + opt := options.Find().SetBatchSize(batchSize) + cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt) + if err != nil { + return nil, err + } + defer cursor.Close(ctx) + + var docs []*Event + for cursor.Next(ctx) { + doc := new(Event) + if err = cursor.Decode(doc); err != nil { + return nil, err + } + docs = append(docs, doc) + } + + return docs, nil +} + +// sys: 0-hard/1-platform/2-application +// +// level:1-info/2-warn/3-error +func genEventType(sys int, level int) int { + return sys + level*3 +} diff --git a/data/mongo/mongo.go b/data/mongo/mongo.go new file mode 100644 index 0000000..585347b --- /dev/null +++ b/data/mongo/mongo.go @@ -0,0 +1,50 @@ +package mongo + +import ( + "context" + "datart/config" + "strings" + "time" + + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +var client *mongo.Client + +func init() { + conf := config.Conf().MongoConf("default") + uri := "mongodb://" + strings.Join(conf.GetAddrs(), ",") + cliOpts := options.Client(). + ApplyURI(uri).SetTimeout(1 * time.Second). + SetAuth(options.Credential{ + AuthMechanism: "SCRAM-SHA-256", + AuthSource: "events", + Username: conf.GetUsername(), + Password: conf.GetPassword(), + }) + + cli, err := mongo.Connect(cliOpts) + if err != nil { + panic(err) + } + client = cli + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + if err := client.Ping(ctx, nil); err != nil { + panic(err) + } +} + +func NewMongoClient(opts ...*options.ClientOptions) (*mongo.Client, error) { + return mongo.Connect(opts...) +} + +func Disconnect(ctx context.Context) error { + return client.Disconnect(ctx) +} + +func getCollection(db string, tb string) *mongo.Collection { + return client.Database(db).Collection(tb) +} diff --git a/data/postgres/measurement.go b/data/postgres/measurement.go new file mode 100644 index 0000000..a15c030 --- /dev/null +++ b/data/postgres/measurement.go @@ -0,0 +1,76 @@ +package postgres + +import "context" + +const ( + tbmeasurement string = "public.measurement" +) + +const () + +type addrSSU struct { + Station string `json:"station"` + Device string `json:"device"` + Channel string `json:"channel"` +} + +type dataSource struct { + Type int `json:"type"` + Addr any `json:"io_address"` +} + +type measurement struct { + ID int64 `gorm:"colunmn:id"` + Tag string `gorm:"column:tag"` + Size int `gorm:"column:size"` + DataSource *dataSource `gorm:"type:jsonb;column:data_source"` + // mapping TODO +} + +func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) { + var totalRecords []*measurement + + id := 0 + for { + var records []*measurement + result := client.WithContext(ctx).Table(tbmeasurement).Where("id > ?", id). + Order("id ASC").Limit(batchSize).Find(&records) + if result.Error != nil { + return totalRecords, result.Error + } + + length := len(records) + if length <= 0 { + break + } + id += length + + totalRecords = append(totalRecords, records...) + } + + return totalRecords, nil +} + +func GetStationMeasurements(ctx context.Context, batchSize int, station string) ([]*measurement, error) { + var totalRecords []*measurement + + id := 0 + for { + var records []*measurement + result := client.WithContext(ctx).Table(tbmeasurement).Where("station = ?", station). + Where("id > ?", id).Order("id ASC").Limit(batchSize).Find(&records) + if result.Error != nil { + return totalRecords, result.Error + } + + length := len(records) + if length <= 0 { + break + } + id += length + + totalRecords = append(totalRecords, records...) + } + + return totalRecords, nil +} diff --git a/data/postgres/postgres.go b/data/postgres/postgres.go new file mode 100644 index 0000000..3eca0f9 --- /dev/null +++ b/data/postgres/postgres.go @@ -0,0 +1,33 @@ +package postgres + +import ( + "datart/config" + "fmt" + + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +var client *gorm.DB + +func init() { + postgresConfig := config.Conf().PostgresConf("default") + dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=%s TimeZone=%s", + postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(), + postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetSSLMode(), + postgresConfig.GetTimeZone()) + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + if err != nil { + panic(err) + } + client = db +} + +// close postgres default client +func Close() error { + db, err := client.DB() + if err != nil { + return err + } + return db.Close() +} diff --git a/data/postgres/station.go b/data/postgres/station.go new file mode 100644 index 0000000..2adb089 --- /dev/null +++ b/data/postgres/station.go @@ -0,0 +1,40 @@ +package postgres + +import ( + "context" +) + +const ( + tbstation = "public.station" +) + +type station struct { + ID int64 `gorm:"colunmn:id"` + ZoneID int64 `gorm:"colunmn:zone_id"` + TagName string `gorm:"column:tagname"` + Name string `gorm:"colunmn:name"` + // Description string `gorm:"colunmn:description"` + IsLocal bool `gorm:"colunmn:is_local"` + // OP int `gorm:"colunmn:op"` + // TS time.Time `gorm:"colunmn:ts"` +} + +func GetStations(ctx context.Context) ([]*station, error) { + var records []*station + result := client.WithContext(ctx).Table(tbstation).Find(&records) + if result.Error != nil { + return nil, result.Error + } + + return records, nil +} + +func GetLocalStation(ctx context.Context) (*station, error) { + var record *station + result := client.WithContext(ctx).Table(tbstation).Where("is_local=?", true).Find(&record) + if result.Error != nil { + return nil, result.Error + } + + return record, nil +} diff --git a/data/rabbit/client.go b/data/rabbit/client.go new file mode 100644 index 0000000..9c2c5d2 --- /dev/null +++ b/data/rabbit/client.go @@ -0,0 +1,46 @@ +package rabbit + +import ( + "context" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +type rabbitClient struct { + env *rmq.Environment + conn *rmq.AmqpConnection +} + +func NewClient(ctx context.Context, endpoints []rmq.Endpoint) (*rabbitClient, error) { + + env := rmq.NewClusterEnvironment(endpoints) + conn, err := client.env.NewConnection(ctx) + if err != nil { + return nil, err + } + + return &rabbitClient{ + env: env, + conn: conn, + }, nil +} + +func (c *rabbitClient) Management() *rmq.AmqpManagement { + return c.conn.Management() +} + +func (c *rabbitClient) NewPublisher(ctx context.Context, destination rmq.ITargetAddress, + options rmq.IConsumerOptions) (*rmq.Publisher, error) { + + return c.conn.NewPublisher(ctx, destination, options) +} + +func (c *rabbitClient) NewConsumer(ctx context.Context, queueName string, + options rmq.IConsumerOptions) (*rmq.Consumer, error) { + + return c.conn.NewConsumer(ctx, queueName, options) +} + +func (c *rabbitClient) Close(ctx context.Context) error { + return c.env.CloseConnections(ctx) +} diff --git a/data/rabbit/consume.go b/data/rabbit/consume.go new file mode 100644 index 0000000..f81d436 --- /dev/null +++ b/data/rabbit/consume.go @@ -0,0 +1,50 @@ +package rabbit + +import ( + "context" + "errors" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +func NewConsumer(ctx context.Context, tag string, xqr *XQR) (*rmq.Consumer, error) { + cli := client + if tag != "default" { + endpoints, err := genEndpoints(tag) + if err != nil { + return nil, err + } + cli, err = NewClient(ctx, endpoints) + if err != nil { + return nil, err + } + } + + return cli.conn.NewConsumer(ctx, xqr.QueueName, nil) +} + +func Consume(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) { + for { + deliCtx, err := consumer.Receive(ctx) + if errors.Is(err, context.Canceled) { + // The consumer was closed correctly + // TODO + return + } + if err != nil { + // An error occurred receiving the message + // TODO + return + } + + for _, data := range deliCtx.Message().Data { + msgChan <- data + } + + err = deliCtx.Accept(ctx) + if err != nil { + // TODO + return + } + } +} diff --git a/data/rabbit/management.go b/data/rabbit/management.go new file mode 100644 index 0000000..ed36640 --- /dev/null +++ b/data/rabbit/management.go @@ -0,0 +1,73 @@ +package rabbit + +import ( + "context" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +type Management struct { + m *rmq.AmqpManagement + xName string + x rmq.IExchangeSpecification + qName string + q rmq.IQueueSpecification + bPath string + b rmq.IBindingSpecification +} + +func (m *Management) Init(ctx context.Context, rm *rmq.AmqpManagement, + rx rmq.IExchangeSpecification, rq rmq.IQueueSpecification, + rb rmq.IBindingSpecification) { + + m.m = rm + m.x = rx + m.q = rq + m.b = rb +} + +func (m *Management) DeclareAndBind(ctx context.Context) error { + xinfo, err := m.m.DeclareExchange(ctx, m.x) + if err != nil { + return err + } + m.xName = xinfo.Name() + + qinfo, err := m.m.DeclareQueue(ctx, m.q) + if err != nil { + return err + } + m.qName = qinfo.Name() + + bPath, err := m.m.Bind(ctx, m.b) + if err != nil { + return err + } + m.bPath = bPath + + return nil +} + +func (m *Management) UnbindAndDelete(ctx context.Context) (purged int, err error) { + err = m.m.Unbind(ctx, m.bPath) + if err != nil { + return + } + + err = m.m.DeleteExchange(ctx, m.xName) + if err != nil { + return + } + + purged, err = m.m.PurgeQueue(ctx, m.qName) + if err != nil { + return + } + + err = m.m.DeleteQueue(ctx, m.qName) + if err != nil { + return + } + + return +} diff --git a/data/rabbit/publish.go b/data/rabbit/publish.go new file mode 100644 index 0000000..fbc7a88 --- /dev/null +++ b/data/rabbit/publish.go @@ -0,0 +1,55 @@ +package rabbit + +import ( + "context" + "fmt" + "time" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +func NewPublisher(ctx context.Context, tag string, xqr *XQR) (*rmq.Publisher, error) { + cli := client + if tag != "default" { + endpoints, err := genEndpoints(tag) + if err != nil { + return nil, err + } + cli, err = NewClient(ctx, endpoints) + if err != nil { + return nil, err + } + } + return cli.conn.NewPublisher(context.Background(), &rmq.ExchangeAddress{ + Exchange: xqr.ExchangeName, + Key: xqr.RoutingKey, + }, nil) +} + +func Publish(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan []byte) { + for { + select { + case msg := <-msgChan: + result, err := publisher.Publish(ctx, rmq.NewMessage(msg)) + if err != nil { + _ = err // TODO + time.Sleep(1 * time.Second) + continue + } + + switch result.Outcome.(type) { + case *rmq.StateAccepted: + // TODO: "Message accepted" + case *rmq.StateReleased: + // TODO: "Message was not routed" + case *rmq.StateRejected: + // TODO: stateType := publishResult.Outcome.(*rmq.StateRejected) + default: + // TODO: ("Message state: %v", publishResult.Outcome) + } + case <-time.After(time.Second): + // TODO + fmt.Println("second passed") + } + } +} diff --git a/data/rabbit/rabbit.go b/data/rabbit/rabbit.go new file mode 100644 index 0000000..805edbc --- /dev/null +++ b/data/rabbit/rabbit.go @@ -0,0 +1,63 @@ +package rabbit + +import ( + "context" + "datart/config" + + "github.com/Azure/go-amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +type XQR struct { + ExchangeName string `json:"exchangename" yaml:"exchangename"` + QueueName string `json:"queuename" yaml:"queuename"` + RoutingKey string `json:"routingkey" yaml:"routingkey"` + QueueLength int64 `json:"queuelength" yaml:"queuelength"` +} + +var client *rabbitClient + +func init() { + endpoints, err := genEndpoints("default") + if err != nil { + panic(err) + } + + client = new(rabbitClient) + client.env = rmq.NewClusterEnvironment(endpoints) + conn, err := client.env.NewConnection(context.Background()) + if err != nil { + panic(err) + } + client.conn = conn +} + +func Close(ctx context.Context) error { + return client.Close(ctx) +} + +func genEndpoints(tag string) ([]rmq.Endpoint, error) { + confs := config.Conf().RabbitConf(tag) + endpoints := make([]rmq.Endpoint, len(confs)) + + for i, conf := range confs { + tlsConfig, err := conf.GetTLS().GenTLSConfig(tag) + if err != nil { + return nil, err + } + + var options *rmq.AmqpConnOptions + var tls bool + if tlsConfig != nil { + options = &rmq.AmqpConnOptions{ + SASLType: amqp.SASLTypeExternal(""), + TLSConfig: tlsConfig} + tls = true + } + + endpoints[i].Address = conf.GenAddress(tls) + endpoints[i].Options = options + } + + return endpoints, nil +} diff --git a/data/redis/hash.go b/data/redis/hash.go new file mode 100644 index 0000000..fbe9874 --- /dev/null +++ b/data/redis/hash.go @@ -0,0 +1,33 @@ +package redis + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +func HGetAll(ctx context.Context, key string) (map[string]string, error) { + hash, err := client.HGetAll(ctx, key).Result() + if err == redis.Nil { + return nil, nil + } else if err != nil { + return nil, err + } + + return hash, nil +} + +func HGet(ctx context.Context, key, field string) (string, error) { + str, err := client.HGet(ctx, key, field).Result() + if err == redis.Nil { + return "", nil + } else if err != nil { + return "", err + } + + return str, nil +} + +func HSet(ctx context.Context, key string, values map[string]interface{}) error { + return client.HSet(ctx, key, values).Err() +} diff --git a/data/redis/redis.go b/data/redis/redis.go new file mode 100644 index 0000000..9c1156c --- /dev/null +++ b/data/redis/redis.go @@ -0,0 +1,71 @@ +package redis + +import ( + "context" + "datart/config" + "time" + + "github.com/redis/go-redis/v9" +) + +var client *redis.Client + +func init() { + config := config.Conf().RedisConf("default") + client = redis.NewClient(&redis.Options{ + Addr: config.Addr, + Username: config.Username, + Password: config.Password, + DB: config.DB, + Protocol: config.Protocol, + DialTimeout: time.Duration(config.GetDialTimeout()) * time.Millisecond, + ReadTimeout: time.Duration(config.GetReadTimeout()) * time.Millisecond, + WriteTimeout: time.Duration(config.GetWriteTimeout()) * time.Millisecond, + PoolSize: config.PoolSize, + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + pong, err := client.Ping(ctx).Result() + if err != nil { + panic(err) + } + if pong != "PONG" { + panic("redis ping failed") + } +} + +// close redis client +func Close() error { + return client.Close() +} + +func Lock(ctx context.Context, key string, value interface{}, expiration time.Duration) error { + return client.SetNX(ctx, key, value, expiration).Err() +} + +func Unlock(ctx context.Context, key string) error { + return client.Del(ctx, key).Err() +} + +func Keys(ctx context.Context, pattern string) ([]string, error) { + batch := int64(1000) + cursor := uint64(0) + keys := make([]string, 0) + + for { + ks, nextCursor, err := client.Scan(ctx, cursor, pattern, batch).Result() + if err != nil { + return nil, err + } + + keys = append(keys, ks...) + + cursor = nextCursor + if cursor == 0 { + break + } + } + + return keys, nil +} diff --git a/data/redis/string.go b/data/redis/string.go new file mode 100644 index 0000000..785830a --- /dev/null +++ b/data/redis/string.go @@ -0,0 +1,23 @@ +package redis + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9" +) + +func Get(ctx context.Context, key string) (string, error) { + str, err := client.Get(ctx, key).Result() + if err == redis.Nil { + return "", nil + } else if err != nil { + return "", err + } + + return str, nil +} + +func Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { + return client.Set(ctx, key, value, expiration).Err() +} diff --git a/data/redis/zset.go b/data/redis/zset.go new file mode 100644 index 0000000..a4503b2 --- /dev/null +++ b/data/redis/zset.go @@ -0,0 +1,39 @@ +package redis + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +func ZRangeWithScores(ctx context.Context, key string, start int64, stop int64) ([]redis.Z, error) { + zset, err := client.ZRangeWithScores(ctx, key, start, stop).Result() + if err == redis.Nil { + return nil, nil + } else if err != nil { + return nil, err + } + + return zset, nil +} + +func ZAdd(ctx context.Context, key string, members []redis.Z) error { + return client.ZAdd(ctx, key, members...).Err() +} + +func ZAtomicReplace(ctx context.Context, key string, members []redis.Z) error { + script := ` + redis.call('DEL', KEYS[1]) + for i = 1, #ARGV, 2 do + redis.call('ZADD', KEYS[1], ARGV[i], ARGV[i+1]) + end + return true + ` + + var args []any + for _, z := range members { + args = append(args, z.Score, z.Member) + } + + return client.Eval(ctx, script, []string{key}, args...).Err() +} diff --git a/go.mod b/go.mod index b9ecc81..7f9581e 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,35 @@ module datart go 1.24.0 require ( + github.com/Azure/go-amqp v1.5.0 + github.com/google/uuid v1.6.0 + github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0 + github.com/redis/go-redis/v9 v9.14.0 + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 + go.mongodb.org/mongo-driver/v2 v2.3.0 go.uber.org/zap v1.27.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 + gorm.io/driver/postgres v1.6.0 + gorm.io/gorm v1.31.0 ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/stretchr/testify v1.9.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect go.uber.org/multierr v1.10.0 // indirect + golang.org/x/crypto v0.33.0 // indirect + golang.org/x/sync v0.15.0 // indirect + golang.org/x/text v0.26.0 // indirect ) diff --git a/go.sum b/go.sum index 15aea8e..1388790 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,122 @@ +github.com/Azure/go-amqp v1.5.0 h1:GRiQK1VhrNFbyx5VlmI6BsA1FCp27W5rb9kxOZScnTo= +github.com/Azure/go-amqp v1.5.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= +github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus= +github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8= +github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY= +github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0 h1:q0zPF8/7Bdm+XwjWevFynB8fNiuE65x4q2vmFxU2cjM= +github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0/go.mod h1:t5oaK/4mJjw9dNpDzwvH6bE7p9XtM1JyObEHszFu3lU= +github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE= +github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver/v2 v2.3.0 h1:sh55yOXA2vUjW1QYw/2tRlHSQViwDyPnW61AwpZ4rtU= +go.mongodb.org/mongo-driver/v2 v2.3.0/go.mod h1:jHeEDJHJq7tm6ZF45Issun9dbogjfnPySb1vXA7EeAI= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc= +golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4= +gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo= +gorm.io/gorm v1.31.0 h1:0VlycGreVhK7RF/Bwt51Fk8v0xLiiiFdbGDPIZQ7mJY= +gorm.io/gorm v1.31.0/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=