Compare commits
10 Commits
main
...
feature-jo
| Author | SHA1 | Date |
|---|---|---|
|
|
f333a830a1 | |
|
|
a3e5892379 | |
|
|
b2a8180f64 | |
|
|
8930f3a20e | |
|
|
722e8a9c0f | |
|
|
fff5dd9218 | |
|
|
23bc2dab9f | |
|
|
5653fb0719 | |
|
|
c7bc0d033b | |
|
|
9380717b75 |
|
|
@ -0,0 +1,12 @@
|
|||
kind: pipeline
|
||||
type: docker
|
||||
name: default
|
||||
|
||||
steps:
|
||||
- name: build
|
||||
image: golang:latest
|
||||
environment:
|
||||
GO111MODULE: on
|
||||
GOPROXY: https://goproxy.cn,direct
|
||||
commands:
|
||||
- go build main.go
|
||||
|
|
@ -21,3 +21,11 @@
|
|||
# Go workspace file
|
||||
go.work
|
||||
|
||||
# vscode files
|
||||
.vscode/
|
||||
|
||||
# Shield all log files in the log folder
|
||||
/log/
|
||||
# Shield config files in the configs folder
|
||||
/configs/**/*.yaml
|
||||
/configs/**/*.pem
|
||||
|
|
@ -1,3 +1,5 @@
|
|||
# eventRT
|
||||
|
||||
事件处理服务
|
||||
事件处理服务
|
||||
|
||||
[](http://192.168.46.100:4080/CL-Softwares/eventRT)
|
||||
|
|
@ -0,0 +1,92 @@
|
|||
// Package config define config struct of model runtime service
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
// LoggerConfig define config struct of zap logger config
|
||||
type LoggerConfig struct {
|
||||
Mode string `mapstructure:"mode"`
|
||||
Level string `mapstructure:"level"`
|
||||
FilePath string `mapstructure:"filepath"`
|
||||
MaxSize int `mapstructure:"maxsize"`
|
||||
MaxBackups int `mapstructure:"maxbackups"`
|
||||
MaxAge int `mapstructure:"maxage"`
|
||||
Compress bool `mapstructure:"compress"`
|
||||
}
|
||||
|
||||
// MongoDBConfig define config struct of mongoDB config
|
||||
type MongoDBConfig struct {
|
||||
Host string `mapstructure:"host"`
|
||||
User string `mapstructure:"user"`
|
||||
Password string `mapstructure:"password"`
|
||||
Database string `mapstructure:"database"`
|
||||
AuthDB string `mapstructure:"auth_db"`
|
||||
Port int `mapstructure:"port"`
|
||||
Timeout int `mapstructure:"timeout"`
|
||||
}
|
||||
|
||||
type RabbitMQConfig struct {
|
||||
CACertPath string `mapstructure:"ca_cert_path"`
|
||||
ClientKeyPath string `mapstructure:"client_key_path"`
|
||||
ClientKeyPassword string `mapstructure:"client_key_password"`
|
||||
ClientCertPath string `mapstructure:"client_cert_path"`
|
||||
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
|
||||
ServerName string `mapstructure:"server_name"`
|
||||
User string `mapstructure:"user"`
|
||||
Password string `mapstructure:"password"`
|
||||
Host string `mapstructure:"host"`
|
||||
Port int `mapstructure:"port"`
|
||||
}
|
||||
|
||||
// ServiceConfig define config struct of service config
|
||||
type ServiceConfig struct {
|
||||
ServiceAddr string `mapstructure:"service_addr"`
|
||||
ServiceName string `mapstructure:"service_name"`
|
||||
SecretKey string `mapstructure:"secret_key"`
|
||||
DeployEnv string `mapstructure:"deploy_env"`
|
||||
}
|
||||
|
||||
// EventRTConfig define config struct of model runtime server
|
||||
type EventRTConfig struct {
|
||||
ServiceConfig `mapstructure:"service"`
|
||||
LoggerConfig `mapstructure:"logger"`
|
||||
MongoDBConfig `mapstructure:"mongodb"`
|
||||
RabbitMQConfig `mapstructure:"rabbitMQ"`
|
||||
MongoDBURI string
|
||||
}
|
||||
|
||||
// ReadAndInitConfig return eventRT project config struct
|
||||
func ReadAndInitConfig(configDir, configName, configType string) (eventRTConfig EventRTConfig) {
|
||||
config := viper.New()
|
||||
config.AddConfigPath(configDir)
|
||||
config.SetConfigName(configName)
|
||||
config.SetConfigType(configType)
|
||||
if err := config.ReadInConfig(); err != nil {
|
||||
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
|
||||
panic(fmt.Sprintf("can not find conifg file:%s\n", err.Error()))
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := config.Unmarshal(&eventRTConfig); err != nil {
|
||||
panic(fmt.Sprintf("unmarshal eventRT config failed:%s\n", err.Error()))
|
||||
}
|
||||
|
||||
if eventRTConfig.MongoDBConfig.Timeout <= 0 {
|
||||
eventRTConfig.MongoDBConfig.Timeout = 10
|
||||
}
|
||||
if eventRTConfig.MongoDBConfig.AuthDB == "" {
|
||||
eventRTConfig.MongoDBConfig.AuthDB = "admin"
|
||||
}
|
||||
|
||||
// init mongoDB uri
|
||||
user := url.QueryEscape(eventRTConfig.MongoDBConfig.User)
|
||||
password := url.QueryEscape(eventRTConfig.MongoDBConfig.Password)
|
||||
eventRTConfig.MongoDBURI = fmt.Sprintf("mongodb://%s:%s@%s:%d/?authSource=%s", user, password, eventRTConfig.MongoDBConfig.Host, eventRTConfig.MongoDBConfig.Port, eventRTConfig.MongoDBConfig.AuthDB)
|
||||
return eventRTConfig
|
||||
}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// DevelopmentDeployMode define development operator environment for eventRT project
|
||||
DevelopmentDeployMode = "development"
|
||||
// DebugDeployMode define debug operator environment for eventRT project
|
||||
DebugDeployMode = "debug"
|
||||
// ProductionDeployMode define production operator environment for eventRT project
|
||||
ProductionDeployMode = "production"
|
||||
)
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
const (
|
||||
// DevelopmentLogMode define development operator environment for eventRT project
|
||||
DevelopmentLogMode = "development"
|
||||
// DebugLogMode define debug operator environment for eventRT project
|
||||
DebugLogMode = "debug"
|
||||
// ProductionLogMode define production operator environment for eventRT project
|
||||
ProductionLogMode = "production"
|
||||
)
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
// Package constants define constant variable
|
||||
package constants
|
||||
|
||||
// Assuming the B3 specification
|
||||
const (
|
||||
HeaderTraceID = "X-B3-TraceId"
|
||||
HeaderSpanID = "X-B3-SpanId"
|
||||
HeaderParentSpanID = "X-B3-ParentSpanId"
|
||||
)
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
// Package database define database operation functions
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"eventRT/config" //
|
||||
"eventRT/logger"
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
var (
|
||||
mongoOnce sync.Once
|
||||
_globalMongoClient *mongo.Client
|
||||
)
|
||||
|
||||
// GetMongoClient returns the global MongoDB client
|
||||
func GetMongoClient() *mongo.Client {
|
||||
return _globalMongoClient
|
||||
}
|
||||
|
||||
// InitMongoInstance return instance of MongoDB client
|
||||
func InitMongoInstance(ctx context.Context, eCfg config.EventRTConfig) *mongo.Client {
|
||||
mongoOnce.Do(func() {
|
||||
_globalMongoClient = initMongoClient(ctx, eCfg.MongoDBURI, eCfg.Timeout)
|
||||
})
|
||||
return _globalMongoClient
|
||||
}
|
||||
|
||||
// initMongoClient return successfully initialized MongoDB client
|
||||
func initMongoClient(ctx context.Context, mongoDBURI string, timeout int) *mongo.Client {
|
||||
clientOptions := options.Client().ApplyURI(mongoDBURI)
|
||||
client, err := mongo.Connect(ctx, clientOptions)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "failed to connect to MongoDB", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
pingTimeout := time.Duration(timeout) * time.Second
|
||||
if pingTimeout == 0 {
|
||||
pingTimeout = 10 * time.Second
|
||||
}
|
||||
|
||||
pingCtx, cancel := context.WithTimeout(ctx, pingTimeout)
|
||||
defer cancel()
|
||||
if err := client.Ping(pingCtx, nil); err != nil {
|
||||
logger.Error(ctx, "failed to ping operation with MongoDB", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
[req]
|
||||
distinguished_name = req_distinguished_name
|
||||
prompt = no
|
||||
|
||||
[req_distinguished_name]
|
||||
C = CN
|
||||
ST = Beijing
|
||||
L = Beijing
|
||||
O = coslight
|
||||
CN = web-client
|
||||
|
||||
[v3_client]
|
||||
keyUsage = critical, digitalSignature, keyEncipherment
|
||||
extendedKeyUsage = clientAuth
|
||||
|
|
@ -0,0 +1,154 @@
|
|||
# RabbitMQ 部署与 mTLS 证书签发指南
|
||||
|
||||
## 一、 证书签发 (PKI)
|
||||
|
||||
### 1. 准备工作
|
||||
|
||||
确保本地拥有根 CA 文件:`ca_certificate.pem` 和 `cakey.pem`
|
||||
|
||||
### 2. 生成 RabbitMQ 服务端证书
|
||||
|
||||
服务端证书必须包含 `serverAuth` 权限,并涵盖所有访问域名和 IP
|
||||
|
||||
**配置文件 `server.conf` 关键点:**
|
||||
|
||||
* **CN**: `rabbitmq-server`
|
||||
* **SAN (alt_names)**: 必须包含 `localhost` 和 `127.0.0.1` 以适配 SSH 隧道
|
||||
|
||||
```bash
|
||||
# 1. 生成服务端私钥
|
||||
openssl genrsa -out server_key.pem 2048
|
||||
|
||||
# 2. 生成签名请求 (CSR)
|
||||
openssl req -new -key server_key.pem -out server_cert.csr -config server.conf
|
||||
|
||||
# 3. 使用 v3_server 扩展签发
|
||||
openssl x509 -req -in server_cert.csr -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
|
||||
-out server_certificate.pem -days 730 -sha256 \
|
||||
-extfile server.conf -extensions v3_server
|
||||
|
||||
```
|
||||
|
||||
### 3. 生成客户端证书 (modelRT / eventRT)
|
||||
|
||||
**注意**:客户端证书必须包含 `clientAuth` 扩展,否则会导致 403 错误
|
||||
|
||||
#### 签发 modelRT 证书
|
||||
|
||||
```bash
|
||||
# 1. 生成私钥
|
||||
openssl genrsa -out modelrt_client_key.pem 2048
|
||||
|
||||
# 2. 生成 CSR (CN 必须匹配 rabbitmq 里的用户名: modelrt-client)
|
||||
openssl req -new -key modelrt_client_key.pem -out modelrt_client_cert.csr -config modelrt.conf
|
||||
|
||||
# 3. 关键:使用 v3_client 扩展签发
|
||||
openssl x509 -req -in modelrt_client_cert.csr -CA ca_certificate.pem -CAkey cakey.pem -CAcreateserial \
|
||||
-out modelrt_client_cert.pem -days 730 -sha256 \
|
||||
-extfile modelrt.conf -extensions v3_client
|
||||
|
||||
```
|
||||
|
||||
*(eventRT 证书签发流程同上,只需更换 `eventrt.conf` 配置文件)*
|
||||
|
||||
---
|
||||
|
||||
## 二、 RabbitMQ 服务端部署 (K8s)
|
||||
|
||||
### 1. 配置用户与权限
|
||||
|
||||
修改 `rabbitmq-users-config.yaml`,确保用户标签为 `management` 或 `administrator`,并赋予 `/` 的权限
|
||||
|
||||
```yaml
|
||||
# 关键部分:definitions.json
|
||||
{
|
||||
"name": "modelrt-client",
|
||||
"password_hash": "",
|
||||
"tags": ["management"]
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
### 2. 应用 Kubernetes 配置
|
||||
|
||||
```bash
|
||||
# 1. 应用用户定义
|
||||
kubectl apply -f rabbitmq-users-config.yaml
|
||||
|
||||
# 2. 创建插件 ConfigMap
|
||||
kubectl create configmap rabbit-plugins-conf
|
||||
--from-literal=enabled_plugins="[rabbitmq_auth_mechanism_ssl, \
|
||||
rabbitmq_management, rabbitmq_management_agent, \
|
||||
rabbitmq_prometheus, rabbitmq_web_dispatch]."
|
||||
|
||||
# 3. 创建证书 Secret
|
||||
kubectl create secret generic rabbitmq-certs \
|
||||
--from-file=ca.pem=ca_certificate.pem \
|
||||
--from-file=server.pem=server_certificate.pem \
|
||||
--from-file=server_key.pem=server_key.pem
|
||||
|
||||
# 4. 应用部署文件
|
||||
kubectl apply -f rabbitmq-config.yaml
|
||||
kubectl apply -f rabbitmq-deployment.yaml
|
||||
kubectl apply -f rabbitmq-service.yaml
|
||||
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 三、 开发环境网络配置 (SSH 隧道)
|
||||
|
||||
如果你在 Mac 上开发,RabbitMQ 在远程 Linux 的 Minikube 中,请执行以下命令建立加密隧道:
|
||||
|
||||
```bash
|
||||
# 将远程 Minikube 中 rabbitMQ service 的 NodePort (30671) 映射到 Mac 本地的 5671
|
||||
ssh -L 5671:<minikube-ip>:30671 <host-user>@host-ip
|
||||
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 四、 Go 程序配置 (config.yaml)
|
||||
|
||||
确保客户端配置指向隧道入口,并开启 TLS:
|
||||
|
||||
```yaml
|
||||
rabbitmq:
|
||||
host: "localhost"
|
||||
port: 5671
|
||||
server_name: "rabbitmq-server"
|
||||
ca_cert_path: "./configs/certs/ca_certificate.pem"
|
||||
client_cert_path: "./configs/certs/modelrt_client_cert.pem"
|
||||
client_key_path: "./configs/certs/modelrt_client_key.pem"
|
||||
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 五、 验证与排查
|
||||
|
||||
### 1. 证书权限检查
|
||||
|
||||
运行以下命令,确信输出中有 `TLS Web Client Authentication`:
|
||||
|
||||
```bash
|
||||
openssl x509 -in modelrt_client_cert.pem -noout -text | grep -A 1 "Extended Key Usage"
|
||||
|
||||
```
|
||||
|
||||
### 2. 握手连通性验证
|
||||
|
||||
```bash
|
||||
openssl s_client -connect localhost:5671 \
|
||||
-cert modelrt_client_cert.pem \
|
||||
-key modelrt_client_key.pem \
|
||||
-CAfile ca_certificate.pem
|
||||
|
||||
```
|
||||
|
||||
**预期结果**:看到 `Verify return code: 0 (ok)`
|
||||
|
||||
### 3. 日志检查
|
||||
|
||||
如果连接成功,RabbitMQ 日志应显示:
|
||||
`connection <xxx>: user 'modelrt-client' authenticated and granted access to vhost '/'`
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
[req]
|
||||
distinguished_name = req_distinguished_name
|
||||
prompt = no
|
||||
|
||||
[req_distinguished_name]
|
||||
C = CN
|
||||
ST = Beijing
|
||||
L = Beijing
|
||||
O = coslight
|
||||
CN = eventrt-client
|
||||
|
||||
[v3_client]
|
||||
keyUsage = critical, digitalSignature, keyEncipherment
|
||||
extendedKeyUsage = clientAuth
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
[req]
|
||||
distinguished_name = req_distinguished_name
|
||||
prompt = no
|
||||
|
||||
[req_distinguished_name]
|
||||
C = CN
|
||||
ST = Beijing
|
||||
L = Beijing
|
||||
O = coslight
|
||||
CN = modelrt-client
|
||||
|
||||
[v3_client]
|
||||
keyUsage = critical, digitalSignature, keyEncipherment
|
||||
extendedKeyUsage = clientAuth
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
[req]
|
||||
distinguished_name = req_distinguished_name
|
||||
prompt = no
|
||||
|
||||
[req_distinguished_name]
|
||||
C = CN
|
||||
ST = Beijing
|
||||
L = Beijing
|
||||
O = coslight
|
||||
CN = modelrt-client
|
||||
|
||||
[v3_client]
|
||||
keyUsage = critical, digitalSignature, keyEncipherment
|
||||
extendedKeyUsage = clientAuth
|
||||
|
|
@ -0,0 +1 @@
|
|||
kubectl create configmap rabbit-plugins-conf --from-literal=enabled_plugins="[rabbitmq_auth_mechanism_ssl, rabbitmq_management, rabbitmq_management_agent, rabbitmq_prometheus, rabbitmq_web_dispatch]."
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: rabbitmq-config
|
||||
data:
|
||||
rabbitmq.conf: |
|
||||
# 确保允许PLAIN认证
|
||||
auth_mechanisms.1 = PLAIN
|
||||
auth_mechanisms.2 = AMQPLAIN
|
||||
auth_mechanisms.3 = EXTERNAL
|
||||
# 允许admin用户通过远程方式连接
|
||||
loopback_users.admin = false
|
||||
# 默认心跳和监听配置可在此扩展
|
||||
# 确定 ssl 连接时验证使用的用户名
|
||||
ssl_cert_login_from = common_name
|
||||
# 开启此项配置会导致只能通过TLS端口访问
|
||||
listeners.tcp = none
|
||||
listeners.ssl.default = 5671
|
||||
# default user config
|
||||
load_definitions = /etc/rabbitmq/definitions.json
|
||||
# ssl config
|
||||
ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
|
||||
ssl_options.certfile = /etc/rabbitmq/certs/server_certificate.pem
|
||||
ssl_options.keyfile = /etc/rabbitmq/certs/server_key.pem
|
||||
ssl_options.verify = verify_peer
|
||||
ssl_options.fail_if_no_peer_cert = true
|
||||
# management config
|
||||
management.ssl.port = 15671
|
||||
management.ssl.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem
|
||||
management.ssl.certfile = /etc/rabbitmq/certs/server_certificate.pem
|
||||
management.ssl.keyfile = /etc/rabbitmq/certs/server_key.pem
|
||||
management.ssl.verify = verify_peer
|
||||
management.ssl.fail_if_no_peer_cert = true
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: eventrt-rabbitmq
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: rabbitmq
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: rabbitmq
|
||||
spec:
|
||||
containers:
|
||||
- name: rabbitmq
|
||||
image: rabbitmq:4.1.1-management-alpine
|
||||
ports:
|
||||
- containerPort: 4369
|
||||
- containerPort: 5671
|
||||
- containerPort: 5672 # AMQP
|
||||
- containerPort: 15671
|
||||
- containerPort: 15672 # Management UI
|
||||
- containerPort: 15691
|
||||
- containerPort: 15692
|
||||
- containerPort: 25672
|
||||
env:
|
||||
- name: RABBITMQ_DEFAULT_USER
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: rabbitmq-secret
|
||||
key: rabbitmq-user
|
||||
- name: RABBITMQ_DEFAULT_PASS
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: rabbitmq-secret
|
||||
key: rabbitmq-pass
|
||||
- name: RABBITMQ_ERLANG_COOKIE
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: rabbitmq-secret
|
||||
key: erlang-cookie
|
||||
- name: RABBITMQ_DEFAULT_VHOST
|
||||
value: "/"
|
||||
volumeMounts:
|
||||
- name: rabbitmq-certs-volume
|
||||
mountPath: /etc/rabbitmq/certs
|
||||
readOnly: true
|
||||
- name: rabbitmq-config-volume
|
||||
mountPath: /etc/rabbitmq/rabbitmq.conf
|
||||
subPath: rabbitmq.conf
|
||||
readOnly: true
|
||||
- name: plugins-config-volume
|
||||
mountPath: /etc/rabbitmq/enabled_plugins
|
||||
subPath: enabled_plugins
|
||||
- name: users-config-volume
|
||||
mountPath: /etc/rabbitmq/definitions.json
|
||||
subPath: definitions.json
|
||||
- name: rabbitmq-data
|
||||
mountPath: /var/lib/rabbitmq
|
||||
volumes:
|
||||
- name: rabbitmq-certs-volume
|
||||
secret:
|
||||
secretName: rabbitmq-certs
|
||||
- name: rabbitmq-config-volume
|
||||
configMap:
|
||||
name: rabbitmq-config
|
||||
- name: plugins-config-volume
|
||||
configMap:
|
||||
name: rabbit-plugins-conf
|
||||
- name: users-config-volume
|
||||
configMap:
|
||||
name: rabbitmq-users-definitions
|
||||
- name: rabbitmq-data
|
||||
emptyDir: {}
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
name: rabbitmq-secret
|
||||
type: Opaque
|
||||
stringData:
|
||||
rabbitmq-user: "coslight"
|
||||
rabbitmq-pass: "coslight@tj"
|
||||
erlang-cookie: "secret-erlang-cookie"
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: rabbitmq-service
|
||||
spec:
|
||||
type: NodePort # 在 Minikube 中使用 NodePort 方便外部访问
|
||||
selector:
|
||||
app: rabbitmq
|
||||
ports:
|
||||
- name: amqp-ssl
|
||||
protocol: TCP
|
||||
port: 5671
|
||||
targetPort: 5671
|
||||
nodePort: 30671
|
||||
- name: amqp
|
||||
protocol: TCP
|
||||
port: 5672
|
||||
targetPort: 5672
|
||||
nodePort: 30672
|
||||
- name: management-ssl
|
||||
protocol: TCP
|
||||
port: 15671
|
||||
targetPort: 15671
|
||||
nodePort: 31671
|
||||
- name: management
|
||||
protocol: TCP
|
||||
port: 15672
|
||||
targetPort: 15672
|
||||
nodePort: 31672
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: rabbitmq-users-definitions
|
||||
data:
|
||||
definitions.json: |
|
||||
{
|
||||
"users": [
|
||||
{
|
||||
"name": "coslight",
|
||||
"password_hash": "Gl2XVEJwPwDZQF8ZhsYnvm83wMkdftY3/raxyntdZueyx/Uv",
|
||||
"hashing_algorithm": "rabbit_password_hashing_sha256",
|
||||
"tags": ["administrator"]
|
||||
},
|
||||
{
|
||||
"name": "web-client",
|
||||
"password_hash": "",
|
||||
"hashing_algorithm": "rabbit_password_hashing_sha256",
|
||||
"tags": ["management"]
|
||||
},
|
||||
{
|
||||
"name": "modelrt-client",
|
||||
"password_hash": "",
|
||||
"hashing_algorithm": "rabbit_password_hashing_sha256",
|
||||
"tags": ["management"]
|
||||
},
|
||||
{
|
||||
"name": "eventrt-client",
|
||||
"password_hash": "",
|
||||
"hashing_algorithm": "rabbit_password_hashing_sha256",
|
||||
"tags": ["management"]
|
||||
}
|
||||
],
|
||||
"vhosts": [ { "name": "/" } ],
|
||||
"permissions": [
|
||||
{
|
||||
"user": "coslight",
|
||||
"vhost": "/",
|
||||
"configure": ".*",
|
||||
"write": ".*",
|
||||
"read": ".*"
|
||||
},
|
||||
{
|
||||
"user": "web-client",
|
||||
"vhost": "/",
|
||||
"configure": "^$",
|
||||
"write": ".*",
|
||||
"read": ".*"
|
||||
},
|
||||
{
|
||||
"user": "modelrt-client",
|
||||
"vhost": "/",
|
||||
"configure": ".*",
|
||||
"write": ".*",
|
||||
"read": ".*"
|
||||
},
|
||||
{
|
||||
"user": "eventrt-client",
|
||||
"vhost": "/",
|
||||
"configure": ".*",
|
||||
"write": ".*",
|
||||
"read": ".*"
|
||||
}
|
||||
],
|
||||
"topic_permissions": [],
|
||||
"parameters": [],
|
||||
"global_parameters": [
|
||||
{
|
||||
"name": "cluster_name",
|
||||
"value": "evnetrt-rabbitmq-cluster"
|
||||
}
|
||||
],
|
||||
"policies": [],
|
||||
"queues": [],
|
||||
"exchanges": [],
|
||||
"bindings": []
|
||||
}
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
kubectl create secret generic rabbitmq-certs \
|
||||
--from-file=ca_certificate.pem=./certs/ca_certificate.pem \
|
||||
--from-file=server_certificate.pem=./certs/server_certificate.pem \
|
||||
--from-file=server_key.pem=./certs/server_key.pem
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
[req]
|
||||
distinguished_name = req_distinguished_name
|
||||
prompt = no
|
||||
|
||||
[req_distinguished_name]
|
||||
C = CN
|
||||
ST = Beijing
|
||||
L = Beijing
|
||||
O = coslight
|
||||
CN = rabbitmq-server
|
||||
|
||||
[v3_server]
|
||||
keyUsage = critical, digitalSignature, keyEncipherment
|
||||
extendedKeyUsage = serverAuth, clientAuth
|
||||
subjectAltName = @alt_names
|
||||
|
||||
[alt_names]
|
||||
DNS.1 = rabbitmq-server
|
||||
DNS.2 = rabbitmq-service.default.svc.cluster.local
|
||||
DNS.3 = localhost
|
||||
IP.1 = 192.168.49.2
|
||||
IP.2 = 127.0.0.1
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
module eventRT
|
||||
|
||||
go 1.24.1
|
||||
|
||||
require (
|
||||
github.com/gin-gonic/gin v1.11.0
|
||||
github.com/natefinch/lumberjack v2.0.0+incompatible
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
github.com/spf13/viper v1.21.0
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
|
||||
go.mongodb.org/mongo-driver v1.17.7
|
||||
go.uber.org/zap v1.27.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/BurntSushi/toml v1.6.0 // indirect
|
||||
github.com/bytedance/sonic v1.14.0 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/gin-contrib/sse v1.1.0 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.27.0 // indirect
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/goccy/go-yaml v1.18.0 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.16.7 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/montanaflynn/stats v0.7.1 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
|
||||
github.com/quic-go/qpack v0.5.1 // indirect
|
||||
github.com/quic-go/quic-go v0.54.0 // indirect
|
||||
github.com/sagikazarmark/locafero v0.11.0 // indirect
|
||||
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
|
||||
github.com/spf13/afero v1.15.0 // indirect
|
||||
github.com/spf13/cast v1.10.0 // indirect
|
||||
github.com/spf13/pflag v1.0.10 // indirect
|
||||
github.com/subosito/gotenv v1.6.0 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.3.0 // indirect
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||
github.com/xdg-go/scram v1.1.2 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||
go.uber.org/mock v0.5.0 // indirect
|
||||
go.uber.org/multierr v1.10.0 // indirect
|
||||
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||
golang.org/x/arch v0.20.0 // indirect
|
||||
golang.org/x/crypto v0.40.0 // indirect
|
||||
golang.org/x/mod v0.26.0 // indirect
|
||||
golang.org/x/net v0.42.0 // indirect
|
||||
golang.org/x/sync v0.16.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/text v0.28.0 // indirect
|
||||
golang.org/x/tools v0.35.0 // indirect
|
||||
google.golang.org/protobuf v1.36.9 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
)
|
||||
|
|
@ -0,0 +1,174 @@
|
|||
github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk=
|
||||
github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
|
||||
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
|
||||
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
|
||||
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
|
||||
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
|
||||
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
||||
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
|
||||
github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w=
|
||||
github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM=
|
||||
github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk=
|
||||
github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls=
|
||||
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
|
||||
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
|
||||
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
|
||||
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
|
||||
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
|
||||
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
|
||||
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
|
||||
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
|
||||
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
|
||||
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
|
||||
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
|
||||
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
|
||||
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
|
||||
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
|
||||
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
|
||||
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
|
||||
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc=
|
||||
github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik=
|
||||
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw=
|
||||
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U=
|
||||
github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I=
|
||||
github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg=
|
||||
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
|
||||
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
|
||||
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
|
||||
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU=
|
||||
github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA=
|
||||
github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4=
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
|
||||
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
|
||||
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
|
||||
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
|
||||
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
|
||||
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
go.mongodb.org/mongo-driver v1.17.7 h1:a9w+U3Vt67eYzcfq3k/OAv284/uUUkL0uP75VE5rCOU=
|
||||
go.mongodb.org/mongo-driver v1.17.7/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
|
||||
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
|
||||
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
|
||||
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
|
||||
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
|
||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
|
||||
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
|
||||
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg=
|
||||
golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
|
||||
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
|
||||
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
|
||||
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
|
||||
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0=
|
||||
golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
|
||||
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
// Package logger define log struct of eventRT project
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
var (
|
||||
f *facade
|
||||
fOnce sync.Once
|
||||
)
|
||||
|
||||
type facade struct {
|
||||
_logger *zap.Logger
|
||||
}
|
||||
|
||||
// Debug define facade func of debug level log
|
||||
func Debug(ctx context.Context, msg string, kv ...any) {
|
||||
logFacade().log(ctx, zapcore.DebugLevel, msg, kv...)
|
||||
}
|
||||
|
||||
// Info define facade func of info level log
|
||||
func Info(ctx context.Context, msg string, kv ...any) {
|
||||
logFacade().log(ctx, zapcore.InfoLevel, msg, kv...)
|
||||
}
|
||||
|
||||
// Warn define facade func of warn level log
|
||||
func Warn(ctx context.Context, msg string, kv ...any) {
|
||||
logFacade().log(ctx, zapcore.WarnLevel, msg, kv...)
|
||||
}
|
||||
|
||||
// Error define facade func of error level log
|
||||
func Error(ctx context.Context, msg string, kv ...any) {
|
||||
logFacade().log(ctx, zapcore.ErrorLevel, msg, kv...)
|
||||
}
|
||||
|
||||
func (f *facade) log(ctx context.Context, lvl zapcore.Level, msg string, kv ...any) {
|
||||
fields := makeLogFields(ctx, kv...)
|
||||
ce := f._logger.Check(lvl, msg)
|
||||
ce.Write(fields...)
|
||||
}
|
||||
|
||||
func logFacade() *facade {
|
||||
fOnce.Do(func() {
|
||||
f = &facade{
|
||||
_logger: GetLoggerInstance(),
|
||||
}
|
||||
})
|
||||
return f
|
||||
}
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
// Package logger define log struct of eventRT project
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"runtime"
|
||||
|
||||
"eventRT/constants"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
type logger struct {
|
||||
ctx context.Context
|
||||
traceID string
|
||||
spanID string
|
||||
pSpanID string
|
||||
_logger *zap.Logger
|
||||
}
|
||||
|
||||
func (l *logger) Debug(msg string, kv ...any) {
|
||||
l.log(zapcore.DebugLevel, msg, kv...)
|
||||
}
|
||||
|
||||
func (l *logger) Info(msg string, kv ...any) {
|
||||
l.log(zapcore.InfoLevel, msg, kv...)
|
||||
}
|
||||
|
||||
func (l *logger) Warn(msg string, kv ...any) {
|
||||
l.log(zapcore.WarnLevel, msg, kv...)
|
||||
}
|
||||
|
||||
func (l *logger) Error(msg string, kv ...any) {
|
||||
l.log(zapcore.ErrorLevel, msg, kv...)
|
||||
}
|
||||
|
||||
func (l *logger) log(lvl zapcore.Level, msg string, kv ...any) {
|
||||
fields := makeLogFields(l.ctx, kv...)
|
||||
ce := l._logger.Check(lvl, msg)
|
||||
ce.Write(fields...)
|
||||
}
|
||||
|
||||
func makeLogFields(ctx context.Context, kv ...any) []zap.Field {
|
||||
// Ensure that log information appears in pairs in the form of key-value pairs
|
||||
if len(kv)%2 != 0 {
|
||||
kv = append(kv, "unknown")
|
||||
}
|
||||
|
||||
kv = append(kv, "traceID", ctx.Value(constants.HeaderTraceID), "spanID", ctx.Value(constants.HeaderSpanID), "parentSpanID", ctx.Value(constants.HeaderParentSpanID))
|
||||
|
||||
funcName, file, line := getLoggerCallerInfo()
|
||||
kv = append(kv, "func", funcName, "file", file, "line", line)
|
||||
fields := make([]zap.Field, 0, len(kv)/2)
|
||||
for i := 0; i < len(kv); i += 2 {
|
||||
key := kv[i].(string)
|
||||
value := kv[i+1]
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
fields = append(fields, zap.String(key, v))
|
||||
case int:
|
||||
fields = append(fields, zap.Int(key, v))
|
||||
case int64:
|
||||
fields = append(fields, zap.Int64(key, v))
|
||||
case float32:
|
||||
fields = append(fields, zap.Float32(key, v))
|
||||
case float64:
|
||||
fields = append(fields, zap.Float64(key, v))
|
||||
case bool:
|
||||
fields = append(fields, zap.Bool(key, v))
|
||||
case error:
|
||||
fields = append(fields, zap.Error(v))
|
||||
default:
|
||||
fields = append(fields, zap.Any(key, v))
|
||||
}
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// getLoggerCallerInfo define func of return log caller information、method name、file name、line number
|
||||
func getLoggerCallerInfo() (funcName, file string, line int) {
|
||||
pc, file, line, ok := runtime.Caller(4)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
file = path.Base(file)
|
||||
funcName = runtime.FuncForPC(pc).Name()
|
||||
return
|
||||
}
|
||||
|
||||
func New(ctx context.Context) *logger {
|
||||
var traceID, spanID, pSpanID string
|
||||
if ctx.Value("traceID") != nil {
|
||||
traceID = ctx.Value("traceID").(string)
|
||||
}
|
||||
if ctx.Value("spanID") != nil {
|
||||
spanID = ctx.Value("spanID").(string)
|
||||
}
|
||||
if ctx.Value("psapnID") != nil {
|
||||
pSpanID = ctx.Value("pspanID").(string)
|
||||
}
|
||||
|
||||
return &logger{
|
||||
ctx: ctx,
|
||||
traceID: traceID,
|
||||
spanID: spanID,
|
||||
pSpanID: pSpanID,
|
||||
_logger: GetLoggerInstance(),
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,88 @@
|
|||
// Package logger define log struct of eventRT project
|
||||
package logger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"eventRT/config"
|
||||
"eventRT/constants"
|
||||
|
||||
"github.com/natefinch/lumberjack"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
_globalLoggerMu sync.RWMutex
|
||||
_globalLogger *zap.Logger
|
||||
)
|
||||
|
||||
// getEncoder responsible for setting the log format for encoding
|
||||
func getEncoder() zapcore.Encoder {
|
||||
encoderConfig := zap.NewProductionEncoderConfig()
|
||||
// serialization time eg:2006-01-02 15:04:05
|
||||
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
|
||||
encoderConfig.TimeKey = "time"
|
||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
||||
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
|
||||
return zapcore.NewJSONEncoder(encoderConfig)
|
||||
}
|
||||
|
||||
// getLogWriter responsible for setting the location of log storage
|
||||
func getLogWriter(mode, filename string, maxsize, maxBackup, maxAge int, compress bool) zapcore.WriteSyncer {
|
||||
dateStr := time.Now().Format("2006-01-02 15:04:05")
|
||||
finalFilename := fmt.Sprintf(filename, dateStr)
|
||||
lumberJackLogger := &lumberjack.Logger{
|
||||
Filename: finalFilename, // log file position
|
||||
MaxSize: maxsize, // log file maxsize
|
||||
MaxAge: maxAge, // maximum number of day files retained
|
||||
MaxBackups: maxBackup, // maximum number of old files retained
|
||||
Compress: compress, // whether to compress
|
||||
}
|
||||
|
||||
syncConsole := zapcore.AddSync(os.Stderr)
|
||||
if mode == constants.DevelopmentLogMode {
|
||||
return syncConsole
|
||||
}
|
||||
|
||||
syncFile := zapcore.AddSync(lumberJackLogger)
|
||||
return zapcore.NewMultiWriteSyncer(syncFile, syncConsole)
|
||||
}
|
||||
|
||||
// initLogger return successfully initialized zap logger
|
||||
func initLogger(lCfg config.LoggerConfig) *zap.Logger {
|
||||
writeSyncer := getLogWriter(lCfg.Mode, lCfg.FilePath, lCfg.MaxSize, lCfg.MaxBackups, lCfg.MaxAge, lCfg.Compress)
|
||||
encoder := getEncoder()
|
||||
|
||||
l := new(zapcore.Level)
|
||||
err := l.UnmarshalText([]byte(lCfg.Level))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
core := zapcore.NewCore(encoder, writeSyncer, l)
|
||||
logger := zap.New(core, zap.AddCaller())
|
||||
|
||||
// 替换全局日志实例
|
||||
zap.ReplaceGlobals(logger)
|
||||
return logger
|
||||
}
|
||||
|
||||
// InitLoggerInstance define func of return instance of zap logger
|
||||
func InitLoggerInstance(lCfg config.LoggerConfig) {
|
||||
once.Do(func() {
|
||||
_globalLogger = initLogger(lCfg)
|
||||
})
|
||||
}
|
||||
|
||||
// GetLoggerInstance define func of returns the global logger instance It's safe for concurrent use.
|
||||
func GetLoggerInstance() *zap.Logger {
|
||||
_globalLoggerMu.RLock()
|
||||
logger := _globalLogger
|
||||
_globalLoggerMu.RUnlock()
|
||||
return logger
|
||||
}
|
||||
|
|
@ -0,0 +1,118 @@
|
|||
// entry function
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"eventRT/config"
|
||||
"eventRT/constants"
|
||||
"eventRT/database"
|
||||
"eventRT/logger"
|
||||
"eventRT/mq"
|
||||
"eventRT/util"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
eventRTConfigDir = flag.String("eventRT_config_dir", "./configs", "config file dir")
|
||||
eventRTConfigName = flag.String("eventRT_config_name", "config", "config file name")
|
||||
eventRTConfigType = flag.String("eventRT_config_type", "yaml", "config file type")
|
||||
)
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
configPath := filepath.Join(*eventRTConfigDir, *eventRTConfigName+"."+*eventRTConfigType)
|
||||
if _, err := os.Stat(configPath); os.IsNotExist(err) {
|
||||
log.Println("configuration file not found,checking for example file")
|
||||
|
||||
exampleConfigPath := filepath.Join(*eventRTConfigDir, *eventRTConfigName+".example."+*eventRTConfigType)
|
||||
configDir := filepath.Dir(configPath)
|
||||
if err := os.MkdirAll(configDir, 0o755); err != nil {
|
||||
panic(fmt.Errorf("failed to create config directory %s:%w", configDir, err))
|
||||
}
|
||||
if _, err := os.Stat(exampleConfigPath); err == nil {
|
||||
if err := util.CopyFile(exampleConfigPath, configPath); err != nil {
|
||||
panic(fmt.Errorf("failed to copy example config file:%w", err))
|
||||
}
|
||||
} else {
|
||||
panic(errors.New("no config file and no config example file found"))
|
||||
}
|
||||
}
|
||||
|
||||
eventRTConfig := config.ReadAndInitConfig(*eventRTConfigDir, *eventRTConfigName, *eventRTConfigType)
|
||||
// init logger instance
|
||||
logger.InitLoggerInstance(eventRTConfig.LoggerConfig)
|
||||
|
||||
// init MongoDB client
|
||||
notifyCtx, stop := signal.NotifyContext(context.TODO(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
client := database.InitMongoInstance(notifyCtx, eventRTConfig)
|
||||
defer func() {
|
||||
disconnectCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := client.Disconnect(disconnectCtx); err != nil {
|
||||
logger.Error(notifyCtx, "mongodb disconnect failed", "err", err)
|
||||
} else {
|
||||
logger.Info(notifyCtx, "mongodb connection closed gracefully")
|
||||
}
|
||||
}()
|
||||
|
||||
// init RabbitMQ connection
|
||||
mq.InitRabbitProxy(notifyCtx, eventRTConfig.RabbitMQConfig)
|
||||
defer mq.GetConn().Close()
|
||||
|
||||
// use release mode in production
|
||||
if eventRTConfig.DeployEnv == constants.ProductionDeployMode {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
}
|
||||
engine := gin.New()
|
||||
engine.Use(gin.Logger(), gin.Recovery())
|
||||
|
||||
// TODO k8s liveness & readiness probe endpoints
|
||||
// engine.GET("/ping", func(c *gin.Context) {
|
||||
// c.JSON(200, gin.H{"status": "ok"})
|
||||
// })
|
||||
|
||||
server := http.Server{
|
||||
Addr: eventRTConfig.ServiceAddr,
|
||||
Handler: engine,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-notifyCtx.Done()
|
||||
ctx := context.Background()
|
||||
logger.Info(ctx, "shutdown signal received, cleaning up...")
|
||||
shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
if err := server.Shutdown(shutdownCtx); err != nil {
|
||||
logger.Error(shutdownCtx, "server shutdown failed", "err", err)
|
||||
}
|
||||
mq.CloseRabbitProxy()
|
||||
logger.Info(ctx, "resources cleaned up, exiting")
|
||||
}()
|
||||
|
||||
logger.Info(notifyCtx, "starting EventRT server")
|
||||
err := server.ListenAndServe()
|
||||
if err != nil {
|
||||
if err == http.ErrServerClosed {
|
||||
// the service receives the shutdown signal normally and then closes
|
||||
logger.Info(notifyCtx, "server closed under request")
|
||||
} else {
|
||||
// abnormal shutdown of service
|
||||
logger.Error(notifyCtx, "server closed unexpected", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,217 @@
|
|||
// Package mq define message queue operation functions
|
||||
package mq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"eventRT/config"
|
||||
"eventRT/logger"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/youmark/pkcs8"
|
||||
)
|
||||
|
||||
var (
|
||||
_globalRabbitMQProxy *RabbitMQProxy
|
||||
rabbitMQOnce sync.Once
|
||||
)
|
||||
|
||||
// RabbitMQProxy define stuct of rabbitMQ connection proxy
|
||||
type RabbitMQProxy struct {
|
||||
tlsConf *tls.Config
|
||||
conn *amqp.Connection
|
||||
cancel context.CancelFunc
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// rabbitMQCertConf define stuct of rabbitMQ connection certificates config
|
||||
type rabbitMQCertConf struct {
|
||||
serverName string
|
||||
insecureSkipVerify bool
|
||||
clientCert tls.Certificate
|
||||
caCertPool *x509.CertPool
|
||||
}
|
||||
|
||||
// GetConn define func to return the rabbitMQ connection
|
||||
func GetConn() *amqp.Connection {
|
||||
_globalRabbitMQProxy.mu.Lock()
|
||||
defer _globalRabbitMQProxy.mu.Unlock()
|
||||
return _globalRabbitMQProxy.conn
|
||||
}
|
||||
|
||||
// InitRabbitProxy return instance of rabbitMQ connection
|
||||
func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy {
|
||||
amqpURI := generateRabbitMQURI(rCfg)
|
||||
tlsConf, err := initCertConf(rCfg)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "init rabbitMQ cert config failed", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
rabbitMQOnce.Do(func() {
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
conn := initRabbitMQ(ctx, amqpURI, tlsConf)
|
||||
_globalRabbitMQProxy = &RabbitMQProxy{tlsConf: tlsConf, conn: conn, cancel: cancel}
|
||||
go _globalRabbitMQProxy.handleReconnect(cancelCtx, amqpURI)
|
||||
})
|
||||
return _globalRabbitMQProxy
|
||||
}
|
||||
|
||||
// initRabbitMQ return instance of rabbitMQ connection
|
||||
func initRabbitMQ(ctx context.Context, rabbitMQURI string, tlsConf *tls.Config) *amqp.Connection {
|
||||
logger.Info(ctx, "connecting to rabbitMQ server", "rabbitmq_uri", rabbitMQURI)
|
||||
conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
|
||||
TLSClientConfig: tlsConf,
|
||||
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
|
||||
Heartbeat: 10 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(ctx, "init rabbitMQ connection failed", "error", err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) {
|
||||
for {
|
||||
closeChan := make(chan *amqp.Error)
|
||||
GetConn().NotifyClose(closeChan)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(ctx, "context cancelled, exiting handleReconnect")
|
||||
return
|
||||
case err, ok := <-closeChan:
|
||||
if !ok {
|
||||
logger.Info(ctx, "rabbitMQ notify channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
logger.Info(ctx, "rabbitMQ connection closed normally, no need to reconnect")
|
||||
return
|
||||
}
|
||||
|
||||
logger.Warn(ctx, "rabbitMQ connection closed by error, starting reconnect", "reason", err)
|
||||
}
|
||||
|
||||
if !p.reconnect(ctx, rabbitMQURI) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RabbitMQProxy) reconnect(ctx context.Context, rabbitMQURI string) bool {
|
||||
for {
|
||||
logger.Info(ctx, "attempting to reconnect to rabbitMQ...")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(5 * time.Second):
|
||||
|
||||
}
|
||||
|
||||
newConn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{
|
||||
TLSClientConfig: p.tlsConf,
|
||||
SASL: []amqp.Authentication{&amqp.ExternalAuth{}},
|
||||
Heartbeat: 10 * time.Second,
|
||||
})
|
||||
if err == nil {
|
||||
p.mu.Lock()
|
||||
p.conn = newConn
|
||||
p.mu.Unlock()
|
||||
logger.Info(ctx, "rabbitMQ reconnected successfully")
|
||||
return true
|
||||
}
|
||||
logger.Error(ctx, "rabbitMQ reconnect failed, will retry", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// CloseRabbitProxy close the rabbitMQ connection and stop reconnect goroutine
|
||||
func CloseRabbitProxy() {
|
||||
if _globalRabbitMQProxy != nil {
|
||||
_globalRabbitMQProxy.cancel()
|
||||
_globalRabbitMQProxy.mu.Lock()
|
||||
if _globalRabbitMQProxy.conn != nil {
|
||||
_globalRabbitMQProxy.conn.Close()
|
||||
}
|
||||
_globalRabbitMQProxy.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func generateRabbitMQURI(rCfg config.RabbitMQConfig) string {
|
||||
// TODO 考虑拆分用户名密码配置项,兼容不同认证方式
|
||||
// user := url.QueryEscape(rCfg.User)
|
||||
// password := url.QueryEscape(rCfg.Password)
|
||||
|
||||
// amqpURI := fmt.Sprintf("amqps://%s:%s@%s:%d/",
|
||||
// user,
|
||||
// password,
|
||||
// rCfg.Host,
|
||||
// rCfg.Port,
|
||||
// )
|
||||
amqpURI := fmt.Sprintf("amqps://%s:%d/",
|
||||
rCfg.Host,
|
||||
rCfg.Port,
|
||||
)
|
||||
return amqpURI
|
||||
}
|
||||
|
||||
func initCertConf(rCfg config.RabbitMQConfig) (*tls.Config, error) {
|
||||
tlsConf := &tls.Config{
|
||||
InsecureSkipVerify: rCfg.InsecureSkipVerify,
|
||||
ServerName: rCfg.ServerName,
|
||||
}
|
||||
|
||||
caCert, err := os.ReadFile(rCfg.CACertPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read server ca file failed: %w", err)
|
||||
}
|
||||
caCertPool := x509.NewCertPool()
|
||||
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
|
||||
return nil, fmt.Errorf("failed to parse root certificate from %s", rCfg.CACertPath)
|
||||
}
|
||||
tlsConf.RootCAs = caCertPool
|
||||
|
||||
certPEM, err := os.ReadFile(rCfg.ClientCertPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read client cert file failed: %w", err)
|
||||
}
|
||||
|
||||
keyData, err := os.ReadFile(rCfg.ClientKeyPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read private key file failed: %w", err)
|
||||
}
|
||||
|
||||
block, _ := pem.Decode(keyData)
|
||||
if block == nil {
|
||||
return nil, fmt.Errorf("failed to decode PEM block from private key")
|
||||
}
|
||||
|
||||
der, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse password-protected private key failed: %w", err)
|
||||
}
|
||||
|
||||
privBytes, err := x509.MarshalPKCS8PrivateKey(der)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal private key failed: %w", err)
|
||||
}
|
||||
|
||||
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
|
||||
|
||||
clientCert, err := tls.X509KeyPair(certPEM, keyPEM)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create x509 key pair failed: %w", err)
|
||||
}
|
||||
|
||||
tlsConf.Certificates = []tls.Certificate{clientCert}
|
||||
return tlsConf, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
// Package util provide some utility functions
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
// CopyFile define func of copies a file from src to dst.
|
||||
// If the destination file exists, it will be overwritten.
|
||||
func CopyFile(src, dst string) error {
|
||||
sourceFile, err := os.Open(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open source file %s: %w", src, err)
|
||||
}
|
||||
defer sourceFile.Close()
|
||||
|
||||
destFile, err := os.Create(dst)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create destination file %s: %w", dst, err)
|
||||
}
|
||||
defer destFile.Close()
|
||||
|
||||
_, err = io.Copy(destFile, sourceFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy file contents from %s to %s: %w", src, dst, err)
|
||||
}
|
||||
|
||||
err = destFile.Sync()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to sync destination file %s: %w", dst, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in New Issue