Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/history/db.py
1567 views
1
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import uuid
14
import time
15
import json
16
import datetime
17
import threading
18
import logging
19
from awscli.compat import collections_abc
20
21
from botocore.history import BaseHistoryHandler
22
23
from awscli.compat import sqlite3
24
from awscli.compat import binary_type
25
26
27
# Module-level logger for this file (e.g. used when enabling the sqlite WAL
# journal fails).
LOG = logging.getLogger(name=__name__)
28
29
30
class DatabaseConnection(object):
    """Wrap the sqlite3 connection that stores CLI history records.

    Constructing the wrapper opens the database, creates the ``records``
    table if it does not exist yet and then attempts (best effort) to switch
    the journal mode to WAL.

    :param db_filename: Passed unchanged to ``sqlite3.connect``.
    """

    _CREATE_TABLE = """
        CREATE TABLE IF NOT EXISTS records (
          id TEXT,
          request_id TEXT,
          source TEXT,
          event_type TEXT,
          timestamp INTEGER,
          payload TEXT
        )"""
    _ENABLE_WAL = 'PRAGMA journal_mode=WAL'

    def __init__(self, db_filename):
        # check_same_thread=False lets threads other than the creating one
        # use this connection; isolation_level=None puts sqlite3 in
        # autocommit mode, so every statement is committed immediately.
        self._connection = sqlite3.connect(
            db_filename,
            isolation_level=None,
            check_same_thread=False,
        )
        self._ensure_database_setup()

    def close(self):
        """Close the underlying sqlite connection."""
        self._connection.close()

    def execute(self, query, *parameters):
        """Run *query* on the underlying connection and return its cursor.

        Any extra positional arguments (normally one sequence of bind
        parameters) are forwarded unchanged to ``Connection.execute``.
        """
        connection = self._connection
        return connection.execute(query, *parameters)

    def _ensure_database_setup(self):
        # Order matters: the table must exist before anything else runs.
        for setup_step in (self._create_record_table, self._try_to_enable_wal):
            setup_step()

    def _create_record_table(self):
        self.execute(self._CREATE_TABLE)

    def _try_to_enable_wal(self):
        # WAL is only a performance enhancement, and not every sqlite build
        # supports it, so a failure is logged at debug level and ignored.
        try:
            self.execute(self._ENABLE_WAL)
        except sqlite3.Error:
            LOG.debug('Failed to enable sqlite WAL.')

    @property
    def row_factory(self):
        """Row factory of the underlying connection (read/write proxy)."""
        return self._connection.row_factory

    @row_factory.setter
    def row_factory(self, factory):
        self._connection.row_factory = factory
75
76
77
class PayloadSerializer(json.JSONEncoder):
    """JSON encoder for history record payloads.

    In addition to what ``json.JSONEncoder`` handles natively, values are
    serialized as follows:

    * ``datetime.datetime`` -> ISO 8601 string (``isoformat()``)
    * ``MutableMapping`` that is not a ``dict`` -> plain ``dict``
    * bytes -> utf-8 decoded text, or ``'<Byte sequence>'`` when the bytes
      are not valid utf-8
    * anything else -> its ``repr()``
    """

    def _encode_mutable_mapping(self, obj):
        return dict(obj)

    def _encode_datetime(self, obj):
        return obj.isoformat()

    def _try_decode_bytes(self, obj):
        """Return *obj* decoded as utf-8, or a placeholder if it is not."""
        try:
            obj = obj.decode('utf-8')
        except UnicodeDecodeError:
            obj = '<Byte sequence>'
        return obj

    def _remove_non_unicode_strings(self, obj):
        """Return a copy of *obj* in which every byte string is JSON-safe.

        Recurses through mappings, lists and tuples (tuples come back as
        lists, which is how JSON renders them anyway). Only byte strings are
        routed through ``_try_decode_bytes``. Text strings are left alone:
        on Python 3 ``str`` has no ``decode`` method, so testing for ``str``
        here (as this method previously did) raised ``AttributeError``. On
        Python 2 ``binary_type`` *is* ``str``, so behaviour there is
        unchanged.
        """
        if isinstance(obj, binary_type):
            return self._try_decode_bytes(obj)
        if isinstance(obj, collections_abc.Mapping):
            return {
                key: self._remove_non_unicode_strings(value)
                for key, value in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [self._remove_non_unicode_strings(item) for item in obj]
        return obj

    # Backwards-compatible alias for the original (misspelled) method name.
    _remove_non_unicode_stings = _remove_non_unicode_strings

    def encode(self, obj):
        """Encode *obj*, scrubbing undecodable byte strings on failure."""
        try:
            return super(PayloadSerializer, self).encode(obj)
        except UnicodeDecodeError:
            # On PY2 a ``str`` holding non-utf-8 bytes is serialized directly
            # (``default`` is never consulted for it), so the failure only
            # surfaces here at the top level. There is no per-field hook, so
            # walk the structure ourselves, replace byte strings that are not
            # utf-8 decodable and encode once more.
            scrubbed_obj = self._remove_non_unicode_strings(obj)
            return super(PayloadSerializer, self).encode(scrubbed_obj)

    def default(self, obj):
        """Serialize values the base encoder cannot handle natively."""
        if isinstance(obj, datetime.datetime):
            return self._encode_datetime(obj)
        elif isinstance(obj, collections_abc.MutableMapping):
            return self._encode_mutable_mapping(obj)
        elif isinstance(obj, binary_type):
            # On PY3 bytes is distinct from str, so ``default`` is called for
            # it. The value is decoded as utf-8 when possible and otherwise
            # replaced by the '<Byte sequence>' placeholder.
            return self._try_decode_bytes(obj)
        else:
            return repr(obj)
133
134
135
class DatabaseRecordWriter(object):
    """Insert history records into the ``records`` table.

    :param connection: object providing ``execute(query, params)`` and
        ``close()``; presumably a ``DatabaseConnection`` -- TODO confirm for
        all callers.
    """

    _WRITE_RECORD = """
        INSERT INTO records(
          id, request_id, source, event_type, timestamp, payload)
        VALUES (?,?,?,?,?,?) """

    def __init__(self, connection):
        self._connection = connection
        # Guards execute() so concurrent write_record calls do not
        # interleave on the shared connection.
        self._lock = threading.Lock()

    def close(self):
        """Close the underlying connection."""
        self._connection.close()

    def write_record(self, record):
        """Serialize *record* and insert it as one row.

        JSON serialization happens outside the lock; only the INSERT itself
        is performed while holding it.
        """
        row = self._create_db_record(record)
        with self._lock:
            self._connection.execute(self._WRITE_RECORD, row)

    def _create_db_record(self, record):
        """Map a record dict onto the column tuple used by the INSERT.

        ``request_id`` is optional and becomes NULL when missing; the keys
        ``event_type``, ``payload``, ``command_id``, ``source`` and
        ``timestamp`` are required (``KeyError`` otherwise).
        """
        event_type = record['event_type']
        serialized_payload = json.dumps(
            record['payload'], cls=PayloadSerializer)
        return (
            record['command_id'],
            record.get('request_id'),
            record['source'],
            event_type,
            record['timestamp'],
            serialized_payload,
        )
166
167
168
class DatabaseRecordReader(object):
    """Read history records back out of the ``records`` table.

    Creating a reader replaces the connection's row factory, so rows come
    back as dicts keyed by column name, with any ``payload`` column
    JSON-decoded.

    :param connection: object with a writable ``row_factory`` plus
        ``execute()`` and ``close()`` (e.g. ``DatabaseConnection`` or a raw
        ``sqlite3.Connection``).
    """

    _ORDERING = 'ORDER BY timestamp'
    # Every row of the command id that owns the most recent record.
    _GET_LAST_ID_RECORDS = """
        SELECT * FROM records
        WHERE id =
        (SELECT id FROM records WHERE timestamp =
        (SELECT max(timestamp) FROM records)) %s;""" % _ORDERING
    _GET_RECORDS_BY_ID = 'SELECT * from records where id = ? %s' % _ORDERING
    # One row per command id that has both a CLI_ARGUMENTS and a CLI_RC
    # record, newest first. String literals use single quotes: in SQL double
    # quotes delimit identifiers, and sqlite builds with double-quoted string
    # literals disabled (SQLITE_DQS=0) reject "CLI_ARGUMENTS" as an unknown
    # column. The join compares the qualified columns instead of the
    # result-column aliases, which standard SQL does not allow in WHERE.
    _GET_ALL_RECORDS = (
        'SELECT a.id AS id_a, '
        '       b.id AS id_b, '
        '       a.timestamp as timestamp, '
        '       a.payload AS args, '
        '       b.payload AS rc '
        'FROM records a, records b '
        "where a.event_type = 'CLI_ARGUMENTS' AND "
        "      b.event_type = 'CLI_RC' AND "
        '      a.id = b.id '
        '%s DESC' % _ORDERING
    )

    def __init__(self, connection):
        self._connection = connection
        self._connection.row_factory = self._row_factory

    def close(self):
        """Close the underlying connection."""
        self._connection.close()

    def _row_factory(self, cursor, row):
        """sqlite3 row factory: build a ``{column_name: value}`` dict.

        Only a column literally named ``payload`` is JSON-decoded; the
        ``args``/``rc`` columns of ``_GET_ALL_RECORDS`` stay raw JSON text.
        """
        record = {}
        for column, value in zip(cursor.description, row):
            name = column[0]
            if name == 'payload':
                value = json.loads(value)
            record[name] = value
        return record

    def iter_latest_records(self):
        """Yield the records of the most recent command, oldest first."""
        cursor = self._connection.execute(self._GET_LAST_ID_RECORDS)
        for row in cursor:
            yield row

    def iter_records(self, record_id):
        """Yield all records whose ``id`` is *record_id*, oldest first."""
        cursor = self._connection.execute(self._GET_RECORDS_BY_ID, [record_id])
        for row in cursor:
            yield row

    def iter_all_records(self):
        """Yield one summary row per completed command, newest first."""
        cursor = self._connection.execute(self._GET_ALL_RECORDS)
        for row in cursor:
            yield row
219
220
221
class RecordBuilder(object):
    """Turn history events into record dicts.

    All records built by one instance share a single ``command_id``, which
    is generated on first use. Events belonging to an HTTP request
    lifecycle also carry a per-thread ``request_id``, which is regenerated
    every time an ``API_CALL`` event is seen on that thread.
    """

    _REQUEST_LIFECYCLE_EVENTS = {
        'API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE',
    }
    _START_OF_REQUEST_LIFECYCLE_EVENT = 'API_CALL'

    def __init__(self):
        self._identifier = None
        # Thread-local storage holding each thread's current request_id.
        self._locals = threading.local()

    def _get_current_thread_request_id(self):
        return getattr(self._locals, 'request_id', None)

    def _start_http_lifecycle(self):
        self._locals.request_id = str(uuid.uuid4())

    def _get_request_id(self, event_type):
        """Return this thread's request_id for *event_type*, or None.

        An ``API_CALL`` event first starts a new lifecycle (fresh id).
        Events outside the request lifecycle never get a request_id.
        """
        if event_type == self._START_OF_REQUEST_LIFECYCLE_EVENT:
            self._start_http_lifecycle()
        if event_type not in self._REQUEST_LIFECYCLE_EVENTS:
            return None
        return self._get_current_thread_request_id()

    def _get_identifier(self):
        # Lazily create the command id shared by every record of this builder.
        if self._identifier is None:
            self._identifier = str(uuid.uuid4())
        return self._identifier

    def build_record(self, event_type, payload, source):
        """Build the record dict for one history event.

        :returns: dict with ``command_id``, ``event_type``, ``payload``,
            ``source`` and ``timestamp`` (integer milliseconds since the
            epoch), plus ``request_id`` for request-lifecycle events that
            have one on the current thread.
        """
        record = dict(
            command_id=self._get_identifier(),
            event_type=event_type,
            payload=payload,
            source=source,
            timestamp=int(time.time() * 1000),
        )
        request_id = self._get_request_id(event_type)
        if request_id:
            record['request_id'] = request_id
        return record
263
264
265
class DatabaseHistoryHandler(BaseHistoryHandler):
    """History handler that turns each event into a record and writes it.

    :param writer: object exposing ``write_record(record)``
        (e.g. ``DatabaseRecordWriter``).
    :param record_builder: object exposing
        ``build_record(event_type, payload, source)`` (e.g. ``RecordBuilder``).
    """

    def __init__(self, writer, record_builder):
        self._writer = writer
        self._record_builder = record_builder

    def emit(self, event_type, payload, source):
        """Build a record for the event and hand it to the writer."""
        self._writer.write_record(
            self._record_builder.build_record(event_type, payload, source))
273
274