[PATCH compass-ci 3/7] scheduler: submit job to etcd db

Before this change, the scheduler added the job to the task queue first and then saved it to ES; now it saves the job to ES first and then adds it to the etcd ready queue. Signed-off-by: Cao Xueliang <caoxl78320@163.com> --- src/scheduler/submit_job.cr | 64 +++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/src/scheduler/submit_job.cr b/src/scheduler/submit_job.cr index fe9efa1..c639ad7 100644 --- a/src/scheduler/submit_job.cr +++ b/src/scheduler/submit_job.cr @@ -58,18 +58,12 @@ class Sched next if (config["roles"].as_a.map(&.to_s) & roles).empty? queue = host.to_s - job_id = add_task("#{queue}/#{subqueue}", lab) + job_id = get_job_id(lab) # return when job_id is '0' # 2 Questions: # - how to deal with the jobs added to DB prior to this loop # - may consume job before all jobs done - return job_messages << { - "job_id" => "0", - "message" => "add task queue sched/#{queue} failed", - "job_state" => "submit", - } unless job_id - job_ids << job_id # add to job content when multi-test @@ -87,15 +81,15 @@ class Sched job["direct_macs"] = direct_macs.join(" ") job["direct_ips"] = direct_ips.join(" ") - response = add_job(job, job_id) - message = (response["error"]? ? response["error"]["root_cause"] : "") + status, msg = add_job(job, job_id) + job_id = "0" unless status job_messages << { "job_id" => job_id, - "message" => message.to_s, + "message" => msg, "job_state" => "submit", "result_root" => "/srv#{job.result_root}", } - return job_messages if response["error"]? 
+ return job_messages unless status end cluster_id = job_ids[0] @@ -117,31 +111,31 @@ class Sched # success: [{"job_id" => job_id, "message" => "", job_state => "submit"}] # failure: [{"job_id" => "0", "message" => err_msg, job_state => "submit"}] def submit_single_job(job) - queue = "#{job.queue}/#{job.subqueue}" + job_id = get_job_id(job.lab) - job_id = add_task(queue, job.lab) + status, msg = add_job(job, job_id) + job_id = "0" unless status return [{ - "job_id" => "0", - "message" => "add task queue sched/#{queue} failed", + "job_id" => job_id, + "message" => msg, "job_state" => "submit", - }] unless job_id - - response = add_job(job, job_id) - message = (response["error"]? ? response["error"]["root_cause"] : "") - - return [{ - "job_id" => job_id, - "message" => message.to_s, - "job_state" => "submit", "result_root" => "/srv#{job.result_root}", }] end - # return job_id - def add_task(queue, lab) - task_desc = JSON.parse(%({"domain": "compass-ci", "lab": "#{lab}"})) - response = @task_queue.add_task("sched/#{queue}", task_desc) - JSON.parse(response[1].to_json)["id"].to_s if response[0] == 200 + def add_task(job, job_id) + task_desc = JSON.parse(%({"domain": "compass-ci", "id": "#{job_id}"})) + key = "sched/#{job.queue}/ready/#{job.subqueue}/#{job_id}" + response = @etcd.put(key, task_desc) + if response + return true, "" + else + return false, "add the job to queue failed: job_id #{job_id}, queue #{key}" + end + end + + def get_job_id(lab) + "#{lab}.#{@redis.get_new_seqno()}" end def save_secrets(job, job_id) @@ -150,11 +144,19 @@ class Sched @redis.hash_set("id2secrets", job_id, job["secrets"]?.to_json) job.delete("secrets") end - # add job content to es and return a response + def add_job(job, job_id) + # deal the job fields at first save_secrets(job, job_id) job.update_id(job_id) - @es.set_job_content(job) + + # save the job to es + response = @es.set_job_content(job) + msg = (response["error"]? ? 
response["error"]["root_cause"] : "") + return false, msg.to_s if response["error"]? + + # set job_id to ready queue + add_task(job, job_id) end def get_cluster_config(cluster_file, lkp_initrd_user, os_arch) -- 2.23.0
participants (1)
-
Cao Xueliang