Function:
  process timeout jobs/machines
  process crashed jobs/machines
Signed-off-by: Wu Zhende <wuzhende666@163.com>
---
 src/lib/lifecycle.cr | 310 +++++++++++++++++++++++++++++++++++++++++--
 src/lifecycle.cr     |  17 ++-
 2 files changed, 311 insertions(+), 16 deletions(-)
diff --git a/src/lib/lifecycle.cr b/src/lib/lifecycle.cr
index af5cd07..8c52f11 100644
--- a/src/lib/lifecycle.cr
+++ b/src/lib/lifecycle.cr
@@ -1,19 +1,40 @@
 # SPDX-License-Identifier: MulanPSL-2.0+
 # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
 
+require "set"
 require "kemal"
 require "yaml"
 
-require "./web_env"
+require "./mq"
+require "./scheduler_api"
 require "../scheduler/elasticsearch_client"
+require "../lifecycle/constants"
+
+# NOTE(review): reopening the core String class is a smell; a plain
+# helper module would avoid polluting every String in the process.
+class String
+  # Treats self and `time` as "%Y-%m-%dT%H:%M:%S" timestamps and
+  # returns true when self is strictly later.
+  # Empty self => false; empty `time` => true (any time beats "none").
+  # Raises if either non-empty string is not in the expected format.
+  def bigger_than?(time)
+    return false if self.empty?
+
+    time = time.to_s
+    return true if time.empty?
+
+    time = Time.parse(time, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+    self_time = Time.parse(self, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+
+    self_time > time
+  end
+end
 
 class Lifecycle
   property es

-  def initialize(env : HTTP::Server::Context)
+  # Standalone daemon constructor: no per-request HTTP env any more;
+  # state is cached in-process and driven by MQ events.
+  def initialize
+    @mq = MQClient.instance
     @es = Elasticsearch::Client.new
-    @env = env
-    @log = env.log.as(JSONLogger)
+    @scheduler_api = SchedulerAPI.new
+    @log = JSONLogger.new
+    # job_id => cached job fields (trimmed to JOB_KEYWORDS)
+    @jobs = Hash(String, JSON::Any).new
+    # testbox name => cached machine doc (without "history")
+    @machines = Hash(String, JSON::Any).new
+    # testbox name => ids of jobs booted on it; block form gives each
+    # key its own fresh Set (no shared default instance)
+    @match = Hash(String, Set(String)).new {|h, k| h[k] = Set(String).new}
   end
 
   def alive(version)
@@ -22,18 +43,287 @@ class Lifecycle
     @log.warn(e)
   end
 
-  def get_running_testbox
-    size = @env.params.query["size"]? || 20
-    from = @env.params.query["from"]? || 0
+  # Warm the in-memory caches from Elasticsearch at startup:
+  # @jobs/@match from every job still in "boot" state, @machines from
+  # every testbox in an active state.  Cached jobs whose testbox now
+  # runs a different job are reported as "occupied" via deal_match_job.
+  def init_from_es
+    jobs = get_active_jobs
+    jobs.each do |result|
+      job_id = result["_id"].to_s
+      job = result["_source"].as_h
+      # keep only the fields lifecycle cares about
+      job.delete_if{|key, _| !JOB_KEYWORDS.includes?(key)}
+
+      @jobs[job_id] = JSON.parse(job.to_json)
+      @match[job["testbox"].to_s] << job_id
+    end
+
+    machines = get_active_machines
+    machines.each do |result|
+      testbox = result["_id"].to_s
+      machine = result["_source"].as_h
+      # "history" is bulky and unused here
+      machine.delete("history")
+
+      machine = JSON.parse(machine.to_json)
+      @machines[testbox] = machine
+
+      deal_match_job(testbox, machine["job_id"].to_s)
+    end
+  end
+
+  # The machine `testbox` is now running `job_id`; every OTHER job we
+  # thought was booting on it has lost the box, so publish an
+  # "occupied" event for each and evict it from @match.
+  # Fix: the original deleted from the Set while iterating it, which
+  # mutates a collection during iteration; snapshot the stale ids
+  # first, then publish and delete safely.
+  def deal_match_job(testbox, job_id)
+    stale_ids = @match[testbox].reject {|id| id == job_id}
+    stale_ids.each do |id|
+      msg = {
+        "job_id" => id,
+        "job_state" => "occupied",
+        "testbox" => testbox
+      }
+      @mq.pushlish_confirm("job_mq", msg.to_json)
+      @match[testbox].delete(id)
+    end
+  end
+
+  # All jobs still in "boot" state (single ES page, capped at 10000
+  # hits -- NOTE(review): jobs beyond that cap are silently ignored).
+  def get_active_jobs
+    query = {
+      "size" => 10000,
+      "query" => {
+        "term" => {
+          "job_state" => "boot"
+        }
+      }
+    }
+    @es.search("jobs", query)
+  end
+
+  # All testboxes in an active state ("rebooting" is now included so
+  # rebooting machines keep being tracked); same 10000-hit cap.
+  def get_active_machines
     query = {
-      "size" => size,
-      "from" => from,
+      "size" => 10000,
       "query" => {
         "terms" => {
-          "state" => ["booting", "running"]
+          "state" => ["booting", "running", "rebooting"]
         }
       }
     }
     @es.search("testbox", query)
   end
+
+  # Consume job lifecycle events from "job_mq" and dispatch on
+  # job_state.  Messages are acked only AFTER handling, so a crash
+  # mid-handler leaves the message for redelivery.
+  # NOTE(review): a handler exception is not rescued here and would
+  # stop the consumer -- confirm MQClient's failure semantics.
+  def deal_job_events_from_mq
+    q = @mq.ch.queue("job_mq")
+    q.subscribe(no_ack: false) do |msg|
+      event = JSON.parse(msg.body_io.to_s)
+      job_state = event["job_state"]?
+
+      case job_state
+      when "boot"
+        deal_boot_event(event)
+      when "close"
+        deal_close_event(event)
+      when "occupied"
+        deal_occupied_event(event)
+      else
+        deal_other_event(event)
+      end
+      @mq.ch.basic_ack(msg.delivery_tag)
+    end
+  end
+
+  # Any state other than boot/close/occupied: merge the event into the
+  # cached job, then refresh the owning machine's timestamp.
+  def deal_other_event(event)
+    event_job_id = event["job_id"].to_s
+    return if event_job_id.empty?
+
+    update_cached_job(event_job_id, event)
+
+    # job may still be absent (e.g. already closed in ES)
+    job = @jobs[event_job_id]?
+    return unless job
+
+    testbox = job["testbox"].to_s
+    update_cached_machine(testbox, event)
+  end
+
+  # Push the event's timestamp to the machine record, but only when
+  # the event is newer than the cached copy (or nothing is cached).
+  def update_cached_machine(testbox, event)
+    machine = @machines[testbox]?
+    return if machine && !event["time"].to_s.bigger_than?(machine["time"]?)
+
+    update_es_machine_time(testbox, event)
+  end
+
+  # Re-read the testbox from ES (the cache may be stale) and, when the
+  # event is still newer, stamp it with the event time, refresh the
+  # cache, and write it back.
+  # NOTE(review): state is forced to "booting" regardless of its
+  # previous value -- confirm that is intended for running machines.
+  def update_es_machine_time(testbox, event)
+    machine = @es.get_tbox(testbox)
+    return unless machine
+    return unless event["time"].to_s.bigger_than?(machine["time"]?)
+
+    machine.as_h.delete("history")
+    machine.as_h["time"] = event["time"]
+    machine.as_h["state"] = JSON::Any.new("booting")
+    @machines[testbox] = machine
+    @es.update_tbox(testbox, machine.as_h)
+  end
+
+  # Merge `event` into the cached job.  On a cache miss, fall back to
+  # ES (skipping jobs already in a closed state) and cache a copy
+  # trimmed to JOB_KEYWORDS with the event's job_state applied.
+  def update_cached_job(job_id, event)
+    job = @jobs[job_id]?
+    if job
+      # event fields win over cached fields
+      @jobs[job_id] = JSON.parse(job.as_h.merge!(event.as_h).to_json)
+    else
+      job = @es.get_job(job_id)
+      return unless job
+      return if JOB_CLOSE_STATE.includes?(job["job_state"]?)
+
+      job = job.dump_to_json_any.as_h
+      job.delete_if{|key, _| !JOB_KEYWORDS.includes?(key)}
+      job["job_state"] = event["job_state"]
+      @jobs[job_id] = JSON.parse(job.to_json)
+    end
+  end
+
+  # The job's testbox was taken over by another job: evict it from the
+  # cache and ask the scheduler (in a fiber) to close it as "occupied".
+  def deal_occupied_event(event)
+    event_job_id = event["job_id"].to_s
+    return unless @jobs.has_key?(event_job_id)
+
+    @jobs.delete(event_job_id)
+    spawn @scheduler_api.close_job(event_job_id, "occupied", "lifecycle")
+  end
+
+  # Job reported "close": drop it from the cache and roll its final
+  # timestamp into the machine cache / ES record.
+  def deal_close_event(event)
+    event_job_id = event["job_id"].to_s
+    # Fix: use the nilable []? lookup -- Hash#[] raises KeyError for an
+    # unknown job id (e.g. a close for a job we never cached), which
+    # would kill the MQ consumer; the `return unless job` guard below
+    # only works with []?.
+    job = @jobs[event_job_id]?
+
+    return unless job
+
+    @jobs.delete(event_job_id)
+    update_cached_machine(job["testbox"].to_s, event)
+  end
+
+  # A job booted on a testbox: cache the event as the job (when it
+  # carries a job_id) and reconcile the machine cache.
+  def deal_boot_event(event)
+    event_job_id = event["job_id"]?.to_s
+    @jobs[event_job_id] = event unless event_job_id.empty?
+    # Fix: @machines is keyed by String; look up with .to_s like every
+    # other call site -- a raw JSON::Any key can never match a String
+    # key, so the cached machine was never found here.
+    machine = @machines[event["testbox"].to_s]?
+    deal_boot_machine(machine, event)
+  end
+
+  # Reconcile a boot event against the cached machine state:
+  # same job -> nothing to do; stale event -> ignored; otherwise the
+  # event becomes the machine record, racing jobs are flagged
+  # "occupied", and the machine's previous job (if cached) is closed
+  # via the scheduler.
+  def deal_boot_machine(machine, event)
+    event_job_id = event["job_id"]?.to_s
+    if machine
+      machine_job_id = machine["job_id"].to_s
+      # The job is not updated
+      # No action is required
+      return if event_job_id == machine_job_id
+
+      time = machine["time"]?
+      # Skip obsolete event
+      return unless event["time"].to_s.bigger_than?(time)
+
+      @machines[event["testbox"].to_s] = event
+      deal_match_job(event["testbox"].to_s, event_job_id)
+
+      # No previous job to process
+      return if machine_job_id.empty?
+      return unless @jobs.has_key?(machine_job_id)
+
+      # previous job lost the box: close it as "occupied"
+      @jobs.delete(machine_job_id)
+      spawn  @scheduler_api.close_job(machine_job_id, "occupied", "lifecycle")
+    else
+      # first sighting of this testbox: the event is the record
+      @machines[event["testbox"].to_s] = event
+    end
+  end
+
+  # Latest of a collection of "%Y-%m-%dT%H:%M:%S" strings; returns ""
+  # when the collection is empty.
+  # NOTE(review): no caller is visible in this patch -- confirm used.
+  def max_time(times)
+    result = ""
+    times.each do |time|
+      result = time if time.to_s.bigger_than?(result)
+    end
+    return result
+  end
+
+  # Fiber loop: find the job with the nearest deadline; once that
+  # deadline passes, close it as "timeout" on the next iteration,
+  # otherwise sleep until the deadline (capped at 60s so newly cached
+  # jobs are noticed).
+  def deal_timeout_job
+    dead_job_id = nil
+    loop do
+      close_job(dead_job_id, "timeout") if dead_job_id
+      deadline, dead_job_id = get_min_deadline
+
+      # deal timeout job
+      next if dead_job_id && deadline <= Time.local
+
+      sleep_until(deadline)
+    end
+  end
+
+  # Fiber loop, mirror of deal_timeout_job for machines: reboot the
+  # machine whose deadline has passed, otherwise sleep until the next
+  # deadline (capped at 60s by get_min_deadline_machine).
+  def deal_timeout_machine
+    dead_machine_name = nil
+    loop do
+      reboot_timeout_machine(dead_machine_name) if dead_machine_name
+      deadline, dead_machine_name = get_min_deadline_machine
+
+      # expired machine: skip the sleep and handle it immediately
+      next if dead_machine_name && deadline <= Time.local
+
+      sleep_until(deadline)
+    end
+  end
+
+  # Sleep until `deadline`.
+  # Fix: tolerate a deadline already in the past -- the deadline can
+  # slip behind Time.local between computing it and arriving here, and
+  # sleep() with a negative duration raises ArgumentError, which would
+  # crash the timeout fiber loops.
+  def sleep_until(deadline)
+    s = (deadline - Time.local).total_seconds
+    sleep(s) if s > 0
+  end
+
+  # Returns {deadline, job_id} for the soonest job deadline within the
+  # next 60 seconds; an already-expired job short-circuits and is
+  # returned immediately.  job_id is nil when nothing is due, in which
+  # case deadline is the 60s cap.
+  def get_min_deadline
+    deadline = (Time.local + 60.second)
+    dead_job_id = nil
+    @jobs.each do |id, job|
+      next unless job["deadline"]?
+      job_deadline = Time.parse(job["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+      return job_deadline, id if Time.local >= job_deadline
+      next unless deadline > job_deadline
+
+      deadline = job_deadline
+      dead_job_id = id
+    end
+    return deadline, dead_job_id
+  end
+
+  # Machine counterpart of get_min_deadline: {deadline, testbox name}
+  # for the soonest machine deadline within the next 60 seconds;
+  # name is nil when nothing is due.
+  def get_min_deadline_machine
+    deadline = (Time.local + 60.second)
+    dead_machine_name = nil
+    @machines.each do |name, machine|
+      # machines without a deadline never time out
+      next if machine["deadline"]?.to_s.empty?
+      machine_deadline = Time.parse(machine["deadline"].to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+      return machine_deadline, name if Time.local >= machine_deadline
+      next unless deadline > machine_deadline
+
+      deadline = machine_deadline
+      dead_machine_name = name
+    end
+    return deadline, dead_machine_name
+  end
+
+  # Evict the job from the cache and ask the scheduler (in a fiber) to
+  # close it with the given reason (e.g. "timeout").
+  def close_job(job_id, reason)
+    @jobs.delete(job_id)
+    spawn @scheduler_api.close_job(job_id, reason, "lifecycle")
+  end
+
+  # A machine's deadline passed: drop it from the cache, then re-check
+  # ES (its state/deadline may have changed since we cached it) before
+  # queueing it for reboot and marking it "rebooting_queue".
+  def reboot_timeout_machine(testbox)
+    @machines.delete(testbox)
+    machine = @es.get_tbox(testbox)
+
+    return unless machine
+    return if MACHINE_CLOSE_STATE.includes?(machine["state"])
+
+    deadline = machine["deadline"]?
+    return unless deadline
+
+    # deadline may have been pushed forward meanwhile -- re-verify
+    deadline = Time.parse(deadline.to_s, "%Y-%m-%dT%H:%M:%S", Time.local.location)
+    return if Time.local < deadline
+
+    mq_queue = get_machine_reboot_queue(testbox)
+    @mq.pushlish_confirm(mq_queue, machine.to_json)
+
+    machine["state"] = "rebooting_queue"
+    # NOTE(review): timestamp hard-codes the +0800 offset; wrong when
+    # the daemon runs outside CST -- confirm or use the local offset.
+    machine["time"] = Time.local.to_s("%Y-%m-%dT%H:%M:%S+0800")
+    @es.update_tbox(testbox, machine.as_h)
+  end
+
+  # Derive the MQ reboot-queue name from the testbox name:
+  #   "host.queue-3"   -> "queue"   (dotted form)
+  #   "queue--suffix"  -> "queue"   (double-dash form)
+  # Fix: the original read bare $1, which raises NilAssertionError
+  # when the name matches neither pattern and would crash the timeout
+  # fiber; use an explicit match and fall back to the raw name.
+  def get_machine_reboot_queue(testbox)
+    pattern = testbox.includes?(".") ? /.*\.(.*)-\d+$/ : /(.*)--.*/
+    md = pattern.match(testbox)
+    md ? md[1] : testbox
+  end
 end
diff --git a/src/lifecycle.cr b/src/lifecycle.cr
index a864621..f73cef5 100644
--- a/src/lifecycle.cr
+++ b/src/lifecycle.cr
@@ -2,15 +2,20 @@
 # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
 
 require "lifecycle/lifecycle"
-require "./lifecycle/constants.cr"
+require "./lifecycle/constants"
 require "./lib/json_logger"
+require "./lib/lifecycle"
 
 module Cycle
   log = JSONLogger.new
+  lifecycle = Lifecycle.new

-  begin
-    Kemal.run(LIFECYCLE_PORT)
-  rescue e
-    log.error(e)
-  end
+  # init @jobs and @machines
+  lifecycle.init_from_es
+  # NOTE(review): assumes the MQ subscription is non-blocking; if it
+  # blocks, the spawns and Kemal.run below never execute -- confirm.
+  lifecycle.deal_job_events_from_mq
+
+  spawn lifecycle.deal_timeout_job
+  spawn lifecycle.deal_timeout_machine
+
+  # NOTE(review): the begin/rescue that logged Kemal startup errors
+  # was dropped, leaving `log` unused and letting exceptions propagate
+  # uncaught -- confirm this is intended.
+  Kemal.run(LIFECYCLE_PORT)
 end
-- 
2.23.0