Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aws
GitHub Repository: aws/aws-cli
Path: blob/develop/awscli/customizations/codeartifact/login.py
2637 views
1
import errno
2
import os
3
import platform
4
import sys
5
import subprocess
6
import re
7
8
from datetime import datetime
9
from dateutil.tz import tzutc
10
from dateutil.relativedelta import relativedelta
11
from botocore.utils import parse_timestamp
12
13
from awscli.compat import (
14
is_windows, urlparse, RawConfigParser, StringIO,
15
get_stderr_encoding, is_macos
16
)
17
from awscli.customizations import utils as cli_utils
18
from awscli.customizations.commands import BasicCommand
19
from awscli.customizations.utils import uni_print
20
21
22
def get_relative_expiration_time(remaining):
23
values = []
24
prev_non_zero_attr = False
25
for attr in ["years", "months", "days", "hours", "minutes"]:
26
value = getattr(remaining, attr)
27
if value > 0:
28
if prev_non_zero_attr:
29
values.append("and")
30
values.append(str(value))
31
values.append(attr[:-1] if value == 1 else attr)
32
if prev_non_zero_attr:
33
break
34
prev_non_zero_attr = value > 0
35
36
message = " ".join(values)
37
return message
38
39
40
class CommandFailedError(Exception):
41
def __init__(self, called_process_error, auth_token):
42
msg = str(called_process_error).replace(auth_token, '******')
43
if called_process_error.stderr is not None:
44
msg +=(
45
f' Stderr from command:\n'
46
f'{called_process_error.stderr.decode(get_stderr_encoding())}'
47
)
48
Exception.__init__(self, msg)
49
50
51
class BaseLogin(object):
52
_TOOL_NOT_FOUND_MESSAGE = '%s was not found. Please verify installation.'
53
54
def __init__(self, auth_token, expiration, repository_endpoint,
55
domain, repository, subprocess_utils, namespace=None):
56
self.auth_token = auth_token
57
self.expiration = expiration
58
self.repository_endpoint = repository_endpoint
59
self.domain = domain
60
self.repository = repository
61
self.subprocess_utils = subprocess_utils
62
self.namespace = namespace
63
64
def login(self, dry_run=False):
65
raise NotImplementedError('login()')
66
67
def _dry_run_commands(self, tool, commands):
68
for command in commands:
69
sys.stdout.write(' '.join(command))
70
sys.stdout.write(os.linesep)
71
sys.stdout.write(os.linesep)
72
73
def _write_success_message(self, tool):
74
# add extra 30 seconds make expiration more reasonable
75
# for some corner case
76
# e.g. 11 hours 59 minutes 31 seconds should output --> 12 hours.
77
remaining = relativedelta(
78
self.expiration, datetime.now(tzutc())) + relativedelta(seconds=30)
79
expiration_message = get_relative_expiration_time(remaining)
80
81
sys.stdout.write('Successfully configured {} to use '
82
'AWS CodeArtifact repository {} '
83
.format(tool, self.repository_endpoint))
84
sys.stdout.write(os.linesep)
85
sys.stdout.write('Login expires in {} at {}'.format(
86
expiration_message, self.expiration))
87
sys.stdout.write(os.linesep)
88
89
def _run_commands(self, tool, commands, dry_run=False):
90
if dry_run:
91
self._dry_run_commands(tool, commands)
92
return
93
94
for command in commands:
95
self._run_command(tool, command)
96
97
self._write_success_message(tool)
98
99
def _run_command(self, tool, command, *, ignore_errors=False):
100
try:
101
self.subprocess_utils.run(
102
command,
103
capture_output=True,
104
check=True
105
)
106
except subprocess.CalledProcessError as ex:
107
if not ignore_errors:
108
raise CommandFailedError(ex, self.auth_token)
109
except OSError as ex:
110
if ex.errno == errno.ENOENT:
111
raise ValueError(
112
self._TOOL_NOT_FOUND_MESSAGE % tool
113
)
114
raise ex
115
116
@classmethod
117
def get_commands(cls, endpoint, auth_token, **kwargs):
118
raise NotImplementedError('get_commands()')
119
120
121
class SwiftLogin(BaseLogin):
122
123
DEFAULT_NETRC_FMT = \
124
u'machine {hostname} login token password {auth_token}'
125
126
NETRC_REGEX_FMT = \
127
r'(?P<entry_start>\bmachine\s+{escaped_hostname}\s+login\s+\S+\s+password\s+)' \
128
r'(?P<token>\S+)'
129
130
def login(self, dry_run=False):
131
scope = self.get_scope(
132
self.namespace
133
)
134
commands = self.get_commands(
135
self.repository_endpoint, self.auth_token, scope=scope
136
)
137
138
if not is_macos:
139
hostname = urlparse.urlparse(self.repository_endpoint).hostname
140
new_entry = self.DEFAULT_NETRC_FMT.format(
141
hostname=hostname,
142
auth_token=self.auth_token
143
)
144
if dry_run:
145
self._display_new_netrc_entry(new_entry, self.get_netrc_path())
146
else:
147
self._update_netrc_entry(hostname, new_entry, self.get_netrc_path())
148
149
self._run_commands('swift', commands, dry_run)
150
151
def _display_new_netrc_entry(self, new_entry, netrc_path):
152
sys.stdout.write('Dryrun mode is enabled, not writing to netrc.')
153
sys.stdout.write(os.linesep)
154
sys.stdout.write(
155
f'The following line would have been written to {netrc_path}:'
156
)
157
sys.stdout.write(os.linesep)
158
sys.stdout.write(os.linesep)
159
sys.stdout.write(new_entry)
160
sys.stdout.write(os.linesep)
161
sys.stdout.write(os.linesep)
162
sys.stdout.write('And would have run the following commands:')
163
sys.stdout.write(os.linesep)
164
sys.stdout.write(os.linesep)
165
166
def _update_netrc_entry(self, hostname, new_entry, netrc_path):
167
pattern = re.compile(
168
self.NETRC_REGEX_FMT.format(escaped_hostname=re.escape(hostname)),
169
re.M
170
)
171
if not os.path.isfile(netrc_path):
172
self._create_netrc_file(netrc_path, new_entry)
173
else:
174
with open(netrc_path, 'r') as f:
175
contents = f.read()
176
escaped_auth_token = self.auth_token.replace('\\', r'\\')
177
new_contents = re.sub(
178
pattern,
179
rf"\g<entry_start>{escaped_auth_token}",
180
contents
181
)
182
183
if new_contents == contents:
184
new_contents = self._append_netrc_entry(new_contents, new_entry)
185
186
with open(netrc_path, 'w') as f:
187
f.write(new_contents)
188
189
def _create_netrc_file(self, netrc_path, new_entry):
190
dirname = os.path.split(netrc_path)[0]
191
if not os.path.isdir(dirname):
192
os.makedirs(dirname)
193
with os.fdopen(os.open(netrc_path,
194
os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
195
f.write(new_entry + '\n')
196
197
def _append_netrc_entry(self, contents, new_entry):
198
if contents.endswith('\n'):
199
return contents + new_entry + '\n'
200
else:
201
return contents + '\n' + new_entry + '\n'
202
203
@classmethod
204
def get_netrc_path(cls):
205
return os.path.join(os.path.expanduser("~"), ".netrc")
206
207
@classmethod
208
def get_scope(cls, namespace):
209
# Regex for valid scope name
210
valid_scope_name = re.compile(
211
r'\A[a-zA-Z0-9](?:[a-zA-Z0-9]|-(?=[a-zA-Z0-9])){0,38}\Z'
212
)
213
214
if namespace is None:
215
return namespace
216
217
if not valid_scope_name.match(namespace):
218
raise ValueError(
219
'Invalid scope name, scope must contain URL-safe '
220
'characters, no leading dots or underscores and no '
221
'more than 39 characters'
222
)
223
224
return namespace
225
226
@classmethod
227
def get_commands(cls, endpoint, auth_token, **kwargs):
228
commands = []
229
scope = kwargs.get('scope')
230
231
# Set up the codeartifact repository as the swift registry.
232
set_registry_command = [
233
'swift', 'package-registry', 'set', endpoint
234
]
235
if scope is not None:
236
set_registry_command.extend(['--scope', scope])
237
commands.append(set_registry_command)
238
239
# Authenticate against the repository.
240
# We will write token to .netrc for Linux and Windows
241
# MacOS will store the token from command line option to Keychain
242
login_registry_command = [
243
'swift', 'package-registry', 'login', f'{endpoint}login'
244
]
245
if is_macos:
246
login_registry_command.extend(['--token', auth_token])
247
commands.append(login_registry_command)
248
249
return commands
250
251
252
class NuGetBaseLogin(BaseLogin):
253
_NUGET_INDEX_URL_FMT = '{endpoint}v3/index.json'
254
255
# When adding new sources we can specify that we added the source to the
256
# user level NuGet.Config file. However, when updating an existing source
257
# we cannot be specific about which level NuGet.Config file was updated
258
# because it is possible that the existing source was not in the user
259
# level NuGet.Config. The source listing command returns all configured
260
# sources from all NuGet.Config levels. The update command updates the
261
# source in whichever NuGet.Config file the source was found.
262
_SOURCE_ADDED_MESSAGE = 'Added source %s to the user level NuGet.Config\n'
263
_SOURCE_UPDATED_MESSAGE = 'Updated source %s in the NuGet.Config\n'
264
# Example line the below regex should match:
265
# 1. nuget.org [Enabled]
266
_SOURCE_REGEX = re.compile(r'^\d+\.\s(?P<source_name>.+)\s\[.*\]')
267
268
def login(self, dry_run=False):
269
try:
270
source_to_url_dict = self._get_source_to_url_dict()
271
except OSError as ex:
272
if ex.errno == errno.ENOENT:
273
raise ValueError(
274
self._TOOL_NOT_FOUND_MESSAGE % self._get_tool_name()
275
)
276
raise ex
277
278
nuget_index_url = self._NUGET_INDEX_URL_FMT.format(
279
endpoint=self.repository_endpoint
280
)
281
source_name, already_exists = self._get_source_name(
282
nuget_index_url, source_to_url_dict
283
)
284
285
if already_exists:
286
command = self._get_configure_command(
287
'update', nuget_index_url, source_name
288
)
289
source_configured_message = self._SOURCE_UPDATED_MESSAGE
290
else:
291
command = self._get_configure_command('add', nuget_index_url, source_name)
292
source_configured_message = self._SOURCE_ADDED_MESSAGE
293
294
if dry_run:
295
dry_run_command = ' '.join([str(cd) for cd in command])
296
uni_print(dry_run_command)
297
uni_print('\n')
298
return
299
300
try:
301
self.subprocess_utils.run(
302
command,
303
capture_output=True,
304
check=True
305
)
306
except subprocess.CalledProcessError as e:
307
uni_print('Failed to update the NuGet.Config\n')
308
raise CommandFailedError(e, self.auth_token)
309
310
uni_print(source_configured_message % source_name)
311
self._write_success_message('nuget')
312
313
def _get_source_to_url_dict(self):
314
"""
315
Parses the output of the nuget sources list command.
316
317
A dict is created where the keys are the source names
318
and the values the corresponding URL.
319
320
The output of the command can contain header and footer information
321
around the 'Registered Sources' section, which is ignored.
322
323
Example output that is parsed:
324
325
Registered Sources:
326
327
1. Source Name 1 [Enabled]
328
https://source1.com/index.json
329
2. Source Name 2 [Disabled]
330
https://source2.com/index.json
331
100. Source Name 100 [Activé]
332
https://source100.com/index.json
333
"""
334
response = self.subprocess_utils.check_output(
335
self._get_list_command(),
336
stderr=self.subprocess_utils.PIPE
337
)
338
339
lines = response.decode(os.device_encoding(1) or "utf-8").splitlines()
340
lines = [line for line in lines if line.strip() != '']
341
342
source_to_url_dict = {}
343
for i in range(len(lines)):
344
result = self._SOURCE_REGEX.match(lines[i].strip())
345
if result:
346
source_to_url_dict[result["source_name"].strip()] = \
347
lines[i + 1].strip()
348
349
return source_to_url_dict
350
351
def _get_source_name(self, codeartifact_url, source_dict):
352
default_name = '{}/{}'.format(self.domain, self.repository)
353
354
# Check if the CodeArtifact URL is already present in the
355
# NuGet.Config file. If the URL already exists, use the source name
356
# already assigned to the CodeArtifact URL.
357
for source_name, source_url in source_dict.items():
358
if source_url == codeartifact_url:
359
return source_name, True
360
361
# If the CodeArtifact URL is not present in the NuGet.Config file,
362
# check if the default source name already exists so we can know
363
# whether we need to add a new entry or update the existing entry.
364
for source_name in source_dict.keys():
365
if source_name == default_name:
366
return source_name, True
367
368
# If neither the source url nor the source name already exist in the
369
# NuGet.Config file, use the default source name.
370
return default_name, False
371
372
def _get_tool_name(self):
373
raise NotImplementedError('_get_tool_name()')
374
375
def _get_list_command(self):
376
raise NotImplementedError('_get_list_command()')
377
378
def _get_configure_command(self, operation, nuget_index_url, source_name):
379
raise NotImplementedError('_get_configure_command()')
380
381
382
class NuGetLogin(NuGetBaseLogin):
383
384
def _get_tool_name(self):
385
return 'nuget'
386
387
def _get_list_command(self):
388
return ['nuget', 'sources', 'list', '-format', 'detailed']
389
390
def _get_configure_command(self, operation, nuget_index_url, source_name):
391
return [
392
'nuget', 'sources', operation,
393
'-name', source_name,
394
'-source', nuget_index_url,
395
'-username', 'aws',
396
'-password', self.auth_token
397
]
398
399
400
class DotNetLogin(NuGetBaseLogin):
401
402
def _get_tool_name(self):
403
return 'dotnet'
404
405
def _get_list_command(self):
406
return ['dotnet', 'nuget', 'list', 'source', '--format', 'detailed']
407
408
def _get_configure_command(self, operation, nuget_index_url, source_name):
409
command = ['dotnet', 'nuget', operation, 'source']
410
411
if operation == 'add':
412
command.append(nuget_index_url)
413
command += ['--name', source_name]
414
else:
415
command.append(source_name)
416
command += ['--source', nuget_index_url]
417
418
command += [
419
'--username', 'aws',
420
'--password', self.auth_token
421
]
422
423
# Encryption is not supported on non-Windows platforms.
424
if not is_windows:
425
command.append('--store-password-in-clear-text')
426
427
return command
428
429
430
class NpmLogin(BaseLogin):
431
432
# On Windows we need to be explicit about the .cmd file to execute
433
# (unless we execute through the shell, i.e. with shell=True).
434
NPM_CMD = 'npm.cmd' if platform.system().lower() == 'windows' else 'npm'
435
436
def login(self, dry_run=False):
437
scope = self.get_scope(
438
self.namespace
439
)
440
commands = self.get_commands(
441
self.repository_endpoint, self.auth_token, scope=scope
442
)
443
self._run_commands('npm', commands, dry_run)
444
445
def _run_command(self, tool, command):
446
ignore_errors = any('always-auth' in arg for arg in command)
447
super()._run_command(tool, command, ignore_errors=ignore_errors)
448
449
@classmethod
450
def get_scope(cls, namespace):
451
# Regex for valid scope name
452
valid_scope_name = re.compile('^(@[a-z0-9-~][a-z0-9-._~]*)')
453
454
if namespace is None:
455
return namespace
456
457
# Add @ prefix to scope if it doesn't exist
458
if namespace.startswith('@'):
459
scope = namespace
460
else:
461
scope = '@{}'.format(namespace)
462
463
if not valid_scope_name.match(scope):
464
raise ValueError(
465
'Invalid scope name, scope must contain URL-safe '
466
'characters, no leading dots or underscores'
467
)
468
469
return scope
470
471
@classmethod
472
def get_commands(cls, endpoint, auth_token, **kwargs):
473
commands = []
474
scope = kwargs.get('scope')
475
476
# prepend scope if it exists
477
registry = '{}:registry'.format(scope) if scope else 'registry'
478
479
# set up the codeartifact repository as the npm registry.
480
commands.append(
481
[cls.NPM_CMD, 'config', 'set', registry, endpoint]
482
)
483
484
repo_uri = urlparse.urlsplit(endpoint)
485
486
# configure npm to always require auth for the repository.
487
always_auth_config = '//{}{}:always-auth'.format(
488
repo_uri.netloc, repo_uri.path
489
)
490
commands.append(
491
[cls.NPM_CMD, 'config', 'set', always_auth_config, 'true']
492
)
493
494
# set auth info for the repository.
495
auth_token_config = '//{}{}:_authToken'.format(
496
repo_uri.netloc, repo_uri.path
497
)
498
commands.append(
499
[cls.NPM_CMD, 'config', 'set', auth_token_config, auth_token]
500
)
501
502
return commands
503
504
505
class PipLogin(BaseLogin):
506
507
PIP_INDEX_URL_FMT = '{scheme}://aws:{auth_token}@{netloc}{path}simple/'
508
509
def login(self, dry_run=False):
510
commands = self.get_commands(
511
self.repository_endpoint, self.auth_token
512
)
513
self._run_commands('pip', commands, dry_run)
514
515
@classmethod
516
def get_commands(cls, endpoint, auth_token, **kwargs):
517
repo_uri = urlparse.urlsplit(endpoint)
518
pip_index_url = cls.PIP_INDEX_URL_FMT.format(
519
scheme=repo_uri.scheme,
520
auth_token=auth_token,
521
netloc=repo_uri.netloc,
522
path=repo_uri.path
523
)
524
525
return [['pip', 'config', 'set', 'global.index-url', pip_index_url]]
526
527
528
class TwineLogin(BaseLogin):
529
530
DEFAULT_PYPI_RC_FMT = u'''\
531
[distutils]
532
index-servers=
533
pypi
534
codeartifact
535
536
[codeartifact]
537
repository: {repository_endpoint}
538
username: aws
539
password: {auth_token}'''
540
541
def __init__(
542
self,
543
auth_token,
544
expiration,
545
repository_endpoint,
546
domain,
547
repository,
548
subprocess_utils,
549
pypi_rc_path=None
550
):
551
if pypi_rc_path is None:
552
pypi_rc_path = self.get_pypi_rc_path()
553
self.pypi_rc_path = pypi_rc_path
554
super(TwineLogin, self).__init__(
555
auth_token, expiration, repository_endpoint,
556
domain, repository, subprocess_utils)
557
558
@classmethod
559
def get_commands(cls, endpoint, auth_token, **kwargs):
560
# TODO(ujjwalpa@): We don't really have a command to execute for Twine
561
# as we directly write to the pypirc file (or to stdout for dryrun)
562
# with python itself instead. Nevertheless, we're using this method for
563
# testing so we'll keep the interface for now but return a string with
564
# the expected pypirc content instead of a list of commands to
565
# execute. This definitely reeks of code smell and there is probably
566
# room for rethinking and refactoring the interfaces of these adapter
567
# helper classes in the future.
568
569
assert 'pypi_rc_path' in kwargs, 'pypi_rc_path must be provided.'
570
pypi_rc_path = kwargs['pypi_rc_path']
571
572
default_pypi_rc = cls.DEFAULT_PYPI_RC_FMT.format(
573
repository_endpoint=endpoint,
574
auth_token=auth_token
575
)
576
577
pypi_rc = RawConfigParser()
578
if os.path.exists(pypi_rc_path):
579
try:
580
pypi_rc.read(pypi_rc_path)
581
index_servers = pypi_rc.get('distutils', 'index-servers')
582
servers = [
583
server.strip()
584
for server in index_servers.split('\n')
585
if server.strip() != ''
586
]
587
588
if 'codeartifact' not in servers:
589
servers.append('codeartifact')
590
pypi_rc.set(
591
'distutils', 'index-servers', '\n' + '\n'.join(servers)
592
)
593
594
if 'codeartifact' not in pypi_rc.sections():
595
pypi_rc.add_section('codeartifact')
596
597
pypi_rc.set('codeartifact', 'repository', endpoint)
598
pypi_rc.set('codeartifact', 'username', 'aws')
599
pypi_rc.set('codeartifact', 'password', auth_token)
600
except Exception as e: # invalid .pypirc file
601
sys.stdout.write('%s is in an invalid state.' % pypi_rc_path)
602
sys.stdout.write(os.linesep)
603
raise e
604
else:
605
pypi_rc.read_string(default_pypi_rc)
606
607
pypi_rc_stream = StringIO()
608
pypi_rc.write(pypi_rc_stream)
609
pypi_rc_str = pypi_rc_stream.getvalue()
610
pypi_rc_stream.close()
611
612
return pypi_rc_str
613
614
def login(self, dry_run=False):
615
# No command to execute for Twine, we get the expected pypirc content
616
# instead.
617
pypi_rc_str = self.get_commands(
618
self.repository_endpoint,
619
self.auth_token,
620
pypi_rc_path=self.pypi_rc_path
621
)
622
623
if dry_run:
624
sys.stdout.write('Dryrun mode is enabled, not writing to pypirc.')
625
sys.stdout.write(os.linesep)
626
sys.stdout.write(
627
'%s would have been set to the following:' % self.pypi_rc_path
628
)
629
sys.stdout.write(os.linesep)
630
sys.stdout.write(os.linesep)
631
sys.stdout.write(pypi_rc_str)
632
sys.stdout.write(os.linesep)
633
else:
634
with open(self.pypi_rc_path, 'w+') as fp:
635
fp.write(pypi_rc_str)
636
637
self._write_success_message('twine')
638
639
@classmethod
640
def get_pypi_rc_path(cls):
641
return os.path.join(os.path.expanduser("~"), ".pypirc")
642
643
644
class CodeArtifactLogin(BasicCommand):
645
'''Log in to the idiomatic tool for the requested package format.'''
646
647
TOOL_MAP = {
648
'swift': {
649
'package_format': 'swift',
650
'login_cls': SwiftLogin,
651
'namespace_support': True,
652
},
653
'nuget': {
654
'package_format': 'nuget',
655
'login_cls': NuGetLogin,
656
'namespace_support': False,
657
},
658
'dotnet': {
659
'package_format': 'nuget',
660
'login_cls': DotNetLogin,
661
'namespace_support': False,
662
},
663
'npm': {
664
'package_format': 'npm',
665
'login_cls': NpmLogin,
666
'namespace_support': True,
667
},
668
'pip': {
669
'package_format': 'pypi',
670
'login_cls': PipLogin,
671
'namespace_support': False,
672
},
673
'twine': {
674
'package_format': 'pypi',
675
'login_cls': TwineLogin,
676
'namespace_support': False,
677
}
678
}
679
680
NAME = 'login'
681
682
DESCRIPTION = (
683
'Sets up the idiomatic tool for your package format to use your '
684
'CodeArtifact repository. Your login information is valid for up '
685
'to 12 hours after which you must login again.'
686
)
687
688
ARG_TABLE = [
689
{
690
'name': 'tool',
691
'help_text': 'The tool you want to connect with your repository',
692
'choices': list(TOOL_MAP.keys()),
693
'required': True,
694
},
695
{
696
'name': 'domain',
697
'help_text': 'Your CodeArtifact domain name',
698
'required': True,
699
},
700
{
701
'name': 'domain-owner',
702
'help_text': 'The AWS account ID that owns your CodeArtifact '
703
'domain',
704
'required': False,
705
},
706
{
707
'name': 'namespace',
708
'help_text': 'Associates a namespace with your repository tool',
709
'required': False,
710
},
711
{
712
'name': 'duration-seconds',
713
'cli_type_name': 'integer',
714
'help_text': 'The time, in seconds, that the login information '
715
'is valid',
716
'required': False,
717
},
718
{
719
'name': 'repository',
720
'help_text': 'Your CodeArtifact repository name',
721
'required': True,
722
},
723
{
724
'name': 'endpoint-type',
725
'help_text': 'The type of endpoint you want the tool to interact with',
726
'required': False
727
},
728
{
729
'name': 'dry-run',
730
'action': 'store_true',
731
'help_text': 'Only print the commands that would be executed '
732
'to connect your tool with your repository without '
733
'making any changes to your configuration. Note that '
734
'this prints the unredacted auth token as part of the output',
735
'required': False,
736
'default': False
737
},
738
]
739
740
def _get_namespace(self, tool, parsed_args):
741
namespace_compatible = self.TOOL_MAP[tool]['namespace_support']
742
743
if not namespace_compatible and parsed_args.namespace:
744
raise ValueError(
745
'Argument --namespace is not supported for {}'.format(tool)
746
)
747
else:
748
return parsed_args.namespace
749
750
def _get_repository_endpoint(
751
self, codeartifact_client, parsed_args, package_format
752
):
753
kwargs = {
754
'domain': parsed_args.domain,
755
'repository': parsed_args.repository,
756
'format': package_format
757
}
758
if parsed_args.endpoint_type:
759
kwargs['endpointType'] = parsed_args.endpoint_type
760
if parsed_args.domain_owner:
761
kwargs['domainOwner'] = parsed_args.domain_owner
762
763
get_repository_endpoint_response = \
764
codeartifact_client.get_repository_endpoint(**kwargs)
765
766
return get_repository_endpoint_response['repositoryEndpoint']
767
768
def _get_authorization_token(self, codeartifact_client, parsed_args):
769
kwargs = {
770
'domain': parsed_args.domain
771
}
772
if parsed_args.domain_owner:
773
kwargs['domainOwner'] = parsed_args.domain_owner
774
775
if parsed_args.duration_seconds:
776
kwargs['durationSeconds'] = parsed_args.duration_seconds
777
778
get_authorization_token_response = \
779
codeartifact_client.get_authorization_token(**kwargs)
780
781
return get_authorization_token_response
782
783
def _run_main(self, parsed_args, parsed_globals):
784
tool = parsed_args.tool.lower()
785
786
package_format = self.TOOL_MAP[tool]['package_format']
787
788
codeartifact_client = cli_utils.create_client_from_parsed_globals(
789
self._session, 'codeartifact', parsed_globals
790
)
791
792
auth_token_res = self._get_authorization_token(
793
codeartifact_client, parsed_args
794
)
795
796
repository_endpoint = self._get_repository_endpoint(
797
codeartifact_client, parsed_args, package_format
798
)
799
800
domain = parsed_args.domain
801
repository = parsed_args.repository
802
namespace = self._get_namespace(tool, parsed_args)
803
804
auth_token = auth_token_res['authorizationToken']
805
expiration = parse_timestamp(auth_token_res['expiration'])
806
login = self.TOOL_MAP[tool]['login_cls'](
807
auth_token, expiration, repository_endpoint,
808
domain, repository, subprocess, namespace
809
)
810
811
login.login(parsed_args.dry_run)
812
813
return 0
814
815