Refactor the Sched class according to the scheduler.cr API.

Extract the submit_job function and its helper methods from sched.cr
into src/scheduler/submit_job.cr.
Signed-off-by: Ren Wen <15991987063@163.com>
---
 src/lib/sched.cr            | 172 +----------------------------------
 src/scheduler/submit_job.cr | 175 ++++++++++++++++++++++++++++++++++++
 2 files changed, 176 insertions(+), 171 deletions(-)
 create mode 100644 src/scheduler/submit_job.cr
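
Note: submit_job's external behavior is unchanged by this move. It still
consumes a JSON job from the request body and returns an array of per-job
hashes ("job_id", "message", "job_state", plus "result_root" on success).
A minimal client-side sketch of that round trip follows; the endpoint path,
port, and job fields are illustrative assumptions, not defined by this
patch, and it assumes the HTTP layer serializes the returned array as JSON:

    require "http/client"
    require "json"

    # hypothetical job content; real jobs carry many more fields
    job = {"suite" => "iperf", "testbox" => "vm-2p8g", "cluster" => ""}.to_json

    # assumed endpoint; actual route registration is outside this patch
    response = HTTP::Client.post("http://localhost:3000/submit_job", body: job)

    # each element mirrors the hashes built by submit_single_job/submit_cluster_job
    JSON.parse(response.body).as_a.each do |msg|
      puts "job_id=#{msg["job_id"]} job_state=#{msg["job_state"]} message=#{msg["message"]}"
    end
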
diff --git a/src/lib/sched.cr b/src/lib/sched.cr
index 1b6eb8a..ba89ab9 100644
--- a/src/lib/sched.cr
+++ b/src/lib/sched.cr
@@ -13,6 +13,7 @@
 require "../scheduler/jobfile_operate"
 require "../scheduler/redis_client"
 require "../scheduler/elasticsearch_client"
+require "../scheduler/submit_job"
 require "../scheduler/find_job_boot"
 require "../scheduler/find_next_job_boot"
 require "../scheduler/close_job"
@@ -51,177 +52,6 @@ class Sched
     @redis.hash_del("sched/host2queues", hostname)
   end
 
-  # get cluster config using own lkp_src cluster file,
-  # a hash type will be returned
-  def get_cluster_config(cluster_file, lkp_initrd_user, os_arch)
-    lkp_src = Jobfile::Operate.prepare_lkp_tests(lkp_initrd_user, os_arch)
-    cluster_file_path = Path.new(lkp_src, "cluster", cluster_file)
-    return YAML.parse(File.read(cluster_file_path))
-  end
-
-  def get_commit_date(job)
-    if (job["upstream_repo"] != "") && (job["upstream_commit"] != "")
-      data = JSON.parse(%({"git_repo": "#{job["upstream_repo"]}.git",
-                           "git_command": ["git-log", "--pretty=format:%cd", "--date=unix",
-                           "#{job["upstream_commit"]}", "-1"]}))
-      response = @rgc.git_command(data)
-      return response.body if response.status_code == 200
-    end
-
-    return nil
-  end
-
-  def submit_job(env : HTTP::Server::Context)
-    body = env.request.body.not_nil!.gets_to_end
-
-    job_content = JSON.parse(body)
-    job = Job.new(job_content, job_content["id"]?)
-    job["commit_date"] = get_commit_date(job)
-
-    # it is not a cluster job if cluster field is empty or
-    # field's prefix is 'cs-localhost'
-    cluster_file = job["cluster"]
-    if cluster_file.empty? || cluster_file.starts_with?("cs-localhost")
-      return submit_single_job(job)
-    else
-      cluster_config = get_cluster_config(cluster_file,
-                                          job.lkp_initrd_user,
-                                          job.os_arch)
-      return submit_cluster_job(job, cluster_config)
-    end
-  rescue ex
-    puts ex.inspect_with_backtrace
-    return [{
-      "job_id"    => "0",
-      "message"   => ex.to_s,
-      "job_state" => "submit",
-    }]
-  end
-
-  # return:
-  # success: [{"job_id" => job_id1, "message => "", "job_state" => "submit"}, ...]
-  # failure: [..., {"job_id" => 0, "message" => err_msg, "job_state" => "submit"}]
-  def submit_cluster_job(job, cluster_config)
-    job_messages = Array(Hash(String, String)).new
-    lab = job.lab
-
-    # collect all job ids
-    job_ids = [] of String
-
-    net_id = "192.168.222"
-    ip0 = cluster_config["ip0"]?
-    if ip0
-      ip0 = ip0.as_i
-    else
-      ip0 = 1
-    end
-
-    # steps for each host
-    cluster_config["nodes"].as_h.each do |host, config|
-      tbox_group = host.to_s
-      job_id = add_task(tbox_group, lab)
-
-      # return when job_id is '0'
-      # 2 Questions:
-      #   - how to deal with the jobs added to DB prior to this loop
-      #   - may consume job before all jobs done
-      return job_messages << {
-        "job_id"    => "0",
-        "message"   => "add task queue sched/#{tbox_group} failed",
-        "job_state" => "submit",
-      } unless job_id
-
-      job_ids << job_id
-
-      # add to job content when multi-test
-      job["testbox"] = tbox_group
-      job.update_tbox_group(tbox_group)
-      job["node_roles"] = config["roles"].as_a.join(" ")
-      direct_macs = config["macs"].as_a
-      direct_ips = [] of String
-      direct_macs.size.times do
-        raise "Host id is greater than 254, host_id: #{ip0}" if ip0 > 254
-        direct_ips << "#{net_id}.#{ip0}"
-        ip0 += 1
-      end
-      job["direct_macs"] = direct_macs.join(" ")
-      job["direct_ips"] = direct_ips.join(" ")
-
-      response = add_job(job, job_id)
-      message = (response["error"]? ? response["error"]["root_cause"] : "")
-      job_messages << {
-        "job_id"      => job_id,
-        "message"     => message.to_s,
-        "job_state"   => "submit",
-        "result_root" => "/srv#{job.result_root}",
-      }
-      return job_messages if response["error"]?
-    end
-
-    cluster_id = job_ids[0]
-
-    # collect all host states
-    cluster_state = Hash(String, Hash(String, String)).new
-    job_ids.each do |job_id|
-      cluster_state[job_id] = {"state" => ""}
-      # will get cluster id according to job id
-      @redis.hash_set("sched/id2cluster", job_id, cluster_id)
-    end
-
-    @redis.hash_set("sched/cluster_state", cluster_id, cluster_state.to_json)
-
-    return job_messages
-  end
-
-  # return:
-  # success: [{"job_id" => job_id, "message" => "", job_state => "submit"}]
-  # failure: [{"job_id" => "0", "message" => err_msg, job_state => "submit"}]
-  def submit_single_job(job)
-    queue = job.queue
-    return [{
-      "job_id"    => "0",
-      "message"   => "get queue failed",
-      "job_state" => "submit",
-    }] unless queue
-
-    # only single job will has "idle job" and "execute rate limiter"
-    if job["idle_job"].empty?
-      queue += "#{job.get_uuid_tag}"
-    else
-      queue = "#{queue}/idle"
-    end
-
-    job_id = add_task(queue, job.lab)
-    return [{
-      "job_id"    => "0",
-      "message"   => "add task queue sched/#{queue} failed",
-      "job_state" => "submit",
-    }] unless job_id
-
-    response = add_job(job, job_id)
-    message = (response["error"]? ? response["error"]["root_cause"] : "")
-
-    return [{
-      "job_id"      => job_id,
-      "message"     => message.to_s,
-      "job_state"   => "submit",
-      "result_root" => "/srv#{job.result_root}",
-    }]
-  end
-
-  # return job_id
-  def add_task(queue, lab)
-    task_desc = JSON.parse(%({"domain": "compass-ci", "lab": "#{lab}"}))
-    response = @task_queue.add_task("sched/#{queue}", task_desc)
-    JSON.parse(response[1].to_json)["id"].to_s if response[0] == 200
-  end
-
-  # add job content to es and return a response
-  def add_job(job, job_id)
-    job.update_id(job_id)
-    @es.set_job_content(job)
-  end
-
   def auto_submit_idle_job(tbox_group)
     full_path_patterns = "#{ENV["CCI_REPOS"]}/lab-#{ENV["lab"]}/allot/idle/#{tbox_group}/*.yaml"
     extra_job_fields = [
diff --git a/src/scheduler/submit_job.cr b/src/scheduler/submit_job.cr
new file mode 100644
index 0000000..8871464
--- /dev/null
+++ b/src/scheduler/submit_job.cr
@@ -0,0 +1,175 @@
+# SPDX-License-Identifier: MulanPSL-2.0+
+# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+
+class Sched
+  def submit_job(env : HTTP::Server::Context)
+    body = env.request.body.not_nil!.gets_to_end
+
+    job_content = JSON.parse(body)
+    job = Job.new(job_content, job_content["id"]?)
+    job["commit_date"] = get_commit_date(job)
+
+    # it is not a cluster job if cluster field is empty or
+    # field's prefix is 'cs-localhost'
+    cluster_file = job["cluster"]
+    if cluster_file.empty? || cluster_file.starts_with?("cs-localhost")
+      return submit_single_job(job)
+    else
+      cluster_config = get_cluster_config(cluster_file,
+                                          job.lkp_initrd_user,
+                                          job.os_arch)
+      return submit_cluster_job(job, cluster_config)
+    end
+  rescue ex
+    puts ex.inspect_with_backtrace
+    return [{
+      "job_id"    => "0",
+      "message"   => ex.to_s,
+      "job_state" => "submit",
+    }]
+  end
+
+  # return:
+  # success: [{"job_id" => job_id1, "message" => "", "job_state" => "submit"}, ...]
+  # failure: [..., {"job_id" => "0", "message" => err_msg, "job_state" => "submit"}]
+  def submit_cluster_job(job, cluster_config)
+    job_messages = Array(Hash(String, String)).new
+    lab = job.lab
+
+    # collect all job ids
+    job_ids = [] of String
+
+    net_id = "192.168.222"
+    ip0 = cluster_config["ip0"]?
+    if ip0
+      ip0 = ip0.as_i
+    else
+      ip0 = 1
+    end
+
+    # steps for each host
+    cluster_config["nodes"].as_h.each do |host, config|
+      tbox_group = host.to_s
+      job_id = add_task(tbox_group, lab)
+
+      # return when job_id is '0'
+      # 2 Questions:
+      #   - how to deal with the jobs added to DB prior to this loop
+      #   - may consume job before all jobs done
+      return job_messages << {
+        "job_id"    => "0",
+        "message"   => "add task queue sched/#{tbox_group} failed",
+        "job_state" => "submit",
+      } unless job_id
+
+      job_ids << job_id
+
+      # add to job content when multi-test
+      job["testbox"] = tbox_group
+      job.update_tbox_group(tbox_group)
+      job["node_roles"] = config["roles"].as_a.join(" ")
+      direct_macs = config["macs"].as_a
+      direct_ips = [] of String
+      direct_macs.size.times do
+        raise "Host id is greater than 254, host_id: #{ip0}" if ip0 > 254
+        direct_ips << "#{net_id}.#{ip0}"
+        ip0 += 1
+      end
+      job["direct_macs"] = direct_macs.join(" ")
+      job["direct_ips"] = direct_ips.join(" ")
+
+      response = add_job(job, job_id)
+      message = (response["error"]? ? response["error"]["root_cause"] : "")
+      job_messages << {
+        "job_id"      => job_id,
+        "message"     => message.to_s,
+        "job_state"   => "submit",
+        "result_root" => "/srv#{job.result_root}",
+      }
+      return job_messages if response["error"]?
+    end
+
+    cluster_id = job_ids[0]
+
+    # collect all host states
+    cluster_state = Hash(String, Hash(String, String)).new
+    job_ids.each do |job_id|
+      cluster_state[job_id] = {"state" => ""}
+      # will get cluster id according to job id
+      @redis.hash_set("sched/id2cluster", job_id, cluster_id)
+    end
+
+    @redis.hash_set("sched/cluster_state", cluster_id, cluster_state.to_json)
+
+    return job_messages
+  end
+
+  # return:
+  # success: [{"job_id" => job_id, "message" => "", "job_state" => "submit"}]
+  # failure: [{"job_id" => "0", "message" => err_msg, "job_state" => "submit"}]
+  def submit_single_job(job)
+    queue = job.queue
+    return [{
+      "job_id"    => "0",
+      "message"   => "get queue failed",
+      "job_state" => "submit",
+    }] unless queue
+
+    # only a single job will have "idle job" and "execute rate limiter"
+    if job["idle_job"].empty?
+      queue += "#{job.get_uuid_tag}"
+    else
+      queue = "#{queue}/idle"
+    end
+
+    job_id = add_task(queue, job.lab)
+    return [{
+      "job_id"    => "0",
+      "message"   => "add task queue sched/#{queue} failed",
+      "job_state" => "submit",
+    }] unless job_id
+
+    response = add_job(job, job_id)
+    message = (response["error"]? ? response["error"]["root_cause"] : "")
+
+    return [{
+      "job_id"      => job_id,
+      "message"     => message.to_s,
+      "job_state"   => "submit",
+      "result_root" => "/srv#{job.result_root}",
+    }]
+  end
+
+  # return job_id
+  def add_task(queue, lab)
+    task_desc = JSON.parse(%({"domain": "compass-ci", "lab": "#{lab}"}))
+    response = @task_queue.add_task("sched/#{queue}", task_desc)
+    JSON.parse(response[1].to_json)["id"].to_s if response[0] == 200
+  end
+
+  # add job content to es and return a response
+  def add_job(job, job_id)
+    job.update_id(job_id)
+    @es.set_job_content(job)
+  end
+
+  # get cluster config using own lkp_src cluster file,
+  # a hash type will be returned
+  def get_cluster_config(cluster_file, lkp_initrd_user, os_arch)
+    lkp_src = Jobfile::Operate.prepare_lkp_tests(lkp_initrd_user, os_arch)
+    cluster_file_path = Path.new(lkp_src, "cluster", cluster_file)
+    return YAML.parse(File.read(cluster_file_path))
+  end
+
+  def get_commit_date(job)
+    if (job["upstream_repo"] != "") && (job["upstream_commit"] != "")
+      data = JSON.parse(%({"git_repo": "#{job["upstream_repo"]}.git",
+                           "git_command": ["git-log", "--pretty=format:%cd", "--date=unix",
+                           "#{job["upstream_commit"]}", "-1"]}))
+      response = @rgc.git_command(data)
+      return response.body if response.status_code == 200
+    end
+
+    return nil
+  end
+end
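
For reference, submit_cluster_job reads the cluster file through
get_cluster_config and expects an "ip0" value plus a "nodes" hash whose
entries carry "roles" and "macs" arrays. A made-up cluster file of that
shape is sketched below; the file name, host names, and MAC addresses are
placeholders for illustration, not taken from any real lab:

    # cluster/cs-2p-example -- illustrative only
    ip0: 1
    nodes:
      vm-2p8g--n1:
        roles: [ server ]
        macs: [ "0a-b1-c2-d3-e4-01" ]
      vm-2p8g--n2:
        roles: [ client ]
        macs: [ "0a-b1-c2-d3-e4-02" ]

With such a file, each node's MACs are mapped to consecutive direct IPs
under 192.168.222.x, starting from ip0.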