[PATCH compass-ci] sched: consume job from queues

[why] Before, we consume job from the queue which has relation with tbox_group. Now, we consume job from the queue which is setted by mac2queues. Signed-off-by: Cao Xueliang <caoxl78320@163.com> --- src/lib/sched.cr | 69 +++++++++++++++++++++++++++++++++----- src/scheduler/scheduler.cr | 2 +- 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/lib/sched.cr b/src/lib/sched.cr index c32091f..d8d0338 100644 --- a/src/lib/sched.cr +++ b/src/lib/sched.cr @@ -240,10 +240,10 @@ class Sched response = add_job(job, job_id) message = (response["error"]? ? response["error"]["root_cause"] : "") job_messages << { - "job_id" => job_id, - "message" => message.to_s, - "job_state" => "submit", - "result_root" => "/srv#{job.result_root}" + "job_id" => job_id, + "message" => message.to_s, + "job_state" => "submit", + "result_root" => "/srv#{job.result_root}", } return job_messages if response["error"]? end @@ -292,10 +292,10 @@ class Sched message = (response["error"]? ? response["error"]["root_cause"] : "") return [{ - "job_id" => job_id, - "message" => message.to_s, - "job_state" => "submit", - "result_root" => "/srv#{job.result_root}" + "job_id" => job_id, + "message" => message.to_s, + "job_state" => "submit", + "result_root" => "/srv#{job.result_root}", }] end @@ -378,6 +378,59 @@ class Sched end end + def rand_queues(queues) + return queues if queues.empty? 
+ + queues_size = queues.size + base = Random.rand(queues_size) + temp_queues = [] of String + + (0..queues_size - 1).each do |index| + temp_queues << queues[(index + base) % queues_size] + end + + return temp_queues + end + + def get_queues(mac) + queues = [] of String + + queues_str = @redis.hash_get("sched/mac2queues", normalize_mac(mac)) + return queues unless queues_str + + queues_str.split(',', remove_empty: true) do |item| + queues << item.strip + end + + return rand_queues(queues) + end + + def get_job_from_queues(queues, testbox) + job = nil + + queues.each do |queue| + job = prepare_job("sched/#{queue}", testbox) + return job if job + end + + return job + end + + def get_job_boot(env : HTTP::Server::Context) + value = env.params.url["value"] + boot_type = env.params.url["boot_type"] + testbox = @redis.hash_get("sched/mac2host", normalize_mac(value)) + + queues = get_queues(value) + job = get_job_from_queues(queues, testbox) + + if job + Jobfile::Operate.create_job_cpio(job.dump_to_json_any, Kemal.config.public_folder) + end + + return boot_content(job, boot_type) + end + def find_job_boot(env : HTTP::Server::Context) api_param = env.params.url["value"] diff --git a/src/scheduler/scheduler.cr b/src/scheduler/scheduler.cr index 371e4ed..1bfea02 100644 --- a/src/scheduler/scheduler.cr +++ b/src/scheduler/scheduler.cr @@ -51,7 +51,7 @@ module Scheduler # /boot.yyy/mac/${mac} get "/boot.:boot_type/:parameter/:value" do |env| env.response.headers["Connection"] = "close" - response = sched.find_job_boot(env) + response = sched.get_job_boot(env) job_id = response[/tmpfs\/(.*)\/job\.cgz/, 1]? puts %({"job_id": "#{job_id}", "job_state": "boot"}) if job_id -- 2.23.0

On Thu, Oct 29, 2020 at 10:36:41AM +0800, Cao Xueliang wrote:
[why] Before, we consume job from the queue which has relation with tbox_group. Now, we consume job from the queue which is setted by mac2queues.
Signed-off-by: Cao Xueliang <caoxl78320@163.com> --- src/lib/sched.cr | 69 +++++++++++++++++++++++++++++++++----- src/scheduler/scheduler.cr | 2 +- 2 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/src/lib/sched.cr b/src/lib/sched.cr index c32091f..d8d0338 100644 --- a/src/lib/sched.cr +++ b/src/lib/sched.cr @@ -240,10 +240,10 @@ class Sched response = add_job(job, job_id) message = (response["error"]? ? response["error"]["root_cause"] : "") job_messages << { - "job_id" => job_id, - "message" => message.to_s, - "job_state" => "submit", - "result_root" => "/srv#{job.result_root}" + "job_id" => job_id, + "message" => message.to_s, + "job_state" => "submit", + "result_root" => "/srv#{job.result_root}", } return job_messages if response["error"]? end @@ -292,10 +292,10 @@ class Sched message = (response["error"]? ? response["error"]["root_cause"] : "")
return [{ - "job_id" => job_id, - "message" => message.to_s, - "job_state" => "submit", - "result_root" => "/srv#{job.result_root}" + "job_id" => job_id, + "message" => message.to_s, + "job_state" => "submit", + "result_root" => "/srv#{job.result_root}", }] end
@@ -378,6 +378,59 @@ class Sched end end
+ def rand_queues(queues) + return queues if queues.empty? + + queues_size = queues.size + base = Random.rand(queues_size) + temp_queues = [] of String + + (0..queues_size - 1).each do |index| + temp_queues << queues[(index + base) % queues_size] + end + + return temp_queues + end + + def get_queues(mac) + queues = [] of String + + queues_str = @redis.hash_get("sched/mac2queues", normalize_mac(mac)) + return queues unless queues_str + + queues_str.split(',', remove_empty: true) do |item| + queues << item.strip + end + + return rand_queues(queues) + end + + def get_job_from_queues(queues, testbox) + job = nil + + queues.each do |queue| + job = prepare_job("sched/#{queue}", testbox) + return job if job + end + + return job + end + + def get_job_boot(env : HTTP::Server::Context) + value = env.params.url["value"]
It would be better to give the variable `value` a more meaningful name. Thanks, Xijian
+ boot_type = env.params.url["boot_type"] + testbox = @redis.hash_get("sched/mac2host", normalize_mac(value)) + + queues = get_queues(value) + job = get_job_from_queues(queues, testbox) + + if job + Jobfile::Operate.create_job_cpio(job.dump_to_json_any, Kemal.config.public_folder) + end + + return boot_content(job, boot_type) + end + def find_job_boot(env : HTTP::Server::Context) api_param = env.params.url["value"]
diff --git a/src/scheduler/scheduler.cr b/src/scheduler/scheduler.cr index 371e4ed..1bfea02 100644 --- a/src/scheduler/scheduler.cr +++ b/src/scheduler/scheduler.cr @@ -51,7 +51,7 @@ module Scheduler # /boot.yyy/mac/${mac} get "/boot.:boot_type/:parameter/:value" do |env| env.response.headers["Connection"] = "close" - response = sched.find_job_boot(env) + response = sched.get_job_boot(env)
job_id = response[/tmpfs\/(.*)\/job\.cgz/, 1]? puts %({"job_id": "#{job_id}", "job_state": "boot"}) if job_id -- 2.23.0

On Thu, Oct 29, 2020 at 10:36:41AM +0800, Cao Xueliang wrote:
[why] Before, we consume job from the queue which has relation with tbox_group.
Please describe the queue source here: mac2queues is looked up to get the queues from which the machine should find jobs.
Now, we consume job from the queue which is setted by mac2queues.
Signed-off-by: Cao Xueliang <caoxl78320@163.com> --- src/lib/sched.cr | 69 +++++++++++++++++++++++++++++++++----- src/scheduler/scheduler.cr | 2 +- 2 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/src/lib/sched.cr b/src/lib/sched.cr index c32091f..d8d0338 100644 --- a/src/lib/sched.cr +++ b/src/lib/sched.cr @@ -240,10 +240,10 @@ class Sched response = add_job(job, job_id) message = (response["error"]? ? response["error"]["root_cause"] : "") job_messages << { - "job_id" => job_id, - "message" => message.to_s, - "job_state" => "submit", - "result_root" => "/srv#{job.result_root}" + "job_id" => job_id, + "message" => message.to_s, + "job_state" => "submit", + "result_root" => "/srv#{job.result_root}", } return job_messages if response["error"]? end @@ -292,10 +292,10 @@ class Sched message = (response["error"]? ? response["error"]["root_cause"] : "")
return [{ - "job_id" => job_id, - "message" => message.to_s, - "job_state" => "submit", - "result_root" => "/srv#{job.result_root}" + "job_id" => job_id, + "message" => message.to_s, + "job_state" => "submit", + "result_root" => "/srv#{job.result_root}", }] end
@@ -378,6 +378,59 @@ class Sched end end
+ def rand_queues(queues) + return queues if queues.empty?
This check should be added to the get_job_boot function, not here.
+ + queues_size = queues.size + base = Random.rand(queues_size) + temp_queues = [] of String
How about adding a comment for this function: # ensure that tasks in the queue have the same priority
+ + (0..queues_size - 1).each do |index| + temp_queues << queues[(index + base) % queues_size] + end + + return temp_queues + end + + def get_queues(mac) + queues = [] of String + + queues_str = @redis.hash_get("sched/mac2queues", normalize_mac(mac)) ^ How about naming it `result`?
+ return queues unless queues_str +
Please add a comment: # our queues use "," to separate multiple values, e.g. # "vm-2p8g.aarch64,vm-2p8g~USER" Thanks, Shenwei
+ queues_str.split(',', remove_empty: true) do |item| + queues << item.strip + end + + return rand_queues(queues) + end + + def get_job_from_queues(queues, testbox) + job = nil + + queues.each do |queue| + job = prepare_job("sched/#{queue}", testbox) + return job if job + end + + return job + end + + def get_job_boot(env : HTTP::Server::Context) + value = env.params.url["value"] + boot_type = env.params.url["boot_type"] + testbox = @redis.hash_get("sched/mac2host", normalize_mac(value)) + + queues = get_queues(value) + job = get_job_from_queues(queues, testbox) + + if job + Jobfile::Operate.create_job_cpio(job.dump_to_json_any, Kemal.config.public_folder) + end + + return boot_content(job, boot_type) + end + def find_job_boot(env : HTTP::Server::Context) api_param = env.params.url["value"]
diff --git a/src/scheduler/scheduler.cr b/src/scheduler/scheduler.cr index 371e4ed..1bfea02 100644 --- a/src/scheduler/scheduler.cr +++ b/src/scheduler/scheduler.cr @@ -51,7 +51,7 @@ module Scheduler # /boot.yyy/mac/${mac} get "/boot.:boot_type/:parameter/:value" do |env| env.response.headers["Connection"] = "close" - response = sched.find_job_boot(env) + response = sched.get_job_boot(env)
job_id = response[/tmpfs\/(.*)\/job\.cgz/, 1]? puts %({"job_id": "#{job_id}", "job_state": "boot"}) if job_id -- 2.23.0
participants (3)
-
Cao Xueliang
-
Xiao Shenwei
-
Xu Xijian