后端组织SQS消息队列流程

  1. 队列组织方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 从代码可以看到,所有服务都使用同一个队列
    public class AWSConfig {
    public static final String COMFYUI_TASK_HOLDING_QUEUE_URL = "..."; // 所有任务共用一个队列
    }

    // 在发送消息时
    contentGenerationManager.sendLatentsyncAudio2VideoMessage(...);
    contentGenerationManager.sendMemoAudio2VideoMessage(...);
    // 这些不同的服务消息最终都会调用相同的 doSendMessage 方法
    // 都发送到同一个 COMFYUI_TASK_HOLDING_QUEUE_URL
  2. 任务类型区分:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public void sendLatentsyncAudio2VideoMessage(...) {
    ContentGenerationRequest request = new ContentGenerationRequest();
    // 通过 taskType 来区分不同的服务
    request.setTaskType(Content.Type.LATENTSYNC_VIDEO_LIP_SYNC.databaseValue());
    request.setPlatform(ContentGenerationRequest.ExecutionPlatform.RUN_POD);
    doSendMessage(taskId, request);
    }

    public void sendMemoAudio2VideoMessage(...) {
    ContentGenerationRequest request = new ContentGenerationRequest();
    // 不同服务设置不同的 taskType
    request.setTaskType(Content.Type.TALKING_AVATAR_AUDIO_2_VIDEO_Memo.databaseValue());
    request.setPlatform(ContentGenerationRequest.ExecutionPlatform.RUN_POD);
    doSendMessage(taskId, request);
    }
  3. 优先级处理:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public enum ExecutionPriority {
    INTERNAL, // 最高优先级
    SLOW_FREE,
    SLOW_GENERATION,
    FREE,
    PAID
    }

    // 在发送消息时设置优先级
    private void doSendMessage(String taskId, ContentGenerationRequest message) {
    Task task = taskService.getTaskByTaskId(taskId);
    Task.ExecutionPriority priority = task == null ?
    Task.ExecutionPriority.INTERNAL :
    Optional.ofNullable(task.getPriority()).orElse(Task.ExecutionPriority.FREE);

    message.setPriority(priority);
    // 消息和优先级一起发送到同一个队列
    }

所以当前的架构是:
2. 单队列模式:

  • 所有服务(lipsync, talkingavatar等)的请求都放在同一个队列中
  • 通过 taskType 区分不同的服务类型
  • 不是使用多个队列来处理不同优先级
  1. 优先级处理:
    • 优先级信息作为消息的一部分发送
    • 在消息体中包含 priority 字段
    • 服务端在处理时会考虑这个优先级

这种设计的优缺点:
优点:
5. 队列管理简单,只需维护一个队列
6. 资源利用更灵活,不会出现某个服务队列空闲而其他队列堆积的情况
7. 系统复杂度较低

缺点:
8. 不同服务之间可能相互影响
9. 优先级处理依赖于服务端的实现
10. 无法通过队列级别进行服务隔离

如果要改进,可以考虑的方案:
11. 服务隔离:

1
2
LipSync Queue     → [High Priority] → [Medium Priority] → [Low Priority]
TalkingAvatar Queue → [High Priority] → [Medium Priority] → [Low Priority]
  1. 优先级隔离:
    1
    2
    3
    High Priority Queue → [All Services]
    Medium Priority Queue → [All Services]
    Low Priority Queue → [All Services]

但目前的实现是所有服务共用一个队列,优先级作为消息属性随消息一起发送。

服务端处理SQS消息流程

服务端处理 SQS 消息的主要流程:

  1. 服务初始化和启动:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # 服务启动时初始化ComfyUI服务器
    def setup_comfyui():
    global comfyui_server_thread
    # 在新线程中启动ComfyUI服务器
    comfyui_server_thread = threading.Thread(target=run_comfyui)
    comfyui_server_thread.start()

    # 等待服务器就绪
    url = f'http://{server_address}/stable-paw/start'
    while True:
    try:
    response = requests.get(url=url)
    if response.status_code == 200:
    break
    except:
    pass
    time.sleep(1)
  2. 消息接收和解析:

    1
    2
    3
    4
    5
    def handler(job):
    # 从job输入中获取请求内容
    job_input = job['input']
    json_request = job_input["request"]
    request = json.loads(json_request)
  3. 任务执行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # 发送生成请求到ComfyUI服务器
    url = f'http://{server_address}/stable-paw/content-generation'
    response = requests.request(
    method="POST",
    url=url,
    headers=headers,
    data=json.dumps(request)
    )

    # 错误处理
    if status_code != 200:
    # 发送失败回调
    for task_detail in request["tasks"]:
    task_id = task_detail["task_id"]
    failure_callback_message = {
    "taskId": task_id,
    "status": "failed"
    }
    send_sqs_message(status_callback_queue_url, failure_callback_message)
    # 删除原始消息
    delete_sqs_messgae(request["queue_url"], request["receipt_handle"])
  4. 状态监控:

    1
    2
    3
    4
    5
    6
    7
    8
    # 持续监控任务执行状态
    while True:
    response = requests.get(url=f'http://{server_address}/stable-paw/status')
    if response.status_code == 200:
    body = response.json()
    if body["status"] == 0: # 任务完成
    break
    time.sleep(5)
  5. SQS 工具函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # 发送SQS消息
    def send_sqs_message(queue_url: str, message: dict) -> bool:
    try:
    response = sqs_client.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps(message)
    )
    return True
    except Exception as e:
    traceback.print_exc()
    return False

    # 删除SQS消息
    def delete_sqs_messgae(queue_url: str, receipt_handle: str) -> None:
    try:
    sqs_client.delete_message(
    QueueUrl=queue_url,
    ReceiptHandle=receipt_handle
    )
    except Exception as e:
    logging.warn(f"Delete sqs message failed. Exception is {e}.")

关键流程说明:

  1. . 消息流转:
  • 后端发送任务到 SQS 队列
  • RunPod 服务器从队列获取消息
  • 服务端解析消息并执行任务
  • 任务完成后发送状态回调
  • 删除原始消息
  1. 并发处理:
  • ComfyUI 服务器在独立线程运行
  • 主线程处理消息和监控状态
  • 使用线程锁保证并发安全
  1. 错误处理:
  • 请求失败时发送失败回调
  • 异常情况下重启 ComfyUI 服务器
  • 记录详细日志便于调试
  1. 状态监控:
  • 周期性检查任务状态
  • 任务完成后发送回调通知
  • 支持长时间运行的任务
  1. 消息清理:
  • 任务完成后删除原始消息
  • 失败任务也确保消息被删除
  • 避免消息重复处理

核心代码执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
接收SQS消息 

解析消息内容

发送到ComfyUI服务器

监控执行状态 ←→ 状态轮询

发送回调消息

清理原始消息