Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/pkg
Path: blob/main/external/curl/tests/http/test_40_socks.py
2649 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
#***************************************************************************
4
#                                  _   _ ____  _
5
#  Project                     ___| | | |  _ \| |
6
#                             / __| | | | |_) | |
7
#                            | (__| |_| |  _ <| |___
8
#                             \___|\___/|_| \_\_____|
9
#
10
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
11
#
12
# This software is licensed as described in the file COPYING, which
13
# you should have received as part of this distribution. The terms
14
# are also available at https://curl.se/docs/copyright.html.
15
#
16
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17
# copies of the Software, and permit persons to whom the Software is
18
# furnished to do so, under the terms of the COPYING file.
19
#
20
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21
# KIND, either express or implied.
22
#
23
# SPDX-License-Identifier: curl
24
#
25
###########################################################################
26
#
27
import logging
28
import os
29
from typing import Generator
30
import pytest
31
32
from testenv import Env, CurlClient, Dante
33
34
35
log = logging.getLogger(__name__)
36
37
38
@pytest.mark.skipif(condition=not Env.has_danted(), reason="missing danted")
class TestSocks:
    """Run curl HTTP(S) transfers through a Dante SOCKS proxy.

    The whole class is skipped when ``Env.has_danted()`` is false.
    """

    @pytest.fixture(scope='class')
    def danted(self, env: Env) -> Generator[Dante, None, None]:
        """Start one Dante instance for the class and stop it afterwards."""
        danted = Dante(env=env)
        assert danted.initial_start()
        yield danted
        danted.stop()

    @pytest.fixture(autouse=True, scope='class')
    def _class_scope(self, env, httpd):
        """Create the 10 MiB ``data-10m`` files used by the transfer tests."""
        fsize = 10*1024*1024
        # One copy goes into the httpd docs dir (fetched by the download
        # test), one into gen_dir (sent by the upload test).
        env.make_data_file(indir=httpd.docs_dir, fname="data-10m", fsize=fsize)
        env.make_data_file(indir=env.gen_dir, fname="data-10m", fsize=fsize)

    @staticmethod
    def _socks_client(env: Env, sproto: str, danted: Dante) -> CurlClient:
        """Return a CurlClient invoked with ``--<sproto> 127.0.0.1:<port>``.

        ``sproto`` is the curl SOCKS option name without the leading dashes
        (the tests use ``socks4`` and ``socks5``); the port is taken from
        the running ``danted`` instance.
        """
        return CurlClient(env=env, socks_args=[
            f'--{sproto}', f'127.0.0.1:{danted.port}'
        ])

    @pytest.mark.parametrize("sproto", ['socks4', 'socks5'])
    def test_40_01_socks_http(self, env: Env, sproto, danted: Dante, httpd):
        """GET a plain-http URL via the SOCKS proxy and expect a 200."""
        curl = self._socks_client(env, sproto, danted)
        url = f'http://{env.domain1}:{env.http_port}/data.json'
        r = curl.http_get(url=url)
        r.check_response(http_status=200)

    @pytest.mark.parametrize("sproto", ['socks4', 'socks5'])
    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
    def test_40_02_socks_https(self, env: Env, sproto, proto, danted: Dante, httpd):
        """GET an https URL via the SOCKS proxy for each ALPN protocol.

        http/1.1 and h2 must answer 200; h3 through SOCKS is expected to
        make curl exit with code 3.
        """
        if proto == 'h3' and not env.have_h3():
            pytest.skip("h3 not supported")
        curl = self._socks_client(env, sproto, danted)
        url = f'https://{env.authority_for(env.domain1, proto)}/data.json'
        r = curl.http_get(url=url, alpn_proto=proto)
        if proto == 'h3':
            assert r.exit_code == 3  # unsupported combination
        else:
            r.check_response(http_status=200)

    @pytest.mark.parametrize("sproto", ['socks4', 'socks5'])
    @pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
    def test_40_03_dl_serial(self, env: Env, httpd, danted, proto, sproto):
        """Download ``data-10m`` three times via the SOCKS proxy."""
        count = 3
        # curl URL globbing: [0-N] expands into `count` distinct requests
        urln = f'https://{env.authority_for(env.domain1, proto)}/data-10m?[0-{count-1}]'
        curl = self._socks_client(env, sproto, danted)
        r = curl.http_download(urls=[urln], alpn_proto=proto)
        r.check_response(count=count, http_status=200)

    @pytest.mark.parametrize("sproto", ['socks4', 'socks5'])
    @pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
    def test_40_04_ul_serial(self, env: Env, httpd, danted, proto, sproto):
        """Upload ``data-10m`` twice to the echo handler via the SOCKS proxy.

        Every response body must be identical to the uploaded file.
        """
        fdata = os.path.join(env.gen_dir, 'data-10m')
        count = 2
        curl = self._socks_client(env, sproto, danted)
        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
        r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto)
        r.check_stats(count=count, http_status=200, exitcode=0)
        # Read inside `with` blocks so the file handles are closed promptly
        # instead of being left for the garbage collector.
        with open(fdata) as infile:
            indata = infile.readlines()
        for i in range(count):
            with open(curl.response_file(i)) as respfile:
                respdata = respfile.readlines()
            assert respdata == indata
104
105