Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sbin/hastd/secondary.c
39475 views
1
/*-
2
* SPDX-License-Identifier: BSD-2-Clause
3
*
4
* Copyright (c) 2009-2010 The FreeBSD Foundation
5
* Copyright (c) 2010 Pawel Jakub Dawidek <[email protected]>
6
* All rights reserved.
7
*
8
* This software was developed by Pawel Jakub Dawidek under sponsorship from
9
* the FreeBSD Foundation.
10
*
11
* Redistribution and use in source and binary forms, with or without
12
* modification, are permitted provided that the following conditions
13
* are met:
14
* 1. Redistributions of source code must retain the above copyright
15
* notice, this list of conditions and the following disclaimer.
16
* 2. Redistributions in binary form must reproduce the above copyright
17
* notice, this list of conditions and the following disclaimer in the
18
* documentation and/or other materials provided with the distribution.
19
*
20
* THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
21
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
24
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
26
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
27
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
29
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30
* SUCH DAMAGE.
31
*/
32
33
#include <sys/param.h>
34
#include <sys/time.h>
35
#include <sys/bio.h>
36
#include <sys/disk.h>
37
#include <sys/stat.h>
38
39
#include <err.h>
40
#include <errno.h>
41
#include <fcntl.h>
42
#include <libgeom.h>
43
#include <pthread.h>
44
#include <signal.h>
45
#include <stdint.h>
46
#include <stdio.h>
47
#include <string.h>
48
#include <sysexits.h>
49
#include <unistd.h>
50
51
#include <activemap.h>
52
#include <nv.h>
53
#include <pjdlog.h>
54
55
#include "control.h"
56
#include "event.h"
57
#include "hast.h"
58
#include "hast_proto.h"
59
#include "hastd.h"
60
#include "hooks.h"
61
#include "metadata.h"
62
#include "proto.h"
63
#include "subr.h"
64
#include "synch.h"
65
66
/*
 * One I/O request received from the primary node.  A fixed pool of these is
 * allocated by init_environment(); each one moves between the free, disk and
 * send queues through the hio_next linkage.
 */
struct hio {
	uint64_t	 hio_seq;	/* 'seq' from the primary, echoed in the reply. */
	int		 hio_error;	/* errno-style result; 0 means success. */
	void		*hio_data;	/* MAXPHYS-byte buffer for READ/WRITE data. */
	uint8_t		 hio_cmd;	/* HIO_* command code. */
	uint64_t	 hio_offset;	/* Offset; hr_localoff is added on disk access. */
	uint64_t	 hio_length;	/* Length of the request in bytes. */
	bool		 hio_memsync;	/* Reply must carry the 'received' tag. */
	TAILQ_ENTRY(hio) hio_next;	/* Linkage on the queue holding the request. */
};
76
77
/* Resource served by this worker; secondary_exit() sends events through it. */
static struct hast_resource *gres;

/*
 * Request queues.  Every queue is a TAILQ guarded by its own mutex, has a
 * condition variable that QUEUE_TAKE() sleeps on while the queue is empty,
 * and keeps an element counter that output_status_aux() reports.
 */

/*
 * free: unused requests.  When it runs dry, the receiver waits until
 * in-progress requests are returned to it.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static size_t hio_free_list_size;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;

/* disk: requests waiting for the disk thread to perform the local I/O. */
static TAILQ_HEAD(, hio) hio_disk_list;
static size_t hio_disk_list_size;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;

/* send: requests whose replies the send thread returns to the primary. */
static TAILQ_HEAD(, hio) hio_send_list;
static size_t hio_send_list_size;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;

/* Size of the request pool, i.e. the maximum number of outstanding requests. */
#define	HAST_HIO_MAX	256

/* Worker thread bodies; all receive the struct hast_resource as argument. */
static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
110
111
/*
 * Append 'hio' to the tail of queue 'name' (free, disk or send).  Consumers
 * only sleep while the queue is empty, so waking them is needed only when the
 * queue goes from empty to non-empty.
 */
#define	QUEUE_INSERT(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	hio_##name##_list_size++;					\
	if (TAILQ_EMPTY(&hio_##name##_list))				\
		cv_broadcast(&hio_##name##_list_cond);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
/*
 * Remove the head of queue 'name' into 'hio', sleeping on the queue's
 * condition variable for as long as the queue is empty.
 */
#define	QUEUE_TAKE(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(hio) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((hio) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);		\
	hio_##name##_list_size--;					\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
130
131
static void
132
output_status_aux(struct nv *nvout)
133
{
134
135
nv_add_uint64(nvout, (uint64_t)hio_free_list_size, "idle_queue_size");
136
nv_add_uint64(nvout, (uint64_t)hio_disk_list_size, "local_queue_size");
137
nv_add_uint64(nvout, (uint64_t)hio_send_list_size, "send_queue_size");
138
}
139
140
static void
141
hio_clear(struct hio *hio)
142
{
143
144
hio->hio_seq = 0;
145
hio->hio_error = 0;
146
hio->hio_cmd = HIO_UNDEF;
147
hio->hio_offset = 0;
148
hio->hio_length = 0;
149
hio->hio_memsync = false;
150
}
151
152
static void
153
hio_copy(const struct hio *srchio, struct hio *dsthio)
154
{
155
156
/*
157
* We don't copy hio_error, hio_data and hio_next fields.
158
*/
159
160
dsthio->hio_seq = srchio->hio_seq;
161
dsthio->hio_cmd = srchio->hio_cmd;
162
dsthio->hio_offset = srchio->hio_offset;
163
dsthio->hio_length = srchio->hio_length;
164
dsthio->hio_memsync = srchio->hio_memsync;
165
}
166
167
static void
168
init_environment(void)
169
{
170
struct hio *hio;
171
unsigned int ii;
172
173
/*
174
* Initialize lists, their locks and theirs condition variables.
175
*/
176
TAILQ_INIT(&hio_free_list);
177
mtx_init(&hio_free_list_lock);
178
cv_init(&hio_free_list_cond);
179
TAILQ_INIT(&hio_disk_list);
180
mtx_init(&hio_disk_list_lock);
181
cv_init(&hio_disk_list_cond);
182
TAILQ_INIT(&hio_send_list);
183
mtx_init(&hio_send_list_lock);
184
cv_init(&hio_send_list_cond);
185
186
/*
187
* Allocate requests pool and initialize requests.
188
*/
189
for (ii = 0; ii < HAST_HIO_MAX; ii++) {
190
hio = malloc(sizeof(*hio));
191
if (hio == NULL) {
192
pjdlog_exitx(EX_TEMPFAIL,
193
"Unable to allocate memory (%zu bytes) for hio request.",
194
sizeof(*hio));
195
}
196
hio->hio_data = malloc(MAXPHYS);
197
if (hio->hio_data == NULL) {
198
pjdlog_exitx(EX_TEMPFAIL,
199
"Unable to allocate memory (%zu bytes) for gctl_data.",
200
(size_t)MAXPHYS);
201
}
202
hio_clear(hio);
203
TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
204
hio_free_list_size++;
205
}
206
}
207
208
/*
 * Read the local component's metadata via metadata_read(); the worker exits
 * with EX_NOINPUT if that fails.
 */
static void
init_local(struct hast_resource *res)
{
	int rv;

	rv = metadata_read(res, true);
	if (rv != -1)
		return;
	exit(EX_NOINPUT);
}
215
216
static void
217
init_remote(struct hast_resource *res, struct nv *nvin)
218
{
219
uint64_t resuid;
220
struct nv *nvout;
221
unsigned char *map;
222
size_t mapsize;
223
224
#ifdef notyet
225
/* Setup direction. */
226
if (proto_send(res->hr_remoteout, NULL, 0) == -1)
227
pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
228
#endif
229
230
nvout = nv_alloc();
231
nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
232
nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
233
resuid = nv_get_uint64(nvin, "resuid");
234
res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
235
res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
236
nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
237
nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
238
mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
239
METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
240
map = malloc(mapsize);
241
if (map == NULL) {
242
pjdlog_exitx(EX_TEMPFAIL,
243
"Unable to allocate memory (%zu bytes) for activemap.",
244
mapsize);
245
}
246
/*
247
* When we work as primary and secondary is missing we will increase
248
* localcnt in our metadata. When secondary is connected and synced
249
* we make localcnt be equal to remotecnt, which means nodes are more
250
* or less in sync.
251
* Split-brain condition is when both nodes are not able to communicate
252
* and are both configured as primary nodes. In turn, they can both
253
* make incompatible changes to the data and we have to detect that.
254
* Under split-brain condition we will increase our localcnt on first
255
* write and remote node will increase its localcnt on first write.
256
* When we connect we can see that primary's localcnt is greater than
257
* our remotecnt (primary was modified while we weren't watching) and
258
* our localcnt is greater than primary's remotecnt (we were modified
259
* while primary wasn't watching).
260
* There are many possible combinations which are all gathered below.
261
* Don't pay too much attention to exact numbers, the more important
262
* is to compare them. We compare secondary's local with primary's
263
* remote and secondary's remote with primary's local.
264
* Note that every case where primary's localcnt is smaller than
265
* secondary's remotecnt and where secondary's localcnt is smaller than
266
* primary's remotecnt should be impossible in practise. We will perform
267
* full synchronization then. Those cases are marked with an asterisk.
268
* Regular synchronization means that only extents marked as dirty are
269
* synchronized (regular synchronization).
270
*
271
* SECONDARY METADATA PRIMARY METADATA
272
* local=3 remote=3 local=2 remote=2* ?! Full sync from secondary.
273
* local=3 remote=3 local=2 remote=3* ?! Full sync from primary.
274
* local=3 remote=3 local=2 remote=4* ?! Full sync from primary.
275
* local=3 remote=3 local=3 remote=2 Primary is out-of-date,
276
* regular sync from secondary.
277
* local=3 remote=3 local=3 remote=3 Regular sync just in case.
278
* local=3 remote=3 local=3 remote=4* ?! Full sync from primary.
279
* local=3 remote=3 local=4 remote=2 Split-brain condition.
280
* local=3 remote=3 local=4 remote=3 Secondary out-of-date,
281
* regular sync from primary.
282
* local=3 remote=3 local=4 remote=4* ?! Full sync from primary.
283
*/
284
if (res->hr_resuid == 0) {
285
/*
286
* Provider is used for the first time. If primary node done no
287
* writes yet as well (we will find "virgin" argument) then
288
* there is no need to synchronize anything. If primary node
289
* done any writes already we have to synchronize everything.
290
*/
291
PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
292
res->hr_resuid = resuid;
293
if (metadata_write(res) == -1)
294
exit(EX_NOINPUT);
295
if (nv_exists(nvin, "virgin")) {
296
free(map);
297
map = NULL;
298
mapsize = 0;
299
} else {
300
memset(map, 0xff, mapsize);
301
}
302
nv_add_int8(nvout, 1, "virgin");
303
nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
304
} else if (res->hr_resuid != resuid) {
305
char errmsg[256];
306
307
free(map);
308
(void)snprintf(errmsg, sizeof(errmsg),
309
"Resource unique ID mismatch (primary=%ju, secondary=%ju).",
310
(uintmax_t)resuid, (uintmax_t)res->hr_resuid);
311
pjdlog_error("%s", errmsg);
312
nv_add_string(nvout, errmsg, "errmsg");
313
if (hast_proto_send(res, res->hr_remotein, nvout,
314
NULL, 0) == -1) {
315
pjdlog_exit(EX_TEMPFAIL,
316
"Unable to send response to %s",
317
res->hr_remoteaddr);
318
}
319
nv_free(nvout);
320
exit(EX_CONFIG);
321
} else if (
322
/* Is primary out-of-date? */
323
(res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
324
res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
325
/* Are the nodes more or less in sync? */
326
(res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
327
res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
328
/* Is secondary out-of-date? */
329
(res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
330
res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
331
/*
332
* Nodes are more or less in sync or one of the nodes is
333
* out-of-date.
334
* It doesn't matter at this point which one, we just have to
335
* send out local bitmap to the remote node.
336
*/
337
if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
338
(ssize_t)mapsize) {
339
pjdlog_exit(LOG_ERR, "Unable to read activemap");
340
}
341
if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
342
res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
343
/* Primary is out-of-date, sync from secondary. */
344
nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
345
} else {
346
/*
347
* Secondary is out-of-date or counts match.
348
* Sync from primary.
349
*/
350
nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
351
}
352
} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
353
res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
354
/*
355
* Not good, we have split-brain condition.
356
*/
357
free(map);
358
pjdlog_error("Split-brain detected, exiting.");
359
nv_add_string(nvout, "Split-brain condition!", "errmsg");
360
if (hast_proto_send(res, res->hr_remotein, nvout,
361
NULL, 0) == -1) {
362
pjdlog_exit(EX_TEMPFAIL,
363
"Unable to send response to %s",
364
res->hr_remoteaddr);
365
}
366
nv_free(nvout);
367
/* Exit on split-brain. */
368
event_send(res, EVENT_SPLITBRAIN);
369
exit(EX_CONFIG);
370
} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
371
res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
372
/*
373
* This should never happen in practise, but we will perform
374
* full synchronization.
375
*/
376
PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
377
res->hr_primary_localcnt < res->hr_secondary_remotecnt);
378
mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
379
METADATA_SIZE, res->hr_extentsize,
380
res->hr_local_sectorsize);
381
memset(map, 0xff, mapsize);
382
if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
383
/* In this one of five cases sync from secondary. */
384
nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
385
} else {
386
/* For the rest four cases sync from primary. */
387
nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
388
}
389
pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
390
(uintmax_t)res->hr_primary_localcnt,
391
(uintmax_t)res->hr_primary_remotecnt,
392
(uintmax_t)res->hr_secondary_localcnt,
393
(uintmax_t)res->hr_secondary_remotecnt);
394
}
395
nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
396
if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
397
pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
398
res->hr_remoteaddr);
399
}
400
if (map != NULL)
401
free(map);
402
nv_free(nvout);
403
#ifdef notyet
404
/* Setup direction. */
405
if (proto_recv(res->hr_remotein, NULL, 0) == -1)
406
pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
407
#endif
408
}
409
410
void
411
hastd_secondary(struct hast_resource *res, struct nv *nvin)
412
{
413
sigset_t mask;
414
pthread_t td;
415
pid_t pid;
416
int error, mode, debuglevel;
417
418
/*
419
* Create communication channel between parent and child.
420
*/
421
if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
422
KEEP_ERRNO((void)pidfile_remove(pfh));
423
pjdlog_exit(EX_OSERR,
424
"Unable to create control sockets between parent and child");
425
}
426
/*
427
* Create communication channel between child and parent.
428
*/
429
if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
430
KEEP_ERRNO((void)pidfile_remove(pfh));
431
pjdlog_exit(EX_OSERR,
432
"Unable to create event sockets between child and parent");
433
}
434
435
pid = fork();
436
if (pid == -1) {
437
KEEP_ERRNO((void)pidfile_remove(pfh));
438
pjdlog_exit(EX_OSERR, "Unable to fork");
439
}
440
441
if (pid > 0) {
442
/* This is parent. */
443
proto_close(res->hr_remotein);
444
res->hr_remotein = NULL;
445
proto_close(res->hr_remoteout);
446
res->hr_remoteout = NULL;
447
/* Declare that we are receiver. */
448
proto_recv(res->hr_event, NULL, 0);
449
/* Declare that we are sender. */
450
proto_send(res->hr_ctrl, NULL, 0);
451
res->hr_workerpid = pid;
452
return;
453
}
454
455
gres = res;
456
res->output_status_aux = output_status_aux;
457
mode = pjdlog_mode_get();
458
debuglevel = pjdlog_debug_get();
459
460
/* Declare that we are sender. */
461
proto_send(res->hr_event, NULL, 0);
462
/* Declare that we are receiver. */
463
proto_recv(res->hr_ctrl, NULL, 0);
464
descriptors_cleanup(res);
465
466
descriptors_assert(res, mode);
467
468
pjdlog_init(mode);
469
pjdlog_debug_set(debuglevel);
470
pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
471
setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
472
473
PJDLOG_VERIFY(sigemptyset(&mask) == 0);
474
PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
475
476
/* Error in setting timeout is not critical, but why should it fail? */
477
if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
478
pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
479
if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
480
pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
481
482
init_local(res);
483
init_environment();
484
485
if (drop_privs(res) != 0)
486
exit(EX_CONFIG);
487
pjdlog_info("Privileges successfully dropped.");
488
489
/*
490
* Create the control thread before sending any event to the parent,
491
* as we can deadlock when parent sends control request to worker,
492
* but worker has no control thread started yet, so parent waits.
493
* In the meantime worker sends an event to the parent, but parent
494
* is unable to handle the event, because it waits for control
495
* request response.
496
*/
497
error = pthread_create(&td, NULL, ctrl_thread, res);
498
PJDLOG_ASSERT(error == 0);
499
500
init_remote(res, nvin);
501
event_send(res, EVENT_CONNECT);
502
503
error = pthread_create(&td, NULL, recv_thread, res);
504
PJDLOG_ASSERT(error == 0);
505
error = pthread_create(&td, NULL, disk_thread, res);
506
PJDLOG_ASSERT(error == 0);
507
(void)send_thread(res);
508
}
509
510
static void
511
reqlog(int loglevel, int debuglevel, int error, struct hio *hio,
512
const char *fmt, ...)
513
{
514
char msg[1024];
515
va_list ap;
516
int len;
517
518
va_start(ap, fmt);
519
len = vsnprintf(msg, sizeof(msg), fmt, ap);
520
va_end(ap);
521
if ((size_t)len < sizeof(msg)) {
522
switch (hio->hio_cmd) {
523
case HIO_READ:
524
(void)snprintf(msg + len, sizeof(msg) - len,
525
"READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
526
(uintmax_t)hio->hio_length);
527
break;
528
case HIO_DELETE:
529
(void)snprintf(msg + len, sizeof(msg) - len,
530
"DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
531
(uintmax_t)hio->hio_length);
532
break;
533
case HIO_FLUSH:
534
(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
535
break;
536
case HIO_WRITE:
537
(void)snprintf(msg + len, sizeof(msg) - len,
538
"WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
539
(uintmax_t)hio->hio_length);
540
break;
541
case HIO_KEEPALIVE:
542
(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
543
break;
544
default:
545
(void)snprintf(msg + len, sizeof(msg) - len,
546
"UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
547
break;
548
}
549
}
550
pjdlog_common(loglevel, debuglevel, error, "%s", msg);
551
}
552
553
static int
554
requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
555
{
556
557
hio->hio_cmd = nv_get_uint8(nv, "cmd");
558
if (hio->hio_cmd == 0) {
559
pjdlog_error("Header contains no 'cmd' field.");
560
hio->hio_error = EINVAL;
561
goto end;
562
}
563
if (hio->hio_cmd != HIO_KEEPALIVE) {
564
hio->hio_seq = nv_get_uint64(nv, "seq");
565
if (hio->hio_seq == 0) {
566
pjdlog_error("Header contains no 'seq' field.");
567
hio->hio_error = EINVAL;
568
goto end;
569
}
570
}
571
switch (hio->hio_cmd) {
572
case HIO_FLUSH:
573
case HIO_KEEPALIVE:
574
break;
575
case HIO_WRITE:
576
hio->hio_memsync = nv_exists(nv, "memsync");
577
/* FALLTHROUGH */
578
case HIO_READ:
579
case HIO_DELETE:
580
hio->hio_offset = nv_get_uint64(nv, "offset");
581
if (nv_error(nv) != 0) {
582
pjdlog_error("Header is missing 'offset' field.");
583
hio->hio_error = EINVAL;
584
goto end;
585
}
586
hio->hio_length = nv_get_uint64(nv, "length");
587
if (nv_error(nv) != 0) {
588
pjdlog_error("Header is missing 'length' field.");
589
hio->hio_error = EINVAL;
590
goto end;
591
}
592
if (hio->hio_length == 0) {
593
pjdlog_error("Data length is zero.");
594
hio->hio_error = EINVAL;
595
goto end;
596
}
597
if (hio->hio_cmd != HIO_DELETE && hio->hio_length > MAXPHYS) {
598
pjdlog_error("Data length is too large (%ju > %ju).",
599
(uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
600
hio->hio_error = EINVAL;
601
goto end;
602
}
603
if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
604
pjdlog_error("Offset %ju is not multiple of sector size.",
605
(uintmax_t)hio->hio_offset);
606
hio->hio_error = EINVAL;
607
goto end;
608
}
609
if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
610
pjdlog_error("Length %ju is not multiple of sector size.",
611
(uintmax_t)hio->hio_length);
612
hio->hio_error = EINVAL;
613
goto end;
614
}
615
if (hio->hio_offset + hio->hio_length >
616
(uint64_t)res->hr_datasize) {
617
pjdlog_error("Data offset is too large (%ju > %ju).",
618
(uintmax_t)(hio->hio_offset + hio->hio_length),
619
(uintmax_t)res->hr_datasize);
620
hio->hio_error = EINVAL;
621
goto end;
622
}
623
break;
624
default:
625
pjdlog_error("Header contains invalid 'cmd' (%hhu).",
626
hio->hio_cmd);
627
hio->hio_error = EINVAL;
628
goto end;
629
}
630
hio->hio_error = 0;
631
end:
632
return (hio->hio_error);
633
}
634
635
static __dead2 void
636
secondary_exit(int exitcode, const char *fmt, ...)
637
{
638
va_list ap;
639
640
PJDLOG_ASSERT(exitcode != EX_OK);
641
va_start(ap, fmt);
642
pjdlogv_errno(LOG_ERR, fmt, ap);
643
va_end(ap);
644
event_send(gres, EVENT_DISCONNECT);
645
exit(exitcode);
646
}
647
648
/*
649
* Thread receives requests from the primary node.
650
*/
651
static void *
652
recv_thread(void *arg)
653
{
654
struct hast_resource *res = arg;
655
struct hio *hio, *mshio;
656
struct nv *nv;
657
658
for (;;) {
659
pjdlog_debug(2, "recv: Taking free request.");
660
QUEUE_TAKE(free, hio);
661
pjdlog_debug(2, "recv: (%p) Got request.", hio);
662
if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
663
secondary_exit(EX_TEMPFAIL,
664
"Unable to receive request header");
665
}
666
if (requnpack(res, hio, nv) != 0) {
667
nv_free(nv);
668
pjdlog_debug(2,
669
"recv: (%p) Moving request to the send queue.",
670
hio);
671
QUEUE_INSERT(send, hio);
672
continue;
673
}
674
switch (hio->hio_cmd) {
675
case HIO_READ:
676
res->hr_stat_read++;
677
break;
678
case HIO_WRITE:
679
res->hr_stat_write++;
680
break;
681
case HIO_DELETE:
682
res->hr_stat_delete++;
683
break;
684
case HIO_FLUSH:
685
res->hr_stat_flush++;
686
break;
687
case HIO_KEEPALIVE:
688
break;
689
default:
690
PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
691
hio->hio_cmd);
692
}
693
reqlog(LOG_DEBUG, 2, -1, hio,
694
"recv: (%p) Got request header: ", hio);
695
if (hio->hio_cmd == HIO_KEEPALIVE) {
696
nv_free(nv);
697
pjdlog_debug(2,
698
"recv: (%p) Moving request to the free queue.",
699
hio);
700
hio_clear(hio);
701
QUEUE_INSERT(free, hio);
702
continue;
703
} else if (hio->hio_cmd == HIO_WRITE) {
704
if (hast_proto_recv_data(res, res->hr_remotein, nv,
705
hio->hio_data, MAXPHYS) == -1) {
706
secondary_exit(EX_TEMPFAIL,
707
"Unable to receive request data");
708
}
709
if (hio->hio_memsync) {
710
/*
711
* For memsync requests we expect two replies.
712
* Clone the hio so we can handle both of them.
713
*/
714
pjdlog_debug(2, "recv: Taking free request.");
715
QUEUE_TAKE(free, mshio);
716
pjdlog_debug(2, "recv: (%p) Got request.",
717
mshio);
718
hio_copy(hio, mshio);
719
mshio->hio_error = 0;
720
/*
721
* We want to keep 'memsync' tag only on the
722
* request going onto send queue (mshio).
723
*/
724
hio->hio_memsync = false;
725
pjdlog_debug(2,
726
"recv: (%p) Moving memsync request to the send queue.",
727
mshio);
728
QUEUE_INSERT(send, mshio);
729
}
730
}
731
nv_free(nv);
732
pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
733
hio);
734
QUEUE_INSERT(disk, hio);
735
}
736
/* NOTREACHED */
737
return (NULL);
738
}
739
740
/*
741
* Thread reads from or writes to local component and also handles DELETE and
742
* FLUSH requests.
743
*/
744
static void *
745
disk_thread(void *arg)
746
{
747
struct hast_resource *res = arg;
748
struct hio *hio;
749
ssize_t ret;
750
bool clear_activemap, logerror;
751
752
clear_activemap = true;
753
754
for (;;) {
755
pjdlog_debug(2, "disk: Taking request.");
756
QUEUE_TAKE(disk, hio);
757
while (clear_activemap) {
758
unsigned char *map;
759
size_t mapsize;
760
761
/*
762
* When first request is received, it means that primary
763
* already received our activemap, merged it and stored
764
* locally. We can now safely clear our activemap.
765
*/
766
mapsize =
767
activemap_calc_ondisk_size(res->hr_local_mediasize -
768
METADATA_SIZE, res->hr_extentsize,
769
res->hr_local_sectorsize);
770
map = calloc(1, mapsize);
771
if (map == NULL) {
772
pjdlog_warning("Unable to allocate memory to clear local activemap.");
773
break;
774
}
775
if (pwrite(res->hr_localfd, map, mapsize,
776
METADATA_SIZE) != (ssize_t)mapsize) {
777
pjdlog_errno(LOG_WARNING,
778
"Unable to store cleared activemap");
779
free(map);
780
res->hr_stat_activemap_write_error++;
781
break;
782
}
783
free(map);
784
clear_activemap = false;
785
pjdlog_debug(1, "Local activemap cleared.");
786
break;
787
}
788
reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
789
logerror = true;
790
/* Handle the actual request. */
791
switch (hio->hio_cmd) {
792
case HIO_READ:
793
ret = pread(res->hr_localfd, hio->hio_data,
794
hio->hio_length,
795
hio->hio_offset + res->hr_localoff);
796
if (ret == -1)
797
hio->hio_error = errno;
798
else if (ret != (int64_t)hio->hio_length)
799
hio->hio_error = EIO;
800
else
801
hio->hio_error = 0;
802
break;
803
case HIO_WRITE:
804
ret = pwrite(res->hr_localfd, hio->hio_data,
805
hio->hio_length,
806
hio->hio_offset + res->hr_localoff);
807
if (ret == -1)
808
hio->hio_error = errno;
809
else if (ret != (int64_t)hio->hio_length)
810
hio->hio_error = EIO;
811
else
812
hio->hio_error = 0;
813
break;
814
case HIO_DELETE:
815
ret = g_delete(res->hr_localfd,
816
hio->hio_offset + res->hr_localoff,
817
hio->hio_length);
818
if (ret == -1)
819
hio->hio_error = errno;
820
else
821
hio->hio_error = 0;
822
break;
823
case HIO_FLUSH:
824
if (!res->hr_localflush) {
825
ret = -1;
826
hio->hio_error = EOPNOTSUPP;
827
logerror = false;
828
break;
829
}
830
ret = g_flush(res->hr_localfd);
831
if (ret == -1) {
832
if (errno == EOPNOTSUPP)
833
res->hr_localflush = false;
834
hio->hio_error = errno;
835
} else {
836
hio->hio_error = 0;
837
}
838
break;
839
default:
840
PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
841
hio->hio_cmd);
842
}
843
if (logerror && hio->hio_error != 0) {
844
reqlog(LOG_ERR, 0, hio->hio_error, hio,
845
"Request failed: ");
846
}
847
pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
848
hio);
849
QUEUE_INSERT(send, hio);
850
}
851
/* NOTREACHED */
852
return (NULL);
853
}
854
855
/*
856
* Thread sends requests back to primary node.
857
*/
858
static void *
859
send_thread(void *arg)
860
{
861
struct hast_resource *res = arg;
862
struct nv *nvout;
863
struct hio *hio;
864
void *data;
865
size_t length;
866
867
for (;;) {
868
pjdlog_debug(2, "send: Taking request.");
869
QUEUE_TAKE(send, hio);
870
reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
871
nvout = nv_alloc();
872
/* Copy sequence number. */
873
nv_add_uint64(nvout, hio->hio_seq, "seq");
874
if (hio->hio_memsync) {
875
PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
876
nv_add_int8(nvout, 1, "received");
877
}
878
switch (hio->hio_cmd) {
879
case HIO_READ:
880
if (hio->hio_error == 0) {
881
data = hio->hio_data;
882
length = hio->hio_length;
883
break;
884
}
885
/*
886
* We send no data in case of an error.
887
*/
888
/* FALLTHROUGH */
889
case HIO_DELETE:
890
case HIO_FLUSH:
891
case HIO_WRITE:
892
data = NULL;
893
length = 0;
894
break;
895
default:
896
PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
897
hio->hio_cmd);
898
}
899
if (hio->hio_error != 0) {
900
switch (hio->hio_cmd) {
901
case HIO_READ:
902
res->hr_stat_read_error++;
903
break;
904
case HIO_WRITE:
905
res->hr_stat_write_error++;
906
break;
907
case HIO_DELETE:
908
res->hr_stat_delete_error++;
909
break;
910
case HIO_FLUSH:
911
res->hr_stat_flush_error++;
912
break;
913
}
914
nv_add_int16(nvout, hio->hio_error, "error");
915
}
916
if (hast_proto_send(res, res->hr_remoteout, nvout, data,
917
length) == -1) {
918
secondary_exit(EX_TEMPFAIL, "Unable to send reply");
919
}
920
nv_free(nvout);
921
pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
922
hio);
923
hio_clear(hio);
924
QUEUE_INSERT(free, hio);
925
}
926
/* NOTREACHED */
927
return (NULL);
928
}
929
930