diff --git a/deploy/deploy.md b/deploy/deploy.md index 18f14b9..e8f2e15 100644 --- a/deploy/deploy.md +++ b/deploy/deploy.md @@ -895,7 +895,9 @@ kubectl delete secret modelrt-certs ### 6\. 部署可观测性栈(Kubernetes) -在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Promtail + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。 +在 `Kubernetes` 集群中部署 `Jaeger`(链路追踪)+ `Loki + Alloy + Grafana`(日志可视化)。所有资源部署在 `default` 命名空间,`YAML` 文件位于 `deploy/k8s/`。 + +> **日志采集器说明:** 集群内的日志采集由 `Grafana Alloy`(DaemonSet)负责,它通过 Kubernetes API 抓取带 `app` label 的 Pod 容器日志,解析 `zap` 输出的 JSON 字段后推送到 `Loki`。Alloy 已**替代**早期的 `Promtail`,两者推送目标(`loki-service:3100`)与标签解析完全一致,**不要同时部署**,否则会导致 Loki 中日志翻倍。 #### 6.1 部署 Jaeger @@ -913,14 +915,16 @@ kubectl apply -f deploy/k8s/loki-deployment.yaml kubectl apply -f deploy/k8s/loki-service.yaml ``` -#### 6.3 部署 Promtail +#### 6.3 部署 Alloy ```bash -kubectl apply -f deploy/k8s/promtail-rbac.yaml -kubectl apply -f deploy/k8s/promtail-configmap.yaml -kubectl apply -f deploy/k8s/promtail-daemonset.yaml +kubectl apply -f deploy/k8s/alloy-rbac.yaml +kubectl apply -f deploy/k8s/alloy-configmap.yaml +kubectl apply -f deploy/k8s/alloy-daemonset.yaml ``` +> Alloy 以 DaemonSet 形式在每个节点运行,需要 `ServiceAccount` + `ClusterRole`(`alloy-rbac.yaml`)授予读取 `nodes/pods/pods/log` 的权限。采集与解析规则定义在 `alloy-configmap.yaml` 的 `config.alloy` 中。 + #### 6.4 部署 Grafana ```bash @@ -938,9 +942,9 @@ kubectl apply -f deploy/k8s/jaeger-deployment.yaml \ -f deploy/k8s/loki-pvc.yaml \ -f deploy/k8s/loki-deployment.yaml \ -f deploy/k8s/loki-service.yaml \ - -f deploy/k8s/promtail-rbac.yaml \ - -f deploy/k8s/promtail-configmap.yaml \ - -f deploy/k8s/promtail-daemonset.yaml \ + -f deploy/k8s/alloy-rbac.yaml \ + -f deploy/k8s/alloy-configmap.yaml \ + -f deploy/k8s/alloy-daemonset.yaml \ -f deploy/k8s/grafana-configmap.yaml \ -f deploy/k8s/grafana-deployment.yaml \ -f deploy/k8s/grafana-service.yaml @@ -1090,7 +1094,7 @@ kubectl scale deployment --all --replicas=1 kubectl scale statefulset --all --replicas=1 ``` -> **注意:** DaemonSet(Promtail)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/promtail-daemonset.yaml`。 +> **注意:** DaemonSet(Alloy)无法通过 `scale` 停止,如需停用可手动删除其资源:`kubectl delete -f deploy/k8s/alloy-daemonset.yaml`。 --- @@ -1099,13 +1103,13 @@ kubectl scale statefulset --all --replicas=1 按部署顺序反向删除各服务资源: ```bash -# 可观测性栈(Grafana / Promtail / Loki / Jaeger) +# 可观测性栈(Grafana / Alloy / Loki / Jaeger) kubectl delete -f deploy/k8s/grafana-service.yaml \ -f deploy/k8s/grafana-deployment.yaml \ -f deploy/k8s/grafana-configmap.yaml \ - -f deploy/k8s/promtail-daemonset.yaml \ - -f deploy/k8s/promtail-configmap.yaml \ - -f deploy/k8s/promtail-rbac.yaml \ + -f deploy/k8s/alloy-daemonset.yaml \ + -f deploy/k8s/alloy-configmap.yaml \ + -f deploy/k8s/alloy-rbac.yaml \ -f deploy/k8s/loki-service.yaml \ -f deploy/k8s/loki-deployment.yaml \ -f deploy/k8s/loki-pvc.yaml \ diff --git a/diagram/redis_zset.go b/diagram/redis_zset.go index d350b4f..6884448 100644 --- a/diagram/redis_zset.go +++ b/diagram/redis_zset.go @@ -3,8 +3,6 @@ package diagram import ( "context" - "iter" - "maps" locker "modelRT/distributedlock" "modelRT/logger" @@ -70,55 +68,3 @@ func (rs *RedisZSet) ZRANGE(setKey string, start, stop int64) ([]string, error) } return results, nil } - -type Comparer[T any] interface { - Compare(T) int -} - -type ComparableComparer[T any] interface { - Compare(T) int - comparable // 直接嵌入 comparable 约束 -} - -type methodNode[E Comparer[E]] struct { - value E - left *methodNode[E] - right *methodNode[E] -} - -type MethodTree[E Comparer[E]] struct { - root *methodNode[E] -} - -type OrderedSet[E interface { - comparable - Comparer[E] -}] struct { - tree MethodTree[E] - elements map[E]bool -} - -type ComparableOrderedSet[E ComparableComparer[E]] struct { - tree MethodTree[E] - elements map[E]bool -} - -type Set[E any] interface { - Insert(E) - Delete(E) - Has(E) bool - All() iter.Seq[E] -} - -func InsertAll[E any](set Set[E], seq iter.Seq[E]) { - for v := range seq { - set.Insert(v) - } -} - -type HashSet[E comparable] map[E]bool - -func (s HashSet[E]) Insert(v E) { s[v] = true } -func (s HashSet[E]) Delete(v E) { delete(s, v) } -func (s HashSet[E]) Has(v E) bool { return s[v] } -func (s HashSet[E]) All() iter.Seq[E] { return maps.Keys(s) } diff --git a/diagram/topologic_set.go b/diagram/topologic_set.go index 4c96a4a..6413256 100644 --- a/diagram/topologic_set.go +++ b/diagram/topologic_set.go @@ -12,10 +12,9 @@ var graphOverview util.TypedMap[int64, *Graph] // PrintGrapMap define func of print circuit diagram topologic info data func PrintGrapMap() { - graphOverview.Range(func(pageID int64, graph *Graph) bool { + for pageID, graph := range graphOverview.All() { fmt.Println(pageID, graph) - return true - }) + } } // GetGraphMap define func of get circuit diagram topologic data by pageID diff --git a/real-time-data/compute_state_manager.go b/real-time-data/compute_state_manager.go index 33754c4..d0161e7 100644 --- a/real-time-data/compute_state_manager.go +++ b/real-time-data/compute_state_manager.go @@ -2,7 +2,7 @@ package realtimedata import ( - "sync" + "modelRT/util" ) // ComputeConfig define struct of measurement computation @@ -19,54 +19,15 @@ type ComputeConfig struct { Analyzer RealTimeAnalyzer } -// MeasComputeState define struct of manages the state of measurement computations using sync.Map +// MeasComputeState define struct of manages the state of measurement +// computations. It embeds util.TypedMap to inherit the concurrency-safe, +// type-safe Store/Load/Delete/LoadOrStore/Range/All/Len operations without +// per-call-site type assertions. type MeasComputeState struct { - measMap sync.Map + util.TypedMap[string, *ComputeConfig] } // NewMeasComputeState define func to create and returns a new instance of MeasComputeState func NewMeasComputeState() *MeasComputeState { return &MeasComputeState{} } - -// Store define func to store a compute configuration for the specified key -func (m *MeasComputeState) Store(key string, config *ComputeConfig) { - m.measMap.Store(key, config) -} - -// Load define func to retrieve the compute configuration for the specified key -func (m *MeasComputeState) Load(key string) (*ComputeConfig, bool) { - value, ok := m.measMap.Load(key) - if !ok { - return nil, false - } - return value.(*ComputeConfig), true -} - -// Delete define func to remove the compute configuration for the specified key -func (m *MeasComputeState) Delete(key string) { - m.measMap.Delete(key) -} - -// LoadOrStore define func to returns the existing compute configuration for the key if present,otherwise stores and returns the given configuration -func (m *MeasComputeState) LoadOrStore(key string, config *ComputeConfig) (*ComputeConfig, bool) { - value, loaded := m.measMap.LoadOrStore(key, config) - return value.(*ComputeConfig), loaded -} - -// Range define func to iterate over all key-configuration pairs in the map -func (m *MeasComputeState) Range(f func(key string, config *ComputeConfig) bool) { - m.measMap.Range(func(key, value any) bool { - return f(key.(string), value.(*ComputeConfig)) - }) -} - -// Len define func to return the number of compute configurations in the map -func (m *MeasComputeState) Len() int { - count := 0 - m.measMap.Range(func(_, _ any) bool { - count++ - return true - }) - return count -} diff --git a/util/typed_map.go b/util/typed_map.go index 6693668..bd1b1d9 100644 --- a/util/typed_map.go +++ b/util/typed_map.go @@ -1,7 +1,10 @@ // Package util provide some utility functions package util -import "sync" +import ( + "iter" + "sync" +) // TypedMap define a type-safe generic wrapper around sync.Map. // It keeps the concurrency guarantees of sync.Map while removing the @@ -26,6 +29,14 @@ func (t *TypedMap[K, V]) Store(key K, value V) { t.m.Store(key, value) } +// LoadOrStore define func of return the existing value for key if present. +// Otherwise it stores and returns the given value. loaded reports whether the +// value was already present. +func (t *TypedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + v, loaded := t.m.LoadOrStore(key, value) + return v.(V), loaded +} + // Swap define func of store value for key and return the previous value (if any). // loaded reports whether a value was already present for key. func (t *TypedMap[K, V]) Swap(key K, value V) (previous V, loaded bool) { @@ -49,3 +60,25 @@ func (t *TypedMap[K, V]) Range(f func(key K, value V) bool) { return f(key.(K), value.(V)) }) } + +// Len define func of return the number of key/value pairs currently stored. +// It walks the map, so the cost is O(n). +func (t *TypedMap[K, V]) Len() int { + count := 0 + t.m.Range(func(_, _ any) bool { + count++ + return true + }) + return count +} + +// All define func of return an iterator over all key/value pairs, +// usable directly with range-over-func. Iteration stops early if the +// consumer breaks out of the loop. +func (t *TypedMap[K, V]) All() iter.Seq2[K, V] { + return func(yield func(K, V) bool) { + t.m.Range(func(key, value any) bool { + return yield(key.(K), value.(V)) + }) + } +}