modelRT/mq/publish_up_down_limit_event.go

189 lines
5.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package mq provides read or write access to message queue services
package mq
import (
"context"
"encoding/json"
"time"
"modelRT/constants"
"modelRT/logger"
"modelRT/mq/event"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// amqpHeaderCarrier adapts amqp.Table to propagation.TextMapCarrier so that trace
// context can be injected into, and read back from, AMQP message headers.
// It shares the underlying map of the amqp.Table it is converted from, so
// writes through the carrier are visible in the original table.
type amqpHeaderCarrier amqp.Table
// Get returns the header stored under key. It yields the empty string when the
// key is absent or when the stored value is not a string.
func (c amqpHeaderCarrier) Get(key string) string {
	// A missing key produces a nil interface value, for which the comma-ok
	// assertion simply reports false.
	if header, isString := c[key].(string); isString {
		return header
	}
	return ""
}
// Set stores value under key, overwriting any header already present with that name.
func (c amqpHeaderCarrier) Set(key, value string) {
	c[key] = value
}
// Keys lists the names of all headers currently held by the carrier.
// The order of the returned names is unspecified because it follows map iteration.
func (c amqpHeaderCarrier) Keys() []string {
	names := make([]string, 0, len(c))
	for name := range c {
		names = append(names, name)
	}
	return names
}
// Compile-time check that amqpHeaderCarrier satisfies propagation.TextMapCarrier.
var _ propagation.TextMapCarrier = amqpHeaderCarrier{}
// EventMessage wraps an EventRecord with the trace context of the computation cycle that produced it.
type EventMessage struct {
// Record is the event that the publisher JSON-encodes and sends as the message body.
Record *event.EventRecord
// TraceCarrier holds the propagated trace context as key/value pairs; the
// publisher extracts it and re-injects it into the AMQP message headers.
TraceCarrier map[string]string
}
// MsgChan buffers the event messages that are waiting to be published to
// RabbitMQ. Its buffer holds up to 10000 messages.
//
// The channel is created by the variable initializer rather than in an init
// function: package-level variables are initialized before any init function
// runs, so init functions in other files of this package can never observe a
// nil channel regardless of file ordering.
var MsgChan = make(chan *EventMessage, 10000)
// initUpDownLimitEventChannel opens a channel on the connection returned by
// GetConn and declares the topology used for up/down limit events:
//
//   - the durable dead-letter topic exchange, with the durable dead-letter
//     queue bound to it using the catch-all pattern "#";
//   - the durable event topic exchange;
//   - the durable event queue, limited to 50 messages and configured to
//     dead-letter into the dead-letter exchange, bound to the event exchange
//     with the up/down routing key.
//
// Finally the channel is switched into publisher-confirm mode.
//
// It returns the ready-to-use channel, or nil and the first error encountered.
// Every failure is logged here. If a step fails after the channel was opened,
// the channel is closed before returning so it is not leaked.
func initUpDownLimitEventChannel(ctx context.Context) (*amqp.Channel, error) {
	channel, err := GetConn().Channel()
	if err != nil {
		logger.Error(ctx, "open rabbitMQ server channel failed", "error", err)
		return nil, err
	}

	// Release the channel on every error path below.
	initialized := false
	defer func() {
		if !initialized {
			// Best effort: the setup error is what the caller needs to see, and
			// the broker may already have closed the channel after a failed
			// declaration, in which case Close only reports that it is closed.
			_ = channel.Close()
		}
	}()

	// Dead-letter side: exchange, queue, and a binding that catches every routing key.
	err = channel.ExchangeDeclare(constants.EventDeadExchangeName, "topic", true, false, false, false, nil)
	if err != nil {
		logger.Error(ctx, "declare event dead letter exchange failed", "error", err)
		return nil, err
	}

	_, err = channel.QueueDeclare(constants.EventUpDownDeadQueueName, true, false, false, false, nil)
	if err != nil {
		logger.Error(ctx, "declare event dead letter queue failed", "error", err)
		return nil, err
	}

	err = channel.QueueBind(constants.EventUpDownDeadQueueName, "#", constants.EventDeadExchangeName, false, nil)
	if err != nil {
		logger.Error(ctx, "bind event dead letter queue with routing key and exchange failed", "error", err)
		return nil, err
	}

	// Event side: exchange, bounded queue with dead-lettering, and its binding.
	err = channel.ExchangeDeclare(constants.EventExchangeName, "topic", true, false, false, false, nil)
	if err != nil {
		logger.Error(ctx, "declare event exchange failed", "error", err)
		return nil, err
	}

	args := amqp.Table{
		"x-max-length":              int32(50),
		"x-dead-letter-exchange":    constants.EventDeadExchangeName,
		"x-dead-letter-routing-key": constants.EventUpDownDeadRoutingKey,
	}
	_, err = channel.QueueDeclare(constants.EventUpDownQueueName, true, false, false, false, args)
	if err != nil {
		logger.Error(ctx, "declare event queue failed", "error", err)
		return nil, err
	}

	err = channel.QueueBind(constants.EventUpDownQueueName, constants.EventUpDownRoutingKey, constants.EventExchangeName, false, nil)
	if err != nil {
		logger.Error(ctx, "bind event queue with routing key and exchange failed", "error", err)
		return nil, err
	}

	// Publisher confirms let the caller learn about messages the broker rejected.
	if err = channel.Confirm(false); err != nil {
		logger.Error(ctx, "channel could not be put into confirm mode", "error", err)
		return nil, err
	}

	initialized = true
	return channel, nil
}
// PushUpDownLimitEventToRabbitMQ publishes up/down limit event messages read
// from msgChan to the RabbitMQ event exchange.
//
// It sets up the AMQP channel via initUpDownLimitEventChannel and then blocks,
// publishing one message at a time, until ctx is cancelled or msgChan is
// closed. Each record is JSON-encoded and published with its Category as the
// routing key; the trace context carried by the message is written into the
// AMQP headers. Each publish is bounded by a 5 second timeout.
//
// Failures (setup, marshalling, publishing, broker rejections) are logged and
// never returned; a message that cannot be published is dropped. Messages that
// are nil or carry no record are skipped with an error log.
func PushUpDownLimitEventToRabbitMQ(ctx context.Context, msgChan chan *EventMessage) {
	channel, err := initUpDownLimitEventChannel(ctx)
	if err != nil {
		logger.Error(ctx, "initializing rabbitMQ channel failed", "error", err)
		return
	}
	// Closing the channel on every exit path also closes the confirmation
	// listener registered below, which lets the draining goroutine finish.
	defer func() {
		if closeErr := channel.Close(); closeErr != nil {
			logger.Error(ctx, "close rabbitMQ channel failed", "error", closeErr)
		}
	}()

	// TODO: make the confirm-mode channel parameters configurable.
	confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 100))
	go func() {
		// Keep draining until the listener is closed (which happens when the
		// channel shuts down). Stopping earlier, e.g. on ctx cancellation,
		// could leave confirmations undelivered and block the AMQP client.
		for confirm := range confirms {
			if !confirm.Ack {
				logger.Error(ctx, "publish message failed (rejected by rabbitMQ)", "tag", confirm.DeliveryTag)
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "push event alarm message to rabbitMQ stopped by context cancel")
			return
		case msg, ok := <-msgChan:
			if !ok {
				logger.Info(ctx, "push event alarm message to rabbitMQ stopped by msgChan closed, exiting push loop")
				return
			}
			// Guard against nil entries so one bad message cannot panic the publisher.
			if msg == nil || msg.Record == nil {
				logger.Error(ctx, "skip event message without event record")
				continue
			}

			eventRecord := msg.Record
			// TODO: move serialization to just before sending so that the
			// eventRecord category can be used as the routing key.
			recordBytes, err := json.Marshal(eventRecord)
			if err != nil {
				logger.Error(ctx, "marshal event record failed", "event_uuid", eventRecord.EventUUID, "error", err)
				continue
			}

			// Restore the computation cycle's trace context so the AMQP message
			// carries the correct parent span.
			msgCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.TraceCarrier))
			headers := amqp.Table{}
			otel.GetTextMapPropagator().Inject(msgCtx, amqpHeaderCarrier(headers))

			// Send the event alarm message to the RabbitMQ exchange.
			routingKey := eventRecord.Category
			pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
			err = channel.PublishWithContext(pubCtx,
				constants.EventExchangeName, // exchange
				routingKey,                  // routing key
				false,                       // mandatory
				false,                       // immediate
				amqp.Publishing{
					// NOTE(review): the body is JSON but is labelled "text/plain";
					// confirm whether consumers rely on this before changing it.
					ContentType: "text/plain",
					Body:        recordBytes,
					Headers:     headers,
				})
			// Release the timeout's resources right away instead of deferring inside the loop.
			cancel()
			if err != nil {
				// Log the payload as text; a raw byte slice is not readable in structured logs.
				logger.Error(ctx, "publish message to rabbitMQ queue failed", "message", string(recordBytes), "error", err)
			}
		}
	}
}