Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
projectdiscovery
GitHub Repository: projectdiscovery/nuclei
Path: blob/dev/internal/pdcp/writer.go
2070 views
1
package pdcp
2
3
import (
4
"bufio"
5
"bytes"
6
"context"
7
"fmt"
8
"io"
9
"net/http"
10
"net/url"
11
"regexp"
12
"sync/atomic"
13
"time"
14
15
"github.com/projectdiscovery/gologger"
16
"github.com/projectdiscovery/nuclei/v3/pkg/catalog/config"
17
"github.com/projectdiscovery/nuclei/v3/pkg/output"
18
"github.com/projectdiscovery/nuclei/v3/pkg/utils/json"
19
"github.com/projectdiscovery/retryablehttp-go"
20
pdcpauth "github.com/projectdiscovery/utils/auth/pdcp"
21
"github.com/projectdiscovery/utils/env"
22
"github.com/projectdiscovery/utils/errkit"
23
unitutils "github.com/projectdiscovery/utils/unit"
24
updateutils "github.com/projectdiscovery/utils/update"
25
urlutil "github.com/projectdiscovery/utils/url"
26
)
27
28
const (
29
uploadEndpoint = "/v1/scans/import"
30
appendEndpoint = "/v1/scans/%s/import"
31
flushTimer = time.Minute
32
MaxChunkSize = 4 * unitutils.Mega // 4 MB
33
xidRe = `^[a-z0-9]{20}$`
34
teamIDHeader = "X-Team-Id"
35
NoneTeamID = "none"
36
)
37
38
var (
39
xidRegex = regexp.MustCompile(xidRe)
40
_ output.Writer = &UploadWriter{}
41
// teamID if given
42
TeamIDEnv = env.GetEnvOrDefault("PDCP_TEAM_ID", NoneTeamID)
43
)
44
45
// UploadWriter is a writer that uploads its output to pdcp
46
// server to enable web dashboard and more
47
type UploadWriter struct {
48
*output.StandardWriter
49
creds *pdcpauth.PDCPCredentials
50
uploadURL *url.URL
51
client *retryablehttp.Client
52
cancel context.CancelFunc
53
done chan struct{}
54
scanID string
55
scanName string
56
counter atomic.Int32
57
TeamID string
58
Logger *gologger.Logger
59
}
60
61
// NewUploadWriter creates a new upload writer
62
func NewUploadWriter(ctx context.Context, logger *gologger.Logger, creds *pdcpauth.PDCPCredentials) (*UploadWriter, error) {
63
if creds == nil {
64
return nil, fmt.Errorf("no credentials provided")
65
}
66
u := &UploadWriter{
67
creds: creds,
68
done: make(chan struct{}, 1),
69
TeamID: NoneTeamID,
70
Logger: logger,
71
}
72
var err error
73
reader, writer := io.Pipe()
74
// create standard writer
75
u.StandardWriter, err = output.NewWriter(
76
output.WithWriter(writer),
77
output.WithJson(true, true),
78
)
79
if err != nil {
80
return nil, errkit.Wrap(err, "could not create output writer")
81
}
82
tmp, err := urlutil.Parse(creds.Server)
83
if err != nil {
84
return nil, errkit.Wrap(err, "could not parse server url")
85
}
86
tmp.Path = uploadEndpoint
87
tmp.Update()
88
u.uploadURL = tmp.URL
89
90
// create http client
91
opts := retryablehttp.DefaultOptionsSingle
92
opts.NoAdjustTimeout = true
93
opts.Timeout = time.Duration(3) * time.Minute
94
u.client = retryablehttp.NewClient(opts)
95
96
// create context
97
ctx, u.cancel = context.WithCancel(ctx)
98
// start auto commit
99
// upload every 1 minute or when buffer is full
100
go u.autoCommit(ctx, reader)
101
return u, nil
102
}
103
104
// SetScanID sets the scan id for the upload writer
105
func (u *UploadWriter) SetScanID(id string) error {
106
if !xidRegex.MatchString(id) {
107
return fmt.Errorf("invalid scan id provided")
108
}
109
u.scanID = id
110
return nil
111
}
112
113
// SetScanName sets the scan name for the upload writer
114
func (u *UploadWriter) SetScanName(name string) {
115
u.scanName = name
116
}
117
118
// SetTeamID sets the team id used for uploads. An empty id is stored as
// NoneTeamID.
func (u *UploadWriter) SetTeamID(id string) {
	if id == "" {
		id = NoneTeamID
	}
	u.TeamID = id
}
125
126
func (u *UploadWriter) autoCommit(ctx context.Context, r *io.PipeReader) {
127
reader := bufio.NewReader(r)
128
ch := make(chan string, 4)
129
130
// continuously read from the reader and send to channel
131
go func() {
132
defer func() {
133
_ = r.Close()
134
}()
135
defer close(ch)
136
for {
137
data, err := reader.ReadString('\n')
138
if err != nil {
139
return
140
}
141
u.counter.Add(1)
142
ch <- data
143
}
144
}()
145
146
// wait for context to be done
147
defer func() {
148
u.done <- struct{}{}
149
close(u.done)
150
// if no scanid is generated no results were uploaded
151
if u.scanID == "" {
152
u.Logger.Verbose().Msgf("Scan results upload to cloud skipped, no results found to upload")
153
} else {
154
u.Logger.Info().Msgf("%v Scan results uploaded to cloud, you can view scan results at %v", u.counter.Load(), getScanDashBoardURL(u.scanID, u.TeamID))
155
}
156
}()
157
// temporary buffer to store the results
158
buff := &bytes.Buffer{}
159
ticker := time.NewTicker(flushTimer)
160
defer ticker.Stop()
161
for {
162
select {
163
case <-ctx.Done():
164
// flush before exit
165
if buff.Len() > 0 {
166
if err := u.uploadChunk(buff); err != nil {
167
u.Logger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
168
}
169
}
170
return
171
case <-ticker.C:
172
// flush the buffer
173
if buff.Len() > 0 {
174
if err := u.uploadChunk(buff); err != nil {
175
u.Logger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
176
}
177
}
178
case line, ok := <-ch:
179
if !ok {
180
if buff.Len() > 0 {
181
if err := u.uploadChunk(buff); err != nil {
182
u.Logger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
183
}
184
}
185
return
186
}
187
if buff.Len()+len(line) > MaxChunkSize {
188
// flush existing buffer
189
if err := u.uploadChunk(buff); err != nil {
190
u.Logger.Error().Msgf("Failed to upload scan results on cloud: %v", err)
191
}
192
} else {
193
buff.WriteString(line)
194
}
195
}
196
}
197
}
198
199
// uploadChunk uploads a chunk of data to the server
200
func (u *UploadWriter) uploadChunk(buff *bytes.Buffer) error {
201
if err := u.upload(buff.Bytes()); err != nil {
202
return errkit.Wrap(err, "could not upload chunk")
203
}
204
// if successful, reset the buffer
205
buff.Reset()
206
// log in verbose mode
207
u.Logger.Warning().Msgf("Uploaded results chunk, you can view scan results at %v", getScanDashBoardURL(u.scanID, u.TeamID))
208
return nil
209
}
210
211
// upload sends data to the server in a single request and, when no scan id
// is known yet, stores the id found in the response.
//
// An error is returned when the request cannot be built or sent, when the
// response body cannot be read or decoded, or when the status is not 200.
func (u *UploadWriter) upload(data []byte) error {
	req, err := u.getRequest(data)
	if err != nil {
		return errkit.Wrap(err, "could not create upload request")
	}

	resp, err := u.client.Do(req)
	if err != nil {
		return errkit.Wrap(err, "could not upload results")
	}
	defer func() {
		_ = resp.Body.Close()
	}()

	// the body is read before the status check
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return errkit.Wrap(err, "could not get id from response")
	}
	if status := resp.StatusCode; status != http.StatusOK {
		return fmt.Errorf("could not upload results got status code %v on %v", status, resp.Request.URL.String())
	}

	var parsed uploadResponse
	if err := json.Unmarshal(body, &parsed); err != nil {
		return errkit.Wrap(err, fmt.Sprintf("could not unmarshal response got %v", string(body)))
	}
	// an id that is already set is never overwritten
	if u.scanID == "" && parsed.ID != "" {
		u.scanID = parsed.ID
	}
	return nil
}
239
240
// getRequest returns a new request for upload
241
// if scanID is not provided create new scan by uploading the data
242
// if scanID is provided append the data to existing scan
243
func (u *UploadWriter) getRequest(bin []byte) (*retryablehttp.Request, error) {
244
var method, url string
245
246
if u.scanID == "" {
247
u.uploadURL.Path = uploadEndpoint
248
method = http.MethodPost
249
url = u.uploadURL.String()
250
} else {
251
u.uploadURL.Path = fmt.Sprintf(appendEndpoint, u.scanID)
252
method = http.MethodPatch
253
url = u.uploadURL.String()
254
}
255
req, err := retryablehttp.NewRequest(method, url, bytes.NewReader(bin))
256
if err != nil {
257
return nil, errkit.Wrap(err, "could not create cloud upload request")
258
}
259
// add pdtm meta params
260
req.Params.Merge(updateutils.GetpdtmParams(config.Version))
261
// if it is upload endpoint also include name if it exists
262
if u.scanName != "" && req.Path == uploadEndpoint {
263
req.Params.Add("name", u.scanName)
264
}
265
req.Update()
266
267
req.Header.Set(pdcpauth.ApiKeyHeaderName, u.creds.APIKey)
268
if u.TeamID != NoneTeamID && u.TeamID != "" {
269
req.Header.Set(teamIDHeader, u.TeamID)
270
}
271
req.Header.Set("Content-Type", "application/octet-stream")
272
req.Header.Set("Accept", "application/json")
273
return req, nil
274
}
275
276
// Close closes the upload writer
277
func (u *UploadWriter) Close() {
278
u.cancel()
279
<-u.done
280
u.StandardWriter.Close()
281
}
282
283