modelRT/handler/async_task_create_handler.go

243 lines
6.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package handler provides HTTP handlers for various endpoints.
package handler
import (
"net/http"
"strings"
"modelRT/config"
"modelRT/database"
"modelRT/logger"
"modelRT/network"
"modelRT/orm"
"modelRT/task"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
// AsyncTaskCreateHandler handles creation of asynchronous tasks.
//
// It binds the JSON body, validates the task type and its parameters,
// persists the task, and then tries to publish it to the message queue.
// Queue publishing is best-effort: failures there are logged and do not
// change the response. The HTTP status is always 200; the outcome is carried
// in the Code field of the response body.
//
// @Summary Create an asynchronous task
// @Description Create a new asynchronous task and return its task ID. The task is submitted to the queue to await processing.
// @Tags AsyncTask
// @Accept json
// @Produce json
// @Param request body network.AsyncTaskCreateRequest true "Task creation request"
// @Success 200 {object} network.SuccessResponse{payload=network.AsyncTaskCreateResponse} "Task created successfully"
// @Failure 400 {object} network.FailureResponse "Invalid request parameters"
// @Failure 500 {object} network.FailureResponse "Internal server error"
// @Router /task/async [post]
func AsyncTaskCreateHandler(c *gin.Context) {
	ctx := c.Request.Context()

	var request network.AsyncTaskCreateRequest
	if err := c.ShouldBindJSON(&request); err != nil {
		logger.Error(ctx, "failed to unmarshal async task create request", "error", err)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  "invalid request parameters",
		})
		return
	}

	// Validate task type
	if !orm.IsValidAsyncTaskType(request.TaskType) {
		logger.Error(ctx, "invalid task type", "task_type", request.TaskType)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  "invalid task type",
		})
		return
	}

	// Validate task parameters based on task type
	if !validateTaskParams(request.TaskType, request.Params) {
		logger.Error(ctx, "invalid task parameters", "task_type", request.TaskType, "params", request.Params)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusBadRequest,
			Msg:  "invalid task parameters",
		})
		return
	}

	pgClient := database.GetPostgresDBClient()
	if pgClient == nil {
		logger.Error(ctx, "database connection not found in context")
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusInternalServerError,
			Msg:  "database connection error",
		})
		return
	}

	// Create task in database
	taskType := orm.AsyncTaskType(request.TaskType)
	params := orm.JSONMap(request.Params)
	asyncTask, err := database.CreateAsyncTask(ctx, pgClient, taskType, params)
	if err != nil {
		logger.Error(ctx, "failed to create async task in database", "error", err)
		c.JSON(http.StatusOK, network.FailureResponse{
			Code: http.StatusInternalServerError,
			Msg:  "failed to create task",
		})
		return
	}

	// Send task to message queue. Every failure on this path is logged and
	// then ignored so that the already-persisted task is still reported as
	// created.
	cfg, exists := c.Get("config")
	if !exists {
		logger.Warn(ctx, "Configuration not found in context, skipping queue publishing")
	} else if modelRTConfig, ok := cfg.(config.ModelRTConfig); !ok {
		// Comma-ok assertion: a value of any other type stored under "config"
		// must not panic the handler after the task has been persisted.
		logger.Error(ctx, "Configuration in context has unexpected type, skipping queue publishing")
	} else if producer, err := task.NewQueueProducer(ctx, modelRTConfig.RabbitMQConfig); err != nil {
		logger.Error(ctx, "Failed to create queue producer", "error", err)
		// Continue without queue publishing
	} else {
		defer producer.Close()

		const (
			defaultPriority = 5 // Default priority
			// NOTE(review): presumably the retry budget of
			// PublishTaskWithRetry; confirm against its definition.
			publishRetries = 3
		)

		// Publish task to queue
		queueTaskType := task.TaskType(request.TaskType)
		if err := producer.PublishTaskWithRetry(ctx, asyncTask.TaskID, queueTaskType, defaultPriority, publishRetries); err != nil {
			// Log error but don't affect task creation response
			logger.Error(ctx, "Failed to publish task to queue",
				"task_id", asyncTask.TaskID,
				"error", err)
		} else {
			logger.Info(ctx, "Task published to queue successfully",
				"task_id", asyncTask.TaskID,
				"queue", task.TaskQueueName)
		}
	}

	logger.Info(ctx, "async task created successfully", "task_id", asyncTask.TaskID, "task_type", request.TaskType)

	// Return success response
	c.JSON(http.StatusOK, network.SuccessResponse{
		Code: 2000,
		Msg:  "task created successfully",
		Payload: network.AsyncTaskCreateResponse{
			TaskID: asyncTask.TaskID,
		},
	})
}
// validateTaskParams picks the parameter validator that matches taskType and
// returns its verdict for params. A task type without a registered validator
// is reported as invalid.
func validateTaskParams(taskType string, params map[string]any) bool {
	var check func(map[string]any) bool

	switch taskType {
	case string(orm.AsyncTaskTypeTopologyAnalysis):
		check = validateTopologyAnalysisParams
	case string(orm.AsyncTaskTypePerformanceAnalysis):
		check = validatePerformanceAnalysisParams
	case string(orm.AsyncTaskTypeEventAnalysis):
		check = validateEventAnalysisParams
	case string(orm.AsyncTaskTypeBatchImport):
		check = validateBatchImportParams
	case string(orm.AsyncTaskTypeTest):
		check = validateTestTaskParams
	}

	// No validator selected means the task type is unknown here.
	if check == nil {
		return false
	}
	return check(params)
}
// validateTopologyAnalysisParams reports whether params holds the fields
// required by a topology analysis task: "start_uuid" and "end_uuid", each of
// which must be a non-empty string.
//
// A missing key, an empty string, or a value of any non-string type (for
// example JSON null, a number or an object) makes the parameters invalid.
// A nil map is handled like an empty one.
func validateTopologyAnalysisParams(params map[string]any) bool {
	for _, key := range [...]string{"start_uuid", "end_uuid"} {
		// The comma-ok assertion yields ok == false for both an absent key
		// (nil interface) and a non-string value, so one check covers both.
		value, ok := params[key].(string)
		if !ok || value == "" {
			return false
		}
	}
	return true
}
// validatePerformanceAnalysisParams reports whether params contains a
// "component_ids" entry that is a non-empty list ([]any). The element values
// themselves are not inspected.
func validatePerformanceAnalysisParams(params map[string]any) bool {
	raw, present := params["component_ids"]
	if !present {
		return false
	}

	// Only a generic list qualifies; any other dynamic type is rejected.
	ids, isList := raw.([]any)
	return isList && len(ids) > 0
}
// validateEventAnalysisParams reports whether params holds the field required
// by an event analysis task: "event_type", which must be a non-empty string.
//
// A missing key, an empty string, or a value of any non-string type (for
// example JSON null or a number) makes the parameters invalid. A nil map is
// handled like an empty one.
func validateEventAnalysisParams(params map[string]any) bool {
	// The comma-ok assertion fails for both an absent key and a non-string
	// value, so a single check covers presence and type.
	eventType, ok := params["event_type"].(string)
	return ok && eventType != ""
}
// validateBatchImportParams reports whether params holds the field required
// by a batch import task: "file_path", which must be a non-empty string.
//
// A missing key, an empty string, or a value of any non-string type (for
// example JSON null or a number) makes the parameters invalid. A nil map is
// handled like an empty one. The path itself is not checked for existence
// or format here.
func validateBatchImportParams(params map[string]any) bool {
	// The comma-ok assertion fails for both an absent key and a non-string
	// value, so a single check covers presence and type.
	filePath, ok := params["file_path"].(string)
	return ok && filePath != ""
}
// validateTestTaskParams accepts every parameter set for a test task: the
// function performs no checks and always reports true, including for a nil
// or empty map.
//
// NOTE(review): the original comment states that sleep_duration falls back
// to 60 seconds when omitted; that default is not applied here, so confirm
// it against the task executor.
func validateTestTaskParams(params map[string]any) bool {
	return true
}
// splitCommaSeparated splits s on commas that are not enclosed in double
// quotes and returns the resulting fields with surrounding whitespace trimmed.
//
// Parsing rules, applied rune by rune:
//   - a backslash is dropped and the rune after it is copied literally;
//   - a double quote toggles quoted mode and is itself dropped;
//   - a comma outside quotes ends the current field, inside quotes it is
//     copied literally.
//
// The text after the last separator becomes a field only when it is
// non-empty before trimming, so "a,b," yields two fields while "a,,b" yields
// three. An empty input yields a nil slice.
func splitCommaSeparated(s string) []string {
	var (
		fields  []string
		field   strings.Builder
		quoted  bool
		escaped bool
	)

	// flush closes the field being built and starts a new one.
	flush := func() {
		fields = append(fields, strings.TrimSpace(field.String()))
		field.Reset()
	}

	for _, r := range s {
		switch {
		case escaped:
			// Rune following a backslash: taken verbatim.
			field.WriteRune(r)
			escaped = false
		case r == '\\':
			escaped = true
		case r == '"':
			quoted = !quoted
		case r == ',' && !quoted:
			flush()
		default:
			// Ordinary rune, or a comma inside quotes.
			field.WriteRune(r)
		}
	}

	if field.Len() > 0 {
		flush()
	}
	return fields
}
// getDBFromContext returns the *gorm.DB stored in the gin context under the
// key "db", or nil when the key is absent or holds a value of another type.
//
// The value is expected to be placed there by middleware. No fallback to a
// global connection is implemented, so callers must handle a nil result.
func getDBFromContext(c *gin.Context) *gorm.DB {
	value, found := c.Get("db")
	if !found {
		// Fallback to a global database connection is not implemented;
		// nil is returned instead.
		return nil
	}

	gormDB, ok := value.(*gorm.DB)
	if !ok {
		return nil
	}
	return gormDB
}