Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/s3/test_results.py
1569 views
1
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License"). You
4
# may not use this file except in compliance with the License. A copy of
5
# the License is located at
6
#
7
# http://aws.amazon.com/apache2.0/
8
#
9
# or in the "license" file accompanying this file. This file is
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
# ANY KIND, either express or implied. See the License for the specific
12
# language governing permissions and limitations under the License.
13
from s3transfer.exceptions import CancelledError
14
from s3transfer.exceptions import FatalError
15
16
from awscli.testutils import unittest
17
from awscli.testutils import mock
18
from awscli.compat import queue
19
from awscli.compat import StringIO
20
from awscli.customizations.s3.results import ShutdownThreadRequest
21
from awscli.customizations.s3.results import QueuedResult
22
from awscli.customizations.s3.results import ProgressResult
23
from awscli.customizations.s3.results import SuccessResult
24
from awscli.customizations.s3.results import FailureResult
25
from awscli.customizations.s3.results import ErrorResult
26
from awscli.customizations.s3.results import CtrlCResult
27
from awscli.customizations.s3.results import DryRunResult
28
from awscli.customizations.s3.results import FinalTotalSubmissionsResult
29
from awscli.customizations.s3.results import UploadResultSubscriber
30
from awscli.customizations.s3.results import UploadStreamResultSubscriber
31
from awscli.customizations.s3.results import DownloadResultSubscriber
32
from awscli.customizations.s3.results import DownloadStreamResultSubscriber
33
from awscli.customizations.s3.results import CopyResultSubscriber
34
from awscli.customizations.s3.results import DeleteResultSubscriber
35
from awscli.customizations.s3.results import ResultRecorder
36
from awscli.customizations.s3.results import ResultPrinter
37
from awscli.customizations.s3.results import OnlyShowErrorsResultPrinter
38
from awscli.customizations.s3.results import NoProgressResultPrinter
39
from awscli.customizations.s3.results import ResultProcessor
40
from awscli.customizations.s3.results import CommandResultRecorder
41
from awscli.customizations.s3.utils import relative_path
42
from awscli.customizations.s3.utils import WarningResult
43
from tests.unit.customizations.s3 import FakeTransferFuture
44
from tests.unit.customizations.s3 import FakeTransferFutureMeta
45
from tests.unit.customizations.s3 import FakeTransferFutureCallArgs
46
47
48
class BaseResultSubscriberTest(unittest.TestCase):
49
def setUp(self):
50
self.result_queue = queue.Queue()
51
52
self.bucket = 'mybucket'
53
self.key = 'mykey'
54
self.filename = 'myfile'
55
self.size = 20 * (1024 * 1024) # 20 MB
56
57
self.ref_exception = Exception()
58
self.set_ref_transfer_futures()
59
60
self.src = None
61
self.dest = None
62
self.transfer_type = None
63
64
def set_ref_transfer_futures(self):
65
self.future = self.get_success_transfer_future('foo')
66
self.failure_future = self.get_failed_transfer_future(
67
self.ref_exception)
68
69
def get_success_transfer_future(self, result):
70
return self._get_transfer_future(result=result)
71
72
def get_failed_transfer_future(self, exception):
73
return self._get_transfer_future(exception=exception)
74
75
def _get_transfer_future(self, result=None, exception=None):
76
call_args = self._get_transfer_future_call_args()
77
meta = FakeTransferFutureMeta(size=self.size, call_args=call_args)
78
return FakeTransferFuture(
79
result=result, exception=exception, meta=meta)
80
81
def _get_transfer_future_call_args(self):
82
return FakeTransferFutureCallArgs(
83
fileobj=self.filename, key=self.key, bucket=self.bucket)
84
85
def get_queued_result(self):
86
return self.result_queue.get(block=False)
87
88
def assert_result_queue_is_empty(self):
89
self.assertTrue(self.result_queue.empty())
90
91
92
class TestUploadResultSubscriber(BaseResultSubscriberTest):
93
def setUp(self):
94
super(TestUploadResultSubscriber, self).setUp()
95
self.src = relative_path(self.filename)
96
self.dest = 's3://' + self.bucket + '/' + self.key
97
self.transfer_type = 'upload'
98
self.result_subscriber = UploadResultSubscriber(self.result_queue)
99
100
def test_on_queued(self):
101
self.result_subscriber.on_queued(self.future)
102
result = self.get_queued_result()
103
self.assert_result_queue_is_empty()
104
self.assertEqual(
105
result,
106
QueuedResult(
107
transfer_type=self.transfer_type,
108
src=self.src,
109
dest=self.dest,
110
total_transfer_size=self.size
111
)
112
)
113
114
def test_on_progress(self):
115
# Simulate a queue result (i.e. submitting and processing the result)
116
# before processing the progress result.
117
self.result_subscriber.on_queued(self.future)
118
self.assertEqual(
119
self.get_queued_result(),
120
QueuedResult(
121
transfer_type=self.transfer_type,
122
src=self.src,
123
dest=self.dest,
124
total_transfer_size=self.size
125
)
126
)
127
self.assert_result_queue_is_empty()
128
129
ref_bytes_transferred = 1024 * 1024 # 1MB
130
self.result_subscriber.on_progress(self.future, ref_bytes_transferred)
131
result = self.get_queued_result()
132
self.assert_result_queue_is_empty()
133
self.assertEqual(
134
result,
135
ProgressResult(
136
transfer_type=self.transfer_type,
137
src=self.src,
138
dest=self.dest,
139
bytes_transferred=ref_bytes_transferred,
140
total_transfer_size=self.size,
141
timestamp=mock.ANY
142
)
143
)
144
145
def test_on_done_success(self):
146
# Simulate a queue result (i.e. submitting and processing the result)
147
# before processing the progress result.
148
self.result_subscriber.on_queued(self.future)
149
self.assertEqual(
150
self.get_queued_result(),
151
QueuedResult(
152
transfer_type=self.transfer_type,
153
src=self.src,
154
dest=self.dest,
155
total_transfer_size=self.size
156
)
157
)
158
self.assert_result_queue_is_empty()
159
160
self.result_subscriber.on_done(self.future)
161
result = self.get_queued_result()
162
self.assert_result_queue_is_empty()
163
self.assertEqual(
164
result,
165
SuccessResult(
166
transfer_type=self.transfer_type,
167
src=self.src,
168
dest=self.dest,
169
)
170
)
171
172
def test_on_done_failure(self):
173
# Simulate a queue result (i.e. submitting and processing the result)
174
# before processing the progress result.
175
self.result_subscriber.on_queued(self.future)
176
self.assertEqual(
177
self.get_queued_result(),
178
QueuedResult(
179
transfer_type=self.transfer_type,
180
src=self.src,
181
dest=self.dest,
182
total_transfer_size=self.size
183
)
184
)
185
self.assert_result_queue_is_empty()
186
187
self.result_subscriber.on_done(self.failure_future)
188
result = self.get_queued_result()
189
self.assert_result_queue_is_empty()
190
self.assertEqual(
191
result,
192
FailureResult(
193
transfer_type=self.transfer_type,
194
src=self.src,
195
dest=self.dest,
196
exception=self.ref_exception
197
)
198
)
199
200
def test_on_done_unexpected_cancelled(self):
201
# Simulate a queue result (i.e. submitting and processing the result)
202
# before processing the progress result.
203
self.result_subscriber.on_queued(self.future)
204
self.assertEqual(
205
self.get_queued_result(),
206
QueuedResult(
207
transfer_type=self.transfer_type,
208
src=self.src,
209
dest=self.dest,
210
total_transfer_size=self.size
211
)
212
)
213
self.assert_result_queue_is_empty()
214
215
cancelled_exception = FatalError('some error')
216
cancelled_future = self.get_failed_transfer_future(cancelled_exception)
217
self.result_subscriber.on_done(cancelled_future)
218
result = self.get_queued_result()
219
self.assert_result_queue_is_empty()
220
self.assertEqual(result, ErrorResult(exception=cancelled_exception))
221
222
def test_on_done_cancelled_for_ctrl_c(self):
223
# Simulate a queue result (i.e. submitting and processing the result)
224
# before processing the progress result.
225
self.result_subscriber.on_queued(self.future)
226
self.assertEqual(
227
self.get_queued_result(),
228
QueuedResult(
229
transfer_type=self.transfer_type,
230
src=self.src,
231
dest=self.dest,
232
total_transfer_size=self.size
233
)
234
)
235
self.assert_result_queue_is_empty()
236
237
cancelled_exception = CancelledError('KeyboardInterrupt()')
238
cancelled_future = self.get_failed_transfer_future(cancelled_exception)
239
self.result_subscriber.on_done(cancelled_future)
240
result = self.get_queued_result()
241
self.assert_result_queue_is_empty()
242
self.assertEqual(result, CtrlCResult(exception=cancelled_exception))
243
244
245
class TestUploadStreamResultSubscriber(TestUploadResultSubscriber):
246
def setUp(self):
247
super(TestUploadStreamResultSubscriber, self).setUp()
248
self.src = '-'
249
self.result_subscriber = UploadStreamResultSubscriber(
250
self.result_queue)
251
252
253
class TestDownloadResultSubscriber(TestUploadResultSubscriber):
254
def setUp(self):
255
super(TestDownloadResultSubscriber, self).setUp()
256
self.src = 's3://' + self.bucket + '/' + self.key
257
self.dest = relative_path(self.filename)
258
self.transfer_type = 'download'
259
self.result_subscriber = DownloadResultSubscriber(self.result_queue)
260
261
262
class TestDownloadStreamResultSubscriber(TestDownloadResultSubscriber):
263
def setUp(self):
264
super(TestDownloadStreamResultSubscriber, self).setUp()
265
self.dest = '-'
266
self.result_subscriber = DownloadStreamResultSubscriber(
267
self.result_queue)
268
269
270
class TestCopyResultSubscriber(TestUploadResultSubscriber):
271
def setUp(self):
272
self.source_bucket = 'sourcebucket'
273
self.source_key = 'sourcekey'
274
self.copy_source = {
275
'Bucket': self.source_bucket,
276
'Key': self.source_key,
277
}
278
super(TestCopyResultSubscriber, self).setUp()
279
self.src = 's3://' + self.source_bucket + '/' + self.source_key
280
self.dest = 's3://' + self.bucket + '/' + self.key
281
self.transfer_type = 'copy'
282
self.result_subscriber = CopyResultSubscriber(self.result_queue)
283
284
def _get_transfer_future_call_args(self):
285
return FakeTransferFutureCallArgs(
286
copy_source=self.copy_source, key=self.key, bucket=self.bucket)
287
288
def test_transfer_type_override(self):
289
new_transfer_type = 'move'
290
self.result_subscriber = CopyResultSubscriber(
291
self.result_queue, new_transfer_type)
292
self.result_subscriber.on_queued(self.future)
293
result = self.get_queued_result()
294
self.assert_result_queue_is_empty()
295
expected = QueuedResult(
296
transfer_type=new_transfer_type,
297
src=self.src,
298
dest=self.dest,
299
total_transfer_size=self.size
300
)
301
self.assertEqual(result, expected)
302
303
304
class TestDeleteResultSubscriber(TestUploadResultSubscriber):
305
def setUp(self):
306
super(TestDeleteResultSubscriber, self).setUp()
307
self.src = 's3://' + self.bucket + '/' + self.key
308
self.dest = None
309
self.transfer_type = 'delete'
310
self.result_subscriber = DeleteResultSubscriber(self.result_queue)
311
312
def _get_transfer_future_call_args(self):
313
return FakeTransferFutureCallArgs(
314
bucket=self.bucket, key=self.key)
315
316
317
class ResultRecorderTest(unittest.TestCase):
318
def setUp(self):
319
self.transfer_type = 'upload'
320
self.src = 'file'
321
self.dest = 's3://mybucket/mykey'
322
self.total_transfer_size = 20 * (1024 ** 1024) # 20MB
323
self.warning_message = 'a dummy warning message'
324
self.exception_message = 'a dummy exception message'
325
self.exception = Exception(self.exception_message)
326
self.result_recorder = ResultRecorder()
327
self.result_recorder.start_time = 0
328
329
def test_queued_result(self):
330
self.result_recorder(
331
QueuedResult(
332
transfer_type=self.transfer_type, src=self.src,
333
dest=self.dest, total_transfer_size=self.total_transfer_size
334
)
335
)
336
self.assertEqual(
337
self.result_recorder.expected_bytes_transferred,
338
self.total_transfer_size
339
)
340
self.assertEqual(self.result_recorder.expected_files_transferred, 1)
341
342
def test_multiple_queued_results(self):
343
num_results = 5
344
for i in range(num_results):
345
self.result_recorder(
346
QueuedResult(
347
transfer_type=self.transfer_type,
348
src=self.src + str(i),
349
dest=self.dest + str(i),
350
total_transfer_size=self.total_transfer_size
351
)
352
)
353
354
self.assertEqual(
355
self.result_recorder.expected_bytes_transferred,
356
num_results * self.total_transfer_size
357
)
358
self.assertEqual(
359
self.result_recorder.expected_files_transferred, num_results)
360
361
def test_queued_result_with_no_full_transfer_size(self):
362
self.result_recorder(
363
QueuedResult(
364
transfer_type=self.transfer_type, src=self.src,
365
dest=self.dest, total_transfer_size=None
366
)
367
)
368
# Since we do not know how many bytes are expected to be transferred
369
# do not incremenent the count as we have no idea how much it may be.
370
self.assertEqual(
371
self.result_recorder.expected_bytes_transferred, 0)
372
self.assertEqual(
373
self.result_recorder.expected_files_transferred, 1)
374
375
def test_progress_result(self):
376
self.result_recorder(
377
QueuedResult(
378
transfer_type=self.transfer_type, src=self.src,
379
dest=self.dest, total_transfer_size=self.total_transfer_size
380
)
381
)
382
383
bytes_transferred = 1024 * 1024 # 1MB
384
self.result_recorder(
385
ProgressResult(
386
transfer_type=self.transfer_type, src=self.src,
387
dest=self.dest, bytes_transferred=bytes_transferred,
388
total_transfer_size=self.total_transfer_size,
389
timestamp=0
390
)
391
)
392
393
self.assertEqual(
394
self.result_recorder.bytes_transferred, bytes_transferred)
395
396
def test_multiple_progress_results(self):
397
self.result_recorder(
398
QueuedResult(
399
transfer_type=self.transfer_type, src=self.src,
400
dest=self.dest, total_transfer_size=self.total_transfer_size
401
)
402
)
403
404
bytes_transferred = 1024 * 1024 # 1MB
405
num_results = 5
406
for i in range(num_results):
407
self.result_recorder(
408
ProgressResult(
409
transfer_type=self.transfer_type, src=self.src,
410
dest=self.dest, bytes_transferred=bytes_transferred,
411
total_transfer_size=self.total_transfer_size,
412
timestamp=i
413
)
414
)
415
416
self.assertEqual(
417
self.result_recorder.bytes_transferred,
418
num_results * bytes_transferred
419
)
420
421
def test_progress_result_with_no_known_transfer_size(self):
422
self.result_recorder(
423
QueuedResult(
424
transfer_type=self.transfer_type, src=self.src,
425
dest=self.dest, total_transfer_size=None
426
)
427
)
428
429
bytes_transferred = 1024 * 1024
430
self.result_recorder(
431
ProgressResult(
432
transfer_type=self.transfer_type, src=self.src,
433
dest=self.dest, bytes_transferred=bytes_transferred,
434
total_transfer_size=None, timestamp=0
435
)
436
)
437
# Because the transfer size is still not known, update the
438
# expected bytes transferred with what was actually transferred.
439
self.assertEqual(
440
self.result_recorder.bytes_transferred, bytes_transferred)
441
self.assertEqual(
442
self.result_recorder.expected_bytes_transferred, bytes_transferred)
443
444
def test_progress_result_with_transfer_size_provided_during_progress(self):
445
self.result_recorder(
446
QueuedResult(
447
transfer_type=self.transfer_type, src=self.src,
448
dest=self.dest, total_transfer_size=None
449
)
450
)
451
452
bytes_transferred = 1024 * 1024
453
self.result_recorder(
454
ProgressResult(
455
transfer_type=self.transfer_type, src=self.src,
456
dest=self.dest, bytes_transferred=bytes_transferred,
457
total_transfer_size=self.total_transfer_size,
458
timestamp=0
459
)
460
)
461
462
self.assertEqual(
463
self.result_recorder.bytes_transferred, bytes_transferred)
464
# With the total size provided in the progress result, it should
465
# accurately be reflected in the expected bytes transferred.
466
self.assertEqual(
467
self.result_recorder.expected_bytes_transferred,
468
self.total_transfer_size)
469
470
def test_captures_start_time_on_queued(self):
471
result_recorder = ResultRecorder()
472
self.assertIsNone(result_recorder.start_time)
473
result_recorder(
474
QueuedResult(
475
transfer_type=self.transfer_type, src=self.src,
476
dest=self.dest, total_transfer_size=self.total_transfer_size
477
)
478
)
479
self.assertIsInstance(result_recorder.start_time, float)
480
481
def test_progress_calculates_transfer_speed(self):
482
start_time = 0
483
self.result_recorder.start_time = start_time
484
self.total_transfer_size = 10
485
self.result_recorder(
486
QueuedResult(
487
transfer_type=self.transfer_type, src=self.src,
488
dest=self.dest, total_transfer_size=self.total_transfer_size
489
)
490
)
491
# At this point nothing should have been uploaded so transfer speed
492
# is zero
493
self.assertEqual(self.result_recorder.bytes_transfer_speed, 0)
494
495
self.result_recorder(
496
ProgressResult(
497
transfer_type=self.transfer_type, src=self.src,
498
dest=self.dest, bytes_transferred=1,
499
total_transfer_size=self.total_transfer_size,
500
timestamp=(start_time + 1)
501
)
502
)
503
504
# One bytes has been transferred in one second
505
self.assertEqual(self.result_recorder.bytes_transfer_speed, 1)
506
507
self.result_recorder(
508
ProgressResult(
509
transfer_type=self.transfer_type, src=self.src,
510
dest=self.dest, bytes_transferred=4,
511
total_transfer_size=self.total_transfer_size,
512
timestamp=(start_time + 2)
513
)
514
)
515
516
# Five bytes have been transferred in two seconds
517
self.assertEqual(self.result_recorder.bytes_transfer_speed, 2.5)
518
519
self.result_recorder(
520
ProgressResult(
521
transfer_type=self.transfer_type, src=self.src,
522
dest=self.dest, bytes_transferred=1,
523
total_transfer_size=self.total_transfer_size,
524
timestamp=(start_time + 3)
525
)
526
)
527
528
# Six bytes have been transferred in three seconds
529
self.assertEqual(self.result_recorder.bytes_transfer_speed, 2.0)
530
531
def test_success_result(self):
532
self.result_recorder(
533
QueuedResult(
534
transfer_type=self.transfer_type, src=self.src,
535
dest=self.dest, total_transfer_size=self.total_transfer_size
536
)
537
)
538
539
self.result_recorder(
540
SuccessResult(
541
transfer_type=self.transfer_type, src=self.src,
542
dest=self.dest
543
)
544
)
545
self.assertEqual(self.result_recorder.files_transferred, 1)
546
self.assertEqual(self.result_recorder.files_failed, 0)
547
548
def test_multiple_success_results(self):
549
num_results = 5
550
for i in range(num_results):
551
self.result_recorder(
552
QueuedResult(
553
transfer_type=self.transfer_type,
554
src=self.src + str(i),
555
dest=self.dest + str(i),
556
total_transfer_size=self.total_transfer_size
557
)
558
)
559
560
for i in range(num_results):
561
self.result_recorder(
562
SuccessResult(
563
transfer_type=self.transfer_type,
564
src=self.src + str(i),
565
dest=self.dest + str(i),
566
)
567
)
568
569
self.assertEqual(self.result_recorder.files_transferred, num_results)
570
self.assertEqual(self.result_recorder.files_failed, 0)
571
572
def test_failure_result(self):
573
self.result_recorder(
574
QueuedResult(
575
transfer_type=self.transfer_type, src=self.src,
576
dest=self.dest, total_transfer_size=self.total_transfer_size
577
)
578
)
579
580
self.result_recorder(
581
FailureResult(
582
transfer_type=self.transfer_type, src=self.src, dest=self.dest,
583
exception=self.exception
584
)
585
)
586
587
self.assertEqual(self.result_recorder.files_transferred, 1)
588
self.assertEqual(self.result_recorder.files_failed, 1)
589
self.assertEqual(
590
self.result_recorder.bytes_failed_to_transfer,
591
self.total_transfer_size)
592
self.assertEqual(self.result_recorder.bytes_transferred, 0)
593
594
def test_multiple_failure_results(self):
595
num_results = 5
596
for i in range(num_results):
597
self.result_recorder(
598
QueuedResult(
599
transfer_type=self.transfer_type,
600
src=self.src + str(i),
601
dest=self.dest + str(i),
602
total_transfer_size=self.total_transfer_size
603
)
604
)
605
606
for i in range(num_results):
607
self.result_recorder(
608
FailureResult(
609
transfer_type=self.transfer_type,
610
src=self.src + str(i),
611
dest=self.dest + str(i),
612
exception=self.exception
613
)
614
)
615
616
self.assertEqual(self.result_recorder.files_transferred, num_results)
617
self.assertEqual(self.result_recorder.files_failed, num_results)
618
self.assertEqual(
619
self.result_recorder.bytes_failed_to_transfer,
620
self.total_transfer_size * num_results)
621
self.assertEqual(self.result_recorder.bytes_transferred, 0)
622
623
def test_failure_result_mid_progress(self):
624
self.result_recorder(
625
QueuedResult(
626
transfer_type=self.transfer_type, src=self.src,
627
dest=self.dest, total_transfer_size=self.total_transfer_size
628
)
629
)
630
631
bytes_transferred = 1024 * 1024 # 1MB
632
self.result_recorder(
633
ProgressResult(
634
transfer_type=self.transfer_type, src=self.src,
635
dest=self.dest, bytes_transferred=bytes_transferred,
636
total_transfer_size=self.total_transfer_size,
637
timestamp=0
638
)
639
)
640
641
self.result_recorder(
642
FailureResult(
643
transfer_type=self.transfer_type, src=self.src, dest=self.dest,
644
exception=self.exception
645
)
646
)
647
648
self.assertEqual(self.result_recorder.files_transferred, 1)
649
self.assertEqual(self.result_recorder.files_failed, 1)
650
self.assertEqual(
651
self.result_recorder.bytes_failed_to_transfer,
652
self.total_transfer_size - bytes_transferred)
653
self.assertEqual(
654
self.result_recorder.bytes_transferred, bytes_transferred)
655
656
def test_failure_result_still_did_not_know_transfer_size(self):
657
self.result_recorder(
658
QueuedResult(
659
transfer_type=self.transfer_type, src=self.src,
660
dest=self.dest, total_transfer_size=None
661
)
662
)
663
self.result_recorder(
664
FailureResult(
665
transfer_type=self.transfer_type, src=self.src, dest=self.dest,
666
exception=self.exception
667
)
668
)
669
self.assertEqual(self.result_recorder.files_transferred, 1)
670
self.assertEqual(self.result_recorder.files_failed, 1)
671
# Because we never knew how many bytes to expect, do not make
672
# any adjustments to bytes failed to transfer because it is impossible
673
# to know that.
674
self.assertEqual(
675
self.result_recorder.bytes_failed_to_transfer, 0)
676
677
def test_failure_result_and_learned_of_transfer_size_in_progress(self):
678
self.result_recorder(
679
QueuedResult(
680
transfer_type=self.transfer_type, src=self.src,
681
dest=self.dest, total_transfer_size=None
682
)
683
)
684
685
bytes_transferred = 1024 * 1024
686
self.result_recorder(
687
ProgressResult(
688
transfer_type=self.transfer_type, src=self.src,
689
dest=self.dest, bytes_transferred=bytes_transferred,
690
total_transfer_size=self.total_transfer_size,
691
timestamp=0
692
)
693
)
694
self.result_recorder(
695
FailureResult(
696
transfer_type=self.transfer_type, src=self.src, dest=self.dest,
697
exception=self.exception
698
)
699
)
700
self.assertEqual(self.result_recorder.files_transferred, 1)
701
self.assertEqual(self.result_recorder.files_failed, 1)
702
# Since we knew how many bytes to expect at some point, it should
703
# be accurately reflected in the amount failed to send when the
704
# failure result is processed.
705
self.assertEqual(
706
self.result_recorder.bytes_failed_to_transfer,
707
self.total_transfer_size - bytes_transferred)
708
709
def test_can_handle_results_with_no_dest(self):
710
# This is just a quick smoke test to make sure that a result with
711
# no destination like deletes can be handled for the lifecycle of
712
# a transfer (i.e. being queued and finishing)
713
self.result_recorder(
714
QueuedResult(
715
transfer_type=self.transfer_type, src=self.src,
716
dest=None, total_transfer_size=None))
717
self.result_recorder(
718
SuccessResult(
719
transfer_type=self.transfer_type, src=self.src, dest=None))
720
self.assertEqual(self.result_recorder.expected_files_transferred, 1)
721
self.assertEqual(self.result_recorder.files_transferred, 1)
722
723
def test_warning_result(self):
724
self.result_recorder(
725
WarningResult(message=self.warning_message))
726
self.assertEqual(self.result_recorder.files_warned, 1)
727
728
def test_multiple_warning_results(self):
729
num_results = 5
730
for _ in range(num_results):
731
self.result_recorder(
732
WarningResult(message=self.warning_message))
733
self.assertEqual(self.result_recorder.files_warned, num_results)
734
735
def test_error_result(self):
736
self.result_recorder(ErrorResult(exception=self.exception))
737
self.assertEqual(self.result_recorder.errors, 1)
738
739
def test_ctrl_c_result(self):
740
self.result_recorder(CtrlCResult(Exception()))
741
self.assertEqual(self.result_recorder.errors, 1)
742
743
def test_expected_totals_are_final(self):
744
self.result_recorder(
745
QueuedResult(
746
transfer_type=self.transfer_type, src=self.src,
747
dest=self.dest, total_transfer_size=self.total_transfer_size
748
)
749
)
750
self.result_recorder(FinalTotalSubmissionsResult(1))
751
self.assertEqual(
752
self.result_recorder.final_expected_files_transferred, 1)
753
self.assertTrue(self.result_recorder.expected_totals_are_final())
754
755
def test_expected_totals_are_final_reaches_final_after_notification(self):
756
self.result_recorder(FinalTotalSubmissionsResult(1))
757
self.assertFalse(self.result_recorder.expected_totals_are_final())
758
self.result_recorder(
759
QueuedResult(
760
transfer_type=self.transfer_type, src=self.src,
761
dest=self.dest, total_transfer_size=self.total_transfer_size
762
)
763
)
764
self.assertTrue(self.result_recorder.expected_totals_are_final())
765
766
def test_expected_totals_are_final_is_false_with_no_notification(self):
767
self.assertIsNone(
768
self.result_recorder.final_expected_files_transferred)
769
self.assertFalse(self.result_recorder.expected_totals_are_final())
770
self.result_recorder(
771
QueuedResult(
772
transfer_type=self.transfer_type, src=self.src,
773
dest=self.dest, total_transfer_size=self.total_transfer_size
774
)
775
)
776
# It should still be None because it has not yet been notified
777
# of finals.
778
self.assertIsNone(
779
self.result_recorder.final_expected_files_transferred)
780
# This should remain False as well.
781
self.assertFalse(self.result_recorder.expected_totals_are_final())
782
783
def test_unknown_result_object(self):
784
self.result_recorder(object())
785
# Nothing should have been affected
786
self.assertEqual(self.result_recorder.bytes_transferred, 0)
787
self.assertEqual(self.result_recorder.expected_bytes_transferred, 0)
788
self.assertEqual(self.result_recorder.expected_files_transferred, 0)
789
self.assertEqual(self.result_recorder.files_transferred, 0)
790
791
def test_result_with_unicode(self):
792
unicode_source = u'\u2713'
793
self.result_recorder(
794
QueuedResult(
795
transfer_type=self.transfer_type, src=unicode_source,
796
dest=self.dest, total_transfer_size=self.total_transfer_size
797
)
798
)
799
self.assertEqual(
800
self.result_recorder.expected_bytes_transferred,
801
self.total_transfer_size
802
)
803
self.assertEqual(self.result_recorder.expected_files_transferred, 1)
804
805
def test_result_with_encoded_unicode(self):
806
unicode_source = u'\u2713'.encode('utf-8')
807
self.result_recorder(
808
QueuedResult(
809
transfer_type=self.transfer_type, src=unicode_source,
810
dest=self.dest, total_transfer_size=self.total_transfer_size
811
)
812
)
813
self.assertEqual(
814
self.result_recorder.expected_bytes_transferred,
815
self.total_transfer_size
816
)
817
self.assertEqual(self.result_recorder.expected_files_transferred, 1)
818
819
820
class BaseResultPrinterTest(unittest.TestCase):
821
def setUp(self):
822
self.result_recorder = ResultRecorder()
823
self.out_file = StringIO()
824
self.error_file = StringIO()
825
self.result_printer = ResultPrinter(
826
result_recorder=self.result_recorder,
827
out_file=self.out_file,
828
error_file=self.error_file
829
)
830
831
def get_progress_result(self):
832
# NOTE: The actual values are not important for the purpose
833
# of printing as the ResultPrinter only looks at the type and
834
# the ResultRecorder to determine what to print out on progress.
835
return ProgressResult(
836
transfer_type=None, src=None, dest=None, bytes_transferred=None,
837
total_transfer_size=None, timestamp=0
838
)
839
840
841
class TestResultPrinter(BaseResultPrinterTest):
842
def test_unknown_result_object(self):
843
self.result_printer(object())
844
# Nothing should have been printed because of it.
845
self.assertEqual(self.out_file.getvalue(), '')
846
self.assertEqual(self.error_file.getvalue(), '')
847
848
def test_progress(self):
849
mb = 1024 * 1024
850
851
self.result_recorder.expected_bytes_transferred = 20 * mb
852
self.result_recorder.expected_files_transferred = 4
853
self.result_recorder.final_expected_files_transferred = 4
854
self.result_recorder.bytes_transferred = mb
855
self.result_recorder.files_transferred = 1
856
857
progress_result = self.get_progress_result()
858
self.result_printer(progress_result)
859
ref_progress_statement = (
860
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) '
861
'remaining\r')
862
self.assertEqual(self.out_file.getvalue(), ref_progress_statement)
863
864
def test_progress_with_no_expected_transfer_bytes(self):
865
self.result_recorder.files_transferred = 1
866
self.result_recorder.expected_files_transferred = 4
867
self.result_recorder.final_expected_files_transferred = 4
868
self.result_recorder.bytes_transferred = 0
869
self.result_recorder.expected_bytes_transferred = 0
870
871
progress_result = self.get_progress_result()
872
self.result_printer(progress_result)
873
ref_progress_statement = (
874
'Completed 1 file(s) with 3 file(s) remaining\r')
875
self.assertEqual(self.out_file.getvalue(), ref_progress_statement)
876
877
def test_progress_then_more_progress(self):
878
mb = 1024 * 1024
879
880
progress_result = self.get_progress_result()
881
882
# Add the first progress update and print it out
883
self.result_recorder.expected_bytes_transferred = 20 * mb
884
self.result_recorder.expected_files_transferred = 4
885
self.result_recorder.final_expected_files_transferred = 4
886
self.result_recorder.bytes_transferred = mb
887
self.result_recorder.files_transferred = 1
888
889
self.result_printer(progress_result)
890
ref_progress_statement = (
891
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) remaining\r'
892
)
893
self.assertEqual(self.out_file.getvalue(), ref_progress_statement)
894
895
# Add the second progress update
896
self.result_recorder.bytes_transferred += mb
897
self.result_printer(progress_result)
898
899
# The result should be the combination of the two
900
ref_progress_statement = (
901
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) remaining\r'
902
'Completed 2.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) remaining\r'
903
)
904
self.assertEqual(self.out_file.getvalue(), ref_progress_statement)
905
906
def test_progress_still_calculating_totals(self):
907
mb = 1024 * 1024
908
909
self.result_recorder.expected_bytes_transferred = 20 * mb
910
self.result_recorder.expected_files_transferred = 4
911
self.result_recorder.bytes_transferred = mb
912
self.result_recorder.files_transferred = 1
913
914
progress_result = self.get_progress_result()
915
self.result_printer(progress_result)
916
ref_progress_statement = (
917
'Completed 1.0 MiB/~20.0 MiB (0 Bytes/s) with ~3 file(s) '
918
'remaining (calculating...)\r')
919
self.assertEqual(self.out_file.getvalue(), ref_progress_statement)
920
921
def test_progress_still_calculating_totals_no_bytes(self):
922
self.result_recorder.expected_bytes_transferred = 0
923
self.result_recorder.expected_files_transferred = 4
924
self.result_recorder.bytes_transferred = 0
925
self.result_recorder.files_transferred = 1
926
927
progress_result = self.get_progress_result()
928
self.result_printer(progress_result)
929
ref_progress_statement = (
930
'Completed 1 file(s) with ~3 file(s) remaining (calculating...)\r')
931
self.assertEqual(self.out_file.getvalue(), ref_progress_statement)
932
933
def test_progress_with_transfer_speed_reporting(self):
934
mb = 1024 * 1024
935
936
self.result_recorder.expected_bytes_transferred = 20 * mb
937
self.result_recorder.expected_files_transferred = 4
938
self.result_recorder.final_expected_files_transferred = 4
939
self.result_recorder.bytes_transferred = mb
940
self.result_recorder.files_transferred = 1
941
self.result_recorder.bytes_transfer_speed = 1024 * 7
942
943
progress_result = self.get_progress_result()
944
self.result_printer(progress_result)
945
ref_progress_statement = (
946
'Completed 1.0 MiB/20.0 MiB (7.0 KiB/s) with 3 file(s) '
947
'remaining\r')
948
self.assertEqual(self.out_file.getvalue(), ref_progress_statement)
949
950
def test_success(self):
951
transfer_type = 'upload'
952
src = 'file'
953
dest = 's3://mybucket/mykey'
954
955
# Pretend that this is the final result in the result queue that
956
# is processed.
957
self.result_recorder.final_expected_files_transferred = 1
958
self.result_recorder.expected_files_transferred = 1
959
self.result_recorder.files_transferred = 1
960
961
success_result = SuccessResult(
962
transfer_type=transfer_type, src=src, dest=dest)
963
964
self.result_printer(success_result)
965
966
ref_success_statement = (
967
'upload: file to s3://mybucket/mykey\n'
968
)
969
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
970
971
def test_success_with_progress(self):
972
mb = 1024 * 1024
973
974
progress_result = self.get_progress_result()
975
976
# Add the first progress update and print it out
977
self.result_recorder.expected_bytes_transferred = 20 * mb
978
self.result_recorder.expected_files_transferred = 4
979
self.result_recorder.final_expected_files_transferred = 4
980
self.result_recorder.bytes_transferred = mb
981
self.result_recorder.files_transferred = 1
982
self.result_printer(progress_result)
983
984
# Add a success result and print it out.
985
transfer_type = 'upload'
986
src = 'file'
987
dest = 's3://mybucket/mykey'
988
success_result = SuccessResult(
989
transfer_type=transfer_type, src=src, dest=dest)
990
991
self.result_recorder.files_transferred += 1
992
self.result_printer(success_result)
993
994
# The statement should consist of:
995
# * The first progress statement
996
# * The success statement
997
# * And the progress again since the transfer is still ongoing
998
ref_statement = (
999
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) remaining\r'
1000
'upload: file to s3://mybucket/mykey \n'
1001
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 2 file(s) remaining\r'
1002
)
1003
self.assertEqual(self.out_file.getvalue(), ref_statement)
1004
1005
def test_success_with_files_remaining(self):
1006
transfer_type = 'upload'
1007
src = 'file'
1008
dest = 's3://mybucket/mykey'
1009
1010
mb = 1024 * 1024
1011
self.result_recorder.expected_files_transferred = 4
1012
self.result_recorder.files_transferred = 1
1013
self.result_recorder.expected_bytes_transferred = 4 * mb
1014
self.result_recorder.bytes_transferred = mb
1015
1016
success_result = SuccessResult(
1017
transfer_type=transfer_type, src=src, dest=dest)
1018
1019
self.result_printer(success_result)
1020
1021
ref_success_statement = (
1022
'upload: file to s3://mybucket/mykey\n'
1023
'Completed 1.0 MiB/~4.0 MiB (0 Bytes/s) with ~3 file(s) '
1024
'remaining (calculating...)\r'
1025
)
1026
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
1027
1028
def test_success_but_no_expected_files_transferred_provided(self):
1029
transfer_type = 'upload'
1030
src = 'file'
1031
dest = 's3://mybucket/mykey'
1032
1033
mb = 1024 * 1024
1034
self.result_recorder.expected_files_transferred = 1
1035
self.result_recorder.files_transferred = 1
1036
self.result_recorder.expected_bytes_transferred = mb
1037
self.result_recorder.bytes_transferred = mb
1038
1039
success_result = SuccessResult(
1040
transfer_type=transfer_type, src=src, dest=dest)
1041
1042
self.result_printer(success_result)
1043
1044
ref_success_statement = (
1045
'upload: file to s3://mybucket/mykey\n'
1046
'Completed 1.0 MiB/~1.0 MiB (0 Bytes/s) with ~0 file(s) '
1047
'remaining (calculating...)\r'
1048
)
1049
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
1050
1051
def test_success_for_delete(self):
1052
transfer_type = 'delete'
1053
src = 's3://mybucket/mykey'
1054
1055
# Pretend that this is the final result in the result queue that
1056
# is processed.
1057
self.result_recorder.final_expected_files_transferred = 1
1058
self.result_recorder.expected_files_transferred = 1
1059
self.result_recorder.files_transferred = 1
1060
1061
success_result = SuccessResult(
1062
transfer_type=transfer_type, src=src, dest=None)
1063
1064
self.result_printer(success_result)
1065
1066
ref_success_statement = (
1067
'delete: s3://mybucket/mykey\n'
1068
)
1069
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
1070
1071
def test_delete_success_with_files_remaining(self):
1072
transfer_type = 'delete'
1073
src = 's3://mybucket/mykey'
1074
1075
self.result_recorder.expected_files_transferred = 4
1076
self.result_recorder.files_transferred = 1
1077
1078
success_result = SuccessResult(
1079
transfer_type=transfer_type, src=src, dest=None)
1080
1081
self.result_printer(success_result)
1082
1083
ref_success_statement = (
1084
'delete: s3://mybucket/mykey\n'
1085
'Completed 1 file(s) with ~3 file(s) remaining (calculating...)\r'
1086
)
1087
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
1088
1089
def test_delete_success_but_no_expected_files_transferred_provided(self):
1090
transfer_type = 'delete'
1091
src = 's3://mybucket/mykey'
1092
1093
self.result_recorder.expected_files_transferred = 1
1094
self.result_recorder.files_transferred = 1
1095
1096
success_result = SuccessResult(
1097
transfer_type=transfer_type, src=src, dest=None)
1098
1099
self.result_printer(success_result)
1100
1101
ref_success_statement = (
1102
'delete: s3://mybucket/mykey\n'
1103
'Completed 1 file(s) with ~0 file(s) remaining (calculating...)\r'
1104
)
1105
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
1106
1107
def test_failure(self):
1108
transfer_type = 'upload'
1109
src = 'file'
1110
dest = 's3://mybucket/mykey'
1111
1112
# Pretend that this is the final result in the result queue that
1113
# is processed.
1114
self.result_recorder.final_expected_files_transferred = 1
1115
self.result_recorder.expected_files_transferred = 1
1116
self.result_recorder.files_transferred = 1
1117
1118
failure_result = FailureResult(
1119
transfer_type=transfer_type, src=src, dest=dest,
1120
exception=Exception('my exception'))
1121
1122
self.result_printer(failure_result)
1123
1124
ref_failure_statement = (
1125
'upload failed: file to s3://mybucket/mykey my exception\n'
1126
)
1127
self.assertEqual(self.error_file.getvalue(), ref_failure_statement)
1128
self.assertEqual(self.out_file.getvalue(), '')
1129
1130
def test_failure_with_files_remaining(self):
1131
shared_file = self.out_file
1132
self.result_printer = ResultPrinter(
1133
result_recorder=self.result_recorder,
1134
out_file=shared_file,
1135
error_file=shared_file
1136
)
1137
1138
transfer_type = 'upload'
1139
src = 'file'
1140
dest = 's3://mybucket/mykey'
1141
1142
mb = 1024 * 1024
1143
self.result_recorder.expected_files_transferred = 4
1144
self.result_recorder.files_transferred = 1
1145
self.result_recorder.expected_bytes_transferred = 4 * mb
1146
self.result_recorder.bytes_transferred = mb
1147
1148
failure_result = FailureResult(
1149
transfer_type=transfer_type, src=src, dest=dest,
1150
exception=Exception('my exception'))
1151
1152
self.result_printer(failure_result)
1153
1154
ref_statement = (
1155
'upload failed: file to s3://mybucket/mykey my exception\n'
1156
'Completed 1.0 MiB/~4.0 MiB (0 Bytes/s) with ~3 file(s) '
1157
'remaining (calculating...)\r'
1158
)
1159
self.assertEqual(self.out_file.getvalue(), ref_statement)
1160
1161
def test_failure_but_no_expected_files_transferred_provided(self):
1162
shared_file = self.out_file
1163
self.result_printer = ResultPrinter(
1164
result_recorder=self.result_recorder,
1165
out_file=shared_file,
1166
error_file=shared_file
1167
)
1168
1169
transfer_type = 'upload'
1170
src = 'file'
1171
dest = 's3://mybucket/mykey'
1172
1173
mb = 1024 * 1024
1174
self.result_recorder.expected_files_transferred = 1
1175
self.result_recorder.files_transferred = 1
1176
self.result_recorder.expected_bytes_transferred = mb
1177
self.result_recorder.bytes_transferred = mb
1178
1179
failure_result = FailureResult(
1180
transfer_type=transfer_type, src=src, dest=dest,
1181
exception=Exception('my exception'))
1182
1183
self.result_printer(failure_result)
1184
1185
ref_statement = (
1186
'upload failed: file to s3://mybucket/mykey my exception\n'
1187
'Completed 1.0 MiB/~1.0 MiB (0 Bytes/s) with ~0 file(s) '
1188
'remaining (calculating...)\r'
1189
)
1190
self.assertEqual(self.out_file.getvalue(), ref_statement)
1191
1192
def test_failure_with_progress(self):
1193
# Make errors and regular outprint go to the same file to track order.
1194
shared_file = self.out_file
1195
self.result_printer = ResultPrinter(
1196
result_recorder=self.result_recorder,
1197
out_file=shared_file,
1198
error_file=shared_file
1199
)
1200
1201
mb = 1024 * 1024
1202
1203
progress_result = self.get_progress_result()
1204
1205
# Add the first progress update and print it out
1206
self.result_recorder.expected_bytes_transferred = 20 * mb
1207
self.result_recorder.expected_files_transferred = 4
1208
self.result_recorder.final_expected_files_transferred = 4
1209
self.result_recorder.bytes_transferred = mb
1210
self.result_recorder.files_transferred = 1
1211
self.result_printer(progress_result)
1212
1213
# Add a success result and print it out.
1214
transfer_type = 'upload'
1215
src = 'file'
1216
dest = 's3://mybucket/mykey'
1217
failure_result = FailureResult(
1218
transfer_type=transfer_type, src=src, dest=dest,
1219
exception=Exception('my exception'))
1220
1221
self.result_recorder.bytes_failed_to_transfer = 3 * mb
1222
self.result_recorder.files_transferred += 1
1223
self.result_printer(failure_result)
1224
1225
# The statement should consist of:
1226
# * The first progress statement
1227
# * The failure statement
1228
# * And the progress again since the transfer is still ongoing
1229
ref_statement = (
1230
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) remaining\r'
1231
'upload failed: file to s3://mybucket/mykey my exception \n'
1232
'Completed 4.0 MiB/20.0 MiB (0 Bytes/s) with 2 file(s) remaining\r'
1233
)
1234
self.assertEqual(shared_file.getvalue(), ref_statement)
1235
1236
def test_failure_for_delete(self):
1237
transfer_type = 'delete'
1238
src = 's3://mybucket/mykey'
1239
1240
# Pretend that this is the final result in the result queue that
1241
# is processed.
1242
self.result_recorder.final_expected_files_transferred = 1
1243
self.result_recorder.expected_files_transferred = 1
1244
self.result_recorder.files_transferred = 1
1245
1246
failure_result = FailureResult(
1247
transfer_type=transfer_type, src=src, dest=None,
1248
exception=Exception('my exception'))
1249
1250
self.result_printer(failure_result)
1251
1252
ref_failure_statement = (
1253
'delete failed: s3://mybucket/mykey my exception\n'
1254
)
1255
self.assertEqual(self.error_file.getvalue(), ref_failure_statement)
1256
self.assertEqual(self.out_file.getvalue(), '')
1257
1258
def test_delete_failure_with_files_remaining(self):
1259
shared_file = self.out_file
1260
self.result_printer = ResultPrinter(
1261
result_recorder=self.result_recorder,
1262
out_file=shared_file,
1263
error_file=shared_file
1264
)
1265
1266
transfer_type = 'delete'
1267
src = 's3://mybucket/mykey'
1268
1269
self.result_recorder.expected_files_transferred = 4
1270
self.result_recorder.expected_files_transferred = 4
1271
self.result_recorder.files_transferred = 1
1272
1273
failure_result = FailureResult(
1274
transfer_type=transfer_type, src=src, dest=None,
1275
exception=Exception('my exception'))
1276
1277
self.result_printer(failure_result)
1278
1279
ref_statement = (
1280
'delete failed: s3://mybucket/mykey my exception\n'
1281
'Completed 1 file(s) with ~3 file(s) remaining (calculating...)\r'
1282
)
1283
self.assertEqual(self.out_file.getvalue(), ref_statement)
1284
1285
def test_delete_failure_but_no_expected_files_transferred_provided(self):
1286
shared_file = self.out_file
1287
self.result_printer = ResultPrinter(
1288
result_recorder=self.result_recorder,
1289
out_file=shared_file,
1290
error_file=shared_file
1291
)
1292
1293
transfer_type = 'delete'
1294
src = 's3://mybucket/mykey'
1295
1296
self.result_recorder.expected_files_transferred = 1
1297
self.result_recorder.files_transferred = 1
1298
1299
failure_result = FailureResult(
1300
transfer_type=transfer_type, src=src, dest=None,
1301
exception=Exception('my exception'))
1302
1303
self.result_printer(failure_result)
1304
1305
ref_statement = (
1306
'delete failed: s3://mybucket/mykey my exception\n'
1307
'Completed 1 file(s) with ~0 file(s) remaining (calculating...)\r'
1308
)
1309
self.assertEqual(self.out_file.getvalue(), ref_statement)
1310
1311
def test_warning(self):
1312
# Pretend that this is the final result in the result queue that
1313
# is processed.
1314
self.result_recorder.final_expected_files_transferred = 1
1315
self.result_recorder.expected_files_transferred = 1
1316
self.result_recorder.files_transferred = 1
1317
1318
self.result_printer(WarningResult('warning: my warning'))
1319
ref_warning_statement = 'warning: my warning\n'
1320
self.assertEqual(self.error_file.getvalue(), ref_warning_statement)
1321
self.assertEqual(self.out_file.getvalue(), '')
1322
1323
def test_warning_with_progress(self):
1324
# Make errors and regular outprint go to the same file to track order.
1325
shared_file = self.out_file
1326
self.result_printer = ResultPrinter(
1327
result_recorder=self.result_recorder,
1328
out_file=shared_file,
1329
error_file=shared_file
1330
)
1331
1332
mb = 1024 * 1024
1333
1334
progress_result = self.get_progress_result()
1335
1336
# Add the first progress update and print it out
1337
self.result_recorder.expected_bytes_transferred = 20 * mb
1338
self.result_recorder.expected_files_transferred = 4
1339
self.result_recorder.final_expected_files_transferred = 4
1340
self.result_recorder.bytes_transferred = mb
1341
self.result_recorder.files_transferred = 1
1342
self.result_printer(progress_result)
1343
1344
self.result_printer(WarningResult('warning: my warning'))
1345
1346
# The statement should consist of:
1347
# * The first progress statement
1348
# * The warning statement
1349
# * And the progress again since the transfer is still ongoing
1350
ref_statement = (
1351
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) remaining\r'
1352
'warning: my warning \n'
1353
'Completed 1.0 MiB/20.0 MiB (0 Bytes/s) with 3 file(s) remaining\r'
1354
)
1355
1356
self.assertEqual(shared_file.getvalue(), ref_statement)
1357
1358
def test_error(self):
1359
self.result_printer(ErrorResult(Exception('my exception')))
1360
ref_error_statement = 'fatal error: my exception\n'
1361
self.assertEqual(self.error_file.getvalue(), ref_error_statement)
1362
1363
def test_ctrl_c_error(self):
1364
self.result_printer(CtrlCResult(Exception()))
1365
ref_error_statement = 'cancelled: ctrl-c received\n'
1366
self.assertEqual(self.error_file.getvalue(), ref_error_statement)
1367
1368
def test_error_while_progress(self):
1369
mb = 1024 * 1024
1370
self.result_recorder.expected_bytes_transferred = 20 * mb
1371
self.result_recorder.expected_files_transferred = 4
1372
self.result_recorder.final_expected_files_transferred = 4
1373
self.result_recorder.bytes_transferred = mb
1374
self.result_recorder.files_transferred = 1
1375
1376
self.result_printer(ErrorResult(Exception('my exception')))
1377
ref_error_statement = 'fatal error: my exception\n'
1378
# Even though there was progress, we do not want to print the
1379
# progress because errors are really only seen when the entire
1380
# s3 command fails.
1381
self.assertEqual(self.error_file.getvalue(), ref_error_statement)
1382
1383
def test_dry_run(self):
1384
result = DryRunResult(
1385
transfer_type='upload',
1386
src='s3://mybucket/key',
1387
dest='./local/file'
1388
)
1389
self.result_printer(result)
1390
expected = '(dryrun) upload: s3://mybucket/key to ./local/file\n'
1391
self.assertEqual(self.out_file.getvalue(), expected)
1392
1393
def test_final_total_notification_with_no_more_expected_progress(self):
1394
transfer_type = 'upload'
1395
src = 'file'
1396
dest = 's3://mybucket/mykey'
1397
1398
mb = 1024 * 1024
1399
self.result_recorder.expected_files_transferred = 1
1400
self.result_recorder.files_transferred = 1
1401
self.result_recorder.expected_bytes_transferred = mb
1402
self.result_recorder.bytes_transferred = mb
1403
1404
success_result = SuccessResult(
1405
transfer_type=transfer_type, src=src, dest=dest)
1406
1407
self.result_printer(success_result)
1408
1409
ref_success_statement = (
1410
'upload: file to s3://mybucket/mykey\n'
1411
'Completed 1.0 MiB/~1.0 MiB (0 Bytes/s) with ~0 file(s) '
1412
'remaining (calculating...)\r'
1413
)
1414
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
1415
1416
# Now the result recorder/printer is notified it was just
1417
# there will be no more queueing. Therefore it should
1418
# clear out remaining progress if the expected number of files
1419
# transferred is equal to the number of files that has completed
1420
# because this is the final task meaning we want to clear any progress
1421
# that is displayed.
1422
self.result_recorder.final_expected_files_transferred = 1
1423
self.result_printer(FinalTotalSubmissionsResult(1))
1424
ref_success_statement = (
1425
'upload: file to s3://mybucket/mykey\n'
1426
'Completed 1.0 MiB/~1.0 MiB (0 Bytes/s) '
1427
'with ~0 file(s) remaining (calculating...)\r'
1428
' '
1429
' \n'
1430
)
1431
self.assertEqual(self.out_file.getvalue(), ref_success_statement)
1432
1433
def test_final_total_does_not_print_out_newline_for_no_transfers(self):
1434
self.result_recorder.final_expected_files_transferred = 0
1435
self.result_printer(FinalTotalSubmissionsResult(0))
1436
self.assertEqual(self.out_file.getvalue(), '')
1437
1438
def test_print_unicode_success_src_and_dest(self):
1439
# Pretend that this is the final result in the result queue that
1440
# is processed.
1441
self.result_recorder.final_expected_files_transferred = 1
1442
self.result_recorder.expected_files_transferred = 1
1443
self.result_recorder.files_transferred = 1
1444
1445
result = SuccessResult(
1446
transfer_type='upload',
1447
src=u'/tmp/\u2713',
1448
dest='s3://mybucket/mykey'
1449
)
1450
self.result_printer(result)
1451
expected = u'upload: /tmp/\u2713 to s3://mybucket/mykey\n'
1452
self.assertEqual(self.out_file.getvalue(), expected)
1453
1454
def test_print_unicode_success_src(self):
1455
# Pretend that this is the final result in the result queue that
1456
# is processed.
1457
self.result_recorder.final_expected_files_transferred = 1
1458
self.result_recorder.expected_files_transferred = 1
1459
self.result_recorder.files_transferred = 1
1460
1461
result = SuccessResult(
1462
transfer_type='delete',
1463
src=u's3://mybucket/tmp/\u2713',
1464
dest=None
1465
)
1466
self.result_printer(result)
1467
expected = u'delete: s3://mybucket/tmp/\u2713\n'
1468
self.assertEqual(self.out_file.getvalue(), expected)
1469
1470
def test_print_unicode_dryrun(self):
1471
result = DryRunResult(
1472
transfer_type='upload',
1473
src=u's3://mybucket/\u2713',
1474
dest='./local/file'
1475
)
1476
self.result_printer(result)
1477
expected = u'(dryrun) upload: s3://mybucket/\u2713 to ./local/file\n'
1478
self.assertEqual(self.out_file.getvalue(), expected)
1479
1480
def test_print_unicode_failure(self):
1481
transfer_type = 'upload'
1482
src = u'\u2713'
1483
dest = 's3://mybucket/mykey'
1484
1485
# Pretend that this is the final result in the result queue that
1486
# is processed.
1487
self.result_recorder.final_expected_files_transferred = 1
1488
self.result_recorder.expected_files_transferred = 1
1489
self.result_recorder.files_transferred = 1
1490
1491
failure_result = FailureResult(
1492
transfer_type=transfer_type, src=src, dest=dest,
1493
exception=Exception('my exception'))
1494
1495
self.result_printer(failure_result)
1496
1497
ref_failure_statement = (
1498
u'upload failed: \u2713 to s3://mybucket/mykey my exception\n'
1499
)
1500
self.assertEqual(self.error_file.getvalue(), ref_failure_statement)
1501
self.assertEqual(self.out_file.getvalue(), '')
1502
1503
def test_print_unicode_warning(self):
1504
# Pretend that this is the final result in the result queue that
1505
# is processed.
1506
self.result_recorder.final_expected_files_transferred = 1
1507
self.result_recorder.expected_files_transferred = 1
1508
self.result_recorder.files_transferred = 1
1509
1510
self.result_printer(WarningResult(u'warning: unicode exists \u2713'))
1511
ref_warning_statement = u'warning: unicode exists \u2713\n'
1512
self.assertEqual(self.error_file.getvalue(), ref_warning_statement)
1513
self.assertEqual(self.out_file.getvalue(), '')
1514
1515
def test_print_unicode_error(self):
1516
self.result_printer(ErrorResult(Exception('unicode exists \u2713')))
1517
ref_error_statement = 'fatal error: unicode exists \u2713\n'
1518
self.assertEqual(self.error_file.getvalue(), ref_error_statement)
1519
1520
1521
class TestNoProgressResultPrinter(BaseResultPrinterTest):
1522
def setUp(self):
1523
super(TestNoProgressResultPrinter, self).setUp()
1524
self.result_printer = NoProgressResultPrinter(
1525
result_recorder=self.result_recorder,
1526
out_file=self.out_file,
1527
error_file=self.error_file
1528
)
1529
1530
def test_does_not_print_progress_result(self):
1531
progress_result = self.get_progress_result()
1532
self.result_printer(progress_result)
1533
self.assertEqual(self.out_file.getvalue(), '')
1534
1535
def test_does_print_sucess_result(self):
1536
transfer_type = 'upload'
1537
src = 'file'
1538
dest = 's3://mybucket/mykey'
1539
success_result = SuccessResult(
1540
transfer_type=transfer_type, src=src, dest=dest)
1541
1542
self.result_printer(success_result)
1543
expected_message = 'upload: file to s3://mybucket/mykey\n'
1544
self.assertEqual(self.out_file.getvalue(), expected_message)
1545
1546
def test_print_failure_result(self):
1547
transfer_type = 'upload'
1548
src = 'file'
1549
dest = 's3://mybucket/mykey'
1550
failure_result = FailureResult(
1551
transfer_type=transfer_type, src=src, dest=dest,
1552
exception=Exception('my exception'))
1553
1554
self.result_printer(failure_result)
1555
1556
ref_failure_statement = (
1557
'upload failed: file to s3://mybucket/mykey my exception\n'
1558
)
1559
self.assertEqual(self.error_file.getvalue(), ref_failure_statement)
1560
1561
def test_print_warnings_result(self):
1562
self.result_printer(WarningResult('warning: my warning'))
1563
ref_warning_statement = 'warning: my warning\n'
1564
self.assertEqual(self.error_file.getvalue(), ref_warning_statement)
1565
1566
def test_final_total_does_not_try_to_clear_empty_progress(self):
1567
transfer_type = 'upload'
1568
src = 'file'
1569
dest = 's3://mybucket/mykey'
1570
1571
mb = 1024 * 1024
1572
self.result_recorder.expected_files_transferred = 1
1573
self.result_recorder.files_transferred = 1
1574
self.result_recorder.expected_bytes_transferred = mb
1575
self.result_recorder.bytes_transferred = mb
1576
1577
success_result = SuccessResult(
1578
transfer_type=transfer_type, src=src, dest=dest)
1579
self.result_printer(success_result)
1580
ref_statement = 'upload: file to s3://mybucket/mykey\n'
1581
self.assertEqual(self.out_file.getvalue(), ref_statement)
1582
1583
self.result_recorder.final_expected_files_transferred = 1
1584
self.result_printer(FinalTotalSubmissionsResult(1))
1585
self.assertEqual(self.out_file.getvalue(), ref_statement)
1586
1587
1588
class TestOnlyShowErrorsResultPrinter(BaseResultPrinterTest):
1589
def setUp(self):
1590
super(TestOnlyShowErrorsResultPrinter, self).setUp()
1591
self.result_printer = OnlyShowErrorsResultPrinter(
1592
result_recorder=self.result_recorder,
1593
out_file=self.out_file,
1594
error_file=self.error_file
1595
)
1596
1597
def test_does_not_print_progress_result(self):
1598
progress_result = self.get_progress_result()
1599
self.result_printer(progress_result)
1600
self.assertEqual(self.out_file.getvalue(), '')
1601
1602
def test_does_not_print_sucess_result(self):
1603
transfer_type = 'upload'
1604
src = 'file'
1605
dest = 's3://mybucket/mykey'
1606
success_result = SuccessResult(
1607
transfer_type=transfer_type, src=src, dest=dest)
1608
1609
self.result_printer(success_result)
1610
self.assertEqual(self.out_file.getvalue(), '')
1611
1612
def test_print_failure_result(self):
1613
transfer_type = 'upload'
1614
src = 'file'
1615
dest = 's3://mybucket/mykey'
1616
failure_result = FailureResult(
1617
transfer_type=transfer_type, src=src, dest=dest,
1618
exception=Exception('my exception'))
1619
1620
self.result_printer(failure_result)
1621
1622
ref_failure_statement = (
1623
'upload failed: file to s3://mybucket/mykey my exception\n'
1624
)
1625
self.assertEqual(self.error_file.getvalue(), ref_failure_statement)
1626
1627
def test_print_warnings_result(self):
1628
self.result_printer(WarningResult('warning: my warning'))
1629
ref_warning_statement = 'warning: my warning\n'
1630
self.assertEqual(self.error_file.getvalue(), ref_warning_statement)
1631
1632
def test_final_total_does_not_try_to_clear_empty_progress(self):
1633
transfer_type = 'upload'
1634
src = 'file'
1635
dest = 's3://mybucket/mykey'
1636
1637
mb = 1024 * 1024
1638
self.result_recorder.expected_files_transferred = 1
1639
self.result_recorder.files_transferred = 1
1640
self.result_recorder.expected_bytes_transferred = mb
1641
self.result_recorder.bytes_transferred = mb
1642
1643
success_result = SuccessResult(
1644
transfer_type=transfer_type, src=src, dest=dest)
1645
self.result_printer(success_result)
1646
ref_statement = ''
1647
self.assertEqual(self.out_file.getvalue(), ref_statement)
1648
1649
self.result_recorder.final_expected_files_transferred = 1
1650
self.result_printer(FinalTotalSubmissionsResult(1))
1651
# The final total submission result should be a noop and
1652
# not print anything out.
1653
self.assertEqual(self.out_file.getvalue(), ref_statement)
1654
1655
1656
class TestResultProcessor(unittest.TestCase):
1657
def setUp(self):
1658
self.result_queue = queue.Queue()
1659
self.result_recorder = mock.Mock()
1660
self.result_printer = mock.Mock()
1661
self.results_handled = []
1662
1663
self.result_processor = ResultProcessor(
1664
self.result_queue, [self.results_handled.append])
1665
1666
def _handle_result_with_exception(self, result):
1667
raise Exception()
1668
1669
def test_run(self):
1670
transfer_type = 'upload'
1671
src = 'src'
1672
dest = 'dest'
1673
total_transfer_size = 1024 * 1024
1674
results_to_process = [
1675
QueuedResult(transfer_type, src, dest, total_transfer_size),
1676
SuccessResult(transfer_type, src, dest)
1677
]
1678
results_with_shutdown = results_to_process + [ShutdownThreadRequest()]
1679
1680
for result in results_with_shutdown:
1681
self.result_queue.put(result)
1682
self.result_processor.run()
1683
1684
self.assertEqual(self.results_handled, results_to_process)
1685
1686
def test_run_without_result_handlers(self):
1687
transfer_type = 'upload'
1688
src = 'src'
1689
dest = 'dest'
1690
total_transfer_size = 1024 * 1024
1691
results_to_process = [
1692
QueuedResult(transfer_type, src, dest, total_transfer_size),
1693
SuccessResult(transfer_type, src, dest)
1694
]
1695
results_with_shutdown = results_to_process + [ShutdownThreadRequest()]
1696
1697
for result in results_with_shutdown:
1698
self.result_queue.put(result)
1699
self.result_processor = ResultProcessor(self.result_queue)
1700
self.result_processor.run()
1701
1702
# Ensure that the entire result queue got processed even though
1703
# there was no handlers provided.
1704
self.assertTrue(self.result_queue.empty())
1705
1706
def test_exception_handled_in_loop(self):
1707
transfer_type = 'upload'
1708
src = 'src'
1709
dest = 'dest'
1710
total_transfer_size = 1024 * 1024
1711
results_to_process = [
1712
QueuedResult(transfer_type, src, dest, total_transfer_size),
1713
SuccessResult(transfer_type, src, dest)
1714
]
1715
results_with_shutdown = results_to_process + [ShutdownThreadRequest()]
1716
1717
for result in results_with_shutdown:
1718
self.result_queue.put(result)
1719
1720
results_handled_after_exception = []
1721
self.result_processor = ResultProcessor(
1722
self.result_queue,
1723
[self.results_handled.append, self._handle_result_with_exception,
1724
results_handled_after_exception.append])
1725
1726
self.result_processor.run()
1727
1728
self.assertEqual(self.results_handled, results_to_process)
1729
# The exception happens in the second handler, the exception being
1730
# thrown should result in the first handler and the ResultProcessor
1731
# continuing to process through the result queue despite the exception.
1732
# However, any handlers after the exception should not be run just
1733
# in case order of the handlers mattering and an unhandled exception
1734
# in one affects another handler.
1735
self.assertEqual(results_handled_after_exception, results_to_process)
1736
1737
def test_does_not_handle_results_after_receiving_error_result(self):
1738
transfer_type = 'upload'
1739
src = 'src'
1740
dest = 'dest'
1741
results_to_be_handled = [
1742
SuccessResult(transfer_type, src, dest),
1743
ErrorResult(Exception('my exception'))
1744
]
1745
result_not_to_be_handled = [
1746
ErrorResult(Exception('my second exception'))
1747
]
1748
results_with_shutdown = results_to_be_handled + \
1749
result_not_to_be_handled + [ShutdownThreadRequest()]
1750
1751
for result in results_with_shutdown:
1752
self.result_queue.put(result)
1753
1754
self.result_processor.run()
1755
# Only the results including and before the first the ErrorResult
1756
# should be handled. Any results after the first ErrorResult should
1757
# be ignored because ErrorResults are considered fatal meaning the
1758
# ResultProcessor needs to consume through the rest of result queue
1759
# to shutdown as quickly as possible.
1760
self.assertEqual(self.results_handled, results_to_be_handled)
1761
1762
def test_does_not_process_results_after_shutdown(self):
1763
transfer_type = 'upload'
1764
src = 'src'
1765
dest = 'dest'
1766
total_transfer_size = 1024 * 1024
1767
results_to_process = [
1768
QueuedResult(transfer_type, src, dest, total_transfer_size),
1769
SuccessResult(transfer_type, src, dest)
1770
]
1771
results_with_shutdown = results_to_process + [
1772
ShutdownThreadRequest(), WarningResult('my warning')]
1773
1774
for result in results_with_shutdown:
1775
self.result_queue.put(result)
1776
self.result_processor.run()
1777
# Because a ShutdownThreadRequest was sent the processor should
1778
# not have processed anymore results stored after it.
1779
self.assertEqual(self.results_handled, results_to_process)
1780
1781
1782
class TestCommandResultRecorder(unittest.TestCase):
1783
def setUp(self):
1784
self.result_queue = queue.Queue()
1785
self.result_recorder = ResultRecorder()
1786
self.result_processor = ResultProcessor(
1787
self.result_queue, [self.result_recorder])
1788
self.command_result_recorder = CommandResultRecorder(
1789
self.result_queue, self.result_recorder, self.result_processor)
1790
1791
self.transfer_type = 'upload'
1792
self.src = 'file'
1793
self.dest = 's3://mybucket/mykey'
1794
self.total_transfer_size = 20 * (1024 ** 1024)
1795
1796
def test_success(self):
1797
with self.command_result_recorder:
1798
self.result_queue.put(
1799
QueuedResult(
1800
transfer_type=self.transfer_type, src=self.src,
1801
dest=self.dest,
1802
total_transfer_size=self.total_transfer_size
1803
)
1804
)
1805
self.result_queue.put(
1806
SuccessResult(
1807
transfer_type=self.transfer_type, src=self.src,
1808
dest=self.dest
1809
)
1810
)
1811
self.assertEqual(
1812
self.command_result_recorder.get_command_result(), (0, 0))
1813
1814
def test_fail(self):
1815
with self.command_result_recorder:
1816
self.result_queue.put(
1817
QueuedResult(
1818
transfer_type=self.transfer_type, src=self.src,
1819
dest=self.dest,
1820
total_transfer_size=self.total_transfer_size
1821
)
1822
)
1823
self.result_queue.put(
1824
FailureResult(
1825
transfer_type=self.transfer_type, src=self.src,
1826
dest=self.dest, exception=Exception('my exception')
1827
)
1828
)
1829
self.assertEqual(
1830
self.command_result_recorder.get_command_result(), (1, 0))
1831
1832
def test_warning(self):
1833
with self.command_result_recorder:
1834
self.result_queue.put(WarningResult(message='my warning'))
1835
self.assertEqual(
1836
self.command_result_recorder.get_command_result(), (0, 1))
1837
1838
def test_error(self):
1839
with self.command_result_recorder:
1840
raise Exception('my exception')
1841
self.assertEqual(
1842
self.command_result_recorder.get_command_result(), (1, 0))
1843
1844
def test_notify_total_submissions(self):
1845
total = 5
1846
self.command_result_recorder.notify_total_submissions(total)
1847
self.assertEqual(
1848
self.result_queue.get(), FinalTotalSubmissionsResult(total))
1849
1850