On Mon, Feb 14, 2022 at 07:07:14PM +0800, Wei Jihui wrote:
From: Wei Jihui <weijihuiall@163.com>
Please ignore this patch.
Thanks, Jihui
- /data_api/es/:index/_search: search the Elasticsearch index using the query DSL
- /data_api/es/:index/_search_by_sql: search the Elasticsearch index using SQL
Signed-off-by: Wei Jihui <weijihuiall@163.com>
 container/data-api/data-api     |  36 +++++++-----
 lib/es_client.rb                |  13 ++--
 src/lib/data_api.rb             |  93 +++++------------------------
 src/lib/data_api/es_data_api.rb | 101 ++++++++++++++++++++++++++++++++
 4 files changed, 143 insertions(+), 100 deletions(-)
 create mode 100644 src/lib/data_api/es_data_api.rb
diff --git a/container/data-api/data-api b/container/data-api/data-api index 2bd6e527..834ec702 100755 --- a/container/data-api/data-api +++ b/container/data-api/data-api @@ -7,8 +7,6 @@ require 'sinatra'
CCI_SRC = ENV['CCI_SRC'] || '/c/compass-ci' require "#{CCI_SRC}/src/lib/data_api.rb" -require "#{CCI_SRC}/src/lib/service_logs/service_logs.rb" -require "#{CCI_SRC}/lib/es_query.rb"
set :bind, '0.0.0.0' set :port, 10005 @@ -17,24 +15,32 @@ set :environment, :production
# POST # eg1: -# curl -X POST localhost:10005/es_find -H 'Content-Type: application/json' -d '{ +# curl -X POST localhost:10005/data_api/es/jobs/_search -H 'Content-Type: application/json' -d '{ # "query": {"size":10, "sort": [{"start_time": {"order": "desc"}}]}, -# "index": "jobs", -# "my_token": "16132550-...", -# "my_account": "auto-submit", -# "query_type": "eql" # can be omitted in this scenario +# "cci_credentials": { +# "my_token": "16132550-...", +# "my_account": "auto-submit" +# } # }' -# eg2: -# curl -X POST localhost:10005/es_find -H 'Content-Type: application/json' -d '{ +# Response: +# - es_result : JSON +post '/data_api/es/:index/_search' do
- request.body.rewind # in case someone already read it
- es_search(params['index'], request.body.read)
+end
+# POST +# eg1: +# curl -X POST localhost:10005/data_api/es/jobs/_search_by_sql -H 'Content-Type: application/json' -d '{ # "query": "SELECT * FROM jobs WHERE ...", -# "index": "jobs", -# "my_token": "16132550-...", -# "my_account": "auto-submit", -# "query_type": "sql" # required +# "cci_credentials": { +# "my_token": "16132550-...", +# "my_account": "auto-submit" +# } # }' # Response: # - es_result : JSON -post '/data_api/es_find' do +post '/data_api/es/:index/_search_by_sql' do request.body.rewind # in case someone already read it
- es_find(request.body.read)
- es_search_by_sql(params['index'], request.body.read)
end diff --git a/lib/es_client.rb b/lib/es_client.rb index 4ae1fc00..63e42d76 100644 --- a/lib/es_client.rb +++ b/lib/es_client.rb @@ -6,9 +6,9 @@ require_relative 'es_query.rb' # ------------------------------------------------------------------------------------------- # put_source_by_id(source_id, source_content) # - put a source to ES /<target>/_doc/<_id>, ingore the existence of source -# -# query_by_sql(query_sql) -# - query es db by query_sql, query_sql is sql, like "SELECT * FROM JOBS WHERE ..." +# +# search_by_sql(search_sql) +# - search es db by search_sql, search_sql is sql, like "SELECT * FROM JOBS WHERE ..." # # ------------------------------------------------------------------------------------------- class ESClient < ESQuery @@ -29,12 +29,11 @@ class ESClient < ESQuery ) end
- # query es db by query_sql, query_sql is sql, like:
- # search es db by search_sql, search_sql is sql, like: # - "SELECT id, suite FROM JOBS" # - "SELECT * FROM accounts WHERE my_account='test_user'" # this plugin from: https://github.com/NLPchina/elasticsearch-sql
- def query_by_sql(query_sql)
- @client.perform_request('GET', '_nlpcn/sql', {}, query_sql)
- def search_by_sql(search_sql)
- @client.perform_request('GET', '_nlpcn/sql', {}, search_sql) end
end diff --git a/src/lib/data_api.rb b/src/lib/data_api.rb index eeaf0f98..8aa4304b 100644 --- a/src/lib/data_api.rb +++ b/src/lib/data_api.rb @@ -3,90 +3,27 @@ # frozen_string_literal: true
CCI_SRC ||= ENV['CCI_SRC'] || '/c/compass-ci' -require 'json' -require 'elasticsearch' -require 'set' -require_relative '../../lib/constants.rb' -require_relative '../../lib/es_query.rb' -require_relative '../../lib/es_client.rb' -require_relative '../../lib/json_logger.rb' +require "#{CCI_SRC}/lib/json_logger.rb" +require "#{CCI_SRC}/src/lib/data_api/es_data_api.rb"
-ES_ACCOUNTS = ESQuery.new(index: 'accounts') -REQUIRED_TOKEN_INDEX = Set.new(['jobs']) -UNOPEN_INDEX = Set.new(['accounts']) -ES_QUERY_KEYWORD = Set.new(['term', 'match'])
-def es_find(params) +def es_search(index, params) begin
- result = query_es(params)
- result = EsDataApi.search(index, params) rescue StandardError => e
- log_error({
'message' => e.message,
'error_message' => "query es error"
- })
- return [500, headers.merge('Access-Control-Allow-Origin' => '*'), e.message]
- error_msg = { 'error_msg' => e.message }
- log_error(error_msg)
- return [200, headers.merge('Access-Control-Allow-Origin' => '*'), error_msg.to_json] end [200, headers.merge('Access-Control-Allow-Origin' => '*'), result.to_json]
end
-def query_es(params)
- request_body = JSON.parse(params)
- index = request_body['index'] || 'jobs'
- query = request_body['query'] || {'query' => {}}
- return "#{index} is not opened" if UNOPEN_INDEX.any? { |unopen_index| index.include?(unopen_index) }
- if REQUIRED_TOKEN_INDEX.any? { |req_index| index.include?(req_index) }
- my_account = request_body['my_account']
- my_token = request_body['my_token']
- return "missed my_account" unless my_account
- return "missed my_token" unless my_token
- return "user authentication failed" unless verify_user(my_account, my_token)
- query = build_account_query(query, my_account, request_body['query_type'], index)
- end
- if request_body['query_type'] == 'sql'
- es = ESClient.new(index: index)
- return es.query_by_sql(query).body
- else
- es = Elasticsearch::Client.new(hosts: ES_HOSTS)
- return es.search index: index + '*', body: query
- end
-end
-def verify_user(my_account, my_token)
- query = {}
- query['my_account'] = my_account if my_account
- query['my_token'] = my_token if my_token
- result = ES_ACCOUNTS.multi_field_query(query)['hits']['hits']
- return nil if result.size == 0
- result[0]['_source']['my_token'] == my_token
-end
-def build_account_query(query, my_account, query_type, index)
- if query_type == 'sql'
- user_limit = "my_account='#{my_account}'"
- if query =~ /where/i
query = query.gsub(/FROM\s*\S+/i, "FROM #{index} ")
query = query.gsub(/where/i, "WHERE #{user_limit} AND")
- else
query = query.gsub(/from\s*\S+/i, "FROM #{index} WHERE #{user_limit}")
- end
- else
- query['query'] ||= {}
- query['query']['bool'] ||= {}
- query['query']['bool']['must'] ||= []
- query['query']['bool']['must'] << {'term' => {'my_account' => my_account}}
- query['query'].each do |k, v|
if ES_QUERY_KEYWORD.include?(k)
query['query']['bool']['must'] << {k => v}
query['query'].delete(k)
end
- end
+def es_search_by_sql(index, params)
- begin
- result = EsDataApi.search_by_sql(index, params)
- rescue StandardError => e
- error_msg = { 'error_msg' => e.message }
- log_error(error_msg)
- return [200, headers.merge('Access-Control-Allow-Origin' => '*'), error_msg.to_json] end
- query
- [200, headers.merge('Access-Control-Allow-Origin' => '*'), result.to_json]
end diff --git a/src/lib/data_api/es_data_api.rb b/src/lib/data_api/es_data_api.rb new file mode 100644 index 00000000..ed89b596 --- /dev/null +++ b/src/lib/data_api/es_data_api.rb @@ -0,0 +1,101 @@ +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. +# frozen_string_literal: true
+CCI_SRC ||= ENV['CCI_SRC'] || '/c/compass-ci' +require 'json' +require 'set' +require "#{CCI_SRC}/lib/constants.rb" +require "#{CCI_SRC}/lib/es_client.rb"
+# this module is for es api +# - search +# - search_by_sql +module EsDataApi
- ES_ACCOUNTS = ESClient.new(index: 'accounts')
- OPEN_INDEX = Set.new(['jobs'])
- REQUIRED_TOKEN_INDEX = Set.new(['jobs'])
- ES_QUERY_KEYWORD = Set.new(%w[term match])
- def self.credentials_for_dsl(query, my_account)
- query['query'] ||= {}
- query['query']['bool'] ||= {}
- query['query']['bool']['must'] ||= []
- query['query']['bool']['must'] << { 'term' => { 'my_account' => my_account } }
- query = handle_dsl_query(query)
- return query
- end
- def self.handle_dsl_query(query)
- query['query'].each do |k, v|
if ES_QUERY_KEYWORD.include?(k)
query['query']['bool']['must'] << { k => v }
query['query'].delete(k)
end
- end
- return query
- end
- def self.check_my_account(request_body)
- cci_credentials = request_body['cci_credentials'] || {}
- my_account = cci_credentials['my_account']
- my_token = cci_credentials['my_token']
- raise 'user authentication failed, please check my_account and my_token.' unless verify_user(my_account, my_token)
- return my_account
- end
- def self.search(index, params)
- request_body = JSON.parse(params)
- query = request_body['query'] || { 'query' => {} }
- raise "#{index} is not opened for user query" unless OPEN_INDEX.include?(index)
- if REQUIRED_TOKEN_INDEX.include?(index)
my_account = check_my_account(request_body)
query = credentials_for_dsl(query, my_account)
- end
- es = Elasticsearch::Client.new(hosts: ES_HOSTS)
- return es.search index: index + '*', body: query
- end
- def self.credentials_for_sql(query, my_account, index)
- user_limit = "my_account='#{my_account}'"
- if query =~ /where/i
query = query.gsub(/FROM\s*\S+/i, "FROM #{index} ")
query = query.gsub(/where/i, "WHERE #{user_limit} AND")
- else
query = query.gsub(/from\s*\S+/i, "FROM #{index} WHERE #{user_limit}")
- end
- return query
- end
- def self.search_by_sql(index, params)
- request_body = JSON.parse(params)
- query = request_body['query']
- raise "#{index} is not opened for user query" unless OPEN_INDEX.include?(index)
- if REQUIRED_TOKEN_INDEX.include?(index)
my_account = check_my_account(request_body)
query = credentials_for_sql(query, my_account, index)
- end
- es = ESClient.new(index: index)
- p query
- p 1
- return es.search_by_sql(query).body
- end
- def self.verify_user(my_account, my_token)
- raise 'missed my_account' unless my_account
- raise 'missed my_token' unless my_token
- query = {}
- query['my_account'] = my_account
- query['my_token'] = my_token
- result = ES_ACCOUNTS.multi_field_query(query)['hits']['hits']
- return nil if result.empty?
- result[0]['_source']['my_token'] == my_token
- end
+end
2.23.0