[why]
Previously, we consumed jobs from the queue associated with
tbox_group.
Now, we consume jobs from the queues set in mac2queues.
Signed-off-by: Cao Xueliang <caoxl78320(a)163.com>
---
src/lib/sched.cr | 69 +++++++++++++++++++++++++++++++++-----
src/scheduler/scheduler.cr | 2 +-
2 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/src/lib/sched.cr b/src/lib/sched.cr
index c32091f..d8d0338 100644
--- a/src/lib/sched.cr
+++ b/src/lib/sched.cr
@@ -240,10 +240,10 @@ class Sched
response = add_job(job, job_id)
message = (response["error"]? ? response["error"]["root_cause"] : "")
job_messages << {
- "job_id" => job_id,
- "message" => message.to_s,
- "job_state" => "submit",
- "result_root" => "/srv#{job.result_root}"
+ "job_id" => job_id,
+ "message" => message.to_s,
+ "job_state" => "submit",
+ "result_root" => "/srv#{job.result_root}",
}
return job_messages if response["error"]?
end
@@ -292,10 +292,10 @@ class Sched
message = (response["error"]? ? response["error"]["root_cause"] : "")
return [{
- "job_id" => job_id,
- "message" => message.to_s,
- "job_state" => "submit",
- "result_root" => "/srv#{job.result_root}"
+ "job_id" => job_id,
+ "message" => message.to_s,
+ "job_state" => "submit",
+ "result_root" => "/srv#{job.result_root}",
}]
end
@@ -378,6 +378,59 @@ class Sched
end
end
+ def rand_queues(queues)
+ return queues if queues.empty?
+
+ queues_size = queues.size
+ base = Random.rand(queues_size)
+ temp_queues = [] of String
+
+ (0..queues_size - 1).each do |index|
+ temp_queues << queues[(index + base) % queues_size]
+ end
+
+ return temp_queues
+ end
+
+ def get_queues(mac)
+ queues = [] of String
+
+ queues_str = @redis.hash_get("sched/mac2queues", normalize_mac(mac))
+ return queues unless queues_str
+
+ queues_str.split(',', remove_empty: true) do |item|
+ queues << item.strip
+ end
+
+ return rand_queues(queues)
+ end
+
+ def get_job_from_queues(queues, testbox)
+ job = nil
+
+ queues.each do |queue|
+ job = prepare_job("sched/#{queue}", testbox)
+ return job if job
+ end
+
+ return job
+ end
+
+ def get_job_boot(env : HTTP::Server::Context)
+ value = env.params.url["value"]
+ boot_type = env.params.url["boot_type"]
+ testbox = @redis.hash_get("sched/mac2host", normalize_mac(value))
+
+ queues = get_queues(value)
+ job = get_job_from_queues(queues, testbox)
+
+ if job
+ Jobfile::Operate.create_job_cpio(job.dump_to_json_any, Kemal.config.public_folder)
+ end
+
+ return boot_content(job, boot_type)
+ end
+
def find_job_boot(env : HTTP::Server::Context)
api_param = env.params.url["value"]
diff --git a/src/scheduler/scheduler.cr b/src/scheduler/scheduler.cr
index 371e4ed..1bfea02 100644
--- a/src/scheduler/scheduler.cr
+++ b/src/scheduler/scheduler.cr
@@ -51,7 +51,7 @@ module Scheduler
# /boot.yyy/mac/${mac}
get "/boot.:boot_type/:parameter/:value" do |env|
env.response.headers["Connection"] = "close"
- response = sched.find_job_boot(env)
+ response = sched.get_job_boot(env)
job_id = response[/tmpfs\/(.*)\/job\.cgz/, 1]?
puts %({"job_id": "#{job_id}", "job_state": "boot"}) if job_id
--
2.23.0