Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/crypto/krb5/src/tests/jsonwalker.py
34878 views
1
import sys
2
import json
3
from collections import defaultdict
4
from optparse import OptionParser
5
6
class Parser(object):
7
DEFAULTS = {int:0,
8
str:'',
9
list:[]}
10
11
def __init__(self, defconf=None):
12
self.defaults = None
13
if defconf is not None:
14
self.defaults = self.flatten(defconf)
15
16
def run(self, logs, verbose=None):
17
result = self.parse(logs)
18
if len(result) != len(self.defaults):
19
diff = set(self.defaults.keys()).difference(result.keys())
20
print('Test failed.')
21
print('The following attributes were not set:')
22
for it in diff:
23
print(it)
24
sys.exit(1)
25
26
def flatten(self, defaults):
27
"""
28
Flattens paths to attributes.
29
30
Parameters
31
----------
32
defaults : a dictionaries populated with default values
33
34
Returns :
35
dict : with flattened attributes
36
"""
37
result = dict()
38
for path,value in self._walk(defaults):
39
if path in result:
40
print('Warning: attribute path %s already exists' % path)
41
result[path] = value
42
43
return result
44
45
def parse(self, logs):
46
result = defaultdict(list)
47
for msg in logs:
48
# each message is treated as a dictionary of dictionaries
49
for a,v in self._walk(msg):
50
# see if path is registered in defaults
51
if a in self.defaults:
52
dv = self.defaults.get(a)
53
if dv is None:
54
# determine default value by type
55
if v is not None:
56
dv = self.DEFAULTS[type(v)]
57
else:
58
print('Warning: attribute %s is set to None' % a)
59
continue
60
# by now we have default value
61
if v != dv:
62
# test passed
63
result[a].append(v)
64
return result
65
66
def _walk(self, adict):
67
"""
68
Generator that works through dictionary.
69
"""
70
for a,v in adict.items():
71
if isinstance(v,dict):
72
for (attrpath,u) in self._walk(v):
73
yield (a+'.'+attrpath,u)
74
else:
75
yield (a,v)
76
77
78
if __name__ == '__main__':
79
80
parser = OptionParser()
81
parser.add_option("-i", "--logfile", dest="filename",
82
help="input log file in json fmt", metavar="FILE")
83
parser.add_option("-d", "--defaults", dest="defaults",
84
help="dictionary with defaults", metavar="FILE")
85
86
(options, args) = parser.parse_args()
87
if options.filename is not None:
88
with open(options.filename, 'r') as f:
89
content = list()
90
for l in f:
91
content.append(json.loads(l.rstrip()))
92
f.close()
93
else:
94
print('Input file in JSON format is required')
95
exit()
96
97
defaults = None
98
if options.defaults is not None:
99
with open(options.defaults, 'r') as f:
100
defaults = json.load(f)
101
102
# run test
103
p = Parser(defaults)
104
p.run(content)
105
exit()
106
107