before, we added the job to the taskqueue first and then saved it to es; now we save the job to es first, then add it to etcd.
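in short, the submit path now does (a condensed sketch of this patch, not standalone code; @redis, @es and @etcd are the scheduler's existing clients, error details trimmed):

    job_id = get_job_id(job.lab)        # "#{lab}.#{seqno}", seqno comes from redis
    status, msg = add_job(job, job_id)  # 1. save job content to es  2. put job_id to the etcd ready queue
    job_id = "0" unless status          # report job_id "0" when either step fails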
Signed-off-by: Cao Xueliang <caoxl78320@163.com>
---
 src/scheduler/submit_job.cr | 64 +++++++++++++++++++------------------
 1 file changed, 33 insertions(+), 31 deletions(-)
diff --git a/src/scheduler/submit_job.cr b/src/scheduler/submit_job.cr
index fe9efa1..c639ad7 100644
--- a/src/scheduler/submit_job.cr
+++ b/src/scheduler/submit_job.cr
@@ -58,18 +58,12 @@ class Sched
       next if (config["roles"].as_a.map(&.to_s) & roles).empty?
 
       queue = host.to_s
-      job_id = add_task("#{queue}/#{subqueue}", lab)
+      job_id = get_job_id(lab)
 
       # return when job_id is '0'
       # 2 Questions:
       # - how to deal with the jobs added to DB prior to this loop
       # - may consume job before all jobs done
-      return job_messages << {
-        "job_id" => "0",
-        "message" => "add task queue sched/#{queue} failed",
-        "job_state" => "submit",
-      } unless job_id
-
       job_ids << job_id
 
       # add to job content when multi-test
@@ -87,15 +81,15 @@ class Sched
       job["direct_macs"] = direct_macs.join(" ")
       job["direct_ips"] = direct_ips.join(" ")
 
-      response = add_job(job, job_id)
-      message = (response["error"]? ? response["error"]["root_cause"] : "")
+      status, msg = add_job(job, job_id)
+      job_id = "0" unless status
       job_messages << {
         "job_id" => job_id,
-        "message" => message.to_s,
+        "message" => msg,
         "job_state" => "submit",
         "result_root" => "/srv#{job.result_root}",
       }
-      return job_messages if response["error"]?
+      return job_messages unless status
     end
 
     cluster_id = job_ids[0]
@@ -117,31 +111,31 @@
   # success: [{"job_id" => job_id, "message" => "", job_state => "submit"}]
   # failure: [{"job_id" => "0", "message" => err_msg, job_state => "submit"}]
   def submit_single_job(job)
-    queue = "#{job.queue}/#{job.subqueue}"
+    job_id = get_job_id(job.lab)
 
-    job_id = add_task(queue, job.lab)
+    status, msg = add_job(job, job_id)
+    job_id = "0" unless status
     return [{
-      "job_id" => "0",
-      "message" => "add task queue sched/#{queue} failed",
+      "job_id" => job_id,
+      "message" => msg,
       "job_state" => "submit",
-    }] unless job_id
-
-    response = add_job(job, job_id)
-    message = (response["error"]? ? response["error"]["root_cause"] : "")
-
-    return [{
-      "job_id" => job_id,
-      "message" => message.to_s,
-      "job_state" => "submit",
       "result_root" => "/srv#{job.result_root}",
     }]
   end
 
-  # return job_id
-  def add_task(queue, lab)
-    task_desc = JSON.parse(%({"domain": "compass-ci", "lab": "#{lab}"}))
-    response = @task_queue.add_task("sched/#{queue}", task_desc)
-    JSON.parse(response[1].to_json)["id"].to_s if response[0] == 200
+  def add_task(job, job_id)
+    task_desc = JSON.parse(%({"domain": "compass-ci", "id": "#{job_id}"}))
+    key = "sched/#{job.queue}/ready/#{job.subqueue}/#{job_id}"
+    response = @etcd.put(key, task_desc)
+    if response
+      return true, ""
+    else
+      return false, "add the job to queue failed: job_id #{job_id}, queue #{key}"
+    end
+  end
+
+  def get_job_id(lab)
+    "#{lab}.#{@redis.get_new_seqno()}"
   end
 
   def save_secrets(job, job_id)
@@ -150,11 +144,19 @@ class Sched
     @redis.hash_set("id2secrets", job_id, job["secrets"]?.to_json)
     job.delete("secrets")
   end
-  # add job content to es and return a response
-  def add_job(job, job_id)
+  def add_job(job, job_id)
+    # deal the job fields at first
     save_secrets(job, job_id)
     job.update_id(job_id)
-    @es.set_job_content(job)
+
+    # save the job to es
+    response = @es.set_job_content(job)
+    msg = (response["error"]? ? response["error"]["root_cause"] : "")
+    return false, msg.to_s if response["error"]?
+
+    # set job_id to ready queue
+    add_task(job, job_id)
   end
 
   def get_cluster_config(cluster_file, lkp_initrd_user, os_arch)