Refactor the Sched class according to the scheduler.cr API.
Extract the request_cluster_state function and its helper methods
from src/lib/sched.cr into src/scheduler/request_cluster_state.cr.
Signed-off-by: Ren Wen <15991987063(a)163.com>
---
src/lib/sched.cr | 108 +-----------------------
src/scheduler/request_cluster_state.cr | 111 +++++++++++++++++++++++++
2 files changed, 112 insertions(+), 107 deletions(-)
create mode 100644 src/scheduler/request_cluster_state.cr
diff --git a/src/lib/sched.cr b/src/lib/sched.cr
index 3709cb1..077b071 100644
--- a/src/lib/sched.cr
+++ b/src/lib/sched.cr
@@ -15,6 +15,7 @@ require "../scheduler/elasticsearch_client"
require "../scheduler/find_job_boot"
require "../scheduler/find_next_job_boot"
+require "../scheduler/request_cluster_state"
class Sched
property es
@@ -49,113 +50,6 @@ class Sched
@redis.hash_del("sched/host2queues", hostname)
end
- # return:
- # Hash(String, Hash(String, String))
- def get_cluster_state(cluster_id)
- cluster_state = @redis.hash_get("sched/cluster_state", cluster_id)
- if cluster_state
- cluster_state = Hash(String, Hash(String, String)).from_json(cluster_state)
- else
- cluster_state = Hash(String, Hash(String, String)).new
- end
- return cluster_state
- end
-
- # Update job info according to cluster id.
- def update_cluster_state(cluster_id, job_id, job_info : Hash(String, String))
- cluster_state = get_cluster_state(cluster_id)
- if cluster_state[job_id]?
- cluster_state[job_id].merge!(job_info)
- @redis.hash_set("sched/cluster_state", cluster_id, cluster_state.to_json)
- end
- end
-
- # Return response according to different request states.
- # all request states:
- # wait_ready | abort | failed | finished | wait_finish |
- # write_state | roles_ip
- def request_cluster_state(env)
- request_state = env.params.query["state"]
- job_id = env.params.query["job_id"]
- cluster_id = @redis.hash_get("sched/id2cluster", job_id).not_nil!
- cluster_state = ""
-
- states = {"abort" => "abort",
- "finished" => "finish",
- "failed" => "abort",
- "wait_ready" => "ready",
- "wait_finish" => "finish"}
-
- case request_state
- when "abort", "finished", "failed"
- # update node state only
- update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
- when "wait_ready"
- update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
- @block_helper.block_until_finished(cluster_id) {
- cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
- cluster_state == "ready" || cluster_state == "abort"
- }
-
- return cluster_state
- when "wait_finish"
- update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
- while 1
- sleep(10)
- cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
- break if (cluster_state == "finish" || cluster_state == "abort")
- end
-
- return cluster_state
- when "write_state"
- node_roles = env.params.query["node_roles"]
- node_ip = env.params.query["ip"]
- direct_ips = env.params.query["direct_ips"]
- direct_macs = env.params.query["direct_macs"]
-
- job_info = {"roles" => node_roles,
- "ip" => node_ip,
- "direct_ips" => direct_ips,
- "direct_macs" => direct_macs}
- update_cluster_state(cluster_id, job_id, job_info)
- when "roles_ip"
- role = "server"
- role_state = get_role_state(cluster_id, role)
- raise "Missing #{role} state in cluster state" unless role_state
- return "server=#{role_state["ip"]}\n" \
- "direct_server_ips=#{role_state["direct_ips"]}"
- end
-
- # show cluster state
- return @redis.hash_get("sched/cluster_state", cluster_id)
- end
-
- # get the node state of role from cluster_state
- private def get_role_state(cluster_id, role)
- cluster_state = get_cluster_state(cluster_id)
- cluster_state.each_value do |role_state|
- return role_state if role_state["roles"] == role
- end
- end
-
- # node_state: "finish" | "ready"
- def sync_cluster_state(cluster_id, job_id, node_state)
- cluster_state = get_cluster_state(cluster_id)
- cluster_state.each_value do |host_state|
- state = host_state["state"]
- return "abort" if state == "abort"
- end
-
- cluster_state.each_value do |host_state|
- state = host_state["state"]
- next if "#{state}" == "#{node_state}"
- return "retry"
- end
-
- # cluster state is node state when all nodes are normal
- return node_state
- end
-
# get cluster config using own lkp_src cluster file,
# a hash type will be returned
def get_cluster_config(cluster_file, lkp_initrd_user, os_arch)
diff --git a/src/scheduler/request_cluster_state.cr b/src/scheduler/request_cluster_state.cr
new file mode 100644
index 0000000..07ba6fd
--- /dev/null
+++ b/src/scheduler/request_cluster_state.cr
@@ -0,0 +1,111 @@
+# SPDX-License-Identifier: MulanPSL-2.0+
+# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+
+class Sched
+  # Fetch the state of every job in a cluster from redis.
+  # The value stored under "sched/cluster_state" is parsed as JSON into a
+  # Hash(String, Hash(String, String)), indexed by job id (see
+  # update_cluster_state).
+  # return:
+  #   the parsed hash, or an empty hash when nothing is stored for cluster_id
+  def get_cluster_state(cluster_id)
+    raw = @redis.hash_get("sched/cluster_state", cluster_id)
+    return Hash(String, Hash(String, String)).new unless raw
+    Hash(String, Hash(String, String)).from_json(raw)
+  end
+
+  # Merge job_info into the entry of job_id and write the cluster state back
+  # to redis. Nothing is written when the cluster has no entry for job_id.
+  def update_cluster_state(cluster_id, job_id, job_info : Hash(String, String))
+    cluster_state = get_cluster_state(cluster_id)
+    if job_state = cluster_state[job_id]?
+      job_state.merge!(job_info)
+      @redis.hash_set("sched/cluster_state", cluster_id, cluster_state.to_json)
+    end
+  end
+
+  # Return response according to different request states.
+  # all request states:
+  #   wait_ready | abort | failed | finished | wait_finish |
+  #   write_state | roles_ip
+  # wait_finish polls the cluster every 10 seconds until it is finished or
+  # aborted; wait_ready delegates the waiting to @block_helper.
+  def request_cluster_state(env)
+    request_state = env.params.query["state"]
+    job_id = env.params.query["job_id"]
+    cluster_id = @redis.hash_get("sched/id2cluster", job_id).not_nil!
+    cluster_state = ""
+
+    states = {"abort" => "abort",
+              "finished" => "finish",
+              "failed" => "abort",
+              "wait_ready" => "ready",
+              "wait_finish" => "finish"}
+
+    case request_state
+    when "abort", "finished", "failed"
+      # update node state only
+      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
+    when "wait_ready"
+      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
+      @block_helper.block_until_finished(cluster_id) {
+        cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
+        cluster_state == "ready" || cluster_state == "abort"
+      }
+
+      return cluster_state
+    when "wait_finish"
+      update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
+      loop do
+        sleep(10)
+        cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
+        break if cluster_state == "finish" || cluster_state == "abort"
+      end
+
+      return cluster_state
+    when "write_state"
+      node_roles = env.params.query["node_roles"]
+      node_ip = env.params.query["ip"]
+      direct_ips = env.params.query["direct_ips"]
+      direct_macs = env.params.query["direct_macs"]
+
+      job_info = {"roles" => node_roles,
+                  "ip" => node_ip,
+                  "direct_ips" => direct_ips,
+                  "direct_macs" => direct_macs}
+      update_cluster_state(cluster_id, job_id, job_info)
+    when "roles_ip"
+      role = "server"
+      role_state = get_role_state(cluster_id, role)
+      raise "Missing #{role} state in cluster state" unless role_state
+      return "server=#{role_state["ip"]}\n" \
+             "direct_server_ips=#{role_state["direct_ips"]}"
+    end
+
+    # show cluster state
+    return @redis.hash_get("sched/cluster_state", cluster_id)
+  end
+
+  # get the node state of role from cluster_state
+  # Nodes without a "roles" key are skipped instead of raising KeyError;
+  # returns nil when no node has the requested role.
+  private def get_role_state(cluster_id, role)
+    get_cluster_state(cluster_id).each_value do |node_state|
+      return node_state if node_state["roles"]? == role
+    end
+    nil
+  end
+
+  # Derive the state of the whole cluster from the states of its nodes.
+  # node_state: "finish" | "ready"
+  # return: "abort" if any node aborted, "retry" if any node has not yet
+  # reached node_state, node_state otherwise. job_id is unused.
+  def sync_cluster_state(cluster_id, job_id, node_state)
+    host_states = get_cluster_state(cluster_id).values.map { |host| host["state"]?.to_s }
+    return "abort" if host_states.includes?("abort")
+    return "retry" unless host_states.all? { |state| state == node_state.to_s }
+
+    # cluster state is node state when all nodes are normal
+    return node_state
+  end
+end
--
2.23.0