Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/pkg
Path: blob/main/external/curl/tests/negtelnetserver.py
2065 views
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
#
4
#  Project                     ___| | | |  _ \| |
#                             / __| | | | |_) | |
#                            | (__| |_| |  _ <| |___
#                             \___|\___/|_| \_\_____|
8
#
9
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
10
#
11
# This software is licensed as described in the file COPYING, which
12
# you should have received as part of this distribution. The terms
13
# are also available at https://curl.se/docs/copyright.html.
14
#
15
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
16
# copies of the Software, and permit persons to whom the Software is
17
# furnished to do so, under the terms of the COPYING file.
18
#
19
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20
# KIND, either express or implied.
21
#
22
# SPDX-License-Identifier: curl
23
#
24
"""A telnet server which negotiates."""
25
26
from __future__ import (absolute_import, division, print_function,
27
unicode_literals)
28
29
import argparse
30
import logging
31
import os
32
import socket
33
import sys
34
35
from util import ClosingFileHandler
36
37
if sys.version_info.major >= 3:
38
import socketserver
39
else:
40
import SocketServer as socketserver
41
42
# Logger shared by everything in this module.
log = logging.getLogger(__name__)

# Host name the TCP server binds to.
HOST = "localhost"

# Short tag embedded in the log line format.
IDENT = "NTEL"


# Aliveness handshake with the test framework: when a request contains
# VERIFIED_REQ, the handler answers with VERIFIED_RSP (pid filled in).
VERIFIED_REQ = "verifiedserver"
VERIFIED_RSP = "WE ROOLZ: {pid}"
50
51
52
def telnetserver(options):
    """Run the negotiating telnet server until it is stopped.

    Writes the process id to ``options.pidfile`` when one was requested,
    binds a TCP server to ``(HOST, options.port)`` and serves telnet
    connections with NegotiatingTelnetHandler.

    :param options: parsed command line options; ``pidfile`` and ``port``
        are read here
    :return: ScriptRC.SUCCESS once the serve loop has ended
    """
    pidfile = options.pidfile
    if pidfile:
        # see tests/server/util.c function write_pidfile
        pid_offset = 4194304 if os.name == "nt" else 0
        pid_text = str(os.getpid() + pid_offset)
        with open(pidfile, "w") as pid_handle:
            pid_handle.write(pid_text)

    address = (HOST, options.port)
    log.info("Listening on %s", address)

    # allow_reuse_address is consulted when the server object binds, so it
    # has to be set on the class before the instance is created.
    socketserver.TCPServer.allow_reuse_address = True
    with socketserver.TCPServer(address, NegotiatingTelnetHandler) as srv:
        srv.serve_forever()

    # Leaving the `with` block has already closed the listening socket.
    return ScriptRC.SUCCESS
71
72
73
class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
    """Handler class for Telnet connections."""

    def handle(self):
        """Negotiate options, then answer a single request.

        Sends a fixed set of option negotiations, reads one chunk of
        payload through the Negotiator (which filters out the client's
        negotiation bytes) and replies with either the aliveness response
        (when the payload contains VERIFIED_REQ) or an echo of the
        whitespace-stripped payload. Finally the socket is drained briefly
        and shut down.

        I/O errors are logged and swallowed so that one bad connection
        does not take the server down.
        """
        neg = Negotiator(self.request)

        try:
            # Send some initial negotiations.
            neg.send_do("NEW_ENVIRON")
            neg.send_will("NEW_ENVIRON")
            neg.send_dont("NAWS")
            neg.send_wont("NAWS")

            # Get the data passed through the negotiator
            data = neg.recv(4*1024)
            log.debug("Incoming data: %r", data)

            if VERIFIED_REQ.encode('utf-8') in data:
                log.debug("Received verification request from test framework")
                pid = os.getpid()
                # see tests/server/util.c function write_pidfile
                if os.name == "nt":
                    pid += 4194304
                response = VERIFIED_RSP.format(pid=pid)
                response_data = response.encode('utf-8')
            else:
                log.debug("Received normal request - echoing back")
                try:
                    response_data = data.decode('utf-8').strip().encode('utf-8')
                except UnicodeDecodeError:
                    # The payload is not valid UTF-8. Echo the raw bytes
                    # (ASCII whitespace trimmed) instead of letting the
                    # decode error escape the handler.
                    response_data = bytes(data).strip()

            if response_data:
                log.debug("Sending %r", response_data)
                self.request.sendall(response_data)

            # put some effort into making a clean socket shutdown
            # that does not give the client ECONNRESET
            self.request.settimeout(0.1)
            try:
                self.request.recv(4*1024)
            except socket.timeout:
                # Nothing more arrived within the grace period. That is the
                # expected outcome, not a failure: socket.timeout is an
                # IOError subclass, so without this guard the outer handler
                # would log a traceback and the shutdown below would be
                # skipped.
                log.debug("No further data from client before shutdown")
            self.request.shutdown(socket.SHUT_RDWR)

        except IOError:
            log.exception("IOError hit during request")
115
116
117
class Negotiator(object):
    """Strip telnet option negotiation out of a TCP byte stream.

    Wraps a socket-like object (anything offering ``recv`` and
    ``sendall``). Incoming bytes are run through a small state machine:
    IAC-introduced negotiation sequences are consumed and logged, every
    other byte is handed back to the caller. The ``send_*`` helpers emit
    three-byte negotiation sequences of our own.
    """

    # Parser states: outside any sequence, just after IAC, and after each
    # of the four negotiation verbs (waiting for the option byte).
    NO_NEG = 0
    START_NEG = 1
    WILL = 2
    WONT = 3
    DO = 4
    DONT = 5

    def __init__(self, tcp):
        self.tcp = tcp
        self.state = self.NO_NEG

    def recv(self, bytes):
        """
        Read payload from TCP with negotiation sequences filtered out.

        Keeps reading until at least one payload byte has been collected
        or the underlying ``recv`` returns no data.

        :param bytes: Maximum number of bytes requested per TCP read
        :return: a bytearray of payload bytes (empty if TCP gave no data)
        """
        payload = bytearray()
        awaiting_option = (self.WILL, self.WONT, self.DO, self.DONT)

        # A chunk made up purely of negotiation bytes leaves the payload
        # empty, so go around again until there is something to return.
        while not payload:
            chunk = self.tcp.recv(bytes)
            if not chunk:
                # TCP failed to give us any data. Give up.
                break

            for value in bytearray(chunk):
                current = self.state
                if current == self.NO_NEG:
                    self.no_neg(value, payload)
                elif current == self.START_NEG:
                    self.start_neg(value)
                elif current in awaiting_option:
                    self.handle_option(value)
                else:
                    # Unknown parser state. Stop negotiations
                    log.error("Unexpected byte %s in state %s",
                              value,
                              self.state)
                    self.state = self.NO_NEG

        return payload

    def no_neg(self, byte_int, buffer):
        """Handle a byte seen outside any negotiation sequence."""
        if byte_int != NegTokens.IAC:
            # Ordinary payload: hand it to the caller's buffer.
            buffer.append(byte_int)
            return

        # IAC opens a negotiation sequence.
        log.debug("Starting negotiation (IAC)")
        self.state = self.START_NEG

    def start_neg(self, byte_int):
        """Handle the verb byte that follows IAC."""
        log.debug("In negotiation (%s)",
                  NegTokens.from_val(byte_int))

        # verb token -> (state to enter, message to log)
        transitions = {
            NegTokens.WILL: (self.WILL, "Client is willing"),
            NegTokens.WONT: (self.WONT, "Client is unwilling"),
            NegTokens.DO: (self.DO, "Client can do"),
            NegTokens.DONT: (self.DONT, "Client can't do"),
        }
        transition = transitions.get(byte_int)

        if transition is None:
            # Not one of the four verbs. Stop negotiations
            log.error("Unexpected byte %s in state %s",
                      byte_int,
                      self.state)
            self.state = self.NO_NEG
            return

        next_state, note = transition
        log.debug(note)
        self.state = next_state

    def handle_option(self, byte_int):
        """Handle the option byte that follows a negotiation verb."""
        recognised = (NegOptions.BINARY,
                      NegOptions.CHARSET,
                      NegOptions.SUPPRESS_GO_AHEAD,
                      NegOptions.NAWS,
                      NegOptions.NEW_ENVIRON)

        if byte_int in recognised:
            log.debug("Option: %s", NegOptions.from_val(byte_int))
        else:
            # Received an unexpected byte. Stop negotiations
            log.error("Unexpected byte %s in state %s",
                      byte_int,
                      self.state)

        # Known or not, the sequence ends with this byte.
        self.state = self.NO_NEG

    def send_message(self, message_ints):
        """Send a sequence of integer byte values over TCP."""
        raw = bytearray(message_ints)
        self.tcp.sendall(raw)

    def send_iac(self, arr):
        """Send the given byte values prefixed with IAC."""
        self.send_message([NegTokens.IAC] + list(arr))

    def send_do(self, option_str):
        """Send IAC DO for the option named by option_str."""
        log.debug("Sending DO %s", option_str)
        option = NegOptions.to_val(option_str)
        self.send_iac((NegTokens.DO, option))

    def send_dont(self, option_str):
        """Send IAC DONT for the option named by option_str."""
        log.debug("Sending DONT %s", option_str)
        option = NegOptions.to_val(option_str)
        self.send_iac((NegTokens.DONT, option))

    def send_will(self, option_str):
        """Send IAC WILL for the option named by option_str."""
        log.debug("Sending WILL %s", option_str)
        option = NegOptions.to_val(option_str)
        self.send_iac((NegTokens.WILL, option))

    def send_wont(self, option_str):
        """Send IAC WONT for the option named by option_str."""
        log.debug("Sending WONT %s", option_str)
        option = NegOptions.to_val(option_str)
        self.send_iac((NegTokens.WONT, option))
244
245
246
class NegBase(object):
    """Base for constant namespaces with name <-> value lookup.

    Subclasses declare their constants as plain class attributes.
    """

    @classmethod
    def to_val(cls, name):
        """Return the value of the constant called *name*.

        :raises AttributeError: if the class has no such attribute
        """
        return getattr(cls, name)

    @classmethod
    def from_val(cls, val):
        """Return the name of the first constant equal to *val*.

        Only attributes declared directly on *cls* are considered.

        :return: the attribute name, or "<unknown>" if nothing matches
        """
        for k in cls.__dict__:
            # Skip interpreter-provided entries such as __module__, __doc__
            # or __firstlineno__ (an int on newer Pythons); they are not
            # constants and must never be reported as a token name.
            if k.startswith("__") and k.endswith("__"):
                continue
            if getattr(cls, k) == val:
                return k

        return "<unknown>"
258
259
260
class NegTokens(NegBase):
    """Telnet command bytes used during option negotiation."""

    # "Interpret As Command": introduces every negotiation sequence
    IAC = 255

    # The four negotiation verbs, in ascending byte value.
    # Sender is willing to use an option itself
    WILL = 251
    # Sender refuses to use an option itself
    WONT = 252
    # Sender asks the peer to use an option
    DO = 253
    # Sender asks the peer not to use an option
    DONT = 254

    # Sub-negotiation framing.
    # Sub-negotiation Begin
    SB = 250
    # Sub-negotiation End
    SE = 240
276
277
278
class NegOptions(NegBase):
    """Telnet option codes this server knows by name."""

    BINARY = 0             # Binary Transmission
    SUPPRESS_GO_AHEAD = 3  # Suppress Go Ahead
    NAWS = 31              # NAWS - width and height of client
    NEW_ENVIRON = 39       # NEW-ENVIRON - environment variables on client
    CHARSET = 42           # Charset option
289
290
291
def get_options(argv=None):
    """Parse the server's command line.

    :param argv: list of argument strings to parse; when None (the
        default) argparse reads them from ``sys.argv[1:]``
    :return: an argparse.Namespace with ``port``, ``verbose``, ``pidfile``,
        ``logfile``, ``srcdir``, ``id`` and ``ipv4`` attributes
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--port", action="store", default=9019,
                        type=int, help="port to listen on")
    parser.add_argument("--verbose", action="store", type=int, default=0,
                        help="verbose output")
    parser.add_argument("--pidfile", action="store",
                        help="file name for the PID")
    parser.add_argument("--logfile", action="store",
                        help="file name for the log")
    parser.add_argument("--srcdir", action="store", help="test directory")
    parser.add_argument("--id", action="store", help="server ID")
    # A store_true flag is boolean; default to False rather than 0 so the
    # attribute has the same type whether or not the flag was given.
    parser.add_argument("--ipv4", action="store_true", default=False,
                        help="IPv4 flag")

    return parser.parse_args(argv)
308
309
310
def setup_logging(options):
    """Configure the root logger from the command line options.

    A log file handler is installed when ``options.logfile`` is set. A
    stdout handler is installed when no log file was given or when
    ``options.verbose`` is truthy. The root level is DEBUG in verbose
    mode and INFO otherwise.
    """
    root = logging.getLogger()
    line_format = logging.Formatter("%(asctime)s %(levelname)-5.5s "
                                    "[{ident}] %(message)s"
                                    .format(ident=IDENT))

    def attach(handler):
        # Every handler shares the same format and passes everything the
        # root logger lets through.
        handler.setFormatter(line_format)
        handler.setLevel(logging.DEBUG)
        root.addHandler(handler)

    # Write out to a logfile
    if options.logfile:
        attach(ClosingFileHandler(options.logfile))

    root.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    # stdout is the fallback when there is no logfile, and an extra sink
    # in verbose mode.
    if options.verbose or not options.logfile:
        attach(logging.StreamHandler(sys.stdout))
341
342
343
class ScriptRC(object):
    """Enum for script return codes."""

    # Exit statuses: 0 = clean run, 1 = failure, 2 = unhandled exception.
    SUCCESS, FAILURE, EXCEPTION = range(3)
349
350
351
if __name__ == '__main__':
    # Parse the command line, then configure logging from it.
    cli_options = get_options()
    setup_logging(cli_options)

    # Run the server; any exception becomes the EXCEPTION return code.
    try:
        exit_code = telnetserver(cli_options)
    except Exception:
        log.exception('Error in telnet server')
        exit_code = ScriptRC.EXCEPTION

    # Remove the pidfile written at startup, if it is still there.
    pidfile = cli_options.pidfile
    if pidfile and os.path.isfile(pidfile):
        os.unlink(pidfile)

    log.info("Returning %d", exit_code)
    sys.exit(exit_code)
370
371