- # all request states:
- # wait_ready | abort | failed | finished | wait_finish |
- # write_state | roles_ip
# Handles one cluster-state request. Reads "state" and "job_id" from the
# request query, resolves the job's cluster id, then acts on the state:
#   abort | finished | failed -> record the mapped node state
#   wait_ready | wait_finish  -> record the node state, then wait for the
#                                cluster and return the resulting state
#   write_state               -> record roles/ip/direct_ips/direct_macs
#   roles_ip                  -> return the "server" node's ip information
# Branches without an explicit return (and any unlisted state) fall through
# to the final line and return the "sched/cluster_state" entry for the cluster.
# Raises in the roles_ip branch when no "server" role state is found.
- def request_cluster_state(env)
The function should be the first function in the file.
Thanks, Xueliang
- request_state = env.params.query["state"]
- job_id = env.params.query["job_id"]
# not_nil! raises if "sched/id2cluster" yields nil for this job_id
- cluster_id = @redis.hash_get("sched/id2cluster", job_id).not_nil!
- cluster_state = ""
# maps the requested state to the node state that gets written
- states = {"abort" => "abort",
"finished" => "finish",
"failed" => "abort",
"wait_ready" => "ready",
"wait_finish" => "finish"}
- case request_state
- when "abort", "finished", "failed"
# update node state only
update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
- when "wait_ready"
update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
# The block's value is true once the cluster is "ready" or "abort".
# NOTE(review): presumably block_until_finished re-runs the block until it
# is true - confirm against @block_helper, which is defined elsewhere.
@block_helper.block_until_finished(cluster_id) {
cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
cluster_state == "ready" || cluster_state == "abort"
}
return cluster_state
- when "wait_finish"
update_cluster_state(cluster_id, job_id, {"state" => states[request_state]})
# poll every 10 seconds until the cluster is "finish" or "abort";
# no upper bound on the wait is visible here
while 1
sleep(10)
cluster_state = sync_cluster_state(cluster_id, job_id, states[request_state])
break if (cluster_state == "finish" || cluster_state == "abort")
end
return cluster_state
- when "write_state"
# copy the node description from the query into the cluster state
node_roles = env.params.query["node_roles"]
node_ip = env.params.query["ip"]
direct_ips = env.params.query["direct_ips"]
direct_macs = env.params.query["direct_macs"]
job_info = {"roles" => node_roles,
"ip" => node_ip,
"direct_ips" => direct_ips,
"direct_macs" => direct_macs}
update_cluster_state(cluster_id, job_id, job_info)
- when "roles_ip"
# only the "server" role is looked up
role = "server"
role_state = get_role_state(cluster_id, role)
raise "Missing #{role} state in cluster state" unless role_state
# two-line reply: server ip, then the server's direct ips
return "server=#{role_state["ip"]}\n" \
"direct_server_ips=#{role_state["direct_ips"]}"
- end
- # show cluster state
- return @redis.hash_get("sched/cluster_state", cluster_id)
- end
- # get the node state of role from cluster_state
# Scans the values of get_cluster_state(cluster_id) and returns the first
# entry whose "roles" field equals `role`.
# NOTE(review): when nothing matches, the method falls through to the value
# of each_value; the caller treats that as "missing" - confirm it is nil.
- private def get_role_state(cluster_id, role)
- cluster_state = get_cluster_state(cluster_id)
- cluster_state.each_value do |role_state|
# exact equality: "roles" must be exactly `role`
return role_state if role_state["roles"] == role
- end
- end
- # node_state: "finish" | "ready"
# Derives a cluster-wide state from the per-host states in
# get_cluster_state(cluster_id):
#   "abort"    if any host's "state" is "abort"
#   "retry"    if any host's "state" differs from node_state
#   node_state otherwise
# job_id is accepted but not used in this body.
- def sync_cluster_state(cluster_id, job_id, node_state)
- cluster_state = get_cluster_state(cluster_id)
# first pass: an abort on any host wins over everything else
- cluster_state.each_value do |host_state|
state = host_state["state"]
return "abort" if state == "abort"
- end
# second pass: every host must have reached node_state
- cluster_state.each_value do |host_state|
state = host_state["state"]
# both sides are interpolated, so they are compared as strings
next if "#{state}" == "#{node_state}"
return "retry"
- end
- # cluster state is node state when all nodes are normal
- return node_state
- end
+end
2.23.0