diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..c92434d --- /dev/null +++ b/.drone.yml @@ -0,0 +1,12 @@ +kind: pipeline +type: docker +name: default + +steps: +- name: build + image: golang:latest + environment: + GO111MODULE: on + GOPROXY: https://goproxy.cn,direct + commands: + - go build \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3a45833 --- /dev/null +++ b/Makefile @@ -0,0 +1,53 @@ +# set package name and path +package_name = datart + +# set package path +package_path = . + +# ==================================================================================== # +# DEFAULT +# ==================================================================================== # + +default: help + +.PHONY: help +help: ## list makefile function + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' + +# ==================================================================================== # +# QUALITY +# ==================================================================================== # + +.PHONY: tidy +tidy: ## tidy modfiles and format .go files + @go mod tidy -v + @go fmt ./... + +.PHONY: test +test: ## run all tests + go test -v -race -buildvcs ./... + +.PHONY: cover +cover: ## run all tests and display coverage + go test -v -race -buildvcs -coverprofile=${package_path}/coverage.out ./... + go tool cover -func=coverage.out | sort -rnk3 + +# ==================================================================================== # +# DEVELOPMENT +# ==================================================================================== # + +.PHONY: build +build: ## build the application + @go mod download + @go build -ldflags "-X main.version=$(shell git describe --abbrev=0 --tags 2>/dev/null || echo 0.0.0)" -o=${package_path}/${package_name} + +.PHONY: run +run: build ## run the application after build + @chmod +x ${package_path}/${package_name} + ${package_path}/${package_name} + +.PHONY: clean +clean: ## clean up environment + @go clean + @rm -f ${package_path}/${package_name} + @rm -f ${package_path}/coverage.out diff --git a/README.md b/README.md new file mode 100644 index 0000000..6223331 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# dataRT + +[![Build Status](http://192.168.46.100:4080/api/badges/CL-Softwares/dataRT/status.svg)](http://192.168.46.100:4080/CL-Softwares/dataRT) \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..aa825fe --- /dev/null +++ b/config/config.go @@ -0,0 +1,116 @@ +package config + +import ( + _ "embed" + "encoding/json" +) + +type config struct { + serverConf *serverConfig + logConf *logConfig + postgresConf map[string]*postgresConfig + influxConf map[string]*influxConfig + redisConf map[string]*redisConfig + modelrtConf map[string]*modelrtConfig +} + +//go:embed server.json +var serverjson string + +//go:embed log.json +var logjson string + +//go:embed postgres.json +var postgresjson string + +//go:embed influx.json +var influxjson string + +//go:embed redis.json +var redisjson string + +//go:embed modelrt.json +var modelrtjson string + +var conf *config + +func init() { + conf = new(config) + conf.serverConf = new(serverConfig) + conf.logConf = new(logConfig) + conf.postgresConf = make(map[string]*postgresConfig) + conf.influxConf = make(map[string]*influxConfig) + conf.redisConf = make(map[string]*redisConfig) + conf.modelrtConf = make(map[string]*modelrtConfig) + + err := json.Unmarshal([]byte(serverjson), conf.serverConf) + if err != nil { + panic(err.Error()) + } + err = json.Unmarshal([]byte(logjson), conf.logConf) + if err != nil { + panic(err.Error()) + } + err = json.Unmarshal([]byte(postgresjson), &conf.postgresConf) + if err != nil { + panic(err.Error()) + } + err = json.Unmarshal([]byte(influxjson), &conf.influxConf) + if err != nil { + panic(err.Error()) + } + err = json.Unmarshal([]byte(redisjson), &conf.redisConf) + if err != nil { + panic(err.Error()) + } + err = json.Unmarshal([]byte(modelrtjson), &conf.modelrtConf) + if err != nil { + panic(err.Error()) + } +} + +func Conf() *config { + return conf +} + +func (c *config) ServerConf() *serverConfig { + if c == nil { + panic("config is nil") + } + return c.serverConf +} + +func (c *config) LogConf() *logConfig { + if c == nil { + panic("config is nil") + } + return c.logConf +} + +func (c *config) PostgresConf(tag string) *postgresConfig { + if c == nil || c.postgresConf == nil { + panic("postgres config is nil") + } + return c.postgresConf[tag] +} + +func (c *config) InfluxConf(tag string) *influxConfig { + if c == nil || c.influxConf == nil { + panic("influx config is nil") + } + return c.influxConf[tag] +} + +func (c *config) RedisConf(tag string) *redisConfig { + if c == nil || c.redisConf == nil { + panic("redis config is nil") + } + return c.redisConf[tag] +} + +func (c *config) ModelRTConf(tag string) *modelrtConfig { + if c == nil { + panic("modelrt config is nil") + } + return c.modelrtConf[tag] +} diff --git a/config/influx.go b/config/influx.go new file mode 100644 index 0000000..eee52ea --- /dev/null +++ b/config/influx.go @@ -0,0 +1,64 @@ +package config + +type influxConfig struct { + URL string `json:"url" yaml:"url"` + Token string `json:"token" yaml:"token"` + Org string `json:"org" yaml:"org"` + Timeout int `json:"timeoutms" yaml:"timeoutms"` +} + +func (config *influxConfig) GetURL() string { + if config == nil { + panic("influx config is nil") + } + return config.URL +} + +func (config *influxConfig) SetURL(url string) { + if config == nil { + panic("influx config is nil") + } + config.URL = url +} + +func (config *influxConfig) GetToken() string { + if config == nil { + panic("influx config is nil") + } + return config.Token +} + +func (config *influxConfig) SetToken(token string) { + if config == nil { + panic("influx config is nil") + } + config.Token = token +} + +func (config *influxConfig) GetOrg() string { + if config == nil { + panic("influx config is nil") + } + return config.Org +} + +func (config *influxConfig) SetOrg(org string) { + if config == nil { + panic("influx config is nil") + } + config.Org = org +} + +func (config *influxConfig) GetTimeout() int { + if config == nil { + panic("influx config is nil") + } + return config.Timeout +} + +func (config *influxConfig) SetTimeout(timeout int) { + if config == nil { + panic("influx config is nil") + } + config.Timeout = timeout +} diff --git a/config/influx.json b/config/influx.json new file mode 100644 index 0000000..cfbdaa5 --- /dev/null +++ b/config/influx.json @@ -0,0 +1,8 @@ +{ + "demo": { + "url": "http://192.168.46.100:8086", + "token": "kKtMhMj5ISrnrCAO1ugvL4D4c_HrbAv4HzSHGA3Ai1AeBIEmGbQpQY0qSwjaXYluOmuDAv0zAvFCRRqEWQ0zJw==", + "org": "eCL3000", + "timeout":1000 + } +} \ No newline at end of file diff --git a/config/log.go b/config/log.go new file mode 100644 index 0000000..4be8bda --- /dev/null +++ b/config/log.go @@ -0,0 +1,109 @@ +package config + +import ( + "go.uber.org/zap/zapcore" + "gopkg.in/natefinch/lumberjack.v2" +) + +type logConfig struct { + lumberjack.Logger + LogLevel int8 `json:"loglevel" yaml:"loglevel"` +} + +func (config *logConfig) GetFileName() string { + if config == nil { + panic("log config is nil") + } + return config.Filename +} + +func (config *logConfig) SetFileName(fileName string) { + if config == nil { + panic("log config is nil") + } + config.Filename = fileName +} + +func (config *logConfig) GetMaxSize() int { + if config == nil { + panic("log config is nil") + } + return config.MaxSize +} + +func (config *logConfig) SetMaxSize(maxSize int) { + if config == nil { + panic("log config is nil") + } + config.MaxSize = maxSize +} + +func (config *logConfig) GetMaxAge() int { + if config == nil { + panic("log config is nil") + } + return config.MaxAge +} + +func (config *logConfig) SetMaxAge(maxAge int) { + if config == nil { + panic("log config is nil") + } + config.MaxAge = maxAge +} + +func (config *logConfig) GetMaxBackups() int { + if config == nil { + panic("log config is nil") + } + return config.MaxBackups +} + +func (config *logConfig) SetMaxBackups(maxBackups int) { + if config == nil { + panic("log config is nil") + } + config.MaxBackups = maxBackups +} + +func (config *logConfig) GetLocalTime() bool { + if config == nil { + panic("log config is nil") + } + return config.LocalTime +} + +func (config *logConfig) SetLocalTime(useLocalTime bool) { + if config == nil { + panic("log config is nil") + } + config.LocalTime = useLocalTime +} + +func (config *logConfig) GetCompress() bool { + if config == nil { + panic("log config is nil") + } + return config.Compress +} + +func (config *logConfig) SetCompress(doCompress bool) { + if config == nil { + panic("log config is nil") + } + config.Compress = doCompress +} + +func (config *logConfig) GetLogLevel() zapcore.Level { + if config == nil { + panic("log config is nil") + } + return zapcore.Level(config.LogLevel) +} + +func (config *logConfig) SetLogLevel(logLevel zapcore.Level) { + if config == nil { + panic("log config is nil") + } + config.LogLevel = int8(logLevel) +} diff --git a/config/log.json b/config/log.json new file mode 100644 index 0000000..92fbf19 --- /dev/null +++ b/config/log.json @@ -0,0 +1,9 @@ +{ + "filename": "./logs/datart.log", + "maxsize": 100, + "maxage": 7, + "maxbackups": 20, + "localtime": true, + "compress": false, + "loglevel": -1 +} \ No newline at end of file diff --git a/config/modelrt.go b/config/modelrt.go new file mode 100644 index 0000000..6b1bcbe --- /dev/null +++ b/config/modelrt.go @@ -0,0 +1,49 @@ +package config + +type modelrtConfig struct { + Scheme string `json:"scheme" yaml:"scheme"` + Host string `json:"host" yaml:"host"` // host or host:port + Path string `json:"path" yaml:"path"` +} + +func (config *modelrtConfig) GetScheme() string { + if config == nil { + panic("modelrt config is nil") + } + return config.Scheme +} + +func (config *modelrtConfig) SetScheme(scheme string) { + if config == nil { + panic("modelrt config is nil") + } + config.Scheme = scheme +} + +func (config *modelrtConfig) GetHost() string { + if config == nil { + panic("modelrt config is nil") + } + return config.Host +} + +func (config *modelrtConfig) SetHost(host string) { + if config == nil { + panic("modelrt config is nil") + } + config.Host = host +} + +func (config *modelrtConfig) GetPath() string { + if config == nil { + panic("modelrt config is nil") + } + return config.Path +} + +func (config *modelrtConfig) SetPath(path string) { + if config == nil { + panic("modelrt config is nil") + } + config.Path = path +} diff --git a/config/modelrt.json b/config/modelrt.json new file mode 100644 index 0000000..5f1c466 --- /dev/null +++ b/config/modelrt.json @@ -0,0 +1,7 @@ +{ + "demo":{ + "scheme":"ws", + "host":"192.168.46.100:8080", + "path":"/ws/rtdatas" + } +} \ No newline at end of file diff --git a/config/postgres.go b/config/postgres.go new file mode 100644 index 0000000..e737eb0 --- /dev/null +++ b/config/postgres.go @@ -0,0 +1,109 @@ +package config + +type postgresConfig struct { + Host string `json:"host" yaml:"host"` + Port int `json:"port" yaml:"port"` + User string `json:"user" yaml:"user"` + Password string `json:"password" yaml:"password"` + DBName string `json:"dbname" yaml:"dbname"` + SSLMode string `json:"sslmode" yaml:"sslmode"` + TimeZone string `json:"timezone" yaml:"timezone"` +} + +func (config *postgresConfig) GetHost() string { + if config == nil { + panic("postgres config is nil") + } + return config.Host +} + +func (config *postgresConfig) SetHost(host string) { + if config == nil { + panic("postgres config is nil") + } + config.Host = host +} + +func (config *postgresConfig) GetPort() int { + if config == nil { + panic("postgres config is nil") + } + return config.Port +} + +func (config *postgresConfig) SetPort(port int) { + if config == nil { + panic("postgres config is nil") + } + config.Port = port +} + +func (config *postgresConfig) GetUser() string { + if config == nil { + panic("postgres config is nil") + } + return config.User +} + +func (config *postgresConfig) SetUser(user string) { + if config == nil { + panic("postgres config is nil") + } + config.User = user +} + +func (config *postgresConfig) GetPassword() string { + if config == nil { + panic("postgres config is nil") + } + return config.Password +} + +func (config *postgresConfig) SetPassword(password string) { + if config == nil { + panic("postgres config is nil") + } + config.Password = password +} + +func (config *postgresConfig) GetDBName() string { + if config == nil { + panic("postgres config is nil") + } + return config.DBName +} + +func (config *postgresConfig) SetDBName(dbName string) { + if config == nil { + panic("postgres config is nil") + } + config.DBName = dbName +} + +func (config *postgresConfig) GetSSLMode() string { + if config == nil { + panic("postgres config is nil") + } + return config.SSLMode +} + +func (config *postgresConfig) SetSSLMode(sslMode string) { + if config == nil { + panic("postgres config is nil") + } + config.SSLMode = sslMode +} + +func (config *postgresConfig) GetTimeZone() string { + if config == nil { + panic("postgres config is nil") + } + return config.TimeZone +} + +func (config *postgresConfig) SetTimeZone(timeZone string) { + if config == nil { + panic("postgres config is nil") + } + config.TimeZone = timeZone +} diff --git a/config/postgres.json b/config/postgres.json new file mode 100644 index 0000000..cb2a85c --- /dev/null +++ b/config/postgres.json @@ -0,0 +1,11 @@ +{ + "demo":{ + "host":"192.168.46.100", + "port":9432, + "user":"postgres", + "password":"123RTYjkl", + "dbname":"metamodule", + "sslmode":"disable", + "timezone":"Asia/Shanghai" + } +} \ No newline at end of file diff --git a/config/redis.go b/config/redis.go new file mode 100644 index 0000000..90fcb9b --- /dev/null +++ b/config/redis.go @@ -0,0 +1,139 @@ +package config + +type redisConfig struct { + Addr string `json:"addr" yaml:"addr"` + Username string `json:"username" yaml:"username"` + Password string `json:"password" yaml:"password"` + DB int `json:"db" yaml:"db"` + RESP int `json:"resp" yaml:"resp"` + DialTimeout int `json:"dialtimeout" yaml:"dialtimeout"` + ReadTimeout int `json:"readtimeout" yaml:"readtimeout"` + WriteTimeout int `json:"writetimeout" yaml:"writetimeout"` + PoolSize int `json:"poolsize" yaml:"poolsize"` +} + +func (config *redisConfig) GetAddr() string { + if config == nil { + panic("redis config is nil") + } + return config.Addr +} + +func (config *redisConfig) SetAddr(addr string) { + if config == nil { + panic("redis config is nil") + } + config.Addr = addr +} + +func (config *redisConfig) GetUsername() string { + if config == nil { + panic("redis config is nil") + } + return config.Username +} + +func (config *redisConfig) SetUsername(username string) { + if config == nil { + panic("redis config is nil") + } + config.Username = username +} + +func (config *redisConfig) GetPassword() string { + if config == nil { + panic("redis config is nil") + } + return config.Password +} + +func (config *redisConfig) SetPassword(password string) { + if config == nil { + panic("redis config is nil") + } + config.Password = password +} + +func (config *redisConfig) GetDB() int { + if config == nil { + panic("redis config is nil") + } + return config.DB +} + +func (config *redisConfig) SetDB(db int) { + if config == nil { + panic("redis config is nil") + } + config.DB = db +} + +func (config *redisConfig) GetRESP() int { + if config == nil { + panic("redis config is nil") + } + return config.RESP +} + +func (config *redisConfig) SetRESP(resp int) { + if config == nil { + panic("redis config is nil") + } + config.RESP = resp +} + +func (config *redisConfig) GetDialTimeout() int { + if config == nil { + panic("redis config is nil") + } + return config.DialTimeout +} + +func (config *redisConfig) SetDialTimeout(dialTimeout int) { + if config == nil { + panic("redis config is nil") + } + config.DialTimeout = dialTimeout +} + +func (config *redisConfig) GetReadTimeout() int { + if config == nil { + panic("redis config is nil") + } + return config.ReadTimeout +} + +func (config *redisConfig) SetReadTimeout(readTimeout int) { + if config == nil { + panic("redis config is nil") + } + config.ReadTimeout = readTimeout +} + +func (config *redisConfig) GetWriteTimeout() int { + if config == nil { + panic("redis config is nil") + } + return config.WriteTimeout +} + +func (config *redisConfig) SetWriteTimeout(writeTimeout int) { + if config == nil { + panic("redis config is nil") + } + config.WriteTimeout = writeTimeout +} + +func (config *redisConfig) GetPoolSize() int { + if config == nil { + panic("redis config is nil") + } + return config.PoolSize +} + +func (config *redisConfig) SetPoolSIze(poolSize int) { + if config == nil { + panic("redis config is nil") + } + config.PoolSize = poolSize +} diff --git a/config/redis.json b/config/redis.json new file mode 100644 index 0000000..65f64ba --- /dev/null +++ b/config/redis.json @@ -0,0 +1,13 @@ +{ + "demo":{ + "addr":"192.168.46.100:6379", + "username":"", + "password":"", + "db":0, + "resp":3, + "dialtimeout":50, + "readtimeout":200, + "writetimeout":200, + "poolsize":20 + } +} \ No newline at end of file diff --git a/config/server.go b/config/server.go new file mode 100644 index 0000000..dc3c14e --- /dev/null +++ b/config/server.go @@ -0,0 +1,79 @@ +package config + +type serverConfig struct { + Host string `json:"host" yaml:"host"` + Port int `json:"port" yaml:"port"` + Grid int `json:"grid" yaml:"grid"` // TODO no use + Zone int `json:"zone" yaml:"zone"` // TODO no use + Station int `json:"station" yaml:"station"` // TODO no use +} + +func (config *serverConfig) GetHost() string { + if config == nil { + panic("server config is nil") + } + return config.Host +} + +func (config *serverConfig) SetHost(host string) { + if config == nil { + panic("server config is nil") + } + config.Host = host +} + +func (config *serverConfig) GetPort() int { + if config == nil { + panic("server config is nil") + } + return config.Port +} + +func (config *serverConfig) SetPort(port int) { + if config == nil { + panic("server config is nil") + } + config.Port = port +} + +func (config *serverConfig) GetGrid() int { + if config == nil { + panic("server config is nil") + } + return config.Grid +} + +func (config *serverConfig) SetGrid(grid int) { + if config == nil { + panic("server config is nil") + } + config.Grid = grid +} + +func (config *serverConfig) GetZone() int { + if config == nil { + panic("server config is nil") + } + return config.Zone +} + +func (config *serverConfig) SetZone(zone int) { + if config == nil { + panic("server config is nil") + } + config.Zone = zone +} + +func (config *serverConfig) GetStation() int { + if config == nil { + panic("server config is nil") + } + return config.Station +} + +func (config *serverConfig) SetStation(station int) { + if config == nil { + panic("server config is nil") + } + config.Station = station +} diff --git a/config/server.json b/config/server.json new file mode 100644 index 0000000..62326e3 --- /dev/null +++ b/config/server.json @@ -0,0 +1,7 @@ +{ + "host":"", + "port":8888, + "grid":1, + "zone":1, + "station":1 +} \ No newline at end of file diff --git a/data/data.go b/data/data.go new file mode 100644 index 0000000..c2b3b46 --- /dev/null +++ b/data/data.go @@ -0,0 +1,7 @@ +package data + +// 读取基础数据 + +// 将客户端请求参数通过缓存转换为内部请求参数 + +// 读取时序数据推送给图模库运行时服务 diff --git a/data/influx/common.go b/data/influx/common.go new file mode 100644 index 0000000..b57b7e7 --- /dev/null +++ b/data/influx/common.go @@ -0,0 +1,153 @@ +package influx + +import ( + "context" + "encoding/csv" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" +) + +// for influx data, one measurement +type jsonResp struct { + Results []*result `json:"results"` +} + +type result struct { + StatementID int `json:"statement_id"` + Series []*fields `json:"series"` +} + +type fields struct { + Name string `json:"name"` + Column []string `json:"column"` + Values [][]interface{} `json:"values"` +} + +// respType json/csv +// json_time:"2024-12-18T08:12:21.4735154Z" +// csv_time:"1734572793695885000" +func (client *influxClient) getRespData(ctx context.Context, reqData url.Values, + respType string) ([]*TV, error) { + request, err := http.NewRequestWithContext(ctx, http.MethodGet, + client.url+"/query?"+reqData.Encode(), nil) + if err != nil { + return nil, err + } + + request.Header.Set("Content-Type", "application/json") + request.Header.Set("Authorization", "Token "+client.token) + if respType == "csv" { + request.Header.Set("Accept", "application/csv") + } + + response, err := client.Do(request) + if err != nil { + return nil, err + } + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode) + } + + respData, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + switch respType { + case "json": + resp := new(jsonResp) + err = json.Unmarshal(respData, resp) + if err != nil { + return nil, err + } + if len(resp.Results) > 0 && + len(resp.Results[0].Series) > 0 { + return client.turnJsonRespDataToTVs(resp.Results[0].Series[0].Values) + } + case "csv": + rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll() + if err != nil { + return nil, err + } + if len(rows) > 1 { + return client.turnCsvRespDataToTVs(rows[1:]) + } + default: + return nil, errors.New("unsupported response type") + } + + return nil, errors.New("response has no data") +} + +func (client *influxClient) turnJsonRespDataToTVs(data [][]interface{}) ([]*TV, error) { + ret := make([]*TV, 0, len(data)) + + for _, row := range data { + if len(row) > 1 { + ret = append(ret, &TV{ + Time: row[0], + Value: row[1], + }) + } + } + + return ret, nil +} + +func (client *influxClient) turnCsvRespDataToTVs(data [][]string) ([]*TV, error) { + ret := make([]*TV, 0, len(data)) + + for _, row := range data { + if len(row) > 3 { + tv := &TV{} + if ns, err := strconv.ParseInt(row[2], 10, 64); err == nil { + tv.Time = ns + } else { + return nil, err + } + if v, err := strconv.ParseFloat(row[3], 64); err == nil { + tv.Value = v + } else { + return nil, err + } + ret = append(ret, tv) + } + } + + return ret, nil +} + +// line protocol, better to gzip and sort tags by key in lexicographic order +func (client *influxClient) setLineData(db, line string, gzip bool) error { + request, err := http.NewRequest(http.MethodPost, + client.url+"/write?db="+db, strings.NewReader(line)) + if err != nil { + return err + } + + request.Header.Set("Content-Type", "text/plain") + request.Header.Set("Authorization", "Token "+client.token) + if gzip { + request.Header.Set("Content-Encoding", "gzip") + } + + response, err := client.Do(request) + if err != nil { + return err + } + defer response.Body.Close() + + if response.StatusCode != http.StatusNoContent { + return fmt.Errorf("unexpected status code: %d", response.StatusCode) + } + + return nil +} diff --git a/data/influx/influx.go b/data/influx/influx.go new file mode 100644 index 0000000..4092d5d --- /dev/null +++ b/data/influx/influx.go @@ -0,0 +1,82 @@ +package influx + +import ( + "context" + "datart/config" + "net" + "net/http" + "time" +) + +type influxClient struct { + *http.Client + url string + token string + org string +} + +type Request struct { + RespType string + Database string + Measure string + Station string + Device string + Field string + Begin int64 + End int64 + Operate string + Step string + Default string +} + +var client *influxClient + +func init() { + client = new(influxClient) + + influxConfig := config.Conf().InfluxConf("demo") + client.Client = &http.Client{ + Timeout: time.Duration(influxConfig.GetTimeout()) * time.Millisecond, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + }, + } + + client.url = influxConfig.GetURL() + client.token = influxConfig.GetToken() + client.org = influxConfig.GetOrg() +} + +func SetLinesData(db string, lines []string) error { + return client.setLinesData(db, lines) +} + +type TV struct { + Time interface{} `json:"time"` + Value interface{} `json:"value"` +} + +func GetLast(ctx context.Context, req *Request) ([]*TV, error) { + return client.getLast(ctx, req, 1) +} + +func GetLastLimit(ctx context.Context, req *Request, limit int) ([]*TV, error) { + return client.getLast(ctx, req, limit) +} + +func GetPointData(ctx context.Context, req *Request) ([]*TV, error) { + return client.getPointData(ctx, req) +} + +func GetAfterOne(ctx context.Context, req *Request) ([]*TV, error) { + return client.getAfterOne(ctx, req) +} + +func GetBeforeOne(ctx context.Context, req *Request) ([]*TV, error) { + return client.getBeforeOne(ctx, req) +} diff --git a/data/influx/read.go b/data/influx/read.go new file mode 100644 index 0000000..f2b4e85 --- /dev/null +++ b/data/influx/read.go @@ -0,0 +1,61 @@ +package influx + +import ( + "context" + "fmt" + "net/url" +) + +// terminal unique +func (client *influxClient) getLast(ctx context.Context, req *Request, limit int) ([]*TV, error) { + q := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and device='%s';", + req.Field, req.Field, req.Measure, req.Station, req.Device) + if limit > 1 { + q = fmt.Sprintf("select %s from %s where station='%s' and device='%s' order by time desc limit %d;", + req.Field, req.Measure, req.Station, req.Device, limit) + } + + reqData := url.Values{ + "db": {req.Database}, + "q": {q}, + } + + return client.getRespData(ctx, reqData, req.RespType) +} + +func (client *influxClient) getPointData(ctx context.Context, req *Request) ([]*TV, error) { + sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;", + req.Field, req.Measure, req.Station, req.Device, req.Begin, req.End) + + if req.Operate != "" && req.Step != "" && req.Default != "" { + sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms group by time(%s) fill(%s);", + req.Operate, req.Field, req.Field, req.Measure, req.Station, req.Device, req.Begin, req.End, req.Step, req.Default) + } + + reqData := url.Values{ + "db": {req.Database}, + "q": {sql}, + } + + return client.getRespData(ctx, reqData, req.RespType) +} + +func (client *influxClient) getAfterOne(ctx context.Context, req *Request) ([]*TV, error) { + reqData := url.Values{ + "db": {req.Database}, + "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms limit 1;", + req.Field, req.Measure, req.Station, req.Device, req.Begin)}, + } + + return client.getRespData(ctx, reqData, req.RespType) +} + +func (client *influxClient) getBeforeOne(ctx context.Context, req *Request) ([]*TV, error) { + reqData := url.Values{ + "db": {req.Database}, + "q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time<=%dms limit 1;", + req.Field, req.Measure, req.Station, req.Device, req.End)}, + } + + return client.getRespData(ctx, reqData, req.RespType) +} diff --git a/data/influx/write.go b/data/influx/write.go new file mode 100644 index 0000000..4555a88 --- /dev/null +++ b/data/influx/write.go @@ -0,0 +1,8 @@ +package influx + +import "strings" + +func (client *influxClient) setLinesData(db string, lines []string) error { + line := strings.Join(lines, "\n") + return client.setLineData(db, line, true) +} diff --git a/data/postgres/postgres.go b/data/postgres/postgres.go new file mode 100644 index 0000000..a5aadd3 --- /dev/null +++ b/data/postgres/postgres.go @@ -0,0 +1,33 @@ +package postgres + +import ( + "datart/config" + "fmt" + + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +var client *gorm.DB + +func init() { + postgresConfig := config.Conf().PostgresConf("demo") + dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=%s TimeZone=%s", + postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(), + postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetSSLMode(), + postgresConfig.GetTimeZone()) + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + if err != nil { + panic(err) + } + client = db +} + +// close postgres client with tag +func Close(tag string) error { + db, err := client.DB() + if err != nil { + return err + } + return db.Close() +} diff --git a/data/postgres/terminal_mapping.go b/data/postgres/terminal_mapping.go new file mode 100644 index 0000000..b7fe857 --- /dev/null +++ b/data/postgres/terminal_mapping.go @@ -0,0 +1,58 @@ +package postgres + +import "context" + +type terminalMapping struct { + ID int64 `gorm:"colunmn:id"` + Component string `gorm:"column:component"` + Tag string `gorm:"column:tag"` + // mapping TODO +} + +func GetAllTerminalMapping(ctx context.Context, batchSize int) ([]*terminalMapping, error) { + var totalRecords []*terminalMapping + + id := 0 + for { + var records []*terminalMapping + result := client.WithContext(ctx).Table("terminal_mapping").Where("id > ?", id). + Order("id ASC").Limit(batchSize).Find(&records) + if result.Error != nil { + return totalRecords, result.Error + } + + length := len(records) + if length <= 0 { + break + } + id += length + + totalRecords = append(totalRecords, records...) + } + + return totalRecords, nil +} + +func GetStationTerminalMapping(ctx context.Context, batchSize int, station string) ([]*terminalMapping, error) { + var totalRecords []*terminalMapping + + id := 0 + for { + var records []*terminalMapping + result := client.WithContext(ctx).Table("terminal_mapping").Where("station = ?", station). + Where("id > ?", id).Order("id ASC").Limit(batchSize).Find(&records) + if result.Error != nil { + return totalRecords, result.Error + } + + length := len(records) + if length <= 0 { + break + } + id += length + + totalRecords = append(totalRecords, records...) + } + + return totalRecords, nil +} diff --git a/data/redis/hash.go b/data/redis/hash.go new file mode 100644 index 0000000..6d3f35f --- /dev/null +++ b/data/redis/hash.go @@ -0,0 +1,37 @@ +package redis + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +func HGetAll(ctx context.Context, key string) (map[string]string, error) { + kvs, err := client.HGetAll(ctx, key).Result() + if err == redis.Nil { + return nil, nil + } else if err != nil { + return nil, err + } + + return kvs, nil +} + +func HMSet(ctx context.Context, key string, fields map[string]interface{}) error { + return client.HMSet(ctx, key, fields).Err() +} + +func HGet(ctx context.Context, key, field string) (string, error) { + v, err := client.HGet(ctx, key, field).Result() + if err == redis.Nil { + return "", nil + } else if err != nil { + return "", err + } + + return v, nil +} + +func HSet(ctx context.Context, key, field string, value interface{}) error { + return client.HSet(ctx, key, field, value).Err() +} diff --git a/data/redis/redis.go b/data/redis/redis.go new file mode 100644 index 0000000..fc7668d --- /dev/null +++ b/data/redis/redis.go @@ -0,0 +1,49 @@ +package redis + +import ( + "context" + "datart/config" + "time" + + "github.com/redis/go-redis/v9" +) + +var client *redis.Client + +func init() { + config := config.Conf().RedisConf("demo") + client = redis.NewClient(&redis.Options{ + Addr: config.Addr, + Username: config.Username, + Password: config.Password, + DB: config.DB, + Protocol: config.RESP, + DialTimeout: time.Duration(config.GetDialTimeout()) * time.Millisecond, + ReadTimeout: time.Duration(config.GetReadTimeout()) * time.Millisecond, + WriteTimeout: time.Duration(config.GetWriteTimeout()) * time.Millisecond, + PoolSize: config.PoolSize, + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + pong, err := client.Ping(ctx).Result() + if err != nil { + panic(err) + } + if pong != "PONG" { + panic("redis ping failed") + } +} + +// close redis client with tag +func Close(tag string) error { + return client.Close() +} + +func Lock(ctx context.Context, key string, value interface{}, expiration time.Duration) error { + return client.SetNX(ctx, key, value, expiration).Err() +} + +func Unlock(ctx context.Context, key string) error { + return client.Del(ctx, key).Err() +} diff --git a/data/redis/set.go b/data/redis/set.go new file mode 100644 index 0000000..65a229e --- /dev/null +++ b/data/redis/set.go @@ -0,0 +1 @@ +package redis diff --git a/data/redis/string.go b/data/redis/string.go new file mode 100644 index 0000000..785830a --- /dev/null +++ b/data/redis/string.go @@ -0,0 +1,23 @@ +package redis + +import ( + "context" + "time" + + "github.com/redis/go-redis/v9" +) + +func Get(ctx context.Context, key string) (string, error) { + str, err := client.Get(ctx, key).Result() + if err == redis.Nil { + return "", nil + } else if err != nil { + return "", err + } + + return str, nil +} + +func Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { + return client.Set(ctx, key, value, expiration).Err() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..de22663 --- /dev/null +++ b/go.mod @@ -0,0 +1,53 @@ +module datart + +go 1.22.4 + +require ( + github.com/gin-gonic/gin v1.10.0 + github.com/gorilla/websocket v1.5.3 + github.com/redis/go-redis/v9 v9.7.0 + go.uber.org/zap v1.27.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 + gorm.io/driver/postgres v1.5.11 + gorm.io/gorm v1.25.12 +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.5.5 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + go.uber.org/multierr v1.10.0 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5524511 --- /dev/null +++ b/go.sum @@ -0,0 +1,134 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= +github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= +github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= +github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= +github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= +golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314= +gorm.io/driver/postgres v1.5.11/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= +gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= +gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/log/log.go b/log/log.go new file mode 100644 index 0000000..dfe4fe8 --- /dev/null +++ b/log/log.go @@ -0,0 +1,92 @@ +package log + +import ( + "datart/config" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var log *zap.SugaredLogger + +func init() { + logConfig := zap.NewProductionEncoderConfig() + logConfig.EncodeTime = zapcore.ISO8601TimeEncoder + fileEncoder := zapcore.NewJSONEncoder(logConfig) + core := zapcore.NewCore( + fileEncoder, + zapcore.AddSync(config.Conf().LogConf()), // conf is not null + // DebugLevel logs are typically voluminous, and are usually disabled in + // production. + + // InfoLevel is the default logging priority. + + // WarnLevel logs are more important than Info, but don't need individual + // human review. + + // ErrorLevel logs are high-priority. If an application is running smoothly, + // it shouldn't generate any error-level logs. + + // DPanicLevel logs are particularly important errors. In development the + // logger panics after writing the message. + + // PanicLevel logs a message, then panics. + + // FatalLevel logs a message, then calls os.Exit(1). + config.Conf().LogConf().GetLogLevel(), + ) + + log = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1)).Sugar() +} + +func Sync() { + log.Sync() +} + +func Log(lvl zapcore.Level, args ...interface{}) { + log.Log(lvl, args...) +} + +func Debug(args ...interface{}) { + log.Debug(args...) +} + +func Info(args ...interface{}) { + log.Info(args...) +} + +func Warn(args ...interface{}) { + log.Warn(args...) +} + +func Error(args ...interface{}) { + log.Error(args...) +} + +func Panic(args ...interface{}) { + log.Panic(args...) +} + +func Logf(lvl zapcore.Level, template string, args ...interface{}) { + log.Logf(lvl, template, args...) +} + +func Debugf(template string, args ...interface{}) { + log.Debugf(template, args...) +} + +func Infof(template string, args ...interface{}) { + log.Infof(template, args...) +} + +func Warnf(template string, args ...interface{}) { + log.Warnf(template, args...) +} + +func Errorf(template string, args ...interface{}) { + log.Errorf(template, args...) +} + +func Panicf(template string, args ...interface{}) { + log.Panicf(template, args...) +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..22bf770 --- /dev/null +++ b/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "datart/config" + "datart/modelrt" + "datart/route" + "strconv" + + "github.com/gin-gonic/gin" +) + +func main() { + // gin.SetMode(gin.ReleaseMode) + + engine := gin.Default() + + route.LoadRoute(engine) + + go modelrt.PushDataToModelRT() + + engine.Run(":" + strconv.Itoa(config.Conf().ServerConf().GetPort())) +} diff --git a/modelrt/modelrt.go b/modelrt/modelrt.go new file mode 100644 index 0000000..2c445ca --- /dev/null +++ b/modelrt/modelrt.go @@ -0,0 +1,140 @@ +package modelrt + +import ( + "context" + "datart/config" + "datart/data/influx" + "datart/log" + "encoding/json" + "net/url" + "os" + "os/signal" + "time" + + "github.com/gorilla/websocket" +) + +// 处理非模块内接口的数据 +func PushDataToModelRT() { + modelrtConfig := config.Conf().ModelRTConf("demo") + u := url.URL{ + Scheme: modelrtConfig.GetScheme(), + Host: modelrtConfig.GetHost(), + Path: modelrtConfig.GetPath(), + } + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + + for { + pushDataToModelRT(signalChan, u.String()) + } +} + +func pushDataToModelRT(signalChan <-chan os.Signal, url string) { + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + log.Error("ws dial:", err) + } + defer conn.Close() + + done := make(chan struct{}) + go func() { + defer close(done) + + for { + time.Sleep(time.Second) // demo TODO + pushData(conn, "channel_1") + pushData(conn, "channel_2") + } + }() + + for { + select { + case <-done: + return + case <-signalChan: + err := conn.WriteMessage(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Error("ws write close:", err) + return + } + + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } +} + +func pushData(conn *websocket.Conn, field string) error { + data, err := getPushedData(field) + if err != nil { + return err + } + + err = conn.WriteMessage(websocket.TextMessage, data) + if err != nil { + return err + } + + _, msg, err := conn.ReadMessage() + if err != nil { + return err + } + + log.Debug("ws read:", msg) + + return nil +} + +func getPushedData(field string) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + req := &influx.Request{ + RespType: "csv", + Database: "influxBucket", + Measure: "samples", + Station: "TJSH_FZ001", + Device: "DDJ001", + Field: field, + Begin: 0, + End: 0, + Operate: "", + Step: "", + Default: "", + } + tvs, err := influx.GetLastLimit(ctx, req, 1000) + if err != nil { + return nil, err + } + + pushedData := &struct { + Payload *struct { + ComponentID int `json:"component_id"` + Point string `json:"point"` + Values []*influx.TV `json:"values"` + } `json:"payload"` + }{ + Payload: &struct { + ComponentID int `json:"component_id"` + Point string `json:"point"` + Values []*influx.TV `json:"values"` + }{ + ComponentID: 0, + Point: field, + Values: tvs, + }, + } + + data, err := json.Marshal(pushedData) + if err != nil { + return nil, err + } + + return data, nil +} diff --git a/route/handler.go b/route/handler.go new file mode 100644 index 0000000..3c43c90 --- /dev/null +++ b/route/handler.go @@ -0,0 +1,135 @@ +package route + +import ( + "datart/data/influx" + "strconv" + + "github.com/gin-gonic/gin" +) + +func getLast(ctx *gin.Context) { + // station := ctx.DefaultQuery("station", "0") + // component := ctx.DefaultQuery("component", "0") + point := ctx.DefaultQuery("point", "") + request := &influx.Request{ + RespType: "csv", + Database: "influxBucket", + Measure: "samples", + Station: "TJSH_FZ001", + Device: "DDJ001", + Field: "", + } + + switch point { + case "i": + request.Field = "channel_1" + case "v": + request.Field = "channel_2" + default: + ctx.JSON(200, gin.H{ + "code": 1, + "msg": "invalid point", + }) + return + } + + data, err := influx.GetLast(ctx, request) + if err != nil { + ctx.JSON(200, gin.H{ + "code": 2, + "msg": err.Error(), + }) + return + } + + ctx.JSON(200, gin.H{ + "code": 0, + "msg": "", + "data": data, + }) +} + +func getPointData(ctx *gin.Context) { + // station := ctx.DefaultQuery("station", "0") + // component := ctx.DefaultQuery("component", "0") + + request := &influx.Request{ + RespType: "csv", + Database: "influxBucket", + Measure: "samples", + Station: "TJSH_FZ001", + Device: "DDJ001", + Field: "", + Begin: 0, + End: 0, + Operate: ctx.DefaultQuery("operate", ""), + Step: ctx.DefaultQuery("step", ""), + Default: ctx.DefaultQuery("default", ""), + } + + point := ctx.DefaultQuery("point", "") + switch point { + case "i": + request.Field = "channel_1" + case "v": + request.Field = "channel_2" + default: + ctx.JSON(200, gin.H{ + "code": 1, + "msg": "invalid point", + }) + return + } + + var err error + begin := ctx.DefaultQuery("begin", "") + end := ctx.DefaultQuery("end", "") + if begin != "" { + request.Begin, err = strconv.ParseInt(begin, 10, 64) + if err != nil { + ctx.JSON(200, gin.H{ + "code": 1, + "msg": "invalid begin", + }) + return + } + } + if end != "" { + request.End, err = strconv.ParseInt(end, 10, 64) + if err != nil { + ctx.JSON(200, gin.H{ + "code": 1, + "msg": "invalid end", + }) + return + } + } + + var data []*influx.TV + switch { + case begin != "" && end != "": + data, err = influx.GetPointData(ctx, request) + + case begin != "": + data, err = influx.GetAfterOne(ctx, request) + + case end != "": + data, err = influx.GetBeforeOne(ctx, request) + + default: + data, err = influx.GetLast(ctx, request) + } + if err != nil { + ctx.JSON(200, gin.H{ + "code": 2, + "msg": err.Error(), + }) + return + } + + ctx.JSON(200, gin.H{ + "code": 0, + "msg": "", + "data": data, + }) +} diff --git a/route/route.go b/route/route.go new file mode 100644 index 0000000..286478e --- /dev/null +++ b/route/route.go @@ -0,0 +1,17 @@ +package route + +import "github.com/gin-gonic/gin" + +func LoadRoute(engine *gin.Engine) { + engine.Use(middleware()) // TODO + rg := engine.Group("datart") + + rg.GET("/getLast", getLast) + rg.GET("/getPointData", getPointData) +} + +func middleware() gin.HandlerFunc { + return func(ctx *gin.Context) { + + } +}