When the scheduler service receives a job event, it sends a message to the MQ queue. The lifecycle service receives and processes these events.
Signed-off-by: Wu Zhende wuzhende666@163.com --- src/lib/sched.cr | 2 ++ src/scheduler/close_job.cr | 13 +++++++++++++ src/scheduler/constants.cr | 2 ++ src/scheduler/find_job_boot.cr | 13 +++++++++++++ src/scheduler/update_job_parameter.cr | 11 +++++++++++ 5 files changed, 41 insertions(+)
diff --git a/src/lib/sched.cr b/src/lib/sched.cr index 5f28de9..bec90cd 100644 --- a/src/lib/sched.cr +++ b/src/lib/sched.cr @@ -4,6 +4,7 @@ require "kemal" require "yaml"
+require "./mq" require "./job" require "./web_env" require "./block_helper" @@ -34,6 +35,7 @@ class Sched @es = Elasticsearch::Client.new Redis::Client.set_pool_size(1000) @redis = Redis::Client.instance + @mq = MQClient.instance @task_queue = TaskQueueAPI.new @rgc = RemoteGitClient.new @env = env diff --git a/src/scheduler/close_job.cr b/src/scheduler/close_job.cr index b3e6716..ea1dbf5 100644 --- a/src/scheduler/close_job.cr +++ b/src/scheduler/close_job.cr @@ -6,11 +6,14 @@ class Sched job_id = @env.params.query["job_id"]? return unless job_id
+ @env.set "job_id", job_id + job = @redis.get_job(job_id)
# update job_state job_state = @env.params.query["job_state"]? job["job_state"] = job_state if job_state + job["job_state"] = "complete" if job["job_state"] == "boot"
response = @es.set_job_content(job) if response["_id"] == nil @@ -34,5 +37,15 @@ class Sched @log.info(%({"job_id": "#{job_id}", "job_state": "#{job_state}"})) rescue e @log.warn(e) + ensure + source = @env.params.query["source"]? + if source != "lifecycle" + mq_msg = { + "job_id" => @env.get?("job_id").to_s, + "job_state" => "close", + "time" => get_time + } + @mq.pushlish_confirm(JOB_MQ, mq_msg.to_json) + end end end diff --git a/src/scheduler/constants.cr b/src/scheduler/constants.cr index d7d7a9a..3fdb983 100644 --- a/src/scheduler/constants.cr +++ b/src/scheduler/constants.cr @@ -12,6 +12,8 @@ JOB_ES_PORT = 9200 JOB_ES_PORT_DEBUG = 9201 JOB_INDEX_TYPE = "jobs/_doc"
+JOB_MQ = "job_mq" + LAB = (ENV.has_key?("lab") ? ENV["lab"] : "nolab")
SCHED_HOST = (ENV.has_key?("SCHED_HOST") ? ENV["SCHED_HOST"] : "172.17.0.1") diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr index b7fe0e7..a876abc 100644 --- a/src/scheduler/find_job_boot.cr +++ b/src/scheduler/find_job_boot.cr @@ -16,6 +16,7 @@ class Sched host = value end
+ @env.set "testbox", host response = get_job_boot(host, boot_type) job_id = response[/tmpfs/(.*)/job.cgz/, 1]? @log.info(%({"job_id": "#{job_id}", "job_state": "boot"})) if job_id @@ -23,6 +24,15 @@ class Sched response rescue e @log.warn(e) + ensure + mq_msg = { + "job_id" => @env.get?("job_id").to_s, + "testbox" => @env.get?("testbox").to_s, + "deadline" => @env.get?("deadline").to_s, + "time" => get_time, + "job_state" => "boot" + } + @mq.pushlish_confirm(JOB_MQ, mq_msg.to_json) end
# auto submit a job to collect the host information. @@ -104,6 +114,9 @@ class Sched
if job @es.set_job_content(job) + @env.set "job_id", job["id"] + @env.set "deadline", job["deadline"] + @env.set "job_state", job["job_state"] create_job_cpio(job.dump_to_json_any, Kemal.config.public_folder) else # for physical machines diff --git a/src/scheduler/update_job_parameter.cr b/src/scheduler/update_job_parameter.cr index 744274a..cc4cf87 100644 --- a/src/scheduler/update_job_parameter.cr +++ b/src/scheduler/update_job_parameter.cr @@ -8,6 +8,8 @@ class Sched return false end
+ @env.set "job_id", job_id + # try to get report value and then update it job_content = {} of String => String job_content["id"] = job_id @@ -30,7 +32,16 @@ class Sched log = job_content.dup log["job_id"] = log.delete("id").not_nil! @log.info(log.to_json) + + @env.set "job_state", job_content["job_state"]? rescue e @log.warn(e) + ensure + mq_msg = { + "job_id" => @env.get?("job_id").to_s, + "job_state" => (@env.get?("job_state") || "update").to_s, + "time" => get_time + } + @mq.pushlish_confirm(JOB_MQ, mq_msg.to_json) end end