Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/tests/unit/customizations/codeartifact/test_adapter_login.py
2637 views
1
import errno
2
import os
3
import re
4
import signal
5
import stat
6
import subprocess
7
8
from datetime import datetime
9
from dateutil.tz import tzlocal, tzutc
10
from dateutil.relativedelta import relativedelta
11
12
from awscli.testutils import (
13
unittest, mock, skip_if_windows, FileCreator
14
)
15
from awscli.compat import urlparse, RawConfigParser
16
from awscli.customizations.codeartifact.login import (
17
BaseLogin, SwiftLogin, NuGetLogin, DotNetLogin, NpmLogin, PipLogin,
18
TwineLogin, get_relative_expiration_time, CommandFailedError
19
)
20
21
22
class TestBaseLogin(unittest.TestCase):
23
def setUp(self):
24
self.domain = 'domain'
25
self.domain_owner = 'domain-owner'
26
self.package_format = 'npm'
27
self.repository = 'repository'
28
self.auth_token = 'auth-token'
29
self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
30
+ relativedelta(minutes=9)).replace(microsecond=0)
31
self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
32
'a2z.com/{format}/{repository}/'.format(
33
domain=self.domain,
34
domainOwner=self.domain_owner,
35
format=self.package_format,
36
repository=self.repository
37
)
38
39
self.subprocess_utils = mock.Mock()
40
41
self.test_subject = BaseLogin(
42
self.auth_token, self.expiration, self.endpoint,
43
self.domain, self.repository, self.subprocess_utils
44
)
45
46
def test_login(self):
47
with self.assertRaises(NotImplementedError):
48
self.test_subject.login()
49
50
def test_get_commands(self):
51
with self.assertRaises(NotImplementedError):
52
self.test_subject.get_commands(
53
self.endpoint, self.auth_token
54
)
55
56
def test_run_commands_command_failed(self):
57
error_to_be_caught = subprocess.CalledProcessError(
58
returncode=1,
59
cmd=['cmd'],
60
output=None,
61
stderr=b'Command error message.'
62
)
63
self.subprocess_utils.run.side_effect = error_to_be_caught
64
with self.assertRaisesRegex(
65
CommandFailedError,
66
rf"{re.escape(str(error_to_be_caught))}"
67
rf" Stderr from command:\nCommand error message."
68
):
69
self.test_subject._run_commands('tool', ['cmd'])
70
71
def test_run_commands_command_failed_redact_auth_token(self):
72
error_to_be_caught = subprocess.CalledProcessError(
73
returncode=1,
74
cmd=['cmd', 'with', 'auth-token', 'present'],
75
output=None,
76
stderr=b'Command error message.'
77
)
78
self.subprocess_utils.run.side_effect = error_to_be_caught
79
with self.assertRaisesRegex(
80
CommandFailedError,
81
(rf"(?=.*cmd)(?=.*with)(?!.*auth-token)(?=.*present)"
82
rf"(?=.*Stderr from command:\nCommand error message.)")
83
):
84
self.test_subject._run_commands('tool', ['cmd'])
85
86
def test_run_commands_nonexistent_command(self):
87
self.subprocess_utils.run.side_effect = OSError(
88
errno.ENOENT, 'not found error'
89
)
90
tool = 'NotSupported'
91
with self.assertRaisesRegex(
92
ValueError, '%s was not found.' % tool):
93
self.test_subject._run_commands(tool, ['echo', tool])
94
95
def test_run_commands_unhandled_error(self):
96
self.subprocess_utils.run.side_effect = OSError(
97
errno.ENOSYS, 'unhandled error'
98
)
99
tool = 'NotSupported'
100
with self.assertRaisesRegex(OSError, 'unhandled error'):
101
self.test_subject._run_commands(tool, ['echo', tool])
102
103
104
def handle_timeout(signum, frame, test_name):
105
raise TimeoutError(f"{test_name} timed out!!")
106
107
108
class TestSwiftLogin(unittest.TestCase):
109
110
def setUp(self):
111
self.domain = 'domain'
112
self.domain_owner = 'domain-owner'
113
self.package_format = 'swift'
114
self.repository = 'repository'
115
self.auth_token = 'auth-token'
116
self.namespace = 'namespace'
117
self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
118
+ relativedelta(minutes=9)).replace(microsecond=0)
119
self.endpoint = f'https://{self.domain}-{self.domain_owner}.codeartifact' \
120
f'.aws.a2z.com/{self.package_format}/{self.repository}/'
121
self.hostname = urlparse.urlparse(self.endpoint).hostname
122
self.new_entry = SwiftLogin.DEFAULT_NETRC_FMT.format(
123
hostname=self.hostname,
124
auth_token=self.auth_token
125
)
126
127
self.file_creator = FileCreator()
128
self.test_netrc_path = self.file_creator.full_path('netrc')
129
self.get_netrc_path_patch = mock.patch(
130
'awscli.customizations.codeartifact.login.SwiftLogin'
131
'.get_netrc_path'
132
)
133
self.get_netrc_path_mock = self.get_netrc_path_patch.start()
134
self.get_netrc_path_mock.return_value = self.test_netrc_path
135
136
self.base_command = ['swift', 'package-registry', 'set', self.endpoint]
137
self.macos_commands = [
138
self.base_command[:],
139
['swift', 'package-registry', 'login', self.endpoint + 'login',
140
'--token', self.auth_token]
141
]
142
self.non_macos_commands = [
143
self.base_command[:],
144
['swift', 'package-registry', 'login', self.endpoint + 'login']
145
]
146
147
self.subprocess_utils = mock.Mock()
148
149
self.test_subject = SwiftLogin(
150
self.auth_token, self.expiration, self.endpoint,
151
self.domain, self.repository, self.subprocess_utils
152
)
153
154
def tearDown(self):
155
self.get_netrc_path_patch.stop()
156
self.file_creator.remove_all()
157
158
def _assert_netrc_has_expected_content(self, expected_contents):
159
with open(self.test_netrc_path, 'r') as file:
160
actual_contents = file.read()
161
self.assertEqual(expected_contents, actual_contents)
162
163
def _assert_netrc_has_expected_permissions(self):
164
file_stat = os.stat(self.test_netrc_path)
165
file_mode = file_stat.st_mode
166
self.assertTrue(stat.S_IRUSR & file_mode)
167
self.assertTrue(stat.S_IWUSR & file_mode)
168
169
def test_get_netrc_path(self):
170
self.assertEqual(
171
SwiftLogin.get_netrc_path(),
172
self.test_netrc_path
173
)
174
175
def test_regex_only_match_escaped_hostname(self):
176
pattern = re.compile(
177
SwiftLogin.NETRC_REGEX_FMT.format(
178
escaped_hostname=re.escape(self.hostname)
179
),
180
re.M
181
)
182
bad_endpoint = f'https://{self.domain}-{self.domain_owner}-codeartifact' \
183
f'-aws-a2z-com/{self.package_format}/{self.repository}/'
184
bad_hostname = urlparse.urlparse(bad_endpoint).hostname
185
bad_entry = SwiftLogin.DEFAULT_NETRC_FMT.format(
186
hostname=bad_hostname,
187
auth_token=self.auth_token
188
)
189
self.assertTrue(pattern.match(self.new_entry))
190
self.assertFalse(pattern.match(bad_entry))
191
192
def test_create_netrc_if_not_exist(self):
193
self.assertFalse(os.path.exists(self.test_netrc_path))
194
self.test_subject._update_netrc_entry(
195
self.hostname,
196
'a new entry',
197
self.test_netrc_path
198
)
199
self.assertTrue(os.path.exists(self.test_netrc_path))
200
self._assert_netrc_has_expected_permissions()
201
self._assert_netrc_has_expected_content('a new entry\n')
202
203
def test_replacement_token_has_backslash(self):
204
existing_content = (
205
f'machine {self.hostname} login token password expired-auth-token\n'
206
)
207
with open(self.test_netrc_path, 'w+') as f:
208
f.write(existing_content)
209
self.test_subject.auth_token = r'new-token_.\1\g<entry_start>\n\w'
210
# make sure it uses re.sub() to replace the token
211
self.test_subject._update_netrc_entry(
212
self.hostname,
213
'',
214
self.test_netrc_path
215
)
216
self.assertTrue(os.path.exists(self.test_netrc_path))
217
self._assert_netrc_has_expected_content(
218
f'machine {self.hostname} login token password {self.test_subject.auth_token}\n'
219
)
220
221
def test_update_netrc_with_existing_entry(self):
222
existing_content = \
223
f'machine {self.hostname} login token password expired-auth-token\n'
224
225
expected_content = f'{self.new_entry}\n'
226
with open(self.test_netrc_path, 'w+') as f:
227
f.write(existing_content)
228
229
self.test_subject._update_netrc_entry(
230
self.hostname,
231
self.new_entry,
232
self.test_netrc_path
233
)
234
self._assert_netrc_has_expected_content(expected_content)
235
236
def test_update_netrc_with_existing_entry_in_between(self):
237
existing_content = (
238
f'some random line above\n'
239
f'machine {self.hostname} login token password expired-auth-token\n'
240
f'some random line below\n'
241
)
242
243
expected_content = (
244
f'some random line above\n'
245
f'{self.new_entry}\n'
246
f'some random line below\n'
247
)
248
with open(self.test_netrc_path, 'w+') as f:
249
f.write(existing_content)
250
251
self.test_subject._update_netrc_entry(
252
self.hostname,
253
self.new_entry,
254
self.test_netrc_path
255
)
256
self._assert_netrc_has_expected_content(expected_content)
257
258
def test_append_netrc_without_ending_newline(self):
259
existing_content = 'machine host login user password 1234'
260
261
expected_content = (
262
f'machine host login user password 1234\n'
263
f'{self.new_entry}\n'
264
)
265
with open(self.test_netrc_path, 'w+') as f:
266
f.write(existing_content)
267
268
self.test_subject._update_netrc_entry(
269
self.hostname,
270
self.new_entry,
271
self.test_netrc_path
272
)
273
self._assert_netrc_has_expected_content(expected_content)
274
275
def test_append_netrc_with_ending_newline(self):
276
existing_content = 'machine host login user password 1234\n'
277
278
expected_content = (
279
f'machine host login user password 1234\n'
280
f'{self.new_entry}\n'
281
)
282
with open(self.test_netrc_path, 'w+') as f:
283
f.write(existing_content)
284
285
self.test_subject._update_netrc_entry(
286
self.hostname,
287
self.new_entry,
288
self.test_netrc_path
289
)
290
self._assert_netrc_has_expected_content(expected_content)
291
292
def test_update_netrc_with_multiple_spaces_and_newlines(self):
293
existing_content = (
294
f' machine {self.hostname}\n'
295
f' login token \n'
296
f'password expired-auth-token \n'
297
f'\n'
298
f'machine example1.com\n'
299
f' login user1 \n'
300
f'password 1234\n'
301
)
302
expected_content = (
303
f' machine {self.hostname}\n'
304
f' login token \n'
305
f'password auth-token \n'
306
f'\n'
307
f'machine example1.com\n'
308
f' login user1 \n'
309
f'password 1234\n'
310
)
311
with open(self.test_netrc_path, 'w+') as f:
312
f.write(existing_content)
313
314
self.test_subject._update_netrc_entry(
315
self.hostname,
316
self.new_entry,
317
self.test_netrc_path
318
)
319
self._assert_netrc_has_expected_content(expected_content)
320
321
def test_update_netrc_with_multiple_existing_entries(self):
322
existing_content = (
323
f'machine {self.hostname} login token password expired-auth-token-1\n'
324
f'machine {self.hostname} login token password expired-auth-token-2\n'
325
)
326
expected_content = (
327
f'{self.new_entry}\n'
328
f'{self.new_entry}\n'
329
)
330
with open(self.test_netrc_path, 'w+') as f:
331
f.write(existing_content)
332
333
self.test_subject._update_netrc_entry(
334
self.hostname,
335
self.new_entry,
336
self.test_netrc_path
337
)
338
self._assert_netrc_has_expected_content(expected_content)
339
340
@mock.patch('awscli.customizations.codeartifact.login.is_macos', True)
341
def test_login_macos(self):
342
self.test_subject.login()
343
expected_calls = [
344
mock.call(
345
command,
346
capture_output=True,
347
check=True
348
) for command in self.macos_commands
349
]
350
self.subprocess_utils.run.assert_has_calls(
351
expected_calls, any_order=True
352
)
353
354
@mock.patch('awscli.customizations.codeartifact.login.is_macos', False)
355
def test_login_non_macos(self):
356
self.test_subject.login()
357
expected_calls = [
358
mock.call(
359
command,
360
capture_output=True,
361
check=True
362
) for command in self.non_macos_commands
363
]
364
self.subprocess_utils.run.assert_has_calls(
365
expected_calls, any_order=True
366
)
367
368
def test_login_swift_tooling_error(self):
369
self.subprocess_utils.run.side_effect = \
370
subprocess.CalledProcessError(
371
returncode=1, cmd='swift command', stderr=b''
372
)
373
with self.assertRaises(CommandFailedError):
374
self.test_subject.login()
375
376
def test_login_swift_not_installed(self):
377
self.subprocess_utils.run.side_effect = OSError(
378
errno.ENOENT, 'not found error'
379
)
380
with self.assertRaisesRegex(
381
ValueError,
382
'swift was not found. Please verify installation.'):
383
self.test_subject.login()
384
385
def test_get_scope(self):
386
expected_value = 'namespace'
387
scope = self.test_subject.get_scope(self.namespace)
388
self.assertEqual(scope, expected_value)
389
390
def test_get_scope_none_namespace(self):
391
expected_value = None
392
scope = self.test_subject.get_scope(None)
393
self.assertEqual(scope, expected_value)
394
395
def test_get_scope_invalid_leading_character(self):
396
with self.assertRaises(ValueError):
397
self.test_subject.get_scope(f'.{self.namespace}')
398
399
def test_get_scope_invalid_length(self):
400
with self.assertRaises(ValueError):
401
self.test_subject.get_scope("a"*40)
402
403
@mock.patch('awscli.customizations.codeartifact.login.is_macos', True)
404
def test_get_commands_macos(self):
405
commands = self.test_subject.get_commands(
406
self.endpoint, self.auth_token
407
)
408
self.assertCountEqual(commands, self.macos_commands)
409
410
@mock.patch('awscli.customizations.codeartifact.login.is_macos', False)
411
def test_get_commands_non_macos(self):
412
commands = self.test_subject.get_commands(
413
self.endpoint, self.auth_token
414
)
415
self.assertCountEqual(commands, self.non_macos_commands)
416
417
@mock.patch('awscli.customizations.codeartifact.login.is_macos', True)
418
def test_get_commands_with_scope_macos(self):
419
commands = self.test_subject.get_commands(
420
self.endpoint, self.auth_token, scope=self.namespace
421
)
422
self.macos_commands[0] += ['--scope', self.namespace]
423
self.assertCountEqual(commands, self.macos_commands)
424
425
@mock.patch('awscli.customizations.codeartifact.login.is_macos', False)
426
def test_get_commands_with_scope_non_macos(self):
427
commands = self.test_subject.get_commands(
428
self.endpoint, self.auth_token, scope=self.namespace
429
)
430
self.non_macos_commands[0] += ['--scope', self.namespace]
431
self.assertCountEqual(commands, self.non_macos_commands)
432
433
def test_login_dry_run(self):
434
self.test_subject.login(dry_run=True)
435
self.subprocess_utils.check_output.assert_not_called()
436
self.assertFalse(os.path.exists(self.test_netrc_path))
437
438
439
class TestNuGetLogin(unittest.TestCase):
440
_NUGET_INDEX_URL_FMT = NuGetLogin._NUGET_INDEX_URL_FMT
441
_NUGET_SOURCES_LIST_RESPONSE = b"""\
442
Registered Sources:
443
1. Source Name 1 [Enabled]
444
https://source1.com/index.json
445
2. Ab[.d7 $#!], [Disabled]
446
https://source2.com/index.json"""
447
448
_NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING = b'1.' + b' ' * 10000 + b'a'
449
450
_NUGET_SOURCES_LIST_RESPONSE_WITH_SPACE = b"""\
451
Registered Sources:
452
453
1. Source Name 1 [Enabled]
454
https://source1.com/index.json
455
2. Ab[.d7 $#!], [Disabled]
456
https://source2.com/index.json"""
457
458
def setUp(self):
459
self.domain = 'domain'
460
self.domain_owner = 'domain-owner'
461
self.package_format = 'nuget'
462
self.repository = 'repository'
463
self.auth_token = 'auth-token'
464
self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
465
+ relativedelta(minutes=9)).replace(microsecond=0)
466
self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
467
'a2z.com/{format}/{repository}/'.format(
468
domain=self.domain,
469
domainOwner=self.domain_owner,
470
format=self.package_format,
471
repository=self.repository
472
)
473
474
self.nuget_index_url = self._NUGET_INDEX_URL_FMT.format(
475
endpoint=self.endpoint,
476
)
477
self.source_name = self.domain + '/' + self.repository
478
self.list_operation_command = [
479
'nuget', 'sources', 'list',
480
'-format', 'detailed',
481
]
482
self.add_operation_command = [
483
'nuget', 'sources', 'add',
484
'-name', self.source_name,
485
'-source', self.nuget_index_url,
486
'-username', 'aws',
487
'-password', self.auth_token
488
]
489
self.update_operation_command = [
490
'nuget', 'sources', 'update',
491
'-name', self.source_name,
492
'-source', self.nuget_index_url,
493
'-username', 'aws',
494
'-password', self.auth_token
495
]
496
497
self.subprocess_utils = mock.Mock()
498
499
self.test_subject = NuGetLogin(
500
self.auth_token, self.expiration, self.endpoint,
501
self.domain, self.repository, self.subprocess_utils
502
)
503
504
def test_login(self):
505
self.subprocess_utils.check_output.return_value = \
506
self._NUGET_SOURCES_LIST_RESPONSE
507
self.test_subject.login()
508
self.subprocess_utils.check_output.assert_any_call(
509
self.list_operation_command,
510
stderr=self.subprocess_utils.PIPE
511
)
512
self.subprocess_utils.run.assert_called_with(
513
self.add_operation_command,
514
capture_output=True,
515
check=True
516
)
517
518
def test_login_dry_run(self):
519
self.subprocess_utils.check_output.return_value = \
520
self._NUGET_SOURCES_LIST_RESPONSE
521
self.test_subject.login(dry_run=True)
522
self.subprocess_utils.check_output.assert_called_once_with(
523
['nuget', 'sources', 'list', '-format', 'detailed'],
524
stderr=self.subprocess_utils.PIPE
525
)
526
527
def test_login_old_nuget(self):
528
self.subprocess_utils.check_output.return_value = \
529
self._NUGET_SOURCES_LIST_RESPONSE_WITH_SPACE
530
self.test_subject.login()
531
self.subprocess_utils.check_output.assert_any_call(
532
self.list_operation_command,
533
stderr=self.subprocess_utils.PIPE
534
)
535
self.subprocess_utils.run.assert_called_with(
536
self.add_operation_command,
537
capture_output=True,
538
check=True
539
)
540
541
def test_login_dry_run_old_nuget(self):
542
self.subprocess_utils.check_output.return_value = \
543
self._NUGET_SOURCES_LIST_RESPONSE_WITH_SPACE
544
self.test_subject.login(dry_run=True)
545
self.subprocess_utils.check_output.assert_called_once_with(
546
['nuget', 'sources', 'list', '-format', 'detailed'],
547
stderr=self.subprocess_utils.PIPE
548
)
549
550
def test_login_source_name_already_exists(self):
551
list_response = 'Registered Sources:\n' \
552
' 1. ' + self.source_name + ' [ENABLED]\n' \
553
' https://source.com/index.json'
554
self.subprocess_utils.check_output.return_value = \
555
list_response.encode('utf-8')
556
self.test_subject.login()
557
self.subprocess_utils.run.assert_called_with(
558
self.update_operation_command,
559
capture_output=True,
560
check=True
561
)
562
563
def test_login_source_url_already_exists_old_nuget(self):
564
non_default_source_name = 'Source Name'
565
list_response = 'Registered Sources:\n' \
566
'\n' \
567
' 1. ' + non_default_source_name + ' [ENABLED]\n' \
568
' ' + self.nuget_index_url
569
self.subprocess_utils.check_output.return_value = \
570
list_response.encode('utf-8')
571
self.test_subject.login()
572
self.subprocess_utils.run.assert_called_with(
573
[
574
'nuget', 'sources', 'update',
575
'-name', non_default_source_name,
576
'-source', self.nuget_index_url,
577
'-username', 'aws',
578
'-password', self.auth_token
579
],
580
capture_output=True,
581
check=True
582
)
583
584
def test_login_source_url_already_exists(self):
585
non_default_source_name = 'Source Name'
586
list_response = 'Registered Sources:\n' \
587
' 1. ' + non_default_source_name + ' [ENABLED]\n' \
588
' ' + self.nuget_index_url
589
self.subprocess_utils.check_output.return_value = \
590
list_response.encode('utf-8')
591
self.test_subject.login()
592
self.subprocess_utils.run.assert_called_with(
593
[
594
'nuget', 'sources', 'update',
595
'-name', non_default_source_name,
596
'-source', self.nuget_index_url,
597
'-username', 'aws',
598
'-password', self.auth_token
599
],
600
capture_output=True,
601
check=True
602
)
603
604
def test_login_nuget_not_installed(self):
605
self.subprocess_utils.check_output.side_effect = OSError(
606
errno.ENOENT, 'not found error'
607
)
608
with self.assertRaisesRegex(
609
ValueError,
610
'nuget was not found. Please verify installation.'):
611
self.test_subject.login()
612
613
@skip_if_windows("Windows does not support signal.SIGALRM.")
614
def test_login_nuget_sources_listed_with_backtracking(self):
615
self.subprocess_utils.check_output.return_value = \
616
self._NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING
617
signal.signal(
618
signal.SIGALRM,
619
lambda signum, frame: handle_timeout(signum, frame, self.id()))
620
signal.alarm(10)
621
self.test_subject.login()
622
signal.alarm(0)
623
self.subprocess_utils.check_output.assert_any_call(
624
self.list_operation_command,
625
stderr=self.subprocess_utils.PIPE
626
)
627
628
629
class TestDotNetLogin(unittest.TestCase):
630
_NUGET_INDEX_URL_FMT = NuGetLogin._NUGET_INDEX_URL_FMT
631
_NUGET_SOURCES_LIST_RESPONSE = b"""\
632
Registered Sources:
633
1. Source Name 1 [Enabled]
634
https://source1.com/index.json
635
2. Ab[.d7 $#!], [Disabled]
636
https://source2.com/index.json"""
637
638
_NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING = b'1.' + b' ' * 10000 + b'a'
639
640
_NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT = b"""\
641
Welcome to dotnet 2.0!
642
643
Registered Sources:
644
1. Source Name 1 [Enabled]
645
https://source1.com/index.json
646
2. ati-nugetserver [Disabled]
647
http://atinugetserver-env.elasticbeanstalk.com/nuget
648
warn : You are running the 'list source' operation with an 'HTTP' source,
649
'ati-nugetserver' [http://atinugetserver-env..elasticbeanstalk.com/nuget]'.
650
Non-HTTPS access will be removed in a future version. Consider migrating
651
to an 'HTTPS' source."""
652
653
def setUp(self):
654
self.domain = 'domain'
655
self.domain_owner = 'domain-owner'
656
self.package_format = 'nuget'
657
self.repository = 'repository'
658
self.auth_token = 'auth-token'
659
self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
660
+ relativedelta(minutes=9)).replace(microsecond=0)
661
self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
662
'a2z.com/{format}/{repository}/'.format(
663
domain=self.domain,
664
domainOwner=self.domain_owner,
665
format=self.package_format,
666
repository=self.repository
667
)
668
669
self.nuget_index_url = self._NUGET_INDEX_URL_FMT.format(
670
endpoint=self.endpoint,
671
)
672
self.source_name = self.domain + '/' + self.repository
673
self.list_operation_command = [
674
'dotnet', 'nuget', 'list', 'source', '--format', 'detailed'
675
]
676
self.add_operation_command_windows = [
677
'dotnet', 'nuget', 'add', 'source', self.nuget_index_url,
678
'--name', self.source_name,
679
'--username', 'aws',
680
'--password', self.auth_token
681
]
682
self.add_operation_command_non_windows = [
683
'dotnet', 'nuget', 'add', 'source', self.nuget_index_url,
684
'--name', self.source_name,
685
'--username', 'aws',
686
'--password', self.auth_token,
687
'--store-password-in-clear-text'
688
]
689
self.update_operation_command_windows = [
690
'dotnet', 'nuget', 'update', 'source', self.source_name,
691
'--source', self.nuget_index_url,
692
'--username', 'aws',
693
'--password', self.auth_token
694
]
695
self.update_operation_command_non_windows = [
696
'dotnet', 'nuget', 'update', 'source', self.source_name,
697
'--source', self.nuget_index_url,
698
'--username', 'aws',
699
'--password', self.auth_token,
700
'--store-password-in-clear-text'
701
]
702
703
self.subprocess_utils = mock.Mock()
704
705
self.test_subject = DotNetLogin(
706
self.auth_token, self.expiration, self.endpoint,
707
self.domain, self.repository, self.subprocess_utils
708
)
709
710
@mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
711
def test_login(self):
712
self.subprocess_utils.check_output.return_value = \
713
self._NUGET_SOURCES_LIST_RESPONSE
714
self.test_subject.login()
715
self.subprocess_utils.check_output.assert_any_call(
716
self.list_operation_command,
717
stderr=self.subprocess_utils.PIPE
718
)
719
self.subprocess_utils.run.assert_called_with(
720
self.add_operation_command_non_windows,
721
capture_output=True,
722
check=True
723
)
724
725
@mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
726
def test_login_on_windows(self):
727
self.subprocess_utils.check_output.return_value = \
728
self._NUGET_SOURCES_LIST_RESPONSE
729
self.test_subject.login()
730
self.subprocess_utils.check_output.assert_any_call(
731
self.list_operation_command,
732
stderr=self.subprocess_utils.PIPE
733
)
734
self.subprocess_utils.run.assert_called_with(
735
self.add_operation_command_windows,
736
capture_output=True,
737
check=True
738
)
739
740
def test_login_dry_run(self):
741
self.subprocess_utils.check_output.return_value = \
742
self._NUGET_SOURCES_LIST_RESPONSE
743
self.test_subject.login(dry_run=True)
744
self.subprocess_utils.check_output.assert_called_once_with(
745
self.list_operation_command,
746
stderr=self.subprocess_utils.PIPE
747
)
748
749
@mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
750
def test_login_sources_listed_with_extra_non_list_text(self):
751
self.subprocess_utils.check_output.return_value = \
752
self._NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT
753
self.test_subject.login()
754
self.subprocess_utils.check_output.assert_any_call(
755
self.list_operation_command,
756
stderr=self.subprocess_utils.PIPE
757
)
758
self.subprocess_utils.run.assert_called_with(
759
self.add_operation_command_non_windows,
760
capture_output=True,
761
check=True
762
)
763
764
@mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
765
def test_login_sources_listed_with_extra_non_list_text_on_windows(self):
766
self.subprocess_utils.check_output.return_value = \
767
self._NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT
768
self.test_subject.login()
769
self.subprocess_utils.check_output.assert_any_call(
770
self.list_operation_command,
771
stderr=self.subprocess_utils.PIPE
772
)
773
self.subprocess_utils.run.assert_called_with(
774
self.add_operation_command_windows,
775
capture_output=True,
776
check=True
777
)
778
779
def test_login_sources_listed_with_extra_non_list_text_dry_run(self):
780
self.subprocess_utils.check_output.return_value = \
781
self._NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT
782
self.test_subject.login(dry_run=True)
783
self.subprocess_utils.check_output.assert_called_once_with(
784
self.list_operation_command,
785
stderr=self.subprocess_utils.PIPE
786
)
787
788
789
@skip_if_windows("Windows does not support signal.SIGALRM.")
790
@mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
791
def test_login_dotnet_sources_listed_with_backtracking(self):
792
self.subprocess_utils.check_output.return_value = \
793
self._NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING
794
signal.signal(
795
signal.SIGALRM,
796
lambda signum, frame: handle_timeout(signum, frame, self.id()))
797
signal.alarm(10)
798
self.test_subject.login()
799
signal.alarm(0)
800
801
@skip_if_windows("Windows does not support signal.SIGALRM.")
802
@mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
803
def test_login_dotnet_sources_listed_with_backtracking_windows(self):
804
self.subprocess_utils.check_output.return_value = \
805
self._NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING
806
signal.signal(
807
signal.SIGALRM,
808
lambda signum, frame: handle_timeout(signum, frame, self.id()))
809
signal.alarm(10)
810
self.test_subject.login()
811
signal.alarm(0)
812
813
@mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
814
def test_login_source_name_already_exists(self):
815
list_response = 'Registered Sources:\n' \
816
' 1. ' + self.source_name + ' [ENABLED]\n' \
817
' https://source.com/index.json'
818
self.subprocess_utils.check_output.return_value = \
819
list_response.encode('utf-8')
820
self.test_subject.login()
821
self.subprocess_utils.run.assert_called_with(
822
self.update_operation_command_non_windows,
823
capture_output=True,
824
check=True
825
)
826
827
@mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
828
def test_login_source_name_already_exists_on_windows(self):
829
list_response = 'Registered Sources:\n' \
830
' 1. ' + self.source_name + ' [ENABLED]\n' \
831
' https://source.com/index.json'
832
self.subprocess_utils.check_output.return_value = \
833
list_response.encode('utf-8')
834
self.test_subject.login()
835
self.subprocess_utils.run.assert_called_with(
836
self.update_operation_command_windows,
837
capture_output=True,
838
check=True
839
)
840
841
@mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
842
def test_login_source_url_already_exists(self):
843
non_default_source_name = 'Source Name'
844
list_response = 'Registered Sources:\n' \
845
' 1. ' + non_default_source_name + ' [ENABLED]\n' \
846
' ' + self.nuget_index_url
847
self.subprocess_utils.check_output.return_value = \
848
list_response.encode('utf-8')
849
self.test_subject.login()
850
self.subprocess_utils.run.assert_called_with(
851
[
852
'dotnet', 'nuget', 'update', 'source', non_default_source_name,
853
'--source', self.nuget_index_url,
854
'--username', 'aws',
855
'--password', self.auth_token
856
],
857
capture_output=True,
858
check=True
859
)
860
861
def test_login_dotnet_not_installed(self):
862
self.subprocess_utils.check_output.side_effect = OSError(
863
errno.ENOENT, 'not found error'
864
)
865
with self.assertRaisesRegex(
866
ValueError,
867
'dotnet was not found. Please verify installation.'):
868
self.test_subject.login()
869
870
871
class TestNpmLogin(unittest.TestCase):
872
873
NPM_CMD = NpmLogin.NPM_CMD
874
875
def setUp(self):
876
self.domain = 'domain'
877
self.domain_owner = 'domain-owner'
878
self.package_format = 'npm'
879
self.repository = 'repository'
880
self.auth_token = 'auth-token'
881
self.namespace = 'namespace'
882
self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
883
+ relativedelta(minutes=9)).replace(microsecond=0)
884
self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
885
'a2z.com/{format}/{repository}/'.format(
886
domain=self.domain,
887
domainOwner=self.domain_owner,
888
format=self.package_format,
889
repository=self.repository
890
)
891
892
repo_uri = urlparse.urlsplit(self.endpoint)
893
always_auth_config = '//{}{}:always-auth'.format(
894
repo_uri.netloc, repo_uri.path
895
)
896
auth_token_config = '//{}{}:_authToken'.format(
897
repo_uri.netloc, repo_uri.path
898
)
899
self.commands = []
900
self.commands.append([
901
self.NPM_CMD, 'config', 'set', 'registry', self.endpoint
902
])
903
self.commands.append(
904
[self.NPM_CMD, 'config', 'set', always_auth_config, 'true']
905
)
906
self.commands.append(
907
[self.NPM_CMD, 'config', 'set', auth_token_config, self.auth_token]
908
)
909
910
self.subprocess_utils = mock.Mock()
911
912
self.test_subject = NpmLogin(
913
self.auth_token, self.expiration, self.endpoint,
914
self.domain, self.repository, self.subprocess_utils
915
)
916
917
def test_login(self):
918
self.test_subject.login()
919
expected_calls = [
920
mock.call(
921
command,
922
capture_output=True,
923
check=True
924
) for command in self.commands
925
]
926
self.subprocess_utils.run.assert_has_calls(
927
expected_calls, any_order=True
928
)
929
930
def test_login_always_auth_error_ignored(self):
931
"""Test login ignores error for always-auth.
932
933
This test is for NPM version >= 9 where the support of 'always-auth'
934
has been dropped. Running the command to set config gives a non-zero
935
exit code. This is to make sure that login ignores that error and all
936
other commands executes successfully.
937
"""
938
def side_effect(command, capture_output, check):
939
"""Set side_effect for always-auth config setting command"""
940
if any('always-auth' in arg for arg in command):
941
raise subprocess.CalledProcessError(
942
returncode=1,
943
cmd=command
944
)
945
946
return mock.DEFAULT
947
948
self.subprocess_utils.run.side_effect = side_effect
949
expected_calls = []
950
951
for command in self.commands:
952
expected_calls.append(mock.call(
953
command,
954
capture_output=True,
955
check=True
956
)
957
)
958
self.test_subject.login()
959
960
self.subprocess_utils.run.assert_has_calls(
961
expected_calls, any_order=True
962
)
963
964
def test_get_scope(self):
965
expected_value = '@{}'.format(self.namespace)
966
scope = self.test_subject.get_scope(self.namespace)
967
self.assertEqual(scope, expected_value)
968
969
def test_get_scope_none_namespace(self):
970
expected_value = None
971
scope = self.test_subject.get_scope(None)
972
self.assertEqual(scope, expected_value)
973
974
def test_get_scope_invalid_name(self):
975
with self.assertRaises(ValueError):
976
self.test_subject.get_scope('.{}'.format(self.namespace))
977
978
def test_get_scope_without_prefix(self):
979
expected_value = '@{}'.format(self.namespace)
980
scope = self.test_subject.get_scope('@{}'.format(self.namespace))
981
self.assertEqual(scope, expected_value)
982
983
def test_get_commands(self):
984
commands = self.test_subject.get_commands(
985
self.endpoint, self.auth_token
986
)
987
self.assertCountEqual(commands, self.commands)
988
989
def test_get_commands_with_scope(self):
990
commands = self.test_subject.get_commands(
991
self.endpoint, self.auth_token, scope=self.namespace
992
)
993
self.commands[0][3] = '{}:registry'.format(self.namespace)
994
self.assertCountEqual(commands, self.commands)
995
996
def test_login_dry_run(self):
997
self.test_subject.login(dry_run=True)
998
self.subprocess_utils.check_call.assert_not_called()
999
1000
1001
class TestPipLogin(unittest.TestCase):
1002
1003
PIP_INDEX_URL_FMT = PipLogin.PIP_INDEX_URL_FMT
1004
1005
def setUp(self):
1006
self.domain = 'domain'
1007
self.domain_owner = 'domain-owner'
1008
self.package_format = 'pip'
1009
self.repository = 'repository'
1010
self.auth_token = 'auth-token'
1011
self.expiration = (datetime.now(tzlocal()) + relativedelta(years=1)
1012
+ relativedelta(months=9)).replace(microsecond=0)
1013
self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
1014
'a2z.com/{format}/{repository}/'.format(
1015
domain=self.domain,
1016
domainOwner=self.domain_owner,
1017
format=self.package_format,
1018
repository=self.repository
1019
)
1020
1021
repo_uri = urlparse.urlsplit(self.endpoint)
1022
self.pip_index_url = self.PIP_INDEX_URL_FMT.format(
1023
scheme=repo_uri.scheme,
1024
auth_token=self.auth_token,
1025
netloc=repo_uri.netloc,
1026
path=repo_uri.path
1027
)
1028
1029
self.subprocess_utils = mock.Mock()
1030
1031
self.test_subject = PipLogin(
1032
self.auth_token, self.expiration, self.endpoint,
1033
self.domain, self.repository, self.subprocess_utils
1034
)
1035
1036
def test_get_commands(self):
1037
expected_commands = [
1038
['pip', 'config', 'set', 'global.index-url', self.pip_index_url]
1039
]
1040
commands = self.test_subject.get_commands(
1041
self.endpoint, self.auth_token
1042
)
1043
self.assertCountEqual(commands, expected_commands)
1044
1045
def test_login(self):
1046
self.test_subject.login()
1047
self.subprocess_utils.run.assert_called_once_with(
1048
['pip', 'config', 'set', 'global.index-url', self.pip_index_url],
1049
capture_output=True,
1050
check=True
1051
)
1052
1053
def test_login_dry_run(self):
1054
self.test_subject.login(dry_run=True)
1055
self.subprocess_utils.run.assert_not_called()
1056
1057
1058
class TestTwineLogin(unittest.TestCase):
1059
1060
DEFAULT_PYPI_RC_FMT = TwineLogin.DEFAULT_PYPI_RC_FMT
1061
1062
def setUp(self):
1063
self.file_creator = FileCreator()
1064
self.domain = 'domain'
1065
self.domain_owner = 'domain-owner'
1066
self.package_format = 'pip'
1067
self.repository = 'repository'
1068
self.auth_token = 'auth-token'
1069
self.expiration = (datetime.now(tzlocal()) + relativedelta(years=1)
1070
+ relativedelta(months=9)).replace(microsecond=0)
1071
self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
1072
'a2z.com/{format}/{repository}/'.format(
1073
domain=self.domain,
1074
domainOwner=self.domain_owner,
1075
format=self.package_format,
1076
repository=self.repository
1077
)
1078
self.default_pypi_rc = self.DEFAULT_PYPI_RC_FMT.format(
1079
repository_endpoint=self.endpoint,
1080
auth_token=self.auth_token
1081
)
1082
self.subprocess_utils = mock.Mock()
1083
self.test_pypi_rc_path = self.file_creator.full_path('pypirc')
1084
if not os.path.isdir(os.path.dirname(self.test_pypi_rc_path)):
1085
os.makedirs(os.path.dirname(self.test_pypi_rc_path))
1086
1087
self.test_subject = TwineLogin(
1088
self.auth_token,
1089
self.expiration,
1090
self.endpoint,
1091
self.domain,
1092
self.repository,
1093
self.subprocess_utils,
1094
self.test_pypi_rc_path
1095
)
1096
1097
def tearDown(self):
1098
self.file_creator.remove_all()
1099
1100
def _assert_pypi_rc_has_expected_content(
1101
self, pypi_rc_str, server, repo_url=None, username=None, password=None
1102
):
1103
pypi_rc = RawConfigParser()
1104
pypi_rc.read_string(pypi_rc_str)
1105
1106
self.assertIn('distutils', pypi_rc.sections())
1107
self.assertIn('index-servers', pypi_rc.options('distutils'))
1108
index_servers = pypi_rc.get('distutils', 'index-servers')
1109
index_servers = [
1110
index_server.strip()
1111
for index_server
1112
in index_servers.split('\n')
1113
if index_server.strip() != ''
1114
]
1115
self.assertIn(server, index_servers)
1116
1117
if repo_url or username or password:
1118
self.assertIn(server, pypi_rc.sections())
1119
1120
if repo_url:
1121
self.assertIn('repository', pypi_rc.options(server))
1122
self.assertEqual(pypi_rc.get(server, 'repository'), repo_url)
1123
1124
if username:
1125
self.assertIn('username', pypi_rc.options(server))
1126
self.assertEqual(pypi_rc.get(server, 'username'), username)
1127
1128
if password:
1129
self.assertIn('password', pypi_rc.options(server))
1130
self.assertEqual(pypi_rc.get(server, 'password'), password)
1131
1132
def test_get_pypi_rc_path(self):
1133
self.assertEqual(
1134
TwineLogin.get_pypi_rc_path(),
1135
os.path.join(os.path.expanduser("~"), ".pypirc")
1136
)
1137
1138
def test_login_pypi_rc_not_found_defaults_set(self):
1139
self.test_subject.login()
1140
1141
with open(self.test_pypi_rc_path) as f:
1142
test_pypi_rc_str = f.read()
1143
1144
self._assert_pypi_rc_has_expected_content(
1145
pypi_rc_str=test_pypi_rc_str,
1146
server='codeartifact',
1147
repo_url=self.endpoint,
1148
username='aws',
1149
password=self.auth_token
1150
)
1151
1152
def test_login_dry_run(self):
1153
self.test_subject.login(dry_run=True)
1154
self.subprocess_utils.run.assert_not_called()
1155
self.assertFalse(os.path.exists(self.test_pypi_rc_path))
1156
1157
def test_login_existing_pypi_rc_not_clobbered(self):
1158
existing_pypi_rc = '''\
1159
[distutils]
1160
index-servers=
1161
pypi
1162
test
1163
1164
[pypi]
1165
repository: http://www.python.org/pypi/
1166
username: monty
1167
password: JgCXIr5xGG
1168
1169
[test]
1170
repository: http://example.com/test/
1171
username: testusername
1172
password: testpassword
1173
'''
1174
1175
with open(self.test_pypi_rc_path, 'w+') as f:
1176
f.write(existing_pypi_rc)
1177
1178
self.test_subject.login()
1179
1180
with open(self.test_pypi_rc_path) as f:
1181
test_pypi_rc_str = f.read()
1182
1183
self._assert_pypi_rc_has_expected_content(
1184
pypi_rc_str=test_pypi_rc_str,
1185
server='codeartifact',
1186
repo_url=self.endpoint,
1187
username='aws',
1188
password=self.auth_token
1189
)
1190
1191
self._assert_pypi_rc_has_expected_content(
1192
pypi_rc_str=test_pypi_rc_str,
1193
server='pypi',
1194
repo_url='http://www.python.org/pypi/',
1195
username='monty',
1196
password='JgCXIr5xGG'
1197
)
1198
1199
self._assert_pypi_rc_has_expected_content(
1200
pypi_rc_str=test_pypi_rc_str,
1201
server='test',
1202
repo_url='http://example.com/test/',
1203
username='testusername',
1204
password='testpassword'
1205
)
1206
1207
def test_login_existing_pypi_rc_with_codeartifact_not_clobbered(self):
1208
existing_pypi_rc = '''\
1209
[distutils]
1210
index-servers=
1211
pypi
1212
codeartifact
1213
1214
[pypi]
1215
repository: http://www.python.org/pypi/
1216
username: monty
1217
password: JgCXIr5xGG
1218
1219
[codeartifact]
1220
repository: https://test-testOwner.codeartifact.aws.a2z.com/pypi/testRepo/
1221
username: aws
1222
password: expired_token
1223
'''
1224
1225
with open(self.test_pypi_rc_path, 'w+') as f:
1226
f.write(existing_pypi_rc)
1227
1228
self.test_subject.login()
1229
1230
with open(self.test_pypi_rc_path) as f:
1231
test_pypi_rc_str = f.read()
1232
1233
self._assert_pypi_rc_has_expected_content(
1234
pypi_rc_str=test_pypi_rc_str,
1235
server='codeartifact',
1236
repo_url=self.endpoint,
1237
username='aws',
1238
password=self.auth_token
1239
)
1240
1241
self._assert_pypi_rc_has_expected_content(
1242
pypi_rc_str=test_pypi_rc_str,
1243
server='pypi',
1244
repo_url='http://www.python.org/pypi/',
1245
username='monty',
1246
password='JgCXIr5xGG'
1247
)
1248
1249
def test_login_existing_invalid_pypi_rc_error(self):
1250
# This is an invalid pypirc as the list of servers are expected under
1251
# an 'index-servers' option instead of 'servers'.
1252
existing_pypi_rc = '''\
1253
[distutils]
1254
servers=
1255
pypi
1256
1257
[pypi]
1258
repository: http://www.python.org/pypi/
1259
username: monty
1260
password: JgCXIr5xGG
1261
'''
1262
1263
with open(self.test_pypi_rc_path, 'w+') as f:
1264
f.write(existing_pypi_rc)
1265
1266
with open(self.test_pypi_rc_path) as f:
1267
original_content = f.read()
1268
1269
with self.assertRaises(Exception):
1270
self.test_subject.login()
1271
1272
# We should just leave the pypirc untouched when it's invalid.
1273
with open(self.test_pypi_rc_path) as f:
1274
self.assertEqual(f.read(), original_content)
1275
1276
1277
class TestRelativeExpirationTime(unittest.TestCase):
1278
1279
def test_with_years_months_days(self):
1280
remaining = relativedelta(years=1, months=9)
1281
message = get_relative_expiration_time(remaining)
1282
self.assertEqual(message, '1 year and 9 months')
1283
1284
def test_with_years_months(self):
1285
remaining = relativedelta(years=1, months=8, days=30, hours=23,
1286
minutes=59, seconds=30)
1287
message = get_relative_expiration_time(remaining)
1288
self.assertEqual(message, '1 year and 8 months')
1289
1290
def test_with_years_month(self):
1291
remaining = relativedelta(years=3, days=30, hours=23,
1292
minutes=59, seconds=30)
1293
message = get_relative_expiration_time(remaining)
1294
self.assertEqual(message, '3 years')
1295
1296
def test_with_years_days(self):
1297
remaining = relativedelta(years=1, days=9)
1298
message = get_relative_expiration_time(remaining)
1299
self.assertEqual(message, '1 year')
1300
1301
def test_with_year(self):
1302
remaining = relativedelta(months=11, days=30)
1303
message = get_relative_expiration_time(remaining)
1304
self.assertEqual(message, '11 months and 30 days')
1305
1306
def test_with_years(self):
1307
remaining = relativedelta(years=1, months=11)
1308
message = get_relative_expiration_time(remaining)
1309
self.assertEqual(message, '1 year and 11 months')
1310
1311
def test_with_years_days_hours_minutes(self):
1312
remaining = relativedelta(years=2, days=7, hours=11, minutes=44)
1313
message = get_relative_expiration_time(remaining)
1314
self.assertEqual(message, '2 years')
1315
1316
def test_with_days_minutes(self):
1317
remaining = relativedelta(days=1, minutes=44)
1318
message = get_relative_expiration_time(remaining)
1319
self.assertEqual(message, '1 day')
1320
1321
def test_with_day(self):
1322
remaining = relativedelta(days=1)
1323
message = get_relative_expiration_time(remaining)
1324
self.assertEqual(message, '1 day')
1325
1326
def test_with_hour(self):
1327
self.expiration = (datetime.now(tzlocal())
1328
+ relativedelta(hours=1)).replace(microsecond=0)
1329
remaining = relativedelta(
1330
self.expiration, datetime.now(tzutc())) + relativedelta(seconds=30)
1331
message = get_relative_expiration_time(remaining)
1332
self.assertEqual(message, '1 hour')
1333
1334
def test_with_minutes_seconds(self):
1335
remaining = relativedelta(hours=1)
1336
message = get_relative_expiration_time(remaining)
1337
self.assertEqual(message, '1 hour')
1338
1339
def test_with_full_time(self):
1340
remaining = relativedelta(
1341
years=2, months=3, days=7, hours=11, minutes=44)
1342
message = get_relative_expiration_time(remaining)
1343
self.assertEqual(message, '2 years and 3 months')
1344
1345