package api import ( "context" "datart/data" "datart/data/mongo" "datart/log" "errors" "fmt" "strconv" "strings" "sync" "github.com/gin-gonic/gin" "github.com/google/uuid" "go.mongodb.org/mongo-driver/v2/bson" ) const ( pageSizeLimit = 500 ) func (a *Api) GetEvents(ctx *gin.Context) { filter, sort, pageNo, pageSize, err := a.checkAndGenGetEventsRequest(ctx) if err != nil { log.Error(err) ctx.JSON(200, gin.H{ "code": 1, "msg": err.Error(), }) return } events, err := mongo.FindEventsWithPageLimit(ctx.Request.Context(), filter, sort, pageNo, pageSize) if err != nil { log.Error(err, fmt.Sprintf(" params: %v, %d, %d, %d", filter, sort, pageNo, pageSize)) ctx.JSON(200, gin.H{ "code": 2, "msg": err.Error(), }) return } ctx.JSON(200, gin.H{ "code": 0, "msg": "success", "data": events, }) } func (a *Api) PostUpsertEvents(ctx *gin.Context) { uuids, update, events, err := a.checkAndGenUpsertEventsRequest(ctx) if err != nil { log.Error(err) ctx.JSON(200, gin.H{ "code": 1, "msg": err.Error(), }) return } if len(uuids) > 0 { operation := mongo.GenOperation(mongo.EventStatusAction[update.Status], ctx.RemoteIP()) err := mongo.UpdateEvents(ctx.Request.Context(), bson.M{"event_uuid": bson.M{"$in": uuids}}, bson.M{"$set": bson.M{"status": update.Status}, "$push": bson.M{"operations": operation}}) if err != nil { log.Error(err, fmt.Sprintf(" params:%v %v", update, uuids)) ctx.JSON(200, gin.H{ "code": 2, "msg": err.Error(), }) return } events = make([]map[string]any, len(uuids)) for i := range events { events[i] = map[string]any{"event_uuid": uuids[i], "status": update.Status} } } else { err := mongo.InsertEvents(ctx.Request.Context(), events) if err != nil { log.Error(err, fmt.Sprintf(" params: %v", events)) ctx.JSON(200, gin.H{ "code": 3, "msg": err.Error(), }) return } } go func(evts []map[string]any) { workers := 5 ch := make(chan map[string]any, len(evts)) var wg sync.WaitGroup for range workers { wg.Add(1) go func() { defer wg.Done() for e := range ch { if err := data.PublishEvent(context.Background(), e); err != nil { log.Error(err, fmt.Sprintf("publish event failed: %v", e)) } } }() } for _, e := range evts { ch <- e } close(ch) wg.Wait() }(events) ctx.JSON(200, gin.H{ "code": 0, "msg": "success", }) } func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) { uuidStr := ctx.Query("uuid") if len(uuidStr) > 0 { if uuid.Validate(uuidStr) != nil { return nil, 0, -1, -1, errors.New("invalid uuid") } return bson.M{"event_uuid": uuidStr}, 0, -1, -1, nil } filter := bson.M{} var err error begin, end := int64(-1), int64(-1) beginStr := ctx.Query("begin") if len(beginStr) > 0 { if begin, err = strconv.ParseInt(beginStr, 10, 64); err != nil { return nil, 0, -1, -1, err } } endStr := ctx.Query("end") if len(endStr) > 0 { if end, err = strconv.ParseInt(endStr, 10, 64); err != nil { return nil, 0, -1, -1, err } } if begin > 0 && end > 0 && begin > end { return nil, 0, -1, -1, errors.New("invalid time") } switch { case begin > 0 && end < 0: filter["timestamp"] = bson.M{"$gte": begin} case begin < 0 && end > 0: filter["timestamp"] = bson.M{"$lte": end} case begin > 0 && end > 0: filter["timestamp"] = bson.M{"$gte": begin, "$lte": end} } statusStr := ctx.Query("status") if len(statusStr) > 0 { statusStrs := strings.Split(statusStr, ",") statuss := make([]int, len(statusStrs)) for i := range statusStrs { s, err := strconv.Atoi(statusStrs[i]) if err != nil { return nil, 0, -1, -1, errors.New("invalid status") } statuss[i] = s } filter["status"] = bson.M{"$in": statuss} } var sort int sortStr := ctx.Query("sort") if len(sortStr) > 0 { s, err := strconv.Atoi(sortStr) if err != nil { return nil, 0, -1, -1, err } if s != 1 && s != -1 { return nil, 0, -1, -1, errors.New("invalid sort") } sort = s } pageNo, pageSize := -1, -1 pageNoStr := ctx.Query("page_no") pageSizeStr := ctx.Query("page_size") if len(pageNoStr) > 0 && len(pageSizeStr) > 0 { pageNo, err = strconv.Atoi(pageNoStr) if err != nil { return nil, 0, -1, -1, err } pageSize, err = strconv.Atoi(pageSizeStr) if err != nil { return nil, 0, -1, -1, err } if pageNo <= 0 || pageSize <= 0 { return nil, 0, -1, -1, errors.New("invalid page param") } if pageSize > pageSizeLimit { return nil, 0, -1, -1, fmt.Errorf("too many events, max %d", pageSizeLimit) } } return filter, sort, int64(pageNo), int64(pageSize), nil } func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) ([]string, *mongo.Event, []map[string]any, error) { insert := true update := &mongo.Event{} statusStr := ctx.Query("status") if len(statusStr) > 0 { insert = false status, err := strconv.Atoi(statusStr) if err != nil { return nil, nil, nil, err } update.Status = status } if !insert { uuids := []string{} err := ctx.ShouldBindJSON(&uuids) if err != nil { return nil, nil, nil, err } if len(uuids) == 0 { return nil, nil, nil, errors.New("no uuid") } if len(uuids) > pageSizeLimit { return nil, nil, nil, fmt.Errorf("too many uuids, max %d", pageSizeLimit) } return uuids, update, nil, nil } events := []map[string]any{} err := ctx.ShouldBindJSON(&events) if err != nil { return nil, nil, nil, err } if len(events) == 0 { return nil, nil, nil, errors.New("no event") } if len(events) > pageSizeLimit { return nil, nil, nil, fmt.Errorf("too many events, max %d", pageSizeLimit) } return nil, nil, events, nil } func validateEventUpsert(event map[string]any) error { noUUID := true if eu, ok := event["event_uuid"]; ok { if eID, ok := eu.(string); ok { if uuid.Validate(eID) == nil { noUUID = false } } } if noUUID { if uid, err := uuid.NewV7(); err != nil { return err } else { event["event_uuid"] = uid.String() } } if len(event) < 2 { return errors.New("invalid event") } return nil }