Function: process timeout jobs/machines and crash jobs/machines
Signed-off-by: Wu Zhende wuzhende666@163.com --- src/lib/lifecycle.cr | 310 +++++++++++++++++++++++++++++++++++++++++-- src/lifecycle.cr | 17 ++- 2 files changed, 311 insertions(+), 16 deletions(-)
diff --git a/src/lib/lifecycle.cr b/src/lib/lifecycle.cr index af5cd07..8c52f11 100644 --- a/src/lib/lifecycle.cr +++ b/src/lib/lifecycle.cr @@ -1,19 +1,40 @@ # SPDX-License-Identifier: MulanPSL-2.0+ # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+require "set" require "kemal" require "yaml"
-require "./web_env" +require "./mq" +require "./scheduler_api" require "../scheduler/elasticsearch_client" +require "../lifecycle/constants" + +class String + def bigger_than?(time) + return false if self.empty? + + time = time.to_s + return true if time.empty? + + time = Time.parse(time, "%Y-%m-%dT%H:%M:%S", Time.local.location) + self_time = Time.parse(self, "%Y-%m-%dT%H:%M:%S", Time.local.location) + + self_time > time + end +end
class Lifecycle property es
- def initialize(env : HTTP::Server::Context) + def initialize + @mq = MQClient.instance @es = Elasticsearch::Client.new - @env = env - @log = env.log.as(JSONLogger) + @scheduler_api = SchedulerAPI.new + @log = JSONLogger.new + @jobs = Hash(String, JSON::Any).new + @machines = Hash(String, JSON::Any).new + @match = Hash(String, Set(String)).new {|h, k| h[k] = Set(String).new} end
def alive(version) @@ -22,18 +43,287 @@ class Lifecycle @log.warn(e) end
- def get_running_testbox - size = @env.params.query["size"]? || 20 - from = @env.params.query["from"]? || 0 + def init_from_es + jobs = get_active_jobs + jobs.each do |result| + job_id = result["_id"].to_s + job = result["_source"].as_h + job.delete_if{|key, _| !JOB_KEYWORDS.includes?(key)} + + @jobs[job_id] = JSON.parse(job.to_json) + @match[job["testbox"].to_s] << job_id + end + + machines = get_active_machines + machines.each do |result| + testbox = result["_id"].to_s + machine = result["_source"].as_h + machine.delete("history") + + machine = JSON.parse(machine.to_json) + @machines[testbox] = machine + + deal_match_job(testbox, machine["job_id"].to_s) + end + end + + def deal_match_job(testbox, job_id) + @match[testbox].each do |id| + next if id == job_id + + msg = { + "job_id" => id, + "job_state" => "occupied", + "testbox" => testbox + } + @mq.pushlish_confirm("job_mq", msg.to_json) + @match[testbox].delete(id) + end + end + + def get_active_jobs + query = { + "size" => 10000, + "query" => { + "term" => { + "job_state" => "boot" + } + } + } + @es.search("jobs", query) + end + + def get_active_machines query = { - "size" => size, - "from" => from, + "size" => 10000, "query" => { "terms" => { - "state" => ["booting", "running"] + "state" => ["booting", "running", "rebooting"] } } } @es.search("testbox", query) end + + def deal_job_events_from_mq + q = @mq.ch.queue("job_mq") + q.subscribe(no_ack: false) do |msg| + event = JSON.parse(msg.body_io.to_s) + job_state = event["job_state"]? + + case job_state + when "boot" + deal_boot_event(event) + when "close" + deal_close_event(event) + when "occupied" + deal_occupied_event(event) + else + deal_other_event(event) + end + @mq.ch.basic_ack(msg.delivery_tag) + end + end + + def deal_other_event(event) + event_job_id = event["job_id"].to_s + return if event_job_id.empty? + + update_cached_job(event_job_id, event) + + job = @jobs[event_job_id]? 
+ return unless job + + testbox = job["testbox"].to_s + update_cached_machine(testbox, event) + end + + def update_cached_machine(testbox, event) + machine = @machines[testbox]? + return if machine && !event["time"].to_s.bigger_than?(machine["time"]?) + + update_es_machine_time(testbox, event) + end + + def update_es_machine_time(testbox, event) + machine = @es.get_tbox(testbox) + return unless machine + return unless event["time"].to_s.bigger_than?(machine["time"]?) + + machine.as_h.delete("history") + machine.as_h["time"] = event["time"] + machine.as_h["state"] = JSON::Any.new("booting") + @machines[testbox] = machine + @es.update_tbox(testbox, machine.as_h) + end + + def update_cached_job(job_id, event) + job = @jobs[job_id]? + if job + @jobs[job_id] = JSON.parse(job.as_h.merge!(event.as_h).to_json) + else + job = @es.get_job(job_id) + return unless job + return if JOB_CLOSE_STATE.includes?(job["job_state"]?) + + job = job.dump_to_json_any.as_h + job.delete_if{|key, _| !JOB_KEYWORDS.includes?(key)} + job["job_state"] = event["job_state"] + @jobs[job_id] = JSON.parse(job.to_json) + end + end + + def deal_occupied_event(event) + event_job_id = event["job_id"].to_s + return unless @jobs.has_key?(event_job_id) + + @jobs.delete(event_job_id) + spawn @scheduler_api.close_job(event_job_id, "occupied", "lifecycle") + end + + def deal_close_event(event) + event_job_id = event["job_id"].to_s + job = @jobs[event_job_id] + + return unless job + + @jobs.delete(event_job_id) + update_cached_machine(job["testbox"].to_s, event) + end + + def deal_boot_event(event) + event_job_id = event["job_id"]?.to_s + @jobs[event_job_id] = event unless event_job_id.empty? + machine = @machines[event["testbox"]]? 
+ deal_boot_machine(machine, event) + end + + def deal_boot_machine(machine, event) + event_job_id = event["job_id"]?.to_s + if machine + machine_job_id = machine["job_id"].to_s + # The job is not updated + # No action is required + return if event_job_id == machine_job_id + + time = machine["time"]? + # Skip obsolete event + return unless event["time"].to_s.bigger_than?(time) + + @machines[event["testbox"].to_s] = event + deal_match_job(event["testbox"].to_s, event_job_id) + + # No previous job to process + return if machine_job_id.empty? + return unless @jobs.has_key?(machine_job_id) + + @jobs.delete(machine_job_id) + spawn @scheduler_api.close_job(machine_job_id, "occupied", "lifecycle") + else + @machines[event["testbox"].to_s] = event + end + end + + def max_time(times) + result = "" + times.each do |time| + result = time if time.to_s.bigger_than?(result) + end + return result + end + + def deal_timeout_job + dead_job_id = nil + loop do + close_job(dead_job_id, "timeout") if dead_job_id + deadline, dead_job_id = get_min_deadline + + # deal timeout job + next if dead_job_id && deadline <= Time.local + + sleep_until(deadline) + end + end + + def deal_timeout_machine + dead_machine_name = nil + loop do + reboot_timeout_machine(dead_machine_name) if dead_machine_name + deadline, dead_machine_name = get_min_deadline_machine + + next if dead_machine_name && deadline <= Time.local + + sleep_until(deadline) + end + end + + def sleep_until(deadline) + s = (deadline - Time.local).total_seconds + sleep(s) + end + + def get_min_deadline + deadline = (Time.local + 60.second) + dead_job_id = nil + @jobs.each do |id, job| + next unless job["deadline"]? 
+ job_deadline = Time.parse(job["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location) + return job_deadline, id if Time.local >= job_deadline + next unless deadline > job_deadline + + deadline = job_deadline + dead_job_id = id + end + return deadline, dead_job_id + end + + def get_min_deadline_machine + deadline = (Time.local + 60.second) + dead_machine_name = nil + @machines.each do |name, machine| + next if machine["deadline"]?.to_s.empty? + machine_deadline = Time.parse(machine["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location) + return machine_deadline, name if Time.local >= machine_deadline + next unless deadline > machine_deadline + + deadline = machine_deadline + dead_machine_name = name + end + return deadline, dead_machine_name + end + + def close_job(job_id, reason) + @jobs.delete(job_id) + spawn @scheduler_api.close_job(job_id, reason, "lifecycle") + end + + def reboot_timeout_machine(testbox) + @machines.delete(testbox) + machine = @es.get_tbox(testbox) + + return unless machine + return if MACHINE_CLOSE_STATE.includes?(machine["state"]) + + deadline = machine["deadline"]? + return unless deadline + + deadline = Time.parse(deadline.to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location) + return if Time.local < deadline + + mq_queue = get_machine_reboot_queue(testbox) + @mq.pushlish_confirm(mq_queue, machine.to_json) + + machine["state"] = "rebooting_queue" + machine["time"] = Time.local.to_s("%Y-%m-%dT%H:%M:%S+0800") + @es.update_tbox(testbox, machine.as_h) + end + + def get_machine_reboot_queue(testbox) + if testbox.includes?(".") + testbox =~ /.*.(.*)-\d+$/ + else + testbox =~ /(.*)--.*/ + end + $1 + end end diff --git a/src/lifecycle.cr b/src/lifecycle.cr index a864621..f73cef5 100644 --- a/src/lifecycle.cr +++ b/src/lifecycle.cr @@ -2,15 +2,20 @@ # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
require "lifecycle/lifecycle" -require "./lifecycle/constants.cr" +require "./lifecycle/constants" require "./lib/json_logger" +require "./lib/lifecycle"
module Cycle log = JSONLogger.new + lifecycle = Lifecycle.new
- begin - Kemal.run(LIFECYCLE_PORT) - rescue e - log.error(e) - end + # init @jobs and @machines + lifecycle.init_from_es + lifecycle.deal_job_events_from_mq + + spawn lifecycle.deal_timeout_job + spawn lifecycle.deal_timeout_machine + + Kemal.run(LIFECYCLE_PORT) end