SQS消息队列优先级问题
后端组织SQS消息队列流程
队列组织方式:
1
2
3
4
5
6
7
8
9
10// 从代码可以看到,所有服务都使用同一个队列
public class AWSConfig {
public static final String COMFYUI_TASK_HOLDING_QUEUE_URL = "..."; // 所有任务共用一个队列
}
// 在发送消息时
contentGenerationManager.sendLatentsyncAudio2VideoMessage(...);
contentGenerationManager.sendMemoAudio2VideoMessage(...);
// 这些不同的服务消息最终都会调用相同的 doSendMessage 方法
// 都发送到同一个 COMFYUI_TASK_HOLDING_QUEUE_URL任务类型区分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public void sendLatentsyncAudio2VideoMessage(...) {
ContentGenerationRequest request = new ContentGenerationRequest();
// 通过 taskType 来区分不同的服务
request.setTaskType(Content.Type.LATENTSYNC_VIDEO_LIP_SYNC.databaseValue());
request.setPlatform(ContentGenerationRequest.ExecutionPlatform.RUN_POD);
doSendMessage(taskId, request);
}
public void sendMemoAudio2VideoMessage(...) {
ContentGenerationRequest request = new ContentGenerationRequest();
// 不同服务设置不同的 taskType
request.setTaskType(Content.Type.TALKING_AVATAR_AUDIO_2_VIDEO_Memo.databaseValue());
request.setPlatform(ContentGenerationRequest.ExecutionPlatform.RUN_POD);
doSendMessage(taskId, request);
}优先级处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public enum ExecutionPriority {
INTERNAL, // 最高优先级
SLOW_FREE,
SLOW_GENERATION,
FREE,
PAID
}
// 在发送消息时设置优先级
private void doSendMessage(String taskId, ContentGenerationRequest message) {
Task task = taskService.getTaskByTaskId(taskId);
Task.ExecutionPriority priority = task == null ?
Task.ExecutionPriority.INTERNAL :
Optional.ofNullable(task.getPriority()).orElse(Task.ExecutionPriority.FREE);
message.setPriority(priority);
// 消息和优先级一起发送到同一个队列
}
所以当前的架构是:
2. 单队列模式:
- 所有服务(lipsync, talkingavatar等)的请求都放在同一个队列中
- 通过 taskType 区分不同的服务类型
- 不是使用多个队列来处理不同优先级
- 优先级处理:
- 优先级信息作为消息的一部分发送
- 在消息体中包含 priority 字段
- 服务端在处理时会考虑这个优先级
这种设计的优缺点:
优点:
5. 队列管理简单,只需维护一个队列
6. 资源利用更灵活,不会出现某个服务队列空闲而其他队列堆积的情况
7. 系统复杂度较低
缺点:
8. 不同服务之间可能相互影响
9. 优先级处理依赖于服务端的实现
10. 无法通过队列级别进行服务隔离
如果要改进,可以考虑的方案:
11. 服务隔离:
1 | LipSync Queue → [High Priority] → [Medium Priority] → [Low Priority] |
- 优先级隔离:
1
2
3High Priority Queue → [All Services]
Medium Priority Queue → [All Services]
Low Priority Queue → [All Services]
但目前的实现是所有服务共用一个队列,优先级作为消息属性随消息一起发送。
服务端处理SQS消息流程
服务端处理 SQS 消息的主要流程:
服务初始化和启动:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 服务启动时初始化ComfyUI服务器
def setup_comfyui():
global comfyui_server_thread
# 在新线程中启动ComfyUI服务器
comfyui_server_thread = threading.Thread(target=run_comfyui)
comfyui_server_thread.start()
# 等待服务器就绪
url = f'http://{server_address}/stable-paw/start'
while True:
try:
response = requests.get(url=url)
if response.status_code == 200:
break
except:
pass
time.sleep(1)消息接收和解析:
1
2
3
4
5def handler(job):
# 从job输入中获取请求内容
job_input = job['input']
json_request = job_input["request"]
request = json.loads(json_request)任务执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21# 发送生成请求到ComfyUI服务器
url = f'http://{server_address}/stable-paw/content-generation'
response = requests.request(
method="POST",
url=url,
headers=headers,
data=json.dumps(request)
)
# 错误处理
if status_code != 200:
# 发送失败回调
for task_detail in request["tasks"]:
task_id = task_detail["task_id"]
failure_callback_message = {
"taskId": task_id,
"status": "failed"
}
send_sqs_message(status_callback_queue_url, failure_callback_message)
# 删除原始消息
delete_sqs_messgae(request["queue_url"], request["receipt_handle"])状态监控:
1
2
3
4
5
6
7
8# 持续监控任务执行状态
while True:
response = requests.get(url=f'http://{server_address}/stable-paw/status')
if response.status_code == 200:
body = response.json()
if body["status"] == 0: # 任务完成
break
time.sleep(5)SQS 工具函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21# 发送SQS消息
def send_sqs_message(queue_url: str, message: dict) -> bool:
try:
response = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message)
)
return True
except Exception as e:
traceback.print_exc()
return False
# 删除SQS消息
def delete_sqs_messgae(queue_url: str, receipt_handle: str) -> None:
try:
sqs_client.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
except Exception as e:
logging.warn(f"Delete sqs message failed. Exception is {e}.")
关键流程说明:
- . 消息流转:
- 后端发送任务到 SQS 队列
- RunPod 服务器从队列获取消息
- 服务端解析消息并执行任务
- 任务完成后发送状态回调
- 删除原始消息
- 并发处理:
- ComfyUI 服务器在独立线程运行
- 主线程处理消息和监控状态
- 使用线程锁保证并发安全
- 错误处理:
- 请求失败时发送失败回调
- 异常情况下重启 ComfyUI 服务器
- 记录详细日志便于调试
- 状态监控:
- 周期性检查任务状态
- 任务完成后发送回调通知
- 支持长时间运行的任务
- 消息清理:
- 任务完成后删除原始消息
- 失败任务也确保消息被删除
- 避免消息重复处理
核心代码执行逻辑:
1 | 接收SQS消息 |
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 Priska's blog!