Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/pkg
Path: blob/main/external/curl/tests/http/testenv/certs.py
2066 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
#***************************************************************************
4
# _ _ ____ _
5
# Project ___| | | | _ \| |
6
# / __| | | | |_) | |
7
# | (__| |_| | _ <| |___
8
# \___|\___/|_| \_\_____|
9
#
10
# Copyright (C) Daniel Stenberg, <[email protected]>, et al.
11
#
12
# This software is licensed as described in the file COPYING, which
13
# you should have received as part of this distribution. The terms
14
# are also available at https://curl.se/docs/copyright.html.
15
#
16
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17
# copies of the Software, and permit persons to whom the Software is
18
# furnished to do so, under the terms of the COPYING file.
19
#
20
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21
# KIND, either express or implied.
22
#
23
# SPDX-License-Identifier: curl
24
#
25
###########################################################################
26
#
27
import base64
28
import ipaddress
29
import os
30
import re
31
from datetime import timedelta, datetime, timezone
32
from typing import List, Any, Optional
33
34
from cryptography import x509
35
from cryptography.hazmat.backends import default_backend
36
from cryptography.hazmat.primitives import hashes
37
from cryptography.hazmat.primitives._serialization import PublicFormat
38
from cryptography.hazmat.primitives.asymmetric import ec, rsa
39
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
40
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
41
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key
42
from cryptography.x509 import ExtendedKeyUsageOID, NameOID
43
44
45
EC_SUPPORTED = {}
46
EC_SUPPORTED.update([(curve.name.upper(), curve) for curve in [
47
ec.SECP192R1,
48
ec.SECP224R1,
49
ec.SECP256R1,
50
ec.SECP384R1,
51
]])
52
53
54
def _private_key(key_type):
55
if isinstance(key_type, str):
56
key_type = key_type.upper()
57
m = re.match(r'^(RSA)?(\d+)$', key_type)
58
if m:
59
key_type = int(m.group(2))
60
61
if isinstance(key_type, int):
62
return rsa.generate_private_key(
63
public_exponent=65537,
64
key_size=key_type,
65
backend=default_backend()
66
)
67
if not isinstance(key_type, ec.EllipticCurve) and key_type in EC_SUPPORTED:
68
key_type = EC_SUPPORTED[key_type]
69
return ec.generate_private_key(
70
curve=key_type,
71
backend=default_backend()
72
)
73
74
75
class CertificateSpec:
76
77
def __init__(self, name: Optional[str] = None,
78
domains: Optional[List[str]] = None,
79
email: Optional[str] = None,
80
key_type: Optional[str] = None,
81
single_file: bool = False,
82
valid_from: timedelta = timedelta(days=-1),
83
valid_to: timedelta = timedelta(days=89),
84
client: bool = False,
85
check_valid: bool = True,
86
sub_specs: Optional[List['CertificateSpec']] = None):
87
self._name = name
88
self.domains = domains
89
self.client = client
90
self.email = email
91
self.key_type = key_type
92
self.single_file = single_file
93
self.valid_from = valid_from
94
self.valid_to = valid_to
95
self.sub_specs = sub_specs
96
self.check_valid = check_valid
97
98
@property
99
def name(self) -> Optional[str]:
100
if self._name:
101
return self._name
102
elif self.domains:
103
return self.domains[0]
104
return None
105
106
@property
107
def type(self) -> Optional[str]:
108
if self.domains and len(self.domains):
109
return "server"
110
elif self.client:
111
return "client"
112
elif self.name:
113
return "ca"
114
return None
115
116
117
class Credentials:
118
119
def __init__(self,
120
name: str,
121
cert: Any,
122
pkey: Any,
123
issuer: Optional['Credentials'] = None):
124
self._name = name
125
self._cert = cert
126
self._pkey = pkey
127
self._issuer = issuer
128
self._cert_file = None
129
self._pkey_file = None
130
self._store = None
131
self._combined_file = None
132
133
@property
134
def name(self) -> str:
135
return self._name
136
137
@property
138
def subject(self) -> x509.Name:
139
return self._cert.subject
140
141
@property
142
def key_type(self):
143
if isinstance(self._pkey, RSAPrivateKey):
144
return f"rsa{self._pkey.key_size}"
145
elif isinstance(self._pkey, EllipticCurvePrivateKey):
146
return f"{self._pkey.curve.name}"
147
else:
148
raise Exception(f"unknown key type: {self._pkey}")
149
150
@property
151
def private_key(self) -> Any:
152
return self._pkey
153
154
def pub_sha256_b64(self) -> Any:
155
pubkey = self._pkey.public_key()
156
sha256 = hashes.Hash(algorithm=hashes.SHA256())
157
sha256.update(pubkey.public_bytes(
158
encoding=Encoding.DER,
159
format=PublicFormat.SubjectPublicKeyInfo
160
))
161
return base64.b64encode(sha256.finalize()).decode('utf8')
162
163
@property
164
def certificate(self) -> Any:
165
return self._cert
166
167
@property
168
def cert_pem(self) -> bytes:
169
return self._cert.public_bytes(Encoding.PEM)
170
171
@property
172
def pkey_pem(self) -> bytes:
173
return self._pkey.private_bytes(
174
Encoding.PEM,
175
PrivateFormat.TraditionalOpenSSL if self.key_type.startswith('rsa') else PrivateFormat.PKCS8,
176
NoEncryption())
177
178
@property
179
def issuer(self) -> Optional['Credentials']:
180
return self._issuer
181
182
def set_store(self, store: 'CertStore'):
183
self._store = store
184
185
def set_files(self, cert_file: str, pkey_file: Optional[str] = None,
186
combined_file: Optional[str] = None):
187
self._cert_file = cert_file
188
self._pkey_file = pkey_file
189
self._combined_file = combined_file
190
191
@property
192
def cert_file(self) -> str:
193
return self._cert_file
194
195
@property
196
def pkey_file(self) -> Optional[str]:
197
return self._pkey_file
198
199
@property
200
def combined_file(self) -> Optional[str]:
201
return self._combined_file
202
203
def get_first(self, name) -> Optional['Credentials']:
204
creds = self._store.get_credentials_for_name(name) if self._store else []
205
return creds[0] if len(creds) else None
206
207
def get_credentials_for_name(self, name) -> List['Credentials']:
208
return self._store.get_credentials_for_name(name) if self._store else []
209
210
def issue_certs(self, specs: List[CertificateSpec],
211
chain: Optional[List['Credentials']] = None) -> List['Credentials']:
212
return [self.issue_cert(spec=spec, chain=chain) for spec in specs]
213
214
def issue_cert(self, spec: CertificateSpec,
215
chain: Optional[List['Credentials']] = None) -> 'Credentials':
216
key_type = spec.key_type if spec.key_type else self.key_type
217
creds = None
218
if self._store:
219
creds = self._store.load_credentials(
220
name=spec.name, key_type=key_type, single_file=spec.single_file,
221
issuer=self, check_valid=spec.check_valid)
222
if creds is None:
223
creds = TestCA.create_credentials(spec=spec, issuer=self, key_type=key_type,
224
valid_from=spec.valid_from, valid_to=spec.valid_to)
225
if self._store:
226
self._store.save(creds, single_file=spec.single_file)
227
if spec.type == "ca":
228
self._store.save_chain(creds, "ca", with_root=True)
229
230
if spec.sub_specs:
231
if self._store:
232
sub_store = CertStore(fpath=os.path.join(self._store.path, creds.name))
233
creds.set_store(sub_store)
234
subchain = chain.copy() if chain else []
235
subchain.append(self)
236
creds.issue_certs(spec.sub_specs, chain=subchain)
237
return creds
238
239
240
class CertStore:
241
242
def __init__(self, fpath: str):
243
self._store_dir = fpath
244
if not os.path.exists(self._store_dir):
245
os.makedirs(self._store_dir)
246
self._creds_by_name = {}
247
248
@property
249
def path(self) -> str:
250
return self._store_dir
251
252
def save(self, creds: Credentials, name: Optional[str] = None,
253
chain: Optional[List[Credentials]] = None,
254
single_file: bool = False) -> None:
255
name = name if name is not None else creds.name
256
cert_file = self.get_cert_file(name=name, key_type=creds.key_type)
257
pkey_file = self.get_pkey_file(name=name, key_type=creds.key_type)
258
comb_file = self.get_combined_file(name=name, key_type=creds.key_type)
259
if single_file:
260
pkey_file = None
261
with open(cert_file, "wb") as fd:
262
fd.write(creds.cert_pem)
263
if chain:
264
for c in chain:
265
fd.write(c.cert_pem)
266
if pkey_file is None:
267
fd.write(creds.pkey_pem)
268
if pkey_file is not None:
269
with open(pkey_file, "wb") as fd:
270
fd.write(creds.pkey_pem)
271
with open(comb_file, "wb") as fd:
272
fd.write(creds.cert_pem)
273
if chain:
274
for c in chain:
275
fd.write(c.cert_pem)
276
fd.write(creds.pkey_pem)
277
creds.set_files(cert_file, pkey_file, comb_file)
278
self._add_credentials(name, creds)
279
280
def save_chain(self, creds: Credentials, infix: str, with_root=False):
281
name = creds.name
282
chain = [creds]
283
while creds.issuer is not None:
284
creds = creds.issuer
285
chain.append(creds)
286
if not with_root and len(chain) > 1:
287
chain = chain[:-1]
288
chain_file = os.path.join(self._store_dir, f'{name}-{infix}.pem')
289
with open(chain_file, "wb") as fd:
290
for c in chain:
291
fd.write(c.cert_pem)
292
293
def _add_credentials(self, name: str, creds: Credentials):
294
if name not in self._creds_by_name:
295
self._creds_by_name[name] = []
296
self._creds_by_name[name].append(creds)
297
298
def get_credentials_for_name(self, name) -> List[Credentials]:
299
return self._creds_by_name[name] if name in self._creds_by_name else []
300
301
def get_cert_file(self, name: str, key_type=None) -> str:
302
key_infix = ".{0}".format(key_type) if key_type is not None else ""
303
return os.path.join(self._store_dir, f'{name}{key_infix}.cert.pem')
304
305
def get_pkey_file(self, name: str, key_type=None) -> str:
306
key_infix = ".{0}".format(key_type) if key_type is not None else ""
307
return os.path.join(self._store_dir, f'{name}{key_infix}.pkey.pem')
308
309
def get_combined_file(self, name: str, key_type=None) -> str:
310
return os.path.join(self._store_dir, f'{name}.pem')
311
312
def load_pem_cert(self, fpath: str) -> x509.Certificate:
313
with open(fpath) as fd:
314
return x509.load_pem_x509_certificate("".join(fd.readlines()).encode())
315
316
def load_pem_pkey(self, fpath: str):
317
with open(fpath) as fd:
318
return load_pem_private_key("".join(fd.readlines()).encode(), password=None)
319
320
def load_credentials(self, name: str, key_type=None,
321
single_file: bool = False,
322
issuer: Optional[Credentials] = None,
323
check_valid: bool = False):
324
cert_file = self.get_cert_file(name=name, key_type=key_type)
325
pkey_file = cert_file if single_file else self.get_pkey_file(name=name, key_type=key_type)
326
comb_file = self.get_combined_file(name=name, key_type=key_type)
327
if os.path.isfile(cert_file) and os.path.isfile(pkey_file):
328
cert = self.load_pem_cert(cert_file)
329
pkey = self.load_pem_pkey(pkey_file)
330
try:
331
now = datetime.now(tz=timezone.utc)
332
if check_valid and \
333
((cert.not_valid_after_utc < now) or
334
(cert.not_valid_before_utc > now)):
335
return None
336
except AttributeError: # older python
337
now = datetime.now()
338
if check_valid and \
339
((cert.not_valid_after < now) or
340
(cert.not_valid_before > now)):
341
return None
342
creds = Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
343
creds.set_store(self)
344
creds.set_files(cert_file, pkey_file, comb_file)
345
self._add_credentials(name, creds)
346
return creds
347
return None
348
349
350
class TestCA:
351
352
@classmethod
353
def create_root(cls, name: str, store_dir: str, key_type: str = "rsa2048") -> Credentials:
354
store = CertStore(fpath=store_dir)
355
creds = store.load_credentials(name="ca", key_type=key_type, issuer=None)
356
if creds is None:
357
creds = TestCA._make_ca_credentials(name=name, key_type=key_type)
358
store.save(creds, name="ca")
359
creds.set_store(store)
360
return creds
361
362
@staticmethod
363
def create_credentials(spec: CertificateSpec, issuer: Credentials, key_type: Any,
364
valid_from: timedelta = timedelta(days=-1),
365
valid_to: timedelta = timedelta(days=89),
366
) -> Credentials:
367
"""
368
Create a certificate signed by this CA for the given domains.
369
370
:returns: the certificate and private key PEM file paths
371
"""
372
if spec.domains and len(spec.domains):
373
creds = TestCA._make_server_credentials(name=spec.name, domains=spec.domains,
374
issuer=issuer, valid_from=valid_from,
375
valid_to=valid_to, key_type=key_type)
376
elif spec.client:
377
creds = TestCA._make_client_credentials(name=spec.name, issuer=issuer,
378
email=spec.email, valid_from=valid_from,
379
valid_to=valid_to, key_type=key_type)
380
elif spec.name:
381
creds = TestCA._make_ca_credentials(name=spec.name, issuer=issuer,
382
valid_from=valid_from, valid_to=valid_to,
383
key_type=key_type)
384
else:
385
raise Exception(f"unrecognized certificate specification: {spec}")
386
return creds
387
388
@staticmethod
389
def _make_x509_name(org_name: Optional[str] = None, common_name: Optional[str] = None, parent: x509.Name = None) -> x509.Name:
390
name_pieces = []
391
if org_name:
392
oid = NameOID.ORGANIZATIONAL_UNIT_NAME if parent else NameOID.ORGANIZATION_NAME
393
name_pieces.append(x509.NameAttribute(oid, org_name))
394
elif common_name:
395
name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
396
if parent:
397
name_pieces.extend(list(parent))
398
return x509.Name(name_pieces)
399
400
@staticmethod
401
def _make_csr(
402
subject: x509.Name,
403
pkey: Any,
404
issuer_subject: Optional[Credentials],
405
valid_from_delta: Optional[timedelta] = None,
406
valid_until_delta: Optional[timedelta] = None
407
) -> x509.CertificateBuilder:
408
pubkey = pkey.public_key()
409
issuer_subject = issuer_subject if issuer_subject is not None else subject
410
411
valid_from = datetime.now()
412
if valid_until_delta is not None:
413
valid_from += valid_from_delta
414
valid_until = datetime.now()
415
if valid_until_delta is not None:
416
valid_until += valid_until_delta
417
418
return (
419
x509.CertificateBuilder()
420
.subject_name(subject)
421
.issuer_name(issuer_subject)
422
.public_key(pubkey)
423
.not_valid_before(valid_from)
424
.not_valid_after(valid_until)
425
.serial_number(x509.random_serial_number())
426
.add_extension(
427
x509.SubjectKeyIdentifier.from_public_key(pubkey),
428
critical=False,
429
)
430
)
431
432
@staticmethod
433
def _add_ca_usages(csr: Any) -> Any:
434
return csr.add_extension(
435
x509.BasicConstraints(ca=True, path_length=9),
436
critical=True,
437
).add_extension(
438
x509.KeyUsage(
439
digital_signature=True,
440
content_commitment=False,
441
key_encipherment=False,
442
data_encipherment=False,
443
key_agreement=False,
444
key_cert_sign=True,
445
crl_sign=True,
446
encipher_only=False,
447
decipher_only=False),
448
critical=True
449
).add_extension(
450
x509.ExtendedKeyUsage([
451
ExtendedKeyUsageOID.CLIENT_AUTH,
452
ExtendedKeyUsageOID.SERVER_AUTH,
453
ExtendedKeyUsageOID.CODE_SIGNING,
454
]),
455
critical=True
456
)
457
458
@staticmethod
459
def _add_leaf_usages(csr: Any, domains: List[str], issuer: Credentials) -> Any:
460
names = []
461
for name in domains:
462
try:
463
names.append(x509.IPAddress(ipaddress.ip_address(name)))
464
# TODO: specify specific exceptions here
465
except: # noqa: E722
466
names.append(x509.DNSName(name))
467
468
return csr.add_extension(
469
x509.BasicConstraints(ca=False, path_length=None),
470
critical=True,
471
).add_extension(
472
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
473
issuer.certificate.extensions.get_extension_for_class(
474
x509.SubjectKeyIdentifier).value),
475
critical=False
476
).add_extension(
477
x509.SubjectAlternativeName(names), critical=True,
478
).add_extension(
479
x509.ExtendedKeyUsage([
480
ExtendedKeyUsageOID.SERVER_AUTH,
481
]),
482
critical=False
483
)
484
485
@staticmethod
486
def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: Optional[str] = None) -> Any:
487
cert = csr.add_extension(
488
x509.BasicConstraints(ca=False, path_length=None),
489
critical=True,
490
).add_extension(
491
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
492
issuer.certificate.extensions.get_extension_for_class(
493
x509.SubjectKeyIdentifier).value),
494
critical=False
495
)
496
if rfc82name:
497
cert.add_extension(
498
x509.SubjectAlternativeName([x509.RFC822Name(rfc82name)]),
499
critical=True,
500
)
501
cert.add_extension(
502
x509.ExtendedKeyUsage([
503
ExtendedKeyUsageOID.CLIENT_AUTH,
504
]),
505
critical=True
506
)
507
return cert
508
509
@staticmethod
510
def _make_ca_credentials(name, key_type: Any,
511
issuer: Optional[Credentials] = None,
512
valid_from: timedelta = timedelta(days=-1),
513
valid_to: timedelta = timedelta(days=89),
514
) -> Credentials:
515
pkey = _private_key(key_type=key_type)
516
if issuer is not None:
517
issuer_subject = issuer.certificate.subject
518
issuer_key = issuer.private_key
519
else:
520
issuer_subject = None
521
issuer_key = pkey
522
subject = TestCA._make_x509_name(org_name=name, parent=issuer.subject if issuer else None)
523
csr = TestCA._make_csr(subject=subject,
524
issuer_subject=issuer_subject, pkey=pkey,
525
valid_from_delta=valid_from, valid_until_delta=valid_to)
526
csr = TestCA._add_ca_usages(csr)
527
cert = csr.sign(private_key=issuer_key,
528
algorithm=hashes.SHA256(),
529
backend=default_backend())
530
return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
531
532
@staticmethod
533
def _make_server_credentials(name: str, domains: List[str], issuer: Credentials,
534
key_type: Any,
535
valid_from: timedelta = timedelta(days=-1),
536
valid_to: timedelta = timedelta(days=89),
537
) -> Credentials:
538
pkey = _private_key(key_type=key_type)
539
subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
540
csr = TestCA._make_csr(subject=subject,
541
issuer_subject=issuer.certificate.subject, pkey=pkey,
542
valid_from_delta=valid_from, valid_until_delta=valid_to)
543
csr = TestCA._add_leaf_usages(csr, domains=domains, issuer=issuer)
544
cert = csr.sign(private_key=issuer.private_key,
545
algorithm=hashes.SHA256(),
546
backend=default_backend())
547
return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
548
549
@staticmethod
550
def _make_client_credentials(name: str,
551
issuer: Credentials, email: Optional[str],
552
key_type: Any,
553
valid_from: timedelta = timedelta(days=-1),
554
valid_to: timedelta = timedelta(days=89),
555
) -> Credentials:
556
pkey = _private_key(key_type=key_type)
557
subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
558
csr = TestCA._make_csr(subject=subject,
559
issuer_subject=issuer.certificate.subject, pkey=pkey,
560
valid_from_delta=valid_from, valid_until_delta=valid_to)
561
csr = TestCA._add_client_usages(csr, issuer=issuer, rfc82name=email)
562
cert = csr.sign(private_key=issuer.private_key,
563
algorithm=hashes.SHA256(),
564
backend=default_backend())
565
return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
566
567