
On Mon, Nov 09, 2020 at 03:20:32PM +0800, Cao Xueliang wrote:
+ # all request states: + # wait_ready | abort | failed | finished | wait_finish | + # write_state | roles_ip + def request_cluster_state(env)
`request_cluster_state` should be the first function in the file.
Got it. Thanks, RenWen
Thanks, Xueliang
+ request_state = env.params.query["state"] + job_id = env.params.query["job_id"] + cluster_id = @redis.hash_get("sched/id2cluster", job_id).not_nil! + cluster_state = "" + + states = {"abort" => "abort", + "finished" => "finish", + "failed" => "abort", + "wait_ready" => "ready", + "wait_finish" => "finish"} + + case request_state + when "abort", "finished", "failed" + # update node state only + update_cluster_state(cluster_id, job_id, {"state" => states[request_state]}) + when "wait_ready" + update_cluster_state(cluster_id, job_id, {"state" => states[request_state]}) + @block_helper.block_until_finished(cluster_id) { + cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state]) + cluster_state == "ready" || cluster_state == "abort" + } + + return cluster_state + when "wait_finish" + update_cluster_state(cluster_id, job_id, {"state" => states[request_state]}) + while 1 + sleep(10) + cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state]) + break if (cluster_state == "finish" || cluster_state == "abort") + end + + return cluster_state + when "write_state" + node_roles = env.params.query["node_roles"] + node_ip = env.params.query["ip"] + direct_ips = env.params.query["direct_ips"] + direct_macs = env.params.query["direct_macs"] + + job_info = {"roles" => node_roles, + "ip" => node_ip, + "direct_ips" => direct_ips, + "direct_macs" => direct_macs} + update_cluster_state(cluster_id, job_id, job_info) + when "roles_ip" + role = "server" + role_state = get_role_state(cluster_id, role) + raise "Missing #{role} state in cluster state" unless role_state + return "server=#{role_state["ip"]}\n" \ + "direct_server_ips=#{role_state["direct_ips"]}" + end + + # show cluster state + return @redis.hash_get("sched/cluster_state", cluster_id) + end + + # get the node state of role from cluster_state + private def get_role_state(cluster_id, role) + cluster_state = get_cluster_state(cluster_id) + cluster_state.each_value do |role_state| + return 
role_state if role_state["roles"] == role + end + end + + # node_state: "finish" | "ready" + def sync_cluster_state(cluster_id, job_id, node_state) + cluster_state = get_cluster_state(cluster_id) + cluster_state.each_value do |host_state| + state = host_state["state"] + return "abort" if state == "abort" + end + + cluster_state.each_value do |host_state| + state = host_state["state"] + next if "#{state}" == "#{node_state}" + return "retry" + end + + # cluster state is node state when all nodes are normal + return node_state + end +end -- 2.23.0