Compare commits
2 Commits
116b33b6fe
...
0b6c5e4a89
| Author | SHA1 | Date |
|---|---|---|
|
|
0b6c5e4a89 | |
|
|
ebd375d6c7 |
|
|
@ -0,0 +1,5 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
func cl104ConfigName() string {
|
||||||
|
return "cl104.json"
|
||||||
|
}
|
||||||
|
|
@ -14,6 +14,7 @@ type config struct {
|
||||||
redisConf map[string]*redisConfig
|
redisConf map[string]*redisConfig
|
||||||
mongoConf map[string]*mongoConfig
|
mongoConf map[string]*mongoConfig
|
||||||
rabbitConf map[string][]*rabbitConfig
|
rabbitConf map[string][]*rabbitConfig
|
||||||
|
cl104Conf map[string]map[string][]int
|
||||||
}
|
}
|
||||||
|
|
||||||
var conf *config
|
var conf *config
|
||||||
|
|
@ -52,6 +53,10 @@ func init() {
|
||||||
conf.rabbitConf = make(map[string][]*rabbitConfig)
|
conf.rabbitConf = make(map[string][]*rabbitConfig)
|
||||||
rabbitConf := confDir + string(os.PathSeparator) + rabbitConfigName()
|
rabbitConf := confDir + string(os.PathSeparator) + rabbitConfigName()
|
||||||
conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf)
|
conf.unmarshalJsonFile(rabbitConf, &conf.rabbitConf)
|
||||||
|
|
||||||
|
conf.cl104Conf = make(map[string]map[string][]int)
|
||||||
|
cl104Conf := confDir + string(os.PathSeparator) + cl104ConfigName()
|
||||||
|
conf.unmarshalJsonFile(cl104Conf, &conf.cl104Conf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Conf() *config {
|
func Conf() *config {
|
||||||
|
|
@ -107,6 +112,13 @@ func (c *config) RabbitConf(tag string) []*rabbitConfig {
|
||||||
return c.rabbitConf[tag]
|
return c.rabbitConf[tag]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *config) CL104ConfAll() map[string]map[string][]int {
|
||||||
|
if c == nil {
|
||||||
|
panic("config is nil")
|
||||||
|
}
|
||||||
|
return c.cl104Conf
|
||||||
|
}
|
||||||
|
|
||||||
func (c *config) unmarshalJsonFile(file string, dest any) {
|
func (c *config) unmarshalJsonFile(file string, dest any) {
|
||||||
if filejson, err := os.ReadFile(file); err != nil {
|
if filejson, err := os.ReadFile(file); err != nil {
|
||||||
panic(err.Error())
|
panic(err.Error())
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
{
|
||||||
|
"station000":{
|
||||||
|
"127.0.0.1:8899":[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
{
|
{
|
||||||
"default":{
|
"default":{
|
||||||
"host":"127.0.0.1",
|
"host":"192.168.46.100",
|
||||||
"port":5432,
|
"port":9432,
|
||||||
"user":"postgres",
|
"user":"postgres",
|
||||||
"password":"password",
|
"password":"123RTYjkl",
|
||||||
"dbname":"postgres",
|
"dbname":"metamodule",
|
||||||
"timezone":"Asia/Shanghai"
|
"timezone":"Asia/Shanghai"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,131 @@
|
||||||
|
package cl104
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"datart/config"
|
||||||
|
"datart/log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
type info struct {
|
||||||
|
IOA int `json:"ioa"`
|
||||||
|
Val float64 `json:"val"`
|
||||||
|
Q int `json:"q"`
|
||||||
|
MS int64 `json:"ms"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Msg struct {
|
||||||
|
TI int `json:"ti"`
|
||||||
|
COT int `json:"cot"`
|
||||||
|
PN int `json:"pn"`
|
||||||
|
CA int `json:"ca"`
|
||||||
|
Infos []*info `json:"infos"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var UpChan = make(chan []byte, 64)
|
||||||
|
var cl2Chan = map[string]chan []byte{}
|
||||||
|
var CA2Chan = map[int]chan []byte{}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
conf := config.Conf().CL104ConfAll()
|
||||||
|
|
||||||
|
for _, addr2CAs := range conf {
|
||||||
|
for addr, cas := range addr2CAs {
|
||||||
|
clChan := make(chan []byte, 16)
|
||||||
|
cl2Chan[addr] = clChan
|
||||||
|
for _, ca := range cas {
|
||||||
|
CA2Chan[ca] = clChan
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConnectCLs() {
|
||||||
|
for cl, ch := range cl2Chan {
|
||||||
|
go func(ctx context.Context, cl string, ch chan []byte) {
|
||||||
|
connectingCL(ctx, cl, ch)
|
||||||
|
}(context.Background(), cl, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func connectingCL(ctx context.Context, cl string, ch chan []byte) {
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Second * 5)
|
||||||
|
subctx, cancel := context.WithCancel(ctx)
|
||||||
|
newConnectCL(subctx, cancel, cl, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newConnectCL(ctx context.Context, cancel context.CancelFunc, cl string, ch chan []byte) {
|
||||||
|
c, _, err := websocket.DefaultDialer.DialContext(ctx, "ws://"+cl+"/api/104up", nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("client dial:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
go monitoringCLRead(ctx, cancel, c)
|
||||||
|
go monitoringCLWrite(ctx, cancel, c, ch)
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func monitoringCLRead(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
mt, rm, err := c.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if ce, ok := err.(*websocket.CloseError); ok {
|
||||||
|
log.Info("client closed with code:", ce.Code, "text:", ce.Text)
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Error("server read:", err)
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mt {
|
||||||
|
case websocket.TextMessage:
|
||||||
|
select {
|
||||||
|
case UpChan <- rm:
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
log.Error("drop msg:", string(rm))
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.Error("invalid msg type:", mt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func monitoringCLWrite(ctx context.Context, cancel context.CancelFunc, c *websocket.Conn, ch chan []byte) {
|
||||||
|
var err error
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(time.Second * 54):
|
||||||
|
err = c.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second*10))
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case msg := <-ch:
|
||||||
|
err = c.WriteMessage(websocket.TextMessage, msg)
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,6 +3,7 @@ package data
|
||||||
import (
|
import (
|
||||||
"datart/data/influx"
|
"datart/data/influx"
|
||||||
"datart/data/postgres"
|
"datart/data/postgres"
|
||||||
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
|
|
@ -45,6 +46,21 @@ func GenPhasorFields(channel string) []string {
|
||||||
return fields
|
return fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ToPrimary(field string, value, iRatio, uRatio float64, iPolar int) float64 {
|
||||||
|
switch {
|
||||||
|
case field == "p", field == "q":
|
||||||
|
return value * iRatio * uRatio * float64(iPolar)
|
||||||
|
case strings.HasSuffix(field, "_pa"):
|
||||||
|
if iPolar < 0 {
|
||||||
|
return value + math.Pi
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return value * iRatio * float64(iPolar)
|
||||||
|
}
|
||||||
|
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
type zUnit struct {
|
type zUnit struct {
|
||||||
Key string
|
Key string
|
||||||
Members []redis.Z
|
Members []redis.Z
|
||||||
|
|
|
||||||
|
|
@ -77,9 +77,6 @@ type fields struct {
|
||||||
Values [][]any `json:"values"`
|
Values [][]any `json:"values"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// respType json/csv
|
|
||||||
// json_time:"2024-12-18T08:12:21.4735154Z"
|
|
||||||
// csv_time:"1734572793695885000"
|
|
||||||
func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values,
|
func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values,
|
||||||
respType string) ([]TV, error) {
|
respType string) ([]TV, error) {
|
||||||
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||||
|
|
@ -112,9 +109,6 @@ func (client *influxClient) getTVsResp(ctx context.Context, reqData url.Values,
|
||||||
return respDataToTVs(respData, respType)
|
return respDataToTVs(respData, respType)
|
||||||
}
|
}
|
||||||
|
|
||||||
// respType json/csv
|
|
||||||
// json_time:"2024-12-18T08:12:21.4735154Z"
|
|
||||||
// csv_time:"1734572793695885000"
|
|
||||||
func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values,
|
func (client *influxClient) getF2TVsResp(ctx context.Context, reqData url.Values,
|
||||||
respType string) (map[string][]TV, error) {
|
respType string) (map[string][]TV, error) {
|
||||||
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
request, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||||
|
|
|
||||||
|
|
@ -90,6 +90,7 @@ func WriteLinesData(ctx context.Context, db string, data []byte) error {
|
||||||
type Request struct {
|
type Request struct {
|
||||||
DB string
|
DB string
|
||||||
Table string
|
Table string
|
||||||
|
Tags []string
|
||||||
|
|
||||||
Type string
|
Type string
|
||||||
Station string
|
Station string
|
||||||
|
|
|
||||||
|
|
@ -100,17 +100,7 @@ func (client *influxClient) GetSSUPointsLastLimit(ctx context.Context, req *Requ
|
||||||
"q": {sql},
|
"q": {sql},
|
||||||
}
|
}
|
||||||
|
|
||||||
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
|
return client.getF2TVsResp(ctx, reqData, "csv")
|
||||||
if err != nil {
|
|
||||||
return f2tvs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
|
||||||
for f, tvs := range f2tvs {
|
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) {
|
func (client *influxClient) GetSSUPointDurationData(ctx context.Context, req *Request) ([]TV, error) {
|
||||||
|
|
@ -149,17 +139,7 @@ func (client *influxClient) GetSSUPointsDurationData(ctx context.Context, req *R
|
||||||
"q": {sql},
|
"q": {sql},
|
||||||
}
|
}
|
||||||
|
|
||||||
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
|
return client.getF2TVsResp(ctx, reqData, "csv")
|
||||||
if err != nil {
|
|
||||||
return f2tvs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
|
||||||
for f, tvs := range f2tvs {
|
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointAfterLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
|
func (client *influxClient) GetSSUPointAfterLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
|
||||||
|
|
@ -183,17 +163,7 @@ func (client *influxClient) GetSSUPointsAfterLimit(ctx context.Context, req *Req
|
||||||
"q": {sql},
|
"q": {sql},
|
||||||
}
|
}
|
||||||
|
|
||||||
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
|
return client.getF2TVsResp(ctx, reqData, "csv")
|
||||||
if err != nil {
|
|
||||||
return f2tvs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
|
||||||
for f, tvs := range f2tvs {
|
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
|
func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Request, limit int) (map[string][]TV, error) {
|
||||||
|
|
@ -203,17 +173,7 @@ func (client *influxClient) GetSSUPointsBeforeLimit(ctx context.Context, req *Re
|
||||||
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)},
|
req.SubPos, req.Table, req.Station, req.MainPos, req.Begin, req.End, limit)},
|
||||||
}
|
}
|
||||||
|
|
||||||
f2tvs, err := client.getF2TVsResp(ctx, reqData, "csv")
|
return client.getF2TVsResp(ctx, reqData, "csv")
|
||||||
if err != nil {
|
|
||||||
return f2tvs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ret := make(map[string][]TV, len(f2tvs))
|
|
||||||
for f, tvs := range f2tvs {
|
|
||||||
ret[strings.Join([]string{req.Station, req.MainPos, f}, ".")] = tvs
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *influxClient) WriteLinesData(ctx context.Context, db string, data []byte) error {
|
func (client *influxClient) WriteLinesData(ctx context.Context, db string, data []byte) error {
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,7 @@ func (a *Alarm) ConvertToEvent(ip string) (*Event, error) {
|
||||||
OP: ip,
|
OP: ip,
|
||||||
TS: a.AlarmTime,
|
TS: a.AlarmTime,
|
||||||
})
|
})
|
||||||
e.Alarm = a
|
e.Origin = a
|
||||||
}
|
}
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ type Event struct {
|
||||||
From string `bson:"from" json:"from"`
|
From string `bson:"from" json:"from"`
|
||||||
Operations []*operation `bson:"operations" json:"operations"`
|
Operations []*operation `bson:"operations" json:"operations"`
|
||||||
// others
|
// others
|
||||||
Alarm *Alarm `bson:"alarm" json:"alarm"`
|
Origin *Alarm `bson:"origin" json:"origin"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Event) Marshall() ([]byte, error) {
|
func (e *Event) Marshall() ([]byte, error) {
|
||||||
|
|
|
||||||
|
|
@ -45,19 +45,51 @@ func (ds *dataSource) Value() (driver.Value, error) {
|
||||||
return json.Marshal(ds)
|
return json.Marshal(ds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bind struct {
|
||||||
|
Index int `json:"index"`
|
||||||
|
Ratio float64 `json:"ratio"`
|
||||||
|
Polarity int `json:"polarity"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type binding struct {
|
||||||
|
CT bind `json:"ct"`
|
||||||
|
PT bind `json:"pt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *binding) Scan(value any) error {
|
||||||
|
if value == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
bytes, ok := value.([]byte)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("type assertion to []byte failed")
|
||||||
|
}
|
||||||
|
return json.Unmarshal(bytes, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *binding) Value() (driver.Value, error) {
|
||||||
|
return json.Marshal(b)
|
||||||
|
}
|
||||||
|
|
||||||
type measurement struct {
|
type measurement struct {
|
||||||
ID int64 `gorm:"colunmn:id"`
|
ID int64 `gorm:"colunmn:id"`
|
||||||
Tag string `gorm:"column:tag"`
|
Tag string `gorm:"column:tag"`
|
||||||
Size int `gorm:"column:size"`
|
Size int `gorm:"column:size"`
|
||||||
DataSource *dataSource `gorm:"column:data_source;type:jsonb"`
|
DataSource *dataSource `gorm:"column:data_source;type:jsonb"`
|
||||||
|
Binding *binding `gorm:"column:binding;type:josnb"`
|
||||||
// others
|
// others
|
||||||
}
|
}
|
||||||
|
|
||||||
type ChannelSize struct {
|
type ChannelSize struct {
|
||||||
|
Type int
|
||||||
Station string
|
Station string
|
||||||
Device string
|
Device string
|
||||||
Channel string
|
Channel string
|
||||||
Size int
|
Size int
|
||||||
|
IRatio float64
|
||||||
|
IPolar int
|
||||||
|
URatio float64
|
||||||
|
UPolar int
|
||||||
}
|
}
|
||||||
|
|
||||||
// channel is original
|
// channel is original
|
||||||
|
|
@ -200,10 +232,26 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
|
||||||
}
|
}
|
||||||
|
|
||||||
channelSize := ChannelSize{
|
channelSize := ChannelSize{
|
||||||
|
Type: 1,
|
||||||
Station: station,
|
Station: station,
|
||||||
Device: device,
|
Device: device,
|
||||||
Channel: channel,
|
Channel: channel,
|
||||||
Size: record.Size,
|
Size: record.Size,
|
||||||
|
IRatio: 1,
|
||||||
|
IPolar: 1,
|
||||||
|
URatio: 1,
|
||||||
|
UPolar: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
if record.Binding != nil {
|
||||||
|
if record.Binding.CT.Index > 0 {
|
||||||
|
channelSize.IRatio = record.Binding.CT.Ratio
|
||||||
|
channelSize.IPolar = record.Binding.CT.Polarity
|
||||||
|
}
|
||||||
|
if record.Binding.PT.Index > 0 {
|
||||||
|
channelSize.URatio = record.Binding.PT.Ratio
|
||||||
|
channelSize.UPolar = record.Binding.PT.Polarity
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelSizes.Store(record.Tag, channelSize)
|
ChannelSizes.Store(record.Tag, channelSize)
|
||||||
|
|
|
||||||
|
|
@ -78,13 +78,13 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan
|
||||||
// }
|
// }
|
||||||
|
|
||||||
for f, tvs := range f2tvs {
|
for f, tvs := range f2tvs {
|
||||||
sdf := strings.Split(f, ".")
|
key := genRedisPhasorKey(channelSize.Station, channelSize.Device, f)
|
||||||
if len(sdf) != 3 {
|
|
||||||
log.Error("invalid influx field")
|
for i := range tvs {
|
||||||
return
|
tvs[i].Value = ToPrimary(f, tvs[i].Value,
|
||||||
|
channelSize.IRatio, channelSize.URatio, channelSize.IPolar)
|
||||||
}
|
}
|
||||||
|
|
||||||
key := genRedisPhasorKey(sdf[0], sdf[1], sdf[2])
|
|
||||||
members := convertTVsToMenmbers(tvs)
|
members := convertTVsToMenmbers(tvs)
|
||||||
ssuChan <- zUnit{
|
ssuChan <- zUnit{
|
||||||
Key: key,
|
Key: key,
|
||||||
|
|
|
||||||
1
go.mod
1
go.mod
|
|
@ -5,6 +5,7 @@ go 1.24.0
|
||||||
require (
|
require (
|
||||||
github.com/gin-gonic/gin v1.11.0
|
github.com/gin-gonic/gin v1.11.0
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
|
github.com/gorilla/websocket v1.5.3
|
||||||
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0
|
github.com/rabbitmq/rabbitmq-amqp-go-client v0.2.0
|
||||||
github.com/redis/go-redis/v9 v9.14.0
|
github.com/redis/go-redis/v9 v9.14.0
|
||||||
go.mongodb.org/mongo-driver/v2 v2.3.0
|
go.mongodb.org/mongo-driver/v2 v2.3.0
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -52,6 +52,8 @@ github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J
|
||||||
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||||
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type command struct {
|
type command struct {
|
||||||
Command string `json:"command"`
|
Command int `json:"command"`
|
||||||
Timeout int64 `json:"timeout"`
|
Timeout int64 `json:"timeout"`
|
||||||
Args []any `json:"args"`
|
Args []any `json:"args"`
|
||||||
}
|
}
|
||||||
|
|
@ -49,7 +49,7 @@ func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, er
|
||||||
return req, errors.New("invalid body param")
|
return req, errors.New("invalid body param")
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.Command != "GenSSU2ChannelSizes" {
|
if req.Command != 1 {
|
||||||
return nil, errors.New("invalid command")
|
return nil, errors.New("invalid command")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,19 +26,19 @@ func (a *Api) GetPoints(ctx *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var data map[string][]influx.TV
|
var ftvs map[string][]influx.TV
|
||||||
switch {
|
switch {
|
||||||
case request.Begin > 0 && request.End > 0:
|
case request.Begin > 0 && request.End > 0:
|
||||||
data, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request)
|
ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request)
|
||||||
|
|
||||||
case request.Begin > 0 && request.End < 0:
|
case request.Begin > 0 && request.End < 0:
|
||||||
data, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1)
|
ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1)
|
||||||
|
|
||||||
case request.Begin < 0 && request.End > 0:
|
case request.Begin < 0 && request.End > 0:
|
||||||
data, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1)
|
ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
data, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1)
|
ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -52,10 +52,21 @@ func (a *Api) GetPoints(ctx *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle key value
|
||||||
|
v, _ := postgres.ChannelSizes.Load(request.Tags[len(request.Tags)-1])
|
||||||
|
if channelSize, ok := v.(postgres.ChannelSize); ok {
|
||||||
|
for f, tvs := range ftvs {
|
||||||
|
for i := range tvs {
|
||||||
|
tvs[i].Value = data.ToPrimary(f, tvs[i].Value,
|
||||||
|
channelSize.IRatio, channelSize.URatio, channelSize.IPolar)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ctx.JSON(200, gin.H{
|
ctx.JSON(200, gin.H{
|
||||||
"code": 0,
|
"code": 0,
|
||||||
"msg": "success",
|
"msg": "success",
|
||||||
"data": data,
|
"data": ftvs,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,8 +79,8 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er
|
||||||
|
|
||||||
station, mainPos, subPos := "", "", ""
|
station, mainPos, subPos := "", "", ""
|
||||||
|
|
||||||
mtag := ctx.DefaultQuery("mtag", "")
|
tags := strings.Split(ctx.DefaultQuery("mtag", ""), ".")
|
||||||
v, ok := postgres.ChannelSizes.Load(mtag)
|
v, ok := postgres.ChannelSizes.Load(tags[len(tags)-1])
|
||||||
if ok {
|
if ok {
|
||||||
if channelSize, ok := v.(postgres.ChannelSize); ok {
|
if channelSize, ok := v.(postgres.ChannelSize); ok {
|
||||||
fields := data.GenPhasorFields(channelSize.Channel)
|
fields := data.GenPhasorFields(channelSize.Channel)
|
||||||
|
|
@ -79,20 +90,7 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er
|
||||||
subPos = strings.Join(fields, ",")
|
subPos = strings.Join(fields, ",")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
station = ctx.DefaultQuery("station", "")
|
return nil, errors.New("invalid mtag")
|
||||||
if len(station) <= 0 {
|
|
||||||
return nil, errors.New("invalid station")
|
|
||||||
}
|
|
||||||
|
|
||||||
mainPos = ctx.DefaultQuery("main_pos", "")
|
|
||||||
if len(mainPos) <= 0 {
|
|
||||||
return nil, errors.New("invalid main_pos")
|
|
||||||
}
|
|
||||||
|
|
||||||
subPos = ctx.DefaultQuery("sub_pos", "")
|
|
||||||
if len(subPos) <= 0 {
|
|
||||||
return nil, errors.New("invalid sub_pos")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
beginStr := ctx.DefaultQuery("begin", "")
|
beginStr := ctx.DefaultQuery("begin", "")
|
||||||
|
|
@ -122,6 +120,8 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er
|
||||||
return &influx.Request{
|
return &influx.Request{
|
||||||
DB: bucket,
|
DB: bucket,
|
||||||
Table: measure,
|
Table: measure,
|
||||||
|
Tags: tags,
|
||||||
|
|
||||||
Type: typeStr,
|
Type: typeStr,
|
||||||
Station: station,
|
Station: station,
|
||||||
MainPos: mainPos,
|
MainPos: mainPos,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,231 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"datart/data/cl104"
|
||||||
|
"datart/log"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
pongWait = 60 * time.Second
|
||||||
|
pingPeriod = (pongWait * 9) / 10
|
||||||
|
writeWait = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type upSession struct {
|
||||||
|
conn *websocket.Conn
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
ctrlCh chan wsMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
type wsMessage struct {
|
||||||
|
mt int
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
var upConnNum int64
|
||||||
|
|
||||||
|
func (w *Ws) Cl104Up(ctx *gin.Context) {
|
||||||
|
if atomic.SwapInt64(&upConnNum, 1) > 0 {
|
||||||
|
ctx.JSON(http.StatusConflict, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
stopCtx, stopCancel := context.WithCancel(ctx.Request.Context())
|
||||||
|
defer stopCancel()
|
||||||
|
|
||||||
|
session := &upSession{
|
||||||
|
conn: conn,
|
||||||
|
ctx: stopCtx,
|
||||||
|
cancel: stopCancel,
|
||||||
|
ctrlCh: make(chan wsMessage, 16),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := setUpSessionConn104(session); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
startUpWorkers(session)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeControl104(session *upSession, mt int, payload []byte) error {
|
||||||
|
select {
|
||||||
|
case <-session.ctx.Done():
|
||||||
|
return session.ctx.Err()
|
||||||
|
case session.ctrlCh <- wsMessage{mt: mt, data: payload}:
|
||||||
|
return nil
|
||||||
|
case <-time.After(writeWait):
|
||||||
|
return errors.New("write control timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setUpSessionConn104(session *upSession) error {
|
||||||
|
session.conn.SetPongHandler(func(appData string) error {
|
||||||
|
session.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
session.conn.SetPingHandler(func(appData string) error {
|
||||||
|
if err := writeControl104(session, websocket.PongMessage, []byte(appData)); err != nil {
|
||||||
|
session.cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
session.conn.SetCloseHandler(func(code int, text string) error {
|
||||||
|
session.cancel()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return session.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
|
}
|
||||||
|
|
||||||
|
func startUpWorkers(session *upSession) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
monitorUpWrite(session)
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
monitorUpRead(session)
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
sendPing104(session)
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
atomic.SwapInt64(&upConnNum, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func monitorUpWrite(session *upSession) {
|
||||||
|
var err error
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-session.ctx.Done():
|
||||||
|
return
|
||||||
|
case ctrl := <-session.ctrlCh:
|
||||||
|
err = session.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
|
if err != nil {
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = session.conn.WriteControl(ctrl.mt, ctrl.data, time.Now().Add(writeWait))
|
||||||
|
if err != nil {
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case msg := <-cl104.UpChan:
|
||||||
|
err = session.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
|
if err != nil {
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = session.conn.WriteMessage(websocket.TextMessage, msg)
|
||||||
|
if err != nil {
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func monitorUpRead(session *upSession) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-session.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
mt, rm, err := session.conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if ce, ok := err.(*websocket.CloseError); ok {
|
||||||
|
log.Infof("client closed with code:", ce.Code, "text:", ce.Text)
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Error("server read:", err)
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch mt {
|
||||||
|
case websocket.TextMessage:
|
||||||
|
// TODO
|
||||||
|
// if not type cl104.Msg?
|
||||||
|
msg := new(cl104.Msg)
|
||||||
|
err := json.Unmarshal(rm, msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("server unmarshal:", err)
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if ch, ok := cl104.CA2Chan[msg.CA]; ok {
|
||||||
|
select {
|
||||||
|
case ch <- rm:
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
log.Error("drop msg:", msg)
|
||||||
|
}
|
||||||
|
} else if ch, ok := cl104.CA2Chan[0]; ok {
|
||||||
|
select {
|
||||||
|
case ch <- rm:
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
log.Error("drop msg:", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Info("not text:", string(rm))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendPing104(session *upSession) {
|
||||||
|
|
||||||
|
ticker := time.NewTicker(pingPeriod)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-session.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
err := writeControl104(session, websocket.PingMessage, nil)
|
||||||
|
if err != nil {
|
||||||
|
session.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Ws struct{}
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 4096,
|
||||||
|
WriteBufferPool: &sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
return make([]byte, 4096)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue