Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
7638 views
1
/* Simple example fz_stream implementation using curl */
2
3
#include "mupdf/fitz.h"
4
#include "curl_stream.h"
5
6
#define CURL_STATICLIB
7
#include <curl/curl.h>
8
9
/* Block-fetch tracing is switched off here; replace the #undef with a
 * #define to route the trace messages through fz_warn. */
#undef DEBUG_BLOCK_FETCHING

/* DEBUG_MESSAGE takes ONE argument: a fully parenthesised fz_warn argument
 * list, e.g. DEBUG_MESSAGE((ctx, "fmt %d", n)). With tracing disabled it
 * expands to an empty statement and its argument is not evaluated. */
#if defined(DEBUG_BLOCK_FETCHING)
#define DEBUG_MESSAGE(A) do { fz_warn A; } while (0)
#else
#define DEBUG_MESSAGE(A) do { } while (0)
#endif
16
17
/* The file is fetched and tracked in blocks of (1 << BLOCK_SHIFT) bytes. */
#define BLOCK_SHIFT 20
#define BLOCK_SIZE (1 << BLOCK_SHIFT)
19
20
#if defined(_WIN32) || defined(_WIN64)
21
#include <windows.h>
22
#else
23
#include "pthread.h"
24
#include <ctype.h>
25
#endif
26
27
typedef struct curl_stream_state_s curl_stream_state;
28
29
struct curl_stream_state_s
30
{
31
fz_context *ctx;
32
CURL *handle;
33
char *filename;
34
int data_arrived;
35
int content_length; /* As returned by curl. -1 for unknown. */
36
int total_length; /* As obtained from the Content-Range header. */
37
int buffer_max;
38
int buffer_fill;
39
unsigned char *buffer;
40
int map_length;
41
unsigned char *map;
42
int fill_point; /* The next file offset we will fetch to */
43
int current_fill_point; /* The current file offset we are fetching to */
44
int complete;
45
int kill_thread;
46
void (*more_data)(void *, int);
47
void *more_data_arg;
48
const char *error;
49
50
unsigned char public_buffer[4096];
51
52
#if defined(_WIN32) || defined(_WIN64)
53
void *thread;
54
DWORD thread_id;
55
HANDLE mutex;
56
#else
57
pthread_t thread;
58
pthread_mutex_t mutex;
59
#endif
60
};
61
62
static void fetcher_thread(curl_stream_state *state);
63
64
#if defined(_WIN32) || defined(_WIN64)
65
static void
66
lock(curl_stream_state *state)
67
{
68
WaitForSingleObject(state->mutex, INFINITE);
69
}
70
71
static void
72
unlock(curl_stream_state *state)
73
{
74
ReleaseMutex(state->mutex);
75
}
76
77
static DWORD WINAPI
78
win_thread(void *lparam)
79
{
80
fetcher_thread((curl_stream_state *)lparam);
81
82
return 0;
83
}
84
85
#else /* Anything else assumed to be pthreads */
86
87
static void
88
lock(curl_stream_state *state)
89
{
90
pthread_mutex_lock(&state->mutex);
91
}
92
93
static void
94
unlock(curl_stream_state *state)
95
{
96
pthread_mutex_unlock(&state->mutex);
97
}
98
99
static void *
100
pthread_thread(void *arg)
101
{
102
fetcher_thread((curl_stream_state *)arg);
103
104
return NULL;
105
}
106
#endif
107
108
static size_t header_arrived(void *ptr, size_t size, size_t nmemb, void *state_)
109
{
110
curl_stream_state *state = (curl_stream_state *)state_;
111
112
if (strncmp(ptr, "Content-Range:", 14) == 0)
113
{
114
char *p = (char *)ptr;
115
int len = (int)(nmemb * size);
116
int start, end, total;
117
while (len && !isdigit(*p))
118
p++, len--;
119
start = 0;
120
while (len && isdigit(*p))
121
{
122
start = start*10 + *p-'0';
123
p++, len--;
124
}
125
while (len && !isdigit(*p))
126
p++, len--;
127
end = 0;
128
while (len && isdigit(*p))
129
{
130
end = end*10 + *p-'0';
131
p++, len--;
132
}
133
while (len && !isdigit(*p))
134
p++, len--;
135
total = 0;
136
while (len && isdigit(*p))
137
{
138
total = total*10 + *p-'0';
139
p++, len--;
140
}
141
state->total_length = total;
142
}
143
144
return nmemb * size;
145
}
146
147
static size_t data_arrived(void *ptr, size_t size, size_t nmemb, void *state_)
148
{
149
curl_stream_state *state = (curl_stream_state *)state_;
150
int old_start;
151
152
size *= nmemb;
153
154
if (state->data_arrived == 0)
155
{
156
double d;
157
long response;
158
int len;
159
/* This is the first time data has arrived. If the response
160
* code is 206, then we can do byte requests, and we will
161
* known the total_length from having processed the header
162
* already. */
163
curl_easy_getinfo(state->handle, CURLINFO_RESPONSE_CODE, &response);
164
if (state->total_length && response == 206)
165
{
166
/* We got a range header, and the correct http response
167
* code. We can assume that byte fetches are accepted
168
* and we'll run without progressive mode. */
169
state->content_length = len = state->total_length;
170
state->map_length = (len+BLOCK_SIZE-1)>>BLOCK_SHIFT;
171
state->map = fz_malloc_no_throw(state->ctx, (state->map_length+7)>>3);
172
state->buffer = fz_malloc_no_throw(state->ctx, len);
173
state->buffer_max = len;
174
if (state->map == NULL || state->buffer == NULL)
175
{
176
/* FIXME: Crap error handling! */
177
exit(1);
178
}
179
memset(state->map, 0, (state->map_length+7)>>3);
180
}
181
else
182
{
183
/* So we can't use ByteRanges. Do we at least know the
184
* complete length of the file? */
185
curl_easy_getinfo(state->handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
186
state->content_length = len = (int)d;
187
if (len > 0 && response == 200)
188
{
189
/* Yes. We can run as a progressive file */
190
state->buffer = fz_malloc_no_throw(state->ctx, len);
191
state->buffer_max = len;
192
if (state->buffer == NULL)
193
{
194
/* FIXME: Crap error handling! */
195
exit(1);
196
}
197
}
198
else
199
{
200
/* What a crap server. Won't tell us how big
201
* the file is. We'll have to expand as data
202
* as arrives. */
203
state->content_length = -1;
204
}
205
}
206
state->data_arrived = 1;
207
}
208
if (state->content_length < 0)
209
{
210
int newsize = (int)(state->current_fill_point + size);
211
if (newsize > state->buffer_max)
212
{
213
/* Expand the buffer */
214
int new_max = state->buffer_max * 2;
215
if (new_max == 0)
216
new_max = 4096;
217
state->buffer = fz_resize_array_no_throw(state->ctx, state->buffer, new_max, 1);
218
if (state->buffer == NULL)
219
{
220
/* FIXME: Crap error handling! */
221
exit(1);
222
}
223
state->buffer_max = new_max;
224
}
225
}
226
227
DEBUG_MESSAGE((state->ctx, "data arrived: offset=%d len=%d", state->current_fill_point, (int) size));
228
old_start = state->current_fill_point;
229
memcpy(state->buffer + state->current_fill_point, ptr, size);
230
state->current_fill_point += (int)size;
231
if (state->current_fill_point == state->content_length ||
232
(((state->current_fill_point ^ old_start) & ~(BLOCK_SIZE-1)) != 0))
233
{
234
if (state->map)
235
{
236
old_start >>= BLOCK_SHIFT;
237
state->map[old_start>>3] |= 1<<(old_start & 7);
238
}
239
}
240
241
if (state->more_data)
242
state->more_data(state->more_data_arg, 0);
243
244
return size;
245
}
246
247
/* Non-zero when bit 'num' is set in the bitmap 'map' (eight blocks per
 * byte, least significant bit first). 'num' is evaluated twice. */
#define HAVE_BLOCK(map, num) \
	((((map)[(num) >> 3] >> ((num) & 7)) & 1) != 0)
249
250
static void
251
fetch_chunk(curl_stream_state *state)
252
{
253
char text[32];
254
int fill, start, end;
255
CURLcode ret;
256
257
lock(state);
258
259
if (state->kill_thread)
260
{
261
state->complete = 1;
262
unlock(state);
263
return;
264
}
265
266
fill = state->fill_point;
267
if (state->content_length > 0)
268
{
269
/* Find the next block that we haven't got */
270
int map_length = state->map_length;
271
unsigned char *map = state->map;
272
for ( ; (fill < map_length && HAVE_BLOCK(map, fill)); fill++);
273
if (fill == map_length)
274
{
275
for (fill = 0;
276
(fill < map_length && HAVE_BLOCK(map, fill));
277
fill++);
278
if (fill == map_length)
279
{
280
/* We've got it all! */
281
state->complete = 1;
282
state->kill_thread = 1;
283
unlock(state);
284
if (state->more_data)
285
state->more_data(state->more_data_arg, 1);
286
fz_warn(state->ctx, "Background fetch complete!");
287
return;
288
}
289
}
290
DEBUG_MESSAGE((state->ctx, "block requested was %d, fetching %d", state->fill_point, fill));
291
state->fill_point = fill;
292
}
293
294
unlock(state);
295
296
/* Fetch that block */
297
start = fill * BLOCK_SIZE;
298
end = start + BLOCK_SIZE-1;
299
state->current_fill_point = start;
300
if (state->content_length > 0 && start >= state->content_length)
301
state->complete = 1;
302
if (state->content_length > 0 && end >= state->content_length)
303
end = state->content_length-1;
304
snprintf(text, 32, "%d-%d", start, end);
305
curl_easy_setopt(state->handle, CURLOPT_RANGE, text);
306
ret = curl_easy_perform(state->handle);
307
if (ret != CURLE_OK)
308
state->error = curl_easy_strerror(ret);
309
}
310
311
static void
312
fetcher_thread(curl_stream_state *state)
313
{
314
while (!state->complete)
315
fetch_chunk(state);
316
}
317
318
static int
319
stream_next(fz_context *ctx, fz_stream *stream, int len)
320
{
321
curl_stream_state *state = (curl_stream_state *)stream->state;
322
int len_read = 0;
323
int read_point = stream->pos;
324
int block = read_point>>BLOCK_SHIFT;
325
int left_over = (-read_point) & (BLOCK_SIZE-1);
326
unsigned char *buf = state->public_buffer;
327
328
if (state->error != NULL)
329
fz_throw(ctx, FZ_ERROR_GENERIC, "cannot fetch data: %s", state->error);
330
331
if (len > sizeof(state->public_buffer))
332
len = sizeof(state->public_buffer);
333
334
if (state->content_length == 0)
335
fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (A) (offset=%d)", read_point);
336
337
if (state->map == NULL)
338
{
339
/* We are doing a simple linear fetch as we don't know the
340
* content length. */
341
if (read_point + len > state->current_fill_point)
342
{
343
stream->rp = stream->wp;
344
fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (B) (offset=%d)", read_point);
345
}
346
memcpy(buf, state->buffer + read_point, len);
347
stream->rp = buf;
348
stream->wp = buf + len;
349
stream->pos += len;
350
if (len == 0)
351
return EOF;
352
return *stream->rp++;
353
}
354
355
if (read_point + len > state->content_length)
356
len = state->content_length - read_point;
357
if (left_over > len)
358
left_over = len;
359
360
if (left_over)
361
{
362
/* We are starting midway through a block */
363
if (!HAVE_BLOCK(state->map, block))
364
{
365
lock(state);
366
state->fill_point = block;
367
unlock(state);
368
stream->rp = stream->wp;
369
fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (C) (offset=%d)", read_point);
370
}
371
block++;
372
if (left_over > len)
373
left_over = len;
374
memcpy(buf, state->buffer + read_point, left_over);
375
buf += left_over;
376
read_point += left_over;
377
len -= left_over;
378
len_read += left_over;
379
}
380
381
/* Copy any complete blocks */
382
while (len > BLOCK_SIZE)
383
{
384
if (!HAVE_BLOCK(state->map, block))
385
{
386
lock(state);
387
state->fill_point = block;
388
unlock(state);
389
stream->rp = stream->wp;
390
fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (D) (offset=%d)", read_point);
391
}
392
block++;
393
memcpy(buf, state->buffer + read_point, BLOCK_SIZE);
394
buf += BLOCK_SIZE;
395
read_point += BLOCK_SIZE;
396
len -= BLOCK_SIZE;
397
len_read += BLOCK_SIZE;
398
}
399
400
/* Copy any trailing bytes */
401
if (len > 0)
402
{
403
if (!HAVE_BLOCK(state->map, block))
404
{
405
lock(state);
406
state->fill_point = block;
407
unlock(state);
408
stream->rp = stream->wp;
409
fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (E) (offset=%d)", read_point);
410
}
411
memcpy(buf, state->buffer + read_point, len);
412
len_read += len;
413
}
414
stream->rp = state->public_buffer;
415
stream->wp = stream->rp + len_read;
416
stream->pos += len_read;
417
if (len_read == 0)
418
return EOF;
419
return *stream->rp++;
420
}
421
422
static void
423
stream_close(fz_context *ctx, void *state_)
424
{
425
curl_stream_state *state = (curl_stream_state *)state_;
426
427
if (!state || state->kill_thread)
428
return;
429
430
lock(state);
431
state->kill_thread = 1;
432
unlock(state);
433
434
#if defined(_WIN32) || defined(_WIN64)
435
WaitForSingleObject(state->thread, INFINITE);
436
CloseHandle(state->thread);
437
CloseHandle(state->mutex);
438
#else
439
pthread_join(state->thread, NULL);
440
pthread_mutex_destroy(&state->mutex);
441
#endif
442
443
fz_free(ctx, state->buffer);
444
fz_free(ctx, state->map);
445
fz_free(ctx, state);
446
}
447
448
static fz_stream hack_stream;
449
static curl_stream_state hack;
450
static int hack_pos;
451
452
static void
453
stream_seek(fz_context *ctx, fz_stream *stream, int offset, int whence)
454
{
455
curl_stream_state *state = (curl_stream_state *)stream->state;
456
457
switch(whence)
458
{
459
case SEEK_CUR:
460
offset += stream->pos;
461
break;
462
case SEEK_END:
463
offset += state->content_length;
464
break;
465
default:
466
case SEEK_SET:
467
break;
468
}
469
if (offset < 0)
470
offset = 0;
471
else if (state->content_length > 0 && offset > state->content_length)
472
offset = state->content_length;
473
stream->wp = stream->rp;
474
stream->pos = offset;
475
hack = *state;
476
hack_pos = offset;
477
hack_stream = *stream;
478
}
479
480
static int
481
stream_meta(fz_context *ctx, fz_stream *stream, int key, int size, void *ptr)
482
{
483
curl_stream_state *state = (curl_stream_state *)stream->state;
484
485
switch(key)
486
{
487
case FZ_STREAM_META_LENGTH:
488
if (!state->data_arrived)
489
fz_throw(ctx, FZ_ERROR_TRYLATER, "still awaiting file length");
490
return state->content_length;
491
case FZ_STREAM_META_PROGRESSIVE:
492
return 1;
493
}
494
return -1;
495
}
496
497
/* Create a stream that fetches 'filename' (passed to curl as the URL) in
 * the background.
 *
 * A curl easy handle is set up with data_arrived / header_arrived as its
 * write and header callbacks, a mutex is created and the fetcher thread is
 * started; the returned stream uses stream_next, stream_close, stream_seek
 * and stream_meta.
 *
 * more_data, if not NULL, is called with more_data_arg from the fetcher
 * thread: with 0 whenever data arrives and with 1 when the fetch is
 * complete.
 *
 * Throws FZ_ERROR_GENERIC if curl, the mutex or the thread cannot be set
 * up; everything acquired up to that point is released first.
 * NOTE(review): whether fz_new_stream disposes of 'state' when it throws
 * depends on the library version - confirm. */
fz_stream *fz_stream_from_curl(fz_context *ctx, char *filename, void (*more_data)(void *,int), void *more_data_arg)
{
	CURLcode ret;
	CURL *handle;
	curl_stream_state *state;
	fz_stream *stream;

	ret = curl_global_init(CURL_GLOBAL_ALL);
	if (ret != 0)
		fz_throw(ctx, FZ_ERROR_GENERIC, "curl init failed (code %d)", ret);

	/* Allocated only after global init succeeded, so that a failure
	 * above has nothing to release. */
	state = fz_malloc_struct(ctx, curl_stream_state);

	handle = curl_easy_init();
	if (handle == NULL)
	{
		fz_free(ctx, state);
		fz_throw(ctx, FZ_ERROR_GENERIC, "curl easy handle creation failed");
	}

	state->ctx = ctx;
	state->handle = handle;
	state->more_data = more_data;
	state->more_data_arg = more_data_arg;

	curl_easy_setopt(handle, CURLOPT_URL, filename);

	/* libcurl reads this option as a long, so pass a long. */
	curl_easy_setopt(handle, CURLOPT_NOPROGRESS, 1L);

	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, data_arrived);

	curl_easy_setopt(handle, CURLOPT_WRITEDATA, state);

	curl_easy_setopt(handle, CURLOPT_WRITEHEADER, state);

	curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, header_arrived);

#if defined(_WIN32) || defined(_WIN64)
	state->mutex = CreateMutex(NULL, FALSE, NULL);
	if (state->mutex == NULL)
	{
		curl_easy_cleanup(handle);
		fz_free(ctx, state);
		fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
	}

	state->thread = CreateThread(NULL, 0, win_thread, state, 0, &state->thread_id);
	if (state->thread == NULL)
	{
		CloseHandle(state->mutex);
		curl_easy_cleanup(handle);
		fz_free(ctx, state);
		fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
	}
#else
	if (pthread_mutex_init(&state->mutex, NULL))
	{
		curl_easy_cleanup(handle);
		fz_free(ctx, state);
		fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
	}

	if (pthread_create(&state->thread, NULL, pthread_thread, state))
	{
		pthread_mutex_destroy(&state->mutex);
		curl_easy_cleanup(handle);
		fz_free(ctx, state);
		fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
	}

#endif

	stream = fz_new_stream(ctx, state, stream_next, stream_close);
	stream->seek = stream_seek;
	stream->meta = stream_meta;
	return stream;
}
547
548