diff --git a/config/config.go b/config/config.go index b6379d3..3c09187 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,20 @@ type ServiceConfig struct { DeployEnv string `mapstructure:"deploy_env"` } +// RabbitMQConfig define config struct of RabbitMQ config +type RabbitMQConfig struct { + CACertPath string `mapstructure:"ca_cert_path"` + ClientKeyPath string `mapstructure:"client_key_path"` + ClientKeyPassword string `mapstructure:"client_key_password"` + ClientCertPath string `mapstructure:"client_cert_path"` + InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` + ServerName string `mapstructure:"server_name"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` +} + // KafkaConfig define config struct of kafka config type KafkaConfig struct { Servers string `mapstructure:"Servers"` @@ -82,6 +96,7 @@ type ModelRTConfig struct { BaseConfig `mapstructure:"base"` ServiceConfig `mapstructure:"service"` PostgresConfig `mapstructure:"postgres"` + RabbitMQConfig `mapstructure:"rabbitmq"` KafkaConfig `mapstructure:"kafka"` LoggerConfig `mapstructure:"logger"` AntsConfig `mapstructure:"ants"` diff --git a/handler/real_time_data_pull.go b/handler/real_time_data_pull.go index 24ccf50..986ed11 100644 --- a/handler/real_time_data_pull.go +++ b/handler/real_time_data_pull.go @@ -401,7 +401,6 @@ func stopAllPolling(ctx context.Context, stopChanMap map[string]chan struct{}) { close(stopChan) } clear(stopChanMap) - return } // redisPollingConfig define struct for param which query real time data from redis diff --git a/handler/real_time_data_query.go b/handler/real_time_data_query.go index c734739..03e9786 100644 --- a/handler/real_time_data_query.go +++ b/handler/real_time_data_query.go @@ -168,7 +168,6 @@ func receiveRealTimeDataByWebSocket(ctx context.Context, params url.Values, tran } transportChannel <- subPoss } - return } // messageTypeToString define func of auxiliary to convert message type to string diff --git a/handler/real_time_data_subscription.go b/handler/real_time_data_subscription.go index afc396b..3f171cf 100644 --- a/handler/real_time_data_subscription.go +++ b/handler/real_time_data_subscription.go @@ -620,7 +620,6 @@ func (s *SharedSubState) RemoveTargets(ctx context.Context, clientID string, mea // UpdateTargets define function to update targets in SharedSubState func (s *SharedSubState) UpdateTargets(ctx context.Context, tx *gorm.DB, clientID string, measurements []network.RealTimeMeasurementItem) ([]network.TargetResult, error) { requestTargetsCount := processRealTimeRequestCount(measurements) - targetProcessResults := make([]network.TargetResult, 0, requestTargetsCount) s.globalMutex.RLock() config, exist := s.subMap[clientID] diff --git a/main.go b/main.go index a0aca1e..8658a56 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( "modelRT/diagram" "modelRT/logger" "modelRT/model" + "modelRT/mq" "modelRT/pool" "modelRT/router" "modelRT/util" @@ -149,6 +150,9 @@ func main() { } defer anchorRealTimePool.Release() + // init rabbitmq connection + mq.InitRabbitProxy(ctx, modelRTConfig.RabbitMQConfig) + postgresDBClient.Transaction(func(tx *gorm.DB) error { // load circuit diagram from postgres // componentTypeMap, err := database.QueryCircuitDiagramComponentFromDB(cancelCtx, tx, parsePool) diff --git a/mq/publish_event.go b/mq/publish_event.go new file mode 100644 index 0000000..7984741 --- /dev/null +++ b/mq/publish_event.go @@ -0,0 +1,129 @@ +// Package mq provides read or write access to message queue services +package mq + +import ( + "context" + "time" + + "modelRT/logger" + + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + routingKey = "event-alarm-routing-key" + exchangeName = "event-alarm-exchange" + queueName = "event-alarm-queue" + deadRoutingKey = "event-alarm-dead-letter-routing-key" + deadExchangeName = "event-alarm-dead-letter-exchange" + deadQueueName = "event-alarm-dead-letter-queue" +) + +func initChannel(ctx context.Context) (*amqp.Channel, error) { + var channel *amqp.Channel + var err error + + channel, err = GetConn().Channel() + if err != nil { + logger.Error(ctx, "open rabbitMQ server channel failed", "error", err) + } + + err = channel.ExchangeDeclare(deadExchangeName, "topic", true, false, false, false, nil) + if err != nil { + logger.Error(ctx, "declare event dead letter exchange failed", "error", err) + } + + _, err = channel.QueueDeclare(deadQueueName, true, false, false, false, nil) + if err != nil { + logger.Error(ctx, "declare event dead letter queue failed", "error", err) + } + + err = channel.QueueBind(deadQueueName, deadRoutingKey, deadExchangeName, false, nil) + if err != nil { + logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err) + } + + err = channel.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil) + if err != nil { + logger.Error(ctx, "declare event exchange failed", "error", err) + } + + args := amqp.Table{ + // messages that accumulate to the maximum number will be automatically transferred to the dead letter queue + "x-max-length": int32(50), + "x-dead-letter-exchange": deadExchangeName, + "x-dead-letter-routing-key": deadRoutingKey, + } + _, err = channel.QueueDeclare(queueName, true, false, false, false, args) + if err != nil { + logger.Error(ctx, "declare event queue failed", "error", err) + } + + err = channel.QueueBind(queueName, routingKey, exchangeName, false, nil) + if err != nil { + logger.Error(ctx, "bind event queue with routing key and exchange failed:", "error", err) + } + + if err := channel.Confirm(false); err != nil { + logger.Error(ctx, "channel could not be put into confirm mode", "error", err) + } + return channel, nil +} + +func pushEventToRabbitMQ(ctx context.Context, msgChan chan string) { + channel, err := initChannel(ctx) + if err != nil { + logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err) + return + } + // TODO 使用配置修改确认模式通道参数 + confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100)) + + go func() { + for { + select { + case confirm, ok := <-confirms: + if !ok { + return + } + if !confirm.Ack { + logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag) + } + case <-ctx.Done(): + return + } + } + }() + + for { + select { + case <-ctx.Done(): + logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel") + channel.Close() + return + case msg, ok := <-msgChan: + if !ok { + logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop") + channel.Close() + return + } + + // send event alarm message to rabbitMQ queue + pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + err = channel.PublishWithContext(pubCtx, + exchangeName, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(msg), + }) + cancel() + + if err != nil { + logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", msg, "error", err) + } + } + } +} diff --git a/mq/publisher_with_ssh_091.go b/mq/publisher_with_ssh_091.go deleted file mode 100644 index 2319558..0000000 --- a/mq/publisher_with_ssh_091.go +++ /dev/null @@ -1,160 +0,0 @@ -// Package mq provides read or write access to message queue services -package mq - -import ( - "context" - "crypto/tls" - "crypto/x509" - "encoding/pem" - "fmt" - "log" - "os" - "time" - - amqp "github.com/rabbitmq/amqp091-go" - "github.com/youmark/pkcs8" -) - -func main() { - exchangeName := "getting-started-go-exchange" - queueName := "getting-started-go-queue" - routingKey := "routing-key" - - deadExchangeName := "getting-started-go-dead-letter-exchange" - deadQueueName := "getting-started-go-dead-letter-queue" - deadRoutingKey := "dead-letter-routing-key" - - caCert, err := os.ReadFile("../certs/ca_certificate.pem") - if err != nil { - log.Fatal("read ca file failed", err) - } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - - keyData, err := os.ReadFile("../certs/client_key.pem") - if err != nil { - log.Fatal("read private key file failed", err) - } - - block, _ := pem.Decode(keyData) - password := []byte("ecl3000") - privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) - if err != nil { - log.Fatal("parse private key failed", err) - } - - pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) - pemBlock := &pem.Block{Type: "PRIVATE KEY", Bytes: pemBytes} - - certPEM, err := os.ReadFile("../certs/client_certificate.pem") - clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock)) - if err != nil { - log.Fatal("load client cert failed", err) - } - - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{clientCert}, - RootCAs: caCertPool, - InsecureSkipVerify: false, - ServerName: "douxu-buntu22", - } - - url := "amqps://192.168.2.104:5671/" - - conn, err := amqp.DialConfig(url, amqp.Config{ - TLSClientConfig: tlsConfig, - SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, - Heartbeat: 10 * time.Second, - }) - if err != nil { - log.Fatal("Error opening connection: ", err) - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Fatal("Error opening channel: ", err) - } - defer ch.Close() - - go func() { - closeErr := <-conn.NotifyClose(make(chan *amqp.Error)) - log.Printf("Connection closed: %v", closeErr) - }() - - err = ch.ExchangeDeclare(deadExchangeName, "topic", true, false, false, false, nil) - if err != nil { - log.Fatal("Error declaring dead letter exchange: ", err) - } - - _, err = ch.QueueDeclare(deadQueueName, true, false, false, false, nil) - if err != nil { - log.Fatal("Error declaring dead letter queue: ", err) - } - - err = ch.QueueBind(deadQueueName, deadRoutingKey, deadExchangeName, false, nil) - if err != nil { - log.Fatal("Error binding dead letter: ", err) - } - - err = ch.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil) - if err != nil { - log.Fatal("Error declaring exchange: ", err) - } - - args := amqp.Table{ - "x-max-length": int32(50), - "x-dead-letter-exchange": deadExchangeName, - "x-dead-letter-routing-key": deadRoutingKey, - } - _, err = ch.QueueDeclare(queueName, true, false, false, false, args) - if err != nil { - log.Fatal("Error declaring queue: ", err) - } - - err = ch.QueueBind(queueName, routingKey, exchangeName, false, nil) - if err != nil { - log.Fatal("Error binding queue: ", err) - } - - if err := ch.Confirm(false); err != nil { - log.Fatal("Channel could not be put into confirm mode: ", err) - } - - confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1)) - - for i := 0; i < 100; i++ { - msgBody := fmt.Sprintf("Hello, World!%d", i) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - err = ch.PublishWithContext(ctx, - exchangeName, // exchange - routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(msgBody), - }) - cancel() - - if err != nil { - log.Printf("Error publishing message: %v", err) - time.Sleep(1 * time.Second) - continue - } - - select { - case confirm := <-confirms: - if confirm.Ack { - log.Printf("[Publisher] Message %d accepted", i) - } else { - log.Printf("[Publisher] Message %d Nack (Rejected by RabbitMQ)", i) - } - case <-time.After(5 * time.Second): - log.Printf("[Publisher] Timeout waiting for confirm for message %d", i) - } - } - - log.Println("Producer finished sending messages") -} diff --git a/mq/rabbitmq_init.go b/mq/rabbitmq_init.go new file mode 100644 index 0000000..9866f93 --- /dev/null +++ b/mq/rabbitmq_init.go @@ -0,0 +1,185 @@ +// Package mq define message queue operation functions +package mq + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "net/url" + "os" + "sync" + "time" + + "modelRT/config" + "modelRT/logger" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/youmark/pkcs8" +) + +var ( + _globalRabbitMQProxy *RabbitMQProxy + rabbitMQOnce sync.Once +) + +// RabbitMQProxy define stuct of rabbitMQ connection proxy +type RabbitMQProxy struct { + Conn *amqp.Connection + mu sync.Mutex +} + +// RabbitMQCertConf define stuct of rabbitMQ connection certificates config +type RabbitMQCertConf struct { + serverName string + insecureSkipVerify bool + clientCert tls.Certificate + caCertPool *x509.CertPool +} + +// GetConn define func to return the rabbitMQ connection +func GetConn() *amqp.Connection { + _globalRabbitMQProxy.mu.Lock() + defer _globalRabbitMQProxy.mu.Unlock() + return _globalRabbitMQProxy.Conn +} + +// InitRabbitProxy return instance of rabbitMQ connection +func InitRabbitProxy(ctx context.Context, rCfg config.RabbitMQConfig) *RabbitMQProxy { + amqpURI := generateRabbitMQURI(rCfg) + certConf, err := readCertFiles(ctx, rCfg) + if err != nil { + logger.Error(ctx, "read rabbitMQ cert files failed", "error", err) + panic(err) + } + rabbitMQOnce.Do(func() { + conn := initRabbitMQ(ctx, amqpURI, certConf) + _globalRabbitMQProxy = &RabbitMQProxy{Conn: conn} + go _globalRabbitMQProxy.handleReconnect(ctx, amqpURI) + }) + return _globalRabbitMQProxy +} + +// initRabbitMQ return instance of rabbitMQ connection +func initRabbitMQ(ctx context.Context, rabbitMQURI string, certConf *RabbitMQCertConf) *amqp.Connection { + logger.Info(ctx, fmt.Sprintf("connecting to rabbitMQ server at: %s", rabbitMQURI)) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{certConf.clientCert}, + RootCAs: certConf.caCertPool, + InsecureSkipVerify: certConf.insecureSkipVerify, + ServerName: certConf.serverName, + } + + conn, err := amqp.DialConfig(rabbitMQURI, amqp.Config{ + TLSClientConfig: tlsConfig, + SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, + Heartbeat: 10 * time.Second, + }) + if err != nil { + logger.Error(ctx, "Error opening connection: ", "error", err) + } + defer conn.Close() + + return conn +} + +func (p *RabbitMQProxy) handleReconnect(ctx context.Context, rabbitMQURI string) { + for { + closeChan := make(chan *amqp.Error) + GetConn().NotifyClose(closeChan) + + err, ok := <-closeChan + + if !ok { + logger.Info(ctx, "rabbitMQ notify channel closed, stopping solicitor") + break + } + + if err != nil { + logger.Warn(ctx, "rabbitMQ connection closed by error", "reason", err) + } else { + logger.Info(ctx, "rabbitMQ connection closed normally (nil err)") + } + + for { + time.Sleep(5 * time.Second) + newConn, err := amqp.Dial(rabbitMQURI) + if err == nil { + p.mu.Lock() + p.Conn = newConn + p.mu.Unlock() + logger.Info(ctx, "rabbitMQ reconnected successfully") + break + } + logger.Error(ctx, "rabbitMQ reconnect failed", "err", err) + } + } +} + +func generateRabbitMQURI(rCfg config.RabbitMQConfig) string { + user := url.QueryEscape(rCfg.User) + password := url.QueryEscape(rCfg.Password) + + amqpURI := fmt.Sprintf("amqp://%s:%s@%s:%d/", + user, + password, + rCfg.Host, + rCfg.Port, + ) + return amqpURI +} + +func readCertFiles(ctx context.Context, rCfg config.RabbitMQConfig) (*RabbitMQCertConf, error) { + var initFailedFlag bool + certConf := RabbitMQCertConf{ + insecureSkipVerify: rCfg.InsecureSkipVerify, + } + + caCert, err := os.ReadFile(rCfg.CACertPath) + if err != nil { + logger.Error(ctx, "read server ca file failed", "error", err) + initFailedFlag = true + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + certConf.caCertPool = caCertPool + + keyData, err := os.ReadFile(rCfg.ClientKeyPath) + if err != nil { + logger.Error(ctx, "read private key file failed", "error", err) + initFailedFlag = true + } + + block, _ := pem.Decode(keyData) + privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(rCfg.ClientKeyPassword)) + if err != nil { + logger.Error(ctx, "parse private key failed", "error", err) + initFailedFlag = true + } + + pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + logger.Error(ctx, "parse private key failed", "error", err) + initFailedFlag = true + } + pemBlock := &pem.Block{Type: "PRIVATE KEY", Bytes: pemBytes} + + certPEM, err := os.ReadFile(rCfg.ClientCertPath) + if err != nil { + logger.Error(ctx, "parse private key failed", "error", err) + initFailedFlag = true + } + clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock)) + if err != nil { + logger.Error(ctx, "load client cert failed", "error", err) + initFailedFlag = true + } + certConf.serverName = rCfg.ServerName + certConf.clientCert = clientCert + if initFailedFlag { + return nil, fmt.Errorf("rabbitMQ cert files init failed") + } + return &certConf, nil +}