Path: blob/main/crypto/krb5/src/tests/jsonwalker.py
34878 views
import sys1import json2from collections import defaultdict3from optparse import OptionParser45class Parser(object):6DEFAULTS = {int:0,7str:'',8list:[]}910def __init__(self, defconf=None):11self.defaults = None12if defconf is not None:13self.defaults = self.flatten(defconf)1415def run(self, logs, verbose=None):16result = self.parse(logs)17if len(result) != len(self.defaults):18diff = set(self.defaults.keys()).difference(result.keys())19print('Test failed.')20print('The following attributes were not set:')21for it in diff:22print(it)23sys.exit(1)2425def flatten(self, defaults):26"""27Flattens paths to attributes.2829Parameters30----------31defaults : a dictionaries populated with default values3233Returns :34dict : with flattened attributes35"""36result = dict()37for path,value in self._walk(defaults):38if path in result:39print('Warning: attribute path %s already exists' % path)40result[path] = value4142return result4344def parse(self, logs):45result = defaultdict(list)46for msg in logs:47# each message is treated as a dictionary of dictionaries48for a,v in self._walk(msg):49# see if path is registered in defaults50if a in self.defaults:51dv = self.defaults.get(a)52if dv is None:53# determine default value by type54if v is not None:55dv = self.DEFAULTS[type(v)]56else:57print('Warning: attribute %s is set to None' % a)58continue59# by now we have default value60if v != dv:61# test passed62result[a].append(v)63return result6465def _walk(self, adict):66"""67Generator that works through dictionary.68"""69for a,v in adict.items():70if isinstance(v,dict):71for (attrpath,u) in self._walk(v):72yield (a+'.'+attrpath,u)73else:74yield (a,v)757677if __name__ == '__main__':7879parser = OptionParser()80parser.add_option("-i", "--logfile", dest="filename",81help="input log file in json fmt", metavar="FILE")82parser.add_option("-d", "--defaults", dest="defaults",83help="dictionary with defaults", metavar="FILE")8485(options, args) = parser.parse_args()86if options.filename is not None:87with open(options.filename, 'r') as f:88content = list()89for l in f:90content.append(json.loads(l.rstrip()))91f.close()92else:93print('Input file in JSON format is required')94exit()9596defaults = None97if options.defaults is not None:98with open(options.defaults, 'r') as f:99defaults = json.load(f)100101# run test102p = Parser(defaults)103p.run(content)104exit()105106107