Function:
process timed-out jobs/machines
process crashed jobs/machines
Signed-off-by: Wu Zhende <wuzhende666(a)163.com>
---
src/lib/lifecycle.cr | 310 +++++++++++++++++++++++++++++++++++++++++--
src/lifecycle.cr | 17 ++-
2 files changed, 311 insertions(+), 16 deletions(-)
diff --git a/src/lib/lifecycle.cr b/src/lib/lifecycle.cr
index af5cd07..8c52f11 100644
--- a/src/lib/lifecycle.cr
+++ b/src/lib/lifecycle.cr
@@ -1,19 +1,40 @@
# SPDX-License-Identifier: MulanPSL-2.0+
# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+require "set"
require "kemal"
require "yaml"
-require "./web_env"
+require "./mq"
+require "./scheduler_api"
require "../scheduler/elasticsearch_client"
+require "../lifecycle/constants"
+
+class String
+ def bigger_than?(time)
+ return false if self.empty?
+
+ time = time.to_s
+ return true if time.empty?
+
+ time = Time.parse(time, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+ self_time = Time.parse(self, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+
+ self_time > time
+ end
+end
class Lifecycle
property es
- def initialize(env : HTTP::Server::Context)
+ def initialize
+ @mq = MQClient.instance
@es = Elasticsearch::Client.new
- @env = env
- @log = env.log.as(JSONLogger)
+ @scheduler_api = SchedulerAPI.new
+ @log = JSONLogger.new
+ @jobs = Hash(String, JSON::Any).new
+ @machines = Hash(String, JSON::Any).new
+ @match = Hash(String, Set(String)).new {|h, k| h[k] = Set(String).new}
end
def alive(version)
@@ -22,18 +43,287 @@ class Lifecycle
@log.warn(e)
end
- def get_running_testbox
- size = @env.params.query["size"]? || 20
- from = @env.params.query["from"]? || 0
+ def init_from_es
+ jobs = get_active_jobs
+ jobs.each do |result|
+ job_id = result["_id"].to_s
+ job = result["_source"].as_h
+ job.delete_if{|key, _| !JOB_KEYWORDS.includes?(key)}
+
+ @jobs[job_id] = JSON.parse(job.to_json)
+ @match[job["testbox"].to_s] << job_id
+ end
+
+ machines = get_active_machines
+ machines.each do |result|
+ testbox = result["_id"].to_s
+ machine = result["_source"].as_h
+ machine.delete("history")
+
+ machine = JSON.parse(machine.to_json)
+ @machines[testbox] = machine
+
+ deal_match_job(testbox, machine["job_id"].to_s)
+ end
+ end
+
+ def deal_match_job(testbox, job_id)
+ @match[testbox].each do |id|
+ next if id == job_id
+
+ msg = {
+ "job_id" => id,
+ "job_state" => "occupied",
+ "testbox" => testbox
+ }
+ @mq.pushlish_confirm("job_mq", msg.to_json)
+ @match[testbox].delete(id)
+ end
+ end
+
+ def get_active_jobs
+ query = {
+ "size" => 10000,
+ "query" => {
+ "term" => {
+ "job_state" => "boot"
+ }
+ }
+ }
+ @es.search("jobs", query)
+ end
+
+ def get_active_machines
query = {
- "size" => size,
- "from" => from,
+ "size" => 10000,
"query" => {
"terms" => {
- "state" => ["booting", "running"]
+ "state" => ["booting", "running", "rebooting"]
}
}
}
@es.search("testbox", query)
end
+
+ def deal_job_events_from_mq
+ q = @mq.ch.queue("job_mq")
+ q.subscribe(no_ack: false) do |msg|
+ event = JSON.parse(msg.body_io.to_s)
+ job_state = event["job_state"]?
+
+ case job_state
+ when "boot"
+ deal_boot_event(event)
+ when "close"
+ deal_close_event(event)
+ when "occupied"
+ deal_occupied_event(event)
+ else
+ deal_other_event(event)
+ end
+ @mq.ch.basic_ack(msg.delivery_tag)
+ end
+ end
+
+ def deal_other_event(event)
+ event_job_id = event["job_id"]?.to_s
+ return if event_job_id.empty?
+
+ update_cached_job(event_job_id, event)
+
+ job = @jobs[event_job_id]?
+ return unless job
+
+ testbox = job["testbox"].to_s
+ update_cached_machine(testbox, event)
+ end
+
+ def update_cached_machine(testbox, event)
+ machine = @machines[testbox]?
+ return if machine && !event["time"].to_s.bigger_than?(machine["time"]?)
+
+ update_es_machine_time(testbox, event)
+ end
+
+ def update_es_machine_time(testbox, event)
+ machine = @es.get_tbox(testbox)
+ return unless machine
+ return unless event["time"].to_s.bigger_than?(machine["time"]?)
+
+ machine.as_h.delete("history")
+ machine.as_h["time"] = event["time"]
+ machine.as_h["state"] = JSON::Any.new("booting")
+ @machines[testbox] = machine
+ @es.update_tbox(testbox, machine.as_h)
+ end
+
+ def update_cached_job(job_id, event)
+ job = @jobs[job_id]?
+ if job
+ @jobs[job_id] = JSON.parse(job.as_h.merge!(event.as_h).to_json)
+ else
+ job = @es.get_job(job_id)
+ return unless job
+ return if JOB_CLOSE_STATE.includes?(job["job_state"]?)
+
+ job = job.dump_to_json_any.as_h
+ job.delete_if{|key, _| !JOB_KEYWORDS.includes?(key)}
+ job["job_state"] = event["job_state"]
+ @jobs[job_id] = JSON.parse(job.to_json)
+ end
+ end
+
+ def deal_occupied_event(event)
+ event_job_id = event["job_id"].to_s
+ return unless @jobs.has_key?(event_job_id)
+
+ @jobs.delete(event_job_id)
+ spawn @scheduler_api.close_job(event_job_id, "occupied", "lifecycle")
+ end
+
+ def deal_close_event(event)
+ event_job_id = event["job_id"].to_s
+ job = @jobs[event_job_id]?
+
+ return unless job
+
+ @jobs.delete(event_job_id)
+ update_cached_machine(job["testbox"].to_s, event)
+ end
+
+ def deal_boot_event(event)
+ event_job_id = event["job_id"]?.to_s
+ @jobs[event_job_id] = event unless event_job_id.empty?
+ machine = @machines[event["testbox"]]?
+ deal_boot_machine(machine, event)
+ end
+
+ def deal_boot_machine(machine, event)
+ event_job_id = event["job_id"]?.to_s
+ if machine
+ machine_job_id = machine["job_id"].to_s
+ # The job is not updated
+ # No action is required
+ return if event_job_id == machine_job_id
+
+ time = machine["time"]?
+ # Skip obsolete event
+ return unless event["time"].to_s.bigger_than?(time)
+
+ @machines[event["testbox"].to_s] = event
+ deal_match_job(event["testbox"].to_s, event_job_id)
+
+ # No previous job to process
+ return if machine_job_id.empty?
+ return unless @jobs.has_key?(machine_job_id)
+
+ @jobs.delete(machine_job_id)
+ spawn @scheduler_api.close_job(machine_job_id, "occupied", "lifecycle")
+ else
+ @machines[event["testbox"].to_s] = event
+ end
+ end
+
+ def max_time(times)
+ result = ""
+ times.each do |time|
+ result = time if time.to_s.bigger_than?(result)
+ end
+ return result
+ end
+
+ def deal_timeout_job
+ dead_job_id = nil
+ loop do
+ close_job(dead_job_id, "timeout") if dead_job_id
+ deadline, dead_job_id = get_min_deadline
+
+ # deal timeout job
+ next if dead_job_id && deadline <= Time.local
+
+ sleep_until(deadline)
+ end
+ end
+
+ def deal_timeout_machine
+ dead_machine_name = nil
+ loop do
+ reboot_timeout_machine(dead_machine_name) if dead_machine_name
+ deadline, dead_machine_name = get_min_deadline_machine
+
+ next if dead_machine_name && deadline <= Time.local
+
+ sleep_until(deadline)
+ end
+ end
+
+ def sleep_until(deadline)
+ s = (deadline - Time.local).total_seconds
+ sleep(s)
+ end
+
+ def get_min_deadline
+ deadline = (Time.local + 60.second)
+ dead_job_id = nil
+ @jobs.each do |id, job|
+ next unless job["deadline"]?
+ job_deadline = Time.parse(job["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+ return job_deadline, id if Time.local >= job_deadline
+ next unless deadline > job_deadline
+
+ deadline = job_deadline
+ dead_job_id = id
+ end
+ return deadline, dead_job_id
+ end
+
+ def get_min_deadline_machine
+ deadline = (Time.local + 60.second)
+ dead_machine_name = nil
+ @machines.each do |name, machine|
+ next if machine["deadline"]?.to_s.empty?
+ machine_deadline = Time.parse(machine["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+ return machine_deadline, name if Time.local >= machine_deadline
+ next unless deadline > machine_deadline
+
+ deadline = machine_deadline
+ dead_machine_name = name
+ end
+ return deadline, dead_machine_name
+ end
+
+ def close_job(job_id, reason)
+ @jobs.delete(job_id)
+ spawn @scheduler_api.close_job(job_id, reason, "lifecycle")
+ end
+
+ def reboot_timeout_machine(testbox)
+ @machines.delete(testbox)
+ machine = @es.get_tbox(testbox)
+
+ return unless machine
+ return if MACHINE_CLOSE_STATE.includes?(machine["state"])
+
+ deadline = machine["deadline"]?
+ return unless deadline
+
+ deadline = Time.parse(deadline.to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+ return if Time.local < deadline
+
+ mq_queue = get_machine_reboot_queue(testbox)
+ @mq.pushlish_confirm(mq_queue, machine.to_json)
+
+ machine["state"] = "rebooting_queue"
+ machine["time"] = Time.local.to_s("%Y-%m-%dT%H:%M:%S+0800")
+ @es.update_tbox(testbox, machine.as_h)
+ end
+
+ def get_machine_reboot_queue(testbox)
+ if testbox.includes?(".")
+ testbox =~ /.*\.(.*)-\d+$/
+ else
+ testbox =~ /(.*)--.*/
+ end
+ $1
+ end
end
diff --git a/src/lifecycle.cr b/src/lifecycle.cr
index a864621..f73cef5 100644
--- a/src/lifecycle.cr
+++ b/src/lifecycle.cr
@@ -2,15 +2,20 @@
# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
require "lifecycle/lifecycle"
-require "./lifecycle/constants.cr"
+require "./lifecycle/constants"
require "./lib/json_logger"
+require "./lib/lifecycle"
module Cycle
log = JSONLogger.new
+ lifecycle = Lifecycle.new
- begin
- Kemal.run(LIFECYCLE_PORT)
- rescue e
- log.error(e)
- end
+ # init @jobs and @machines
+ lifecycle.init_from_es
+ lifecycle.deal_job_events_from_mq
+
+ spawn lifecycle.deal_timeout_job
+ spawn lifecycle.deal_timeout_machine
+
+ Kemal.run(LIFECYCLE_PORT)
end
--
2.23.0