Compare commits

..

8 Commits
master ... dev

Author SHA1 Message Date
zhuxu c467f29b55 update files 2025-12-05 17:54:25 +08:00
zhuxu 53a38b387c update files 2025-11-20 21:07:33 +08:00
zhuxu 71ebf6b938 add api and update files 2025-11-07 19:25:03 +08:00
zhuxu 7a9bd01b37 process data and api 2025-10-23 18:02:29 +08:00
zhuxu 352fa80e26 add log and process data 2025-10-11 14:56:11 +08:00
zhuxu affbebe806 add data and update config 2025-09-19 16:17:46 +08:00
zhuxu 1186f5dbff config 2025-09-05 18:35:46 +08:00
zhuxu a8a9e6cf7c new dev 2025-08-15 16:01:02 +08:00
52 changed files with 3981 additions and 1 deletions

12
.drone.yml Normal file
View File

@ -0,0 +1,12 @@
kind: pipeline
type: docker
name: default
steps:
- name: build
image: golang:latest
environment:
GO111MODULE: on
GOPROXY: https://goproxy.cn,direct
commands:
- go build

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
.vscode
logs
*.out
.idea

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# dataRT
[![Build Status](http://192.168.46.100:4080/api/badges/CL-Softwares/dataRT/status.svg?ref=refs/heads/dev)](http://192.168.46.100:4080/CL-Softwares/dataRT)

119
config/config.go Normal file
View File

@ -0,0 +1,119 @@
package config
import (
"encoding/json"
"flag"
"os"
)
type config struct {
serverConf *serverConfig
logConf *logConfig
postgresConf map[string]*postgresConfig
influxConf map[string]*influxConfig
redisConf map[string]*redisConfig
mongoConf map[string]*mongoConfig
rabbitConf map[string][]*rabbitConfig
}
var conf *config
var confDir string
func init() {
flag.StringVar(&confDir, "conf_dir", "./configs", "conf dir")
flag.Parse()
conf = new(config)
conf.serverConf = new(serverConfig)
serverConf := confDir + string(os.PathSeparator) + serverConfigName()
conf.unmarshalJsonFile(serverConf, conf.serverConf)
conf.logConf = new(logConfig)
logConf := confDir + string(os.PathSeparator) + logConfigName()
conf.unmarshalJsonFile(logConf, conf.logConf)
conf.postgresConf = make(map[string]*postgresConfig)
postgresConf := confDir + string(os.PathSeparator) + postgresConfigName()
conf.unmarshalJsonFile(postgresConf, &conf.postgresConf)
conf.influxConf = make(map[string]*influxConfig)
influxConf := confDir + string(os.PathSeparator) + influxConfigName()
conf.unmarshalJsonFile(influxConf, &conf.influxConf)
conf.redisConf = make(map[string]*redisConfig)
redisConf := confDir + string(os.PathSeparator) + redisConfigName()
conf.unmarshalJsonFile(redisConf, &conf.redisConf)
conf.mongoConf = make(map[string]*mongoConfig)
mongoConf := confDir + string(os.PathSeparator) + mongoConfigName()
conf.unmarshalJsonFile(mongoConf, &conf.mongoConf)
conf.rabbitConf = make(map[string][]*rabbitConfig)
rabbitConf := confDir + string(os.PathSeparator) + rabbitConfigName()
conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf)
}
func Conf() *config {
return conf
}
func (c *config) ServerConf() *serverConfig {
if c == nil {
panic("config is nil")
}
return c.serverConf
}
func (c *config) LogConf() *logConfig {
if c == nil {
panic("config is nil")
}
return c.logConf
}
func (c *config) PostgresConf(tag string) *postgresConfig {
if c == nil || c.postgresConf == nil {
panic("postgres config is nil")
}
return c.postgresConf[tag]
}
func (c *config) InfluxConf(tag string) *influxConfig {
if c == nil || c.influxConf == nil {
panic("influx config is nil")
}
return c.influxConf[tag]
}
func (c *config) RedisConf(tag string) *redisConfig {
if c == nil || c.redisConf == nil {
panic("redis config is nil")
}
return c.redisConf[tag]
}
func (c *config) MongoConf(tag string) *mongoConfig {
if c == nil || c.mongoConf == nil {
panic("mongo config is nil")
}
return c.mongoConf[tag]
}
func (c *config) RabbitConf(tag string) []*rabbitConfig {
if c == nil || c.rabbitConf == nil {
panic("rabbit config is nil")
}
return c.rabbitConf[tag]
}
func (c *config) unmarshalJsonFile(file string, dest any) {
if filejson, err := os.ReadFile(file); err != nil {
panic(err.Error())
} else {
err := json.Unmarshal([]byte(filejson), dest)
if err != nil {
panic(err.Error())
}
}
}

84
config/influx.go Normal file
View File

@ -0,0 +1,84 @@
package config
type influxConfig struct {
URL string `json:"url" yaml:"url"`
Token string `json:"token" yaml:"token"`
Org string `json:"org" yaml:"org"`
Timeout int `json:"timeout" yaml:"timeout"`
}
func NewInfluxConfig() *influxConfig {
return new(influxConfig)
}
func (conf *influxConfig) GetURL() string {
if conf == nil {
panic("influx config is nil")
}
return conf.URL
}
func (conf *influxConfig) SetURL(url string) *influxConfig {
if conf == nil {
panic("influx config is nil")
}
conf.URL = url
return conf
}
func (conf *influxConfig) GetToken() string {
if conf == nil {
panic("influx config is nil")
}
return conf.Token
}
func (conf *influxConfig) SetToken(token string) *influxConfig {
if conf == nil {
panic("influx config is nil")
}
conf.Token = token
return conf
}
func (conf *influxConfig) GetOrg() string {
if conf == nil {
panic("influx config is nil")
}
return conf.Org
}
func (conf *influxConfig) SetOrg(org string) *influxConfig {
if conf == nil {
panic("influx config is nil")
}
conf.Org = org
return conf
}
func (conf *influxConfig) GetTimeout() int {
if conf == nil {
panic("influx config is nil")
}
return conf.Timeout
}
func (conf *influxConfig) SetTimeout(timeout int) *influxConfig {
if conf == nil {
panic("influx config is nil")
}
conf.Timeout = timeout
return conf
}
func influxConfigName() string {
return "influx.json"
}

137
config/log.go Normal file
View File

@ -0,0 +1,137 @@
package config
import (
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
type logConfig struct {
lumberjack.Logger
// DebugLevel, -1, logs are typically voluminous, and are usually disabled in
// production.
//
// InfoLevel, 0, is the default logging priority.
//
// WarnLevel, 1, logs are more important than Info, but don't need individual
// human review.
//
// ErrorLevel, 2, logs are high-priority. If an application is running smoothly,
// it shouldn't generate any error-level logs.
//
// DPanicLevel, 3, logs are particularly important errors. In development the
// logger panics after writing the message.
//
// PanicLevel, 4, logs a message, then panics.
//
// FatalLevel, 5, logs a message, then calls os.Exit(1).
LogLevel int8 `json:"loglevel" yaml:"loglevel"`
}
func NewLogCinfig() *logConfig {
return new(logConfig)
}
func (conf *logConfig) GetFileName() string {
if conf == nil {
panic("log config is nil")
}
return conf.Filename
}
func (conf *logConfig) SetFileName(fileName string) {
if conf == nil {
panic("log config is nil")
}
conf.Filename = fileName
}
func (conf *logConfig) GetMaxSize() int {
if conf == nil {
panic("log config is nil")
}
return conf.MaxSize
}
func (conf *logConfig) SetMaxSize(maxSize int) {
if conf == nil {
panic("log config is nil")
}
conf.MaxSize = maxSize
}
func (conf *logConfig) GetMaxAge() int {
if conf == nil {
panic("log config is nil")
}
return conf.MaxAge
}
func (conf *logConfig) SetMaxAge(maxAge int) {
if conf == nil {
panic("log config is nil")
}
conf.MaxAge = maxAge
}
func (conf *logConfig) GetMaxBackups() int {
if conf == nil {
panic("log config is nil")
}
return conf.MaxBackups
}
func (conf *logConfig) SetMaxBackups(maxBackups int) {
if conf == nil {
panic("log config is nil")
}
conf.MaxBackups = maxBackups
}
func (conf *logConfig) GetLocalTime() bool {
if conf == nil {
panic("log config is nil")
}
return conf.LocalTime
}
func (conf *logConfig) SetLocalTime(useLocalTime bool) {
if conf == nil {
panic("log config is nil")
}
conf.LocalTime = useLocalTime
}
func (conf *logConfig) GetCompress() bool {
if conf == nil {
panic("log config is nil")
}
return conf.Compress
}
func (conf *logConfig) SetCompress(doCompress bool) {
if conf == nil {
panic("log config is nil")
}
conf.Compress = doCompress
}
func (conf *logConfig) GetLogLevel() zapcore.Level {
if conf == nil {
panic("log config is nil")
}
return zapcore.Level(conf.LogLevel)
}
func (conf *logConfig) SetLogLevel(logLevel zapcore.Level) {
if conf == nil {
panic("log config is nil")
}
if logLevel < -1 || logLevel > 5 {
panic("logLevel is invalid")
}
conf.LogLevel = int8(logLevel)
}
func logConfigName() string {
return "log.json"
}

102
config/mongo.go Normal file
View File

@ -0,0 +1,102 @@
package config
type mongoConfig struct {
Addrs []string `json:"addrs" yaml:"addrs"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
AuthSource string `josn:"authsource" yaml:"authsource"`
AuthMechanism string `json:"authmechanism" yaml:"authmechanism"`
}
func NewMongoConfig() *mongoConfig {
return new(mongoConfig)
}
func (conf *mongoConfig) GetAddrs() []string {
if conf == nil {
panic("mongo config is nil")
}
return conf.Addrs
}
func (conf *mongoConfig) SetAddrs(addrs []string) *mongoConfig {
if conf == nil {
panic("mongo config is nil")
}
conf.Addrs = addrs
return conf
}
func (conf *mongoConfig) GetUsername() string {
if conf == nil {
panic("mongo config is nil")
}
return conf.Username
}
func (conf *mongoConfig) SetUsername(username string) *mongoConfig {
if conf == nil {
panic("mongo config is nil")
}
conf.Username = username
return conf
}
func (conf *mongoConfig) GetPassword() string {
if conf == nil {
panic("mongo config is nil")
}
return conf.Password
}
func (conf *mongoConfig) SetPassword(password string) *mongoConfig {
if conf == nil {
panic("mongo config is nil")
}
conf.Password = password
return conf
}
func (conf *mongoConfig) GetAuthSource() string {
if conf == nil {
panic("mongo config is nil")
}
return conf.AuthSource
}
func (conf *mongoConfig) SetAuthSource(authSource string) *mongoConfig {
if conf == nil {
panic("mongo config is nil")
}
conf.AuthSource = authSource
return conf
}
func (conf *mongoConfig) GetAuthMechanism() string {
if conf == nil {
panic("mongo config is nil")
}
return conf.AuthMechanism
}
func (conf *mongoConfig) SetAuthMechanism(authMechanism string) *mongoConfig {
if conf == nil {
panic("mongo config is nil")
}
conf.AuthMechanism = authMechanism
return conf
}
func mongoConfigName() string {
return "mongo.json"
}

120
config/postgres.go Normal file
View File

@ -0,0 +1,120 @@
package config
type postgresConfig struct {
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
DBName string `json:"dbname" yaml:"dbname"`
TimeZone string `json:"timezone" yaml:"timezone"`
}
func NewPostgresConfig() *postgresConfig {
return new(postgresConfig)
}
func (conf *postgresConfig) GetHost() string {
if conf == nil {
panic("postgres config is nil")
}
return conf.Host
}
func (conf *postgresConfig) SetHost(host string) *postgresConfig {
if conf == nil {
panic("postgres config is nil")
}
conf.Host = host
return conf
}
func (conf *postgresConfig) GetPort() int {
if conf == nil {
panic("postgres config is nil")
}
return conf.Port
}
func (conf *postgresConfig) SetPort(port int) *postgresConfig {
if conf == nil {
panic("postgres config is nil")
}
conf.Port = port
return conf
}
func (conf *postgresConfig) GetUser() string {
if conf == nil {
panic("postgres config is nil")
}
return conf.User
}
func (conf *postgresConfig) SetUser(user string) *postgresConfig {
if conf == nil {
panic("postgres config is nil")
}
conf.User = user
return conf
}
func (conf *postgresConfig) GetPassword() string {
if conf == nil {
panic("postgres config is nil")
}
return conf.Password
}
func (conf *postgresConfig) SetPassword(password string) *postgresConfig {
if conf == nil {
panic("postgres config is nil")
}
conf.Password = password
return conf
}
func (conf *postgresConfig) GetDBName() string {
if conf == nil {
panic("postgres config is nil")
}
return conf.DBName
}
func (conf *postgresConfig) SetDBName(dbName string) *postgresConfig {
if conf == nil {
panic("postgres config is nil")
}
conf.DBName = dbName
return conf
}
func (conf *postgresConfig) GetTimeZone() string {
if conf == nil {
panic("postgres config is nil")
}
return conf.TimeZone
}
func (conf *postgresConfig) SetTimeZone(timeZone string) *postgresConfig {
if conf == nil {
panic("postgres config is nil")
}
conf.TimeZone = timeZone
return conf
}
func postgresConfigName() string {
return "postgres.json"
}

80
config/rabbit.go Normal file
View File

@ -0,0 +1,80 @@
package config
type rabbitConfig struct {
Broker string `json:"broker" yaml:"broker"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
}
func NewRabbitConfig() *rabbitConfig {
return new(rabbitConfig)
}
func (conf *rabbitConfig) GenAddress(tls bool) string {
if conf == nil {
panic("rabbit config is nil")
}
address := "amqp://"
if conf.GetUsername() != "" && conf.GetPassword() != "" {
address += conf.GetUsername() + ":" + conf.GetPassword() + "@"
}
address += conf.GetBroker() + "/"
return address
}
func (conf *rabbitConfig) GetBroker() string {
if conf == nil {
panic("rabbit config is nil")
}
return conf.Broker
}
func (conf *rabbitConfig) SetBroker(broker string) *rabbitConfig {
if conf == nil {
panic("rabbit config is nil")
}
conf.Broker = broker
return conf
}
func (conf *rabbitConfig) GetUsername() string {
if conf == nil {
panic("rabbit config is nil")
}
return conf.Username
}
func (conf *rabbitConfig) SetUsername(username string) *rabbitConfig {
if conf == nil {
panic("rabbit config is nil")
}
conf.Username = username
return conf
}
func (conf *rabbitConfig) GetPassword() string {
if conf == nil {
panic("rabbit config is nil")
}
return conf.Password
}
func (conf *rabbitConfig) SetPassword(password string) *rabbitConfig {
if conf == nil {
panic("rabbit config is nil")
}
conf.Password = password
return conf
}
func rabbitConfigName() string {
return "rabbit.json"
}

174
config/redis.go Normal file
View File

@ -0,0 +1,174 @@
package config
type redisConfig struct {
Addr string `json:"addr" yaml:"addr"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
DB int `json:"db" yaml:"db"`
Protocol int `json:"protocol" yaml:"protocol"`
DialTimeout int `json:"dialtimeout" yaml:"dialtimeout"`
ReadTimeout int `json:"readtimeout" yaml:"readtimeout"`
WriteTimeout int `json:"writetimeout" yaml:"writetimeout"`
PoolSize int `json:"poolsize" yaml:"poolsize"`
}
func NewRedisConfig() *redisConfig {
return new(redisConfig)
}
func (conf *redisConfig) GetAddr() string {
if conf == nil {
panic("redis config is nil")
}
return conf.Addr
}
func (conf *redisConfig) SetAddr(addr string) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.Addr = addr
return conf
}
func (conf *redisConfig) GetUsername() string {
if conf == nil {
panic("redis config is nil")
}
return conf.Username
}
func (conf *redisConfig) SetUsername(username string) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.Username = username
return conf
}
func (conf *redisConfig) GetPassword() string {
if conf == nil {
panic("redis config is nil")
}
return conf.Password
}
func (conf *redisConfig) SetPassword(password string) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.Password = password
return conf
}
func (conf *redisConfig) GetDB() int {
if conf == nil {
panic("redis config is nil")
}
return conf.DB
}
func (conf *redisConfig) SetDB(db int) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.DB = db
return conf
}
func (conf *redisConfig) GetProtocol() int {
if conf == nil {
panic("redis config is nil")
}
return conf.Protocol
}
func (conf *redisConfig) SetProtocol(protocol int) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.Protocol = protocol
return conf
}
func (conf *redisConfig) GetDialTimeout() int {
if conf == nil {
panic("redis config is nil")
}
return conf.DialTimeout
}
func (conf *redisConfig) SetDialTimeout(dialTimeout int) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.DialTimeout = dialTimeout
return conf
}
func (conf *redisConfig) GetReadTimeout() int {
if conf == nil {
panic("redis config is nil")
}
return conf.ReadTimeout
}
func (conf *redisConfig) SetReadTimeout(readTimeout int) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.ReadTimeout = readTimeout
return conf
}
func (conf *redisConfig) GetWriteTimeout() int {
if conf == nil {
panic("redis config is nil")
}
return conf.WriteTimeout
}
func (conf *redisConfig) SetWriteTimeout(writeTimeout int) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.WriteTimeout = writeTimeout
return conf
}
func (conf *redisConfig) GetPoolSize() int {
if conf == nil {
panic("redis config is nil")
}
return conf.PoolSize
}
func (conf *redisConfig) SetPoolSIze(poolSize int) *redisConfig {
if conf == nil {
panic("redis config is nil")
}
conf.PoolSize = poolSize
return conf
}
func redisConfigName() string {
return "redis.json"
}

72
config/server.go Normal file
View File

@ -0,0 +1,72 @@
package config
import "maps"
type serverConfig struct {
Name string `json:"name" yaml:"name"`
Port int `json:"port" yaml:"port"`
SSUType map[string]uint8 `json:"ssutype" yaml:"ssutype"`
}
func NewServerConfig() *serverConfig {
return new(serverConfig)
}
func (conf *serverConfig) GetName() string {
if conf == nil {
panic("server config is nil")
}
return conf.Name
}
func (conf *serverConfig) SetName(name string) *serverConfig {
if conf == nil {
panic("server config is nil")
}
conf.Name = name
return conf
}
func (conf *serverConfig) GetPort() int {
if conf == nil {
panic("server config is nil")
}
return conf.Port
}
func (conf *serverConfig) SetPort(port int) *serverConfig {
if conf == nil {
panic("server config is nil")
}
conf.Port = port
return conf
}
func (conf *serverConfig) GetSSUType() map[string]uint8 {
if conf == nil {
panic("server config is nil")
}
return conf.SSUType
}
func (conf *serverConfig) SetSSUType(ssuType map[string]uint8) *serverConfig {
if conf == nil {
panic("server config is nil")
}
if conf.SSUType == nil {
conf.SSUType = ssuType
} else {
maps.Copy(conf.SSUType, ssuType)
}
return conf
}
func serverConfigName() string {
return "server.json"
}

8
configs/influx.json Normal file
View File

@ -0,0 +1,8 @@
{
"default": {
"url": "http://192.168.46.100:8086",
"token": "kKtMhMj5ISrnrCAO1ugvL4D4c_HrbAv4HzSHGA3Ai1AeBIEmGbQpQY0qSwjaXYluOmuDAv0zAvFCRRqEWQ0zJw==",
"org": "eCL3000",
"timeout":1000
}
}

9
configs/log.json Normal file
View File

@ -0,0 +1,9 @@
{
"filename": "./logs/datart.log",
"maxsize": 128,
"maxage": 7,
"maxbackups": 20,
"localtime": true,
"compress": false,
"loglevel": -1
}

9
configs/mongo.json Normal file
View File

@ -0,0 +1,9 @@
{
"default":{
"addrs":["127.0.0.1:27017"],
"username":"admin",
"password":"password",
"authsource":"admin",
"authmechanism":"SCRAM-SHA-256"
}
}

10
configs/postgres.json Normal file
View File

@ -0,0 +1,10 @@
{
"default":{
"host":"192.168.46.100",
"port":9432,
"user":"postgres",
"password":"123RTYjkl",
"dbname":"metamodule",
"timezone":"Asia/Shanghai"
}
}

9
configs/rabbit.json Normal file
View File

@ -0,0 +1,9 @@
{
"default": [
{
"broker": "127.0.0.1:5672",
"username": "rabbitmq",
"password": "password"
}
]
}

13
configs/redis.json Normal file
View File

@ -0,0 +1,13 @@
{
"default":{
"addr":"192.168.46.100:6379",
"username":"",
"password":"",
"db":0,
"protocol":3,
"dialtimeout":50,
"readtimeout":250,
"writetimeout":250,
"poolsize":20
}
}

7
configs/server.json Normal file
View File

@ -0,0 +1,7 @@
{
"name":"datart",
"port":8888,
"ssutype":{
"ssu000":1
}
}

62
data/common.go Normal file
View File

@ -0,0 +1,62 @@
package data
import (
"datart/data/influx"
"datart/data/postgres"
"strings"
"github.com/redis/go-redis/v9"
)
func GenPhasorFields(channel string) []string {
fields := make([]string, 0, 3)
switch {
case strings.HasPrefix(channel, postgres.ChannelYCPrefix):
fieldPrefix := strings.ToLower(channel)
fields = append(fields,
fieldPrefix+influx.FieldSuffixAMP,
fieldPrefix+influx.FieldSuffixPA,
fieldPrefix+influx.FieldSuffixRMS)
case strings.HasPrefix(channel, postgres.ChannelYXPrefix):
fields = append(fields, strings.ToLower(channel))
case strings.HasPrefix(channel, postgres.ChannelUPrefix):
fieldUPrefix := strings.ToLower(channel)
fields = append(fields,
fieldUPrefix+influx.FieldSuffixAMP,
fieldUPrefix+influx.FieldSuffixPA,
fieldUPrefix+influx.FieldSuffixRMS)
case channel == postgres.ChannelP,
channel == postgres.ChannelQ,
channel == postgres.ChannelS,
channel == postgres.ChannelPF,
channel == postgres.ChannelF,
channel == postgres.ChannelDF:
fields = append(fields, strings.ToLower(channel))
}
return fields
}
type zUnit struct {
Key string
Members []redis.Z
}
func convertTVsToMenmbers(tvs []influx.TV) []redis.Z {
members := make([]redis.Z, len(tvs))
for i, tv := range tvs {
members[i].Member = tv.Time
members[i].Score = tv.Value
}
return members
}

39
data/data.go Normal file
View File

@ -0,0 +1,39 @@
package data
import (
"context"
"datart/data/influx"
"datart/data/mongo"
"datart/data/postgres"
"datart/data/rabbit"
"datart/data/redis"
)
type Processes struct {
cancel context.CancelFunc
}
func NewProcesses() *Processes {
return new(Processes)
}
func (p *Processes) StartDataProcessing() {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
postgres.GenSSU2ChannelSizes(ctx, 500)
updatingRedisPhasor(ctx)
}
func (p *Processes) Cancel(ctx context.Context) {
p.cancel()
eventNotifyPublisher.Close(ctx)
influx.CloseDefault()
mongo.CloseDefault(ctx)
postgres.CloseDefault()
rabbit.CloseDefault(ctx)
redis.CloseDefault()
}

311
data/influx/common.go Normal file
View File

@ -0,0 +1,311 @@
package influx
import (
"bytes"
"compress/gzip"
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
// line protocol, better to gzip and sort tags by key in lexicographic order
func (client *influxClient) writeLinesData(ctx context.Context, db string,
data []byte, compress bool) error {
if compress {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := gz.Write(data); err != nil {
return err
}
if err := gz.Close(); err != nil {
return err
}
data = buf.Bytes()
}
request, err := http.NewRequest(http.MethodPost,
client.url+"/write?db="+db, bytes.NewReader(data))
if err != nil {
return err
}
request.Header.Set("Content-Type", "text/plain; charset=utf-8")
request.Header.Set("Authorization", "Token "+client.token)
request.Header.Set("Accept", "application/json")
if compress {
request.Header.Set("Content-Encoding", "gzip")
}
response, err := client.Do(request.WithContext(ctx))
if err != nil {
return err
}
defer response.Body.Close()
// http.StatusNoContent is the expected response,
// 200,201,202,206,207,208
// but if we get these we should still accept it as delivered.
if response.StatusCode != http.StatusNoContent {
return fmt.Errorf("unexpected status code: %d", response.StatusCode)
}
return nil
}
// for influx json response data
type jsonResp struct {
Results []*result `json:"results"`
}
type result struct {
StatementID int `json:"statement_id"`
Series []*fields `json:"series"`
}
type fields struct {
Name string `json:"name"`
Column []string `json:"columns"`
Values [][]any `json:"values"`
}
// respType json/csv
// json_time:"2024-12-18T08:12:21.4735154Z"
// csv_time:"1734572793695885000"
func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values,
respType string) ([]TV, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
client.url+"/query?"+reqData.Encode(), nil)
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("Authorization", "Token "+client.token)
if respType == "csv" {
request.Header.Set("Accept", "application/csv")
}
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode)
}
respData, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
return respDataToTVs(respData, respType)
}
// respType json/csv
// json_time:"2024-12-18T08:12:21.4735154Z"
// csv_time:"1734572793695885000"
func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values,
respType string) (map[string][]TV, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
client.url+"/query?"+reqData.Encode(), nil)
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("Authorization", "Token "+client.token)
if respType == "csv" {
request.Header.Set("Accept", "application/csv")
}
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode)
}
respData, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
return respDataToF2TVs(respData, respType)
}
func respDataToTVs(respData []byte, respType string) ([]TV, error) {
switch respType {
case "json":
resp := new(jsonResp)
err := json.Unmarshal(respData, resp)
if err != nil {
return nil, err
}
if len(resp.Results) > 0 &&
len(resp.Results[0].Series) > 0 {
return convertJsonToTVs(resp.Results[0].Series[0].Values)
}
case "csv":
rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll()
if err != nil {
return nil, err
}
if len(rows) > 1 {
return convertCsvToTVs(rows[1:])
}
default:
return nil, errors.New("invalid response type")
}
return []TV{}, nil
}
func respDataToF2TVs(respData []byte, respType string) (map[string][]TV, error) {
switch respType {
case "json":
resp := new(jsonResp)
err := json.Unmarshal(respData, resp)
if err != nil {
return nil, err
}
if len(resp.Results) > 0 &&
len(resp.Results[0].Series) > 0 {
return convertJsonToF2TVs(resp.Results[0].Series[0].Column,
resp.Results[0].Series[0].Values)
}
case "csv":
rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll()
if err != nil {
return nil, err
}
if len(rows) > 1 {
return convertCsvToF2TVs(rows)
}
default:
return nil, errors.New("invalid response type")
}
return map[string][]TV{}, nil
}
// measure at different times
func convertJsonToTVs(data [][]any) ([]TV, error) {
ret := make([]TV, 0, len(data))
for _, row := range data {
if len(row) > 1 {
tstr, ok := (row[0]).(string)
if !ok {
return nil, errors.New("not expected data type")
}
t, err := time.Parse("2006-01-02T15:04:05.999999Z", tstr)
if err != nil {
return nil, err
}
v, ok := (row[1]).(float64)
if !ok {
return nil, errors.New("not expected data type")
}
ret = append(ret, TV{
Time: t.UnixNano(),
Value: v,
})
}
}
return ret, nil
}
// different measures at different times
func convertJsonToF2TVs(cols []string, data [][]any) (map[string][]TV, error) {
f2tvs := make(map[string][]TV)
for _, row := range data {
if len(row) > 1 {
tstr, ok := (row[0]).(string)
if !ok {
return nil, errors.New("not expected data type")
}
t, err := time.Parse("2006-01-02T15:04:05.999999Z", tstr)
if err != nil {
return nil, err
}
for i := 1; i < len(row); i++ {
v, ok := (row[i]).(float64)
if !ok {
return nil, errors.New("not expected data type")
}
f2tvs[cols[i]] = append(f2tvs[cols[i]], TV{
Time: t.UnixNano(),
Value: v,
})
}
}
}
return f2tvs, nil
}
// measure at different times
func convertCsvToTVs(data [][]string) ([]TV, error) {
ret := make([]TV, 0, len(data))
for _, row := range data[1:] {
if len(row) > 3 {
ns, err := strconv.ParseInt(row[2], 10, 64)
if err != nil {
return nil, err
}
v, err := strconv.ParseFloat(row[3], 64)
if err != nil {
return nil, err
}
ret = append(ret, TV{
Time: ns,
Value: v,
})
}
}
return ret, nil
}
// different measures at different times
func convertCsvToF2TVs(data [][]string) (map[string][]TV, error) {
f2tvs := make(map[string][]TV)
for _, row := range data[1:] {
if len(row) > 3 {
ns, err := strconv.ParseInt(row[2], 10, 64)
if err != nil {
return nil, err
}
for i := 3; i < len(row); i++ {
v, err := strconv.ParseFloat(row[i], 64)
if err != nil {
return nil, err
}
f2tvs[data[0][i]] = append(f2tvs[data[0][i]], TV{
Time: ns,
Value: v,
})
}
}
}
return f2tvs, nil
}

108
data/influx/influx.go Normal file
View File

@ -0,0 +1,108 @@
package influx
import (
"context"
"datart/config"
"errors"
"net"
"net/http"
"time"
)
type influxClient struct {
*http.Client
url string
token string
org string
}
var client *influxClient
func init() {
client = new(influxClient)
influxConfig := config.Conf().InfluxConf("default")
client.Client = &http.Client{
Timeout: time.Duration(influxConfig.GetTimeout()) * time.Millisecond,
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
},
}
client.url = influxConfig.GetURL()
client.token = influxConfig.GetToken()
client.org = influxConfig.GetOrg()
}
func CloseDefault() {
client.CloseIdleConnections()
}
func NewInfluxClient(cli *http.Client, url, org, token string) *influxClient {
return &influxClient{
Client: cli,
url: url,
org: org,
token: token,
}
}
func GetDB(tp string) (string, error) {
switch tp {
case "phasor":
return dbphasor, nil
case "sample":
return dbsample, nil
}
return "", errors.New("invalid type")
}
// serverConf
func GetTable(tp string, mainPos string) (string, error) {
switch tp {
case "phasor":
ssu2Type := config.Conf().ServerConf().GetSSUType()
switch ssu2Type[mainPos] {
case 1:
return "current", nil
case 2:
return "voltage", nil
default:
return "", errors.New("invalid main_pos")
}
case "sample":
return "sample", nil
}
return "", errors.New("invalid type")
}
func WriteLinesData(ctx context.Context, db string, data []byte) error {
return client.WriteLinesData(ctx, db, data)
}
type Request struct {
DB string
Table string
Type string
Station string
MainPos string
SubPos string // separate whith ','
Begin int64
End int64
Operate string
Step string
Default string
}
type TV struct {
Time int64 `json:"time"`
Value float64 `json:"value"`
}

221
data/influx/ssu_point.go Normal file
View File

@ -0,0 +1,221 @@
package influx
import (
"context"
"fmt"
"net/url"
"strings"
"time"
)
const (
dbphasor = "ssuBucket"
dbsample = "ssuBucket"
)
// keep consistent with telegraf
const (
FieldYCPrefix string = "tm"
FieldYXPrefix string = "ts"
// FieldP string = "p"
// FieldQ string = "q"
// FieldS string = "s"
// FieldPF string = "pf"
// FieldF string = "f"
FieldDF string = "df"
// FieldUABPrefix string = "uab"
// FieldUBCPrefix string = "ubc"
// FieldUCAPrefix string = "uca"
FieldSuffixAMP = "_amp"
FieldSuffixPA = "_pa"
FieldSuffixRMS = "_rms"
)
const adaptedms = 5000
func GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
req.Begin = time.Now().UnixMilli() - int64(limit*20+adaptedms)
return client.GetSSUPointLastLimit(ctx, req, limit)
}
func GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
req.Begin = time.Now().UnixMilli() - int64(limit*20+adaptedms)
return client.GetSSUPointsLastLimit(ctx, req, limit)
}
func GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) {
return client.GetSSUPointDurationData(ctx, req)
}
func GetSSUPointsDurationData(ctx context.Context, req *Request) (map[string][]TV, error) {
return client.GetSSUPointsDurationData(ctx, req)
}
func GetSSUPointsAfterLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
req.End = req.Begin + int64(limit*20+20)
return client.GetSSUPointsAfterLimit(ctx, req, limit)
}
func GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
req.Begin = req.End - int64(limit*20+20)
return client.GetSSUPointsBeforeLimit(ctx, req, limit)
}
func (client *influxClient) GetSSUPointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and device='%s';",
req.SubPos, req.SubPos, req.Table, req.Station, req.MainPos)
if limit > 1 {
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;",
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit)
}
reqData := url.Values{
"db": {req.DB},
"q": {sql},
}
return client.getTVsResp(ctx, reqData, "csv")
}
func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
sql := ""
if limit == 1 {
fields := strings.Split(req.SubPos, ",")
for i, field := range fields {
fields[i] = "last(" + field + ") as " + field
}
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s';",
strings.Join(fields, ","), req.Table, req.Station, req.MainPos)
} else {
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms order by time desc limit %d;",
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, limit)
}
reqData := url.Values{
"db": {req.DB},
"q": {sql},
}
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
if err != nil {
return f2tvs, nil
}
ret := make(map[string][]TV, len(f2tvs))
for f, tvs := range f2tvs {
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
}
return ret, nil
}
func (client *influxClient) GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) {
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms;",
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End)
if req.Operate != "" && req.Step != "" && req.Default != "" {
sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);",
req.Operate, req.SubPos, req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default)
}
reqData := url.Values{
"db": {req.DB},
"q": {sql},
}
return client.getTVsResp(ctx, reqData, "csv")
}
func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *Request) (map[string][]TV, error) {
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms;",
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End)
if req.Operate != "" && req.Step != "" && req.Default != "" {
subPoss := strings.Split(req.SubPos, ",")
selectSections := make([]string, len(subPoss))
for i, subPos := range subPoss {
selectSections[i] = req.Operate + "(" + subPos + ")" + " as " + subPos
}
sql = fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);",
strings.Join(selectSections, ", "), req.Table, req.Station, req.MainPos, req.Begin, req.End, req.Step, req.Default)
}
reqData := url.Values{
"db": {req.DB},
"q": {sql},
}
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
if err != nil {
return f2tvs, nil
}
ret := make(map[string][]TV, len(f2tvs))
for f, tvs := range f2tvs {
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
}
return ret, nil
}
func (client *influxClient) GetSSUPointAfterLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms limit %d;",
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)
reqData := url.Values{
"db": {req.DB},
"q": {sql},
}
return client.getTVsResp(ctx, reqData, "csv")
}
func (client *influxClient) GetSSUPointsAfterLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms limit %d;",
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)
reqData := url.Values{
"db": {req.DB},
"q": {sql},
}
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
if err != nil {
return f2tvs, nil
}
ret := make(map[string][]TV, len(f2tvs))
for f, tvs := range f2tvs {
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
}
return ret, nil
}
func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
reqData := url.Values{
"db": {req.DB},
"q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<=%dms order by time desc limit %d;",
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)},
}
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
if err != nil {
return f2tvs, nil
}
ret := make(map[string][]TV, len(f2tvs))
for f, tvs := range f2tvs {
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
}
return ret, nil
}
func (client *influxClient) WriteLinesData(ctx context.Context, db string, data []byte) error {
return client.writeLinesData(ctx, db, data, true)
}

131
data/mongo/alarm.go Normal file
View File

@ -0,0 +1,131 @@
package mongo
import (
"encoding/json"
"github.com/google/uuid"
)
const (
_ = iota
almCodeCommmExcept
almCodeADFault
almCodePPSExcept
almCodeReserve1
almCodeUnitInit
almCodeReadParamErr
almCodeReserve2
almCodeStartSample
almCodeOverSample
almCodeUnderSample
)
type Alarm struct {
DriverName string `bson:"driver_name" json:"driver_name"`
DeviceNo string `bson:"device_no" json:"device_no"`
AlarmCode int `bson:"alarm_code" json:"alarm_code"`
AlarmTime int64 `bson:"alarm_time" json:"alarm_time"` // ms
AlarmStatus int `bson:"alarm_status" json:"alarm_status"` // 0 "复位", 1 "动作/产生/告警"
}
var almCode2Name = []string{
almCodeCommmExcept: "通信异常",
almCodeADFault: "AD故障",
almCodePPSExcept: "同步秒脉冲异常",
almCodeReserve1: "备用",
almCodeUnitInit: "单元初始化",
almCodeReadParamErr: "读参数错",
almCodeReserve2: "备用",
almCodeStartSample: "启动采样-内部转换信号",
almCodeOverSample: "秒内采样点数过量",
almCodeUnderSample: "秒内采样点数欠量",
}
func (a *Alarm) GetName() string {
return almCode2Name[a.AlarmCode]
}
func (a *Alarm) GetType() int {
switch a.AlarmCode {
case almCodeReserve1,
almCodeReserve2,
almCodeUnitInit,
almCodeStartSample:
return genEventType(0, 0)
case almCodeOverSample,
almCodeUnderSample:
return genEventType(0, 1)
case almCodeCommmExcept,
almCodeADFault,
almCodePPSExcept,
almCodeReadParamErr:
return genEventType(0, 2)
}
return -1
}
func (a *Alarm) GetPriority() int {
switch a.AlarmCode {
case almCodeReserve1,
almCodeReserve2,
almCodeUnitInit,
almCodeStartSample:
return 1
case almCodeOverSample,
almCodeUnderSample:
return 4
case almCodeCommmExcept,
almCodeADFault,
almCodePPSExcept,
almCodeReadParamErr:
return 7
}
return -1
}
func GenEvent(alarm []byte, ip string) (*Event, error) {
a, err := UnmarshallToAlarm(alarm)
if err != nil {
return nil, err
}
return a.ConvertToEvent(ip)
}
func (a *Alarm) ConvertToEvent(ip string) (*Event, error) {
e := new(Event)
uid, err := uuid.NewV7()
if err != nil {
return nil, err
}
if a != nil {
e.Event = a.GetName()
e.EventUUID = uid.String()
e.Type = a.GetType()
e.Priority = a.GetPriority()
e.Status = EventStatusHappen
e.Timestamp = a.AlarmTime
e.From = "station"
e.Operations = append(e.Operations, &operation{
Action: EventActionHappen,
OP: ip,
TS: a.AlarmTime,
})
e.Alarm = a
}
return e, nil
}
func UnmarshallToAlarm(data []byte) (*Alarm, error) {
alm := new(Alarm)
err := json.Unmarshal(data, alm)
return alm, err
}

145
data/mongo/event.go Normal file
View File

@ -0,0 +1,145 @@
package mongo
import (
"context"
"encoding/json"
"time"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
const (
dbevent string = "cl"
tbevent string = "events"
)
const (
EventStatusHappen = iota
EventStatusDataAt
EventStatusReport
EventStatusConfirm
EventStatusPersist
EventStatusClose
)
const (
EventActionHappen = "happen"
EventActionDataAt = "data_attach"
EventActionReport = "report"
EventActionConfirm = "confirm"
EventActionPersist = "persist"
EventActionClose = "close"
)
var EventStatusAction = []string{
EventStatusHappen: EventActionHappen,
EventStatusDataAt: EventActionDataAt,
EventStatusReport: EventActionReport,
EventStatusConfirm: EventActionConfirm,
EventStatusPersist: EventActionPersist,
EventStatusClose: EventActionClose,
}
type operation struct {
Action string `bson:"action" json:"action"`
OP string `bson:"op" json:"op"`
TS int64 `bson:"ts" json:"ts"`
}
func GenOperation(action, op string) operation {
return operation{
Action: action,
OP: op,
TS: time.Now().UnixMilli(),
}
}
type Event struct {
Event string `bson:"event" json:"event"`
EventUUID string `bson:"event_uuid" json:"event_uuid"`
Type int `bson:"type" json:"type"`
Priority int `bson:"priority" json:"priority"` // 0~9
Status int `bson:"status" json:"status"`
Timestamp int64 `bson:"timestamp" json:"timestamp"`
From string `bson:"from" json:"from"`
Operations []*operation `bson:"operations" json:"operations"`
// others
Alarm *Alarm `bson:"alarm" json:"alarm"`
}
func (e *Event) Marshall() ([]byte, error) {
return json.Marshal(e)
}
func InsertOneEvent(ctx context.Context, doc any) error {
_, err := getCollection(dbevent, tbevent).InsertOne(ctx, doc)
return err
}
func InsertEvents(ctx context.Context, docs any) error {
_, err := getCollection(dbevent, tbevent).InsertMany(ctx, docs)
return err
}
func DeleteOneEvent[T bson.M | bson.D](ctx context.Context, filter T) error {
_, err := getCollection(dbevent, tbevent).DeleteOne(ctx, filter)
return err
}
func DeleteEvents[T bson.M | bson.D](ctx context.Context, filter T) error {
_, err := getCollection(dbevent, tbevent).DeleteMany(ctx, filter)
return err
}
func UpdateOneEvent[T bson.M | bson.D](ctx context.Context, filter T, update T) error {
_, err := getCollection(dbevent, tbevent).UpdateOne(ctx, filter, update)
return err
}
func UpdateEvents[T bson.M | bson.D](ctx context.Context, filter T, update T) error {
_, err := getCollection(dbevent, tbevent).UpdateMany(ctx, filter, update)
return err
}
func FindEventsWithPageLimit[T bson.M | bson.D](ctx context.Context, filter T,
sort int, page int64, limit int64) ([]*Event, error) {
opt := options.Find()
if sort == 1 || sort == -1 {
opt.SetSort(bson.D{{Key: "timestamp", Value: sort}})
} else {
opt.SetSort(bson.D{{Key: "_id", Value: 1}})
}
if page > 0 && limit > 0 {
opt.SetSkip(limit * (page - 1)).SetLimit(limit)
}
cursor, err := getCollection(dbevent, tbevent).Find(ctx, filter, opt)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var docs []*Event
for cursor.Next(ctx) {
doc := new(Event)
if err := cursor.Decode(doc); err != nil {
return nil, err
}
docs = append(docs, doc)
}
if err := cursor.Err(); err != nil {
return docs, err
}
return docs, nil
}
// sys: 0-hard/1-platform/2-application
//
// level:0-info/1-warn/2-error
func genEventType(sys int, level int) int {
return sys + level*3
}

54
data/mongo/mongo.go Normal file
View File

@ -0,0 +1,54 @@
package mongo
import (
"context"
"datart/config"
"strings"
"time"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
var client *mongo.Client
func init() {
conf := config.Conf().MongoConf("default")
uri := "mongodb://" + strings.Join(conf.GetAddrs(), ",")
cliOpts := options.Client().
ApplyURI(uri).SetTimeout(1 * time.Second).
SetAuth(options.Credential{
AuthMechanism: conf.GetAuthMechanism(),
AuthSource: conf.GetAuthSource(),
Username: conf.GetUsername(),
Password: conf.GetPassword(),
})
cli, err := mongo.Connect(cliOpts)
if err != nil {
panic(err)
}
client = cli
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := client.Ping(ctx, nil); err != nil {
panic(err)
}
}
func NewMongoClient(opts ...*options.ClientOptions) (*mongo.Client, error) {
return mongo.Connect(opts...)
}
func CloseDefault(ctx context.Context) error {
return client.Disconnect(ctx)
}
func GetSession() (*mongo.Session, error) {
return client.StartSession()
}
func getCollection(db string, tb string) *mongo.Collection {
return client.Database(db).Collection(tb)
}

View File

@ -0,0 +1,223 @@
package postgres
import (
"context"
"database/sql/driver"
"encoding/json"
"errors"
"sync"
"sync/atomic"
)
const (
tbmeasurement string = "public.measurement"
)
const (
ChannelYCPrefix string = "TM"
ChannelYXPrefix string = "TS"
ChannelP string = "P"
ChannelQ string = "Q"
ChannelS string = "S"
ChannelPF string = "PF"
ChannelF string = "F"
ChannelDF string = "dF"
ChannelUPrefix string = "U"
)
type dataSource struct {
Type int `json:"type"`
Addr any `json:"io_address"`
}
func (ds *dataSource) Scan(value any) error {
if value == nil {
return nil
}
bytes, ok := value.([]byte)
if !ok {
return errors.New("type assertion to []byte failed")
}
return json.Unmarshal(bytes, ds)
}
func (ds *dataSource) Value() (driver.Value, error) {
return json.Marshal(ds)
}
type measurement struct {
ID int64 `gorm:"colunmn:id"`
Tag string `gorm:"column:tag"`
Size int `gorm:"column:size"`
DataSource *dataSource `gorm:"column:data_source;type:jsonb"`
// others
}
type ChannelSize struct {
Station string
Device string
Channel string
Size int
}
// channel is original
var SSU2ChannelSizes atomic.Value
var ChannelSizes sync.Map
func init() {
SSU2ChannelSizes.Store(map[string][]ChannelSize{})
}
func LoadSSU2ChannelSizes() map[string][]ChannelSize {
v := SSU2ChannelSizes.Load()
if v == nil {
return nil
}
return v.(map[string][]ChannelSize)
}
func StoreSSU2ChannelSizes(m map[string][]ChannelSize) {
SSU2ChannelSizes.Store(m)
}
func GetSSU2ChannelSizesCopy() map[string][]ChannelSize {
src := LoadSSU2ChannelSizes()
if src == nil {
return nil
}
out := make(map[string][]ChannelSize, len(src))
for k, v := range src {
cp := make([]ChannelSize, len(v))
copy(cp, v)
out[k] = cp
}
return out
}
func GetSSU2ChannelSizesFor(ssu string) []ChannelSize {
src := LoadSSU2ChannelSizes()
if src == nil {
return nil
}
v, ok := src[ssu]
if !ok {
return nil
}
cp := make([]ChannelSize, len(v))
copy(cp, v)
return cp
}
func GetMeasurements(ctx context.Context, batchSize int) ([]*measurement, error) {
var totalRecords []*measurement
id := int64(0)
for {
var records []*measurement
result := client.WithContext(ctx).Table(tbmeasurement).Where("id > ?", id).
Order("id ASC").Limit(batchSize).Find(&records)
if result.Error != nil {
return totalRecords, result.Error
}
length := len(records)
if length <= 0 {
break
}
id = records[length-1].ID
totalRecords = append(totalRecords, records...)
}
return totalRecords, nil
}
func GenSSU2ChannelSizes(ctx context.Context, batchSize int) error {
id := int64(0)
ssu2Channel2Exist := make(map[string]map[string]struct{})
ssu2ChannelSizes := make(map[string][]ChannelSize)
for {
var records []*measurement
result := client.WithContext(ctx).Table(tbmeasurement).
Where("id > ?", id).Order("id ASC").Limit(batchSize).Find(&records)
if result.Error != nil {
return result.Error
}
length := len(records)
if length <= 0 {
break
}
for _, record := range records {
if record == nil || record.DataSource == nil {
continue
}
if err := genMappingFromAddr(ssu2ChannelSizes, ssu2Channel2Exist, record); err != nil {
return err
}
}
id = records[length-1].ID
}
StoreSSU2ChannelSizes(ssu2ChannelSizes)
return nil
}
func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
ssu2Channel2Exist map[string]map[string]struct{}, record *measurement) error {
switch record.DataSource.Type {
case 1:
if rawAddr, ok := record.DataSource.Addr.(map[string]interface{}); ok {
station, ok := rawAddr["station"].(string)
if !ok {
return errors.New("invalid station")
}
device, ok := rawAddr["device"].(string)
if !ok {
return errors.New("invalid device")
}
channel, ok := rawAddr["channel"].(string)
if !ok {
return errors.New("invalid channel")
}
if _, ok := ssu2Channel2Exist[device]; !ok {
ssu2Channel2Exist[device] = make(map[string]struct{})
}
if _, ok := ssu2Channel2Exist[device][channel]; ok {
return nil
} else {
ssu2Channel2Exist[device][channel] = struct{}{}
}
channelSize := ChannelSize{
Station: station,
Device: device,
Channel: channel,
Size: record.Size,
}
ChannelSizes.Store(record.Tag, channelSize)
ssu2ChannelSizes[device] = append(ssu2ChannelSizes[device], channelSize)
} else {
return errors.New("invalid io_address")
}
case 2:
default:
return errors.New("invalid data_source.type")
}
return nil
}

33
data/postgres/postgres.go Normal file
View File

@ -0,0 +1,33 @@
package postgres
import (
"datart/config"
"fmt"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
var client *gorm.DB
func init() {
postgresConfig := config.Conf().PostgresConf("default")
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d timezone=%s",
postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(),
postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetTimeZone())
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{SkipDefaultTransaction: true})
if err != nil {
panic(err)
}
client = db
}
// close postgres default client
func CloseDefault() error {
db, err := client.DB()
if err != nil {
return err
}
return db.Close()
}

37
data/postgres/station.go Normal file
View File

@ -0,0 +1,37 @@
package postgres
import (
"context"
)
const (
tbstation = "public.station"
)
type station struct {
ID int64 `gorm:"colunmn:id"`
ZoneID int64 `gorm:"colunmn:zone_id"`
TagName string `gorm:"column:tagname"`
Name string `gorm:"colunmn:name"`
IsLocal bool `gorm:"colunmn:is_local"`
}
func GetStations(ctx context.Context) ([]*station, error) {
var records []*station
result := client.WithContext(ctx).Table(tbstation).Find(&records)
if result.Error != nil {
return nil, result.Error
}
return records, nil
}
func GetLocalStation(ctx context.Context) (*station, error) {
var record *station
result := client.WithContext(ctx).Table(tbstation).Where("is_local=?", true).Find(&record)
if result.Error != nil {
return nil, result.Error
}
return record, nil
}

73
data/publish_event.go Normal file
View File

@ -0,0 +1,73 @@
package data
import (
"context"
"datart/data/rabbit"
"datart/log"
"encoding/json"
"errors"
"fmt"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
var eventNotifyXQK = rabbit.XQK{
Exchange: "event_notify_fanout",
}
var eventNotifyPublisher *rmq.Publisher
func init() {
ctx := context.Background()
m := new(rabbit.Manage)
rm := rabbit.DefaultManagement()
rx := &rmq.FanOutExchangeSpecification{
Name: eventNotifyXQK.Exchange,
}
m.Init(ctx, rm, rx, nil, nil)
if err := m.DeclareExchange(ctx); err != nil {
panic(err)
}
publisher, err := rabbit.NewPublisher(ctx, "default", &eventNotifyXQK)
if err != nil {
panic(err)
}
eventNotifyPublisher = publisher
}
func PublishEvent(ctx context.Context, event any) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
result, err := eventNotifyPublisher.Publish(ctx, rmq.NewMessage(data))
if err != nil {
return err
}
switch result.Outcome.(type) {
case *rmq.StateAccepted:
// "message accepted"
case *rmq.StateReleased:
// "message released"
case *rmq.StateRejected:
return errors.New("message rejected")
default:
return fmt.Errorf("invalid message state: %v", result.Outcome)
}
return nil
}
func CloseEventPublisher(ctx context.Context) {
if err := eventNotifyPublisher.Close(ctx); err != nil {
log.Error(err)
}
}

50
data/rabbit/client.go Normal file
View File

@ -0,0 +1,50 @@
package rabbit
import (
"context"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
type rabbitClient struct {
env *rmq.Environment
conn *rmq.AmqpConnection
}
func NewClient(ctx context.Context, tag string) (*rabbitClient, error) {
endpoints, err := genEndpoints(tag)
if err != nil {
return nil, err
}
cli := new(rabbitClient)
cli.env = rmq.NewClusterEnvironment(endpoints)
conn, err := cli.env.NewConnection(context.Background())
if err != nil {
return nil, err
}
client.conn = conn
return cli, nil
}
func (c *rabbitClient) Management() *rmq.AmqpManagement {
return c.conn.Management()
}
func (c *rabbitClient) NewPublisher(ctx context.Context, destination rmq.ITargetAddress,
options rmq.IConsumerOptions) (*rmq.Publisher, error) {
return c.conn.NewPublisher(ctx, destination, options)
}
func (c *rabbitClient) NewConsumer(ctx context.Context, queueName string,
options rmq.IConsumerOptions) (*rmq.Consumer, error) {
return c.conn.NewConsumer(ctx, queueName, options)
}
func (c *rabbitClient) Close(ctx context.Context) error {
return c.env.CloseConnections(ctx)
}

45
data/rabbit/consume.go Normal file
View File

@ -0,0 +1,45 @@
package rabbit
import (
"context"
"errors"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
func NewConsumer(ctx context.Context, tag string, xqk *XQK) (*rmq.Consumer, error) {
cli := client
if tag != "default" {
var err error
cli, err = NewClient(ctx, tag)
if err != nil {
return nil, err
}
}
return cli.conn.NewConsumer(ctx, xqk.Queue, nil)
}
func Consuming(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) {
for {
deliCtx, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
return
}
if err != nil {
// An error occurred receiving the message
return
}
for _, data := range deliCtx.Message().Data {
msgChan <- data
}
err = deliCtx.Accept(ctx)
if err != nil {
// accept error
return
}
}
}

83
data/rabbit/manage.go Normal file
View File

@ -0,0 +1,83 @@
package rabbit
import (
"context"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
type Manage struct {
m *rmq.AmqpManagement
xName string
x rmq.IExchangeSpecification
qName string
q rmq.IQueueSpecification
bPath string
b rmq.IBindingSpecification
}
func (m *Manage) Init(ctx context.Context, rm *rmq.AmqpManagement,
rx rmq.IExchangeSpecification, rq rmq.IQueueSpecification,
rb rmq.IBindingSpecification) {
m.m = rm
m.x = rx
m.q = rq
m.b = rb
}
func (m *Manage) DeclareExchange(ctx context.Context) error {
xinfo, err := m.m.DeclareExchange(ctx, m.x)
if err != nil {
return err
}
m.xName = xinfo.Name()
return nil
}
func (m *Manage) DeclareAndBind(ctx context.Context) error {
xinfo, err := m.m.DeclareExchange(ctx, m.x)
if err != nil {
return err
}
m.xName = xinfo.Name()
qinfo, err := m.m.DeclareQueue(ctx, m.q)
if err != nil {
return err
}
m.qName = qinfo.Name()
bPath, err := m.m.Bind(ctx, m.b)
if err != nil {
return err
}
m.bPath = bPath
return nil
}
func (m *Manage) UnbindAndDelete(ctx context.Context) (purged int, err error) {
err = m.m.Unbind(ctx, m.bPath)
if err != nil {
return
}
err = m.m.DeleteExchange(ctx, m.xName)
if err != nil {
return
}
purged, err = m.m.PurgeQueue(ctx, m.qName)
if err != nil {
return
}
err = m.m.DeleteQueue(ctx, m.qName)
if err != nil {
return
}
return
}

52
data/rabbit/publish.go Normal file
View File

@ -0,0 +1,52 @@
package rabbit
import (
"context"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
func NewPublisher(ctx context.Context, tag string, xqk *XQK) (*rmq.Publisher, error) {
cli := client
if tag != "default" {
var err error
cli, err = NewClient(ctx, tag)
if err != nil {
return nil, err
}
}
return cli.conn.NewPublisher(context.Background(), &rmq.ExchangeAddress{
Exchange: xqk.Exchange,
Key: xqk.Key,
}, nil)
}
func Publishing(ctx context.Context, publisher *rmq.Publisher, msgChan <-chan []byte) {
for {
select {
case msg := <-msgChan:
result, err := publisher.Publish(ctx, rmq.NewMessage(msg))
if err != nil {
_ = err // publish error
continue
}
switch result.Outcome.(type) {
case *rmq.StateAccepted:
// "message accepted"
case *rmq.StateReleased:
// "message not routed"
case *rmq.StateRejected:
// "message rejected"
default:
// *rmp.StateModified
// *rmq.StateReceived
}
case <-ctx.Done():
return
}
}
}

54
data/rabbit/rabbit.go Normal file
View File

@ -0,0 +1,54 @@
package rabbit
import (
"context"
"datart/config"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
type XQK struct {
Exchange string `json:"exchange" yaml:"exchange"`
Queue string `json:"queue" yaml:"queue"`
Key string `json:"key" yaml:"key"`
QueueCap int64 `json:"queuecap" yaml:"queuecap"`
}
var client *rabbitClient
func init() {
endpoints, err := genEndpoints("default")
if err != nil {
panic(err)
}
client = new(rabbitClient)
client.env = rmq.NewClusterEnvironment(endpoints)
conn, err := client.env.NewConnection(context.Background())
if err != nil {
panic(err)
}
client.conn = conn
}
func CloseDefault(ctx context.Context) error {
return client.Close(ctx)
}
func DefaultManagement() *rmq.AmqpManagement {
return client.Management()
}
func genEndpoints(tag string) ([]rmq.Endpoint, error) {
confs := config.Conf().RabbitConf(tag)
endpoints := make([]rmq.Endpoint, len(confs))
for i, conf := range confs {
var options *rmq.AmqpConnOptions
endpoints[i].Address = conf.GenAddress(false)
endpoints[i].Options = options
}
return endpoints, nil
}

33
data/redis/hash.go Normal file
View File

@ -0,0 +1,33 @@
package redis
import (
"context"
"github.com/redis/go-redis/v9"
)
func HGetAll(ctx context.Context, key string) (map[string]string, error) {
hash, err := client.HGetAll(ctx, key).Result()
if err == redis.Nil {
return nil, nil
} else if err != nil {
return nil, err
}
return hash, nil
}
func HGet(ctx context.Context, key, field string) (string, error) {
str, err := client.HGet(ctx, key, field).Result()
if err == redis.Nil {
return "", nil
} else if err != nil {
return "", err
}
return str, nil
}
func HSet(ctx context.Context, key string, values map[string]interface{}) error {
return client.HSet(ctx, key, values).Err()
}

75
data/redis/redis.go Normal file
View File

@ -0,0 +1,75 @@
package redis
import (
"context"
"datart/config"
"time"
"github.com/redis/go-redis/v9"
)
var client *redis.Client
func init() {
config := config.Conf().RedisConf("default")
client = redis.NewClient(&redis.Options{
Addr: config.Addr,
Username: config.Username,
Password: config.Password,
DB: config.DB,
Protocol: config.Protocol,
DialTimeout: time.Duration(config.GetDialTimeout()) * time.Millisecond,
ReadTimeout: time.Duration(config.GetReadTimeout()) * time.Millisecond,
WriteTimeout: time.Duration(config.GetWriteTimeout()) * time.Millisecond,
PoolSize: config.PoolSize,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
pong, err := client.Ping(ctx).Result()
if err != nil {
panic(err)
}
if pong != "PONG" {
panic("redis ping failed")
}
}
// close redis client
func CloseDefault() error {
return client.Close()
}
func Lock(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return client.SetNX(ctx, key, value, expiration).Err()
}
func Unlock(ctx context.Context, key string) error {
return client.Del(ctx, key).Err()
}
func Keys(ctx context.Context, pattern string) ([]string, error) {
batch := int64(1000)
cursor := uint64(0)
keys := make([]string, 0)
for {
ks, nextCursor, err := client.Scan(ctx, cursor, pattern, batch).Result()
if err != nil {
return nil, err
}
keys = append(keys, ks...)
cursor = nextCursor
if cursor == 0 {
break
}
}
return keys, nil
}
func NewPipeline() redis.Pipeliner {
return client.Pipeline()
}

23
data/redis/string.go Normal file
View File

@ -0,0 +1,23 @@
package redis
import (
"context"
"time"
"github.com/redis/go-redis/v9"
)
func Get(ctx context.Context, key string) (string, error) {
str, err := client.Get(ctx, key).Result()
if err == redis.Nil {
return "", nil
} else if err != nil {
return "", err
}
return str, nil
}
func Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return client.Set(ctx, key, value, expiration).Err()
}

39
data/redis/zset.go Normal file
View File

@ -0,0 +1,39 @@
package redis
import (
"context"
"github.com/redis/go-redis/v9"
)
func ZRangeWithScores(ctx context.Context, key string, start int64, stop int64) ([]redis.Z, error) {
zset, err := client.ZRangeWithScores(ctx, key, start, stop).Result()
if err == redis.Nil {
return nil, nil
} else if err != nil {
return nil, err
}
return zset, nil
}
func ZAdd(ctx context.Context, key string, members []redis.Z) error {
return client.ZAdd(ctx, key, members...).Err()
}
func ZAtomicReplace(ctx context.Context, key string, members []redis.Z) error {
script := `
redis.call('DEL', KEYS[1])
for i = 1, #ARGV, 2 do
redis.call('ZADD', KEYS[1], ARGV[i], ARGV[i+1])
end
return true
`
var args []any
for _, z := range members {
args = append(args, z.Score, z.Member)
}
return client.Eval(ctx, script, []string{key}, args...).Err()
}

125
data/update_phasor.go Normal file
View File

@ -0,0 +1,125 @@
package data
import (
"context"
"datart/config"
"datart/data/influx"
"datart/data/postgres"
"datart/data/redis"
"datart/log"
"strings"
"time"
)
const (
updatePhasorDuration time.Duration = 5 * time.Second
)
func updatingRedisPhasor(ctx context.Context) {
ssuType := config.Conf().ServerConf().GetSSUType()
ssuChans := make(map[string]chan zUnit, len(ssuType))
for ssu := range ssuType {
ssuChans[ssu] = make(chan zUnit, 32)
}
go queringSSUInfluxPhasor(ctx, ssuChans)
go updatingSSURedisZUnit(ctx, ssuChans)
}
func queringSSUInfluxPhasor(ctx context.Context, ssuChans map[string]chan zUnit) {
ssuType := config.Conf().ServerConf().GetSSUType()
for ssu := range ssuType {
go func(ssu string) {
timer := time.Tick(updatePhasorDuration)
for {
select {
case <-timer:
channelSizes := postgres.GetSSU2ChannelSizesFor(ssu)
for _, channelSize := range channelSizes {
sendSSUZUnit(ctx, channelSize, ssuChans[ssu])
}
case <-ctx.Done():
return
}
}
}(ssu)
}
}
func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit) {
ssuType := config.Conf().ServerConf().GetSSUType()
for ssu := range ssuType {
go func(ssu string) {
for {
select {
case unit := <-ssuChans[ssu]:
if err := updateZUnitToRedis(ctx, unit); err != nil {
log.Error(err)
}
case <-ctx.Done():
return
}
}
}(ssu)
}
}
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
fields := GenPhasorFields(channelSize.Channel)
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
fields, channelSize.Size)
if err != nil {
log.Error(err)
}
// if len(f2tvs) <= 0 {
// log.Debug(channelSize.Station, " ", channelSize.Device, " ",
// fields, " query none of ", channelSize.Size)
// }
for f, tvs := range f2tvs {
sdf := strings.Split(f, ".")
if len(sdf) != 3 {
log.Error("invalid influx field")
return
}
key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2])
members := convertTVsToMenmbers(tvs)
ssuChan <- zUnit{
Key: key,
Members: members,
}
}
}
func queryInfluxPhasor(ctx context.Context, station string, device string,
fileds []string, size int) (map[string][]influx.TV, error) {
bucket, err := influx.GetDB("phasor")
if err != nil {
return nil, err
}
measure, err := influx.GetTable("phasor", device)
if err != nil {
return nil, err
}
req := &influx.Request{
DB: bucket,
Table: measure,
Type: "phasor",
Station: station,
MainPos: device,
SubPos: strings.Join(fileds, ","),
}
return influx.GetSSUPointsLastLimit(ctx, req, size)
}
func updateZUnitToRedis(ctx context.Context, unit zUnit) error {
return redis.ZAtomicReplace(ctx, unit.Key, unit.Members)
}
func genRedisPhasorKey(station, device, field string) string {
return strings.Join([]string{station, device, "phasor", field}, ":")
}

65
go.mod Normal file
View File

@ -0,0 +1,65 @@
module datart
go 1.24.0
require (
github.com/gin-gonic/gin v1.11.0
github.com/google/uuid v1.6.0
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0
github.com/redis/go-redis/v9 v9.14.0
go.mongodb.org/mongo-driver/v2 v2.3.0
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/driver/postgres v1.6.0
gorm.io/gorm v1.31.0
)
require (
github.com/Azure/go-amqp v1.5.0 // indirect
github.com/bytedance/sonic v1.14.0 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/gin-contrib/sse v1.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.27.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.54.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.3.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/arch v0.20.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/tools v0.34.0 // indirect
google.golang.org/protobuf v1.36.9 // indirect
)

183
go.sum Normal file
View File

@ -0,0 +1,183 @@
github.com/Azure/go-amqp v1.5.0 h1:GRiQK1VhrNFbyx5VlmI6BsA1FCp27W5rb9kxOZScnTo=
github.com/Azure/go-amqp v1.5.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w=
github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM=
github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk=
github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8=
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus=
github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8=
github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY=
github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0 h1:q0zPF8/7Bdm+XwjWevFynB8fNiuE65x4q2vmFxU2cjM=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0/go.mod h1:t5oaK/4mJjw9dNpDzwvH6bE7p9XtM1JyObEHszFu3lU=
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA=
github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver/v2 v2.3.0 h1:sh55yOXA2vUjW1QYw/2tRlHSQViwDyPnW61AwpZ4rtU=
go.mongodb.org/mongo-driver/v2 v2.3.0/go.mod h1:jHeEDJHJq7tm6ZF45Issun9dbogjfnPySb1vXA7EeAI=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4=
gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo=
gorm.io/gorm v1.31.0 h1:0VlycGreVhK7RF/Bwt51Fk8v0xLiiiFdbGDPIZQ7mJY=
gorm.io/gorm v1.31.0/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=

92
log/log.go Normal file
View File

@ -0,0 +1,92 @@
package log
import (
"datart/config"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var log *zap.SugaredLogger
func init() {
logConfig := zap.NewProductionEncoderConfig()
logConfig.EncodeTime = zapcore.ISO8601TimeEncoder
fileEncoder := zapcore.NewJSONEncoder(logConfig)
core := zapcore.NewCore(
fileEncoder,
zapcore.AddSync(config.Conf().LogConf()), // conf is not null
// DebugLevel, -1, logs are typically voluminous, and are usually disabled in
// production.
// InfoLevel, 0, is the default logging priority.
// WarnLevel, 1, logs are more important than Info, but don't need individual
// human review.
// ErrorLevel, 2, logs are high-priority. If an application is running smoothly,
// it shouldn't generate any error-level logs.
// PanicLevel, 3, logs are particularly important errors. In development the
// logger panics after writing the message.
// PanicLevel, 4, logs a message, then panics.
// FatalLevel, 5, logs a message, then calls os.Exit(1).
config.Conf().LogConf().GetLogLevel(),
)
log = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1)).Sugar()
}
func Sync() {
log.Sync()
}
func Log(lvl zapcore.Level, args ...interface{}) {
log.Log(lvl, args...)
}
func Debug(args ...interface{}) {
log.Debug(args...)
}
func Info(args ...interface{}) {
log.Info(args...)
}
func Warn(args ...interface{}) {
log.Warn(args...)
}
func Error(args ...interface{}) {
log.Error(args...)
}
func Panic(args ...interface{}) {
log.Panic(args...)
}
func Logf(lvl zapcore.Level, template string, args ...interface{}) {
log.Logf(lvl, template, args...)
}
func Debugf(template string, args ...interface{}) {
log.Debugf(template, args...)
}
func Infof(template string, args ...interface{}) {
log.Infof(template, args...)
}
func Warnf(template string, args ...interface{}) {
log.Warnf(template, args...)
}
func Errorf(template string, args ...interface{}) {
log.Errorf(template, args...)
}
func Panicf(template string, args ...interface{}) {
log.Panicf(template, args...)
}

44
main.go Normal file
View File

@ -0,0 +1,44 @@
package main
import (
"context"
"datart/config"
"datart/data"
"datart/route"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/gin-gonic/gin"
)
func main() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
// gin.SetMode(gin.ReleaseMode)
engine := gin.Default()
// engine := gin.New()
route.LoadRoute(engine)
processes := data.NewProcesses()
processes.StartDataProcessing()
go func() {
port := strconv.Itoa(config.Conf().ServerConf().GetPort())
if err := engine.Run(":" + port); err != nil {
panic(err)
}
}()
<-signalChan
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
processes.Cancel(ctx)
}

3
route/admin/admin.go Normal file
View File

@ -0,0 +1,3 @@
package admin
type Admin struct{}

57
route/admin/command.go Normal file
View File

@ -0,0 +1,57 @@
package admin
import (
"datart/data/postgres"
"datart/log"
"errors"
"fmt"
"github.com/gin-gonic/gin"
)
type command struct {
Command string `json:"command"`
Timeout int64 `json:"timeout"`
Args []any `json:"args"`
}
func (a *Admin) PostExecuteCommand(ctx *gin.Context) {
req, err := a.checkAndGenExecuteCommandRequest(ctx)
if err != nil {
log.Error(err)
ctx.JSON(200, gin.H{
"code": 1,
"msg": err.Error(),
})
return
}
err = postgres.GenSSU2ChannelSizes(ctx.Request.Context(), 500)
if err != nil {
log.Error(err, fmt.Sprintf(" params: %v", req))
ctx.JSON(200, gin.H{
"code": 2,
"msg": err.Error(),
})
return
}
ctx.JSON(200, gin.H{
"code": 0,
"msg": "success",
})
}
func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, error) {
req := new(command)
err := ctx.ShouldBindJSON(req)
if err != nil {
return req, errors.New("invalid body param")
}
if req.Command != "GenSSU2ChannelSizes" {
return nil, errors.New("invalid command")
}
return req, nil
}

91
route/api/alarm.go Normal file
View File

@ -0,0 +1,91 @@
package api
import (
"context"
"datart/data"
"datart/data/mongo"
"datart/log"
"errors"
"fmt"
"regexp"
"github.com/gin-gonic/gin"
)
func (a *Api) PostUploadAlarm(ctx *gin.Context) {
alarm, ip, err := a.checkAndGenUploadAlarmRequest(ctx)
if err != nil {
log.Error(err)
ctx.JSON(200, gin.H{
"code": 1,
"msg": "invalid param",
})
return
}
event, err := alarm.ConvertToEvent(ip)
if err != nil {
log.Error(err, fmt.Sprintf(" params: %v", alarm))
ctx.JSON(200, gin.H{
"code": 2,
"msg": "convert error",
})
return
}
err = mongo.InsertOneEvent(ctx.Request.Context(), event)
if err != nil {
log.Error(err, fmt.Sprintf(" params: %v", event))
ctx.JSON(200, gin.H{
"code": 3,
"msg": "insert error",
})
return
}
go func(e *mongo.Event) {
if err := data.PublishEvent(context.Background(), e); err != nil {
log.Error(err, fmt.Sprintf(" params: %v", e))
}
}(event)
ctx.JSON(200, gin.H{
"code": 0,
"msg": "success",
})
}
func (a *Api) checkAndGenUploadAlarmRequest(ctx *gin.Context) (*mongo.Alarm, string, error) {
alarm := new(mongo.Alarm)
err := ctx.ShouldBindJSON(alarm)
if err != nil {
return nil, "", err
}
ok, err := regexp.MatchString(`ssu\d{3}`, alarm.DeviceNo)
if err != nil {
return nil, "", err
}
if !ok {
return nil, "", errors.New("invalid device_no")
}
if alarm.AlarmCode < 1 || alarm.AlarmCode > 10 {
return nil, "", errors.New("invalid alarm_code")
}
if alarm.AlarmStatus < 0 || alarm.AlarmStatus > 1 {
return nil, "", errors.New("invalid alarm_status")
}
return alarm, ctx.RemoteIP(), nil
}

3
route/api/api.go Normal file
View File

@ -0,0 +1,3 @@
package api
type Api struct{}

287
route/api/event.go Normal file
View File

@ -0,0 +1,287 @@
package api
import (
"context"
"datart/data"
"datart/data/mongo"
"datart/log"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"go.mongodb.org/mongo-driver/v2/bson"
)
const (
pageSizeLimit = 500
)
func (a *Api) GetEvents(ctx *gin.Context) {
filter, sort, pageNo, pageSize, err := a.checkAndGenGetEventsRequest(ctx)
if err != nil {
log.Error(err)
ctx.JSON(200, gin.H{
"code": 1,
"msg": err.Error(),
})
return
}
events, err := mongo.FindEventsWithPageLimit(ctx.Request.Context(), filter, sort, pageNo, pageSize)
if err != nil {
log.Error(err, fmt.Sprintf(" params: %v, %d, %d, %d", filter, sort, pageNo, pageSize))
ctx.JSON(200, gin.H{
"code": 2,
"msg": err.Error(),
})
return
}
ctx.JSON(200, gin.H{
"code": 0,
"msg": "success",
"data": events,
})
}
func (a *Api) PostUpsertEvents(ctx *gin.Context) {
uuids, update, events, err := a.checkAndGenUpsertEventsRequest(ctx)
if err != nil {
log.Error(err)
ctx.JSON(200, gin.H{
"code": 1,
"msg": err.Error(),
})
return
}
if len(uuids) > 0 {
operation := mongo.GenOperation(mongo.EventStatusAction[update.Status], ctx.RemoteIP())
err := mongo.UpdateEvents(ctx.Request.Context(),
bson.M{"event_uuid": bson.M{"$in": uuids}},
bson.M{"$set": bson.M{"status": update.Status}, "$push": bson.M{"operations": operation}})
if err != nil {
log.Error(err, fmt.Sprintf(" params:%v %v", update, uuids))
ctx.JSON(200, gin.H{
"code": 2,
"msg": err.Error(),
})
return
}
events = make([]map[string]any, len(uuids))
for i := range events {
events[i] = map[string]any{"event_uuid": uuids[i], "status": update.Status}
}
} else {
err := mongo.InsertEvents(ctx.Request.Context(), events)
if err != nil {
log.Error(err, fmt.Sprintf(" params: %v", events))
ctx.JSON(200, gin.H{
"code": 3,
"msg": err.Error(),
})
return
}
}
go func(evts []map[string]any) {
workers := 5
ch := make(chan map[string]any, len(evts))
var wg sync.WaitGroup
for range workers {
wg.Add(1)
go func() {
defer wg.Done()
for e := range ch {
if err := data.PublishEvent(context.Background(), e); err != nil {
log.Error(err, fmt.Sprintf("publish event failed: %v", e))
}
}
}()
}
for _, e := range evts {
ch <- e
}
close(ch)
wg.Wait()
}(events)
ctx.JSON(200, gin.H{
"code": 0,
"msg": "success",
})
}
func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) {
uuidStr := ctx.Query("uuid")
if len(uuidStr) > 0 {
if uuid.Validate(uuidStr) != nil {
return nil, 0, -1, -1, errors.New("invalid uuid")
}
return bson.M{"event_uuid": uuidStr}, 0, -1, -1, nil
}
filter := bson.M{}
var err error
begin, end := int64(-1), int64(-1)
beginStr := ctx.Query("begin")
if len(beginStr) > 0 {
if begin, err = strconv.ParseInt(beginStr, 10, 64); err != nil {
return nil, 0, -1, -1, err
}
}
endStr := ctx.Query("end")
if len(endStr) > 0 {
if end, err = strconv.ParseInt(endStr, 10, 64); err != nil {
return nil, 0, -1, -1, err
}
}
if begin > 0 && end > 0 && begin > end {
return nil, 0, -1, -1, errors.New("invalid time")
}
switch {
case begin > 0 && end < 0:
filter["timestamp"] = bson.M{"$gte": begin}
case begin < 0 && end > 0:
filter["timestamp"] = bson.M{"$lte": end}
case begin > 0 && end > 0:
filter["timestamp"] = bson.M{"$gte": begin, "$lte": end}
}
statusStr := ctx.Query("status")
if len(statusStr) > 0 {
statusStrs := strings.Split(statusStr, ",")
statuss := make([]int, len(statusStrs))
for i := range statusStrs {
s, err := strconv.Atoi(statusStrs[i])
if err != nil {
return nil, 0, -1, -1, errors.New("invalid status")
}
statuss[i] = s
}
filter["status"] = bson.M{"$in": statuss}
}
var sort int
sortStr := ctx.Query("sort")
if len(sortStr) > 0 {
s, err := strconv.Atoi(sortStr)
if err != nil {
return nil, 0, -1, -1, err
}
if s != 1 && s != -1 {
return nil, 0, -1, -1, errors.New("invalid sort")
}
sort = s
}
pageNo, pageSize := -1, -1
pageNoStr := ctx.Query("page_no")
pageSizeStr := ctx.Query("page_size")
if len(pageNoStr) > 0 && len(pageSizeStr) > 0 {
pageNo, err = strconv.Atoi(pageNoStr)
if err != nil {
return nil, 0, -1, -1, err
}
pageSize, err = strconv.Atoi(pageSizeStr)
if err != nil {
return nil, 0, -1, -1, err
}
if pageNo <= 0 || pageSize <= 0 {
return nil, 0, -1, -1, errors.New("invalid page param")
}
if pageSize > pageSizeLimit {
return nil, 0, -1, -1, fmt.Errorf("too many events, max %d", pageSizeLimit)
}
}
return filter, sort, int64(pageNo), int64(pageSize), nil
}
func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) ([]string, *mongo.Event, []map[string]any, error) {
insert := true
update := &mongo.Event{}
statusStr := ctx.Query("status")
if len(statusStr) > 0 {
insert = false
status, err := strconv.Atoi(statusStr)
if err != nil {
return nil, nil, nil, err
}
update.Status = status
}
if !insert {
uuids := []string{}
err := ctx.ShouldBindJSON(&uuids)
if err != nil {
return nil, nil, nil, err
}
if len(uuids) == 0 {
return nil, nil, nil, errors.New("no uuid")
}
if len(uuids) > pageSizeLimit {
return nil, nil, nil, fmt.Errorf("too many uuids, max %d", pageSizeLimit)
}
return uuids, update, nil, nil
}
events := []map[string]any{}
err := ctx.ShouldBindJSON(&events)
if err != nil {
return nil, nil, nil, err
}
if len(events) == 0 {
return nil, nil, nil, errors.New("no event")
}
if len(events) > pageSizeLimit {
return nil, nil, nil, fmt.Errorf("too many events, max %d", pageSizeLimit)
}
return nil, nil, events, nil
}
func validateEventUpsert(event map[string]any) error {
noUUID := true
if eu, ok := event["event_uuid"]; ok {
if eID, ok := eu.(string); ok {
if uuid.Validate(eID) == nil {
noUUID = false
}
}
}
if noUUID {
if uid, err := uuid.NewV7(); err != nil {
return err
} else {
event["event_uuid"] = uid.String()
}
}
if len(event) < 2 {
return errors.New("invalid event")
}
return nil
}

135
route/api/point.go Normal file
View File

@ -0,0 +1,135 @@
package api
import (
"datart/data"
"datart/data/influx"
"datart/data/postgres"
"datart/log"
"datart/util"
"errors"
"fmt"
"strings"
"github.com/gin-gonic/gin"
)
func (a *Api) GetPoints(ctx *gin.Context) {
request, err := a.checkAndGenGetPointsRequest(ctx)
if err != nil {
log.Error(err)
ctx.JSON(200, gin.H{
"code": 1,
"msg": err.Error(),
})
return
}
var data map[string][]influx.TV
switch {
case request.Begin > 0 && request.End > 0:
data, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request)
case request.Begin > 0 && request.End < 0:
data, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1)
case request.Begin < 0 && request.End > 0:
data, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1)
default:
data, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1)
}
if err != nil {
log.Error(err, fmt.Sprintf(" params: %v", request))
ctx.JSON(200, gin.H{
"code": 2,
"msg": "query database error",
})
return
}
ctx.JSON(200, gin.H{
"code": 0,
"msg": "success",
"data": data,
})
}
func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, error) {
typeStr := ctx.DefaultQuery("type", "")
if len(typeStr) <= 0 {
return nil, errors.New("invalid type")
}
station, mainPos, subPos := "", "", ""
mtag := ctx.DefaultQuery("mtag", "")
v, ok := postgres.ChannelSizes.Load(mtag)
if ok {
if channelSize, ok := v.(postgres.ChannelSize); ok {
fields := data.GenPhasorFields(channelSize.Channel)
station = channelSize.Station
mainPos = channelSize.Device
subPos = strings.Join(fields, ",")
}
} else {
station = ctx.DefaultQuery("station", "")
if len(station) <= 0 {
return nil, errors.New("invalid station")
}
mainPos = ctx.DefaultQuery("main_pos", "")
if len(mainPos) <= 0 {
return nil, errors.New("invalid main_pos")
}
subPos = ctx.DefaultQuery("sub_pos", "")
if len(subPos) <= 0 {
return nil, errors.New("invalid sub_pos")
}
}
beginStr := ctx.DefaultQuery("begin", "")
endStr := ctx.DefaultQuery("end", "")
operate := ctx.DefaultQuery("operate", "")
step := ctx.DefaultQuery("step", "")
defaultV := ctx.DefaultQuery("default", "")
begin := util.ConvertToInt64Default(beginStr, -1)
end := util.ConvertToInt64Default(endStr, -1)
bucket, err := influx.GetDB(typeStr)
if err != nil {
return nil, err
}
measure, err := influx.GetTable(typeStr, mainPos)
if err != nil {
return nil, err
}
return &influx.Request{
DB: bucket,
Table: measure,
Type: typeStr,
Station: station,
MainPos: mainPos,
SubPos: subPos,
Begin: begin,
End: end,
Operate: operate,
Step: step,
Default: defaultV,
}, nil
}

23
route/route.go Normal file
View File

@ -0,0 +1,23 @@
package route
import (
"datart/route/admin"
"datart/route/api"
"github.com/gin-gonic/gin"
)
func LoadRoute(engine *gin.Engine) {
engine.Use(gin.Recovery())
a := new(api.Api)
ga := engine.Group("api")
ga.POST("/alarm", a.PostUploadAlarm)
ga.GET("/points", a.GetPoints)
ga.GET("/events", a.GetEvents)
ga.POST("/events", a.PostUpsertEvents)
d := new(admin.Admin)
gd := engine.Group("admin")
gd.POST("/command", d.PostExecuteCommand)
}

13
util/util.go Normal file
View File

@ -0,0 +1,13 @@
package util
import (
"strconv"
)
func ConvertToInt64Default(tsStr string, defaultTS int64) int64 {
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return defaultTS
}
return ts
}