package api

import (
	"context"
	"datart/data"
	"datart/data/mongo"
	"datart/log"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"

	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
	"go.mongodb.org/mongo-driver/v2/bson"
)

const (
	// pageSizeLimit caps both the page_size accepted by GetEvents and the
	// number of items accepted in a single PostUpsertEvents request body.
	pageSizeLimit = 500
)

// GetEvents handles an event query. It validates the query string, runs the
// lookup and always answers with HTTP 200 and a JSON envelope:
//
//	code 0 - success, "data" holds the events
//	code 1 - the request parameters were rejected
//	code 2 - the lookup failed
func (a *Api) GetEvents(ctx *gin.Context) {
	filter, sort, pageNo, pageSize, err := a.checkAndGenGetEventsRequest(ctx)
	if err != nil {
		log.Error(err)
		ctx.JSON(200, gin.H{
			"code": 1,
			"msg":  err.Error(),
		})
		return
	}

	events, err := mongo.FindEventsWithPageLimit(ctx.Request.Context(), filter, sort, pageNo, pageSize)
	if err != nil {
		log.Error(err, fmt.Sprintf(" params: %v, %d, %d, %d", filter, sort, pageNo, pageSize))
		ctx.JSON(200, gin.H{
			"code": 2,
			"msg":  err.Error(),
		})
		return
	}

	ctx.JSON(200, gin.H{
		"code": 0,
		"msg":  "success",
		"data": events,
	})
}

// PostUpsertEvents handles an event write request. It validates the request,
// performs the bulk write, answers with HTTP 200 and a JSON envelope
// (code 0 success, 1 bad request, 2 write failure) whose data.success_uuids
// lists the UUIDs that were written, and then publishes every successfully
// written event in the background.
func (a *Api) PostUpsertEvents(ctx *gin.Context) {
	curd, uuids, events, err := a.checkAndGenUpsertEventsRequest(ctx)
	if err != nil {
		log.Error(err)
		ctx.JSON(200, gin.H{
			"code": 1,
			"msg":  err.Error(),
			"data": map[string]any{
				"success_uuids": nil,
			},
		})
		return
	}

	suuids, err := mongo.BulkWriteEventsWithUUID(ctx.Request.Context(), curd, events)
	if err != nil {
		// The write may have partially succeeded: report (and later publish)
		// only the UUIDs the bulk write returned.
		log.Error(err, fmt.Sprintf(" params:%v", events))
		ctx.JSON(200, gin.H{
			"code": 2,
			"msg":  err.Error(),
			"data": map[string]any{
				"success_uuids": suuids,
			},
		})
	} else {
		suuids = uuids
		ctx.JSON(200, gin.H{
			"code": 0,
			"msg":  "success",
			"data": map[string]any{
				"success_uuids": uuids,
			},
		})
	}

	// Nothing was written, so there is nothing to publish; avoid starting
	// goroutines that would immediately exit.
	if len(suuids) == 0 {
		return
	}

	// uuids[i] is the UUID of events[i]; resolve the events that correspond
	// to the successfully written UUIDs before leaving the handler.
	uuid2Event := make(map[string]any, len(events))
	for i, event := range events {
		uuid2Event[uuids[i]] = event
	}
	pending := make([]any, 0, len(suuids))
	for _, suuid := range suuids {
		if e, ok := uuid2Event[suuid]; ok {
			pending = append(pending, e)
		}
	}
	if len(pending) == 0 {
		return
	}

	// Publishing runs detached from the request: it uses context.Background()
	// rather than the request context, and a failure is only logged.
	go func(pending []any) {
		workers := min(5, len(pending))
		// The buffer holds every pending event, so the producer loop below
		// never blocks on a slow worker.
		ch := make(chan any, len(pending))

		var wg sync.WaitGroup
		for range workers {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for e := range ch {
					if perr := data.PublishEvent(context.Background(), e); perr != nil {
						log.Error(perr, fmt.Sprintf("publish event failed: %v", e))
					}
				}
			}()
		}

		for _, e := range pending {
			ch <- e
		}
		close(ch)
		wg.Wait()
	}(pending)
}

// checkAndGenGetEventsRequest validates the query string of a GetEvents
// request and converts it into lookup arguments.
//
// Recognised query parameters:
//
//	uuid      - a single event UUID; when present all other parameters are
//	            ignored and the filter matches event_uuid only
//	begin,end - base-10 integer bounds for the "timestamp" field (inclusive);
//	            a missing or non-positive bound means "unbounded"
//	status    - comma-separated list of integer statuses
//	sort      - 1 or -1; 0 is returned when the parameter is absent
//	page_no,
//	page_size - positive integers, applied only when both are supplied;
//	            page_size may not exceed pageSizeLimit
//
// It returns the filter, the sort value, the page number and the page size.
// Page number and page size are -1 when paging was not requested. On error
// the filter is nil.
func (a *Api) checkAndGenGetEventsRequest(ctx *gin.Context) (bson.M, int, int64, int64, error) {
	if uuidStr := ctx.Query("uuid"); len(uuidStr) > 0 {
		if uuid.Validate(uuidStr) != nil {
			return nil, 0, -1, -1, errors.New("invalid uuid")
		}
		return bson.M{"event_uuid": uuidStr}, 0, -1, -1, nil
	}

	filter := bson.M{}
	var err error

	begin, end := int64(-1), int64(-1)
	if beginStr := ctx.Query("begin"); len(beginStr) > 0 {
		if begin, err = strconv.ParseInt(beginStr, 10, 64); err != nil {
			return nil, 0, -1, -1, err
		}
	}
	if endStr := ctx.Query("end"); len(endStr) > 0 {
		if end, err = strconv.ParseInt(endStr, 10, 64); err != nil {
			return nil, 0, -1, -1, err
		}
	}

	// A bound is in effect only when it is strictly positive. Evaluating each
	// bound independently keeps a valid bound from being dropped when the
	// other one is 0 (e.g. begin=100&end=0 still filters on begin).
	hasBegin, hasEnd := begin > 0, end > 0
	if hasBegin && hasEnd && begin > end {
		return nil, 0, -1, -1, errors.New("invalid time")
	}
	switch {
	case hasBegin && hasEnd:
		filter["timestamp"] = bson.M{"$gte": begin, "$lte": end}
	case hasBegin:
		filter["timestamp"] = bson.M{"$gte": begin}
	case hasEnd:
		filter["timestamp"] = bson.M{"$lte": end}
	}

	if statusStr := ctx.Query("status"); len(statusStr) > 0 {
		parts := strings.Split(statusStr, ",")
		statuses := make([]int, len(parts))
		for i, part := range parts {
			s, err := strconv.Atoi(part)
			if err != nil {
				return nil, 0, -1, -1, errors.New("invalid status")
			}
			statuses[i] = s
		}
		filter["status"] = bson.M{"$in": statuses}
	}

	var sort int
	if sortStr := ctx.Query("sort"); len(sortStr) > 0 {
		s, err := strconv.Atoi(sortStr)
		if err != nil {
			return nil, 0, -1, -1, err
		}
		if s != 1 && s != -1 {
			return nil, 0, -1, -1, errors.New("invalid sort")
		}
		sort = s
	}

	pageNo, pageSize := -1, -1
	pageNoStr := ctx.Query("page_no")
	pageSizeStr := ctx.Query("page_size")
	if len(pageNoStr) > 0 && len(pageSizeStr) > 0 {
		if pageNo, err = strconv.Atoi(pageNoStr); err != nil {
			return nil, 0, -1, -1, err
		}
		if pageSize, err = strconv.Atoi(pageSizeStr); err != nil {
			return nil, 0, -1, -1, err
		}
		if pageNo <= 0 || pageSize <= 0 {
			return nil, 0, -1, -1, errors.New("invalid page param")
		}
		if pageSize > pageSizeLimit {
			return nil, 0, -1, -1, fmt.Errorf("too many events, max %d", pageSizeLimit)
		}
	}

	return filter, sort, int64(pageNo), int64(pageSize), nil
}

// checkAndGenUpsertEventsRequest validates a PostUpsertEvents request and
// converts it into bulk-write arguments.
//
// The request has two forms, selected by the "status" query parameter:
//
//   - With status: the body is a JSON array of event UUIDs. One update
//     document (event_uuid, status, operation) is generated per UUID and the
//     returned operation byte is 'u'.
//   - Without status: the body is a JSON array of event objects. Each must
//     carry a valid string "event_uuid" plus at least one more field, and the
//     returned operation byte is 'c'.
//
// In both forms the body must contain between 1 and pageSizeLimit items. The
// returned uuids slice is index-aligned with the returned events slice.
func (a *Api) checkAndGenUpsertEventsRequest(ctx *gin.Context) (byte, []string, []map[string]any, error) {
	events := []map[string]any{}
	uuids := []string{}

	if statusStr := ctx.Query("status"); len(statusStr) > 0 {
		status, err := strconv.Atoi(statusStr)
		if err != nil {
			return 0, nil, nil, err
		}

		if err := ctx.ShouldBindJSON(&uuids); err != nil {
			return 0, nil, nil, err
		}
		if len(uuids) == 0 {
			return 0, nil, nil, errors.New("no uuid")
		}
		if len(uuids) > pageSizeLimit {
			return 0, nil, nil, fmt.Errorf("too many uuids, max %d", pageSizeLimit)
		}

		// NOTE(review): status is not validated before it is used to index
		// mongo.EventStatusAction - confirm that unknown values are safe.
		operation := mongo.GenOperation(mongo.EventStatusAction[status], ctx.RemoteIP())

		events = make([]map[string]any, 0, len(uuids))
		for _, uid := range uuids {
			if uuid.Validate(uid) != nil {
				return 0, nil, nil, errors.New("invalid uuid")
			}
			events = append(events, map[string]any{
				"event_uuid": uid,
				"status":     status,
				"operation":  operation,
			})
		}
		return 'u', uuids, events, nil
	}

	if err := ctx.ShouldBindJSON(&events); err != nil {
		return 0, nil, nil, err
	}
	if len(events) == 0 {
		return 0, nil, nil, errors.New("no event")
	}
	if len(events) > pageSizeLimit {
		return 0, nil, nil, fmt.Errorf("too many events, max %d", pageSizeLimit)
	}

	for _, event := range events {
		eID, ok := event["event_uuid"].(string)
		if !ok || uuid.Validate(eID) != nil {
			return 0, nil, nil, errors.New("invalid uuid")
		}
		uuids = append(uuids, eID)

		// An event consisting of nothing but its UUID carries no data.
		if len(event) < 2 {
			return 0, nil, nil, errors.New("invalid event")
		}
	}
	return 'c', uuids, events, nil
}