Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/pkg
Path: blob/main/external/curl/tests/http/testenv/env.py
2066 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
#***************************************************************************
4
# _ _ ____ _
5
# Project ___| | | | _ \| |
6
# / __| | | | |_) | |
7
# | (__| |_| | _ <| |___
8
# \___|\___/|_| \_\_____|
9
#
10
# Copyright (C) Daniel Stenberg, <[email protected]>, et al.
11
#
12
# This software is licensed as described in the file COPYING, which
13
# you should have received as part of this distribution. The terms
14
# are also available at https://curl.se/docs/copyright.html.
15
#
16
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17
# copies of the Software, and permit persons to whom the Software is
18
# furnished to do so, under the terms of the COPYING file.
19
#
20
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21
# KIND, either express or implied.
22
#
23
# SPDX-License-Identifier: curl
24
#
25
###########################################################################
26
#
27
import gzip
28
import logging
29
import os
30
import re
31
import shutil
32
import subprocess
33
import tempfile
34
from configparser import ConfigParser, ExtendedInterpolation
35
from datetime import timedelta
36
from typing import Optional, Dict, List
37
38
import pytest
39
from filelock import FileLock
40
41
from .certs import CertificateSpec, Credentials, TestCA
42
43
44
log = logging.getLogger(__name__)
45
46
47
def init_config_from(conf_path):
48
if os.path.isfile(conf_path):
49
config = ConfigParser(interpolation=ExtendedInterpolation())
50
config.read(conf_path)
51
return config
52
return None
53
54
55
TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__))
56
PROJ_PATH = os.path.dirname(os.path.dirname(TESTS_HTTPD_PATH))
57
TOP_PATH = os.path.join(os.getcwd(), os.path.pardir)
58
CONFIG_PATH = os.path.join(TOP_PATH, 'tests', 'http', 'config.ini')
59
if not os.path.exists(CONFIG_PATH):
60
ALT_CONFIG_PATH = os.path.join(PROJ_PATH, 'tests', 'http', 'config.ini')
61
if not os.path.exists(ALT_CONFIG_PATH):
62
raise Exception(f'unable to find config.ini in {CONFIG_PATH} nor {ALT_CONFIG_PATH}')
63
TOP_PATH = PROJ_PATH
64
CONFIG_PATH = ALT_CONFIG_PATH
65
DEF_CONFIG = init_config_from(CONFIG_PATH)
66
CURL = os.path.join(TOP_PATH, 'src', 'curl')
67
68
69
class EnvConfig:
70
71
def __init__(self, pytestconfig: Optional[pytest.Config] = None,
72
testrun_uid=None,
73
worker_id=None):
74
self.pytestconfig = pytestconfig
75
self.testrun_uid = testrun_uid
76
self.worker_id = worker_id if worker_id is not None else 'master'
77
self.tests_dir = TESTS_HTTPD_PATH
78
self.gen_root = self.gen_dir = os.path.join(self.tests_dir, 'gen')
79
if self.worker_id != 'master':
80
self.gen_dir = os.path.join(self.gen_dir, self.worker_id)
81
self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir))
82
self.build_dir = TOP_PATH
83
self.config = DEF_CONFIG
84
# check cur and its features
85
self.curl = CURL
86
if 'CURL' in os.environ:
87
self.curl = os.environ['CURL']
88
self.curl_props = {
89
'version_string': '',
90
'version': '',
91
'os': '',
92
'fullname': '',
93
'features_string': '',
94
'features': set(),
95
'protocols_string': '',
96
'protocols': set(),
97
'libs': set(),
98
'lib_versions': set(),
99
}
100
self.curl_is_debug = False
101
self.curl_protos = []
102
p = subprocess.run(args=[self.curl, '-V'],
103
capture_output=True, text=True)
104
if p.returncode != 0:
105
raise RuntimeError(f'{self.curl} -V failed with exit code: {p.returncode}')
106
if p.stderr.startswith('WARNING:'):
107
self.curl_is_debug = True
108
for line in p.stdout.splitlines(keepends=False):
109
if line.startswith('curl '):
110
self.curl_props['version_string'] = line
111
m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
112
if m:
113
self.curl_props['fullname'] = m.group(0)
114
self.curl_props['version'] = m.group('version')
115
self.curl_props['os'] = m.group('os')
116
self.curl_props['lib_versions'] = {
117
lib.lower() for lib in m.group('libs').split(' ')
118
}
119
self.curl_props['libs'] = {
120
re.sub(r'/[a-z0-9.-]*', '', lib) for lib in self.curl_props['lib_versions']
121
}
122
if line.startswith('Features: '):
123
self.curl_props['features_string'] = line[10:]
124
self.curl_props['features'] = {
125
feat.lower() for feat in line[10:].split(' ')
126
}
127
if line.startswith('Protocols: '):
128
self.curl_props['protocols_string'] = line[11:]
129
self.curl_props['protocols'] = {
130
prot.lower() for prot in line[11:].split(' ')
131
}
132
133
self.ports = {}
134
135
self.httpd = self.config['httpd']['httpd']
136
self.apxs = self.config['httpd']['apxs']
137
if len(self.apxs) == 0:
138
self.apxs = None
139
self._httpd_version = None
140
141
self.examples_pem = {
142
'key': 'xxx',
143
'cert': 'xxx',
144
}
145
self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
146
self.tld = 'http.curl.se'
147
self.domain1 = f"one.{self.tld}"
148
self.domain1brotli = f"brotli.one.{self.tld}"
149
self.domain2 = f"two.{self.tld}"
150
self.ftp_domain = f"ftp.{self.tld}"
151
self.proxy_domain = f"proxy.{self.tld}"
152
self.expired_domain = f"expired.{self.tld}"
153
self.cert_specs = [
154
CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
155
CertificateSpec(name='domain1-no-ip', domains=[self.domain1, self.domain1brotli], key_type='rsa2048'),
156
CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
157
CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
158
CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),
159
CertificateSpec(domains=[self.expired_domain], key_type='rsa2048',
160
valid_from=timedelta(days=-100), valid_to=timedelta(days=-10)),
161
CertificateSpec(name="clientsX", sub_specs=[
162
CertificateSpec(name="user1", client=True),
163
]),
164
]
165
166
self.nghttpx = self.config['nghttpx']['nghttpx']
167
if len(self.nghttpx.strip()) == 0:
168
self.nghttpx = None
169
self._nghttpx_version = None
170
self.nghttpx_with_h3 = False
171
if self.nghttpx is not None:
172
p = subprocess.run(args=[self.nghttpx, '-v'],
173
capture_output=True, text=True)
174
if p.returncode != 0:
175
# not a working nghttpx
176
self.nghttpx = None
177
else:
178
self._nghttpx_version = re.sub(r'^nghttpx\s*', '', p.stdout.strip())
179
self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip()) is not None
180
log.debug(f'nghttpx -v: {p.stdout}')
181
182
self.caddy = self.config['caddy']['caddy']
183
self._caddy_version = None
184
if len(self.caddy.strip()) == 0:
185
self.caddy = None
186
if self.caddy is not None:
187
try:
188
p = subprocess.run(args=[self.caddy, 'version'],
189
capture_output=True, text=True)
190
if p.returncode != 0:
191
# not a working caddy
192
self.caddy = None
193
m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout)
194
if m:
195
self._caddy_version = m.group(1)
196
else:
197
raise RuntimeError(f'Unable to determine cadd version from: {p.stdout}')
198
# TODO: specify specific exceptions here
199
except: # noqa: E722
200
self.caddy = None
201
202
self.vsftpd = self.config['vsftpd']['vsftpd']
203
self._vsftpd_version = None
204
if self.vsftpd is not None:
205
try:
206
with tempfile.TemporaryFile('w+') as tmp:
207
p = subprocess.run(args=[self.vsftpd, '-v'],
208
capture_output=True, text=True, stdin=tmp)
209
if p.returncode != 0:
210
# not a working vsftpd
211
self.vsftpd = None
212
if p.stderr:
213
ver_text = p.stderr
214
else:
215
# Oddly, some versions of vsftpd write to stdin (!)
216
# instead of stderr, which is odd but works. If there
217
# is nothing on stderr, read the file on stdin and use
218
# any data there instead.
219
tmp.seek(0)
220
ver_text = tmp.read()
221
m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', ver_text)
222
if m:
223
self._vsftpd_version = m.group(1)
224
elif len(p.stderr) == 0:
225
# vsftp does not use stdout or stderr for printing its version... -.-
226
self._vsftpd_version = 'unknown'
227
else:
228
raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
229
except Exception:
230
self.vsftpd = None
231
232
self._tcpdump = shutil.which('tcpdump')
233
234
@property
235
def httpd_version(self):
236
if self._httpd_version is None and self.apxs is not None:
237
try:
238
p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
239
capture_output=True, text=True)
240
if p.returncode != 0:
241
log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
242
else:
243
self._httpd_version = p.stdout.strip()
244
except Exception:
245
log.exception(f'{self.apxs} failed to run')
246
return self._httpd_version
247
248
def versiontuple(self, v):
249
v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
250
return tuple(map(int, v.split('.')))
251
252
def httpd_is_at_least(self, minv):
253
if self.httpd_version is None:
254
return False
255
hv = self.versiontuple(self.httpd_version)
256
return hv >= self.versiontuple(minv)
257
258
def caddy_is_at_least(self, minv):
259
if self.caddy_version is None:
260
return False
261
hv = self.versiontuple(self.caddy_version)
262
return hv >= self.versiontuple(minv)
263
264
def is_complete(self) -> bool:
265
return os.path.isfile(self.httpd) and \
266
self.apxs is not None and \
267
os.path.isfile(self.apxs)
268
269
def get_incomplete_reason(self) -> Optional[str]:
270
if self.httpd is None or len(self.httpd.strip()) == 0:
271
return 'httpd not configured, see `--with-test-httpd=<path>`'
272
if not os.path.isfile(self.httpd):
273
return f'httpd ({self.httpd}) not found'
274
if self.apxs is None:
275
return "command apxs not found (commonly provided in apache2-dev)"
276
if not os.path.isfile(self.apxs):
277
return f"apxs ({self.apxs}) not found"
278
return None
279
280
@property
281
def nghttpx_version(self):
282
return self._nghttpx_version
283
284
@property
285
def caddy_version(self):
286
return self._caddy_version
287
288
@property
289
def vsftpd_version(self):
290
return self._vsftpd_version
291
292
@property
293
def tcpdmp(self) -> Optional[str]:
294
return self._tcpdump
295
296
def clear_locks(self):
297
ca_lock = os.path.join(self.gen_root, 'ca/ca.lock')
298
if os.path.exists(ca_lock):
299
os.remove(ca_lock)
300
301
302
class Env:
303
304
SERVER_TIMEOUT = 30 # seconds to wait for server to come up/reload
305
306
CONFIG = EnvConfig()
307
308
@staticmethod
309
def setup_incomplete() -> bool:
310
return not Env.CONFIG.is_complete()
311
312
@staticmethod
313
def incomplete_reason() -> Optional[str]:
314
return Env.CONFIG.get_incomplete_reason()
315
316
@staticmethod
317
def have_nghttpx() -> bool:
318
return Env.CONFIG.nghttpx is not None
319
320
@staticmethod
321
def have_h3_server() -> bool:
322
return Env.CONFIG.nghttpx_with_h3
323
324
@staticmethod
325
def have_ssl_curl() -> bool:
326
return Env.curl_has_feature('ssl') or Env.curl_has_feature('multissl')
327
328
@staticmethod
329
def have_h2_curl() -> bool:
330
return 'http2' in Env.CONFIG.curl_props['features']
331
332
@staticmethod
333
def have_h3_curl() -> bool:
334
return 'http3' in Env.CONFIG.curl_props['features']
335
336
@staticmethod
337
def curl_uses_lib(libname: str) -> bool:
338
return libname.lower() in Env.CONFIG.curl_props['libs']
339
340
@staticmethod
341
def curl_uses_any_libs(libs: List[str]) -> bool:
342
for libname in libs:
343
if libname.lower() in Env.CONFIG.curl_props['libs']:
344
return True
345
return False
346
347
@staticmethod
348
def curl_uses_ossl_quic() -> bool:
349
if Env.have_h3_curl():
350
return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3')
351
return False
352
353
@staticmethod
354
def curl_version_string() -> str:
355
return Env.CONFIG.curl_props['version_string']
356
357
@staticmethod
358
def curl_features_string() -> str:
359
return Env.CONFIG.curl_props['features_string']
360
361
@staticmethod
362
def curl_has_feature(feature: str) -> bool:
363
return feature.lower() in Env.CONFIG.curl_props['features']
364
365
@staticmethod
366
def curl_protocols_string() -> str:
367
return Env.CONFIG.curl_props['protocols_string']
368
369
@staticmethod
370
def curl_has_protocol(protocol: str) -> bool:
371
return protocol.lower() in Env.CONFIG.curl_props['protocols']
372
373
@staticmethod
374
def curl_lib_version(libname: str) -> str:
375
prefix = f'{libname.lower()}/'
376
for lversion in Env.CONFIG.curl_props['lib_versions']:
377
if lversion.startswith(prefix):
378
return lversion[len(prefix):]
379
return 'unknown'
380
381
@staticmethod
382
def curl_lib_version_at_least(libname: str, min_version) -> bool:
383
lversion = Env.curl_lib_version(libname)
384
if lversion != 'unknown':
385
return Env.CONFIG.versiontuple(min_version) <= \
386
Env.CONFIG.versiontuple(lversion)
387
return False
388
389
@staticmethod
390
def curl_os() -> str:
391
return Env.CONFIG.curl_props['os']
392
393
@staticmethod
394
def curl_fullname() -> str:
395
return Env.CONFIG.curl_props['fullname']
396
397
@staticmethod
398
def curl_version() -> str:
399
return Env.CONFIG.curl_props['version']
400
401
@staticmethod
402
def curl_is_debug() -> bool:
403
return Env.CONFIG.curl_is_debug
404
405
@staticmethod
406
def curl_can_early_data() -> bool:
407
return Env.curl_uses_any_libs(['gnutls', 'wolfssl', 'quictls', 'openssl'])
408
409
@staticmethod
410
def curl_can_h3_early_data() -> bool:
411
return Env.curl_can_early_data() and \
412
Env.curl_uses_lib('ngtcp2')
413
414
@staticmethod
415
def have_h3() -> bool:
416
return Env.have_h3_curl() and Env.have_h3_server()
417
418
@staticmethod
419
def httpd_version() -> str:
420
return Env.CONFIG.httpd_version
421
422
@staticmethod
423
def nghttpx_version() -> str:
424
return Env.CONFIG.nghttpx_version
425
426
@staticmethod
427
def caddy_version() -> str:
428
return Env.CONFIG.caddy_version
429
430
@staticmethod
431
def caddy_is_at_least(minv) -> bool:
432
return Env.CONFIG.caddy_is_at_least(minv)
433
434
@staticmethod
435
def httpd_is_at_least(minv) -> bool:
436
return Env.CONFIG.httpd_is_at_least(minv)
437
438
@staticmethod
439
def has_caddy() -> bool:
440
return Env.CONFIG.caddy is not None
441
442
@staticmethod
443
def has_vsftpd() -> bool:
444
return Env.CONFIG.vsftpd is not None
445
446
@staticmethod
447
def vsftpd_version() -> str:
448
return Env.CONFIG.vsftpd_version
449
450
@staticmethod
451
def tcpdump() -> Optional[str]:
452
return Env.CONFIG.tcpdmp
453
454
def __init__(self, pytestconfig=None, env_config=None):
455
if env_config:
456
Env.CONFIG = env_config
457
self._verbose = pytestconfig.option.verbose \
458
if pytestconfig is not None else 0
459
self._ca = None
460
self._test_timeout = 300.0 if self._verbose > 1 else 60.0 # seconds
461
462
def issue_certs(self):
463
if self._ca is None:
464
ca_dir = os.path.join(self.CONFIG.gen_root, 'ca')
465
os.makedirs(ca_dir, exist_ok=True)
466
lock_file = os.path.join(ca_dir, 'ca.lock')
467
with FileLock(lock_file):
468
self._ca = TestCA.create_root(name=self.CONFIG.tld,
469
store_dir=ca_dir,
470
key_type="rsa2048")
471
self._ca.issue_certs(self.CONFIG.cert_specs)
472
473
def setup(self):
474
os.makedirs(self.gen_dir, exist_ok=True)
475
os.makedirs(self.htdocs_dir, exist_ok=True)
476
self.issue_certs()
477
478
def get_credentials(self, domain) -> Optional[Credentials]:
479
creds = self.ca.get_credentials_for_name(domain)
480
if len(creds) > 0:
481
return creds[0]
482
return None
483
484
@property
485
def verbose(self) -> int:
486
return self._verbose
487
488
@property
489
def test_timeout(self) -> Optional[float]:
490
return self._test_timeout
491
492
@test_timeout.setter
493
def test_timeout(self, val: Optional[float]):
494
self._test_timeout = val
495
496
@property
497
def gen_dir(self) -> str:
498
return self.CONFIG.gen_dir
499
500
@property
501
def gen_root(self) -> str:
502
return self.CONFIG.gen_root
503
504
@property
505
def project_dir(self) -> str:
506
return self.CONFIG.project_dir
507
508
@property
509
def build_dir(self) -> str:
510
return self.CONFIG.build_dir
511
512
@property
513
def ca(self):
514
return self._ca
515
516
@property
517
def htdocs_dir(self) -> str:
518
return self.CONFIG.htdocs_dir
519
520
@property
521
def tld(self) -> str:
522
return self.CONFIG.tld
523
524
@property
525
def domain1(self) -> str:
526
return self.CONFIG.domain1
527
528
@property
529
def domain1brotli(self) -> str:
530
return self.CONFIG.domain1brotli
531
532
@property
533
def domain2(self) -> str:
534
return self.CONFIG.domain2
535
536
@property
537
def ftp_domain(self) -> str:
538
return self.CONFIG.ftp_domain
539
540
@property
541
def proxy_domain(self) -> str:
542
return self.CONFIG.proxy_domain
543
544
@property
545
def expired_domain(self) -> str:
546
return self.CONFIG.expired_domain
547
548
@property
549
def ports(self) -> Dict[str, int]:
550
return self.CONFIG.ports
551
552
def update_ports(self, ports: Dict[str, int]):
553
self.CONFIG.ports.update(ports)
554
555
@property
556
def http_port(self) -> int:
557
return self.CONFIG.ports.get('http', 0)
558
559
@property
560
def https_port(self) -> int:
561
return self.CONFIG.ports['https']
562
563
@property
564
def https_only_tcp_port(self) -> int:
565
return self.CONFIG.ports['https-tcp-only']
566
567
@property
568
def nghttpx_https_port(self) -> int:
569
return self.CONFIG.ports['nghttpx_https']
570
571
@property
572
def h3_port(self) -> int:
573
return self.https_port
574
575
@property
576
def proxy_port(self) -> int:
577
return self.CONFIG.ports['proxy']
578
579
@property
580
def proxys_port(self) -> int:
581
return self.CONFIG.ports['proxys']
582
583
@property
584
def ftp_port(self) -> int:
585
return self.CONFIG.ports['ftp']
586
587
@property
588
def ftps_port(self) -> int:
589
return self.CONFIG.ports['ftps']
590
591
@property
592
def h2proxys_port(self) -> int:
593
return self.CONFIG.ports['h2proxys']
594
595
def pts_port(self, proto: str = 'http/1.1') -> int:
596
# proxy tunnel port
597
return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys']
598
599
@property
600
def caddy(self) -> str:
601
return self.CONFIG.caddy
602
603
@property
604
def caddy_https_port(self) -> int:
605
return self.CONFIG.ports['caddys']
606
607
@property
608
def caddy_http_port(self) -> int:
609
return self.CONFIG.ports['caddy']
610
611
@property
612
def vsftpd(self) -> str:
613
return self.CONFIG.vsftpd
614
615
@property
616
def ws_port(self) -> int:
617
return self.CONFIG.ports['ws']
618
619
@property
620
def curl(self) -> str:
621
return self.CONFIG.curl
622
623
@property
624
def httpd(self) -> str:
625
return self.CONFIG.httpd
626
627
@property
628
def apxs(self) -> str:
629
return self.CONFIG.apxs
630
631
@property
632
def nghttpx(self) -> Optional[str]:
633
return self.CONFIG.nghttpx
634
635
@property
636
def slow_network(self) -> bool:
637
return "CURL_DBG_SOCK_WBLOCK" in os.environ or \
638
"CURL_DBG_SOCK_WPARTIAL" in os.environ
639
640
@property
641
def ci_run(self) -> bool:
642
return "CURL_CI" in os.environ
643
644
def port_for(self, alpn_proto: Optional[str] = None):
645
if alpn_proto is None or \
646
alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
647
return self.https_port
648
if alpn_proto in ['h3']:
649
return self.h3_port
650
return self.http_port
651
652
def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
653
return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}'
654
655
def make_data_file(self, indir: str, fname: str, fsize: int,
656
line_length: int = 1024) -> str:
657
if line_length < 11:
658
raise RuntimeError('line_length less than 11 not supported')
659
fpath = os.path.join(indir, fname)
660
s10 = "0123456789"
661
s = round((line_length / 10) + 1) * s10
662
s = s[0:line_length-11]
663
with open(fpath, 'w') as fd:
664
for i in range(int(fsize / line_length)):
665
fd.write(f"{i:09d}-{s}\n")
666
remain = int(fsize % line_length)
667
if remain != 0:
668
i = int(fsize / line_length) + 1
669
fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n")
670
return fpath
671
672
def make_data_gzipbomb(self, indir: str, fname: str, fsize: int) -> str:
673
fpath = os.path.join(indir, fname)
674
gzpath = f'{fpath}.gz'
675
varpath = f'{fpath}.var'
676
677
with open(fpath, 'w') as fd:
678
fd.write('not what we are looking for!\n')
679
count = int(fsize / 1024)
680
zero1k = bytearray(1024)
681
with gzip.open(gzpath, 'wb') as fd:
682
for _ in range(count):
683
fd.write(zero1k)
684
with open(varpath, 'w') as fd:
685
fd.write(f'URI: {fname}\n')
686
fd.write('\n')
687
fd.write(f'URI: {fname}.gz\n')
688
fd.write('Content-Type: text/plain\n')
689
fd.write('Content-Encoding: x-gzip\n')
690
fd.write('\n')
691
return fpath
692
693