
On Mon, Feb 14, 2022 at 07:07:14PM +0800, Wei Jihui wrote:
From: Wei Jihui <weijihuiall@163.com>
Please ignore this patch. Thanks, Jihui
1. /data_api/es/:index/_search: search ES with a query DSL (JSON) request body.
2. /data_api/es/:index/_search_by_sql: search ES with a SQL query.
Signed-off-by: Wei Jihui <weijihuiall@163.com> --- container/data-api/data-api | 36 +++++++----- lib/es_client.rb | 13 ++-- src/lib/data_api.rb | 93 +++++------------------------ src/lib/data_api/es_data_api.rb | 101 ++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 100 deletions(-) create mode 100644 src/lib/data_api/es_data_api.rb
diff --git a/container/data-api/data-api b/container/data-api/data-api
index 2bd6e527..834ec702 100755
--- a/container/data-api/data-api
+++ b/container/data-api/data-api
@@ -7,8 +7,6 @@ require 'sinatra'
 
 CCI_SRC = ENV['CCI_SRC'] || '/c/compass-ci'
 require "#{CCI_SRC}/src/lib/data_api.rb"
-require "#{CCI_SRC}/src/lib/service_logs/service_logs.rb"
-require "#{CCI_SRC}/lib/es_query.rb"
 
 set :bind, '0.0.0.0'
 set :port, 10005
@@ -17,24 +15,32 @@ set :environment, :production
 
 # POST
 # eg1:
-# curl -X POST localhost:10005/es_find -H 'Content-Type: application/json' -d '{
+# curl -X POST localhost:10005/data_api/es/jobs/_search -H 'Content-Type: application/json' -d '{
 #   "query": {"size":10, "sort": [{"start_time": {"order": "desc"}}]},
-#   "index": "jobs",
-#   "my_token": "16132550-...",
-#   "my_account": "auto-submit",
-#   "query_type": "eql" # can be omitted in this scenario
+#   "cci_credentials": {
+#     "my_token": "16132550-...",
+#     "my_account": "auto-submit"
+#   }
 # }'
-# eg2:
-# curl -X POST localhost:10005/es_find -H 'Content-Type: application/json' -d '{
+# Response:
+#   - es_result : JSON
+post '/data_api/es/:index/_search' do
+  request.body.rewind # in case someone already read it
+  es_search(params['index'], request.body.read)
+end
+
+# POST
+# eg1:
+# curl -X POST localhost:10005/data_api/es/jobs/_search_by_sql -H 'Content-Type: application/json' -d '{
 #   "query": "SELECT * FROM jobs WHERE ...",
-#   "index": "jobs",
-#   "my_token": "16132550-...",
-#   "my_account": "auto-submit",
-#   "query_type": "sql" # required
+#   "cci_credentials": {
+#     "my_token": "16132550-...",
+#     "my_account": "auto-submit"
+#   }
 # }'
 # Response:
 #   - es_result : JSON
-post '/data_api/es_find' do
+post '/data_api/es/:index/_search_by_sql' do
   request.body.rewind # in case someone already read it
-  es_find(request.body.read)
+  es_search_by_sql(params['index'], request.body.read)
 end
diff --git a/lib/es_client.rb b/lib/es_client.rb
index 4ae1fc00..63e42d76 100644
--- a/lib/es_client.rb
+++ b/lib/es_client.rb
@@ -6,9 +6,9 @@ require_relative 'es_query.rb'
 # -------------------------------------------------------------------------------------------
 # put_source_by_id(source_id, source_content)
 #   - put a source to ES /<target>/_doc/<_id>, ingore the existence of source
-#
-# query_by_sql(query_sql)
-#   - query es db by query_sql, query_sql is sql, like "SELECT * FROM JOBS WHERE ..."
+#
+# search_by_sql(search_sql)
+#   - search es db by search_sql, search_sql is sql, like "SELECT * FROM JOBS WHERE ..."
 #
 # -------------------------------------------------------------------------------------------
 class ESClient < ESQuery
@@ -29,12 +29,11 @@ class ESClient < ESQuery
     )
   end
 
-  # query es db by query_sql, query_sql is sql, like:
+  # search es db by search_sql, search_sql is sql, like:
   #   - "SELECT id, suite FROM JOBS"
   #   - "SELECT * FROM accounts WHERE my_account='test_user'"
   # this plugin from: https://github.com/NLPchina/elasticsearch-sql
-  def query_by_sql(query_sql)
-    @client.perform_request('GET', '_nlpcn/sql', {}, query_sql)
+  def search_by_sql(search_sql)
+    @client.perform_request('GET', '_nlpcn/sql', {}, search_sql)
   end
-
 end
diff --git a/src/lib/data_api.rb b/src/lib/data_api.rb
index eeaf0f98..8aa4304b 100644
--- a/src/lib/data_api.rb
+++ b/src/lib/data_api.rb
@@ -3,90 +3,27 @@
 # frozen_string_literal: true
 
 CCI_SRC ||= ENV['CCI_SRC'] || '/c/compass-ci'
-require 'json'
-require 'elasticsearch'
-require 'set'
-require_relative '../../lib/constants.rb'
-require_relative '../../lib/es_query.rb'
-require_relative '../../lib/es_client.rb'
-require_relative '../../lib/json_logger.rb'
+require "#{CCI_SRC}/lib/json_logger.rb"
+require "#{CCI_SRC}/src/lib/data_api/es_data_api.rb"
 
-ES_ACCOUNTS = ESQuery.new(index: 'accounts')
-REQUIRED_TOKEN_INDEX = Set.new(['jobs'])
-UNOPEN_INDEX = Set.new(['accounts'])
-ES_QUERY_KEYWORD = Set.new(['term', 'match'])
-
-def es_find(params)
+def es_search(index, params)
   begin
-    result = query_es(params)
+    result = EsDataApi.search(index, params)
   rescue StandardError => e
-    log_error({
-      'message' => e.message,
-      'error_message' => "query es error"
-    })
-    return [500, headers.merge('Access-Control-Allow-Origin' => '*'), e.message]
+    error_msg = { 'error_msg' => e.message }
+    log_error(error_msg)
+    return [500, headers.merge('Access-Control-Allow-Origin' => '*'), error_msg.to_json]
   end
   [200, headers.merge('Access-Control-Allow-Origin' => '*'), result.to_json]
 end
 
-def query_es(params)
-  request_body = JSON.parse(params)
-  index = request_body['index'] || 'jobs'
-  query = request_body['query'] || {'query' => {}}
-  return "#{index} is not opened" if UNOPEN_INDEX.any? { |unopen_index| index.include?(unopen_index) }
-
-  if REQUIRED_TOKEN_INDEX.any? { |req_index| index.include?(req_index) }
-    my_account = request_body['my_account']
-    my_token = request_body['my_token']
-    return "missed my_account" unless my_account
-    return "missed my_token" unless my_token
-    return "user authentication failed" unless verify_user(my_account, my_token)
-
-    query = build_account_query(query, my_account, request_body['query_type'], index)
-  end
-
-  if request_body['query_type'] == 'sql'
-    es = ESClient.new(index: index)
-    return es.query_by_sql(query).body
-  else
-    es = Elasticsearch::Client.new(hosts: ES_HOSTS)
-    return es.search index: index + '*', body: query
-  end
-end
-
-def verify_user(my_account, my_token)
-  query = {}
-  query['my_account'] = my_account if my_account
-  query['my_token'] = my_token if my_token
-
-  result = ES_ACCOUNTS.multi_field_query(query)['hits']['hits']
-  return nil if result.size == 0
-
-  result[0]['_source']['my_token'] == my_token
-end
-
-def build_account_query(query, my_account, query_type, index)
-  if query_type == 'sql'
-    user_limit = "my_account='#{my_account}'"
-    if query =~ /where/i
-      query = query.gsub(/FROM\s*\S+/i, "FROM #{index} ")
-      query = query.gsub(/where/i, "WHERE #{user_limit} AND")
-    else
-      query = query.gsub(/from\s*\S+/i, "FROM #{index} WHERE #{user_limit}")
-    end
-  else
-    query['query'] ||= {}
-    query['query']['bool'] ||= {}
-    query['query']['bool']['must'] ||= []
-    query['query']['bool']['must'] << {'term' => {'my_account' => my_account}}
-
-    query['query'].each do |k, v|
-      if ES_QUERY_KEYWORD.include?(k)
-        query['query']['bool']['must'] << {k => v}
-        query['query'].delete(k)
-      end
-    end
+def es_search_by_sql(index, params)
+  begin
+    result = EsDataApi.search_by_sql(index, params)
+  rescue StandardError => e
+    error_msg = { 'error_msg' => e.message }
+    log_error(error_msg)
+    return [500, headers.merge('Access-Control-Allow-Origin' => '*'), error_msg.to_json]
   end
-
-  query
+  [200, headers.merge('Access-Control-Allow-Origin' => '*'), result.to_json]
 end
diff --git a/src/lib/data_api/es_data_api.rb b/src/lib/data_api/es_data_api.rb
new file mode 100644
index 00000000..ed89b596
--- /dev/null
+++ b/src/lib/data_api/es_data_api.rb
@@ -0,0 +1,133 @@
+# SPDX-License-Identifier: MulanPSL-2.0+
+# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+# frozen_string_literal: true
+
+CCI_SRC ||= ENV['CCI_SRC'] || '/c/compass-ci'
+require 'json'
+require 'set'
+require "#{CCI_SRC}/lib/constants.rb"
+require "#{CCI_SRC}/lib/es_client.rb"
+
+# this module is for es api
+#   - search: search es by query DSL (JSON)
+#   - search_by_sql: search es by SQL, through the elasticsearch-sql plugin
+# for indexes in REQUIRED_TOKEN_INDEX the caller's cci_credentials are verified
+# and the query is limited to documents whose my_account is the caller's
+module EsDataApi
+  ES_ACCOUNTS = ESClient.new(index: 'accounts')
+  OPEN_INDEX = Set.new(['jobs'])
+  REQUIRED_TOKEN_INDEX = Set.new(['jobs'])
+  ES_QUERY_KEYWORD = Set.new(%w[term match])
+
+  # parse the request body, it must be a JSON object
+  def self.parse_request_body(params)
+    request_body = JSON.parse(params)
+    raise 'request body must be a JSON object' unless request_body.is_a?(Hash)
+
+    request_body
+  end
+
+  # append a my_account term to query['query']['bool']['must'],
+  # so that only documents of my_account can match
+  def self.credentials_for_dsl(query, my_account)
+    query['query'] ||= {}
+    raise 'query.query must be a JSON object' unless query['query'].is_a?(Hash)
+
+    bool = (query['query']['bool'] ||= {})
+    # es also accepts a single clause object as "must", wrap it before appending
+    bool['must'] = [bool['must']] if bool['must'].is_a?(Hash)
+    bool['must'] ||= []
+    bool['must'] << { 'term' => { 'my_account' => my_account } }
+    handle_dsl_query(query)
+  end
+
+  # move top level term/match clauses into bool.must, where they are ANDed
+  # with the my_account term instead of sitting next to "bool"
+  def self.handle_dsl_query(query)
+    leaf_clauses = query['query'].select { |k, _v| ES_QUERY_KEYWORD.include?(k) }
+    leaf_clauses.each do |k, v|
+      query['query']['bool']['must'] << { k => v }
+      query['query'].delete(k)
+    end
+    query
+  end
+
+  # verify the cci_credentials of the request, return my_account on success
+  def self.check_my_account(request_body)
+    cci_credentials = request_body['cci_credentials'] || {}
+    my_account = cci_credentials['my_account']
+    my_token = cci_credentials['my_token']
+    raise 'user authentication failed, please check my_account and my_token.' unless verify_user(my_account, my_token)
+
+    my_account
+  end
+
+  # search es by query DSL, params is the JSON request body, like:
+  #   { "query": { <es search body> }, "cci_credentials": { "my_account": ..., "my_token": ... } }
+  def self.search(index, params)
+    raise "#{index} is not opened for user query" unless OPEN_INDEX.include?(index)
+
+    request_body = parse_request_body(params)
+    query = request_body['query'] || { 'query' => {} }
+    raise 'query must be a JSON object' unless query.is_a?(Hash)
+
+    if REQUIRED_TOKEN_INDEX.include?(index)
+      my_account = check_my_account(request_body)
+      query = credentials_for_dsl(query, my_account)
+    end
+
+    es = Elasticsearch::Client.new(hosts: ES_HOSTS)
+    es.search index: index + '*', body: query
+  end
+
+  # limit a SQL query to the documents of my_account in index:
+  #   - every "FROM <table>" is redirected to index
+  #   - every WHERE condition is wrapped in parentheses and ANDed with the
+  #     my_account limit, so an "OR ..." in it can not bypass the limit
+  # NOTE: the rewrite is regex based, keywords inside string literals are rewritten too
+  def self.credentials_for_sql(query, my_account, index)
+    # my_account is put into a SQL string literal, never let a quote through
+    raise 'invalid my_account' if my_account.to_s.include?("'")
+
+    user_limit = "my_account='#{my_account}'"
+    return query.gsub(/FROM\s*\S+/i, "FROM #{index} WHERE #{user_limit}") unless query.match?(/\bWHERE\b/i)
+
+    query = query.gsub(/FROM\s*\S+/i, "FROM #{index}")
+    # a WHERE condition ends at GROUP BY / ORDER BY / HAVING / LIMIT, or at the end of query
+    query = query.gsub(/\bWHERE\b(.*?)(?=\b(?:GROUP\s+BY|ORDER\s+BY|HAVING|LIMIT)\b|\z)/im) do
+      "WHERE #{user_limit} AND (#{Regexp.last_match(1).strip}) "
+    end
+    query.rstrip
+  end
+
+  # search es by SQL, params is the JSON request body, like:
+  #   { "query": "SELECT * FROM jobs WHERE ...", "cci_credentials": { "my_account": ..., "my_token": ... } }
+  def self.search_by_sql(index, params)
+    raise "#{index} is not opened for user query" unless OPEN_INDEX.include?(index)
+
+    request_body = parse_request_body(params)
+    query = request_body['query']
+    raise 'query must be a SQL string' unless query.is_a?(String) && !query.strip.empty?
+
+    if REQUIRED_TOKEN_INDEX.include?(index)
+      my_account = check_my_account(request_body)
+      query = credentials_for_sql(query, my_account, index)
+    end
+
+    es = ESClient.new(index: index)
+    es.search_by_sql(query).body
+  end
+
+  # check that my_token belongs to my_account in the accounts index:
+  # truthy only when it does; raise when either of them is missing
+  def self.verify_user(my_account, my_token)
+    raise 'missed my_account' unless my_account
+    raise 'missed my_token' unless my_token
+
+    query = { 'my_account' => my_account, 'my_token' => my_token }
+    result = ES_ACCOUNTS.multi_field_query(query)['hits']['hits']
+    return nil if result.empty?
+
+    result[0]['_source']['my_token'] == my_token
+  end
+end
-- 
2.23.0