[PATCH v2 compass-ci 2/2] scheduler: subqueue: consume jobs from subqueue

Find the subqueues related to the host default queues of all users,
and consume from them in random order.

Signed-off-by: Ren Wen <15991987063@163.com>
---
 src/scheduler/constants.cr     |  2 ++
 src/scheduler/find_job_boot.cr | 11 ++++++++++-
 src/scheduler/redis_client.cr  |  4 ++++
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/src/scheduler/constants.cr b/src/scheduler/constants.cr
index b2ea831..1146b21 100644
--- a/src/scheduler/constants.cr
+++ b/src/scheduler/constants.cr
@@ -1,6 +1,8 @@
 # SPDX-License-Identifier: MulanPSL-2.0+
 # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.

+require "../taskqueue/constants"
+
 JOB_REDIS_HOST = "172.17.0.1"
 JOB_REDIS_PORT = 6379
 JOB_REDIS_PORT_DEBUG = 6380
diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr
index 3e33604..fb94e91 100644
--- a/src/scheduler/find_job_boot.cr
+++ b/src/scheduler/find_job_boot.cr
@@ -56,7 +56,16 @@ class Sched
       queues << item.strip
     end

-    return rand_queues(queues)
+    sub_queues = [] of String
+    queues.each do |queue|
+      matched_queues = @redis.keys("#{QUEUE_NAME_BASE}/sched/#{queue}/*/ready")
+      matched_queues.each do |mq|
+        match_data = "#{mq}".match(%r(^#{QUEUE_NAME_BASE}/sched/(#{queue}/.+)/ready$))
+        sub_queues << match_data[1] if match_data
+      end
+    end
+
+    return rand_queues(sub_queues)
   end

   def get_job_from_queues(queues, testbox)
diff --git a/src/scheduler/redis_client.cr b/src/scheduler/redis_client.cr
index b816d67..1253108 100644
--- a/src/scheduler/redis_client.cr
+++ b/src/scheduler/redis_client.cr
@@ -16,6 +16,10 @@ class Redis::Client
     @client = Redis::PooledClient.new(host: host, port: port, pool_size: 25, pool_timeout: 0.01)
   end

+  def keys(pattern)
+    @client.keys(pattern)
+  end
+
   def hash_set(key : String, field, value)
     @client.hset(key, field.to_s, value.to_s)
   end
--
2.23.0
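
For reviewers who want to try the key-matching step without a Redis server, here is a minimal standalone sketch in Crystal. It is not part of the patch. The value of QUEUE_NAME_BASE, the helper name extract_sub_queues, and the sample key names are all assumptions made for illustration; the real constant comes from src/taskqueue/constants.cr. Unlike the patch, the sketch passes the queue name through Regex.escape, on the assumption that queue names may contain regex metacharacters.

```crystal
# Hypothetical value; the real constant is defined in src/taskqueue/constants.cr.
QUEUE_NAME_BASE = "queues"

# Hypothetical helper: given key names as returned by a KEYS lookup, return the
# "<queue>/<subqueue>" part of every key shaped like
# "<base>/sched/<queue>/<subqueue>/ready".
def extract_sub_queues(keys : Array(String), queue : String) : Array(String)
  pattern = %r(^#{QUEUE_NAME_BASE}/sched/(#{Regex.escape(queue)}/.+)/ready$)
  # compact_map drops the nils produced by keys that do not match.
  keys.compact_map { |key| key.match(pattern).try(&.[1]) }
end

# Sample keys, invented for this example.
keys = [
  "queues/sched/vm-2p8g/alice/ready",
  "queues/sched/vm-2p8g/bob/ready",
  "queues/sched/vm-2p8g/alice/in_process",
]

sub_queues = extract_sub_queues(keys, "vm-2p8g")
# Stand-in for rand_queues: consume the subqueues in random order.
puts sub_queues.shuffle
```

The key ending in "in_process" is dropped because the pattern requires the "/ready" suffix, so only the two "ready" subqueues are left to be shuffled.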