Use refresh=wait_for so that the job is searchable before it is consumed.
Signed-off-by: Cao Xueliang <caoxl78320(a)163.com>
---
src/scheduler/elasticsearch_client.cr | 2 ++
src/scheduler/find_job_boot.cr | 1 +
2 files changed, 3 insertions(+)
diff --git a/src/scheduler/elasticsearch_client.cr b/src/scheduler/elasticsearch_client.cr
index f32256b..58bed08 100644
--- a/src/scheduler/elasticsearch_client.cr
+++ b/src/scheduler/elasticsearch_client.cr
@@ -142,6 +142,7 @@ class Elasticsearch::Client
return @client.create(
{
:index => "jobs", :type => "_doc",
+ :refresh => "wait_for",
:id => job_id,
:body => job_content,
}
@@ -152,6 +153,7 @@ class Elasticsearch::Client
return @client.update(
{
:index => "jobs", :type => "_doc",
+ :refresh => "wait_for",
:id => job_id,
:body => {:doc => job_content},
}
diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr
index 4f49394..41fa89f 100644
--- a/src/scheduler/find_job_boot.cr
+++ b/src/scheduler/find_job_boot.cr
@@ -76,6 +76,7 @@ class Sched
if job_id
begin
job = @es.get_job(job_id.to_s)
+ @log.warn("job_is_nil, job id=#{job_id.to_s}") unless job
rescue ex
@log.warn("Invalid job (id=#{job_id}) in es. Info: #{ex}")
@log.warn(ex.inspect_with_backtrace)
--
2.23.0