[why] Support multiple queue names for [tbox_group, host, ...]. The scheduler needs to know the sub-queues that belong to it, for example: "sched/$tbox_group-1", "sched/$tbox_group/1024", "sched/$tbox_group.taishan200-2280...".
[how] - taskqueue supports a "/keys" API. - scheduler uses @task_queue.query_keys("sched/$tbox_group*") to get the list of keys.
Signed-off-by: Tong Qunfeng taxcom@tom.com --- src/lib/taskqueue_api.cr | 8 ++++++++ src/taskqueue/queue.cr | 12 +++++++++++- src/taskqueue/redis_client.cr | 22 ++++++++++++++++++++++ src/taskqueue/taskqueue.cr | 15 +++++++++++++++ 4 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/src/lib/taskqueue_api.cr b/src/lib/taskqueue_api.cr index 99c3933..5a83a15 100644 --- a/src/lib/taskqueue_api.cr +++ b/src/lib/taskqueue_api.cr @@ -18,6 +18,14 @@ class TaskQueueAPI arrange_response(response) end
+ def query_keys(service_key_with_wild_char : String) + params = HTTP::Params.encode({"queue" => service_key_with_wild_char}) + client = HTTP::Client.new(@host, port: @port) + response = client.get("/keys?" + params) + client.close + arrange_response(response) + end + def consume_task(service_queue_path : String) params = HTTP::Params.encode({"queue" => service_queue_path}) response_put_api("consume", params) diff --git a/src/taskqueue/queue.cr b/src/taskqueue/queue.cr index 674606e..f98ee06 100644 --- a/src/taskqueue/queue.cr +++ b/src/taskqueue/queue.cr @@ -146,6 +146,16 @@ class TaskQueue end end
+ def queue_respond_keys(env) + queue_name_pattern, ext_set = queue_check_params(env, ["queue"]) + return ext_set if ext_set + + response = get_keys(queue_name_pattern[0]) + env.response.status_code = 201 if response.empty? + + return response + end + # loop try: when there has no task, return until get one or timeout # default timeout is 0.015ms, we delay for 10ms at each time private def operate_with_timeout(timeout) @@ -195,7 +205,7 @@ class TaskQueue # prefix is start with "queues", need delete it s_name = prefix.sub("#{QUEUE_NAME_BASE}/", "") move_first_task_in_redis_with_score("#{s_name}/#{uuid}", - "#{s_name}/ready") + "#{s_name}/ready") end end end diff --git a/src/taskqueue/redis_client.cr b/src/taskqueue/redis_client.cr index 27d46cf..7f8ea59 100644 --- a/src/taskqueue/redis_client.cr +++ b/src/taskqueue/redis_client.cr @@ -261,6 +261,28 @@ class TaskQueue return keys end
+ private def get_keys(queue_name) + query_prefix = QUEUE_NAME_BASE + "/" + query_prefix_len = query_prefix.size + search = query_prefix + queue_name + response = [] of String + + cursor = "0" + loop do + cursor, keys = @redis.scan(cursor, search, 256) + case keys + when Array(Redis::RedisValue) + keys.each do |key| + response << "#{key}"[query_prefix_len..-1] + end + end + + break if cursor == "0" + end + + return response + end + private def move_first_task_in_redis_with_score(from : String, to : String) # result was ["crystal.87230", "1600782938.9017849"] result = @redis.zrange("#{QUEUE_NAME_BASE}/#{from}", 0, 0, with_scores: true) diff --git a/src/taskqueue/taskqueue.cr b/src/taskqueue/taskqueue.cr index 98c5623..13f48db 100644 --- a/src/taskqueue/taskqueue.cr +++ b/src/taskqueue/taskqueue.cr @@ -104,6 +104,21 @@ class TaskQueue nil end
+ # ------------------- + # request: curl http://localhost:3060/keys? + # queue=sched* + # wild match: *, ?, [-] + # + # response: 200 ["scheda", "schedb", ...] + # 201 ## when no find + # 400 "Missing parameter <queue>" + # 413 "Query results too large keys" + get "/keys" do |env| + response = queue_respond_keys(env) + # debug_message(env, response, Time.utc) # maybe too large + response.to_json unless env.response.status_code == 201 + end + @port = (ENV.has_key?("TASKQUEUE_PORT") ? ENV["TASKQUEUE_PORT"].to_i32 : TASKQUEUE_PORT) Kemal.run(@port) end