From 35cb969a54029fda6d2d1cd82ee8ae8053dffd85 Mon Sep 17 00:00:00 2001 From: douxu Date: Mon, 2 Feb 2026 16:48:46 +0800 Subject: [PATCH] add code of inter-module communication --- alert/event.go | 58 +++++++++++++ go.mod | 4 +- go.sum | 4 + mq/publisher_with_ssh_091.go | 160 +++++++++++++++++++++++++++++++++++ sharememory/share_memeory.go | 97 --------------------- 5 files changed, 225 insertions(+), 98 deletions(-) create mode 100644 alert/event.go create mode 100644 mq/publisher_with_ssh_091.go delete mode 100644 sharememory/share_memeory.go diff --git a/alert/event.go b/alert/event.go new file mode 100644 index 0000000..f580489 --- /dev/null +++ b/alert/event.go @@ -0,0 +1,58 @@ +// Package alert define alert event struct of modelRT project +package alert + +// EventRecord define struct for CIM event record +type EventRecord struct { + // 事件名称 + Event string `json:"event"` + // 事件唯一标识符 + EventUUID string `json:"event_uuid"` + // 事件类型 + Type int `json:"type"` + // 事件优先级 (0-9) + Priority int `json:"priority"` + // 事件状态 + Status int `json:"status"` + // 可选模板参数 + Category string `json:"category,omitempty"` + // 毫秒级时间戳 (Unix epoch) + Timestamp int64 `json:"timestamp"` + // 事件来源 (station, platform, msa) + From string `json:"from"` + // 事件场景描述对象 (如阈值、当前值) + Condition map[string]any `json:"condition"` + // 与事件相关的订阅信息 + AttachedSubscriptions []any `json:"attached_subscriptions"` + // 事件分析结果对象 + Result map[string]any `json:"result,omitempty"` + // 操作历史记录 (CIM ActivityRecord) + Operations []OperationRecord `json:"operations"` + // 子站告警原始数据 (CIM Alarm 数据) + Alarm map[string]any `json:"alarm,omitempty"` +} + +// OperationRecord 描述对事件的操作记录,如确认(acknowledgment)等 +type OperationRecord struct { + Action string `json:"action"` // 执行的动作,如 "acknowledgment" + Op string `json:"op"` // 操作人/操作账号标识 + TS int64 `json:"ts"` // 操作发生的毫秒时间戳 +} + +// 定义事件类型常量,便于逻辑判断 +const ( + TypeGeneralHard = 0 + TypeGeneralPlatformSoft = 1 + TypeGeneralApplicationSoft = 2 + TypeWarnHard = 3 + TypeWarnPlatformSoft = 4 + TypeWarnApplicationSoft = 5 + TypeCriticalHard = 6 + TypeCriticalPlatformSoft = 7 + TypeCriticalApplicationSoft = 8 +) + +const ( + FromStation = "station" + FromPlatform = "platform" + FromMSA = "msa" +) diff --git a/go.mod b/go.mod index 3695aef..99cc668 100644 --- a/go.mod +++ b/go.mod @@ -13,14 +13,15 @@ require ( github.com/json-iterator/go v1.1.12 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/panjf2000/ants/v2 v2.10.0 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/redis/go-redis/v9 v9.7.3 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 github.com/swaggo/swag v1.16.4 + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 go.uber.org/zap v1.27.0 - golang.org/x/sys v0.28.0 gorm.io/driver/mysql v1.5.7 gorm.io/driver/postgres v1.5.9 gorm.io/gorm v1.25.12 @@ -81,6 +82,7 @@ require ( golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.28.0 // indirect google.golang.org/protobuf v1.35.2 // indirect diff --git a/go.sum b/go.sum index 33b982e..2b3c5ff 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,8 @@ github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xl github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= @@ -162,6 +164,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/mq/publisher_with_ssh_091.go b/mq/publisher_with_ssh_091.go new file mode 100644 index 0000000..2319558 --- /dev/null +++ b/mq/publisher_with_ssh_091.go @@ -0,0 +1,160 @@ +// Package mq provides read or write access to message queue services +package mq + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "log" + "os" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/youmark/pkcs8" +) + +func main() { + exchangeName := "getting-started-go-exchange" + queueName := "getting-started-go-queue" + routingKey := "routing-key" + + deadExchangeName := "getting-started-go-dead-letter-exchange" + deadQueueName := "getting-started-go-dead-letter-queue" + deadRoutingKey := "dead-letter-routing-key" + + caCert, err := os.ReadFile("../certs/ca_certificate.pem") + if err != nil { + log.Fatal("read ca file failed", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + keyData, err := os.ReadFile("../certs/client_key.pem") + if err != nil { + log.Fatal("read private key file failed", err) + } + + block, _ := pem.Decode(keyData) + password := []byte("ecl3000") + privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) + if err != nil { + log.Fatal("parse private key failed", err) + } + + pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + pemBlock := &pem.Block{Type: "PRIVATE KEY", Bytes: pemBytes} + + certPEM, err := os.ReadFile("../certs/client_certificate.pem") + clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock)) + if err != nil { + log.Fatal("load client cert failed", err) + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{clientCert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + ServerName: "douxu-buntu22", + } + + url := "amqps://192.168.2.104:5671/" + + conn, err := amqp.DialConfig(url, amqp.Config{ + TLSClientConfig: tlsConfig, + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, + Heartbeat: 10 * time.Second, + }) + if err != nil { + log.Fatal("Error opening connection: ", err) + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + log.Fatal("Error opening channel: ", err) + } + defer ch.Close() + + go func() { + closeErr := <-conn.NotifyClose(make(chan *amqp.Error)) + log.Printf("Connection closed: %v", closeErr) + }() + + err = ch.ExchangeDeclare(deadExchangeName, "topic", true, false, false, false, nil) + if err != nil { + log.Fatal("Error declaring dead letter exchange: ", err) + } + + _, err = ch.QueueDeclare(deadQueueName, true, false, false, false, nil) + if err != nil { + log.Fatal("Error declaring dead letter queue: ", err) + } + + err = ch.QueueBind(deadQueueName, deadRoutingKey, deadExchangeName, false, nil) + if err != nil { + log.Fatal("Error binding dead letter: ", err) + } + + err = ch.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil) + if err != nil { + log.Fatal("Error declaring exchange: ", err) + } + + args := amqp.Table{ + "x-max-length": int32(50), + "x-dead-letter-exchange": deadExchangeName, + "x-dead-letter-routing-key": deadRoutingKey, + } + _, err = ch.QueueDeclare(queueName, true, false, false, false, args) + if err != nil { + log.Fatal("Error declaring queue: ", err) + } + + err = ch.QueueBind(queueName, routingKey, exchangeName, false, nil) + if err != nil { + log.Fatal("Error binding queue: ", err) + } + + if err := ch.Confirm(false); err != nil { + log.Fatal("Channel could not be put into confirm mode: ", err) + } + + confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1)) + + for i := 0; i < 100; i++ { + msgBody := fmt.Sprintf("Hello, World!%d", i) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err = ch.PublishWithContext(ctx, + exchangeName, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(msgBody), + }) + cancel() + + if err != nil { + log.Printf("Error publishing message: %v", err) + time.Sleep(1 * time.Second) + continue + } + + select { + case confirm := <-confirms: + if confirm.Ack { + log.Printf("[Publisher] Message %d accepted", i) + } else { + log.Printf("[Publisher] Message %d Nack (Rejected by RabbitMQ)", i) + } + case <-time.After(5 * time.Second): + log.Printf("[Publisher] Timeout waiting for confirm for message %d", i) + } + } + + log.Println("Producer finished sending messages") +} diff --git a/sharememory/share_memeory.go b/sharememory/share_memeory.go deleted file mode 100644 index 8248064..0000000 --- a/sharememory/share_memeory.go +++ /dev/null @@ -1,97 +0,0 @@ -package sharememory - -import ( - "fmt" - "unsafe" - - "modelRT/orm" - - "golang.org/x/sys/unix" -) - -// CreateShareMemory defines a function to create a shared memory -func CreateShareMemory(key uintptr, structSize uintptr) (uintptr, error) { - // logger := logger.GetLoggerInstance() - // create shared memory - shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, structSize, unix.IPC_CREAT|0o666) - if err != 0 { - // logger.Error(fmt.Sprintf("create shared memory by key %v failed:", key), zap.Error(err)) - return 0, fmt.Errorf("create shared memory failed:%w", err) - } - - // attach shared memory - shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0) - if err != 0 { - // logger.Error(fmt.Sprintf("attach shared memory by shmID %v failed:", shmID), zap.Error(err)) - return 0, fmt.Errorf("attach shared memory failed:%w", err) - } - return shmAddr, nil -} - -// ReadComponentFromShareMemory defines a function to read component value from shared memory -func ReadComponentFromShareMemory(key uintptr, componentInfo *orm.Component) error { - structSize := unsafe.Sizeof(orm.Component{}) - shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, uintptr(int(structSize)), 0o666) - if err != 0 { - return fmt.Errorf("get shared memory failed:%w", err) - } - - shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0) - if err != 0 { - return fmt.Errorf("attach shared memory failed:%w", err) - } - - // 读取共享内存中的数据 - componentInfo = (*orm.Component)(unsafe.Pointer(shmAddr + structSize)) - - // Detach shared memory - unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0) - return nil -} - -func WriteComponentInShareMemory(key uintptr, componentInfo *orm.Component) error { - structSize := unsafe.Sizeof(orm.Component{}) - shmID, _, err := unix.Syscall(unix.SYS_SHMGET, key, uintptr(int(structSize)), 0o666) - if err != 0 { - return fmt.Errorf("get shared memory failed:%w", err) - } - - shmAddr, _, err := unix.Syscall(unix.SYS_SHMAT, shmID, 0, 0) - if err != 0 { - return fmt.Errorf("attach shared memory failed:%w", err) - } - - obj := (*orm.Component)(unsafe.Pointer(shmAddr + unsafe.Sizeof(structSize))) - fmt.Println(obj) - - // id integer NOT NULL DEFAULT nextval('component_id_seq'::regclass), - // global_uuid uuid NOT NULL DEFAULT gen_random_uuid(), - // nspath character varying(32) COLLATE pg_catalog."default", - // tag character varying(32) COLLATE pg_catalog."default" NOT NULL, - // name character varying(64) COLLATE pg_catalog."default" NOT NULL, - // description character varying(512) COLLATE pg_catalog."default" NOT NULL DEFAULT ''::character varying, - // grid character varying(64) COLLATE pg_catalog."default" NOT NULL, - // zone character varying(64) COLLATE pg_catalog."default" NOT NULL, - // station character varying(64) COLLATE pg_catalog."default" NOT NULL, - // type integer NOT NULL, - // in_service boolean DEFAULT false, - // state integer NOT NULL DEFAULT 0, - // connected_bus jsonb NOT NULL DEFAULT '{}'::jsonb, - // label jsonb NOT NULL DEFAULT '{}'::jsonb, - // context jsonb NOT NULL DEFAULT '{}'::jsonb, - // page_id integer NOT NULL, - // op integer NOT NULL DEFAULT '-1'::integer, - // ts timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, - - unix.Syscall(unix.SYS_SHMDT, shmAddr, 0, 0) - return nil -} - -// DeleteShareMemory defines a function to delete shared memory -func DeleteShareMemory(key uintptr) error { - _, _, err := unix.Syscall(unix.SYS_SHM_UNLINK, key, 0, 0o666) - if err != 0 { - return fmt.Errorf("get shared memory failed:%w", err) - } - return nil -}