// Source: github.com/projectdiscovery/nuclei (branch dev), pkg/protocols/http/httputils/spm.go
package httputils
2
3
import (
4
"context"
5
"sync"
6
7
syncutil "github.com/projectdiscovery/utils/sync"
8
"golang.org/x/exp/maps"
9
)
10
11
// WorkPoolType selects which kind of work pool a StopAtFirstMatchHandler uses.
type WorkPoolType uint

const (
	// Blocking makes the addition of new work block while the pool is full.
	Blocking = WorkPoolType(iota)
	// NonBlocking lets new work be added without blocking, even when the pool is full.
	NonBlocking
)
20
21
// StopAtFirstMatchHandler is a handler that executes
22
// request and stops on first match
23
type StopAtFirstMatchHandler[T comparable] struct {
24
once sync.Once
25
// Result Channel
26
ResultChan chan T
27
28
// work pool and its type
29
poolType WorkPoolType
30
sgPool *syncutil.AdaptiveWaitGroup
31
wgPool *sync.WaitGroup
32
33
// internal / unexported
34
ctx context.Context
35
cancel context.CancelFunc
36
internalWg *sync.WaitGroup
37
results map[T]struct{}
38
onResult func(T)
39
stopEnabled bool
40
maxResults int
41
}
42
43
// NewBlockingSPMHandler creates a new stop at first match handler
44
func NewBlockingSPMHandler[T comparable](ctx context.Context, size int, maxResults int, spm bool) *StopAtFirstMatchHandler[T] {
45
ctx1, cancel := context.WithCancel(ctx)
46
47
awg, _ := syncutil.New(syncutil.WithSize(size))
48
49
s := &StopAtFirstMatchHandler[T]{
50
ResultChan: make(chan T, 1),
51
poolType: Blocking,
52
sgPool: awg,
53
internalWg: &sync.WaitGroup{},
54
ctx: ctx1,
55
cancel: cancel,
56
stopEnabled: spm,
57
results: make(map[T]struct{}),
58
maxResults: maxResults,
59
}
60
s.internalWg.Add(1)
61
go s.run(ctx)
62
return s
63
}
64
65
// NewNonBlockingSPMHandler creates a new stop at first match handler
66
func NewNonBlockingSPMHandler[T comparable](ctx context.Context, maxResults int, spm bool) *StopAtFirstMatchHandler[T] {
67
ctx1, cancel := context.WithCancel(ctx)
68
s := &StopAtFirstMatchHandler[T]{
69
ResultChan: make(chan T, 1),
70
poolType: NonBlocking,
71
wgPool: &sync.WaitGroup{},
72
internalWg: &sync.WaitGroup{},
73
ctx: ctx1,
74
cancel: cancel,
75
stopEnabled: spm,
76
results: make(map[T]struct{}),
77
maxResults: maxResults,
78
}
79
s.internalWg.Add(1)
80
go s.run(ctx)
81
return s
82
}
83
84
// Trigger triggers the stop at first match handler and stops the execution of
85
// existing requests
86
func (h *StopAtFirstMatchHandler[T]) Trigger() {
87
if h.stopEnabled {
88
h.cancel()
89
}
90
}
91
92
// Cancel cancels spm context
93
func (h *StopAtFirstMatchHandler[T]) Cancel() {
94
h.cancel()
95
}
96
97
// SetOnResult callback
98
// this is not thread safe
99
func (h *StopAtFirstMatchHandler[T]) SetOnResultCallback(fn func(T)) {
100
if h.onResult != nil {
101
tmp := h.onResult
102
h.onResult = func(t T) {
103
tmp(t)
104
fn(t)
105
}
106
} else {
107
h.onResult = fn
108
}
109
}
110
111
// MatchCallback is called when a match is found
112
// input fn should be the callback that is intended to be called
113
// if stop at first is enabled and other conditions are met
114
// if it does not meet above conditions, use of this function is discouraged
115
func (h *StopAtFirstMatchHandler[T]) MatchCallback(fn func()) {
116
if !h.stopEnabled {
117
fn()
118
return
119
}
120
h.once.Do(fn)
121
}
122
123
// run runs the internal handler
124
func (h *StopAtFirstMatchHandler[T]) run(ctx context.Context) {
125
defer h.internalWg.Done()
126
127
for {
128
select {
129
case <-ctx.Done():
130
return
131
case val, ok := <-h.ResultChan:
132
if !ok {
133
return
134
}
135
if h.onResult != nil {
136
h.onResult(val)
137
}
138
if len(h.results) >= h.maxResults {
139
// skip or do not store the result
140
continue
141
}
142
h.results[val] = struct{}{}
143
}
144
}
145
}
146
147
// Done returns a channel with the context done signal when stop at first match is detected
148
func (h *StopAtFirstMatchHandler[T]) Done() <-chan struct{} {
149
return h.ctx.Done()
150
}
151
152
// Cancelled returns true if the context is cancelled
153
func (h *StopAtFirstMatchHandler[T]) Cancelled() bool {
154
return h.ctx.Err() != nil
155
}
156
157
// FoundFirstMatch returns true if first match was found
158
// in stop at first match mode
159
func (h *StopAtFirstMatchHandler[T]) FoundFirstMatch() bool {
160
if h.ctx.Err() != nil && h.stopEnabled {
161
return true
162
}
163
return false
164
}
165
166
// Acquire acquires a new work
167
func (h *StopAtFirstMatchHandler[T]) Acquire() {
168
switch h.poolType {
169
case Blocking:
170
h.sgPool.Add()
171
case NonBlocking:
172
h.wgPool.Add(1)
173
}
174
}
175
176
// Release releases a work
177
func (h *StopAtFirstMatchHandler[T]) Release() {
178
switch h.poolType {
179
case Blocking:
180
h.sgPool.Done()
181
case NonBlocking:
182
h.wgPool.Done()
183
}
184
}
185
186
// Resize changes the capacity of the blocking work pool to size. It does
// nothing when the pool already has that size.
//
// Handlers created with NewNonBlockingSPMHandler have no sized pool (sgPool
// is nil), so for them Resize is a no-op that returns nil instead of
// dereferencing a nil pointer.
func (h *StopAtFirstMatchHandler[T]) Resize(ctx context.Context, size int) error {
	if h.sgPool == nil {
		return nil
	}
	if h.sgPool.Size == size {
		return nil
	}
	return h.sgPool.Resize(ctx, size)
}
192
193
// Size returns the current capacity of the blocking work pool.
//
// Handlers created with NewNonBlockingSPMHandler have no sized pool (sgPool
// is nil); for them Size returns 0 instead of dereferencing a nil pointer.
func (h *StopAtFirstMatchHandler[T]) Size() int {
	if h.sgPool == nil {
		return 0
	}
	return h.sgPool.Size
}
196
197
// Wait waits for all work to be done
198
func (h *StopAtFirstMatchHandler[T]) Wait() {
199
switch h.poolType {
200
case Blocking:
201
h.sgPool.Wait()
202
case NonBlocking:
203
h.wgPool.Wait()
204
}
205
// after waiting it closes the error channel
206
close(h.ResultChan)
207
h.internalWg.Wait()
208
}
209
210
// CombinedResults returns the combined results
211
func (h *StopAtFirstMatchHandler[T]) CombinedResults() []T {
212
return maps.Keys(h.results)
213
}
214
215