整个回调流程是这样的:

服务端完成任务后发送状态到 SQS:

1
2
3
4
5
6
7
8
9
def send_callback(task_id, status):
callback_message = {
"taskId": task_id,
"status": status, # success/failed/running
"outputDir": output_location,
# 其他状态信息
}
# 发送到状态回调队列
send_sqs_message(status_callback_queue_url, callback_message)

AWS Lambda 函数监听回调队列:

1
2
3
4
5
6
7
8
9
10
11
12
def lambda_handler(event, context):
# 获取 SQS 消息
for record in event['Records']:
message_body = json.loads(record['body'])

# 转发回调到后端 API
url = "https://backend.apob.ai/callback/content/event"
response = requests.post(
url,
headers={'Content-Type': 'application/json'},
data=json.dumps(message_body)
)

Java 后端接收回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RestController
@RequestMapping("/callback")
public class ContentGenerationCallbackController {

@PostMapping("/content/event")
public void handleCallback(@RequestBody ContentGenerationCallback callback) {
// 1. 获取任务信息
Task task = taskService.getTaskByTaskId(callback.getTaskId());
Content content = contentService.getContentById(task.getResourceId());

// 2. 根据内容类型获取对应的处理器
AbstractContentCallbackHandler<ContentGenerationCallbackDetail> handler =
contentCallbackHandlerRegistry.getHandler(content.getType());

// 3. 根据状态处理回调
switch (callback.getStatus().toLowerCase()) {
case "running":
handler.handleRunningStatus(callbackDetail);
break;
case "failed":
handler.handleFailureStatus(callbackDetail);
break;
case "success":
handler.handleSuccessStatus(callbackDetail);
break;
}
}
}

完整流程示意:

1
2
3
4
5
6
7
8
9
10
11
Service完成任务 

发送状态到SQS回调队列

Lambda函数触发

转发回调到Java后端API

后端根据内容类型分发处理

更新任务状态/处理结果
这样设计的好处:
1. 解耦服务端和后端
2. 保证消息的可靠投递
3. 支持失败重试
4. 可以处理复杂的任务依赖关系