Path: blob/master/Elasticsearch/elasticsearchconnector.py
2973 views
# Import elasticsearch module
import json

from elasticsearch import Elasticsearch, ImproperlyConfigured, TransportError


class ElasticsearchConnector:
    """Small convenience wrapper around an ``Elasticsearch`` client.

    Connection settings are read from a JSON file or a mapping. Handled
    errors are reported with ``print`` and the affected method returns
    ``None``; exceptions that are not explicitly handled still propagate.
    """

    def __init__(self, credobject=None):
        """Load connection settings and create the client.

        Description:
            Accepts elasticsearch connection parameters and connects to
            elasticsearch cloud.

        Args:
            credobject: Either a path to a JSON file holding the settings or
                an object exposing ``.get`` (e.g. a dict). The keys read are
                ``user``, ``password``, ``endpoint``, ``port`` and
                ``protocol``; a missing key yields ``None``.

        When ``credobject`` is ``None`` or the file cannot be opened, a
        message is printed and ``self.connection`` stays ``None``. Errors
        raised while parsing the JSON content are not handled here.
        """
        # Give every attribute a value up front so the early returns below
        # still leave a fully-formed instance; the other methods then see a
        # None connection instead of a missing attribute.
        self.user = None
        self.password = None
        self.endpoint = None
        self.port = None
        self.protocol = None
        self.connection = None

        # Parameter check. An explicit test is used because ``assert`` is
        # stripped when Python runs with -O.
        if credobject is None:
            print("Empty Credentials")
            return

        try:
            with open(credobject, "r") as f:
                credentials = json.load(f)
        except OSError:
            print("Unable to open file. Invalid path.")
            return
        except TypeError:
            # Not something open() accepts as a path: treat it as an
            # already-loaded credentials mapping.
            credentials = credobject

        # Initializing parameters
        self.user = credentials.get('user')
        self.password = credentials.get('password')
        self.endpoint = credentials.get('endpoint')
        self.port = credentials.get('port')
        self.protocol = credentials.get('protocol')

        self.connection = self.get_connection()

    def get_connection(self):
        """Build an ``Elasticsearch`` client from the stored settings.

        Returns:
            The client object, or ``None`` when the library raises
            ``ImproperlyConfigured``.
        """
        print("Establishing connection to Elasticsearch")
        try:
            es = Elasticsearch(
                [self.endpoint],
                http_auth=(self.user, self.password),
                scheme=self.protocol,
                port=self.port,
            )
        except ImproperlyConfigured:
            print("Unable to connect to Elasticsearch server : Invalid credentials")
            return None
        # NOTE(review): creating the client object may not contact the
        # server at all -- confirm before relying on this message.
        print("Connection established")
        return es

    def save_data(self, parameters, data):
        """Index one document.

        Args:
            parameters: Object exposing ``.get``; the ``index`` and ``type``
                keys select the target index and document type.
            data: The document body passed to the client's ``index`` call.

        Returns:
            The client's response, or ``None`` when a ``TransportError`` or
            ``AttributeError`` was handled.
        """
        print("Saving data to Elasticsearch")
        try:
            return self.connection.index(
                index=parameters.get('index'),
                doc_type=parameters.get('type'),
                body=data,
            )
        except TransportError:
            print("Unable to save data to elasticsearch. Please check your connection credentials")
        except AttributeError:
            # Same handling as search_data: typically self.connection is None
            # because no client could be created.
            print("Please connect to Elasticsearch server and try again")
        return None

    def search_data(self, parameters, query, search_type='search'):
        """Run one search, or one ``msearch`` call per query.

        Args:
            parameters: Object exposing ``.get``; ``index`` (and, for
                ``msearch``, ``type``) are read from it.
            query: Indexable collection of query bodies. With
                ``search_type='search'`` only ``query[0]`` is sent; with
                ``'msearch'`` every element is sent in its own ``msearch``
                call.
            search_type: ``'search'`` (default) or ``'msearch'``.

        Returns:
            The search response for ``'search'``, a list of responses for
            ``'msearch'``, or ``None`` when the search type is invalid or a
            ``TransportError`` / ``AttributeError`` was handled.
        """
        print("Fetching data from Elasticsearch server")
        if search_type not in ('search', 'msearch'):
            print("Invalid Search type : Use 'search' or 'msearch' as valid search types")
            return None

        # Both search types share the same failure handling.
        try:
            if search_type == 'search':
                return self.connection.search(
                    index=parameters.get('index'),
                    body=query[0],
                )
            # A fresh header dict is built for every request, so each
            # msearch call receives its own [header, body] pair.
            return [
                self.connection.msearch(
                    body=[
                        {
                            'index': parameters.get('index'),
                            'type': parameters.get('type'),
                        },
                        each,
                    ]
                )
                for each in query
            ]
        except TransportError:
            print("Unable to search data. Please check your query and try again")
        except AttributeError:
            print("Please connect to Elasticsearch server and try again")
        return None