154 lines
3.9 KiB
Go
154 lines
3.9 KiB
Go
// Package event defines real-time data event operation functions.
|
|
package event
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
|
|
"eventRT/constants"
|
|
"eventRT/database"
|
|
"eventRT/logger"
|
|
"eventRT/mq"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
)
|
|
|
|
// amqpHeaderCarrier adapts amqp.Table to propagation.TextMapCarrier for trace context extraction.
|
|
type amqpHeaderCarrier amqp.Table
|
|
|
|
func (c amqpHeaderCarrier) Get(key string) string {
|
|
val, ok := amqp.Table(c)[key]
|
|
if !ok {
|
|
return ""
|
|
}
|
|
str, _ := val.(string)
|
|
return str
|
|
}
|
|
|
|
func (c amqpHeaderCarrier) Set(key, value string) {
|
|
amqp.Table(c)[key] = value
|
|
}
|
|
|
|
func (c amqpHeaderCarrier) Keys() []string {
|
|
keys := make([]string, 0, len(c))
|
|
for k := range c {
|
|
keys = append(keys, k)
|
|
}
|
|
return keys
|
|
}
|
|
|
|
var _ propagation.TextMapCarrier = amqpHeaderCarrier{}
|
|
|
|
// RabbitMQ names used by the up/down limit alarm consumer in this file.
const (
	// exchangeName is the exchange the alarm queue is bound to.
	exchangeName = "event-exchange"
	// queueName is the queue that is bound to exchangeName and consumed from.
	queueName = "event-up-down-queue"
)
|
|
|
|
// ReceiptUpDownLimitAlarm define func to receipt up down limit alarm event from modelRT service and process the event alarm
|
|
func ReceiptUpDownLimitAlarm(ctx context.Context) {
|
|
conn := mq.GetConn()
|
|
if conn == nil {
|
|
logger.Error(ctx, "get rabbitMQ connection for receiving alarms failed")
|
|
return
|
|
}
|
|
|
|
channel, err := conn.Channel()
|
|
if err != nil {
|
|
logger.Error(ctx, "open rabbitMQ channel for consumer failed", "error", err)
|
|
return
|
|
}
|
|
defer channel.Close()
|
|
|
|
uiChannel, err := initUIEventChannel(ctx)
|
|
if err != nil {
|
|
logger.Error(ctx, "init UI event channel failed", "error", err)
|
|
return
|
|
}
|
|
defer uiChannel.Close()
|
|
|
|
err = channel.QueueBind(
|
|
queueName, // 队列名
|
|
"event.#.updown.*",
|
|
exchangeName, // 交换机名
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
logger.Error(ctx, "channel bind queue and exchange failed", "error", err)
|
|
return
|
|
}
|
|
|
|
err = channel.Qos(1, 0, false)
|
|
if err != nil {
|
|
logger.Error(ctx, "set rabbitMQ Qos config failed", "error", err)
|
|
}
|
|
|
|
// registered consumer
|
|
msgs, err := channel.Consume(
|
|
queueName, // queue
|
|
"", // consumer tag
|
|
false, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
if err != nil {
|
|
logger.Error(ctx, "failed to register a consumer", "error", err)
|
|
return
|
|
}
|
|
|
|
logger.Info(ctx, "started receiving up-down limit alarms from rabbitMQ")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info(ctx, "receipt up-down limit alarm stopped by context cancel")
|
|
return
|
|
case msg, ok := <-msgs:
|
|
if !ok {
|
|
logger.Error(ctx, "message channel closed, exiting consumer loop")
|
|
return
|
|
}
|
|
processAlarmEventMessage(ctx, msg, uiChannel)
|
|
}
|
|
}
|
|
}
|
|
|
|
func processAlarmEventMessage(ctx context.Context, msg amqp.Delivery, uiCh *amqp.Channel) {
|
|
// extract upstream trace context injected by modelRT into AMQP headers
|
|
ctx = otel.GetTextMapPropagator().Extract(ctx, amqpHeaderCarrier(msg.Headers))
|
|
ctx, span := otel.Tracer("eventRT/event").Start(ctx, "processAlarmEventMessage")
|
|
defer span.End()
|
|
|
|
logger.Info(ctx, "received event alarm from modelRT up and down limit compute process")
|
|
mongodbClient := database.GetMongoClient()
|
|
|
|
var alarmEvent EventRecord
|
|
if err := json.Unmarshal(msg.Body, &alarmEvent); err != nil {
|
|
logger.Error(ctx, "unmarshal alarm event message failed", "error", err)
|
|
return
|
|
}
|
|
|
|
alarmEvent.IsPersisted = true
|
|
alarmEvent.Status = constants.EventStatusReported
|
|
|
|
result, err := mongodbClient.Database(constants.EventDBName).Collection(constants.EventCollectionName).InsertOne(ctx, alarmEvent)
|
|
if err != nil {
|
|
logger.Error(ctx, "failed to insert alarm event into database", "error", err)
|
|
return
|
|
}
|
|
logger.Info(ctx, "alarm event inserted into database", "result", result)
|
|
|
|
if err := msg.Ack(false); err != nil {
|
|
logger.Error(ctx, "ack alarm event message failed", "error", err)
|
|
return
|
|
}
|
|
|
|
if err := PublishEventToUI(ctx, uiCh, &alarmEvent); err != nil {
|
|
logger.Error(ctx, "publish alarm event to UI failed", "event_uuid", alarmEvent.EventUUID, "error", err)
|
|
}
|
|
}
|