dataRT/data/rabbit/consume.go

51 lines
934 B
Go

package rabbit
import (
"context"
"errors"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
// NewConsumer creates a consumer on the queue named by xqr.QueueName.
//
// When tag is "default" the package-level client is used. For any other tag,
// endpoints are generated from the tag with genEndpoints and a new client is
// created for them with NewClient.
//
// An error is returned if xqr is nil, if endpoint generation or client
// creation fails, or if the connection cannot create the consumer.
//
// NOTE(review): the client created for a non-default tag is neither returned
// nor stored by this function, so the caller gets no handle to close it —
// confirm its lifetime is managed elsewhere.
func NewConsumer(ctx context.Context, tag string, xqr *XQR) (*rmq.Consumer, error) {
	// Validate before touching the network so a bad call never opens a
	// connection that would then be abandoned.
	if xqr == nil {
		return nil, errors.New("rabbit: nil XQR passed to NewConsumer")
	}

	cli := client
	if tag != "default" {
		endpoints, err := genEndpoints(tag)
		if err != nil {
			return nil, err
		}
		cli, err = NewClient(ctx, endpoints)
		if err != nil {
			return nil, err
		}
	}
	return cli.conn.NewConsumer(ctx, xqr.QueueName, nil)
}
// Consume receives deliveries from consumer in a loop, forwards every entry of
// each message's Data to msgChan, and accepts the delivery once all of its
// entries have been handed off.
//
// Consume blocks until the loop ends, which happens silently when:
//   - Receive returns an error (context.Canceled is treated as a normal stop);
//   - ctx is done while an entry is waiting to be sent on msgChan, in which
//     case the current delivery is left unaccepted;
//   - Accept returns an error.
//
// Consume never closes msgChan.
func Consume(ctx context.Context, consumer *rmq.Consumer, msgChan chan<- []byte) {
	for {
		delivery, err := consumer.Receive(ctx)
		if errors.Is(err, context.Canceled) {
			// Cancellation is the expected way to stop consuming.
			return
		}
		if err != nil {
			// TODO: report receive failures to the caller; the current
			// signature has no error return, so the loop just stops.
			return
		}

		for _, payload := range delivery.Message().Data {
			// Never block on a receiver that has gone away: give up as soon
			// as ctx is done instead of leaking this goroutine.
			select {
			case msgChan <- payload:
			case <-ctx.Done():
				return
			}
		}

		if err := delivery.Accept(ctx); err != nil {
			// TODO: report accept failures to the caller.
			return
		}
	}
}