Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/contrib/lib9p/pytest/sequencer.py
39536 views
1
#! /usr/bin/env python
2
3
from __future__ import print_function
4
5
#__all__ = ['EncDec', 'EncDecSimple', 'EncDecTyped', 'EncDecA',
6
# 'SequenceError', 'Sequencer']
7
8
import abc
9
import struct
10
import sys
11
12
# Little-endian unsigned struct codecs, keyed by field width in bytes.
# Every width can be looked up by its string spelling ('2') or by its
# integer spelling (2); both spellings share one Struct instance.
# '_string_' has no fixed-size codec and maps to None: it is handled
# specially by the code that consumes this table.
_ProtoStruct = dict(
    (str(_width), struct.Struct(_code))
    for _width, _code in ((1, '<B'), (2, '<H'), (4, '<I'), (8, '<Q'))
)
_ProtoStruct['_string_'] = None
_ProtoStruct.update(
    (int(_key), _codec)
    for _key, _codec in list(_ProtoStruct.items())
    if _codec is not None
)
22
23
# The base class is manufactured by calling abc.ABCMeta directly so that
# the metaclass takes effect on both Python 2 and Python 3.  (A
# "__metaclass__ = ..." class attribute is honoured only by Python 2, so
# on Python 3 the @abc.abstractmethod markers below would be ignored.)
class EncDec(abc.ABCMeta('_EncDecABC', (object,), {})):
    """
    Base class for en/de-coders, which are put into sequencers.

    All have a name and arbitrary user-supplied auxiliary data
    (default=None).

    All provide a pack() and unpack().  The pack() function
    returns a "bytes" value.  This is internally implemented as a
    function apack() that returns a list of struct.pack() bytes,
    and pack() just joins them up as needed.

    The pack/unpack functions take a dictionary of variable names
    and values, and a second dictionary for conditionals, but at
    this level conditionals don't apply: they are just being
    passed through.  Variable names do apply to array encoders.

    EncDec also provide b2s() and s2b() static methods, which
    convert strings to bytes and vice versa, as reversibly as
    possible (using surrogateescape encoding).  In Python2 this is
    a no-op since the string type *is* the bytes type (<type
    'unicode'> is the unicode-ized string type).

    EncDec also provides b2u() and u2b() to do conversion to/from
    Unicode.

    These are partly for internal use (all strings get converted
    to UTF-8 byte sequences when coding a _string_ type) and partly
    for doctests, where we just want some py2k/py3k compat hacks.

    Subclasses must implement apack() and unpack(); this class
    itself cannot be instantiated.
    """
    def __init__(self, name, aux):
        "remember the field <name> and the caller's auxiliary data <aux>"
        self.name = name
        self.aux = aux

    @staticmethod
    def b2u(byte_sequence):
        "transform bytes to unicode"
        return byte_sequence.decode('utf-8', 'surrogateescape')

    @staticmethod
    def u2b(unicode_sequence):
        "transform unicode to bytes"
        return unicode_sequence.encode('utf-8', 'surrogateescape')

    # The "native string" helpers differ by major version: on py3k a
    # str is Unicode, on py2k a str already is a byte string.
    if sys.version_info[0] >= 3:
        b2s = b2u

        @staticmethod
        def s2b(string):
            "transform string to bytes (leaves raw byte sequence unchanged)"
            if isinstance(string, bytes):
                return string
            return string.encode('utf-8', 'surrogateescape')
    else:
        @staticmethod
        def b2s(byte_sequence):
            "transform bytes to string - no-op in python2.7"
            return byte_sequence

        @staticmethod
        def s2b(string):
            "transform string or unicode to bytes"
            if isinstance(string, unicode):
                return string.encode('utf-8', 'surrogateescape')
            return string

    def pack(self, vdict, cdict, val):
        "encode value <val> into a byte-string"
        return b''.join(self.apack(vdict, cdict, val))

    @abc.abstractmethod
    def apack(self, vdict, cdict, val):
        "encode value <val> into [bytes1, b2, ..., bN]"

    @abc.abstractmethod
    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        "unpack bytes from <bstring> at <offset>"
100
101
class EncDecSimple(EncDec):
    r"""
    Encode/decode a simple (but named) field.  The field is not an
    array, which requires using EncDecA, nor a typed object
    like a qid or stat instance -- those require a Sequence and
    EncDecTyped.

    The format is one of '1'/1, '2'/2, '4'/4, '8'/8, or '_string_'.

    Note: using b2s here is purely a doctest/testmod python2/python3
    compat hack.  The output of e.pack is <type 'bytes'>; b2s
    converts it to a string, purely for display purposes.  (It might
    be better to map py2 output to bytes but they just print as a
    string anyway.)  In normal use, you should not call b2s here.

    >>> e = EncDecSimple('eggs', 2)
    >>> e.b2s(e.pack({}, {}, 0))
    '\x00\x00'
    >>> e.b2s(e.pack({}, {}, 256))
    '\x00\x01'

    Values that cannot be packed produce a SequenceError:

    >>> e.pack({}, {}, None)
    Traceback (most recent call last):
    ...
    SequenceError: failed while packing 'eggs'=None
    >>> e.pack({}, {}, -1)
    Traceback (most recent call last):
    ...
    SequenceError: failed while packing 'eggs'=-1

    Unpacking both returns a value, and tells how many bytes it
    used out of the bytestring or byte-array argument.  If there
    are not enough bytes remaining at the starting offset, it
    raises a SequenceError, unless noerror=True (then unset
    values are None)

    >>> e.unpack({}, {}, b'\x00\x01', 0)
    (256, 2)
    >>> e.unpack({}, {}, b'', 0)
    Traceback (most recent call last):
    ...
    SequenceError: out of data while unpacking 'eggs'
    >>> e.unpack({}, {}, b'', 0, noerror=True)
    (None, 2)

    Note that strings can be provided as regular strings, byte
    strings (same as regular strings in py2k), or Unicode strings
    (same as regular strings in py3k).  Unicode strings will be
    converted to UTF-8 before being packed.  Since this leaves
    7-bit characters alone, these examples work in both py2k and
    py3k.  (Note: the UTF-8 encoding of u'\u1234' is
    '\xe1\x88\xb4' or 225, 136, 180.  The b2i trick below is
    another py2k vs py3k special case just for doctests: py2k
    tries to display the utf-8 encoded data as a string.)

    >>> e = EncDecSimple('spam', '_string_')
    >>> e.b2s(e.pack({}, {}, 'p3=unicode,p2=bytes'))
    '\x13\x00p3=unicode,p2=bytes'

    >>> e.b2s(e.pack({}, {}, b'bytes'))
    '\x05\x00bytes'

    >>> import sys
    >>> ispy3k = sys.version_info[0] >= 3

    >>> b2i = lambda x: x if ispy3k else ord(x)
    >>> [b2i(x) for x in e.pack({}, {}, u'\u1234')]
    [3, 0, 225, 136, 180]

    The byte length of the utf-8 data cannot exceed 65535 since
    the encoding has the length as a 2-byte field (a la the
    encoding for 'eggs' here).  A too-long string produces
    a SequenceError as well.

    >>> e.pack({}, {}, 16384 * 'spam')
    Traceback (most recent call last):
    ...
    SequenceError: string too long (len=65536) while packing 'spam'

    Unpacking strings produces byte arrays.  (Of course,
    in py2k these are also known as <type 'str'>.)

    >>> unpacked = e.unpack({}, {}, b'\x04\x00data', 0)
    >>> etype = bytes if ispy3k else str
    >>> print(isinstance(unpacked[0], etype))
    True
    >>> e.b2s(unpacked[0])
    'data'
    >>> unpacked[1]
    6

    You may use e.b2s() to convert them to unicode strings in py3k,
    or you may set e.autob2s.  This still only really does
    anything in py3k, since py2k strings *are* bytes, so it's
    really just intended for doctest purposes (see EncDecA):

    >>> e.autob2s = True
    >>> e.unpack({}, {}, b'\x07\x00stringy', 0)
    ('stringy', 9)
    """
    def __init__(self, name, fmt, aux=None):
        """
        name: field name; fmt: a key of _ProtoStruct ('1'/1, '2'/2,
        '4'/4, '8'/8 or '_string_'); aux: optional user data.
        An unknown fmt raises KeyError from the table lookup.
        """
        super(EncDecSimple, self).__init__(name, aux)
        self.fmt = fmt
        # None for '_string_': strings are length-prefixed, see apack/unpack.
        self.struct = _ProtoStruct[fmt]
        self.autob2s = False

    def __repr__(self):
        if self.aux is None:
            return '{0}({1!r}, {2!r})'.format(self.__class__.__name__,
                self.name, self.fmt)
        return '{0}({1!r}, {2!r}, {3!r})'.format(self.__class__.__name__,
            self.name, self.fmt, self.aux)

    __str__ = __repr__

    def apack(self, vdict, cdict, val):
        """
        Encode a value; returns a list of byte strings.

        Fixed-size formats give a one-element list.  A '_string_'
        gives [2-byte length, data].  Raises SequenceError if the
        value cannot be packed or the string exceeds 65535 bytes.
        """
        try:
            if self.struct is not None:
                return [self.struct.pack(val)]
            sval = self.s2b(val)
            if len(sval) > 65535:
                raise SequenceError('string too long (len={0:d}) '
                    'while packing {1!r}'.format(len(sval), self.name))
            return [EncDecSimple.string_len.pack(len(sval)), sval]
        # Include AttributeError in case someone tries to, e.g.,
        # pack name=None and self.s2b() tries to use .encode on it.
        except (struct.error, AttributeError):
            raise SequenceError('failed '
                'while packing {0!r}={1!r}'.format(self.name, val))

    def _unpack1(self, via, bstring, offset, noerror):
        """
        Internal function to unpack a single item using Struct <via>.

        Returns (value, new offset).  When the buffer is too short,
        returns (None, new offset) if noerror is set, otherwise
        raises SequenceError.
        """
        try:
            tup = via.unpack_from(bstring, offset)
        except struct.error:
            # Decide "ran out of data" arithmetically instead of by
            # matching the wording of the struct.error message, which
            # is an implementation detail of the interpreter.
            if offset >= 0 and len(bstring) - offset < via.size:
                if noerror:
                    return None, offset + via.size
                raise SequenceError('out of data '
                    'while unpacking {0!r}'.format(self.name))
            # not clear what to do here if noerror
            raise SequenceError('failed '
                'while unpacking {0!r}'.format(self.name))
        assert len(tup) == 1
        return tup[0], offset + via.size

    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        "decode a value; return the value and the new offset"
        if self.struct is not None:
            return self._unpack1(self.struct, bstring, offset, noerror)
        # String: a 2-byte length, then that many bytes of data.
        slen, offset = self._unpack1(EncDecSimple.string_len, bstring, offset,
            noerror)
        if slen is None:
            # Length itself was missing (only possible with noerror).
            return None, offset
        nexto = offset + slen
        if len(bstring) < nexto:
            if not noerror:
                raise SequenceError('out of data '
                    'while unpacking {0!r}'.format(self.name))
            # Still report where the string would have ended.
            val = None
        else:
            val = bstring[offset:nexto]
            if self.autob2s:
                val = self.b2s(val)
        return val, nexto

# string length: 2 byte unsigned field
EncDecSimple.string_len = _ProtoStruct[2]
273
274
class EncDecTyped(EncDec):
    r"""
    En/de-coder for typed objects.  These are built from PFODs
    (a sneaky class variant of OrderedDict similar to namedtuple).

    klass() called with no arguments must produce an instance
    whose members are all None.

    A Sequencer is also required; it does the actual packing and
    unpacking of the members of the underlying pfod.

    >>> qid_s = Sequencer('qid')
    >>> qid_s.append_encdec(None, EncDecSimple('type', 1))
    >>> qid_s.append_encdec(None, EncDecSimple('version', 4))
    >>> qid_s.append_encdec(None, EncDecSimple('path', 8))
    >>> len(qid_s)
    3

    >>> from pfod import pfod
    >>> qid = pfod('qid', ['type', 'version', 'path'])
    >>> len(qid._fields)
    3
    >>> qid_inst = qid(1, 2, 3)
    >>> qid_inst
    qid(type=1, version=2, path=3)

    >>> e = EncDecTyped(qid, 'aqid', qid_s)
    >>> e.b2s(e.pack({}, {}, qid_inst))
    '\x01\x02\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00'
    >>> e.unpack({}, {},
    ...     b'\x01\x02\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00', 0)
    (qid(type=1, version=2, path=3), 13)

    If an EncDecTyped instance has a conditional sequencer, note
    that unpacking will leave un-selected items set to None (see
    the Sequencer example below):

    >>> breakfast = pfod('breakfast', 'eggs spam ham')
    >>> breakfast()
    breakfast(eggs=None, spam=None, ham=None)
    >>> bfseq = Sequencer('breakfast')
    >>> bfseq.append_encdec(None, EncDecSimple('eggs', 1))
    >>> bfseq.append_encdec('yuck', EncDecSimple('spam', 1))
    >>> bfseq.append_encdec(None, EncDecSimple('ham', 1))
    >>> e = EncDecTyped(breakfast, 'bfname', bfseq)
    >>> e.unpack({}, {'yuck': False}, b'\x02\x01\x04', 0)
    (breakfast(eggs=2, spam=None, ham=1), 2)

    This used just two of the three bytes: eggs=2, ham=1.

    >>> e.unpack({}, {'yuck': True}, b'\x02\x01\x04', 0)
    (breakfast(eggs=2, spam=1, ham=4), 3)

    This used the third byte, so ham=4.
    """
    def __init__(self, klass, name, sequence, aux=None):
        # temporary: the sequencer must hold one entry per field of klass
        assert len(sequence) == len(klass()._fields)
        # the base class records name and aux
        super(EncDecTyped, self).__init__(name, aux)
        self.klass = klass
        self.sequence = sequence

    def __repr__(self):
        # aux is shown only when it was supplied
        shown = [self.klass, self.name, self.sequence]
        if self.aux is not None:
            shown.append(self.aux)
        return '{0}({1})'.format(self.__class__.__name__,
            ', '.join(repr(item) for item in shown))

    __str__ = __repr__

    def apack(self, vdict, cdict, val):
        """
        Hand <val> to our sequencer as its value dictionary, so
        that each member of <val> is packed in turn.

        Members may be conditional on <cdict>.
        """
        return self.sequence.apack(val, cdict)

    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        """
        Create a fresh self.klass instance and let the sequencer
        fill it in from <bstring>, starting at <offset>.

        Returns (instance, new offset).  Members may be
        conditional on <cdict>.
        """
        instance = self.klass()
        end = self.sequence.unpack_from(instance, cdict, bstring, offset,
            noerror)
        return instance, end
363
364
class EncDecA(EncDec):
    r"""
    En/de-coder for arrays (repeated objects).

    It is given the name of the repeat-count variable and a
    sub-coder.  For instance, we can en/de-code
    repeat='nwname' copies of name='wname', or nwname of
    name='wqid', in a Twalk en/de-code.

    The repeat count itself is neither packed nor unpacked here --
    higher level code must do that.  Its value is simply read
    from vdict.

    >>> subcode = EncDecSimple('wname', '_string_')
    >>> e = EncDecA('nwname', 'wname', subcode)
    >>> e.b2s(e.pack({'nwname': 2}, {}, ['A', 'BC']))
    '\x01\x00A\x02\x00BC'

    >>> subcode.autob2s = True # so that A and BC decode to py3k str
    >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02\x00BC', 0)
    (['A', 'BC'], 7)

    When using noerror, the first sub-item that fails to decode
    completely starts the None-s.  Strings whose length fails to
    decode are assumed to be zero bytes long as well, for the
    purpose of showing the expected packet length:

    >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02\x00', 0, noerror=True)
    (['A', None], 7)
    >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02', 0, noerror=True)
    (['A', None], 5)
    >>> e.unpack({'nwname': 3}, {}, b'\x01\x00A\x02', 0, noerror=True)
    (['A', None, None], 7)

    As a special case, supplying None for the sub-coder
    makes the repeated item pack or unpack a simple byte
    string.  (Note that autob2s is not supported here.)
    A too-short byte string is simply truncated!

    >>> e = EncDecA('count', 'data', None)
    >>> e.b2s(e.pack({'count': 5}, {}, b'12345'))
    '12345'
    >>> x = list(e.unpack({'count': 3}, {}, b'123', 0))
    >>> x[0] = e.b2s(x[0])
    >>> x
    ['123', 3]
    >>> x = list(e.unpack({'count': 3}, {}, b'12', 0, noerror=True))
    >>> x[0] = e.b2s(x[0])
    >>> x
    ['12', 3]
    """
    def __init__(self, repeat, name, sub, aux=None):
        # the base class records name and aux
        super(EncDecA, self).__init__(name, aux)
        self.repeat = repeat
        self.sub = sub

    def __repr__(self):
        # aux is shown only when it was supplied
        shown = [self.repeat, self.name, self.sub]
        if self.aux is not None:
            shown.append(self.aux)
        return '{0}({1})'.format(self.__class__.__name__,
            ', '.join(repr(item) for item in shown))

    __str__ = __repr__

    def apack(self, vdict, cdict, val):
        "pack every element of val; vdict[self.repeat] must equal len(val)"
        count = vdict[self.repeat]
        assert count == len(val)
        if self.sub is None:
            # raw byte-string mode: the value is emitted as-is
            assert isinstance(val, bytes)
            return [val]
        return [piece
                for item in val
                for piece in self.sub.apack(vdict, cdict, item)]

    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        "decode vdict[self.repeat] items into a new list; return it and the new offset"
        count = vdict[self.repeat]
        if count is None and noerror:
            # the count itself failed to decode: treat as empty
            count = 0
        else:
            assert count >= 0
        if self.sub is None:
            # raw byte-string mode: slice out <count> bytes
            end = offset + count
            if len(bstring) < end and not noerror:
                raise SequenceError('out of data '
                    'while unpacking {0!r}'.format(self.name))
            return bstring[offset:end], end
        items = []
        for _ in range(count):
            item, offset = self.sub.unpack(vdict, cdict, bstring, offset,
                noerror)
            items.append(item)
        return items, offset
461
462
class SequenceError(Exception):
    """Sequence error: an item was too big, or we ran out of data."""
465
466
class Sequencer(object):
    r"""
    A sequencer packs (marshals) or unpacks (unmarshals) a series
    of objects, each according to its own EncDec instance.

    The objects and their values are read from, or stored into,
    a dictionary: <vdict>, the first argument to pack/unpack.

    Fields may be conditional.  The conditions live in a separate
    dictionary (the second or <cdict> argument).

    Some objects may be dictionaries or PFODs, e.g., they may
    be a Plan9 qid or stat structure.  These have their own
    sub-encoding.

    As with each encoder, there is both an apack() function
    (returns a list of parts) and a plain pack().  Users should
    mostly stick with plain pack().

    >>> s = Sequencer('monty')
    >>> s
    Sequencer('monty')
    >>> e = EncDecSimple('eggs', 2)
    >>> s.append_encdec(None, e)
    >>> s.append_encdec(None, EncDecSimple('spam', 1))
    >>> s[0]
    (None, EncDecSimple('eggs', 2))
    >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {}))
    '\x01\x02A'

    When particular fields are conditional, they appear in
    packed output, or are taken from the byte-string during
    unpacking, only if their condition is true.

    As with struct, use unpack_from to start at an arbitrary
    offset and/or omit verification that the entire byte-string
    is consumed.

    >>> s = Sequencer('python')
    >>> s.append_encdec(None, e)
    >>> s.append_encdec('.u', EncDecSimple('spam', 1))
    >>> s[1]
    ('.u', EncDecSimple('spam', 1))
    >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {'.u': True}))
    '\x01\x02A'
    >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {'.u': False}))
    '\x01\x02'

    >>> d = {}
    >>> s.unpack(d, {'.u': True}, b'\x01\x02A')
    >>> print(d['eggs'], d['spam'])
    513 65
    >>> d = {}
    >>> s.unpack(d, {'.u': False}, b'\x01\x02A', 0)
    Traceback (most recent call last):
    ...
    SequenceError: 1 byte(s) unconsumed
    >>> s.unpack_from(d, {'.u': False}, b'\x01\x02A', 0)
    2
    >>> print(d)
    {'eggs': 513}

    The incoming dictionary-like object may be pre-initialized
    if you like; only sequences that decode are filled-in:

    >>> d = {'eggs': None, 'spam': None}
    >>> s.unpack_from(d, {'.u': False}, b'\x01\x02A', 0)
    2
    >>> print(d['eggs'], d['spam'])
    513 None

    Some objects may be arrays; if so their EncDec is actually
    an EncDecA, the repeat count must be in the dictionary, and
    the object itself must have a len() and be index-able:

    >>> s = Sequencer('arr')
    >>> s.append_encdec(None, EncDecSimple('n', 1))
    >>> ae = EncDecSimple('array', 2)
    >>> s.append_encdec(None, EncDecA('n', 'array', ae))
    >>> ae.b2s(s.pack({'n': 2, 'array': [257, 514]}, {}))
    '\x02\x01\x01\x02\x02'

    Unpacking an array creates a list of the number of items.
    The EncDec encoder that decodes the number of items needs to
    occur first in the sequencer, so that the dictionary will have
    acquired the repeat-count variable's value by the time we hit
    the array's encdec:

    >>> d = {}
    >>> s.unpack(d, {}, b'\x01\x04\x00')
    >>> d['n'], d['array']
    (1, [4])
    """
    def __init__(self, name):
        self.name = name
        self._codes = []        # list of (cond, code) pairs, in order
        self.debug = False      # or sys.stderr

    def __repr__(self):
        return '{0}({1!r})'.format(self.__class__.__name__, self.name)

    __str__ = __repr__

    # Container protocol: behave like the list of (cond, code) pairs.
    def __len__(self):
        return len(self._codes)

    def __iter__(self):
        return iter(self._codes)

    def __getitem__(self, index):
        return self._codes[index]

    def dprint(self, *args, **kwargs):
        """
        Debug print: silent unless self.debug is set.  A bool
        self.debug prints to sys.stdout; anything else is used
        as the output file itself.
        """
        if self.debug:
            dest = sys.stdout if isinstance(self.debug, bool) else self.debug
            print(*args, file=dest, **kwargs)

    def append_encdec(self, cond, code):
        "add EncDec en/de-coder, conditional on cond"
        self._codes.append((cond, code))

    def apack(self, vdict, cdict):
        """
        Pack each selected field, taking its value from
        vdict[code.name]; return the list of packed parts.
        """
        pieces = []
        for cond, code in self._codes:
            if cond is None or cdict[cond]:
                # Unconditional, or its condition holds: pack it.
                self.dprint('pack %r - no cond or %r is True' % (code, cond))
                pieces += code.apack(vdict, cdict, vdict[code.name])
            else:
                # Conditional on a false thing: leave it out.
                self.dprint('skip %r - %r is False' % (code, cond))
        return pieces

    def pack(self, vdict, cdict):
        """
        Join the parts produced by apack() into one byte string.
        """
        return b''.join(self.apack(vdict, cdict))

    def unpack_from(self, vdict, cdict, bstring, offset=0, noerror=False):
        """
        Unpack from byte string, beginning at <offset>, and
        return the offset just past the last item decoded.

        Each decoded value is stored as vdict[code.name];
        some of these may themselves be ordered
        dictionaries created by typedefed codes.

        The offset and the noerror flag are handed to each
        code's own unpack(), which does the actual decoding.
        """
        for cond, code in self._codes:
            if cond is None or cdict[cond]:
                # Unconditional, or its condition holds: decode it.
                self.dprint('unpack %r - no cond or %r is True' % (code, cond))
                value, offset = code.unpack(vdict, cdict, bstring, offset,
                    noerror)
                vdict[code.name] = value
            else:
                # Conditional on a false thing: leave it out.
                self.dprint('skip %r - %r is False' % (code, cond))
        return offset

    def unpack(self, vdict, cdict, bstring, noerror=False):
        """
        Like unpack_from, starting at offset 0, except that
        unless noerror=True the whole byte string must be used
        up; otherwise SequenceError is raised.  Returns None.
        """
        end = self.unpack_from(vdict, cdict, bstring, 0, noerror)
        if noerror:
            return
        leftover = len(bstring) - end
        if leftover != 0:
            raise SequenceError('{0} byte(s) unconsumed'.format(leftover))
650
651
if __name__ == '__main__':
    # Run as a script: execute the doctest examples embedded in the
    # docstrings of this module.
    import doctest
    doctest.testmod()
654
655