# Source: GitHub repository jupyter-naas/awesome-notebooks
# Path: blob/master/Elasticsearch/elasticsearchconnector.py
1
# Import elasticsearch module
2
from elasticsearch import Elasticsearch,ImproperlyConfigured,TransportError
3
import json
4
5
class ElasticsearchConnector:
    """Small convenience wrapper around an ``Elasticsearch`` client object.

    Connection settings come either from a mapping or from a JSON file and
    are read with the keys ``user``, ``password``, ``endpoint``, ``port`` and
    ``protocol``.  Problems are reported with ``print`` rather than raised;
    when no client could be created ``self.connection`` is ``None`` and the
    data methods return ``None`` after printing a hint.
    """

    def __init__(self, credobject=None):
        """
        Description:
            Accepts elasticsearch connection parameters and builds the
            elasticsearch client from them.

        Args:
            credobject: Either a mapping holding the connection settings, or
                the path (``str``/``bytes``/path-like) of a JSON file that
                contains such a mapping.  ``None`` leaves the instance
                unconnected.
        """
        # Give every attribute a value up front so the instance is usable
        # (and inspectable) even when credential loading fails below.
        self.user = None
        self.password = None
        self.endpoint = None
        self.port = None
        self.protocol = None
        self.connection = None

        credentials = self._load_credentials(credobject)
        if credentials is None:
            return

        # Initializing parameters
        self.user = credentials.get('user')
        self.password = credentials.get('password')
        self.endpoint = credentials.get('endpoint')
        self.port = credentials.get('port')
        self.protocol = credentials.get('protocol')

        self.connection = self.get_connection()

    @staticmethod
    def _load_credentials(credobject):
        """Return the credentials mapping, or ``None`` if it cannot be obtained.

        A path-like argument is opened and parsed as JSON; anything else is
        returned unchanged and treated by the caller as the mapping itself.
        A JSON syntax error is not handled here and propagates to the caller.
        """
        # Explicit check instead of ``assert``: asserts vanish under ``-O``.
        if credobject is None:
            print("Empty Credentials")
            return None

        is_path = isinstance(credobject, (str, bytes)) or hasattr(credobject, "__fspath__")
        if not is_path:
            # Not a file path, so the caller handed us the settings directly.
            return credobject

        try:
            with open(credobject, "r") as f:
                return json.load(f)
        except OSError:
            print("Unable to open file. Invalid path.")
            return None

    def _has_connection(self):
        """Return ``True`` when a client is available, else print a hint."""
        if getattr(self, 'connection', None) is None:
            print("Please connect to Elasticsearch server and try again")
            return False
        return True

    def get_connection(self):
        """Create the ``Elasticsearch`` client from the stored settings.

        Returns:
            The client object, or ``None`` when the client library rejects
            the configuration with ``ImproperlyConfigured``.
        """
        # NOTE(review): the http_auth/scheme/port keyword arguments look like
        # the older elasticsearch-py constructor style - confirm against the
        # installed client version.
        print("Establishing connection to Elasticsearch")
        try:
            es = Elasticsearch(
                [self.endpoint],
                http_auth=(self.user, self.password),
                scheme=self.protocol,
                port=self.port,
            )
        except ImproperlyConfigured:
            print("Unable to connect to Elasticsearch server : Invalid credentials")
            return None
        print("Connection established")
        return es

    def save_data(self, parameters, data):
        """Index ``data`` as one document.

        Args:
            parameters: Mapping read for the keys ``index`` and ``type``.
            data: Document body passed to the client's ``index`` call.

        Returns:
            Whatever the client's ``index`` call returns, or ``None`` when
            there is no connection or the client raises ``TransportError``.
        """
        print("Saving data to Elasticsearch")
        # Same "not connected" handling as search_data, instead of crashing.
        if not self._has_connection():
            return None
        try:
            return self.connection.index(
                index=parameters.get('index'),
                doc_type=parameters.get('type'),
                body=data,
            )
        except TransportError:
            print("Unable to save data to elasticsearch. Please check your connection credentials")
            return None

    def search_data(self, parameters, query, search_type='search'):
        """Run one or several queries against the server.

        Args:
            parameters: Mapping read for the keys ``index`` and ``type``.
            query: Sequence of query bodies.  With ``search_type='search'``
                only ``query[0]`` is sent; with ``'msearch'`` each element is
                sent in its own ``msearch`` call.
            search_type: ``'search'`` (default) or ``'msearch'``.

        Returns:
            ``'search'``: the client's ``search`` result.
            ``'msearch'``: a list with one ``msearch`` result per query.
            ``None`` for an invalid ``search_type``, a missing connection, or
            when the client raises ``TransportError``.
        """
        print("Fetching data from Elasticsearch server")
        if search_type not in ('search', 'msearch'):
            print("Invalid Search type : Use 'search' or 'msearch' as valid search types")
            return None
        if not self._has_connection():
            return None

        try:
            if search_type == 'search':
                return self.connection.search(index=parameters.get('index'), body=query[0])

            responses = []
            for each in query:
                req_head = {'index': parameters.get('index'), 'type': parameters.get('type')}
                responses.append(self.connection.msearch(body=[req_head, each]))
            return responses
        except TransportError:
            print("Unable to search data. Please check your query and try again")
        except AttributeError:
            # Kept for backward compatibility: the original reported any
            # AttributeError raised during the call with this message.
            print("Please connect to Elasticsearch server and try again")
        return None
79