Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/functional/history/test_db.py
1567 views
1
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
import copy
import json
import re
import threading

from awscli.compat import queue
from awscli.customizations.history.db import DatabaseConnection
from awscli.customizations.history.db import RecordBuilder
from awscli.customizations.history.db import DatabaseRecordWriter
from awscli.customizations.history.db import DatabaseRecordReader
from awscli.customizations.history.db import DatabaseHistoryHandler
from awscli.testutils import unittest
from awscli.compat import sqlite3
from tests import CaseInsensitiveDict
26
27
28
class ThreadedRecordWriter(object):
    """Forward records to a wrapped writer from a dedicated worker thread.

    Records passed to ``write_record`` are queued and then handed to the
    wrapped writer's ``write_record`` by a background thread, in FIFO
    order. ``close`` queues a ``False`` sentinel and waits for the worker
    to exit.
    """

    def __init__(self, writer):
        self._read_q = queue.Queue()
        worker_args = (writer,)
        self._thread = threading.Thread(
            target=self._threaded_record_writer, args=worker_args)

    def _threaded_record_writer(self, writer):
        # Drain the queue until the ``False`` sentinel from ``close`` shows
        # up. An identity check is used so falsy records (0, {}, None, ...)
        # are still written rather than treated as the stop signal.
        record = self._read_q.get()
        while record is not False:
            writer.write_record(record)
            record = self._read_q.get()

    def write_record(self, record):
        """Queue ``record`` for the worker thread without blocking."""
        self._read_q.put_nowait(record)

    def start(self):
        """Start the background worker thread."""
        self._thread.start()

    def close(self):
        """Ask the worker to stop, then block until its thread exits."""
        self._read_q.put_nowait(False)
        self._thread.join()
51
52
53
class BaseDatabaseTest(unittest.TestCase):
    """Base test case that gives every test its own in-memory database."""

    def setUp(self):
        # A fresh ':memory:' database per test keeps tests isolated.
        connection = DatabaseConnection(':memory:')
        # sqlite3.Row lets tests read columns by name, e.g. row['id'].
        connection.row_factory = sqlite3.Row
        self.connection = connection
57
58
59
class BaseThreadedDatabaseWriter(BaseDatabaseTest):
    """Base test case that shares one DatabaseRecordWriter across threads.

    Tests call ``start_n_threads`` to spin up ThreadedRecordWriter workers
    that all wrap ``self.writer``; ``tearDown`` closes every worker.
    """

    def setUp(self):
        super(BaseThreadedDatabaseWriter, self).setUp()
        self.threads = []
        self.writer = DatabaseRecordWriter(self.connection)

    def start_n_threads(self, n):
        """Start ``n`` workers wrapping ``self.writer`` and track them."""
        for _ in range(n):
            worker = ThreadedRecordWriter(self.writer)
            worker.start()
            self.threads.append(worker)

    def tearDown(self):
        # Stop (and join) every worker before the base class tears down.
        for worker in self.threads:
            worker.close()
        super(BaseThreadedDatabaseWriter, self).tearDown()
75
76
77
@unittest.skipIf(sqlite3 is None,
                 "sqlite3 not supported in this python")
class TestMultithreadedDatabaseWriter(BaseThreadedDatabaseWriter):
    """Concurrent writes through one shared DatabaseRecordWriter."""

    def _write_records(self, thread_number, records):
        """Queue every record in ``records`` on worker ``thread_number``."""
        worker = self.threads[thread_number]
        for item in records:
            worker.write_record(item)

    def test_bulk_writes_all_succeed(self):
        thread_count = 10
        event_types = ['API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE',
                       'PARSED_RESPONSE']
        self.start_n_threads(thread_count)
        for thread_number in range(thread_count):
            # Each worker writes one record per event type, tagging its
            # payload with the worker's index.
            batch = [
                {
                    'command_id': 'command',
                    'event_type': event_type,
                    'payload': thread_number,
                    'source': 'TEST',
                    'timestamp': 1234,
                }
                for event_type in event_types
            ]
            self._write_records(thread_number, batch)
        # Closing joins each worker, so every queued record is written
        # before the table is read back.
        for worker in self.threads:
            worker.close()

        first_request_id_by_thread = {}
        row_count = 0
        rows = self.connection.execute(
            'SELECT request_id, payload FROM records'
        )
        for row in rows:
            row_count += 1
            thread_id = row['payload']
            request_id = row['request_id']
            # All rows from one worker must share the first request_id
            # seen for that worker.
            expected = first_request_id_by_thread.setdefault(
                thread_id, request_id)
            self.assertEqual(request_id, expected)
        self.assertEqual(row_count, len(event_types) * thread_count)
133
134
135
@unittest.skipIf(sqlite3 is None,
                 "sqlite3 not supported in this python")
class TestDatabaseRecordWriter(BaseDatabaseTest):
    """Tests for persisting records with DatabaseRecordWriter."""

    def test_does_create_table(self):
        # setUp only opens a DatabaseConnection; the records table must
        # already exist afterwards.
        table_count = self.connection.execute(
            "SELECT COUNT(*) FROM sqlite_master WHERE "
            "type='table' AND name='records'"
        ).fetchone()[0]
        self.assertEqual(table_count, 1)

    def test_can_write_record(self):
        writer = DatabaseRecordWriter(connection=self.connection)
        expected_fields = {
            'command_id': 'command',
            'source': 'TEST',
            'event_type': 'foo',
            'payload': {"foo": "bar"},
            'timestamp': 1234
        }
        writer.write_record(expected_fields)

        stored_count = self.connection.execute(
            "SELECT COUNT(*) FROM records").fetchone()[0]
        self.assertEqual(stored_count, 1)

        stored = dict(
            self.connection.execute("SELECT * FROM records").fetchone())
        # The payload column holds JSON; normally the reader decodes it,
        # so decode it by hand before comparing.
        stored['payload'] = json.loads(stored['payload'])
        for column, expected_value in expected_fields.items():
            self.assertEqual(stored[column], expected_value)

        self.assertIn('id', stored)
        self.assertIn('request_id', stored)

    def test_can_write_many_records(self):
        writer = DatabaseRecordWriter(connection=self.connection)
        record = {
            'command_id': 'command',
            'source': 'TEST',
            'event_type': 'foo',
            'payload': '',
            'timestamp': 1234
        }
        expected_count = 40
        for _ in range(expected_count):
            writer.write_record(record)

        stored_count = self.connection.execute(
            "SELECT COUNT(*) FROM records").fetchone()[0]
        self.assertEqual(stored_count, expected_count)
189
190
191
@unittest.skipIf(sqlite3 is None,
                 "sqlite3 not supported in this python")
class TestDatabaseRecordReader(BaseDatabaseTest):
    """Tests for reading stored records back with DatabaseRecordReader."""

    def _write_sequence_of_records(self, writer, records):
        """Write each record in ``records`` through ``writer``, in order."""
        for record in records:
            writer.write_record(record)

    def test_yields_nothing_if_no_matching_record_id(self):
        reader = DatabaseRecordReader(self.connection)
        records = list(reader.iter_records('fake_id'))
        self.assertEqual(len(records), 0)

    def test_yields_nothing_no_recent_records(self):
        reader = DatabaseRecordReader(self.connection)
        records = list(reader.iter_latest_records())
        self.assertEqual(len(records), 0)

    def test_can_read_record(self):
        writer = DatabaseRecordWriter(self.connection)
        self._write_sequence_of_records(writer, [
            {
                'command_id': 'command a',
                'source': 'TEST',
                'event_type': 'foo',
                'payload': '',
                'timestamp': 3
            },
            {
                'command_id': 'command a',
                'source': 'TEST',
                'event_type': 'bar',
                'payload': '',
                'timestamp': 1
            },
            {
                'command_id': 'command a',
                'source': 'TEST',
                'event_type': 'baz',
                'payload': '',
                'timestamp': 4
            }
        ])
        self._write_sequence_of_records(writer, [
            {
                'command_id': 'command b',
                'source': 'TEST',
                'event_type': 'qux',
                'payload': '',
                'timestamp': 2
            },
            {
                'command_id': 'command b',
                'source': 'TEST',
                'event_type': 'zip',
                'payload': '',
                'timestamp': 6
            }
        ])
        reader = DatabaseRecordReader(self.connection)
        # Single quotes are the SQL string-literal delimiter. Double quotes
        # denote identifiers and only fall back to a string literal through
        # a SQLite compatibility quirk that builds with SQLITE_DQS=0 reject.
        cursor = self.connection.execute(
            "SELECT id FROM records WHERE event_type = 'foo' LIMIT 1")
        identifier = cursor.fetchone()['id']

        # Only the three 'command a' records are expected to match the id
        # of the foo record; the two 'command b' records must not appear.
        records = list(reader.iter_records(identifier))
        self.assertEqual(len(records), 3)
        for record in records:
            self.assertEqual(record['id'], identifier)

    def test_can_read_most_recent_records(self):
        writer = DatabaseRecordWriter(self.connection)
        self._write_sequence_of_records(writer, [
            {
                'command_id': 'command a',
                'source': 'TEST',
                'event_type': 'foo',
                'payload': '',
                'timestamp': 3
            },
            {
                'command_id': 'command a',
                'source': 'TEST',
                'event_type': 'bar',
                'payload': '',
                'timestamp': 1
            }
        ])
        self._write_sequence_of_records(writer, [
            {
                'command_id': 'command b',
                'source': 'TEST',
                'event_type': 'baz',
                'payload': '',
                'timestamp': 2
            }
        ])

        # foo and bar both belong to 'command a'. foo (timestamp 3) is the
        # most recent record overall, so iter_latest_records is expected to
        # yield that command's records, foo and bar, and not baz from
        # 'command b' (timestamp 2).
        reader = DatabaseRecordReader(self.connection)
        event_types = {record['event_type'] for record
                       in reader.iter_latest_records()}
        self.assertEqual({'foo', 'bar'}, event_types)
299
300
301
@unittest.skipIf(sqlite3 is None,
                 "sqlite3 not supported in this python")
class TestDatabaseHistoryHandler(unittest.TestCase):
    """End-to-end tests for DatabaseHistoryHandler.emit().

    Each test emits a single event through a handler backed by an
    in-memory database, then inspects the stored row. No row_factory is
    set on ``self.db``, so rows are plain tuples and the helpers below
    index columns by position: 0 is the command id, 1 the request id,
    2 the source, 3 the event type and the last column the JSON payload.
    """
    UUID_PATTERN = re.compile(
        '^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$',
        re.I
    )

    def setUp(self):
        self.db = DatabaseConnection(':memory:')
        self.writer = DatabaseRecordWriter(connection=self.db)
        self.record_builder = RecordBuilder()
        self.handler = DatabaseHistoryHandler(
            writer=self.writer, record_builder=self.record_builder)

    def _get_last_record(self):
        """Return the most recently inserted row of the records table."""
        # A bare SELECT has no guaranteed row order, so order explicitly by
        # rowid, which SQLite normally assigns in increasing insert order.
        record = self.db.execute(
            'SELECT * FROM records ORDER BY rowid DESC LIMIT 1').fetchone()
        return record

    def _assert_expected_event_type(self, expected, record):
        self.assertEqual(expected, record[3])

    def _assert_expected_payload(self, expected, record):
        # The payload column is stored as JSON text.
        loaded_payload = json.loads(record[-1])
        self.assertEqual(expected, loaded_payload)

    def _assert_expected_source(self, expected, record):
        self.assertEqual(expected, record[2])

    def _assert_has_request_id(self, record):
        identifier = record[1]
        self.assertTrue(self.UUID_PATTERN.match(identifier))

    def _assert_record_has_command_id(self, record):
        identifier = record[0]
        self.assertTrue(self.UUID_PATTERN.match(identifier))

    def test_does_emit_write_record(self):
        self.handler.emit('event_type', 'payload', 'source')
        record = self._get_last_record()
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('event_type', record)
        self._assert_expected_payload('payload', record)
        self._assert_expected_source('source', record)

    def test_can_emit_write_record_with_structure(self):
        payload = {'foo': 'bar'}
        self.handler.emit('event_type', payload, 'source')
        record = self._get_last_record()
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('event_type', record)
        self._assert_expected_payload(payload, record)
        self._assert_expected_source('source', record)

    def test_can_emit_cli_version_record(self):
        # CLI_VERSION records have a plain string payload
        payload = 'foobarbaz'
        self.handler.emit('CLI_VERSION', payload, 'CLI')
        record = self._get_last_record()
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('CLI_VERSION', record)
        self._assert_expected_payload(payload, record)
        self._assert_expected_source('CLI', record)

    def test_can_emit_cli_arguments_record(self):
        # CLI_ARGUMENTS records have a list of strings payload
        payload = ['foo', 'bar', 'baz']
        self.handler.emit('CLI_ARGUMENTS', payload, 'CLI')
        record = self._get_last_record()
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('CLI_ARGUMENTS', record)
        self._assert_expected_payload(payload, record)
        self._assert_expected_source('CLI', record)

    def test_can_emit_api_call_record(self):
        # API_CALL records have a dictionary based payload
        payload = {
            'service': 's3',
            'operation': 'ListBuckets',
            'params': {}
        }
        self.handler.emit('API_CALL', payload, 'BOTOCORE')
        record = self._get_last_record()
        self._assert_record_has_command_id(record)
        self._assert_has_request_id(record)
        self._assert_expected_event_type('API_CALL', record)
        self._assert_expected_payload(payload, record)
        self._assert_expected_source('BOTOCORE', record)

    def test_can_emit_api_call_record_with_binary_param(self):
        # API_CALL records have a dictionary based payload
        payload = {
            'service': 'lambda',
            'operation': 'CreateFunction',
            'params': {
                "FunctionName": "Name",
                "Handler": "mod.fn",
                "Role": "foobar",
                "Runtime": "python3",
                "Code": {
                    "ZipFile": b'zipfile binary content \xfe\xed'
                }
            }
        }
        self.handler.emit('API_CALL', payload, 'BOTOCORE')
        record = self._get_last_record()
        # deepcopy so that editing the nested expected value does not also
        # rewrite ``payload`` (a shallow copy shares the nested dicts).
        parsed_payload = copy.deepcopy(payload)
        # The non-utf-8 ZipFile bytes are expected to be stored as a
        # '<Byte sequence>' placeholder.
        parsed_payload['params']['Code']['ZipFile'] = '<Byte sequence>'
        self._assert_record_has_command_id(record)
        self._assert_has_request_id(record)
        self._assert_expected_event_type('API_CALL', record)
        self._assert_expected_payload(parsed_payload, record)
        self._assert_expected_source('BOTOCORE', record)

    def test_can_emit_http_request_record(self):
        # HTTP_REQUEST records have their entire body field as a binary
        # blob, however it will all be utf-8 valid since the binary fields
        # from the api call will have been b64 encoded.
        payload = {
            'url': ('https://lambda.us-west-2.amazonaws.com/2015-03-31/'
                    'functions'),
            'method': 'POST',
            'headers': CaseInsensitiveDict({
                'foo': 'bar'
            }),
            'body': b'body with no invalid utf-8 bytes in it',
            'streaming': False
        }
        self.handler.emit('HTTP_REQUEST', payload, 'BOTOCORE')
        record = self._get_last_record()
        # Only top-level keys are replaced, so a shallow copy is enough.
        parsed_payload = payload.copy()
        parsed_payload['headers'] = dict(parsed_payload['headers'])
        parsed_payload['body'] = 'body with no invalid utf-8 bytes in it'
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('HTTP_REQUEST', record)
        self._assert_expected_payload(parsed_payload, record)
        self._assert_expected_source('BOTOCORE', record)

    def test_can_emit_http_response_record(self):
        # HTTP_RESPONSE also contains a binary body, but it will not
        # contain any invalid utf-8 bytes.
        payload = {
            'status_code': 200,
            'headers': CaseInsensitiveDict({
                'foo': 'bar'
            }),
            'body': b'body with no invalid utf-8 bytes in it',
            'streaming': False
        }
        self.handler.emit('HTTP_RESPONSE', payload, 'BOTOCORE')
        record = self._get_last_record()
        # Only top-level keys are replaced, so a shallow copy is enough.
        parsed_payload = payload.copy()
        parsed_payload['headers'] = dict(parsed_payload['headers'])
        parsed_payload['body'] = 'body with no invalid utf-8 bytes in it'
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('HTTP_RESPONSE', record)
        self._assert_expected_payload(parsed_payload, record)
        self._assert_expected_source('BOTOCORE', record)

    def test_can_emit_parsed_response_record(self):
        payload = {
            "Count": 1,
            "Items": [
                {
                    "strkey": {
                        "S": "string"
                    }
                }
            ],
            "ScannedCount": 1,
            "ConsumedCapacity": None
        }
        self.handler.emit('PARSED_RESPONSE', payload, 'BOTOCORE')
        record = self._get_last_record()
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('PARSED_RESPONSE', record)
        self._assert_expected_payload(payload, record)
        self._assert_expected_source('BOTOCORE', record)

    def test_can_emit_parsed_response_record_with_binary(self):
        # PARSED_RESPONSE can also contain raw bytes
        payload = {
            "Count": 1,
            "Items": [
                {
                    "bitkey": {
                        "B": b"binary data \xfe\xed"
                    }
                }
            ],
            "ScannedCount": 1,
            "ConsumedCapacity": None
        }
        self.handler.emit('PARSED_RESPONSE', payload, 'BOTOCORE')
        record = self._get_last_record()
        # deepcopy so the original ``payload`` keeps its raw bytes; the
        # stored record is compared against the placeholder version.
        parsed_payload = copy.deepcopy(payload)
        parsed_payload['Items'][0]['bitkey']['B'] = "<Byte sequence>"
        self._assert_record_has_command_id(record)
        self._assert_expected_event_type('PARSED_RESPONSE', record)
        self._assert_expected_payload(parsed_payload, record)
        self._assert_expected_source('BOTOCORE', record)

    def test_does_not_mutate_dict(self):
        payload = {
            "bitkey": b"binary data \xfe\xed"
        }
        copy_payload = copy.deepcopy(payload)
        self.handler.emit('test', payload, 'BOTOCORE')
        self.assertEqual(payload, copy_payload)

    def test_does_not_mutate_list(self):
        payload = ['non binary data', b"binary data \xfe\xed"]
        copy_payload = list(payload)
        self.handler.emit('test', payload, 'BOTOCORE')
        self.assertEqual(payload, copy_payload)
515
516