Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sqlmapproject
GitHub Repository: sqlmapproject/sqlmap
Path: blob/master/lib/core/decorators.py
3554 views
1
#!/usr/bin/env python
2
3
"""
4
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
5
See the file 'LICENSE' for copying permission
6
"""
7
8
import functools
9
import threading
10
11
from lib.core.datatype import LRUDict
12
from lib.core.settings import MAX_CACHE_ITEMS
13
from lib.core.threads import getCurrentThreadData
14
15
_cache = {}
16
_method_locks = {}
17
18
def cachedmethod(f):
19
"""
20
Method with a cached content
21
22
>>> __ = cachedmethod(lambda _: _)
23
>>> __(1)
24
1
25
>>> __(1)
26
1
27
>>> __ = cachedmethod(lambda *args, **kwargs: args[0])
28
>>> __(2)
29
2
30
>>> __ = cachedmethod(lambda *args, **kwargs: next(iter(kwargs.values())))
31
>>> __(foobar=3)
32
3
33
34
Reference: http://code.activestate.com/recipes/325205-cache-decorator-in-python-24/
35
"""
36
37
_cache[f] = LRUDict(capacity=MAX_CACHE_ITEMS)
38
_method_locks[f] = threading.RLock()
39
40
def _freeze(val):
41
if isinstance(val, (list, set, tuple)):
42
return tuple(_freeze(x) for x in val)
43
if isinstance(val, dict):
44
return tuple(sorted((k, _freeze(v)) for k, v in val.items()))
45
return val
46
47
@functools.wraps(f)
48
def _f(*args, **kwargs):
49
lock, cache = _method_locks[f], _cache[f]
50
51
try:
52
if kwargs:
53
key = (args, frozenset(kwargs.items()))
54
else:
55
key = args
56
57
with lock:
58
if key in cache:
59
return cache[key]
60
61
except TypeError:
62
# Note: fallback (slowpath(
63
if kwargs:
64
key = (_freeze(args), _freeze(kwargs))
65
else:
66
key = _freeze(args)
67
68
with lock:
69
if key in cache:
70
return cache[key]
71
72
result = f(*args, **kwargs)
73
74
with lock:
75
cache[key] = result
76
77
return result
78
79
return _f
80
81
def stackedmethod(f):
82
"""
83
Method using pushValue/popValue functions (fallback function for stack realignment)
84
85
>>> threadData = getCurrentThreadData()
86
>>> original = len(threadData.valueStack)
87
>>> __ = stackedmethod(lambda _: threadData.valueStack.append(_))
88
>>> __(1)
89
>>> len(threadData.valueStack) == original
90
True
91
"""
92
93
@functools.wraps(f)
94
def _(*args, **kwargs):
95
threadData = getCurrentThreadData()
96
originalLevel = len(threadData.valueStack)
97
98
try:
99
result = f(*args, **kwargs)
100
finally:
101
if len(threadData.valueStack) > originalLevel:
102
del threadData.valueStack[originalLevel:]
103
104
return result
105
106
return _
107
108
def lockedmethod(f):
109
"""
110
Decorates a function or method with a reentrant lock (only one thread can execute the function at a time)
111
112
>>> @lockedmethod
113
... def recursive_count(n):
114
... if n <= 0: return 0
115
... return n + recursive_count(n - 1)
116
>>> recursive_count(5)
117
15
118
"""
119
120
lock = threading.RLock()
121
122
@functools.wraps(f)
123
def _(*args, **kwargs):
124
with lock:
125
result = f(*args, **kwargs)
126
return result
127
128
return _
129
130