// Package event define real time data evnet operation functions package event import ( "context" "encoding/json" "time" "eventRT/constants" "eventRT/logger" "eventRT/mq" amqp "github.com/rabbitmq/amqp091-go" "go.opentelemetry.io/otel" ) // initUIEventChannel declares the fanout exchange and queue used by UI consumers, // then returns a channel ready for publishing. func initUIEventChannel(ctx context.Context) (*amqp.Channel, error) { ch, err := mq.GetConn().Channel() if err != nil { logger.Error(ctx, "open rabbitMQ channel for UI event publish failed", "error", err) return nil, err } err = ch.ExchangeDeclare(constants.EventUIExchangeName, "fanout", true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare UI event exchange failed", "error", err) return nil, err } _, err = ch.QueueDeclare(constants.EventUIQueueName, true, false, false, false, nil) if err != nil { logger.Error(ctx, "declare UI event queue failed", "error", err) return nil, err } // fanout exchange routes to all bound queues regardless of routing key err = ch.QueueBind(constants.EventUIQueueName, "", constants.EventUIExchangeName, false, nil) if err != nil { logger.Error(ctx, "bind UI event queue to exchange failed", "error", err) return nil, err } return ch, nil } // PublishEventToUI sets the event status to Reported and publishes the record // to the UI-facing fanout exchange. Intended to be called after the event has // been successfully persisted to the database. func PublishEventToUI(ctx context.Context, ch *amqp.Channel, record *EventRecord) error { body, err := json.Marshal(record) if err != nil { logger.Error(ctx, "marshal event record for UI publish failed", "event_uuid", record.EventUUID, "error", err) return err } headers := amqp.Table{} otel.GetTextMapPropagator().Inject(ctx, amqpHeaderCarrier(headers)) pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err = ch.PublishWithContext(pubCtx, constants.EventUIExchangeName, "", // fanout exchange ignores routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", Body: body, Headers: headers, }) if err != nil { logger.Error(ctx, "publish event to UI exchange failed", "event_uuid", record.EventUUID, "error", err) return err } logger.Info(ctx, "event published to UI exchange", "event_uuid", record.EventUUID, "status", record.Status) return nil }