使用Python對ElasticSearch獲取數據及操作,供大家參考,具體內容如下
成都創新互聯公司10多年企業網站建設服務;為您提供網站建設,網站制作,網頁設計及高端網站定制服務,企業網站建設及推廣,對成都自拌料攪拌車等多個方面擁有多年的網站營銷經驗的網站建設公司。Version
Python :2.7
ElasticSearch:6.3
代碼:
#!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2018/7/4 @Author : LiuXueWen @Site : @File : ElasticSearchOperation.py @Software: PyCharm @Description: 對elasticsearch數據的操作,包括獲取數據,發送數據 """ import elasticsearch import json import Util_Ini_Operation class elasticsearch_data(): def __init__(self,hosts,username,password,maxsize,is_ssl): # 初始化ini操作腳本,獲取配置文件 try: # 判斷請求方式是否ssl加密 if is_ssl == "true": # 獲取證書地址 cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs") es_ssl = elasticsearch.Elasticsearch( # 地址 hosts=hosts, # 用戶名密碼 http_auth=(username,password), # 開啟ssl use_ssl=True, # 確認有加密證書 verify_certs=True, # 對應的加密證書地址 client_cert=cert_pem ) self.es = es_ssl elif is_ssl == "false": # 創建普通類型的ES客戶端 es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize)) self.es = es_ordinary except Exception as e: print(e) def query_data(self,keywords_list,date): gte = "now-"+str(date) query_data = { # 查詢語句 "query": { "bool": { "must": [ { "query_string": { "query": keywords_list, "analyze_wildcard": True } }, { "range": { "@timestamp": { "gte": gte, "lte": "now", "format": "epoch_millis" } } } ], "must_not": [] } } } return query_data # 從es獲取數據 def get_datas_by_query(self,index_name,keywords,param,date): ''' :param index_name: 索引名稱 :param keywords: 關鍵字詞,數組 :param param: 需要數據條件,例如_source :param date: 過去時間范圍,字符串格式,例如過去30分鐘內數據,"30m" :return: all_datas 返回查詢到的所有數據(已經過param過濾) ''' all_datas = [] # 遍歷所有的查詢條件 for keywords_list in keywords: # DSL語句 query_data = self.query_data(keywords_list,date) res = self.es.search( index=index_name, body=query_data ) for hit in res['hits']['hits']: # 獲取指定的內容 response = hit[param] # 添加所有數據到數據集中 all_datas.append(response) # 返回所有數據內容 return all_datas # 當索引不存在創建索引 def create_index(self,index_name): ''' :param index_name: 索引名稱 :return:如果創建成功返回創建結果信息,試過已經存在創建新的index失敗返回index的名稱 ''' # 獲取索引的映射 # index_mapping = IndexMapping.index_mapping # # 判斷索引是否存在 # if self.es.indices.exists(index=index_name) is not True: # # 創建索引 # res = self.es.indices.create(index=index_name,body=index_mapping) # # 返回結果 # return res # else: # # 返回索引名稱 # return index_name pass # 插入指定的單條數據內容 def insert_single_data(self,index_name,doc_type,data): ''' :param index_name: 索引名稱 :param doc_type: 文檔類型 :param data: 需要插入的數據內容 :return: 執行結果 ''' res = self.es.index(index=index_name,doc_type=doc_type,body=data) return res # 向ES中新增數據,批量插入 def insert_datas(self,index_name): ''' :desc 通過讀取指定的文件內容獲取需要插入的數據集 :param index_name: 索引名稱 :return: 插入成功的數據條數 ''' insert_datas = [] # 判斷插入數據的索引是否存在 self.createIndex(index_name=index_name) # 獲取插入數據的文件地址 data_file_path = self.ini.get_key_value("datafile","datafilepath") # 獲取需要插入的數據集 with open(data_file_path,"r+") as data_file: # 獲取文件所有數據 data_lines = data_file.readlines() for data_line in data_lines: # string to json data_line = json.loads(data_line) insert_datas.append(data_line) # 批量處理 res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True) return res # 從ES中在指定的索引中刪除指定數據(根據id判斷) def delete_data_by_id(self,index_name,doc_type,id): ''' :param index_name: 索引名稱 :param index_type: 文檔類型 :param id: 唯一標識id :return: 刪除結果信息 ''' res = self.es.delete(index=index_name,doc_type=doc_type,id=id) return res # 根據條件刪除數據 def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time): ''' :param index_name:索引名稱,為空查詢所有索引 :param doc_type:文檔類型,為空查詢所有文檔類型 :param param:過濾條件值 :param gt_time:時間范圍,大于該時間 :param lt_time:時間范圍,小于該時間 :return:執行條件刪除后的結果信息 ''' # DSL語句 query_data = { # 查詢語句 "query": { "bool": { "must": [ { "query_string": { "query": param, "analyze_wildcard": True } }, { "range": { "@timestamp": { "gte": gt_time, "lte": lt_time, "format": "epoch_millis" } } } ], "must_not": [] } } } res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True) return res # 指定index中刪除指定時間段內的全部數據 def delete_all_datas(self,index_name,doc_type,gt_time,lt_time): ''' :param index_name:索引名稱,為空查詢所有索引 :param doc_type:文檔類型,為空查詢所有文檔類型 :param gt_time:時間范圍,大于該時間 :param lt_time:時間范圍,小于該時間 :return:執行條件刪除后的結果信息 ''' # DSL語句 query_data = { # 查詢語句 "query": { "bool": { "must": [ { "match_all": {} }, { "range": { "@timestamp": { "gte": gt_time, "lte": lt_time, "format": "epoch_millis" } } } ], "must_not": [] } } } res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True) return res # 修改ES中指定的數據 def update_data_by_id(self,index_name,doc_type,id,data): ''' :param index_name: 索引名稱 :param doc_type: 文檔類型,為空表示所有類型 :param id: 文檔唯一標識編號 :param data: 更新的數據 :return: 更新結果信息 ''' res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data) return res
另外有需要云服務器可以了解下創新互聯scvps.cn,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業上云的綜合解決方案,具有“安全穩定、簡單易用、服務可用性高、性價比高”等特點與優勢,專為企業上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。
標題名稱:Python對ElasticSearch獲取數據及操作-創新互聯
分享路徑:http://m.newbst.com/article10/dgihdo.html
成都網站建設公司_創新互聯,為您提供營銷型網站建設、軟件開發、網站設計、定制網站、響應式網站、用戶體驗
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯