Refactor the Sched class according to the scheduler.cr API.

Extract the request_cluster_state function and its cluster-state helpers
from sched.cr into src/scheduler/request_cluster_state.cr.
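
Note for reviewers: the new file simply reopens class Sched, so the moved
methods still compile into the same type once sched.cr requires it. Below is
a minimal standalone sketch of that Crystal behavior; the names are
illustrative and not taken from this repository.

    # Crystal classes are open: a second `class` body, even one living in
    # another required file, merges its methods into the same type at
    # compile time.
    class Greeter
      def hello
        "hello"
      end
    end

    # Imagine this second body sits in a separate file pulled in with
    # require "./greeter_ext" -- the result is identical.
    class Greeter
      def bye
        "bye"
      end
    end

    puts Greeter.new.hello # => hello
    puts Greeter.new.bye   # => bye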
Signed-off-by: Ren Wen <15991987063@163.com>
---
 src/lib/sched.cr                       | 108 +-----------------------
 src/scheduler/request_cluster_state.cr | 111 +++++++++++++++++++++++++
 2 files changed, 112 insertions(+), 107 deletions(-)
 create mode 100644 src/scheduler/request_cluster_state.cr
diff --git a/src/lib/sched.cr b/src/lib/sched.cr
index 3709cb1..077b071 100644
--- a/src/lib/sched.cr
+++ b/src/lib/sched.cr
@@ -15,6 +15,7 @@ require "../scheduler/elasticsearch_client"
 require "../scheduler/find_job_boot"
 require "../scheduler/find_next_job_boot"
+require "../scheduler/request_cluster_state"
 
 class Sched
   property es
@@ -49,113 +50,6 @@ class Sched
     @redis.hash_del("sched/host2queues", hostname)
   end
 
-  # return:
-  #   Hash(String, Hash(String, String))
-  def get_cluster_state(cluster_id)
-    cluster_state = @redis.hash_get("sched/cluster_state", cluster_id)
-    if cluster_state
-      cluster_state = Hash(String, Hash(String, String)).from_json(cluster_state)
-    else
-      cluster_state = Hash(String, Hash(String, String)).new
-    end
-    return cluster_state
-  end
-
-  # Update job info according to cluster id.
-  def update_cluster_state(cluster_id, job_id, job_info : Hash(String, String))
-    cluster_state = get_cluster_state(cluster_id)
-    if cluster_state[job_id]?
-      cluster_state[job_id].merge!(job_info)
-      @redis.hash_set("sched/cluster_state", cluster_id, cluster_state.to_json)
-    end
-  end
-
-  # Return response according to different request states.
-  # all request states:
-  #     wait_ready | abort | failed | finished | wait_finish |
-  #     write_state | roles_ip
-  def request_cluster_state(env)
-    request_state = env.params.query["state"]
-    job_id = env.params.query["job_id"]
-    cluster_id = @redis.hash_get("sched/id2cluster", job_id).not_nil!
-    cluster_state = ""
-
-    states = {"abort"       => "abort",
-              "finished"    => "finish",
-              "failed"      => "abort",
-              "wait_ready"  => "ready",
-              "wait_finish" => "finish"}
-
-    case request_state
-    when "abort", "finished", "failed"
-      # update node state only
-      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
-    when "wait_ready"
-      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
-      @block_helper.block_until_finished(cluster_id) {
-        cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
-        cluster_state == "ready" || cluster_state == "abort"
-      }
-
-      return cluster_state
-    when "wait_finish"
-      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
-      while 1
-        sleep(10)
-        cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
-        break if (cluster_state == "finish" || cluster_state == "abort")
-      end
-
-      return cluster_state
-    when "write_state"
-      node_roles = env.params.query["node_roles"]
-      node_ip = env.params.query["ip"]
-      direct_ips = env.params.query["direct_ips"]
-      direct_macs = env.params.query["direct_macs"]
-
-      job_info = {"roles"       => node_roles,
-                  "ip"          => node_ip,
-                  "direct_ips"  => direct_ips,
-                  "direct_macs" => direct_macs}
-      update_cluster_state(cluster_id, job_id, job_info)
-    when "roles_ip"
-      role = "server"
-      role_state = get_role_state(cluster_id, role)
-      raise "Missing #{role} state in cluster state" unless role_state
-      return "server=#{role_state["ip"]}\n" \
-             "direct_server_ips=#{role_state["direct_ips"]}"
-    end
-
-    # show cluster state
-    return @redis.hash_get("sched/cluster_state", cluster_id)
-  end
-
-  # get the node state of role from cluster_state
-  private def get_role_state(cluster_id, role)
-    cluster_state = get_cluster_state(cluster_id)
-    cluster_state.each_value do |role_state|
-      return role_state if role_state["roles"] == role
-    end
-  end
-
-  # node_state: "finish" | "ready"
-  def sync_cluster_state(cluster_id, job_id, node_state)
-    cluster_state = get_cluster_state(cluster_id)
-    cluster_state.each_value do |host_state|
-      state = host_state["state"]
-      return "abort" if state == "abort"
-    end
-
-    cluster_state.each_value do |host_state|
-      state = host_state["state"]
-      next if "#{state}" == "#{node_state}"
-      return "retry"
-    end
-
-    # cluster state is node state when all nodes are normal
-    return node_state
-  end
-
   # get cluster config using own lkp_src cluster file,
   # a hash type will be returned
   def get_cluster_config(cluster_file, lkp_initrd_user, os_arch)
diff --git a/src/scheduler/request_cluster_state.cr b/src/scheduler/request_cluster_state.cr
new file mode 100644
index 0000000..07ba6fd
--- /dev/null
+++ b/src/scheduler/request_cluster_state.cr
@@ -0,0 +1,111 @@
+# SPDX-License-Identifier: MulanPSL-2.0+
+# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+
+class Sched
+  # return:
+  #   Hash(String, Hash(String, String))
+  def get_cluster_state(cluster_id)
+    cluster_state = @redis.hash_get("sched/cluster_state", cluster_id)
+    if cluster_state
+      cluster_state = Hash(String, Hash(String, String)).from_json(cluster_state)
+    else
+      cluster_state = Hash(String, Hash(String, String)).new
+    end
+    return cluster_state
+  end
+
+  # Update job info according to cluster id.
+  def update_cluster_state(cluster_id, job_id, job_info : Hash(String, String))
+    cluster_state = get_cluster_state(cluster_id)
+    if cluster_state[job_id]?
+      cluster_state[job_id].merge!(job_info)
+      @redis.hash_set("sched/cluster_state", cluster_id, cluster_state.to_json)
+    end
+  end
+
+  # Return response according to different request states.
+  # all request states:
+  #     wait_ready | abort | failed | finished | wait_finish |
+  #     write_state | roles_ip
+  def request_cluster_state(env)
+    request_state = env.params.query["state"]
+    job_id = env.params.query["job_id"]
+    cluster_id = @redis.hash_get("sched/id2cluster", job_id).not_nil!
+    cluster_state = ""
+
+    states = {"abort"       => "abort",
+              "finished"    => "finish",
+              "failed"      => "abort",
+              "wait_ready"  => "ready",
+              "wait_finish" => "finish"}
+
+    case request_state
+    when "abort", "finished", "failed"
+      # update node state only
+      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
+    when "wait_ready"
+      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
+      @block_helper.block_until_finished(cluster_id) {
+        cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
+        cluster_state == "ready" || cluster_state == "abort"
+      }
+
+      return cluster_state
+    when "wait_finish"
+      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
+      while 1
+        sleep(10)
+        cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
+        break if (cluster_state == "finish" || cluster_state == "abort")
+      end
+
+      return cluster_state
+    when "write_state"
+      node_roles = env.params.query["node_roles"]
+      node_ip = env.params.query["ip"]
+      direct_ips = env.params.query["direct_ips"]
+      direct_macs = env.params.query["direct_macs"]
+
+      job_info = {"roles"       => node_roles,
+                  "ip"          => node_ip,
+                  "direct_ips"  => direct_ips,
+                  "direct_macs" => direct_macs}
+      update_cluster_state(cluster_id, job_id, job_info)
+    when "roles_ip"
+      role = "server"
+      role_state = get_role_state(cluster_id, role)
+      raise "Missing #{role} state in cluster state" unless role_state
+      return "server=#{role_state["ip"]}\n" \
+             "direct_server_ips=#{role_state["direct_ips"]}"
+    end
+
+    # show cluster state
+    return @redis.hash_get("sched/cluster_state", cluster_id)
+  end
+
+  # get the node state of role from cluster_state
+  private def get_role_state(cluster_id, role)
+    cluster_state = get_cluster_state(cluster_id)
+    cluster_state.each_value do |role_state|
+      return role_state if role_state["roles"] == role
+    end
+  end
+
+  # node_state: "finish" | "ready"
+  def sync_cluster_state(cluster_id, job_id, node_state)
+    cluster_state = get_cluster_state(cluster_id)
+    cluster_state.each_value do |host_state|
+      state = host_state["state"]
+      return "abort" if state == "abort"
+    end
+
+    cluster_state.each_value do |host_state|
+      state = host_state["state"]
+      next if "#{state}" == "#{node_state}"
+      return "retry"
+    end
+
+    # cluster state is node state when all nodes are normal
+    return node_state
+  end
+end