refactor: extend TypedMap and migrate MeasComputeState onto it

- add LoadOrStore, Len, and All (range-over-func) to util.TypedMap
  - embed util.TypedMap in MeasComputeState, dropping its hand-written
    sync.Map wrappers and per-call-site type assertions
  - iterate graphOverview via All() instead of Range in PrintGrapMap
  - remove unused Set/Comparer/OrderedSet/HashSet code from redis_zset.go
  - update deploy.md to replace Promtail with Grafana Alloy in the
    observability stack
This commit is contained in:
douxu 2026-06-18 16:06:06 +08:00
parent c82ad773a3
commit ca68cf6c18
5 changed files with 59 additions and 116 deletions

View File

@ -895,7 +895,9 @@ kubectl delete secret modelrt-certs
### 6\. 部署可观测性栈Kubernetes
`Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Promtail + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`
`Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Alloy + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`
> **日志采集器说明:** 集群内的日志采集由 `Grafana Alloy`DaemonSet负责它通过 Kubernetes API 抓取带 `app` label 的 Pod 容器日志,解析 `zap` 输出的 JSON 字段后推送到 `Loki`。Alloy 已**替代**早期的 `Promtail`,两者推送目标(`loki-service:3100`)与标签解析完全一致,**不要同时部署**,否则会导致 Loki 中日志翻倍。
#### 6.1 部署 Jaeger
@ -913,14 +915,16 @@ kubectl apply -f deploy/k8s/loki-deployment.yaml
kubectl apply -f deploy/k8s/loki-service.yaml
```
#### 6.3 部署 Promtail
#### 6.3 部署 Alloy
```bash
kubectl apply -f deploy/k8s/promtail-rbac.yaml
kubectl apply -f deploy/k8s/promtail-configmap.yaml
kubectl apply -f deploy/k8s/promtail-daemonset.yaml
kubectl apply -f deploy/k8s/alloy-rbac.yaml
kubectl apply -f deploy/k8s/alloy-configmap.yaml
kubectl apply -f deploy/k8s/alloy-daemonset.yaml
```
> Alloy 以 DaemonSet 形式在每个节点运行,需要 `ServiceAccount` + `ClusterRole``alloy-rbac.yaml`)授予读取 `nodes/pods/pods/log` 的权限。采集与解析规则定义在 `alloy-configmap.yaml``config.alloy` 中。
#### 6.4 部署 Grafana
```bash
@ -938,9 +942,9 @@ kubectl apply -f deploy/k8s/jaeger-deployment.yaml \
-f deploy/k8s/loki-pvc.yaml \
-f deploy/k8s/loki-deployment.yaml \
-f deploy/k8s/loki-service.yaml \
-f deploy/k8s/promtail-rbac.yaml \
-f deploy/k8s/promtail-configmap.yaml \
-f deploy/k8s/promtail-daemonset.yaml \
-f deploy/k8s/alloy-rbac.yaml \
-f deploy/k8s/alloy-configmap.yaml \
-f deploy/k8s/alloy-daemonset.yaml \
-f deploy/k8s/grafana-configmap.yaml \
-f deploy/k8s/grafana-deployment.yaml \
-f deploy/k8s/grafana-service.yaml
@ -1090,7 +1094,7 @@ kubectl scale deployment --all --replicas=1
kubectl scale statefulset --all --replicas=1
```
> **注意:** DaemonSetPromtail)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/promtail-daemonset.yaml`。
> **注意:** DaemonSetAlloy)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/alloy-daemonset.yaml`。
---
@ -1099,13 +1103,13 @@ kubectl scale statefulset --all --replicas=1
按部署顺序反向删除各服务资源:
```bash
# 可观测性栈Grafana / Promtail / Loki / Jaeger
# 可观测性栈Grafana / Alloy / Loki / Jaeger
kubectl delete -f deploy/k8s/grafana-service.yaml \
-f deploy/k8s/grafana-deployment.yaml \
-f deploy/k8s/grafana-configmap.yaml \
-f deploy/k8s/promtail-daemonset.yaml \
-f deploy/k8s/promtail-configmap.yaml \
-f deploy/k8s/promtail-rbac.yaml \
-f deploy/k8s/alloy-daemonset.yaml \
-f deploy/k8s/alloy-configmap.yaml \
-f deploy/k8s/alloy-rbac.yaml \
-f deploy/k8s/loki-service.yaml \
-f deploy/k8s/loki-deployment.yaml \
-f deploy/k8s/loki-pvc.yaml \

View File

@ -3,8 +3,6 @@ package diagram
import (
"context"
"iter"
"maps"
locker "modelRT/distributedlock"
"modelRT/logger"
@ -70,55 +68,3 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error)
}
return results, nil
}
type Comparer[T any] interface {
Compare(T) int
}
type ComparableComparer[T any] interface {
Compare(T) int
comparable // 直接嵌入 comparable 约束
}
type methodNode[E Comparer[E]] struct {
value E
left *methodNode[E]
right *methodNode[E]
}
type MethodTree[E Comparer[E]] struct {
root *methodNode[E]
}
type OrderedSet[E interface {
comparable
Comparer[E]
}] struct {
tree MethodTree[E]
elements map[E]bool
}
type ComparableOrderedSet[E ComparableComparer[E]] struct {
tree MethodTree[E]
elements map[E]bool
}
type Set[E any] interface {
Insert(E)
Delete(E)
Has(E) bool
All() iter.Seq[E]
}
func InsertAll[E any](set Set[E], seq iter.Seq[E]) {
for v := range seq {
set.Insert(v)
}
}
type HashSet[E comparable] map[E]bool
func (s HashSet[E]) Insert(v E) { s[v] = true }
func (s HashSet[E]) Delete(v E) { delete(s, v) }
func (s HashSet[E]) Has(v E) bool { return s[v] }
func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) }

View File

@ -12,10 +12,9 @@ var graphOverview util.TypedMap[int64, *Graph]
// PrintGrapMap define func of print circuit diagram topologic info data
func PrintGrapMap() {
graphOverview.Range(func(pageID int64, graph *Graph) bool {
for pageID, graph := range graphOverview.All() {
fmt.Println(pageID, graph)
return true
})
}
}
// GetGraphMap define func of get circuit diagram topologic data by pageID

View File

@ -2,7 +2,7 @@
package realtimedata
import (
"sync"
"modelRT/util"
)
// ComputeConfig define struct of measurement computation
@ -19,54 +19,15 @@ type ComputeConfig struct {
Analyzer RealTimeAnalyzer
}
// MeasComputeState define struct of manages the state of measurement computations using sync.Map
// MeasComputeState define struct of manages the state of measurement
// computations. It embeds util.TypedMap to inherit the concurrency-safe,
// type-safe Store/Load/Delete/LoadOrStore/Range/All/Len operations without
// per-call-site type assertions.
type MeasComputeState struct {
measMap sync.Map
util.TypedMap[string, *ComputeConfig]
}
// NewMeasComputeState define func to create and returns a new instance of MeasComputeState
func NewMeasComputeState() *MeasComputeState {
return &MeasComputeState{}
}
// Store define func to store a compute configuration for the specified key
func (m *MeasComputeState) Store(key string, config *ComputeConfig) {
m.measMap.Store(key, config)
}
// Load define func to retrieve the compute configuration for the specified key
func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) {
value, ok := m.measMap.Load(key)
if !ok {
return nil, false
}
return value.(*ComputeConfig), true
}
// Delete define func to remove the compute configuration for the specified key
func (m *MeasComputeState) Delete(key string) {
m.measMap.Delete(key)
}
// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration
func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) {
value, loaded := m.measMap.LoadOrStore(key, config)
return value.(*ComputeConfig), loaded
}
// Range define func to iterate over all key-configuration pairs in the map
func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) {
m.measMap.Range(func(key, value any) bool {
return f(key.(string), value.(*ComputeConfig))
})
}
// Len define func to return the number of compute configurations in the map
func (m *MeasComputeState) Len() int {
count := 0
m.measMap.Range(func(_, _ any) bool {
count++
return true
})
return count
}

View File

@ -1,7 +1,10 @@
// Package util provide some utility functions
package util
import "sync"
import (
"iter"
"sync"
)
// TypedMap define a type-safe generic wrapper around sync.Map.
// It keeps the concurrency guarantees of sync.Map while removing the
@ -26,6 +29,14 @@ func (t *TypedMap[K, V]) Store(key K, value V) {
t.m.Store(key, value)
}
// LoadOrStore define func of return the existing value for key if present.
// Otherwise it stores and returns the given value. loaded reports whether the
// value was already present.
func (t *TypedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
v, loaded := t.m.LoadOrStore(key, value)
return v.(V), loaded
}
// Swap define func of store value for key and return the previous value (if any).
// loaded reports whether a value was already present for key.
func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) {
@ -49,3 +60,25 @@ func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) {
return f(key.(K), value.(V))
})
}
// Len define func of return the number of key/value pairs currently stored.
// It walks the map, so the cost is O(n).
func (t *TypedMap[K, V]) Len() int {
count := 0
t.m.Range(func(_, _ any) bool {
count++
return true
})
return count
}
// All define func of return an iterator over all key/value pairs,
// usable directly with range-over-func. Iteration stops early if the
// consumer breaks out of the loop.
func (t *TypedMap[K, V]) All() iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
t.m.Range(func(key, value any) bool {
return yield(key.(K), value.(V))
})
}
}