before, we added the job to the task queue first, then saved the job to es.
after, we save the job to es first, then add the job to the etcd ready queue.
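
A minimal sketch of the new order of operations (not part of the patch),
using hypothetical in-memory stand-ins for the scheduler's @redis, @es and
@etcd clients; the lab/queue/subqueue values below are made up for
illustration:

    class FakeSeq
      @n = 0

      # hypothetical stand-in for @redis.get_new_seqno
      def get_new_seqno
        @n += 1
      end
    end

    class FakeStore
      getter data = Hash(String, String).new

      # hypothetical stand-in for @es.set_job_content / @etcd.put
      def put(key : String, value : String)
        @data[key] = value
        true
      end
    end

    redis = FakeSeq.new
    es    = FakeStore.new
    etcd  = FakeStore.new

    lab      = "z9"       # hypothetical values
    queue    = "vm-2p8g"
    subqueue = "idle"

    # 1) the job id now comes from the redis sequence, not from the task queue
    job_id = "#{lab}.#{redis.get_new_seqno}"

    # 2) the job content is saved to es first ...
    es.put("jobs/#{job_id}", %({"id": "#{job_id}"}))

    # 3) ... and only then is the id published to the etcd ready queue
    key = "sched/#{queue}/ready/#{subqueue}/#{job_id}"
    etcd.put(key, %({"domain": "compass-ci", "id": "#{job_id}"}))

    puts key # => sched/vm-2p8g/ready/idle/z9.1

The intent, presumably, is that a job id never appears in the ready queue
before its content exists in es.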
Signed-off-by: Cao Xueliang <caoxl78320@163.com>
---
src/scheduler/submit_job.cr | 64 +++++++++++++++++++------------------
1 file changed, 33 insertions(+), 31 deletions(-)
diff --git a/src/scheduler/submit_job.cr b/src/scheduler/submit_job.cr
index fe9efa1..c639ad7 100644
--- a/src/scheduler/submit_job.cr
+++ b/src/scheduler/submit_job.cr
@@ -58,18 +58,12 @@ class Sched
next if (config["roles"].as_a.map(&.to_s) & roles).empty?
queue = host.to_s
- job_id = add_task("#{queue}/#{subqueue}", lab)
+ job_id = get_job_id(lab)
# return when job_id is '0'
# 2 Questions:
# - how to deal with the jobs added to DB prior to this loop
# - may consume job before all jobs done
- return job_messages << {
- "job_id" => "0",
- "message" => "add task queue sched/#{queue} failed",
- "job_state" => "submit",
- } unless job_id
-
job_ids << job_id
# add to job content when multi-test
@@ -87,15 +81,15 @@ class Sched
job["direct_macs"] = direct_macs.join(" ")
job["direct_ips"] = direct_ips.join(" ")
- response = add_job(job, job_id)
- message = (response["error"]? ? response["error"]["root_cause"] : "")
+ status, msg = add_job(job, job_id)
+ job_id = "0" unless status
job_messages << {
"job_id" => job_id,
- "message" => message.to_s,
+ "message" => msg,
"job_state" => "submit",
"result_root" => "/srv#{job.result_root}",
}
- return job_messages if response["error"]?
+ return job_messages unless status
end
cluster_id = job_ids[0]
@@ -117,31 +111,31 @@ class Sched
# success: [{"job_id" => job_id, "message" => "", job_state => "submit"}]
# failure: [{"job_id" => "0", "message" => err_msg, job_state => "submit"}]
def submit_single_job(job)
-    queue = "#{job.queue}/#{job.subqueue}"
-    job_id = add_task(queue, job.lab)
+    job_id = get_job_id(job.lab)
+    status, msg = add_job(job, job_id)
+    job_id = "0" unless status
return [{
- "job_id" => "0",
- "message" => "add task queue sched/#{queue} failed",
+ "job_id" => job_id,
+ "message" => msg,
"job_state" => "submit",
- }] unless job_id
-
- response = add_job(job, job_id)
- message = (response["error"]? ? response["error"]["root_cause"] : "")
-
- return [{
- "job_id" => job_id,
- "message" => message.to_s,
- "job_state" => "submit",
"result_root" => "/srv#{job.result_root}",
}]
end
- # return job_id
- def add_task(queue, lab)
- task_desc = JSON.parse(%({"domain": "compass-ci", "lab": "#{lab}"}))
- response = @task_queue.add_task("sched/#{queue}", task_desc)
- JSON.parse(response[1].to_json)["id"].to_s if response[0] == 200
+ def add_task(job, job_id)
+ task_desc = JSON.parse(%({"domain": "compass-ci", "id": "#{job_id}"}))
+ key = "sched/#{job.queue}/ready/#{job.subqueue}/#{job_id}"
+ response = @etcd.put(key, task_desc)
+ if response
+ return true, ""
+ else
+      return false, "failed to add job to the queue: job_id #{job_id}, queue #{key}"
+ end
+ end
+
+ def get_job_id(lab)
+ "#{lab}.#{(a)redis.get_new_seqno()}"
end
def save_secrets(job, job_id)
@@ -150,11 +144,19 @@ class Sched
@redis.hash_set("id2secrets", job_id, job["secrets"]?.to_json)
job.delete("secrets")
end
- # add job content to es and return a response
+
def add_job(job, job_id)
+    # process the job fields first
save_secrets(job, job_id)
job.update_id(job_id)
- @es.set_job_content(job)
+
+ # save the job to es
+ response = @es.set_job_content(job)
+ msg = (response["error"]? ? response["error"]["root_cause"] : "")
+ return false, msg.to_s if response["error"]?
+
+    # then add the job_id to the etcd ready queue
+ add_task(job, job_id)
end
def get_cluster_config(cluster_file, lkp_initrd_user, os_arch)
--
2.23.0