diff --git a/.gitignore b/.gitignore index d69a283..51b9af9 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ go.work .vscode +.idea # Shield all log files in the log folder /log/ # Shield config files in the configs folder diff --git a/constants/monitor.go b/constants/monitor.go index dfc9017..6d68fad 100644 --- a/constants/monitor.go +++ b/constants/monitor.go @@ -27,8 +27,20 @@ const ( SubSuccessMsg = "subscription success" // SubFailedMsg define subscription failed message SubFailedMsg = "subscription failed" - // SubSuccessMsg define cancel subscription success message + // CancelSubSuccessMsg define cancel subscription success message CancelSubSuccessMsg = "cancel subscription success" - // SubFailedMsg define cancel subscription failed message + // CancelSubFailedMsg define cancel subscription failed message CancelSubFailedMsg = "Cancel subscription failed" ) + +// TargetOperationType define constant to the target operation type +type TargetOperationType int + +const ( + // OpAppend define append new target to the monitoring list + OpAppend TargetOperationType = iota + // OpRemove define remove exist target from the monitoring list + OpRemove + // OpUpdate define update exist target from the monitoring list + OpUpdate +) diff --git a/diagram/redis_client.go b/diagram/redis_client.go new file mode 100644 index 0000000..7319b02 --- /dev/null +++ b/diagram/redis_client.go @@ -0,0 +1,46 @@ +// Package diagram provide diagram data structure and operation +package diagram + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +// RedisClient define struct to create redis client +type RedisClient struct { + Client *redis.Client +} + +// NewRedisClient define func of new redis client instance +func NewRedisClient() *RedisClient { + return &RedisClient{ + Client: GetRedisClientInstance(), + } +} + +// QueryByZRangeByLex define func to query real time data from redis zset +func (rc *RedisClient) QueryByZRangeByLex(ctx context.Context, key string, size int64, startTimestamp, stopTimeStamp string) ([]float64, error) { + client := rc.Client + + datas := make([]float64, 0, size) + startStr := "[" + startTimestamp + stopStr := stopTimeStamp + "]" + args := redis.ZRangeArgs{ + Key: key, + Start: startStr, + Stop: stopStr, + ByLex: true, + Rev: false, + Count: size, + } + + members, err := client.ZRangeArgsWithScores(ctx, args).Result() + if err != nil { + return nil, err + } + for data := range members { + datas = append(datas, float64(data)) + } + return datas, nil +} diff --git a/diagram/redis_zset.go b/diagram/redis_zset.go index 6ae46b2..75dfa6e 100644 --- a/diagram/redis_zset.go +++ b/diagram/redis_zset.go @@ -3,7 +3,6 @@ package diagram import ( "context" - "fmt" "iter" "maps" @@ -71,76 +70,6 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error) return results, nil } -func (rs *RedisZSet) FindEventsByTimeRange(ctx context.Context, key string, startTS, endTS string, withScores bool) ([]redis.Z, error) { - // ZRANGEBYLEX 的范围参数必须是字符串,并以 [ 或 ( 开头,或者使用特殊值 - 和 + - // 例如:[173...0000 [173...9999 - min := fmt.Sprintf("[%s", startTS) - max := fmt.Sprintf("[%s", endTS) - - // ZRangeByLexOptions 用于设置查询范围和限制 - opts := redis.ZRangeBy{ - Min: min, // 范围的起始(包含) - Max: max, // 范围的结束(包含) - Offset: 0, // 分页偏移 - Count: -1, // -1 表示不限制数量 - } - - // ZRangeByLex 会返回 Member 列表,它们将按字典序(即时间戳)升序排列 - if withScores { - // return rs.storageClient.ZRangeByLexWithScores(ctx, key, &opts).Result() - var results []redis.Z - return results, nil - } else { - members, err := rs.storageClient.ZRangeByLex(ctx, key, &opts).Result() - if err != nil { - return nil, err - } - // ZRangeByLex 返回 []string,这里转换为 []redis.Z 结构体以保持一致性 - var results []redis.Z - for _, member := range members { - results = append(results, redis.Z{Member: member}) - } - return results, nil - } -} - -// FindLatestEventsByTime 查找最新的 N 个事件(按时间降序) -func (rs *RedisZSet) FindLatestEventsByTime(ctx context.Context, key string, limit int64, withScores bool) ([]redis.Z, error) { - // ZREVRANGEBYLEX 用于按字典序降序(即时间倒序)查找 - - // 对于 ZREVRANGEBYLEX,Min/Max 字段的语义与 ZRANGEBYLEX 相同,但排序是相反的。 - // 为了获取全部,范围设置为 + (Max) 到 - (Min),表示从最大的字符串开始往最小的字符串检索。 - opts := redis.ZRangeBy{ - Min: "-", // 最小的字符串 - Max: "+", // 最大的字符串 - Offset: 0, - Count: limit, // 限制返回的数量 - } - - if withScores { - // ZRevRangeByLexWithScores - members, err := rs.storageClient.ZRangeByLex(ctx, key, &opts).Result() - fmt.Println(err) - var results []redis.Z - for _, member := range members { - results = append(results, redis.Z{Member: member}) - } - return results, nil - // return rs.storageClient.ZRevRangeByLexWithScores(ctx, key, &opts).Result() - } else { - // ZRevRangeByLex - members, err := rs.storageClient.ZRevRangeByLex(ctx, key, &opts).Result() - if err != nil { - return nil, err - } - var results []redis.Z - for _, member := range members { - results = append(results, redis.Z{Member: member}) - } - return results, nil - } -} - type Comparer[T any] interface { Compare(T) int } diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index e4d6034..6c151a5 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -7,8 +7,12 @@ import ( "net/http" "time" + "modelRT/constants" + "modelRT/diagram" "modelRT/logger" + "modelRT/model" "modelRT/network" + "modelRT/util" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" @@ -52,7 +56,9 @@ func PullRealTimeDataHandler(c *gin.Context) { ctx, cancel := context.WithCancel(c.Request.Context()) defer cancel() - fanInChan := make(chan []float64) + + // TODO 考虑数据量庞大时候,channel的缓存大小问题 + fanInChan := make(chan []float64, 10000) go processTargetPolling(ctx, globalMonitorState, clientID, fanInChan) // 主循环:负责接收客户端消息(如心跳包、停止拉取命令等) @@ -133,31 +139,47 @@ func processTargetPolling(ctx context.Context, s *SharedMonitorState, clientID s for interval, componentItems := range config.components { fmt.Println(componentItems) for _, target := range componentItems.targets { - dataSize, exist := componentItems.targetParam[target] + measurement, exist := componentItems.targetParam[target] if !exist { logger.Error(ctx, "can not found subscription node param into param map", "target", target) continue } - stopChan := make(chan struct{}) + queryGStopChan := make(chan struct{}) // store stop channel with target into map - stopChanMap[target] = stopChan - go realTimeDataQueryFromRedis(ctx, interval, dataSize, fanInChan, stopChan) + // TODO 增加二次检查,首先判断target是否存在于stopChanMap中 + stopChanMap[target] = queryGStopChan + queryKey, err := model.GenerateMeasureIdentifier(measurement.DataSource) + if err != nil { + logger.Error(ctx, "generate measurement indentifier by data_source field failed", "data_source", measurement.DataSource, "error", err) + continue + } + go realTimeDataQueryFromRedis(ctx, queryKey, interval, int64(measurement.Size), fanInChan, queryGStopChan) } } s.mutex.RUnlock() - // TODO 持续监听noticeChannel for { select { - case noticeValue, ok := <-config.noticeChan: + case transportTargets, ok := <-config.noticeChan: if !ok { logger.Error(ctx, "notice channel was closed unexpectedly", "clientID", clientID) stopAllPolling(ctx, stopChanMap) return } - // TODO 判断传递数据类型,决定是调用新增函数或者移除函数 - fmt.Println(noticeValue, ok) + switch transportTargets.OperationType { + case constants.OpAppend: + // TODO 考虑精细化锁结果,将RW锁置于ClientID层面之下 + s.mutex.Lock() + defer s.mutex.Unlock() + // TODO 增加 append 函数调用 + fmt.Println(transportTargets.Targets) + case constants.OpRemove: + s.mutex.Lock() + defer s.mutex.Unlock() + // TODO 增加 remove 函数调用 + fmt.Println(transportTargets.Targets) + } case <-ctx.Done(): logger.Info(ctx, fmt.Sprintf("stop all data retrieval goroutines under this clientID:%s", clientID)) stopAllPolling(ctx, stopChanMap) @@ -175,8 +197,7 @@ func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) { return } -// TODO 根据Measure表 datasource 字段从 redis 查询信息 -func realTimeDataQueryFromRedis(ctx context.Context, interval string, dataSize int, fanInChan chan []float64, stopChan chan struct{}) { +func realTimeDataQueryFromRedis(ctx context.Context, queryKey, interval string, dataSize int64, fanInChan chan []float64, stopChan chan struct{}) { duration, err := time.ParseDuration(interval) if err != nil { logger.Error(ctx, "failed to parse the time string", "interval", interval, "error", err) @@ -185,21 +206,23 @@ func realTimeDataQueryFromRedis(ctx context.Context, interval string, dataSize i ticker := time.NewTicker(duration * time.Second) defer ticker.Stop() + client := diagram.NewRedisClient() + startTimestamp := util.GenNanoTsStr() for { select { case <-ticker.C: - // TODO 添加 redis 查询逻辑 - result := make([]float64, dataSize) - select { - case fanInChan <- result: - default: - // TODO 考虑如果 fanInChan 阻塞,避免阻塞查询循环,可以丢弃数据或记录日志 - // logger.Warning("Fan-in channel is full, skipping data push.") + stopTimestamp := util.GenNanoTsStr() + datas, err := client.QueryByZRangeByLex(ctx, queryKey, dataSize, startTimestamp, stopTimestamp) + if err != nil { + logger.Error(ctx, "query real time data from redis failed", "key", queryKey, "error", err) + continue } - + // use end timestamp reset start timestamp + startTimestamp = stopTimestamp + // TODO 考虑如果 fanInChan 阻塞,如何避免阻塞查询循环,是否可以丢弃数据或使用日志记录的方式进行填补 + fanInChan <- datas case <-stopChan: - // TODO 优化日志输出 - logger.Info(ctx, "redis query goroutine have stopped") + logger.Info(ctx, "stop the redis query goroutine via a singal") return } } diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index d0709a8..6c102cb 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -11,6 +11,7 @@ import ( "modelRT/database" "modelRT/logger" "modelRT/network" + "modelRT/orm" "github.com/gin-gonic/gin" "github.com/gofrs/uuid" @@ -205,12 +206,12 @@ func RealTimeSubHandler(c *gin.Context) { // RealTimeMonitorComponent define struct of real time monitor component type RealTimeMonitorComponent struct { targets []string - targetParam map[string]int + targetParam map[string]*orm.Measurement } // RealTimeMonitorConfig define struct of real time monitor config type RealTimeMonitorConfig struct { - noticeChan chan struct{} + noticeChan chan *transportTargets components map[string]*RealTimeMonitorComponent } @@ -242,7 +243,7 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni } config := &RealTimeMonitorConfig{ - noticeChan: make(chan struct{}), + noticeChan: make(chan *transportTargets), components: make(map[string]*RealTimeMonitorComponent), } @@ -266,15 +267,15 @@ func (s *SharedMonitorState) CreateConfig(ctx context.Context, tx *gorm.DB, moni targetProcessResults = append(targetProcessResults, targetResult) if comp, ok := config.components[interval]; !ok { targets := make([]string, 0, len(componentItem.Targets)) - targetParam := make(map[string]int) - targetParam[target] = targetModel.GetMeasurementInfo().Size + targetParam := make(map[string]*orm.Measurement) + targetParam[target] = targetModel.GetMeasurementInfo() config.components[interval] = &RealTimeMonitorComponent{ targets: append(targets, target), targetParam: targetParam, } } else { comp.targets = append(comp.targets, target) - comp.targetParam[target] = targetModel.GetMeasurementInfo().Size + comp.targetParam[target] = targetModel.GetMeasurementInfo() } } } @@ -298,6 +299,10 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, mon return processRealTimeRequestTargets(components, requestTargetsCount, err), err } + transportTargets := &transportTargets{ + OperationType: constants.OpAppend, + Targets: make([]string, requestTargetsCount), + } for _, componentItem := range components { interval := componentItem.Interval for _, target := range componentItem.Targets { @@ -318,19 +323,20 @@ func (s *SharedMonitorState) AppendTargets(ctx context.Context, tx *gorm.DB, mon targetProcessResults = append(targetProcessResults, targetResult) if comp, ok := config.components[interval]; !ok { targets := make([]string, 0, len(componentItem.Targets)) - targetParam := make(map[string]int) - targetParam[target] = targetModel.GetMeasurementInfo().Size + targetParam := make(map[string]*orm.Measurement) + targetParam[target] = targetModel.GetMeasurementInfo() config.components[interval] = &RealTimeMonitorComponent{ targets: append(targets, target), } } else { comp.targets = append(comp.targets, target) - comp.targetParam[target] = targetModel.GetMeasurementInfo().Size + comp.targetParam[target] = targetModel.GetMeasurementInfo() } + transportTargets.Targets = append(transportTargets.Targets, target) } } - // TODO 将增加的订阅配置传递给channel - config.noticeChan <- struct{}{} + + config.noticeChan <- transportTargets return targetProcessResults, nil } @@ -343,7 +349,7 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, mon if !exist { // create new config config = &RealTimeMonitorConfig{ - noticeChan: make(chan struct{}), + noticeChan: make(chan *transportTargets), components: make(map[string]*RealTimeMonitorComponent), } @@ -367,14 +373,14 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, mon targetResult.Msg = constants.SubSuccessMsg if comp, ok := config.components[interval]; !ok { targets := make([]string, 0, len(componentItem.Targets)) - targetParam := make(map[string]int) - targetParam[target] = targetModel.GetMeasurementInfo().Size + targetParam := make(map[string]*orm.Measurement) + targetParam[target] = targetModel.GetMeasurementInfo() config.components[interval] = &RealTimeMonitorComponent{ targets: append(targets, target), } } else { comp.targets = append(comp.targets, target) - comp.targetParam[target] = targetModel.GetMeasurementInfo().Size + comp.targetParam[target] = targetModel.GetMeasurementInfo() } } } @@ -382,7 +388,12 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, mon return targetProcessResults, nil } - targetProcessResults := make([]network.TargetResult, 0, processRealTimeRequestCount(components)) + requestTargetsCount := processRealTimeRequestCount(components) + targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) + transportTargets := &transportTargets{ + OperationType: constants.OpUpdate, + Targets: make([]string, requestTargetsCount), + } for _, componentItem := range components { interval := componentItem.Interval for _, target := range componentItem.Targets { @@ -403,18 +414,20 @@ func (s *SharedMonitorState) UpsertTargets(ctx context.Context, tx *gorm.DB, mon targetProcessResults = append(targetProcessResults, targetResult) if comp, ok := config.components[interval]; !ok { targets := make([]string, 0, len(componentItem.Targets)) - targetParam := make(map[string]int) - targetParam[target] = targetModel.GetMeasurementInfo().Size + targetParam := make(map[string]*orm.Measurement) + targetParam[target] = targetModel.GetMeasurementInfo() config.components[interval] = &RealTimeMonitorComponent{ targets: append(targets, target), } } else { comp.targets = append(comp.targets, target) - comp.targetParam[target] = targetModel.GetMeasurementInfo().Size + comp.targetParam[target] = targetModel.GetMeasurementInfo() } + transportTargets.Targets = append(transportTargets.Targets, target) } } - config.noticeChan <- struct{}{} + + config.noticeChan <- transportTargets return targetProcessResults, nil } @@ -446,6 +459,10 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string } // components is the list of items to be removed passed in the request + transportTargets := &transportTargets{ + OperationType: constants.OpRemove, + Targets: make([]string, requestTargetsCount), + } for _, compent := range components { interval := compent.Interval // comp is the locally running listener configuration @@ -473,6 +490,7 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string if _, found := targetsToRemoveMap[existingTarget]; !found { newTargets = append(newTargets, existingTarget) } else { + transportTargets.Targets = append(transportTargets.Targets, existingTarget) targetResult := network.TargetResult{ ID: existingTarget, Code: constants.CancelSubSuccessCode, @@ -506,11 +524,13 @@ func (s *SharedMonitorState) RemoveTargets(ctx context.Context, monitorID string } } - // TODO 将移除的订阅配置传递给channel - config.noticeChan <- struct{}{} + // pass the removed subscription configuration to the notice channel + config.noticeChan <- transportTargets return targetProcessResults, nil } +// TODO 增加一个update 函数用来更新 interval + func processRealTimeRequestCount(components []network.RealTimeComponentItem) int { totalTargetsCount := 0 for _, compItem := range components { @@ -532,3 +552,10 @@ func processRealTimeRequestTargets(components []network.RealTimeComponentItem, t } return targetProcessResults } + +// transportTargets define struct to transport update or remove target to real +// time pull api +type transportTargets struct { + OperationType constants.TargetOperationType + Targets []string +} diff --git a/model/measurement_model.go b/model/measurement_model.go index 90e4f3a..c5d0925 100644 --- a/model/measurement_model.go +++ b/model/measurement_model.go @@ -17,7 +17,7 @@ type MeasurementDataSource struct { } // IOAddress define interface of IO address -type IOAddress interface{} +type IOAddress any // CL3611Address define CL3611 protol struct type CL3611Address struct { @@ -174,3 +174,78 @@ func (m MeasurementDataSource) GetIOAddress() (IOAddress, error) { return nil, constants.ErrUnknownDataType } } + +// GenerateMeasureIdentifier define func of generate measurement identifier +func GenerateMeasureIdentifier(source map[string]any) (string, error) { + regTypeVal, ok := source["type"] + if !ok { + return "", fmt.Errorf("can not find type in datasource field") + } + + var regType int + switch v := regTypeVal.(type) { + case int: + regType = v + default: + return "", fmt.Errorf("invalid type format in datasource field") + } + + ioAddrVal, ok := source["io_address"] + if !ok { + return "", fmt.Errorf("can not find io_address from datasource field") + } + + ioAddress, ok := ioAddrVal.(map[string]any) + if !ok { + return "", fmt.Errorf("io_address field is not a valid map") + } + + switch regType { + case constants.DataSourceTypeCL3611: + station, ok := ioAddress["station"].(string) + if !ok { + return "", fmt.Errorf("CL3611:invalid or missing station field") + } + device, ok := ioAddress["device"].(string) + if !ok { + return "", fmt.Errorf("CL3611:invalid or missing device field") + } + // 提取 channel (string) + channel, ok := ioAddress["channel"].(string) + if !ok { + return "", fmt.Errorf("CL3611:invalid or missing channel field") + } + return fmt.Sprintf("%s.%s.%s", station, device, channel), nil + case constants.DataSourceTypePower104: + station, ok := ioAddress["station"].(string) + if !ok { + return "", fmt.Errorf("Power104:invalid or missing station field") + } + packetVal, ok := ioAddress["packet"] + if !ok { + return "", fmt.Errorf("Power104: missing packet field") + } + var packet int + switch v := packetVal.(type) { + case int: + packet = v + default: + return "", fmt.Errorf("Power104:invalid packet format") + } + + offsetVal, ok := ioAddress["offset"] + if !ok { + return "", fmt.Errorf("Power104:missing offset field") + } + var offset int + switch v := offsetVal.(type) { + case int: + offset = v + default: + return "", fmt.Errorf("Power104:invalid offset format") + } + return fmt.Sprintf("%s.%d.%d", station, packet, offset), nil + default: + return "", fmt.Errorf("unsupport regulation type %d into datasource field", regType) + } +} diff --git a/model/redis_recommend.go b/model/redis_recommend.go index b01750e..c885cde 100644 --- a/model/redis_recommend.go +++ b/model/redis_recommend.go @@ -59,7 +59,7 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er } if len(results) == 0 { - // TODO 构建 for 循环返回所有可能的补全 + // TODO 考虑使用其他方式代替for 循环退一字节的查询方式 searchInput = searchInput[:len(searchInput)-1] inputLen = len(searchInput) continue @@ -124,7 +124,6 @@ func RedisSearchRecommend(ctx context.Context, input string) ([]string, bool, er return []string{}, false, err } if len(results) == 0 { - // TODO 构建 for 循环返回所有可能的补全 searchInput = input[:inputLen-1] inputLen = len(searchInput) continue @@ -154,7 +153,6 @@ func getAllGridKeys(ctx context.Context, setKey string) ([]string, bool, error) func getSpecificZoneKeys(ctx context.Context, input string) ([]string, bool, error) { setKey := fmt.Sprintf(constants.RedisSpecGridZoneSetKey, input) - // TODO 从redis set 中获取指定 grid 下的 zone key zoneSets := diagram.NewRedisSet(ctx, setKey, 10, true) keys, err := zoneSets.SMembers(setKey) if err != nil { diff --git a/orm/circuit_diagram_measurement.go b/orm/circuit_diagram_measurement.go index fb276a5..1bf6c60 100644 --- a/orm/circuit_diagram_measurement.go +++ b/orm/circuit_diagram_measurement.go @@ -9,17 +9,17 @@ import ( // Measurement structure define abstracted info set of electrical measurement type Measurement struct { - ID int64 `gorm:"column:ID;primaryKey;autoIncrement"` - Tag string `gorm:"column:TAG;size:64;not null;default:''"` - Name string `gorm:"column:NAME;size:64;not null;default:''"` - Type int16 `gorm:"column:TYPE;not null;default:-1"` - Size int `gorm:"column:SIZE;not null;default:-1"` - DataSource map[string]interface{} `gorm:"column:DATA_SOURCE;type:jsonb;not null;default:'{}'"` - EventPlan map[string]interface{} `gorm:"column:EVENT_PLAN;type:jsonb;not null;default:'{}'"` - BayUUID uuid.UUID `gorm:"column:BAY_UUID;type:uuid;not null"` - ComponentUUID uuid.UUID `gorm:"column:COMPONENT_UUID;type:uuid;not null"` - Op int `gorm:"column:OP;not null;default:-1"` - Ts time.Time `gorm:"column:TS;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` + ID int64 `gorm:"column:ID;primaryKey;autoIncrement"` + Tag string `gorm:"column:TAG;size:64;not null;default:''"` + Name string `gorm:"column:NAME;size:64;not null;default:''"` + Type int16 `gorm:"column:TYPE;not null;default:-1"` + Size int `gorm:"column:SIZE;not null;default:-1"` + DataSource map[string]any `gorm:"column:DATA_SOURCE;type:jsonb;not null;default:'{}'"` + EventPlan map[string]any `gorm:"column:EVENT_PLAN;type:jsonb;not null;default:'{}'"` + BayUUID uuid.UUID `gorm:"column:BAY_UUID;type:uuid;not null"` + ComponentUUID uuid.UUID `gorm:"column:COMPONENT_UUID;type:uuid;not null"` + Op int `gorm:"column:OP;not null;default:-1"` + Ts time.Time `gorm:"column:TS;type:timestamptz;not null;default:CURRENT_TIMESTAMP"` } // TableName func respresent return table name of Measurement diff --git a/util/time.go b/util/time.go new file mode 100644 index 0000000..0c800c8 --- /dev/null +++ b/util/time.go @@ -0,0 +1,15 @@ +// Package util provide some utility functions +package util + +import ( + "strconv" + "time" +) + +// GenNanoTsStr define func to generate nanosecond timestamp string by current time +func GenNanoTsStr() string { + now := time.Now() + nanoseconds := now.UnixNano() + timestampStr := strconv.FormatInt(nanoseconds, 10) + return timestampStr +}