Before, we operate the job in redis. After, we operate the job in etcd.
Signed-off-by: Cao Xueliang caoxl78320@163.com --- src/lib/sched.cr | 3 +++ src/scheduler/opt_job_in_etcd.cr | 36 ++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 src/scheduler/opt_job_in_etcd.cr
diff --git a/src/lib/sched.cr b/src/lib/sched.cr index b10a16d..f74df9b 100644 --- a/src/lib/sched.cr +++ b/src/lib/sched.cr @@ -7,6 +7,7 @@ require "yaml" require "./mq" require "./job" require "./web_env" +require "./etcd_client" require "./block_helper" require "./taskqueue_api" require "./remote_git_client" @@ -23,6 +24,7 @@ require "../scheduler/request_cluster_state" require "../scheduler/update_job_parameter" require "../scheduler/create_job_cpio" require "../scheduler/download_file" +require "../scheduler/opt_job_in_etcd"
class Sched property es @@ -37,6 +39,7 @@ class Sched @redis = Redis::Client.instance @mq = MQClient.instance @task_queue = TaskQueueAPI.new + @etcd = EtcdClient.new @rgc = RemoteGitClient.new @env = env @log = env.log.as(JSONLogger) diff --git a/src/scheduler/opt_job_in_etcd.cr b/src/scheduler/opt_job_in_etcd.cr new file mode 100644 index 0000000..f38433a --- /dev/null +++ b/src/scheduler/opt_job_in_etcd.cr @@ -0,0 +1,36 @@ +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. + +class Sched + def update_id2job(job_content) + id = job_content["id"].to_s + job = get_id2job(id) + job.update(job_content) + @etcd.update("sched/id2job/#{id}", job.dump_to_json) + end + + def set_id2job(job : Job) + @etcd.put("sched/id2job/#{job.id}", job.dump_to_json) + end + + def get_id2job(id) + response = @etcd.range("sched/id2job/#{id}") + raise "get job from etcd failed. id: #{id}" unless response.count == 1 + Job.new(JSON.parse(response.kvs[0].value.not_nil!), id) + end + + def delete_id2job(id) + @etcd.delete("sched/id2job/#{id}") + end + + def update_tbox_wtmp(testbox, wtmp_hash) + @etcd.update("sched/tbox_wtmp/#{testbox}", wtmp_hash) + end + + def move_process2stats(job : Job) + f_queue = "sched/#{job.queue}/in_process/#{job.subqueue}/#{job.id}" + t_queue = "extract_stats/#{job.id}" + value = { "id" => "#{job.id}" } + @etcd.move(f_queue, t_queue, value) + end +end