Use refresh=wait_for to make sure the job can be searched before the job is consumed.
Signed-off-by: Cao Xueliang caoxl78320@163.com --- src/scheduler/elasticsearch_client.cr | 2 ++ src/scheduler/find_job_boot.cr | 1 + 2 files changed, 3 insertions(+)
diff --git a/src/scheduler/elasticsearch_client.cr b/src/scheduler/elasticsearch_client.cr index f32256b..58bed08 100644 --- a/src/scheduler/elasticsearch_client.cr +++ b/src/scheduler/elasticsearch_client.cr @@ -142,6 +142,7 @@ class Elasticsearch::Client return @client.create( { :index => "jobs", :type => "_doc", + :refresh => "wait_for", :id => job_id, :body => job_content, } @@ -152,6 +153,7 @@ class Elasticsearch::Client return @client.update( { :index => "jobs", :type => "_doc", + :refresh => "wait_for", :id => job_id, :body => {:doc => job_content}, } diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr index 4f49394..41fa89f 100644 --- a/src/scheduler/find_job_boot.cr +++ b/src/scheduler/find_job_boot.cr @@ -76,6 +76,7 @@ class Sched if job_id begin job = @es.get_job(job_id.to_s) + @log.warn("job_is_nil, job id=#{job_id.to_s}") unless job rescue ex @log.warn("Invalid job (id=#{job_id}) in es. Info: #{ex}") @log.warn(ex.inspect_with_backtrace)