eventRT/event/publish_ui_event.go

82 lines
2.4 KiB
Go

// Package event define real time data evnet operation functions
package event
import (
"context"
"encoding/json"
"time"
"eventRT/constants"
"eventRT/logger"
"eventRT/mq"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
)
// initUIEventChannel declares the fanout exchange and queue used by UI consumers,
// then returns a channel ready for publishing.
func initUIEventChannel(ctx context.Context) (*amqp.Channel, error) {
ch, err := mq.GetConn().Channel()
if err != nil {
logger.Error(ctx, "open rabbitMQ channel for UI event publish failed", "error", err)
return nil, err
}
err = ch.ExchangeDeclare(constants.EventUIExchangeName, "fanout", true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare UI event exchange failed", "error", err)
return nil, err
}
_, err = ch.QueueDeclare(constants.EventUIQueueName, true, false, false, false, nil)
if err != nil {
logger.Error(ctx, "declare UI event queue failed", "error", err)
return nil, err
}
// fanout exchange routes to all bound queues regardless of routing key
err = ch.QueueBind(constants.EventUIQueueName, "", constants.EventUIExchangeName, false, nil)
if err != nil {
logger.Error(ctx, "bind UI event queue to exchange failed", "error", err)
return nil, err
}
return ch, nil
}
// PublishEventToUI sets the event status to Reported and publishes the record
// to the UI-facing fanout exchange. Intended to be called after the event has
// been successfully persisted to the database.
func PublishEventToUI(ctx context.Context, ch *amqp.Channel, record *EventRecord) error {
body, err := json.Marshal(record)
if err != nil {
logger.Error(ctx, "marshal event record for UI publish failed", "event_uuid", record.EventUUID, "error", err)
return err
}
headers := amqp.Table{}
otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers))
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err = ch.PublishWithContext(pubCtx,
constants.EventUIExchangeName,
"", // fanout exchange ignores routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
Headers: headers,
})
if err != nil {
logger.Error(ctx, "publish event to UI exchange failed", "event_uuid", record.EventUUID, "error", err)
return err
}
logger.Info(ctx, "event published to UI exchange", "event_uuid", record.EventUUID, "status", record.Status)
return nil
}