Function: process timeout and crash jobs/machines
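
The lifecycle service now tracks jobs/machines itself instead of only
serving queries: it loads active jobs and testboxes from ES on startup,
keeps that cache current by consuming "job_mq" events, closes jobs whose
deadline has passed as "timeout", closes jobs orphaned when their
testbox boots another job as "abnormal", and requeues testboxes whose
deadline has passed for reboot.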
Signed-off-by: Wu Zhende <wuzhende666@163.com>
---
 src/lib/lifecycle.cr                  | 317 +++++++++++++++++++++++++-
 src/lifecycle.cr                      |  17 +-
 src/lifecycle/constants.cr            |   3 +
 src/lifecycle/lifecycle.cr            |  19 +-
 src/scheduler/elasticsearch_client.cr |  18 +-
 5 files changed, 339 insertions(+), 35 deletions(-)
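Note for reviewers (illustration only, this text is not applied by the
patch): mq_event_loop consumes JSON events from the "job_mq" queue and
dispatches on job_state. The field names below are the ones the handlers
in this diff read; the id and testbox values are made up:

    require "json"

    # Hypothetical "job_mq" event; "time" uses the same format the
    # lifecycle code writes ("%Y-%m-%dT%H:%M:%S+0800").
    event = JSON.parse({
      "job_id"    => "1234",
      "job_state" => "boot",
      "testbox"   => "example-host--a1",
      "time"      => Time.local.to_s("%Y-%m-%dT%H:%M:%S+0800"),
    }.to_json)

    # Same dispatch as mq_event_loop:
    job_state = event["job_state"]?
    case job_state
    when "boot"     then puts "on_job_boot"
    when "close"    then puts "on_job_close"
    when "abnormal" then puts "on_abnormal_job"
    else                 puts "on_other_job"
    end
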
diff --git a/src/lib/lifecycle.cr b/src/lib/lifecycle.cr
index af5cd07..3a72906 100644
--- a/src/lib/lifecycle.cr
+++ b/src/lib/lifecycle.cr
@@ -1,19 +1,43 @@
 # SPDX-License-Identifier: MulanPSL-2.0+
 # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+require "set" require "kemal" require "yaml"
-require "./web_env" +require "./mq" +require "./scheduler_api" require "../scheduler/elasticsearch_client" +require "../lifecycle/constants" + +class String + def bigger_than?(time) + return false if self.empty? + + time = time.to_s + return true if time.empty? + + # Historical data contains time in the format of "%Y-%m-%d %H:%M:%S" + # Compatibility processing is required + time = time.gsub(/ /, "T") + time = Time.parse(time, "%Y-%m-%dT%H:%M:%S", Time.local.location) + self_time = Time.parse(self, "%Y-%m-%dT%H:%M:%S", Time.local.location) + + self_time > time + end +end

 class Lifecycle
   property es

-  def initialize(env : HTTP::Server::Context)
+  def initialize
+    @mq = MQClient.instance
     @es = Elasticsearch::Client.new
-    @env = env
-    @log = env.log.as(JSONLogger)
+    @scheduler_api = SchedulerAPI.new
+    @log = JSONLogger.new
+    @jobs = Hash(String, JSON::Any).new
+    @machines = Hash(String, JSON::Any).new
+    # testbox name => ids of the jobs booted on it
+    @match = Hash(String, Set(String)).new { |h, k| h[k] = Set(String).new }
   end
   def alive(version)
@@ -22,18 +46,291 @@ class Lifecycle
     @log.warn(e)
   end

-  def get_running_testbox
-    size = @env.params.query["size"]? || 20
-    from = @env.params.query["from"]? || 0
+  def init_from_es
+    jobs = get_active_jobs
+    jobs.each do |result|
+      job_id = result["_id"].to_s
+      job = result["_source"].as_h
+      job.delete_if { |key, _| !JOB_KEYWORDS.includes?(key) }
+
+      @jobs[job_id] = JSON.parse(job.to_json)
+      @match[job["testbox"].to_s] << job_id
+    end
+
+    machines = get_active_machines
+    machines.each do |result|
+      testbox = result["_id"].to_s
+      machine = result["_source"].as_h
+      machine.delete("history")
+
+      machine = JSON.parse(machine.to_json)
+      @machines[testbox] = machine
+
+      deal_match_job(testbox, machine["job_id"].to_s)
+    end
+  end
+
+  # Any cached job on this testbox other than the one it currently
+  # runs must have died there; report it to job_mq as abnormal.
+  def deal_match_job(testbox, job_id)
+    @match[testbox].each do |id|
+      next if id == job_id
+
+      msg = {
+        "job_id"    => id,
+        "job_state" => "abnormal",
+        "testbox"   => testbox
+      }
+      @mq.pushlish_confirm("job_mq", msg.to_json)
+      @match[testbox].delete(id)
+    end
+  end
+
+  def get_active_jobs
     query = {
-      "size" => size,
-      "from" => from,
+      "size" => 10000,
+      "query" => {
+        "term" => {
+          "job_state" => "boot"
+        }
+      }
+    }
+    @es.search("jobs", query)
+  end
+
+  def get_active_machines
+    query = {
+      "size" => 10000,
       "query" => {
         "terms" => {
-          "state" => ["booting", "running"]
+          "state" => ["booting", "running", "rebooting"]
         }
       }
     }
     @es.search("testbox", query)
   end
+
+  def mq_event_loop
+    puts "deal job events"
+    q = @mq.ch.queue("job_mq")
+    q.subscribe(no_ack: false) do |msg|
+      event = JSON.parse(msg.body_io.to_s)
+      job_state = event["job_state"]?
+
+      case job_state
+      when "boot"
+        on_job_boot(event)
+      when "close"
+        on_job_close(event)
+      when "abnormal"
+        on_abnormal_job(event)
+      else
+        on_other_job(event)
+      end
+      @mq.ch.basic_ack(msg.delivery_tag)
+    end
+  end
+
+  def on_other_job(event)
+    event_job_id = event["job_id"].to_s
+    return if event_job_id.empty?
+
+    update_cached_job(event_job_id, event)
+
+    job = @jobs[event_job_id]?
+    return unless job
+
+    testbox = job["testbox"].to_s
+    update_cached_machine(testbox, event)
+  end
+
+  def update_cached_machine(testbox, event)
+    machine = @machines[testbox]?
+    return if machine && !event["time"].to_s.bigger_than?(machine["time"]?)
+
+    update_es_machine_time(testbox, event)
+  end
+
+  def update_es_machine_time(testbox, event)
+    machine = @es.get_tbox(testbox)
+    return unless machine
+    return unless event["time"].to_s.bigger_than?(machine["time"]?)
+
+    machine.as_h.delete("history")
+    machine.as_h["time"] = event["time"]
+    machine.as_h["state"] = JSON::Any.new("booting")
+    @machines[testbox] = machine
+    @es.update_tbox(testbox, machine.as_h)
+  end
+
+  def update_cached_job(job_id, event)
+    job = @jobs[job_id]?
+    if job
+      @jobs[job_id] = JSON.parse(job.as_h.merge!(event.as_h).to_json)
+    else
+      job = @es.get_job(job_id)
+      return unless job
+      return if JOB_CLOSE_STATE.includes?(job["job_state"]?)
+
+      job = job.dump_to_json_any.as_h
+      job.delete_if { |key, _| !JOB_KEYWORDS.includes?(key) }
+      job["job_state"] = event["job_state"]
+      @jobs[job_id] = JSON.parse(job.to_json)
+    end
+  end
+
+  def on_abnormal_job(event)
+    event_job_id = event["job_id"].to_s
+    return unless @jobs.has_key?(event_job_id)
+
+    @jobs.delete(event_job_id)
+    spawn @scheduler_api.close_job(event_job_id, "abnormal", "lifecycle")
+  end
+
+  def on_job_close(event)
+    event_job_id = event["job_id"].to_s
+    job = @jobs[event_job_id]?
+    return unless job
+
+    @jobs.delete(event_job_id)
+    update_cached_machine(job["testbox"].to_s, event)
+  end
+
+  def on_job_boot(event)
+    event_job_id = event["job_id"]?.to_s
+    @jobs[event_job_id] = event unless event_job_id.empty?
+    machine_info = @machines[event["testbox"].to_s]?
+    deal_boot_machine(machine_info, event)
+  end
+
+  def deal_boot_machine(machine_info, event)
+    event_job_id = event["job_id"]?.to_s
+
+    unless machine_info
+      @machines[event["testbox"].to_s] = event
+      return
+    end
+
+    machine_job_id = machine_info["job_id"].to_s
+    # The job is not updated
+    # No action is required
+    return if event_job_id == machine_job_id
+
+    time = machine_info["time"]?
+    # Skip obsolete events
+    return unless event["time"].to_s.bigger_than?(time)
+
+    @machines[event["testbox"].to_s] = event
+
+    deal_match_job(event["testbox"].to_s, event_job_id)
+
+    # No previous job to process
+    return if machine_job_id.empty?
+    return unless @jobs.has_key?(machine_job_id)
+
+    @jobs.delete(machine_job_id)
+    spawn @scheduler_api.close_job(machine_job_id, "abnormal", "lifecycle")
+  end
+
+  def max_time(times)
+    result = ""
+    times.each do |time|
+      result = time if time.to_s.bigger_than?(result)
+    end
+    return result
+  end
+
+  def timeout_job_loop
+    dead_job_id = nil
+    loop do
+      close_job(dead_job_id, "timeout") if dead_job_id
+      deadline, dead_job_id = get_min_deadline
+      # deal with an already-expired job right away
+      next if dead_job_id && deadline <= Time.local
+
+      sleep_until(deadline)
+    end
+  end
+
+  def timeout_machine_loop
+    dead_machine_name = nil
+    loop do
+      reboot_timeout_machine(dead_machine_name) if dead_machine_name
+      deadline, dead_machine_name = get_min_deadline_machine
+      next if dead_machine_name && deadline <= Time.local
+
+      sleep_until(deadline)
+    end
+  end
+
+  def sleep_until(deadline)
+    s = (deadline - Time.local).total_seconds
+    sleep(s)
+  end
+
+  def get_min_deadline
+    deadline = (Time.local + 60.second)
+    dead_job_id = nil
+    @jobs.each do |id, job|
+      next unless job["deadline"]?
+      job_deadline = Time.parse(job["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+      return job_deadline, id if Time.local >= job_deadline
+      next unless deadline > job_deadline
+
+      deadline = job_deadline
+      dead_job_id = id
+    end
+    return deadline, dead_job_id
+  end
+
+  def get_min_deadline_machine
+    deadline = (Time.local + 60.second)
+    dead_machine_name = nil
+    @machines.each do |name, machine|
+      next if machine["deadline"]?.to_s.empty?
+
+      machine_deadline = Time.parse(machine["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+      return machine_deadline, name if Time.local >= machine_deadline
+      next unless deadline > machine_deadline
+
+      deadline = machine_deadline
+      dead_machine_name = name
+    end
+    return deadline, dead_machine_name
+  end
+
+  def close_job(job_id, reason)
+    @jobs.delete(job_id)
+    spawn @scheduler_api.close_job(job_id, reason, "lifecycle")
+  end
+
+  def reboot_timeout_machine(testbox)
+    @machines.delete(testbox)
+    machine = @es.get_tbox(testbox)
+    return unless machine
+    return if MACHINE_CLOSE_STATE.includes?(machine["state"])
+
+    deadline = machine["deadline"]?
+    return unless deadline
+
+    deadline = Time.parse(deadline.to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+    return if Time.local < deadline
+
+    mq_queue = get_machine_reboot_queue(testbox)
+    @mq.pushlish_confirm(mq_queue, machine.to_json)
+
+    machine["state"] = "rebooting_queue"
+    machine["time"] = Time.local.to_s("%Y-%m-%dT%H:%M:%S+0800")
+    @es.update_tbox(testbox, machine.as_h)
+  end
+
+  def get_machine_reboot_queue(testbox)
+    if testbox.includes?(".")
+      testbox =~ /.*\.(.*)-\d+$/
+    else
+      testbox =~ /(.*)--.*/
+    end
+    $1
+  rescue
+    testbox
+  end
 end
diff --git a/src/lifecycle.cr b/src/lifecycle.cr
index a864621..f6b80a8 100644
--- a/src/lifecycle.cr
+++ b/src/lifecycle.cr
@@ -2,15 +2,20 @@
 # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
require "lifecycle/lifecycle" -require "./lifecycle/constants.cr" +require "./lifecycle/constants" require "./lib/json_logger" +require "./lib/lifecycle"

 module Cycle
   log = JSONLogger.new
+  lifecycle = Lifecycle.new

-  begin
-    Kemal.run(LIFECYCLE_PORT)
-  rescue e
-    log.error(e)
-  end
+  # init @jobs and @machines
+  lifecycle.init_from_es
+  lifecycle.mq_event_loop
+
+  spawn lifecycle.timeout_job_loop
+  spawn lifecycle.timeout_machine_loop
+
+  Kemal.run(LIFECYCLE_PORT)
 end
diff --git a/src/lifecycle/constants.cr b/src/lifecycle/constants.cr
index 137b7f9..694a407 100644
--- a/src/lifecycle/constants.cr
+++ b/src/lifecycle/constants.cr
@@ -2,3 +2,6 @@
 # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.

 LIFECYCLE_PORT = (ENV.has_key?("LIFECYCLE_PORT") ? ENV["LIFECYCLE_PORT"] : 11312).to_i32
+JOB_CLOSE_STATE = ["abnormal", "close", "failed", "finished", "timeout", "crash"]
+JOB_KEYWORDS = ["testbox", "job_state", "deadline"]
+MACHINE_CLOSE_STATE = ["real_rebooting", "rebooting_queue"]
diff --git a/src/lifecycle/lifecycle.cr b/src/lifecycle/lifecycle.cr
index da42fda..c318721 100644
--- a/src/lifecycle/lifecycle.cr
+++ b/src/lifecycle/lifecycle.cr
@@ -9,24 +9,7 @@
 require "../lib/json_logger"

 module Cycle
   VERSION = "0.1.0"
-
-  add_context_storage_type(Time::Span)
-
-  before_all do |env|
-    env.set "start_time", Time.monotonic
-    env.response.headers["Connection"] = "close"
-    env.create_log
-    env.create_lifecycle
-  end
-
-  # echo alive
   get "/" do |env|
-    env.lifecycle.alive(VERSION)
-  end
-
-  # find the testbox that are performing jobs
-  # curl http://localhost:11312/get_running_testbox?size=10&from=0
-  get "/get_running_testbox" do |env|
-    env.lifecycle.get_running_testbox.to_json
+    "done"
   end
 end
diff --git a/src/scheduler/elasticsearch_client.cr b/src/scheduler/elasticsearch_client.cr
index 91fb84a..f6ff618 100644
--- a/src/scheduler/elasticsearch_client.cr
+++ b/src/scheduler/elasticsearch_client.cr
@@ -79,7 +79,10 @@ class Elasticsearch::Client
   end
   def search(index, query)
-    @client.search({:index => index, :body => query})
+    results = @client.search({:index => index, :body => query})
+    raise results unless results.is_a?(JSON::Any)
+
+    results["hits"]["hits"].as_a
   end

   def update_account(account_content : JSON::Any, my_email : String)
@@ -92,6 +95,19 @@ class Elasticsearch::Client
     )
   end

+  def get_tbox(testbox)
+    query = {:index => "testbox", :type => "_doc", :id => testbox}
+    return nil unless @client.exists(query)
+
+    response = @client.get_source(query)
+    case response
+    when JSON::Any
+      return response
+    else
+      return nil
+    end
+  end
+
   def update_tbox(testbox : String, wtmp_hash : Hash)
     query = {:index => "testbox", :type => "_doc", :id => testbox}
     if @client.exists(query)
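
Reviewer note (not part of the patch): the String#bigger_than? extension
added in src/lib/lifecycle.cr can be exercised standalone. It normalizes
the legacy "%Y-%m-%d %H:%M:%S" form before comparing, so timestamps
cached in either format order correctly. The method below is copied from
the diff above; the timestamp values are made up:

    class String
      # Copied from src/lib/lifecycle.cr in this patch.
      def bigger_than?(time)
        return false if self.empty?

        time = time.to_s
        return true if time.empty?

        time = time.gsub(/ /, "T")
        time = Time.parse(time, "%Y-%m-%dT%H:%M:%S", Time.local.location)
        self_time = Time.parse(self, "%Y-%m-%dT%H:%M:%S", Time.local.location)

        self_time > time
      end
    end

    puts "2021-01-02T08:00:00".bigger_than?("2021-01-02 07:59:59") # => true
    puts "2021-01-02T08:00:00".bigger_than?("2021-01-02T08:00:00") # => false
    puts "".bigger_than?("2021-01-02T08:00:00")                    # => false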