fix: fix K8s service names, deployment command, and GORM logger

- rename all K8s services to xxx-service convention and update
    all configmap references (postgres, mongodb, loki, jaeger)
  - add explicit command: ["/app/modelrt"] to deployment to prevent
    args from being treated as the executable (no ENTRYPOINT in
    Dockerfile)
  - set deploy_env to development to bypass Redis empty-password
    guard in non-production Minikube environment
  - fix GormLogger Info/Warn/Error to use fmt.Sprintf(msg, data...)
    so GORM printf-style messages are formatted correctly and avoid
    json: unsupported type: func() time.Time serialization panic
  - expand pg PVC storage from 2Gi to 6Gi
  - rename loop variable msg to task in PushTaskToRabbitMQ for clarity
  - align comment indentation in queue_producer.go
This commit is contained in:
douxu 2026-06-03 17:11:54 +08:00
parent 3309e53653
commit 195150d9b1
12 changed files with 49 additions and 47 deletions

View File

@ -10,7 +10,7 @@ data:
- name: Loki
type: loki
access: proxy
url: http://loki:3100
url: http://loki-service:3100
isDefault: true
jsonData:
# derivedFields: 从日志的 traceID 字段生成跳转链接到 Jaeger
@ -23,4 +23,4 @@ data:
type: jaeger
uid: jaeger
access: proxy
url: http://jaeger:16686
url: http://jaeger-service:16686

View File

@ -1,14 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: grafana
name: grafana-service
namespace: default
spec:
ports:
- name: http
port: 3000
targetPort: 3000
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
nodePort: 31000 # Grafana UI: http://<NodeIP>:31000
selector:
app: grafana
type: NodePort

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: Service
metadata:
name: jaeger
name: jaeger-serivce
labels:
app: jaeger
spec:
@ -9,19 +9,19 @@ spec:
- name: ui
port: 16686
targetPort: 16686
nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686
nodePort: 31686 # Jaeger UI浏览器访问 http://<NodeIP>:31686
- name: collector-http
port: 14268
targetPort: 14268
nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel
nodePort: 31268 # Jaeger 原生 HTTP collector非 OTel
- name: otlp-http
port: 4318
targetPort: 4318
nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318
nodePort: 31318 # OTLP HTTP集群外使用 <NodeIP>:31318
- name: otlp-grpc
port: 4317
targetPort: 4317
nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317
nodePort: 31317 # OTLP gRPC集群外使用 <NodeIP>:31317
selector:
app: jaeger
type: NodePort

View File

@ -1,14 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: loki
name: loki-service
namespace: default
spec:
ports:
- name: http
port: 3100
targetPort: 3100
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
nodePort: 31100 # 集群外访问: http://<NodeIP>:31100
selector:
app: loki
type: NodePort

View File

@ -5,7 +5,7 @@ metadata:
data:
config.yaml: |
postgres:
host: "192.168.1.101"
host: "postgres-service"
port: 5432
database: "demo"
user: "postgres"
@ -35,7 +35,7 @@ data:
endpoint: "" # Promtail handles log collection in K8s, direct push disabled
otel:
endpoint: "jaeger:4318"
endpoint: "jaeger-service:4318"
insecure: true
ants:
@ -77,7 +77,7 @@ data:
service_addr: ":8080"
service_name: "modelRT"
secret_key: "" # injected via env SERVICE_SECRET_KEY
deploy_env: "production"
deploy_env: "development"
dataRT:
host: "http://127.0.0.1"

View File

@ -16,8 +16,9 @@ spec:
spec:
containers:
- name: modelrt
image: coslight/modelrt:latest
image: modelrt:v1
imagePullPolicy: IfNotPresent
command: ["/app/modelrt"]
args:
- "-modelRT_config_dir=/app/configs"
- "-modelRT_config_name=config"

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: Service
metadata:
name: mongodb
name: mongodb-service
labels:
app: mongodb
spec:

View File

@ -7,4 +7,4 @@ spec:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
storage: 6Gi

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: Service
metadata:
name: postgres
name: postgres-service
labels:
app: postgres
spec:

View File

@ -13,7 +13,7 @@ data:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
- url: http://loki-service:3100/loki/api/v1/push
scrape_configs:
- job_name: kubernetes-pods

View File

@ -4,6 +4,7 @@ package logger
import (
"context"
"errors"
"fmt"
"time"
"gorm.io/gorm"
@ -29,17 +30,17 @@ func (l *GormLogger) LogMode(_ gormLogger.LogLevel) gormLogger.Interface {
// Info define func for implementing gormLogger.Interface
func (l *GormLogger) Info(ctx context.Context, msg string, data ...any) {
Info(ctx, msg, "data", data)
Info(ctx, fmt.Sprintf(msg, data...))
}
// Warn define func for implementing gormLogger.Interface
func (l *GormLogger) Warn(ctx context.Context, msg string, data ...any) {
Warn(ctx, msg, "data", data)
Warn(ctx, fmt.Sprintf(msg, data...))
}
// Error define func for implementing gormLogger.Interface
func (l *GormLogger) Error(ctx context.Context, msg string, data ...any) {
Error(ctx, msg, "data", data)
Error(ctx, fmt.Sprintf(msg, data...))
}
// Trace define func for implementing gormLogger.Interface

View File

@ -67,12 +67,12 @@ func (p *QueueProducer) declareInfrastructure() error {
// Declare durable direct exchange
err := p.ch.ExchangeDeclare(
constants.TaskExchangeName, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to declare exchange: %w", err)
@ -81,12 +81,12 @@ func (p *QueueProducer) declareInfrastructure() error {
// Declare durable queue with priority support and message TTL
_, err = p.ch.QueueDeclare(
constants.TaskQueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
"x-max-priority": constants.TaskMaxPriority, // support priority levels 0-10
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(), // message TTL
},
)
@ -99,8 +99,8 @@ func (p *QueueProducer) declareInfrastructure() error {
constants.TaskQueueName, // queue name
constants.TaskRoutingKey, // routing key
constants.TaskExchangeName, // exchange name
false, // no-wait
nil, // arguments
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to bind queue: %w", err)
@ -148,8 +148,8 @@ func (p *QueueProducer) PublishTask(ctx context.Context, taskID uuid.UUID, taskT
ctx,
constants.TaskExchangeName, // exchange
constants.TaskRoutingKey, // routing key
false, // mandatory
false, // immediate
false, // mandatory
false, // immediate
publishing,
)
if err != nil {
@ -211,10 +211,10 @@ func (p *QueueProducer) Close() error {
func (p *QueueProducer) GetQueueInfo() (*amqp.Queue, error) {
queue, err := p.ch.QueueDeclarePassive(
constants.TaskQueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
amqp.Table{
"x-max-priority": constants.TaskMaxPriority,
"x-message-ttl": constants.TaskDefaultMessageTTL.Milliseconds(),
@ -246,22 +246,22 @@ func PushTaskToRabbitMQ(ctx context.Context, cfg config.RabbitMQConfig, taskChan
case <-ctx.Done():
logger.Info(ctx, "push task to RabbitMQ stopped by context cancel")
return
case msg, ok := <-taskChan:
case task, ok := <-taskChan:
if !ok {
logger.Info(ctx, "task channel closed, exiting push loop")
return
}
// Restore trace context from the handler that enqueued this message
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(task.TraceCarrier))
taskCtx, pubSpan := otel.Tracer("modelRT/task").Start(taskCtx, "task.publish",
oteltrace.WithAttributes(attribute.String("task_id", msg.TaskID.String())),
oteltrace.WithAttributes(attribute.String("task_id", task.TaskID.String())),
)
if err := producer.PublishTaskWithRetry(taskCtx, msg.TaskID, msg.TaskType, msg.Priority, msg.Params, 3); err != nil {
if err := producer.PublishTaskWithRetry(taskCtx, task.TaskID, task.TaskType, task.Priority, task.Params, 3); err != nil {
pubSpan.RecordError(err)
logger.Error(taskCtx, "publish task to RabbitMQ failed",
"task_id", msg.TaskID, "error", err)
"task_id", task.TaskID, "error", err)
}
pubSpan.End()
}
}
}
}