Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/pkg
Path: blob/main/external/curl/tests/http/testenv/vsftpd.py
2066 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
#***************************************************************************
4
# _ _ ____ _
5
# Project ___| | | | _ \| |
6
# / __| | | | |_) | |
7
# | (__| |_| | _ <| |___
8
# \___|\___/|_| \_\_____|
9
#
10
# Copyright (C) Daniel Stenberg, <[email protected]>, et al.
11
#
12
# This software is licensed as described in the file COPYING, which
13
# you should have received as part of this distribution. The terms
14
# are also available at https://curl.se/docs/copyright.html.
15
#
16
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17
# copies of the Software, and permit persons to whom the Software is
18
# furnished to do so, under the terms of the COPYING file.
19
#
20
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21
# KIND, either express or implied.
22
#
23
# SPDX-License-Identifier: curl
24
#
25
###########################################################################
26
#
27
import logging
28
import os
29
import re
30
import socket
31
import subprocess
32
import time
33
34
from datetime import datetime, timedelta
35
from typing import List, Dict
36
37
from .curl import CurlClient, ExecResult
38
from .env import Env
39
from .ports import alloc_ports_and_do
40
41
log = logging.getLogger(__name__)
42
43
44
class VsFTPD:
45
46
def __init__(self, env: Env, with_ssl=False, ssl_implicit=False):
47
self.env = env
48
self._cmd = env.vsftpd
49
self._port = 0
50
self._with_ssl = with_ssl
51
self._ssl_implicit = ssl_implicit and with_ssl
52
self._scheme = 'ftps' if self._ssl_implicit else 'ftp'
53
if self._with_ssl:
54
self.name = 'vsftpds'
55
self._port_skey = 'ftps'
56
self._port_specs = {
57
'ftps': socket.SOCK_STREAM,
58
}
59
else:
60
self.name = 'vsftpd'
61
self._port_skey = 'ftp'
62
self._port_specs = {
63
'ftp': socket.SOCK_STREAM,
64
}
65
self._vsftpd_dir = os.path.join(env.gen_dir, self.name)
66
self._run_dir = os.path.join(self._vsftpd_dir, 'run')
67
self._docs_dir = os.path.join(self._vsftpd_dir, 'docs')
68
self._tmp_dir = os.path.join(self._vsftpd_dir, 'tmp')
69
self._conf_file = os.path.join(self._vsftpd_dir, 'test.conf')
70
self._pid_file = os.path.join(self._vsftpd_dir, 'vsftpd.pid')
71
self._error_log = os.path.join(self._vsftpd_dir, 'vsftpd.log')
72
self._process = None
73
74
self.clear_logs()
75
76
@property
77
def domain(self):
78
return self.env.ftp_domain
79
80
@property
81
def docs_dir(self):
82
return self._docs_dir
83
84
@property
85
def port(self) -> int:
86
return self._port
87
88
def clear_logs(self):
89
self._rmf(self._error_log)
90
91
def exists(self):
92
return os.path.exists(self._cmd)
93
94
def is_running(self):
95
if self._process:
96
self._process.poll()
97
return self._process.returncode is None
98
return False
99
100
def start_if_needed(self):
101
if not self.is_running():
102
return self.start()
103
return True
104
105
def stop(self, wait_dead=True):
106
self._mkpath(self._tmp_dir)
107
if self._process:
108
self._process.terminate()
109
self._process.wait(timeout=2)
110
self._process = None
111
return not wait_dead or self.wait_dead(timeout=timedelta(seconds=5))
112
return True
113
114
def restart(self):
115
self.stop()
116
return self.start()
117
118
def initial_start(self):
119
120
def startup(ports: Dict[str, int]) -> bool:
121
self._port = ports[self._port_skey]
122
if self.start():
123
self.env.update_ports(ports)
124
return True
125
self.stop()
126
self._port = 0
127
return False
128
129
return alloc_ports_and_do(self._port_specs, startup,
130
self.env.gen_root, max_tries=3)
131
132
def start(self, wait_live=True):
133
assert self._port > 0
134
self._mkpath(self._tmp_dir)
135
if self._process:
136
self.stop()
137
self._write_config()
138
args = [
139
self._cmd,
140
f'{self._conf_file}',
141
]
142
procerr = open(self._error_log, 'a')
143
self._process = subprocess.Popen(args=args, stderr=procerr)
144
if self._process.returncode is not None:
145
return False
146
return not wait_live or self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
147
148
def wait_dead(self, timeout: timedelta):
149
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
150
try_until = datetime.now() + timeout
151
while datetime.now() < try_until:
152
check_url = f'{self._scheme}://{self.domain}:{self.port}/'
153
r = curl.ftp_get(urls=[check_url], extra_args=['-v'])
154
if r.exit_code != 0:
155
return True
156
log.debug(f'waiting for vsftpd to stop responding: {r}')
157
time.sleep(.1)
158
log.debug(f"Server still responding after {timeout}")
159
return False
160
161
def wait_live(self, timeout: timedelta):
162
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
163
try_until = datetime.now() + timeout
164
while datetime.now() < try_until:
165
check_url = f'{self._scheme}://{self.domain}:{self.port}/'
166
r = curl.ftp_get(urls=[check_url], extra_args=[
167
'--trace', 'curl-start.trace', '--trace-time'
168
])
169
if r.exit_code == 0:
170
return True
171
time.sleep(.1)
172
log.error(f"Server still not responding after {timeout}")
173
return False
174
175
def _rmf(self, path):
176
if os.path.exists(path):
177
return os.remove(path)
178
179
def _mkpath(self, path):
180
if not os.path.exists(path):
181
return os.makedirs(path)
182
183
def _write_config(self):
184
self._mkpath(self._docs_dir)
185
self._mkpath(self._tmp_dir)
186
conf = [ # base server config
187
'listen=YES',
188
'run_as_launching_user=YES',
189
'#listen_address=127.0.0.1',
190
f'listen_port={self.port}',
191
'local_enable=NO',
192
'anonymous_enable=YES',
193
f'anon_root={self._docs_dir}',
194
'dirmessage_enable=YES',
195
'write_enable=YES',
196
'anon_upload_enable=YES',
197
'log_ftp_protocol=YES',
198
'xferlog_enable=YES',
199
'xferlog_std_format=NO',
200
f'vsftpd_log_file={self._error_log}',
201
'\n',
202
]
203
if self._with_ssl:
204
creds = self.env.get_credentials(self.domain)
205
assert creds # convince pytype this isn't None
206
conf.extend([
207
'ssl_enable=YES',
208
'debug_ssl=YES',
209
'allow_anon_ssl=YES',
210
f'rsa_cert_file={creds.cert_file}',
211
f'rsa_private_key_file={creds.pkey_file}',
212
# require_ssl_reuse=YES means ctrl and data connection need to use the same session
213
'require_ssl_reuse=NO',
214
])
215
if self._ssl_implicit:
216
conf.extend([
217
'implicit_ssl=YES',
218
])
219
with open(self._conf_file, 'w') as fd:
220
fd.write("\n".join(conf))
221
222
def get_data_ports(self, r: ExecResult) -> List[int]:
223
return [int(m.group(1)) for line in r.trace_lines if
224
(m := re.match(r'.*Connected 2nd connection to .* port (\d+)', line))]
225
226