query and update 104
This commit is contained in:
parent
a4711c553b
commit
345f6d4a3d
|
|
@ -28,6 +28,7 @@ func (p *Processes) StartDataProcessing() {
|
|||
}
|
||||
|
||||
updatingRedisPhasor(ctx)
|
||||
updatingRedisCL104(ctx)
|
||||
|
||||
cl104.ConnectCLs()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,61 @@
|
|||
package influx
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
db104 = "influxBucket"
|
||||
tb104 = "cl104"
|
||||
)
|
||||
|
||||
const (
|
||||
mainPos104Key = "ca"
|
||||
subPos104Key = "ioa"
|
||||
val104Key = "val"
|
||||
)
|
||||
|
||||
func Get104PointLast(ctx context.Context, req *Request) ([]TV, error) {
|
||||
req.Begin = time.Now().UnixMilli() - int64(15*60*1000)
|
||||
return client.Get104PointLastLimit(ctx, req, 1)
|
||||
}
|
||||
|
||||
func Get104PointDuration(ctx context.Context, req *Request) ([]TV, error) {
|
||||
return client.Get104PointLastLimit(ctx, req, 1)
|
||||
}
|
||||
|
||||
func (client *influxClient) Get104PointLastLimit(ctx context.Context, req *Request, limit int) ([]TV, error) {
|
||||
sql := fmt.Sprintf("select last(%s) as %s from %s where station='%s' and %s='%s' and %s='%s';",
|
||||
val104Key, val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos)
|
||||
if limit > 1 {
|
||||
sql = fmt.Sprintf("select %s from %s where station='%s' and and %s='%s' and %s='%s' and time>=%dms order by time desc limit %d;",
|
||||
val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, limit)
|
||||
}
|
||||
|
||||
reqData := url.Values{
|
||||
"db": {req.DB},
|
||||
"q": {sql},
|
||||
}
|
||||
|
||||
return client.getTVsResp(ctx, reqData, "csv")
|
||||
}
|
||||
|
||||
func (client *influxClient) Get104PointDurationData(ctx context.Context, req *Request) ([]TV, error) {
|
||||
sql := fmt.Sprintf("select %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms;",
|
||||
val104Key, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End)
|
||||
|
||||
if req.Operate != "" && req.Step != "" && req.Default != "" {
|
||||
sql = fmt.Sprintf("select %s(%s) as %s from %s where station='%s' and %s='%s' and %s='%s' and time>=%dms and time<=%dms group by time(%s) fill(%s);",
|
||||
req.Operate, req.SubPos, req.SubPos, req.Table, req.Station, mainPos104Key, req.MainPos, subPos104Key, req.SubPos, req.Begin, req.End, req.Step, req.Default)
|
||||
}
|
||||
|
||||
reqData := url.Values{
|
||||
"db": {req.DB},
|
||||
"q": {sql},
|
||||
}
|
||||
|
||||
return client.getTVsResp(ctx, reqData, "csv")
|
||||
}
|
||||
|
|
@ -258,7 +258,7 @@ func convertJsonToF2TVs(cols []string, data [][]any) (map[string][]TV, error) {
|
|||
func convertCsvToTVs(data [][]string) ([]TV, error) {
|
||||
ret := make([]TV, 0, len(data))
|
||||
|
||||
for _, row := range data[1:] {
|
||||
for _, row := range data {
|
||||
if len(row) > 3 {
|
||||
ns, err := strconv.ParseInt(row[2], 10, 64)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -58,6 +58,8 @@ func GetDB(tp string) (string, error) {
|
|||
return dbphasor, nil
|
||||
case "sample":
|
||||
return dbsample, nil
|
||||
case "104":
|
||||
return db104, nil
|
||||
}
|
||||
|
||||
return "", errors.New("invalid type")
|
||||
|
|
@ -78,6 +80,8 @@ func GetTable(tp string, mainPos string) (string, error) {
|
|||
}
|
||||
case "sample":
|
||||
return "sample", nil
|
||||
case "104":
|
||||
return tb104, nil
|
||||
}
|
||||
|
||||
return "", errors.New("invalid type")
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
dbphasor = "ssuBucket"
|
||||
dbsample = "ssuBucket"
|
||||
dbphasor = "influxBucket"
|
||||
dbsample = "influxBucket"
|
||||
)
|
||||
|
||||
// keep consistent with telegraf
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
|
@ -83,8 +84,8 @@ type measurement struct {
|
|||
type ChannelSize struct {
|
||||
Type int
|
||||
Station string
|
||||
Device string
|
||||
Channel string
|
||||
MainPos string
|
||||
SubPos string
|
||||
Size int
|
||||
IRatio float64
|
||||
IPolar int
|
||||
|
|
@ -234,8 +235,8 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
|
|||
channelSize := ChannelSize{
|
||||
Type: 1,
|
||||
Station: station,
|
||||
Device: device,
|
||||
Channel: channel,
|
||||
MainPos: device,
|
||||
SubPos: channel,
|
||||
Size: record.Size,
|
||||
IRatio: 1,
|
||||
IPolar: 1,
|
||||
|
|
@ -262,7 +263,52 @@ func genMappingFromAddr(ssu2ChannelSizes map[string][]ChannelSize,
|
|||
}
|
||||
|
||||
case 2:
|
||||
if rawAddr, ok := record.DataSource.Addr.(map[string]any); ok {
|
||||
station, ok := rawAddr["station"].(string)
|
||||
if !ok {
|
||||
return errors.New("invalid station")
|
||||
}
|
||||
pack, ok := rawAddr["packet"].(float64)
|
||||
if !ok {
|
||||
return errors.New("invalid packet")
|
||||
}
|
||||
offset, ok := rawAddr["offset"].(float64)
|
||||
if !ok {
|
||||
return errors.New("invalid offset")
|
||||
}
|
||||
|
||||
mainPos := strconv.Itoa(int(pack))
|
||||
subPos := strconv.Itoa(int(offset))
|
||||
|
||||
if _, ok := ChannelSizes.Load(record.Tag); !ok {
|
||||
channelSize := ChannelSize{
|
||||
Type: 2,
|
||||
Station: station,
|
||||
MainPos: mainPos,
|
||||
SubPos: subPos,
|
||||
Size: record.Size,
|
||||
IRatio: 1,
|
||||
IPolar: 1,
|
||||
URatio: 1,
|
||||
UPolar: 1,
|
||||
}
|
||||
|
||||
if record.Binding != nil {
|
||||
if record.Binding.CT.Index > 0 { // depends on value of index
|
||||
channelSize.IRatio = record.Binding.CT.Ratio
|
||||
channelSize.IPolar = record.Binding.CT.Polarity
|
||||
}
|
||||
if record.Binding.PT.Index > 0 { // depends on value of index
|
||||
channelSize.URatio = record.Binding.PT.Ratio
|
||||
channelSize.UPolar = record.Binding.PT.Polarity
|
||||
}
|
||||
}
|
||||
|
||||
ChannelSizes.Store(record.Tag, channelSize)
|
||||
}
|
||||
} else {
|
||||
return errors.New("invalid io_address")
|
||||
}
|
||||
default:
|
||||
return errors.New("invalid data_source.type")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,124 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"context"
|
||||
"datart/data/influx"
|
||||
"datart/data/postgres"
|
||||
"datart/data/redis"
|
||||
"datart/log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
workerNum = 5
|
||||
)
|
||||
|
||||
const (
|
||||
update104Duration time.Duration = time.Second * 1
|
||||
)
|
||||
|
||||
type cl104Q struct {
|
||||
Station string
|
||||
CA string
|
||||
IOA string
|
||||
// Begin int64
|
||||
// End int64
|
||||
}
|
||||
|
||||
type workerPool struct {
|
||||
workerNum int
|
||||
wg sync.WaitGroup
|
||||
qChan chan cl104Q
|
||||
}
|
||||
|
||||
var cl104Pool *workerPool
|
||||
|
||||
func init() {
|
||||
cl104Pool = newWorkerPool(workerNum)
|
||||
}
|
||||
|
||||
func newWorkerPool(workerNum int) *workerPool {
|
||||
return &workerPool{
|
||||
workerNum: workerNum,
|
||||
qChan: make(chan cl104Q, 128),
|
||||
}
|
||||
}
|
||||
|
||||
func updatingRedisCL104(ctx context.Context) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(update104Duration):
|
||||
postgres.ChannelSizes.Range(func(key, value any) bool {
|
||||
if channelSize, ok := value.(postgres.ChannelSize); ok && channelSize.Type == 2 {
|
||||
cl104Pool.qChan <- cl104Q{
|
||||
Station: channelSize.Station,
|
||||
CA: channelSize.MainPos,
|
||||
IOA: channelSize.SubPos,
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for range workerNum {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
q := <-cl104Pool.qChan
|
||||
|
||||
tvs, err := cl104Pool.queryInflux104(ctx, q)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
cl104Pool.refreshRedis(ctx, q, tvs)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (wp *workerPool) queryInflux104(ctx context.Context, q cl104Q) ([]influx.TV, error) {
|
||||
db, err := influx.GetDB("104")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tb, err := influx.GetTable("104", q.CA)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tvs, err := influx.Get104PointLast(ctx, &influx.Request{
|
||||
DB: db,
|
||||
Table: tb,
|
||||
Type: "104",
|
||||
Station: q.Station,
|
||||
MainPos: q.CA,
|
||||
SubPos: q.IOA,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tvs, nil
|
||||
}
|
||||
|
||||
func (wp *workerPool) refreshRedis(ctx context.Context, q cl104Q, tvs []influx.TV) {
|
||||
key := genRedis104Key(q.Station, q.CA, q.IOA)
|
||||
|
||||
members := convertTVsToMenmbers(tvs)
|
||||
|
||||
redis.ZAtomicReplace(ctx, key, members)
|
||||
}
|
||||
|
||||
func genRedis104Key(station string, ca string, ioa string) string {
|
||||
return station + ":104:" + ca + ":" + ioa
|
||||
}
|
||||
|
|
@ -19,7 +19,7 @@ func updatingRedisPhasor(ctx context.Context) {
|
|||
ssuType := config.Conf().ServerConf().GetSSUType()
|
||||
ssuChans := make(map[string]chan zUnit, len(ssuType))
|
||||
for ssu := range ssuType {
|
||||
ssuChans[ssu] = make(chan zUnit, 32)
|
||||
ssuChans[ssu] = make(chan zUnit, 64)
|
||||
}
|
||||
|
||||
go queringSSUInfluxPhasor(ctx, ssuChans)
|
||||
|
|
@ -65,8 +65,8 @@ func updatingSSURedisZUnit(ctx context.Context, ssuChans map[string]chan zUnit)
|
|||
}
|
||||
|
||||
func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan chan zUnit) {
|
||||
fields := GenPhasorFields(channelSize.Channel)
|
||||
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.Device,
|
||||
fields := GenPhasorFields(channelSize.SubPos)
|
||||
f2tvs, err := queryInfluxPhasor(ctx, channelSize.Station, channelSize.MainPos,
|
||||
fields, channelSize.Size)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
|
@ -78,7 +78,7 @@ func sendSSUZUnit(ctx context.Context, channelSize postgres.ChannelSize, ssuChan
|
|||
// }
|
||||
|
||||
for f, tvs := range f2tvs {
|
||||
key := genRedisPhasorKey(channelSize.Station, channelSize.Device, f)
|
||||
key := genRedisPhasorKey(channelSize.Station, channelSize.MainPos, f)
|
||||
|
||||
for i := range tvs {
|
||||
tvs[i].Value = ToPrimary(f, tvs[i].Value,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package ws
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -33,9 +33,19 @@ type wsMessage struct {
|
|||
data []byte
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
WriteBufferPool: &sync.Pool{
|
||||
New: func() any {
|
||||
return make([]byte, 1024)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var upConnNum int64
|
||||
|
||||
func (w *Ws) Cl104Up(ctx *gin.Context) {
|
||||
func (a *Api) Cl104Up(ctx *gin.Context) {
|
||||
if atomic.SwapInt64(&upConnNum, 1) > 0 {
|
||||
ctx.JSON(http.StatusConflict, nil)
|
||||
return
|
||||
|
|
@ -58,15 +68,15 @@ func (w *Ws) Cl104Up(ctx *gin.Context) {
|
|||
ctrlCh: make(chan wsMessage, 16),
|
||||
}
|
||||
|
||||
if err := setUpSessionConn104(session); err != nil {
|
||||
if err := setUpSessionConn104Up(session); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
startUpWorkers(session)
|
||||
start104UpWorkers(session)
|
||||
}
|
||||
|
||||
func writeControl104(session *upSession, mt int, payload []byte) error {
|
||||
func writeControl104Up(session *upSession, mt int, payload []byte) error {
|
||||
select {
|
||||
case <-session.ctx.Done():
|
||||
return session.ctx.Err()
|
||||
|
|
@ -77,14 +87,14 @@ func writeControl104(session *upSession, mt int, payload []byte) error {
|
|||
}
|
||||
}
|
||||
|
||||
func setUpSessionConn104(session *upSession) error {
|
||||
func setUpSessionConn104Up(session *upSession) error {
|
||||
session.conn.SetPongHandler(func(appData string) error {
|
||||
session.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
|
||||
session.conn.SetPingHandler(func(appData string) error {
|
||||
if err := writeControl104(session, websocket.PongMessage, []byte(appData)); err != nil {
|
||||
if err := writeControl104Up(session, websocket.PongMessage, []byte(appData)); err != nil {
|
||||
session.cancel()
|
||||
return err
|
||||
}
|
||||
|
|
@ -99,32 +109,32 @@ func setUpSessionConn104(session *upSession) error {
|
|||
return session.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
}
|
||||
|
||||
func startUpWorkers(session *upSession) {
|
||||
func start104UpWorkers(session *upSession) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
monitorUpWrite(session)
|
||||
monitor104UpWrite(session)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
monitorUpRead(session)
|
||||
monitor104UpRead(session)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
sendPing104(session)
|
||||
sendPing104Up(session)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
atomic.SwapInt64(&upConnNum, 0)
|
||||
}
|
||||
|
||||
func monitorUpWrite(session *upSession) {
|
||||
func monitor104UpWrite(session *upSession) {
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
|
|
@ -156,7 +166,7 @@ func monitorUpWrite(session *upSession) {
|
|||
}
|
||||
}
|
||||
|
||||
func monitorUpRead(session *upSession) {
|
||||
func monitor104UpRead(session *upSession) {
|
||||
for {
|
||||
select {
|
||||
case <-session.ctx.Done():
|
||||
|
|
@ -211,7 +221,7 @@ func monitorUpRead(session *upSession) {
|
|||
}
|
||||
}
|
||||
|
||||
func sendPing104(session *upSession) {
|
||||
func sendPing104Up(session *upSession) {
|
||||
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
defer ticker.Stop()
|
||||
|
|
@ -221,7 +231,7 @@ func sendPing104(session *upSession) {
|
|||
case <-session.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := writeControl104(session, websocket.PingMessage, nil)
|
||||
err := writeControl104Up(session, websocket.PingMessage, nil)
|
||||
if err != nil {
|
||||
session.cancel()
|
||||
return
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package admin
|
||||
package api
|
||||
|
||||
import (
|
||||
"datart/data/postgres"
|
||||
|
|
@ -15,7 +15,7 @@ type command struct {
|
|||
Args []any `json:"args"`
|
||||
}
|
||||
|
||||
func (a *Admin) PostExecuteCommand(ctx *gin.Context) {
|
||||
func (a *Api) PostExecuteCommand(ctx *gin.Context) {
|
||||
req, err := a.checkAndGenExecuteCommandRequest(ctx)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
|
@ -42,7 +42,7 @@ func (a *Admin) PostExecuteCommand(ctx *gin.Context) {
|
|||
})
|
||||
}
|
||||
|
||||
func (a *Admin) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, error) {
|
||||
func (a *Api) checkAndGenExecuteCommandRequest(ctx *gin.Context) (*command, error) {
|
||||
req := new(command)
|
||||
err := ctx.ShouldBindJSON(req)
|
||||
if err != nil {
|
||||
|
|
@ -27,18 +27,30 @@ func (a *Api) GetPoints(ctx *gin.Context) {
|
|||
}
|
||||
|
||||
var ftvs map[string][]influx.TV
|
||||
switch {
|
||||
case request.Begin > 0 && request.End > 0:
|
||||
ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request)
|
||||
switch request.Type {
|
||||
case "104":
|
||||
var tvs []influx.TV
|
||||
switch {
|
||||
case request.Begin > 0 && request.End > 0:
|
||||
tvs, err = influx.Get104PointDuration(ctx.Request.Context(), request)
|
||||
default:
|
||||
tvs, err = influx.Get104PointLast(ctx.Request.Context(), request)
|
||||
}
|
||||
ftvs = map[string][]influx.TV{strings.Join(request.Tags, "."): tvs}
|
||||
case "phasor":
|
||||
switch {
|
||||
case request.Begin > 0 && request.End > 0:
|
||||
ftvs, err = influx.GetSSUPointsDurationData(ctx.Request.Context(), request)
|
||||
|
||||
case request.Begin > 0 && request.End < 0:
|
||||
ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1)
|
||||
case request.Begin > 0 && request.End < 0:
|
||||
ftvs, err = influx.GetSSUPointsAfterLimit(ctx.Request.Context(), request, 1)
|
||||
|
||||
case request.Begin < 0 && request.End > 0:
|
||||
ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1)
|
||||
case request.Begin < 0 && request.End > 0:
|
||||
ftvs, err = influx.GetSSUPointsBeforeLimit(ctx.Request.Context(), request, 1)
|
||||
|
||||
default:
|
||||
ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1)
|
||||
default:
|
||||
ftvs, err = influx.GetSSUPointsLastLimit(ctx.Request.Context(), request, 1)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
@ -83,10 +95,16 @@ func (a *Api) checkAndGenGetPointsRequest(ctx *gin.Context) (*influx.Request, er
|
|||
v, ok := postgres.ChannelSizes.Load(tags[len(tags)-1])
|
||||
if ok {
|
||||
if channelSize, ok := v.(postgres.ChannelSize); ok {
|
||||
fields := data.GenPhasorFields(channelSize.Channel)
|
||||
var fields []string
|
||||
if typeStr == "phasor" {
|
||||
fields = data.GenPhasorFields(channelSize.SubPos)
|
||||
}
|
||||
if typeStr == "104" {
|
||||
fields = []string{channelSize.SubPos}
|
||||
}
|
||||
|
||||
station = channelSize.Station
|
||||
mainPos = channelSize.Device
|
||||
mainPos = channelSize.MainPos
|
||||
subPos = strings.Join(fields, ",")
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -1,9 +1,7 @@
|
|||
package route
|
||||
|
||||
import (
|
||||
"datart/route/admin"
|
||||
"datart/route/api"
|
||||
"datart/route/ws"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
|
@ -17,12 +15,6 @@ func LoadRoute(engine *gin.Engine) {
|
|||
ga.GET("/points", a.GetPoints)
|
||||
ga.GET("/events", a.GetEvents)
|
||||
ga.POST("/events", a.PostUpsertEvents)
|
||||
|
||||
d := new(admin.Admin)
|
||||
gd := engine.Group("admin")
|
||||
gd.POST("/command", d.PostExecuteCommand)
|
||||
|
||||
w := new(ws.Ws)
|
||||
gw := engine.Group("ws")
|
||||
gw.GET("/104up", w.Cl104Up)
|
||||
ga.POST("/command", a.PostExecuteCommand)
|
||||
ga.GET("/104up", a.Cl104Up)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,19 +0,0 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type Ws struct{}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
WriteBufferPool: &sync.Pool{
|
||||
New: func() any {
|
||||
return make([]byte, 4096)
|
||||
},
|
||||
},
|
||||
}
|
||||
Loading…
Reference in New Issue