Compare commits

...

1 Commits

Author SHA1 Message Date
zhuxu b2e9101257 demo 2025-01-25 17:28:36 +08:00
34 changed files with 1893 additions and 0 deletions

12
.drone.yml Normal file
View File

@ -0,0 +1,12 @@
kind: pipeline
type: docker
name: default
steps:
- name: build
image: golang:latest
environment:
GO111MODULE: on
GOPROXY: https://goproxy.cn,direct
commands:
- go build

53
Makefile Normal file
View File

@ -0,0 +1,53 @@
# set package name and path
package_name = datart
# set package path
package_path = .
# ==================================================================================== #
# DEFAULT
# ==================================================================================== #
default: help
.PHONY: help
help: ## list makefile function
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
# ==================================================================================== #
# QUALITY
# ==================================================================================== #
.PHONY: tidy
tidy: ## tidy modfiles and format .go files
@go mod tidy -v
@go fmt ./...
.PHONY: test
test: ## run all tests
go test -v -race -buildvcs ./...
.PHONY: cover
cover: ## run all tests and display coverage
go test -v -race -buildvcs -coverprofile=${package_path}/coverage.out ./...
go tool cover -func=coverage.out | sort -rnk3
# ==================================================================================== #
# DEVELOPMENT
# ==================================================================================== #
.PHONY: build
build: ## build the application
@go mod download
@go build -ldflags "-X main.version=$(shell git describe --abbrev=0 --tags 2>/dev/null || echo 0.0.0)" -o=${package_path}/${package_name}
.PHONY: run
run: build ## run the application after build
@chmod +x ${package_path}/${package_name}
${package_path}/${package_name}
.PHONY: clean
clean: ## clean up environment
@go clean
@rm -f ${package_path}/${package_name}
@rm -f ${package_path}/coverage.out

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# dataRT
[![Build Status](http://192.168.46.100:4080/api/badges/CL-Softwares/dataRT/status.svg?ref=refs/heads/datart_demo)](http://192.168.46.100:4080/CL-Softwares/dataRT)

116
config/config.go Normal file
View File

@ -0,0 +1,116 @@
package config
import (
_ "embed"
"encoding/json"
)
type config struct {
serverConf *serverConfig
logConf *logConfig
postgresConf map[string]*postgresConfig
influxConf map[string]*influxConfig
redisConf map[string]*redisConfig
modelrtConf map[string]*modelrtConfig
}
//go:embed server.json
var serverjson string
//go:embed log.json
var logjson string
//go:embed postgres.json
var postgresjson string
//go:embed influx.json
var influxjson string
//go:embed redis.json
var redisjson string
//go:embed modelrt.json
var modelrtjson string
var conf *config
func init() {
conf = new(config)
conf.serverConf = new(serverConfig)
conf.logConf = new(logConfig)
conf.postgresConf = make(map[string]*postgresConfig)
conf.influxConf = make(map[string]*influxConfig)
conf.redisConf = make(map[string]*redisConfig)
conf.modelrtConf = make(map[string]*modelrtConfig)
err := json.Unmarshal([]byte(serverjson), conf.serverConf)
if err != nil {
panic(err.Error())
}
err = json.Unmarshal([]byte(logjson), conf.logConf)
if err != nil {
panic(err.Error())
}
err = json.Unmarshal([]byte(postgresjson), &conf.postgresConf)
if err != nil {
panic(err.Error())
}
err = json.Unmarshal([]byte(influxjson), &conf.influxConf)
if err != nil {
panic(err.Error())
}
err = json.Unmarshal([]byte(redisjson), &conf.redisConf)
if err != nil {
panic(err.Error())
}
err = json.Unmarshal([]byte(modelrtjson), &conf.modelrtConf)
if err != nil {
panic(err.Error())
}
}
func Conf() *config {
return conf
}
func (c *config) ServerConf() *serverConfig {
if c == nil {
panic("config is nil")
}
return c.serverConf
}
func (c *config) LogConf() *logConfig {
if c == nil {
panic("config is nil")
}
return c.logConf
}
func (c *config) PostgresConf(tag string) *postgresConfig {
if c == nil || c.postgresConf == nil {
panic("postgres config is nil")
}
return c.postgresConf[tag]
}
func (c *config) InfluxConf(tag string) *influxConfig {
if c == nil || c.influxConf == nil {
panic("influx config is nil")
}
return c.influxConf[tag]
}
func (c *config) RedisConf(tag string) *redisConfig {
if c == nil || c.redisConf == nil {
panic("redis config is nil")
}
return c.redisConf[tag]
}
func (c *config) ModelRTConf(tag string) *modelrtConfig {
if c == nil {
panic("modelrt config is nil")
}
return c.modelrtConf[tag]
}

64
config/influx.go Normal file
View File

@ -0,0 +1,64 @@
package config
type influxConfig struct {
URL string `json:"url" yaml:"url"`
Token string `json:"token" yaml:"token"`
Org string `json:"org" yaml:"org"`
Timeout int `json:"timeoutms" yaml:"timeoutms"`
}
func (config *influxConfig) GetURL() string {
if config == nil {
panic("influx config is nil")
}
return config.URL
}
func (config *influxConfig) SetURL(url string) {
if config == nil {
panic("influx config is nil")
}
config.URL = url
}
func (config *influxConfig) GetToken() string {
if config == nil {
panic("influx config is nil")
}
return config.Token
}
func (config *influxConfig) SetToken(token string) {
if config == nil {
panic("influx config is nil")
}
config.Token = token
}
func (config *influxConfig) GetOrg() string {
if config == nil {
panic("influx config is nil")
}
return config.Org
}
func (config *influxConfig) SetOrg(org string) {
if config == nil {
panic("influx config is nil")
}
config.Org = org
}
func (config *influxConfig) GetTimeout() int {
if config == nil {
panic("influx config is nil")
}
return config.Timeout
}
func (config *influxConfig) SetTimeout(timeout int) {
if config == nil {
panic("influx config is nil")
}
config.Timeout = timeout
}

8
config/influx.json Normal file
View File

@ -0,0 +1,8 @@
{
"demo": {
"url": "http://192.168.46.100:8086",
"token": "kKtMhMj5ISrnrCAO1ugvL4D4c_HrbAv4HzSHGA3Ai1AeBIEmGbQpQY0qSwjaXYluOmuDAv0zAvFCRRqEWQ0zJw==",
"org": "eCL3000",
"timeout":1000
}
}

109
config/log.go Normal file
View File

@ -0,0 +1,109 @@
package config
import (
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
type logConfig struct {
lumberjack.Logger
LogLevel int8 `json:"loglevel" yaml:"loglevel"`
}
func (config *logConfig) GetFileName() string {
if config == nil {
panic("log config is nil")
}
return config.Filename
}
func (config *logConfig) SetFileName(fileName string) {
if config == nil {
panic("log config is nil")
}
config.Filename = fileName
}
func (config *logConfig) GetMaxSize() int {
if config == nil {
panic("log config is nil")
}
return config.MaxSize
}
func (config *logConfig) SetMaxSize(maxSize int) {
if config == nil {
panic("log config is nil")
}
config.MaxSize = maxSize
}
func (config *logConfig) GetMaxAge() int {
if config == nil {
panic("log config is nil")
}
return config.MaxAge
}
func (config *logConfig) SetMaxAge(maxAge int) {
if config == nil {
panic("log config is nil")
}
config.MaxAge = maxAge
}
func (config *logConfig) GetMaxBackups() int {
if config == nil {
panic("log config is nil")
}
return config.MaxBackups
}
func (config *logConfig) SetMaxBackups(maxBackups int) {
if config == nil {
panic("log config is nil")
}
config.MaxBackups = maxBackups
}
func (config *logConfig) GetLocalTime() bool {
if config == nil {
panic("log config is nil")
}
return config.LocalTime
}
func (config *logConfig) SetLocalTime(useLocalTime bool) {
if config == nil {
panic("log config is nil")
}
config.LocalTime = useLocalTime
}
func (config *logConfig) GetCompress() bool {
if config == nil {
panic("log config is nil")
}
return config.Compress
}
func (config *logConfig) SetCompress(doCompress bool) {
if config == nil {
panic("log config is nil")
}
config.Compress = doCompress
}
func (config *logConfig) GetLogLevel() zapcore.Level {
if config == nil {
panic("log config is nil")
}
return zapcore.Level(config.LogLevel)
}
func (config *logConfig) SetLogLevel(logLevel zapcore.Level) {
if config == nil {
panic("log config is nil")
}
config.LogLevel = int8(logLevel)
}

9
config/log.json Normal file
View File

@ -0,0 +1,9 @@
{
"filename": "./logs/datart.log",
"maxsize": 100,
"maxage": 7,
"maxbackups": 20,
"localtime": true,
"compress": false,
"loglevel": -1
}

49
config/modelrt.go Normal file
View File

@ -0,0 +1,49 @@
package config
type modelrtConfig struct {
Scheme string `json:"scheme" yaml:"scheme"`
Host string `json:"host" yaml:"host"` // host or host:port
Path string `json:"path" yaml:"path"`
}
func (config *modelrtConfig) GetScheme() string {
if config == nil {
panic("modelrt config is nil")
}
return config.Scheme
}
func (config *modelrtConfig) SetScheme(scheme string) {
if config == nil {
panic("modelrt config is nil")
}
config.Scheme = scheme
}
func (config *modelrtConfig) GetHost() string {
if config == nil {
panic("modelrt config is nil")
}
return config.Host
}
func (config *modelrtConfig) SetHost(host string) {
if config == nil {
panic("modelrt config is nil")
}
config.Host = host
}
func (config *modelrtConfig) GetPath() string {
if config == nil {
panic("modelrt config is nil")
}
return config.Path
}
func (config *modelrtConfig) SetPath(path string) {
if config == nil {
panic("modelrt config is nil")
}
config.Path = path
}

7
config/modelrt.json Normal file
View File

@ -0,0 +1,7 @@
{
"demo":{
"scheme":"ws",
"host":"192.168.46.100:8080",
"path":"/ws/rtdatas"
}
}

109
config/postgres.go Normal file
View File

@ -0,0 +1,109 @@
package config
type postgresConfig struct {
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
DBName string `json:"dbname" yaml:"dbname"`
SSLMode string `json:"sslmode" yaml:"sslmode"`
TimeZone string `json:"timezone" yaml:"timezone"`
}
func (config *postgresConfig) GetHost() string {
if config == nil {
panic("postgres config is nil")
}
return config.Host
}
func (config *postgresConfig) SetHost(host string) {
if config == nil {
panic("postgres config is nil")
}
config.Host = host
}
func (config *postgresConfig) GetPort() int {
if config == nil {
panic("postgres config is nil")
}
return config.Port
}
func (config *postgresConfig) SetPort(port int) {
if config == nil {
panic("postgres config is nil")
}
config.Port = port
}
func (config *postgresConfig) GetUser() string {
if config == nil {
panic("postgres config is nil")
}
return config.User
}
func (config *postgresConfig) SetUser(user string) {
if config == nil {
panic("postgres config is nil")
}
config.User = user
}
func (config *postgresConfig) GetPassword() string {
if config == nil {
panic("postgres config is nil")
}
return config.Password
}
func (config *postgresConfig) SetPassword(password string) {
if config == nil {
panic("postgres config is nil")
}
config.Password = password
}
func (config *postgresConfig) GetDBName() string {
if config == nil {
panic("postgres config is nil")
}
return config.DBName
}
func (config *postgresConfig) SetDBName(dbName string) {
if config == nil {
panic("postgres config is nil")
}
config.DBName = dbName
}
func (config *postgresConfig) GetSSLMode() string {
if config == nil {
panic("postgres config is nil")
}
return config.SSLMode
}
func (config *postgresConfig) SetSSLMode(sslMode string) {
if config == nil {
panic("postgres config is nil")
}
config.SSLMode = sslMode
}
func (config *postgresConfig) GetTimeZone() string {
if config == nil {
panic("postgres config is nil")
}
return config.TimeZone
}
func (config *postgresConfig) SetTimeZone(timeZone string) {
if config == nil {
panic("postgres config is nil")
}
config.TimeZone = timeZone
}

11
config/postgres.json Normal file
View File

@ -0,0 +1,11 @@
{
"demo":{
"host":"192.168.46.100",
"port":9432,
"user":"postgres",
"password":"123RTYjkl",
"dbname":"metamodule",
"sslmode":"disable",
"timezone":"Asia/Shanghai"
}
}

139
config/redis.go Normal file
View File

@ -0,0 +1,139 @@
package config
type redisConfig struct {
Addr string `json:"addr" yaml:"addr"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
DB int `json:"db" yaml:"db"`
RESP int `json:"resp" yaml:"resp"`
DialTimeout int `json:"dialtimeout" yaml:"dialtimeout"`
ReadTimeout int `json:"readtimeout" yaml:"readtimeout"`
WriteTimeout int `json:"writetimeout" yaml:"writetimeout"`
PoolSize int `json:"poolsize" yaml:"poolsize"`
}
func (config *redisConfig) GetAddr() string {
if config == nil {
panic("redis config is nil")
}
return config.Addr
}
func (config *redisConfig) SetAddr(addr string) {
if config == nil {
panic("redis config is nil")
}
config.Addr = addr
}
func (config *redisConfig) GetUsername() string {
if config == nil {
panic("redis config is nil")
}
return config.Username
}
func (config *redisConfig) SetUsername(username string) {
if config == nil {
panic("redis config is nil")
}
config.Username = username
}
func (config *redisConfig) GetPassword() string {
if config == nil {
panic("redis config is nil")
}
return config.Password
}
func (config *redisConfig) SetPassword(password string) {
if config == nil {
panic("redis config is nil")
}
config.Password = password
}
func (config *redisConfig) GetDB() int {
if config == nil {
panic("redis config is nil")
}
return config.DB
}
func (config *redisConfig) SetDB(db int) {
if config == nil {
panic("redis config is nil")
}
config.DB = db
}
func (config *redisConfig) GetRESP() int {
if config == nil {
panic("redis config is nil")
}
return config.RESP
}
func (config *redisConfig) SetRESP(resp int) {
if config == nil {
panic("redis config is nil")
}
config.RESP = resp
}
func (config *redisConfig) GetDialTimeout() int {
if config == nil {
panic("redis config is nil")
}
return config.DialTimeout
}
func (config *redisConfig) SetDialTimeout(dialTimeout int) {
if config == nil {
panic("redis config is nil")
}
config.DialTimeout = dialTimeout
}
func (config *redisConfig) GetReadTimeout() int {
if config == nil {
panic("redis config is nil")
}
return config.ReadTimeout
}
func (config *redisConfig) SetReadTimeout(readTimeout int) {
if config == nil {
panic("redis config is nil")
}
config.ReadTimeout = readTimeout
}
func (config *redisConfig) GetWriteTimeout() int {
if config == nil {
panic("redis config is nil")
}
return config.WriteTimeout
}
func (config *redisConfig) SetWriteTimeout(writeTimeout int) {
if config == nil {
panic("redis config is nil")
}
config.WriteTimeout = writeTimeout
}
func (config *redisConfig) GetPoolSize() int {
if config == nil {
panic("redis config is nil")
}
return config.PoolSize
}
func (config *redisConfig) SetPoolSIze(poolSize int) {
if config == nil {
panic("redis config is nil")
}
config.PoolSize = poolSize
}

13
config/redis.json Normal file
View File

@ -0,0 +1,13 @@
{
"demo":{
"addr":"192.168.46.100:6379",
"username":"",
"password":"",
"db":0,
"resp":3,
"dialtimeout":50,
"readtimeout":200,
"writetimeout":200,
"poolsize":20
}
}

79
config/server.go Normal file
View File

@ -0,0 +1,79 @@
package config
type serverConfig struct {
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
Grid int `json:"grid" yaml:"grid"` // TODO no use
Zone int `json:"zone" yaml:"zone"` // TODO no use
Station int `json:"station" yaml:"station"` // TODO no use
}
func (config *serverConfig) GetHost() string {
if config == nil {
panic("server config is nil")
}
return config.Host
}
func (config *serverConfig) SetHost(host string) {
if config == nil {
panic("server config is nil")
}
config.Host = host
}
func (config *serverConfig) GetPort() int {
if config == nil {
panic("server config is nil")
}
return config.Port
}
func (config *serverConfig) SetPort(port int) {
if config == nil {
panic("server config is nil")
}
config.Port = port
}
func (config *serverConfig) GetGrid() int {
if config == nil {
panic("server config is nil")
}
return config.Grid
}
func (config *serverConfig) SetGrid(grid int) {
if config == nil {
panic("server config is nil")
}
config.Grid = grid
}
func (config *serverConfig) GetZone() int {
if config == nil {
panic("server config is nil")
}
return config.Zone
}
func (config *serverConfig) SetZone(zone int) {
if config == nil {
panic("server config is nil")
}
config.Zone = zone
}
func (config *serverConfig) GetStation() int {
if config == nil {
panic("server config is nil")
}
return config.Station
}
func (config *serverConfig) SetStation(station int) {
if config == nil {
panic("server config is nil")
}
config.Station = station
}

7
config/server.json Normal file
View File

@ -0,0 +1,7 @@
{
"host":"",
"port":8888,
"grid":1,
"zone":1,
"station":1
}

7
data/data.go Normal file
View File

@ -0,0 +1,7 @@
package data
// 读取基础数据
// 将客户端请求参数通过缓存转换为内部请求参数
// 读取时序数据推送给图模库运行时服务

153
data/influx/common.go Normal file
View File

@ -0,0 +1,153 @@
package influx
import (
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
)
// for influx data, one measurement
type jsonResp struct {
Results []*result `json:"results"`
}
type result struct {
StatementID int `json:"statement_id"`
Series []*fields `json:"series"`
}
type fields struct {
Name string `json:"name"`
Column []string `json:"column"`
Values [][]interface{} `json:"values"`
}
// respType json/csv
// json_time:"2024-12-18T08:12:21.4735154Z"
// csv_time:"1734572793695885000"
func (client *influxClient) getRespData(ctx context.Context, reqData url.Values,
respType string) ([]*TV, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
client.url+"/query?"+reqData.Encode(), nil)
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("Authorization", "Token "+client.token)
if respType == "csv" {
request.Header.Set("Accept", "application/csv")
}
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode)
}
respData, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
switch respType {
case "json":
resp := new(jsonResp)
err = json.Unmarshal(respData, resp)
if err != nil {
return nil, err
}
if len(resp.Results) > 0 &&
len(resp.Results[0].Series) > 0 {
return client.turnJsonRespDataToTVs(resp.Results[0].Series[0].Values)
}
case "csv":
rows, err := csv.NewReader(strings.NewReader(string(respData))).ReadAll()
if err != nil {
return nil, err
}
if len(rows) > 1 {
return client.turnCsvRespDataToTVs(rows[1:])
}
default:
return nil, errors.New("unsupported response type")
}
return nil, errors.New("response has no data")
}
func (client *influxClient) turnJsonRespDataToTVs(data [][]interface{}) ([]*TV, error) {
ret := make([]*TV, 0, len(data))
for _, row := range data {
if len(row) > 1 {
ret = append(ret, &TV{
Time: row[0],
Value: row[1],
})
}
}
return ret, nil
}
func (client *influxClient) turnCsvRespDataToTVs(data [][]string) ([]*TV, error) {
ret := make([]*TV, 0, len(data))
for _, row := range data {
if len(row) > 3 {
tv := &TV{}
if ns, err := strconv.ParseInt(row[2], 10, 64); err == nil {
tv.Time = ns
} else {
return nil, err
}
if v, err := strconv.ParseFloat(row[3], 64); err == nil {
tv.Value = v
} else {
return nil, err
}
ret = append(ret, tv)
}
}
return ret, nil
}
// line protocol, better to gzip and sort tags by key in lexicographic order
func (client *influxClient) setLineData(db, line string, gzip bool) error {
request, err := http.NewRequest(http.MethodPost,
client.url+"/write?db="+db, strings.NewReader(line))
if err != nil {
return err
}
request.Header.Set("Content-Type", "text/plain")
request.Header.Set("Authorization", "Token "+client.token)
if gzip {
request.Header.Set("Content-Encoding", "gzip")
}
response, err := client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()
if response.StatusCode != http.StatusNoContent {
return fmt.Errorf("unexpected status code: %d", response.StatusCode)
}
return nil
}

82
data/influx/influx.go Normal file
View File

@ -0,0 +1,82 @@
package influx
import (
"context"
"datart/config"
"net"
"net/http"
"time"
)
type influxClient struct {
*http.Client
url string
token string
org string
}
type Request struct {
RespType string
Database string
Measure string
Station string
Device string
Field string
Begin int64
End int64
Operate string
Step string
Default string
}
var client *influxClient
func init() {
client = new(influxClient)
influxConfig := config.Conf().InfluxConf("demo")
client.Client = &http.Client{
Timeout: time.Duration(influxConfig.GetTimeout()) * time.Millisecond,
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
},
}
client.url = influxConfig.GetURL()
client.token = influxConfig.GetToken()
client.org = influxConfig.GetOrg()
}
func SetLinesData(db string, lines []string) error {
return client.setLinesData(db, lines)
}
type TV struct {
Time interface{} `json:"time"`
Value interface{} `json:"value"`
}
func GetLast(ctx context.Context, req *Request) ([]*TV, error) {
return client.getLast(ctx, req, 1)
}
func GetLastLimit(ctx context.Context, req *Request, limit int) ([]*TV, error) {
return client.getLast(ctx, req, limit)
}
func GetPointData(ctx context.Context, req *Request) ([]*TV, error) {
return client.getPointData(ctx, req)
}
func GetAfterOne(ctx context.Context, req *Request) ([]*TV, error) {
return client.getAfterOne(ctx, req)
}
func GetBeforeOne(ctx context.Context, req *Request) ([]*TV, error) {
return client.getBeforeOne(ctx, req)
}

61
data/influx/read.go Normal file
View File

@ -0,0 +1,61 @@
package influx
import (
"context"
"fmt"
"net/url"
)
// terminal unique
func (client *influxClient) getLast(ctx context.Context, req *Request, limit int) ([]*TV, error) {
q := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and device='%s';",
req.Field, req.Field, req.Measure, req.Station, req.Device)
if limit > 1 {
q = fmt.Sprintf("select %s from %s where station='%s' and device='%s' order by time desc limit %d;",
req.Field, req.Measure, req.Station, req.Device, limit)
}
reqData := url.Values{
"db": {req.Database},
"q": {q},
}
return client.getRespData(ctx, reqData, req.RespType)
}
func (client *influxClient) getPointData(ctx context.Context, req *Request) ([]*TV, error) {
sql := fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms;",
req.Field, req.Measure, req.Station, req.Device, req.Begin, req.End)
if req.Operate != "" && req.Step != "" && req.Default != "" {
sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and device='%s' and time>=%dms and time<%dms group by time(%s) fill(%s);",
req.Operate, req.Field, req.Field, req.Measure, req.Station, req.Device, req.Begin, req.End, req.Step, req.Default)
}
reqData := url.Values{
"db": {req.Database},
"q": {sql},
}
return client.getRespData(ctx, reqData, req.RespType)
}
func (client *influxClient) getAfterOne(ctx context.Context, req *Request) ([]*TV, error) {
reqData := url.Values{
"db": {req.Database},
"q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time>=%dms limit 1;",
req.Field, req.Measure, req.Station, req.Device, req.Begin)},
}
return client.getRespData(ctx, reqData, req.RespType)
}
func (client *influxClient) getBeforeOne(ctx context.Context, req *Request) ([]*TV, error) {
reqData := url.Values{
"db": {req.Database},
"q": {fmt.Sprintf("select %s from %s where station='%s' and device='%s' and time<=%dms limit 1;",
req.Field, req.Measure, req.Station, req.Device, req.End)},
}
return client.getRespData(ctx, reqData, req.RespType)
}

8
data/influx/write.go Normal file
View File

@ -0,0 +1,8 @@
package influx
import "strings"
func (client *influxClient) setLinesData(db string, lines []string) error {
line := strings.Join(lines, "\n")
return client.setLineData(db, line, true)
}

33
data/postgres/postgres.go Normal file
View File

@ -0,0 +1,33 @@
package postgres
import (
"datart/config"
"fmt"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
var client *gorm.DB
func init() {
postgresConfig := config.Conf().PostgresConf("demo")
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=%s TimeZone=%s",
postgresConfig.GetHost(), postgresConfig.GetUser(), postgresConfig.GetPassword(),
postgresConfig.GetDBName(), postgresConfig.GetPort(), postgresConfig.GetSSLMode(),
postgresConfig.GetTimeZone())
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
if err != nil {
panic(err)
}
client = db
}
// close postgres client with tag
func Close(tag string) error {
db, err := client.DB()
if err != nil {
return err
}
return db.Close()
}

View File

@ -0,0 +1,58 @@
package postgres
import "context"
type terminalMapping struct {
ID int64 `gorm:"colunmn:id"`
Component string `gorm:"column:component"`
Tag string `gorm:"column:tag"`
// mapping TODO
}
func GetAllTerminalMapping(ctx context.Context, batchSize int) ([]*terminalMapping, error) {
var totalRecords []*terminalMapping
id := 0
for {
var records []*terminalMapping
result := client.WithContext(ctx).Table("terminal_mapping").Where("id > ?", id).
Order("id ASC").Limit(batchSize).Find(&records)
if result.Error != nil {
return totalRecords, result.Error
}
length := len(records)
if length <= 0 {
break
}
id += length
totalRecords = append(totalRecords, records...)
}
return totalRecords, nil
}
func GetStationTerminalMapping(ctx context.Context, batchSize int, station string) ([]*terminalMapping, error) {
var totalRecords []*terminalMapping
id := 0
for {
var records []*terminalMapping
result := client.WithContext(ctx).Table("terminal_mapping").Where("station = ?", station).
Where("id > ?", id).Order("id ASC").Limit(batchSize).Find(&records)
if result.Error != nil {
return totalRecords, result.Error
}
length := len(records)
if length <= 0 {
break
}
id += length
totalRecords = append(totalRecords, records...)
}
return totalRecords, nil
}

37
data/redis/hash.go Normal file
View File

@ -0,0 +1,37 @@
package redis
import (
"context"
"github.com/redis/go-redis/v9"
)
func HGetAll(ctx context.Context, key string) (map[string]string, error) {
kvs, err := client.HGetAll(ctx, key).Result()
if err == redis.Nil {
return nil, nil
} else if err != nil {
return nil, err
}
return kvs, nil
}
func HMSet(ctx context.Context, key string, fields map[string]interface{}) error {
return client.HMSet(ctx, key, fields).Err()
}
func HGet(ctx context.Context, key, field string) (string, error) {
v, err := client.HGet(ctx, key, field).Result()
if err == redis.Nil {
return "", nil
} else if err != nil {
return "", err
}
return v, nil
}
func HSet(ctx context.Context, key, field string, value interface{}) error {
return client.HSet(ctx, key, field, value).Err()
}

49
data/redis/redis.go Normal file
View File

@ -0,0 +1,49 @@
package redis
import (
"context"
"datart/config"
"time"
"github.com/redis/go-redis/v9"
)
var client *redis.Client
func init() {
config := config.Conf().RedisConf("demo")
client = redis.NewClient(&redis.Options{
Addr: config.Addr,
Username: config.Username,
Password: config.Password,
DB: config.DB,
Protocol: config.RESP,
DialTimeout: time.Duration(config.GetDialTimeout()) * time.Millisecond,
ReadTimeout: time.Duration(config.GetReadTimeout()) * time.Millisecond,
WriteTimeout: time.Duration(config.GetWriteTimeout()) * time.Millisecond,
PoolSize: config.PoolSize,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
pong, err := client.Ping(ctx).Result()
if err != nil {
panic(err)
}
if pong != "PONG" {
panic("redis ping failed")
}
}
// close redis client with tag
func Close(tag string) error {
return client.Close()
}
func Lock(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return client.SetNX(ctx, key, value, expiration).Err()
}
func Unlock(ctx context.Context, key string) error {
return client.Del(ctx, key).Err()
}

1
data/redis/set.go Normal file
View File

@ -0,0 +1 @@
package redis

23
data/redis/string.go Normal file
View File

@ -0,0 +1,23 @@
package redis
import (
"context"
"time"
"github.com/redis/go-redis/v9"
)
func Get(ctx context.Context, key string) (string, error) {
str, err := client.Get(ctx, key).Result()
if err == redis.Nil {
return "", nil
} else if err != nil {
return "", err
}
return str, nil
}
func Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return client.Set(ctx, key, value, expiration).Err()
}

53
go.mod Normal file
View File

@ -0,0 +1,53 @@
module datart
go 1.22.4
require (
github.com/gin-gonic/gin v1.10.0
github.com/gorilla/websocket v1.5.3
github.com/redis/go-redis/v9 v9.7.0
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/driver/postgres v1.5.11
gorm.io/gorm v1.25.12
)
require (
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

134
go.sum Normal file
View File

@ -0,0 +1,134 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314=
gorm.io/driver/postgres v1.5.11/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

92
log/log.go Normal file
View File

@ -0,0 +1,92 @@
package log
import (
"datart/config"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var log *zap.SugaredLogger
func init() {
logConfig := zap.NewProductionEncoderConfig()
logConfig.EncodeTime = zapcore.ISO8601TimeEncoder
fileEncoder := zapcore.NewJSONEncoder(logConfig)
core := zapcore.NewCore(
fileEncoder,
zapcore.AddSync(config.Conf().LogConf()), // conf is not null
// DebugLevel logs are typically voluminous, and are usually disabled in
// production.
// InfoLevel is the default logging priority.
// WarnLevel logs are more important than Info, but don't need individual
// human review.
// ErrorLevel logs are high-priority. If an application is running smoothly,
// it shouldn't generate any error-level logs.
// DPanicLevel logs are particularly important errors. In development the
// logger panics after writing the message.
// PanicLevel logs a message, then panics.
// FatalLevel logs a message, then calls os.Exit(1).
config.Conf().LogConf().GetLogLevel(),
)
log = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1)).Sugar()
}
func Sync() {
log.Sync()
}
func Log(lvl zapcore.Level, args ...interface{}) {
log.Log(lvl, args...)
}
func Debug(args ...interface{}) {
log.Debug(args...)
}
func Info(args ...interface{}) {
log.Info(args...)
}
func Warn(args ...interface{}) {
log.Warn(args...)
}
func Error(args ...interface{}) {
log.Error(args...)
}
func Panic(args ...interface{}) {
log.Panic(args...)
}
func Logf(lvl zapcore.Level, template string, args ...interface{}) {
log.Logf(lvl, template, args...)
}
func Debugf(template string, args ...interface{}) {
log.Debugf(template, args...)
}
func Infof(template string, args ...interface{}) {
log.Infof(template, args...)
}
func Warnf(template string, args ...interface{}) {
log.Warnf(template, args...)
}
func Errorf(template string, args ...interface{}) {
log.Errorf(template, args...)
}
func Panicf(template string, args ...interface{}) {
log.Panicf(template, args...)
}

22
main.go Normal file
View File

@ -0,0 +1,22 @@
package main
import (
"datart/config"
"datart/modelrt"
"datart/route"
"strconv"
"github.com/gin-gonic/gin"
)
func main() {
// gin.SetMode(gin.ReleaseMode)
engine := gin.Default()
route.LoadRoute(engine)
go modelrt.PushDataToModelRT()
engine.Run(":" + strconv.Itoa(config.Conf().ServerConf().GetPort()))
}

140
modelrt/modelrt.go Normal file
View File

@ -0,0 +1,140 @@
package modelrt
import (
"context"
"datart/config"
"datart/data/influx"
"datart/log"
"encoding/json"
"net/url"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
// 处理非模块内接口的数据
func PushDataToModelRT() {
modelrtConfig := config.Conf().ModelRTConf("demo")
u := url.URL{
Scheme: modelrtConfig.GetScheme(),
Host: modelrtConfig.GetHost(),
Path: modelrtConfig.GetPath(),
}
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
for {
pushDataToModelRT(signalChan, u.String())
}
}
func pushDataToModelRT(signalChan <-chan os.Signal, url string) {
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Error("ws dial:", err)
}
defer conn.Close()
done := make(chan struct{})
go func() {
defer close(done)
for {
time.Sleep(time.Second) // demo TODO
pushData(conn, "channel_1")
pushData(conn, "channel_2")
}
}()
for {
select {
case <-done:
return
case <-signalChan:
err := conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Error("ws write close:", err)
return
}
select {
case <-done:
case <-time.After(time.Second):
}
return
}
}
}
func pushData(conn *websocket.Conn, field string) error {
data, err := getPushedData(field)
if err != nil {
return err
}
err = conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
return err
}
_, msg, err := conn.ReadMessage()
if err != nil {
return err
}
log.Debug("ws read:", msg)
return nil
}
func getPushedData(field string) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
req := &influx.Request{
RespType: "csv",
Database: "influxBucket",
Measure: "samples",
Station: "TJSH_FZ001",
Device: "DDJ001",
Field: field,
Begin: 0,
End: 0,
Operate: "",
Step: "",
Default: "",
}
tvs, err := influx.GetLastLimit(ctx, req, 1000)
if err != nil {
return nil, err
}
pushedData := &struct {
Payload *struct {
ComponentID int `json:"component_id"`
Point string `json:"point"`
Values []*influx.TV `json:"values"`
} `json:"payload"`
}{
Payload: &struct {
ComponentID int `json:"component_id"`
Point string `json:"point"`
Values []*influx.TV `json:"values"`
}{
ComponentID: 0,
Point: field,
Values: tvs,
},
}
data, err := json.Marshal(pushedData)
if err != nil {
return nil, err
}
return data, nil
}

135
route/handler.go Normal file
View File

@ -0,0 +1,135 @@
package route
import (
"datart/data/influx"
"strconv"
"github.com/gin-gonic/gin"
)
func getLast(ctx *gin.Context) {
// station := ctx.DefaultQuery("station", "0")
// component := ctx.DefaultQuery("component", "0")
point := ctx.DefaultQuery("point", "")
request := &influx.Request{
RespType: "csv",
Database: "influxBucket",
Measure: "samples",
Station: "TJSH_FZ001",
Device: "DDJ001",
Field: "",
}
switch point {
case "i":
request.Field = "channel_1"
case "v":
request.Field = "channel_2"
default:
ctx.JSON(200, gin.H{
"code": 1,
"msg": "invalid point",
})
return
}
data, err := influx.GetLast(ctx, request)
if err != nil {
ctx.JSON(200, gin.H{
"code": 2,
"msg": err.Error(),
})
return
}
ctx.JSON(200, gin.H{
"code": 0,
"msg": "",
"data": data,
})
}
func getPointData(ctx *gin.Context) {
// station := ctx.DefaultQuery("station", "0")
// component := ctx.DefaultQuery("component", "0")
request := &influx.Request{
RespType: "csv",
Database: "influxBucket",
Measure: "samples",
Station: "TJSH_FZ001",
Device: "DDJ001",
Field: "",
Begin: 0,
End: 0,
Operate: ctx.DefaultQuery("operate", ""),
Step: ctx.DefaultQuery("step", ""),
Default: ctx.DefaultQuery("default", ""),
}
point := ctx.DefaultQuery("point", "")
switch point {
case "i":
request.Field = "channel_1"
case "v":
request.Field = "channel_2"
default:
ctx.JSON(200, gin.H{
"code": 1,
"msg": "invalid point",
})
return
}
var err error
begin := ctx.DefaultQuery("begin", "")
end := ctx.DefaultQuery("end", "")
if begin != "" {
request.Begin, err = strconv.ParseInt(begin, 10, 64)
if err != nil {
ctx.JSON(200, gin.H{
"code": 1,
"msg": "invalid begin",
})
return
}
}
if end != "" {
request.End, err = strconv.ParseInt(end, 10, 64)
if err != nil {
ctx.JSON(200, gin.H{
"code": 1,
"msg": "invalid end",
})
return
}
}
var data []*influx.TV
switch {
case begin != "" && end != "":
data, err = influx.GetPointData(ctx, request)
case begin != "":
data, err = influx.GetAfterOne(ctx, request)
case end != "":
data, err = influx.GetBeforeOne(ctx, request)
default:
data, err = influx.GetLast(ctx, request)
}
if err != nil {
ctx.JSON(200, gin.H{
"code": 2,
"msg": err.Error(),
})
return
}
ctx.JSON(200, gin.H{
"code": 0,
"msg": "",
"data": data,
})
}

17
route/route.go Normal file
View File

@ -0,0 +1,17 @@
package route
import "github.com/gin-gonic/gin"
func LoadRoute(engine *gin.Engine) {
engine.Use(middleware()) // TODO
rg := engine.Group("datart")
rg.GET("/getLast", getLast)
rg.GET("/getPointData", getPointData)
}
func middleware() gin.HandlerFunc {
return func(ctx *gin.Context) {
}
}