
Find subqueues related to the host default queues of all users, and consume a job from a random one. Signed-off-by: Ren Wen <15991987063@163.com> --- src/scheduler/find_job_boot.cr | 20 +++++++++++++++++++- src/scheduler/redis_client.cr | 4 ++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr index 6f51640..77071db 100644 --- a/src/scheduler/find_job_boot.cr +++ b/src/scheduler/find_job_boot.cr @@ -56,7 +56,25 @@ class Sched queues << item.strip end - return rand_queues(queues) + user_queues = [] of String + queues.each do |queue| + matched_queues = @redis.keys("*/sched/#{queue}*") + matched_queues.each do |mq| + user_queues << "#{mq}" + end + end + + user_queues = Set.new(user_queues) + user_queues = user_queues.map do |queue| + if queue =~ /sched\/(.+)\/ready$/ + $1 + else + "" + end + end + user_queues.reject!(&.empty?) + + return rand_queues(user_queues) end def get_job_from_queues(queues, testbox) diff --git a/src/scheduler/redis_client.cr b/src/scheduler/redis_client.cr index b816d67..1253108 100644 --- a/src/scheduler/redis_client.cr +++ b/src/scheduler/redis_client.cr @@ -16,6 +16,10 @@ class Redis::Client @client = Redis::PooledClient.new(host: host, port: port, pool_size: 25, pool_timeout: 0.01) end + def keys(pattern) + @client.keys(pattern) + end + def hash_set(key : String, field, value) @client.hset(key, field.to_s, value.to_s) end -- 2.23.0