Path: blob/develop/tests/unit/customizations/codeartifact/test_adapter_login.py
2637 views
import errno
import os
import re
import signal
import stat
import subprocess

from datetime import datetime
from dateutil.tz import tzlocal, tzutc
from dateutil.relativedelta import relativedelta

from awscli.testutils import (
    unittest, mock, skip_if_windows, FileCreator
)
from awscli.compat import urlparse, RawConfigParser
from awscli.customizations.codeartifact.login import (
    BaseLogin, SwiftLogin, NuGetLogin, DotNetLogin, NpmLogin, PipLogin,
    TwineLogin, get_relative_expiration_time, CommandFailedError
)


class TestBaseLogin(unittest.TestCase):
    """Tests for BaseLogin: unimplemented hooks and _run_commands errors."""

    def setUp(self):
        self.domain = 'domain'
        self.domain_owner = 'domain-owner'
        self.package_format = 'npm'
        self.repository = 'repository'
        self.auth_token = 'auth-token'
        self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
                           + relativedelta(minutes=9)).replace(microsecond=0)
        self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
            'a2z.com/{format}/{repository}/'.format(
                domain=self.domain,
                domainOwner=self.domain_owner,
                format=self.package_format,
                repository=self.repository
            )

        # All subprocess access goes through this mock, so no real tool
        # is ever executed by these tests.
        self.subprocess_utils = mock.Mock()

        self.test_subject = BaseLogin(
            self.auth_token, self.expiration, self.endpoint,
            self.domain, self.repository, self.subprocess_utils
        )

    def test_login(self):
        with self.assertRaises(NotImplementedError):
            self.test_subject.login()

    def test_get_commands(self):
        with self.assertRaises(NotImplementedError):
            self.test_subject.get_commands(
                self.endpoint, self.auth_token
            )

    def test_run_commands_command_failed(self):
        error_to_be_caught = subprocess.CalledProcessError(
            returncode=1,
            cmd=['cmd'],
            output=None,
            stderr=b'Command error message.'
        )
        self.subprocess_utils.run.side_effect = error_to_be_caught
        with self.assertRaisesRegex(
            CommandFailedError,
            rf"{re.escape(str(error_to_be_caught))}"
            rf" Stderr from command:\nCommand error message."
        ):
            self.test_subject._run_commands('tool', ['cmd'])

    def test_run_commands_command_failed_redact_auth_token(self):
        error_to_be_caught = subprocess.CalledProcessError(
            returncode=1,
            cmd=['cmd', 'with', 'auth-token', 'present'],
            output=None,
            stderr=b'Command error message.'
        )
        self.subprocess_utils.run.side_effect = error_to_be_caught
        # The lookaheads require the other command words and the stderr
        # text to be present while the auth token itself must be absent.
        with self.assertRaisesRegex(
            CommandFailedError,
            (rf"(?=.*cmd)(?=.*with)(?!.*auth-token)(?=.*present)"
             rf"(?=.*Stderr from command:\nCommand error message.)")
        ):
            self.test_subject._run_commands('tool', ['cmd'])

    def test_run_commands_nonexistent_command(self):
        self.subprocess_utils.run.side_effect = OSError(
            errno.ENOENT, 'not found error'
        )
        tool = 'NotSupported'
        with self.assertRaisesRegex(
                ValueError, '%s was not found.' % tool):
            self.test_subject._run_commands(tool, ['echo', tool])

    def test_run_commands_unhandled_error(self):
        self.subprocess_utils.run.side_effect = OSError(
            errno.ENOSYS, 'unhandled error'
        )
        tool = 'NotSupported'
        with self.assertRaisesRegex(OSError, 'unhandled error'):
            self.test_subject._run_commands(tool, ['echo', tool])


def handle_timeout(signum, frame, test_name):
    """Signal handler body that fails the named test with TimeoutError."""
    raise TimeoutError(f"{test_name} timed out!!")


class TestSwiftLogin(unittest.TestCase):
    """Tests for SwiftLogin: netrc handling, commands and scope checks."""

    def setUp(self):
        self.domain = 'domain'
        self.domain_owner = 'domain-owner'
        self.package_format = 'swift'
        self.repository = 'repository'
        self.auth_token = 'auth-token'
        self.namespace = 'namespace'
        self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
                           + relativedelta(minutes=9)).replace(microsecond=0)
        self.endpoint = f'https://{self.domain}-{self.domain_owner}.codeartifact' \
                        f'.aws.a2z.com/{self.package_format}/{self.repository}/'
        self.hostname = urlparse.urlparse(self.endpoint).hostname
        self.new_entry = SwiftLogin.DEFAULT_NETRC_FMT.format(
            hostname=self.hostname,
            auth_token=self.auth_token
        )

        # Point SwiftLogin at a netrc inside a temp dir so the tests never
        # touch the user's real file.
        self.file_creator = FileCreator()
        self.test_netrc_path = self.file_creator.full_path('netrc')
        self.get_netrc_path_patch = mock.patch(
            'awscli.customizations.codeartifact.login.SwiftLogin'
            '.get_netrc_path'
        )
        self.get_netrc_path_mock = self.get_netrc_path_patch.start()
        self.get_netrc_path_mock.return_value = self.test_netrc_path

        self.base_command = ['swift', 'package-registry', 'set', self.endpoint]
        self.macos_commands = [
            self.base_command[:],
            ['swift', 'package-registry', 'login', self.endpoint + 'login',
             '--token', self.auth_token]
        ]
        self.non_macos_commands = [
            self.base_command[:],
            ['swift', 'package-registry', 'login', self.endpoint + 'login']
        ]

        self.subprocess_utils = mock.Mock()

        self.test_subject = SwiftLogin(
            self.auth_token, self.expiration, self.endpoint,
            self.domain, self.repository, self.subprocess_utils
        )

    def tearDown(self):
        self.get_netrc_path_patch.stop()
        self.file_creator.remove_all()

    def _assert_netrc_has_expected_content(self, expected_contents):
        """Assert the test netrc file holds exactly expected_contents."""
        with open(self.test_netrc_path, 'r') as file:
            actual_contents = file.read()
            self.assertEqual(expected_contents, actual_contents)

    def _assert_netrc_has_expected_permissions(self):
        """Assert the test netrc file is owner-readable and owner-writable."""
        file_stat = os.stat(self.test_netrc_path)
        file_mode = file_stat.st_mode
        self.assertTrue(stat.S_IRUSR & file_mode)
        self.assertTrue(stat.S_IWUSR & file_mode)

    def test_get_netrc_path(self):
        self.assertEqual(
            SwiftLogin.get_netrc_path(),
            self.test_netrc_path
        )

    def test_regex_only_match_escaped_hostname(self):
        pattern = re.compile(
            SwiftLogin.NETRC_REGEX_FMT.format(
                escaped_hostname=re.escape(self.hostname)
            ),
            re.M
        )
        # Same hostname with every '.' replaced by '-': it would match if
        # the dots in the pattern were left unescaped.
        bad_endpoint = f'https://{self.domain}-{self.domain_owner}-codeartifact' \
                       f'-aws-a2z-com/{self.package_format}/{self.repository}/'
        bad_hostname = urlparse.urlparse(bad_endpoint).hostname
        bad_entry = SwiftLogin.DEFAULT_NETRC_FMT.format(
            hostname=bad_hostname,
            auth_token=self.auth_token
        )
        self.assertTrue(pattern.match(self.new_entry))
        self.assertFalse(pattern.match(bad_entry))

    def test_create_netrc_if_not_exist(self):
        self.assertFalse(os.path.exists(self.test_netrc_path))
        self.test_subject._update_netrc_entry(
            self.hostname,
            'a new entry',
            self.test_netrc_path
        )
        self.assertTrue(os.path.exists(self.test_netrc_path))
        self._assert_netrc_has_expected_permissions()
        self._assert_netrc_has_expected_content('a new entry\n')

    def test_replacement_token_has_backslash(self):
        existing_content = (
            f'machine {self.hostname} login token password expired-auth-token\n'
        )
        with open(self.test_netrc_path, 'w+') as f:
            f.write(existing_content)
        self.test_subject.auth_token = r'new-token_.\1\g<entry_start>\n\w'
        # make sure it uses re.sub() to replace the token
        self.test_subject._update_netrc_entry(
            self.hostname,
            '',
            self.test_netrc_path
        )
        self.assertTrue(os.path.exists(self.test_netrc_path))
        self._assert_netrc_has_expected_content(
            f'machine {self.hostname} login token password {self.test_subject.auth_token}\n'
        )

    def test_update_netrc_with_existing_entry(self):
        existing_content = \
            f'machine {self.hostname} login token password expired-auth-token\n'

        expected_content = f'{self.new_entry}\n'
        with open(self.test_netrc_path, 'w+') as f:
            f.write(existing_content)

        self.test_subject._update_netrc_entry(
            self.hostname,
            self.new_entry,
            self.test_netrc_path
        )
        self._assert_netrc_has_expected_content(expected_content)

    def test_update_netrc_with_existing_entry_in_between(self):
        existing_content = (
            f'some random line above\n'
            f'machine {self.hostname} login token password expired-auth-token\n'
            f'some random line below\n'
        )

        expected_content = (
            f'some random line above\n'
            f'{self.new_entry}\n'
            f'some random line below\n'
        )
        with open(self.test_netrc_path, 'w+') as f:
            f.write(existing_content)

        self.test_subject._update_netrc_entry(
            self.hostname,
            self.new_entry,
            self.test_netrc_path
        )
        self._assert_netrc_has_expected_content(expected_content)

    def test_append_netrc_without_ending_newline(self):
        existing_content = 'machine host login user password 1234'

        expected_content = (
            f'machine host login user password 1234\n'
            f'{self.new_entry}\n'
        )
        with open(self.test_netrc_path, 'w+') as f:
            f.write(existing_content)

        self.test_subject._update_netrc_entry(
            self.hostname,
            self.new_entry,
            self.test_netrc_path
        )
        self._assert_netrc_has_expected_content(expected_content)

    def test_append_netrc_with_ending_newline(self):
        existing_content = 'machine host login user password 1234\n'

        expected_content = (
            f'machine host login user password 1234\n'
            f'{self.new_entry}\n'
        )
        with open(self.test_netrc_path, 'w+') as f:
            f.write(existing_content)

        self.test_subject._update_netrc_entry(
            self.hostname,
            self.new_entry,
            self.test_netrc_path
        )
        self._assert_netrc_has_expected_content(expected_content)

    def test_update_netrc_with_multiple_spaces_and_newlines(self):
        existing_content = (
            f' machine {self.hostname}\n'
            f' login token \n'
            f'password expired-auth-token \n'
            f'\n'
            f'machine example1.com\n'
            f' login user1 \n'
            f'password 1234\n'
        )
        expected_content = (
            f' machine {self.hostname}\n'
            f' login token \n'
            f'password auth-token \n'
            f'\n'
            f'machine example1.com\n'
            f' login user1 \n'
            f'password 1234\n'
        )
        with open(self.test_netrc_path, 'w+') as f:
            f.write(existing_content)

        self.test_subject._update_netrc_entry(
            self.hostname,
            self.new_entry,
            self.test_netrc_path
        )
        self._assert_netrc_has_expected_content(expected_content)

    def test_update_netrc_with_multiple_existing_entries(self):
        existing_content = (
            f'machine {self.hostname} login token password expired-auth-token-1\n'
            f'machine {self.hostname} login token password expired-auth-token-2\n'
        )
        expected_content = (
            f'{self.new_entry}\n'
            f'{self.new_entry}\n'
        )
        with open(self.test_netrc_path, 'w+') as f:
            f.write(existing_content)

        self.test_subject._update_netrc_entry(
            self.hostname,
            self.new_entry,
            self.test_netrc_path
        )
        self._assert_netrc_has_expected_content(expected_content)

    @mock.patch('awscli.customizations.codeartifact.login.is_macos', True)
    def test_login_macos(self):
        self.test_subject.login()
        expected_calls = [
            mock.call(
                command,
                capture_output=True,
                check=True
            ) for command in self.macos_commands
        ]
        self.subprocess_utils.run.assert_has_calls(
            expected_calls, any_order=True
        )

    @mock.patch('awscli.customizations.codeartifact.login.is_macos', False)
    def test_login_non_macos(self):
        self.test_subject.login()
        expected_calls = [
            mock.call(
                command,
                capture_output=True,
                check=True
            ) for command in self.non_macos_commands
        ]
        self.subprocess_utils.run.assert_has_calls(
            expected_calls, any_order=True
        )

    def test_login_swift_tooling_error(self):
        self.subprocess_utils.run.side_effect = \
            subprocess.CalledProcessError(
                returncode=1, cmd='swift command', stderr=b''
            )
        with self.assertRaises(CommandFailedError):
            self.test_subject.login()

    def test_login_swift_not_installed(self):
        self.subprocess_utils.run.side_effect = OSError(
            errno.ENOENT, 'not found error'
        )
        with self.assertRaisesRegex(
                ValueError,
                'swift was not found. Please verify installation.'):
            self.test_subject.login()

    def test_get_scope(self):
        expected_value = 'namespace'
        scope = self.test_subject.get_scope(self.namespace)
        self.assertEqual(scope, expected_value)

    def test_get_scope_none_namespace(self):
        expected_value = None
        scope = self.test_subject.get_scope(None)
        self.assertEqual(scope, expected_value)

    def test_get_scope_invalid_leading_character(self):
        with self.assertRaises(ValueError):
            self.test_subject.get_scope(f'.{self.namespace}')

    def test_get_scope_invalid_length(self):
        with self.assertRaises(ValueError):
            self.test_subject.get_scope("a"*40)

    @mock.patch('awscli.customizations.codeartifact.login.is_macos', True)
    def test_get_commands_macos(self):
        commands = self.test_subject.get_commands(
            self.endpoint, self.auth_token
        )
        self.assertCountEqual(commands, self.macos_commands)

    @mock.patch('awscli.customizations.codeartifact.login.is_macos', False)
    def test_get_commands_non_macos(self):
        commands = self.test_subject.get_commands(
            self.endpoint, self.auth_token
        )
        self.assertCountEqual(commands, self.non_macos_commands)

    @mock.patch('awscli.customizations.codeartifact.login.is_macos', True)
    def test_get_commands_with_scope_macos(self):
        commands = self.test_subject.get_commands(
            self.endpoint, self.auth_token, scope=self.namespace
        )
        self.macos_commands[0] += ['--scope', self.namespace]
        self.assertCountEqual(commands, self.macos_commands)

    @mock.patch('awscli.customizations.codeartifact.login.is_macos', False)
    def test_get_commands_with_scope_non_macos(self):
        commands = self.test_subject.get_commands(
            self.endpoint, self.auth_token, scope=self.namespace
        )
        self.non_macos_commands[0] += ['--scope', self.namespace]
        self.assertCountEqual(commands, self.non_macos_commands)

    def test_login_dry_run(self):
        self.test_subject.login(dry_run=True)
        # The login tests above show commands are executed through
        # subprocess_utils.run, so that is the call a dry run must not
        # make. On a Mock, asserting only on check_output can never fail.
        self.subprocess_utils.run.assert_not_called()
        self.subprocess_utils.check_output.assert_not_called()
        self.assertFalse(os.path.exists(self.test_netrc_path))


class TestNuGetLogin(unittest.TestCase):
    """Tests for NuGetLogin: source listing parsing and add/update calls."""

    _NUGET_INDEX_URL_FMT = NuGetLogin._NUGET_INDEX_URL_FMT
    _NUGET_SOURCES_LIST_RESPONSE = b"""\
Registered Sources:
1. Source Name 1 [Enabled]
https://source1.com/index.json
2. Ab[.d7 $#!], [Disabled]
https://source2.com/index.json"""

    # A long run of spaces after the index, used by the timeout-guarded
    # backtracking test below.
    _NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING = b'1.' + b' ' * 10000 + b'a'

    _NUGET_SOURCES_LIST_RESPONSE_WITH_SPACE = b"""\
Registered Sources:

1. Source Name 1 [Enabled]
https://source1.com/index.json
2. Ab[.d7 $#!], [Disabled]
https://source2.com/index.json"""

    def setUp(self):
        self.domain = 'domain'
        self.domain_owner = 'domain-owner'
        self.package_format = 'nuget'
        self.repository = 'repository'
        self.auth_token = 'auth-token'
        self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
                           + relativedelta(minutes=9)).replace(microsecond=0)
        self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
            'a2z.com/{format}/{repository}/'.format(
                domain=self.domain,
                domainOwner=self.domain_owner,
                format=self.package_format,
                repository=self.repository
            )

        self.nuget_index_url = self._NUGET_INDEX_URL_FMT.format(
            endpoint=self.endpoint,
        )
        self.source_name = self.domain + '/' + self.repository
        self.list_operation_command = [
            'nuget', 'sources', 'list',
            '-format', 'detailed',
        ]
        self.add_operation_command = [
            'nuget', 'sources', 'add',
            '-name', self.source_name,
            '-source', self.nuget_index_url,
            '-username', 'aws',
            '-password', self.auth_token
        ]
        self.update_operation_command = [
            'nuget', 'sources', 'update',
            '-name', self.source_name,
            '-source', self.nuget_index_url,
            '-username', 'aws',
            '-password', self.auth_token
        ]

        self.subprocess_utils = mock.Mock()

        self.test_subject = NuGetLogin(
            self.auth_token, self.expiration, self.endpoint,
            self.domain, self.repository, self.subprocess_utils
        )

    def test_login(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE
        self.test_subject.login()
        self.subprocess_utils.check_output.assert_any_call(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )
        self.subprocess_utils.run.assert_called_with(
            self.add_operation_command,
            capture_output=True,
            check=True
        )

    def test_login_dry_run(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE
        self.test_subject.login(dry_run=True)
        self.subprocess_utils.check_output.assert_called_once_with(
            ['nuget', 'sources', 'list', '-format', 'detailed'],
            stderr=self.subprocess_utils.PIPE
        )

    def test_login_old_nuget(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_WITH_SPACE
        self.test_subject.login()
        self.subprocess_utils.check_output.assert_any_call(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )
        self.subprocess_utils.run.assert_called_with(
            self.add_operation_command,
            capture_output=True,
            check=True
        )

    def test_login_dry_run_old_nuget(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_WITH_SPACE
        self.test_subject.login(dry_run=True)
        self.subprocess_utils.check_output.assert_called_once_with(
            ['nuget', 'sources', 'list', '-format', 'detailed'],
            stderr=self.subprocess_utils.PIPE
        )

    def test_login_source_name_already_exists(self):
        list_response = 'Registered Sources:\n' \
                        ' 1. ' + self.source_name + ' [ENABLED]\n' \
                        ' https://source.com/index.json'
        self.subprocess_utils.check_output.return_value = \
            list_response.encode('utf-8')
        self.test_subject.login()
        self.subprocess_utils.run.assert_called_with(
            self.update_operation_command,
            capture_output=True,
            check=True
        )

    def test_login_source_url_already_exists_old_nuget(self):
        non_default_source_name = 'Source Name'
        list_response = 'Registered Sources:\n' \
                        '\n' \
                        ' 1. ' + non_default_source_name + ' [ENABLED]\n' \
                        ' ' + self.nuget_index_url
        self.subprocess_utils.check_output.return_value = \
            list_response.encode('utf-8')
        self.test_subject.login()
        self.subprocess_utils.run.assert_called_with(
            [
                'nuget', 'sources', 'update',
                '-name', non_default_source_name,
                '-source', self.nuget_index_url,
                '-username', 'aws',
                '-password', self.auth_token
            ],
            capture_output=True,
            check=True
        )

    def test_login_source_url_already_exists(self):
        non_default_source_name = 'Source Name'
        list_response = 'Registered Sources:\n' \
                        ' 1. ' + non_default_source_name + ' [ENABLED]\n' \
                        ' ' + self.nuget_index_url
        self.subprocess_utils.check_output.return_value = \
            list_response.encode('utf-8')
        self.test_subject.login()
        self.subprocess_utils.run.assert_called_with(
            [
                'nuget', 'sources', 'update',
                '-name', non_default_source_name,
                '-source', self.nuget_index_url,
                '-username', 'aws',
                '-password', self.auth_token
            ],
            capture_output=True,
            check=True
        )

    def test_login_nuget_not_installed(self):
        self.subprocess_utils.check_output.side_effect = OSError(
            errno.ENOENT, 'not found error'
        )
        with self.assertRaisesRegex(
                ValueError,
                'nuget was not found. Please verify installation.'):
            self.test_subject.login()

    @skip_if_windows("Windows does not support signal.SIGALRM.")
    def test_login_nuget_sources_listed_with_backtracking(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING
        previous_handler = signal.signal(
            signal.SIGALRM,
            lambda signum, frame: handle_timeout(signum, frame, self.id()))
        signal.alarm(10)
        try:
            self.test_subject.login()
        finally:
            # Cancel the alarm and put the old handler back even when
            # login() raises, so nothing fires during a later test.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, previous_handler)
        self.subprocess_utils.check_output.assert_any_call(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )


class TestDotNetLogin(unittest.TestCase):
    """Tests for DotNetLogin, including the Windows/non-Windows variants."""

    _NUGET_INDEX_URL_FMT = NuGetLogin._NUGET_INDEX_URL_FMT
    _NUGET_SOURCES_LIST_RESPONSE = b"""\
Registered Sources:
1. Source Name 1 [Enabled]
https://source1.com/index.json
2. Ab[.d7 $#!], [Disabled]
https://source2.com/index.json"""

    # A long run of spaces after the index, used by the timeout-guarded
    # backtracking tests below.
    _NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING = b'1.' + b' ' * 10000 + b'a'

    _NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT = b"""\
Welcome to dotnet 2.0!

Registered Sources:
1. Source Name 1 [Enabled]
https://source1.com/index.json
2. ati-nugetserver [Disabled]
http://atinugetserver-env.elasticbeanstalk.com/nuget
warn : You are running the 'list source' operation with an 'HTTP' source,
'ati-nugetserver' [http://atinugetserver-env..elasticbeanstalk.com/nuget]'.
Non-HTTPS access will be removed in a future version. Consider migrating
to an 'HTTPS' source."""

    def setUp(self):
        self.domain = 'domain'
        self.domain_owner = 'domain-owner'
        self.package_format = 'nuget'
        self.repository = 'repository'
        self.auth_token = 'auth-token'
        self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
                           + relativedelta(minutes=9)).replace(microsecond=0)
        self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
            'a2z.com/{format}/{repository}/'.format(
                domain=self.domain,
                domainOwner=self.domain_owner,
                format=self.package_format,
                repository=self.repository
            )

        self.nuget_index_url = self._NUGET_INDEX_URL_FMT.format(
            endpoint=self.endpoint,
        )
        self.source_name = self.domain + '/' + self.repository
        self.list_operation_command = [
            'dotnet', 'nuget', 'list', 'source', '--format', 'detailed'
        ]
        # The non-Windows variants differ only by the trailing
        # '--store-password-in-clear-text' flag.
        self.add_operation_command_windows = [
            'dotnet', 'nuget', 'add', 'source', self.nuget_index_url,
            '--name', self.source_name,
            '--username', 'aws',
            '--password', self.auth_token
        ]
        self.add_operation_command_non_windows = [
            'dotnet', 'nuget', 'add', 'source', self.nuget_index_url,
            '--name', self.source_name,
            '--username', 'aws',
            '--password', self.auth_token,
            '--store-password-in-clear-text'
        ]
        self.update_operation_command_windows = [
            'dotnet', 'nuget', 'update', 'source', self.source_name,
            '--source', self.nuget_index_url,
            '--username', 'aws',
            '--password', self.auth_token
        ]
        self.update_operation_command_non_windows = [
            'dotnet', 'nuget', 'update', 'source', self.source_name,
            '--source', self.nuget_index_url,
            '--username', 'aws',
            '--password', self.auth_token,
            '--store-password-in-clear-text'
        ]

        self.subprocess_utils = mock.Mock()

        self.test_subject = DotNetLogin(
            self.auth_token, self.expiration, self.endpoint,
            self.domain, self.repository, self.subprocess_utils
        )

    @mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
    def test_login(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE
        self.test_subject.login()
        self.subprocess_utils.check_output.assert_any_call(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )
        self.subprocess_utils.run.assert_called_with(
            self.add_operation_command_non_windows,
            capture_output=True,
            check=True
        )

    @mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
    def test_login_on_windows(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE
        self.test_subject.login()
        self.subprocess_utils.check_output.assert_any_call(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )
        self.subprocess_utils.run.assert_called_with(
            self.add_operation_command_windows,
            capture_output=True,
            check=True
        )

    def test_login_dry_run(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE
        self.test_subject.login(dry_run=True)
        self.subprocess_utils.check_output.assert_called_once_with(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )

    @mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
    def test_login_sources_listed_with_extra_non_list_text(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT
        self.test_subject.login()
        self.subprocess_utils.check_output.assert_any_call(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )
        self.subprocess_utils.run.assert_called_with(
            self.add_operation_command_non_windows,
            capture_output=True,
            check=True
        )

    @mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
    def test_login_sources_listed_with_extra_non_list_text_on_windows(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT
        self.test_subject.login()
        self.subprocess_utils.check_output.assert_any_call(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )
        self.subprocess_utils.run.assert_called_with(
            self.add_operation_command_windows,
            capture_output=True,
            check=True
        )

    def test_login_sources_listed_with_extra_non_list_text_dry_run(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_WITH_EXTRA_NON_LIST_TEXT
        self.test_subject.login(dry_run=True)
        self.subprocess_utils.check_output.assert_called_once_with(
            self.list_operation_command,
            stderr=self.subprocess_utils.PIPE
        )

    @skip_if_windows("Windows does not support signal.SIGALRM.")
    @mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
    def test_login_dotnet_sources_listed_with_backtracking(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING
        previous_handler = signal.signal(
            signal.SIGALRM,
            lambda signum, frame: handle_timeout(signum, frame, self.id()))
        signal.alarm(10)
        try:
            self.test_subject.login()
        finally:
            # Cancel the alarm and put the old handler back even when
            # login() raises, so nothing fires during a later test.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, previous_handler)

    @skip_if_windows("Windows does not support signal.SIGALRM.")
    @mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
    def test_login_dotnet_sources_listed_with_backtracking_windows(self):
        self.subprocess_utils.check_output.return_value = \
            self._NUGET_SOURCES_LIST_RESPONSE_BACKTRACKING
        previous_handler = signal.signal(
            signal.SIGALRM,
            lambda signum, frame: handle_timeout(signum, frame, self.id()))
        signal.alarm(10)
        try:
            self.test_subject.login()
        finally:
            # Cancel the alarm and put the old handler back even when
            # login() raises, so nothing fires during a later test.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, previous_handler)

    @mock.patch('awscli.customizations.codeartifact.login.is_windows', False)
    def test_login_source_name_already_exists(self):
        list_response = 'Registered Sources:\n' \
                        ' 1. ' + self.source_name + ' [ENABLED]\n' \
                        ' https://source.com/index.json'
        self.subprocess_utils.check_output.return_value = \
            list_response.encode('utf-8')
        self.test_subject.login()
        self.subprocess_utils.run.assert_called_with(
            self.update_operation_command_non_windows,
            capture_output=True,
            check=True
        )

    @mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
    def test_login_source_name_already_exists_on_windows(self):
        list_response = 'Registered Sources:\n' \
                        ' 1. ' + self.source_name + ' [ENABLED]\n' \
                        ' https://source.com/index.json'
        self.subprocess_utils.check_output.return_value = \
            list_response.encode('utf-8')
        self.test_subject.login()
        self.subprocess_utils.run.assert_called_with(
            self.update_operation_command_windows,
            capture_output=True,
            check=True
        )

    @mock.patch('awscli.customizations.codeartifact.login.is_windows', True)
    def test_login_source_url_already_exists(self):
        non_default_source_name = 'Source Name'
        list_response = 'Registered Sources:\n' \
                        ' 1. ' + non_default_source_name + ' [ENABLED]\n' \
                        ' ' + self.nuget_index_url
        self.subprocess_utils.check_output.return_value = \
            list_response.encode('utf-8')
        self.test_subject.login()
        self.subprocess_utils.run.assert_called_with(
            [
                'dotnet', 'nuget', 'update', 'source', non_default_source_name,
                '--source', self.nuget_index_url,
                '--username', 'aws',
                '--password', self.auth_token
            ],
            capture_output=True,
            check=True
        )

    def test_login_dotnet_not_installed(self):
        self.subprocess_utils.check_output.side_effect = OSError(
            errno.ENOENT, 'not found error'
        )
        with self.assertRaisesRegex(
                ValueError,
                'dotnet was not found. Please verify installation.'):
            self.test_subject.login()


class TestNpmLogin(unittest.TestCase):
    """Tests for NpmLogin: config commands, scope handling and dry run."""

    NPM_CMD = NpmLogin.NPM_CMD

    def setUp(self):
        self.domain = 'domain'
        self.domain_owner = 'domain-owner'
        self.package_format = 'npm'
        self.repository = 'repository'
        self.auth_token = 'auth-token'
        self.namespace = 'namespace'
        self.expiration = (datetime.now(tzlocal()) + relativedelta(hours=10)
                           + relativedelta(minutes=9)).replace(microsecond=0)
        self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
            'a2z.com/{format}/{repository}/'.format(
                domain=self.domain,
                domainOwner=self.domain_owner,
                format=self.package_format,
                repository=self.repository
            )

        repo_uri = urlparse.urlsplit(self.endpoint)
        always_auth_config = '//{}{}:always-auth'.format(
            repo_uri.netloc, repo_uri.path
        )
        auth_token_config = '//{}{}:_authToken'.format(
            repo_uri.netloc, repo_uri.path
        )
        self.commands = []
        self.commands.append([
            self.NPM_CMD, 'config', 'set', 'registry', self.endpoint
        ])
        self.commands.append(
            [self.NPM_CMD, 'config', 'set', always_auth_config, 'true']
        )
        self.commands.append(
            [self.NPM_CMD, 'config', 'set', auth_token_config, self.auth_token]
        )

        self.subprocess_utils = mock.Mock()

        self.test_subject = NpmLogin(
            self.auth_token, self.expiration, self.endpoint,
            self.domain, self.repository, self.subprocess_utils
        )

    def test_login(self):
        self.test_subject.login()
        expected_calls = [
            mock.call(
                command,
                capture_output=True,
                check=True
            ) for command in self.commands
        ]
        self.subprocess_utils.run.assert_has_calls(
            expected_calls, any_order=True
        )

    def test_login_always_auth_error_ignored(self):
        """Test login ignores error for always-auth.

        This test is for NPM version >= 9 where the support of 'always-auth'
        has been dropped. Running the command to set config gives a non-zero
        exit code. This is to make sure that login ignores that error and all
        other commands executes successfully.
        """
        def side_effect(command, capture_output, check):
            """Set side_effect for always-auth config setting command"""
            if any('always-auth' in arg for arg in command):
                raise subprocess.CalledProcessError(
                    returncode=1,
                    cmd=command
                )

            return mock.DEFAULT

        self.subprocess_utils.run.side_effect = side_effect
        expected_calls = [
            mock.call(
                command,
                capture_output=True,
                check=True
            ) for command in self.commands
        ]
        self.test_subject.login()

        self.subprocess_utils.run.assert_has_calls(
            expected_calls, any_order=True
        )

    def test_get_scope(self):
        expected_value = '@{}'.format(self.namespace)
        scope = self.test_subject.get_scope(self.namespace)
        self.assertEqual(scope, expected_value)

    def test_get_scope_none_namespace(self):
        expected_value = None
        scope = self.test_subject.get_scope(None)
        self.assertEqual(scope, expected_value)

    def test_get_scope_invalid_name(self):
        with self.assertRaises(ValueError):
            self.test_subject.get_scope('.{}'.format(self.namespace))

    def test_get_scope_without_prefix(self):
        expected_value = '@{}'.format(self.namespace)
        scope = self.test_subject.get_scope('@{}'.format(self.namespace))
        self.assertEqual(scope, expected_value)

    def test_get_commands(self):
        commands = self.test_subject.get_commands(
            self.endpoint, self.auth_token
        )
        self.assertCountEqual(commands, self.commands)

    def test_get_commands_with_scope(self):
        commands = self.test_subject.get_commands(
            self.endpoint, self.auth_token, scope=self.namespace
        )
        self.commands[0][3] = '{}:registry'.format(self.namespace)
        self.assertCountEqual(commands, self.commands)

    def test_login_dry_run(self):
        self.test_subject.login(dry_run=True)
        # test_login shows commands are executed through
        # subprocess_utils.run, so that is the call a dry run must not
        # make. On a Mock, asserting only on check_call can never fail.
        self.subprocess_utils.run.assert_not_called()
        self.subprocess_utils.check_call.assert_not_called()


class TestPipLogin(unittest.TestCase):
    """Tests for PipLogin: the index-url config command and dry run."""

    PIP_INDEX_URL_FMT = PipLogin.PIP_INDEX_URL_FMT

    def setUp(self):
        self.domain = 'domain'
        self.domain_owner = 'domain-owner'
        self.package_format = 'pip'
        self.repository = 'repository'
        self.auth_token = 'auth-token'
        self.expiration = (datetime.now(tzlocal()) + relativedelta(years=1)
                           + relativedelta(months=9)).replace(microsecond=0)
        self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
            'a2z.com/{format}/{repository}/'.format(
                domain=self.domain,
                domainOwner=self.domain_owner,
                format=self.package_format,
                repository=self.repository
            )

        repo_uri = urlparse.urlsplit(self.endpoint)
        self.pip_index_url = self.PIP_INDEX_URL_FMT.format(
            scheme=repo_uri.scheme,
            auth_token=self.auth_token,
            netloc=repo_uri.netloc,
            path=repo_uri.path
        )

        self.subprocess_utils = mock.Mock()

        self.test_subject = PipLogin(
            self.auth_token, self.expiration, self.endpoint,
            self.domain, self.repository, self.subprocess_utils
        )

    def test_get_commands(self):
        expected_commands = [
            ['pip', 'config', 'set', 'global.index-url', self.pip_index_url]
        ]
        commands = self.test_subject.get_commands(
            self.endpoint, self.auth_token
        )
        self.assertCountEqual(commands, expected_commands)

    def test_login(self):
        self.test_subject.login()
        self.subprocess_utils.run.assert_called_once_with(
            ['pip', 'config', 'set', 'global.index-url', self.pip_index_url],
            capture_output=True,
            check=True
        )

    def test_login_dry_run(self):
        self.test_subject.login(dry_run=True)
        self.subprocess_utils.run.assert_not_called()


class TestTwineLogin(unittest.TestCase):
    """Tests for TwineLogin: creating and updating the pypirc file."""

    DEFAULT_PYPI_RC_FMT = TwineLogin.DEFAULT_PYPI_RC_FMT

    def setUp(self):
        self.file_creator = FileCreator()
        self.domain = 'domain'
        self.domain_owner = 'domain-owner'
        self.package_format = 'pip'
        self.repository = 'repository'
        self.auth_token = 'auth-token'
        self.expiration = (datetime.now(tzlocal()) + relativedelta(years=1)
                           + relativedelta(months=9)).replace(microsecond=0)
        self.endpoint = 'https://{domain}-{domainOwner}.codeartifact.aws.' \
            'a2z.com/{format}/{repository}/'.format(
                domain=self.domain,
                domainOwner=self.domain_owner,
                format=self.package_format,
                repository=self.repository
            )
        self.default_pypi_rc = self.DEFAULT_PYPI_RC_FMT.format(
            repository_endpoint=self.endpoint,
            auth_token=self.auth_token
        )
        self.subprocess_utils = mock.Mock()
        self.test_pypi_rc_path = self.file_creator.full_path('pypirc')
        if not os.path.isdir(os.path.dirname(self.test_pypi_rc_path)):
            os.makedirs(os.path.dirname(self.test_pypi_rc_path))

        self.test_subject = TwineLogin(
            self.auth_token,
            self.expiration,
            self.endpoint,
            self.domain,
            self.repository,
            self.subprocess_utils,
            self.test_pypi_rc_path
        )

    def tearDown(self):
        self.file_creator.remove_all()

    def _assert_pypi_rc_has_expected_content(
        self, pypi_rc_str, server, repo_url=None, username=None, password=None
    ):
        """Assert a pypirc string lists ``server`` with the given settings.

        ``server`` must appear in the ``index-servers`` option of the
        ``distutils`` section. ``repo_url``, ``username`` and ``password``
        are each compared against the server's own section only when a
        truthy value is passed.
        """
        pypi_rc = RawConfigParser()
        pypi_rc.read_string(pypi_rc_str)

        self.assertIn('distutils', pypi_rc.sections())
        self.assertIn('index-servers', pypi_rc.options('distutils'))
        index_servers = pypi_rc.get('distutils', 'index-servers')
        # index-servers is a multi-line value: one server name per line.
        index_servers = [
            index_server.strip()
            for index_server
            in index_servers.split('\n')
            if index_server.strip() != ''
        ]
        self.assertIn(server, index_servers)

        if repo_url or username or password:
            self.assertIn(server, pypi_rc.sections())

        if repo_url:
            self.assertIn('repository', pypi_rc.options(server))
            self.assertEqual(pypi_rc.get(server, 'repository'), repo_url)

        if username:
            self.assertIn('username', pypi_rc.options(server))
            self.assertEqual(pypi_rc.get(server, 'username'), username)

        if password:
            self.assertIn('password', pypi_rc.options(server))
            self.assertEqual(pypi_rc.get(server, 'password'), password)

    def test_get_pypi_rc_path(self):
        self.assertEqual(
            TwineLogin.get_pypi_rc_path(),
            os.path.join(os.path.expanduser("~"), ".pypirc")
        )

    def test_login_pypi_rc_not_found_defaults_set(self):
        self.test_subject.login()

        with open(self.test_pypi_rc_path) as f:
            test_pypi_rc_str = f.read()

        self._assert_pypi_rc_has_expected_content(
            pypi_rc_str=test_pypi_rc_str,
            server='codeartifact',
            repo_url=self.endpoint,
            username='aws',
            password=self.auth_token
        )

    def test_login_dry_run(self):
        self.test_subject.login(dry_run=True)
        self.subprocess_utils.run.assert_not_called()
        self.assertFalse(os.path.exists(self.test_pypi_rc_path))

    def test_login_existing_pypi_rc_not_clobbered(self):
        # The server names under index-servers are indented because
        # configparser only treats indented lines as a continuation of
        # the previous option's value.
        existing_pypi_rc = '''\
[distutils]
index-servers=
    pypi
    test

[pypi]
repository: http://www.python.org/pypi/
username: monty
password: JgCXIr5xGG

[test]
repository: http://example.com/test/
username: testusername
password: testpassword
'''

        with open(self.test_pypi_rc_path, 'w+') as f:
            f.write(existing_pypi_rc)

        self.test_subject.login()

        with open(self.test_pypi_rc_path) as f:
            test_pypi_rc_str = f.read()

        self._assert_pypi_rc_has_expected_content(
            pypi_rc_str=test_pypi_rc_str,
            server='codeartifact',
            repo_url=self.endpoint,
            username='aws',
            password=self.auth_token
        )

        self._assert_pypi_rc_has_expected_content(
            pypi_rc_str=test_pypi_rc_str,
            server='pypi',
            repo_url='http://www.python.org/pypi/',
            username='monty',
            password='JgCXIr5xGG'
        )

        self._assert_pypi_rc_has_expected_content(
            pypi_rc_str=test_pypi_rc_str,
            server='test',
            repo_url='http://example.com/test/',
            username='testusername',
            password='testpassword'
        )

    # NOTE(review): the visible source ends in the middle of the next test,
    # inside its pypirc fixture string. The fragment is kept here as a
    # comment, exactly as far as it is visible, because the rest of the
    # method cannot be reconstructed from this view.
    #
    # def test_login_existing_pypi_rc_with_codeartifact_not_clobbered(self):
    #     existing_pypi_rc = '''\
    # [distutils]
    # index-servers=
    # pypi
    # codeartifact
    #
    # [pypi]
    # repository:
http://www.python.org/pypi/1215username: monty1216password: JgCXIr5xGG12171218[codeartifact]1219repository: https://test-testOwner.codeartifact.aws.a2z.com/pypi/testRepo/1220username: aws1221password: expired_token1222'''12231224with open(self.test_pypi_rc_path, 'w+') as f:1225f.write(existing_pypi_rc)12261227self.test_subject.login()12281229with open(self.test_pypi_rc_path) as f:1230test_pypi_rc_str = f.read()12311232self._assert_pypi_rc_has_expected_content(1233pypi_rc_str=test_pypi_rc_str,1234server='codeartifact',1235repo_url=self.endpoint,1236username='aws',1237password=self.auth_token1238)12391240self._assert_pypi_rc_has_expected_content(1241pypi_rc_str=test_pypi_rc_str,1242server='pypi',1243repo_url='http://www.python.org/pypi/',1244username='monty',1245password='JgCXIr5xGG'1246)12471248def test_login_existing_invalid_pypi_rc_error(self):1249# This is an invalid pypirc as the list of servers are expected under1250# an 'index-servers' option instead of 'servers'.1251existing_pypi_rc = '''\1252[distutils]1253servers=1254pypi12551256[pypi]1257repository: http://www.python.org/pypi/1258username: monty1259password: JgCXIr5xGG1260'''12611262with open(self.test_pypi_rc_path, 'w+') as f:1263f.write(existing_pypi_rc)12641265with open(self.test_pypi_rc_path) as f:1266original_content = f.read()12671268with self.assertRaises(Exception):1269self.test_subject.login()12701271# We should just leave the pypirc untouched when it's invalid.1272with open(self.test_pypi_rc_path) as f:1273self.assertEqual(f.read(), original_content)127412751276class TestRelativeExpirationTime(unittest.TestCase):12771278def test_with_years_months_days(self):1279remaining = relativedelta(years=1, months=9)1280message = get_relative_expiration_time(remaining)1281self.assertEqual(message, '1 year and 9 months')12821283def test_with_years_months(self):1284remaining = relativedelta(years=1, months=8, days=30, hours=23,1285minutes=59, seconds=30)1286message = 
get_relative_expiration_time(remaining)1287self.assertEqual(message, '1 year and 8 months')12881289def test_with_years_month(self):1290remaining = relativedelta(years=3, days=30, hours=23,1291minutes=59, seconds=30)1292message = get_relative_expiration_time(remaining)1293self.assertEqual(message, '3 years')12941295def test_with_years_days(self):1296remaining = relativedelta(years=1, days=9)1297message = get_relative_expiration_time(remaining)1298self.assertEqual(message, '1 year')12991300def test_with_year(self):1301remaining = relativedelta(months=11, days=30)1302message = get_relative_expiration_time(remaining)1303self.assertEqual(message, '11 months and 30 days')13041305def test_with_years(self):1306remaining = relativedelta(years=1, months=11)1307message = get_relative_expiration_time(remaining)1308self.assertEqual(message, '1 year and 11 months')13091310def test_with_years_days_hours_minutes(self):1311remaining = relativedelta(years=2, days=7, hours=11, minutes=44)1312message = get_relative_expiration_time(remaining)1313self.assertEqual(message, '2 years')13141315def test_with_days_minutes(self):1316remaining = relativedelta(days=1, minutes=44)1317message = get_relative_expiration_time(remaining)1318self.assertEqual(message, '1 day')13191320def test_with_day(self):1321remaining = relativedelta(days=1)1322message = get_relative_expiration_time(remaining)1323self.assertEqual(message, '1 day')13241325def test_with_hour(self):1326self.expiration = (datetime.now(tzlocal())1327+ relativedelta(hours=1)).replace(microsecond=0)1328remaining = relativedelta(1329self.expiration, datetime.now(tzutc())) + relativedelta(seconds=30)1330message = get_relative_expiration_time(remaining)1331self.assertEqual(message, '1 hour')13321333def test_with_minutes_seconds(self):1334remaining = relativedelta(hours=1)1335message = get_relative_expiration_time(remaining)1336self.assertEqual(message, '1 hour')13371338def test_with_full_time(self):1339remaining = relativedelta(1340years=2, 
months=3, days=7, hours=11, minutes=44)1341message = get_relative_expiration_time(remaining)1342self.assertEqual(message, '2 years and 3 months')134313441345