When the scheduler service receives a job event,
it sends a message to the MQ queue.
The lifecycle service receives and processes these events.
Signed-off-by: Wu Zhende <wuzhende666(a)163.com>
---
src/lib/sched.cr | 2 ++
src/scheduler/close_job.cr | 13 +++++++++++++
src/scheduler/constants.cr | 2 ++
src/scheduler/find_job_boot.cr | 13 +++++++++++++
src/scheduler/update_job_parameter.cr | 11 +++++++++++
5 files changed, 41 insertions(+)
diff --git a/src/lib/sched.cr b/src/lib/sched.cr
index 5f28de9..bec90cd 100644
--- a/src/lib/sched.cr
+++ b/src/lib/sched.cr
@@ -4,6 +4,7 @@
require "kemal"
require "yaml"
+require "./mq"
require "./job"
require "./web_env"
require "./block_helper"
@@ -34,6 +35,7 @@ class Sched
@es = Elasticsearch::Client.new
Redis::Client.set_pool_size(1000)
@redis = Redis::Client.instance
+ @mq = MQClient.instance
@task_queue = TaskQueueAPI.new
@rgc = RemoteGitClient.new
@env = env
diff --git a/src/scheduler/close_job.cr b/src/scheduler/close_job.cr
index b3e6716..ea1dbf5 100644
--- a/src/scheduler/close_job.cr
+++ b/src/scheduler/close_job.cr
@@ -6,11 +6,14 @@ class Sched
job_id = @env.params.query["job_id"]?
return unless job_id
+ @env.set "job_id", job_id
+
job = @redis.get_job(job_id)
# update job_state
job_state = @env.params.query["job_state"]?
job["job_state"] = job_state if job_state
+ job["job_state"] = "complete" if job["job_state"] == "boot"
response = @es.set_job_content(job)
if response["_id"] == nil
@@ -34,5 +37,15 @@ class Sched
@log.info(%({"job_id": "#{job_id}", "job_state": "#{job_state}"}))
rescue e
@log.warn(e)
+ ensure
+ source = @env.params.query["source"]?
+ if source != "lifecycle"
+ mq_msg = {
+ "job_id" => @env.get?("job_id").to_s,
+ "job_state" => "close",
+ "time" => get_time
+ }
+ @mq.pushlish_confirm(JOB_MQ, mq_msg.to_json)
+ end
end
end
diff --git a/src/scheduler/constants.cr b/src/scheduler/constants.cr
index d7d7a9a..3fdb983 100644
--- a/src/scheduler/constants.cr
+++ b/src/scheduler/constants.cr
@@ -12,6 +12,8 @@ JOB_ES_PORT = 9200
JOB_ES_PORT_DEBUG = 9201
JOB_INDEX_TYPE = "jobs/_doc"
+JOB_MQ = "job_mq"
+
LAB = (ENV.has_key?("lab") ? ENV["lab"] : "nolab")
SCHED_HOST = (ENV.has_key?("SCHED_HOST") ? ENV["SCHED_HOST"] : "172.17.0.1")
diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr
index b7fe0e7..a876abc 100644
--- a/src/scheduler/find_job_boot.cr
+++ b/src/scheduler/find_job_boot.cr
@@ -16,6 +16,7 @@ class Sched
host = value
end
+ @env.set "testbox", host
response = get_job_boot(host, boot_type)
job_id = response[/tmpfs\/(.*)\/job\.cgz/, 1]?
@log.info(%({"job_id": "#{job_id}", "job_state": "boot"})) if job_id
@@ -23,6 +24,15 @@ class Sched
response
rescue e
@log.warn(e)
+ ensure
+ mq_msg = {
+ "job_id" => @env.get?("job_id").to_s,
+ "testbox" => @env.get?("testbox").to_s,
+ "deadline" => @env.get?("deadline").to_s,
+ "time" => get_time,
+ "job_state" => "boot"
+ }
+ @mq.pushlish_confirm(JOB_MQ, mq_msg.to_json)
end
# auto submit a job to collect the host information.
@@ -104,6 +114,9 @@ class Sched
if job
@es.set_job_content(job)
+ @env.set "job_id", job["id"]
+ @env.set "deadline", job["deadline"]
+ @env.set "job_state", job["job_state"]
create_job_cpio(job.dump_to_json_any, Kemal.config.public_folder)
else
# for physical machines
diff --git a/src/scheduler/update_job_parameter.cr b/src/scheduler/update_job_parameter.cr
index 744274a..cc4cf87 100644
--- a/src/scheduler/update_job_parameter.cr
+++ b/src/scheduler/update_job_parameter.cr
@@ -8,6 +8,8 @@ class Sched
return false
end
+ @env.set "job_id", job_id
+
# try to get report value and then update it
job_content = {} of String => String
job_content["id"] = job_id
@@ -30,7 +32,16 @@ class Sched
log = job_content.dup
log["job_id"] = log.delete("id").not_nil!
@log.info(log.to_json)
+
+ @env.set "job_state", job_content["job_state"]?
rescue e
@log.warn(e)
+ ensure
+ mq_msg = {
+ "job_id" => @env.get?("job_id").to_s,
+ "job_state" => (@env.get?("job_state") || "update").to_s,
+ "time" => get_time
+ }
+ @mq.pushlish_confirm(JOB_MQ, mq_msg.to_json)
end
end
--
2.23.0