82 lines
2.4 KiB
Go
82 lines
2.4 KiB
Go
// Package event define real time data evnet operation functions
|
|
package event
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"eventRT/constants"
|
|
"eventRT/logger"
|
|
"eventRT/mq"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"go.opentelemetry.io/otel"
|
|
)
|
|
|
|
// initUIEventChannel declares the fanout exchange and queue used by UI consumers,
|
|
// then returns a channel ready for publishing.
|
|
func initUIEventChannel(ctx context.Context) (*amqp.Channel, error) {
|
|
ch, err := mq.GetConn().Channel()
|
|
if err != nil {
|
|
logger.Error(ctx, "open rabbitMQ channel for UI event publish failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
err = ch.ExchangeDeclare(constants.EventUIExchangeName, "fanout", true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare UI event exchange failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
_, err = ch.QueueDeclare(constants.EventUIQueueName, true, false, false, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "declare UI event queue failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
// fanout exchange routes to all bound queues regardless of routing key
|
|
err = ch.QueueBind(constants.EventUIQueueName, "", constants.EventUIExchangeName, false, nil)
|
|
if err != nil {
|
|
logger.Error(ctx, "bind UI event queue to exchange failed", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
// PublishEventToUI sets the event status to Reported and publishes the record
|
|
// to the UI-facing fanout exchange. Intended to be called after the event has
|
|
// been successfully persisted to the database.
|
|
func PublishEventToUI(ctx context.Context, ch *amqp.Channel, record *EventRecord) error {
|
|
body, err := json.Marshal(record)
|
|
if err != nil {
|
|
logger.Error(ctx, "marshal event record for UI publish failed", "event_uuid", record.EventUUID, "error", err)
|
|
return err
|
|
}
|
|
|
|
headers := amqp.Table{}
|
|
otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers))
|
|
|
|
pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
err = ch.PublishWithContext(pubCtx,
|
|
constants.EventUIExchangeName,
|
|
"", // fanout exchange ignores routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
ContentType: "application/json",
|
|
Body: body,
|
|
Headers: headers,
|
|
})
|
|
if err != nil {
|
|
logger.Error(ctx, "publish event to UI exchange failed", "event_uuid", record.EventUUID, "error", err)
|
|
return err
|
|
}
|
|
|
|
logger.Info(ctx, "event published to UI exchange", "event_uuid", record.EventUUID, "status", record.Status)
|
|
return nil
|
|
}
|