Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/crypto/krb5/src/tests/kcmserver.py
34878 views
1
# This is a simple KCM test server, used to exercise the KCM ccache
2
# client code. It will generally throw an uncaught exception if the
3
# client sends anything unexpected, so is unsuitable for production.
4
# (It also imposes no namespace or access constraints, and blocks
5
# while reading requests and writing responses.)
6
7
# This code knows nothing about how to marshal and unmarshal principal
8
# names and credentials as is required in the KCM protocol; instead,
9
# it just remembers the marshalled forms and replays them to the
10
# client when asked. This works because marshalled creds and
11
# principal names are always the last part of marshalled request
12
# arguments, and because we don't need to implement remove_cred (which
13
# would need to know how to match a cred tag against previously stored
14
# credentials).
15
16
# The following code is useful for debugging if anything appears to be
17
# going wrong in the server, since daemon output is generally not
18
# visible in Python test scripts.
19
#
20
# import sys, traceback
21
# def ehook(etype, value, tb):
22
# with open('/tmp/exception', 'w') as f:
23
# traceback.print_exception(etype, value, tb, file=f)
24
# sys.excepthook = ehook
25
26
import optparse
27
import select
28
import socket
29
import struct
30
import sys
31
32
caches = {}
33
cache_uuidmap = {}
34
defname = b'default'
35
next_unique = 1
36
next_uuid = 1
37
38
class KCMOpcodes(object):
39
GEN_NEW = 3
40
INITIALIZE = 4
41
DESTROY = 5
42
STORE = 6
43
RETRIEVE = 7
44
GET_PRINCIPAL = 8
45
GET_CRED_UUID_LIST = 9
46
GET_CRED_BY_UUID = 10
47
REMOVE_CRED = 11
48
GET_CACHE_UUID_LIST = 18
49
GET_CACHE_BY_UUID = 19
50
GET_DEFAULT_CACHE = 20
51
SET_DEFAULT_CACHE = 21
52
GET_KDC_OFFSET = 22
53
SET_KDC_OFFSET = 23
54
GET_CRED_LIST = 13001
55
REPLACE = 13002
56
57
58
class KRB5Errors(object):
59
KRB5_CC_NOTFOUND = -1765328243
60
KRB5_CC_END = -1765328242
61
KRB5_CC_NOSUPP = -1765328137
62
KRB5_FCC_NOFILE = -1765328189
63
KRB5_FCC_INTERNAL = -1765328188
64
65
66
def make_uuid():
67
global next_uuid
68
uuid = bytes(12) + struct.pack('>L', next_uuid)
69
next_uuid = next_uuid + 1
70
return uuid
71
72
73
class Cache(object):
74
def __init__(self, name):
75
self.name = name
76
self.princ = None
77
self.uuid = make_uuid()
78
self.cred_uuids = []
79
self.creds = {}
80
self.time_offset = 0
81
82
83
def get_cache(name):
84
if name in caches:
85
return caches[name]
86
cache = Cache(name)
87
caches[name] = cache
88
cache_uuidmap[cache.uuid] = cache
89
return cache
90
91
92
def unmarshal_name(argbytes):
93
offset = argbytes.find(b'\0')
94
return argbytes[0:offset], argbytes[offset+1:]
95
96
97
# Find the bounds of a marshalled principal, returning it and the
98
# remainder of argbytes.
99
def extract_princ(argbytes):
100
ncomps, rlen = struct.unpack('>LL', argbytes[4:12])
101
pos = 12 + rlen
102
for i in range(ncomps):
103
clen, = struct.unpack('>L', argbytes[pos:pos+4])
104
pos += 4 + clen
105
return argbytes[0:pos], argbytes[pos:]
106
107
108
# Return true if the marshalled principals p1 and p2 name the same
109
# principal.
110
def princ_eq(p1, p2):
111
# Ignore the name-types at bytes 0..3. The remaining bytes should
112
# be identical if the principals are the same.
113
return p1[4:] == p2[4:]
114
115
116
def op_gen_new(argbytes):
117
# Does not actually check for uniqueness.
118
global next_unique
119
name = b'unique' + str(next_unique).encode('ascii')
120
next_unique += 1
121
return 0, name + b'\0'
122
123
124
def op_initialize(argbytes):
125
name, princ = unmarshal_name(argbytes)
126
cache = get_cache(name)
127
cache.princ = princ
128
cache.cred_uuids = []
129
cache.creds = {}
130
cache.time_offset = 0
131
return 0, b''
132
133
134
def op_destroy(argbytes):
135
name, rest = unmarshal_name(argbytes)
136
cache = get_cache(name)
137
del cache_uuidmap[cache.uuid]
138
del caches[name]
139
return 0, b''
140
141
142
def op_store(argbytes):
143
name, cred = unmarshal_name(argbytes)
144
cache = get_cache(name)
145
uuid = make_uuid()
146
cache.creds[uuid] = cred
147
cache.cred_uuids.append(uuid)
148
return 0, b''
149
150
151
def op_retrieve(argbytes):
152
name, rest = unmarshal_name(argbytes)
153
# Ignore the flags at rest[0:4] and the header at rest[4:8].
154
# Assume there are client and server creds in the tag and match
155
# only against them.
156
cprinc, rest = extract_princ(rest[8:])
157
sprinc, rest = extract_princ(rest)
158
cache = get_cache(name)
159
for cred in (cache.creds[u] for u in cache.cred_uuids):
160
cred_cprinc, rest = extract_princ(cred)
161
cred_sprinc, rest = extract_princ(rest)
162
if princ_eq(cred_cprinc, cprinc) and princ_eq(cred_sprinc, sprinc):
163
return 0, cred
164
return KRB5Errors.KRB5_CC_NOTFOUND, b''
165
166
167
def op_get_principal(argbytes):
168
name, rest = unmarshal_name(argbytes)
169
cache = get_cache(name)
170
if cache.princ is None:
171
return KRB5Errors.KRB5_FCC_NOFILE, b''
172
return 0, cache.princ + b'\0'
173
174
175
def op_get_cred_uuid_list(argbytes):
176
name, rest = unmarshal_name(argbytes)
177
cache = get_cache(name)
178
return 0, b''.join(cache.cred_uuids)
179
180
181
def op_get_cred_by_uuid(argbytes):
182
name, uuid = unmarshal_name(argbytes)
183
cache = get_cache(name)
184
if uuid not in cache.creds:
185
return KRB5Errors.KRB5_CC_END, b''
186
return 0, cache.creds[uuid]
187
188
189
def op_remove_cred(argbytes):
190
return KRB5Errors.KRB5_CC_NOSUPP, b''
191
192
193
def op_get_cache_uuid_list(argbytes):
194
return 0, b''.join(cache_uuidmap.keys())
195
196
197
def op_get_cache_by_uuid(argbytes):
198
uuid = argbytes
199
if uuid not in cache_uuidmap:
200
return KRB5Errors.KRB5_CC_END, b''
201
return 0, cache_uuidmap[uuid].name + b'\0'
202
203
204
def op_get_default_cache(argbytes):
205
return 0, defname + b'\0'
206
207
208
def op_set_default_cache(argbytes):
209
global defname
210
defname, rest = unmarshal_name(argbytes)
211
return 0, b''
212
213
214
def op_get_kdc_offset(argbytes):
215
name, rest = unmarshal_name(argbytes)
216
cache = get_cache(name)
217
return 0, struct.pack('>l', cache.time_offset)
218
219
220
def op_set_kdc_offset(argbytes):
221
name, obytes = unmarshal_name(argbytes)
222
cache = get_cache(name)
223
cache.time_offset, = struct.unpack('>l', obytes)
224
return 0, b''
225
226
227
def op_get_cred_list(argbytes):
228
name, rest = unmarshal_name(argbytes)
229
cache = get_cache(name)
230
creds = [cache.creds[u] for u in cache.cred_uuids]
231
return 0, (struct.pack('>L', len(creds)) +
232
b''.join(struct.pack('>L', len(c)) + c for c in creds))
233
234
235
def op_replace(argbytes):
236
name, rest = unmarshal_name(argbytes)
237
offset, = struct.unpack('>L', rest[0:4])
238
princ, rest = extract_princ(rest[4:])
239
ncreds, = struct.unpack('>L', rest[0:4])
240
rest = rest[4:]
241
creds = []
242
for i in range(ncreds):
243
len, = struct.unpack('>L', rest[0:4])
244
creds.append(rest[4:4+len])
245
rest = rest[4+len:]
246
247
cache = get_cache(name)
248
cache.princ = princ
249
cache.cred_uuids = []
250
cache.creds = {}
251
cache.time_offset = offset
252
for i in range(ncreds):
253
uuid = make_uuid()
254
cache.creds[uuid] = creds[i]
255
cache.cred_uuids.append(uuid)
256
257
return 0, b''
258
259
260
ophandlers = {
261
KCMOpcodes.GEN_NEW : op_gen_new,
262
KCMOpcodes.INITIALIZE : op_initialize,
263
KCMOpcodes.DESTROY : op_destroy,
264
KCMOpcodes.STORE : op_store,
265
KCMOpcodes.RETRIEVE : op_retrieve,
266
KCMOpcodes.GET_PRINCIPAL : op_get_principal,
267
KCMOpcodes.GET_CRED_UUID_LIST : op_get_cred_uuid_list,
268
KCMOpcodes.GET_CRED_BY_UUID : op_get_cred_by_uuid,
269
KCMOpcodes.REMOVE_CRED : op_remove_cred,
270
KCMOpcodes.GET_CACHE_UUID_LIST : op_get_cache_uuid_list,
271
KCMOpcodes.GET_CACHE_BY_UUID : op_get_cache_by_uuid,
272
KCMOpcodes.GET_DEFAULT_CACHE : op_get_default_cache,
273
KCMOpcodes.SET_DEFAULT_CACHE : op_set_default_cache,
274
KCMOpcodes.GET_KDC_OFFSET : op_get_kdc_offset,
275
KCMOpcodes.SET_KDC_OFFSET : op_set_kdc_offset,
276
KCMOpcodes.GET_CRED_LIST : op_get_cred_list,
277
KCMOpcodes.REPLACE : op_replace
278
}
279
280
# Read and respond to a request from the socket s.
281
def service_request(s):
282
lenbytes = b''
283
while len(lenbytes) < 4:
284
lenbytes += s.recv(4 - len(lenbytes))
285
if lenbytes == b'':
286
return False
287
288
reqlen, = struct.unpack('>L', lenbytes)
289
req = b''
290
while len(req) < reqlen:
291
req += s.recv(reqlen - len(req))
292
293
majver, minver, op = struct.unpack('>BBH', req[:4])
294
argbytes = req[4:]
295
296
if op in ophandlers:
297
code, payload = ophandlers[op](argbytes)
298
else:
299
code, payload = KRB5Errors.KRB5_FCC_INTERNAL, b''
300
301
# The KCM response is the code (4 bytes) and the response payload.
302
# The Heimdal IPC response is the length of the KCM response (4
303
# bytes), a status code which is essentially always 0 (4 bytes),
304
# and the KCM response.
305
kcm_response = struct.pack('>l', code) + payload
306
hipc_response = struct.pack('>LL', len(kcm_response), 0) + kcm_response
307
s.sendall(hipc_response)
308
return True
309
310
parser = optparse.OptionParser()
311
parser.add_option('-f', '--fallback', action='store_true', dest='fallback',
312
default=False,
313
help='Do not support RETRIEVE/GET_CRED_LIST/REPLACE')
314
(options, args) = parser.parse_args()
315
if options.fallback:
316
del ophandlers[KCMOpcodes.RETRIEVE]
317
del ophandlers[KCMOpcodes.GET_CRED_LIST]
318
del ophandlers[KCMOpcodes.REPLACE]
319
320
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
321
server.bind(args[0])
322
server.listen(5)
323
select_input = [server,]
324
sys.stderr.write('starting...\n')
325
sys.stderr.flush()
326
327
while True:
328
iready, oready, xready = select.select(select_input, [], [])
329
for s in iready:
330
if s == server:
331
client, addr = server.accept()
332
select_input.append(client)
333
else:
334
if not service_request(s):
335
select_input.remove(s)
336
s.close()
337
338