Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sbin/hastd/primary.c
39475 views
1
/*-
2
* SPDX-License-Identifier: BSD-2-Clause
3
*
4
* Copyright (c) 2009 The FreeBSD Foundation
5
* Copyright (c) 2010-2011 Pawel Jakub Dawidek <[email protected]>
6
* All rights reserved.
7
*
8
* This software was developed by Pawel Jakub Dawidek under sponsorship from
9
* the FreeBSD Foundation.
10
*
11
* Redistribution and use in source and binary forms, with or without
12
* modification, are permitted provided that the following conditions
13
* are met:
14
* 1. Redistributions of source code must retain the above copyright
15
* notice, this list of conditions and the following disclaimer.
16
* 2. Redistributions in binary form must reproduce the above copyright
17
* notice, this list of conditions and the following disclaimer in the
18
* documentation and/or other materials provided with the distribution.
19
*
20
* THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
21
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
24
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
26
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
27
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
29
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30
* SUCH DAMAGE.
31
*/
32
33
#include <sys/types.h>
34
#include <sys/time.h>
35
#include <sys/bio.h>
36
#include <sys/disk.h>
37
#include <sys/stat.h>
38
39
#include <geom/gate/g_gate.h>
40
41
#include <err.h>
42
#include <errno.h>
43
#include <fcntl.h>
44
#include <libgeom.h>
45
#include <pthread.h>
46
#include <signal.h>
47
#include <stdint.h>
48
#include <stdio.h>
49
#include <string.h>
50
#include <sysexits.h>
51
#include <unistd.h>
52
53
#include <activemap.h>
54
#include <nv.h>
55
#include <rangelock.h>
56
57
#include "control.h"
58
#include "event.h"
59
#include "hast.h"
60
#include "hast_proto.h"
61
#include "hastd.h"
62
#include "hooks.h"
63
#include "metadata.h"
64
#include "proto.h"
65
#include "pjdlog.h"
66
#include "refcnt.h"
67
#include "subr.h"
68
#include "synch.h"
69
70
/* There is only one remote component for now. */
71
#define ISREMOTE(no) ((no) == 1)
72
73
struct hio {
74
/*
75
* Number of components we are still waiting for.
76
* When this field goes to 0, we can send the request back to the
77
* kernel. Each component has to decrease this counter by one
78
* even on failure.
79
*/
80
refcnt_t hio_countdown;
81
/*
82
* Each component has a place to store its own error.
83
* Once the request is handled by all components we can decide if the
84
* request overall is successful or not.
85
*/
86
int *hio_errors;
87
/*
88
* Structure used to communicate with GEOM Gate class.
89
*/
90
struct g_gate_ctl_io hio_ggio;
91
/*
92
* Request was already confirmed to GEOM Gate.
93
*/
94
bool hio_done;
95
/*
96
* Number of components we are still waiting before sending write
97
* completion ack to GEOM Gate. Used for memsync.
98
*/
99
refcnt_t hio_writecount;
100
/*
101
* Memsync request was acknowledged by remote.
102
*/
103
bool hio_memsyncacked;
104
/*
105
* Remember replication from the time the request was initiated,
106
* so we won't get confused when replication changes on reload.
107
*/
108
int hio_replication;
109
TAILQ_ENTRY(hio) *hio_next;
110
};
111
#define hio_free_next hio_next[0]
112
#define hio_done_next hio_next[0]
113
114
/*
 * The free list holds unused request structures. When it is empty, we have
 * to wait until some in-progress request is freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static size_t hio_free_list_size;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. Each request is placed on all
 * send lists - every component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static size_t *hio_send_list_size;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
#define	hio_send_local_list_size	hio_send_list_size[0]
#define	hio_send_remote_list_size	hio_send_list_size[1]
/*
 * There is one recv list for every component, although local components
 * don't use theirs, because local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static size_t *hio_recv_list_size;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
#define	hio_recv_remote_list_size	hio_recv_list_size[1]
/*
 * A request is placed on the done list by the slowest component (the one
 * that decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static size_t hio_done_list_size;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
150
/*
151
* Structure below are for interaction with sync thread.
152
*/
153
static bool sync_inprogress;
154
static pthread_mutex_t sync_lock;
155
static pthread_cond_t sync_cond;
156
/*
157
* The lock below allows to synchornize access to remote connections.
158
*/
159
static pthread_rwlock_t *hio_remote_lock;
160
161
/*
162
* Lock to synchronize metadata updates. Also synchronize access to
163
* hr_primary_localcnt and hr_primary_remotecnt fields.
164
*/
165
static pthread_mutex_t metadata_lock;
166
167
/*
168
* Maximum number of outstanding I/O requests.
169
*/
170
#define HAST_HIO_MAX 256
171
/*
172
* Number of components. At this point there are only two components: local
173
* and remote, but in the future it might be possible to use multiple local
174
* and remote components.
175
*/
176
#define HAST_NCOMPONENTS 2
177
178
#define ISCONNECTED(res, no) \
179
((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
180
181
/*
 * Append hio to the per-component list hio_<name>_list[ncomp] under that
 * list's lock. Waiters are woken only when the list was empty, i.e. when
 * somebody may be blocked waiting for the first element.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	if (TAILQ_EMPTY(&hio_##name##_list[(ncomp)]))			\
		cv_broadcast(&hio_##name##_list_cond[(ncomp)]);		\
	hio_##name##_list_size[(ncomp)]++;				\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
190
/*
 * Append hio to the global list hio_<name>_list (free or done) under the
 * list's lock. Waiters are woken only when the list was empty.
 */
#define	QUEUE_INSERT2(hio, name)	do {				\
	mtx_lock(&hio_##name##_list_lock);				\
	if (TAILQ_EMPTY(&hio_##name##_list))				\
		cv_broadcast(&hio_##name##_list_cond);			\
	hio_##name##_list_size++;					\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
198
/*
 * Remove the first request from the per-component list
 * hio_<name>_list[ncomp] and store it in hio.
 *
 * With timeout == 0 the macro keeps waiting until a request shows up.
 * With a non-zero timeout it waits at most once and then re-checks the
 * list a final time; hio is left NULL if the list is still empty.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last = false;						\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	for (;;) {							\
		(hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)]);	\
		if ((hio) != NULL || _last)				\
			break;						\
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if ((hio) != NULL) {						\
		PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0);	\
		hio_##name##_list_size[(ncomp)]--;			\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
217
/*
 * Remove the first request from the global list hio_<name>_list (free or
 * done) and store it in hio, waiting on the list's condition variable for
 * as long as the list is empty.
 */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(hio) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((hio) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
	hio_##name##_list_size--;					\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
228
229
/* Predicates on the replication mode recorded in the request. */
#define	ISFULLSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
#define	ISMEMSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
#define	ISASYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_ASYNC)

/*
 * Synchronization requests are told apart from regular ones through the
 * gctl_unit field: -1 marks a synchronization request, -2 marks one that
 * has been completed.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

/* A regular (non-synchronization) write issued in memsync mode. */
#define	ISMEMSYNCWRITE(hio)	(ISMEMSYNC(hio) &&			\
	    (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))
243
244
/* Resource served by this worker; used by primary_exit{,x}() for cleanup. */
static struct hast_resource *gres;

/* State of the range locks for regular and for synchronization requests. */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;
static bool fullystarted;

/* Entry points of the worker threads. */
static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);
262
263
static void
264
output_status_aux(struct nv *nvout)
265
{
266
267
nv_add_uint64(nvout, (uint64_t)hio_free_list_size,
268
"idle_queue_size");
269
nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size,
270
"local_queue_size");
271
nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size,
272
"send_queue_size");
273
nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size,
274
"recv_queue_size");
275
nv_add_uint64(nvout, (uint64_t)hio_done_list_size,
276
"done_queue_size");
277
}
278
279
static void
280
cleanup(struct hast_resource *res)
281
{
282
int rerrno;
283
284
/* Remember errno. */
285
rerrno = errno;
286
287
/* Destroy ggate provider if we created one. */
288
if (res->hr_ggateunit >= 0) {
289
struct g_gate_ctl_destroy ggiod;
290
291
bzero(&ggiod, sizeof(ggiod));
292
ggiod.gctl_version = G_GATE_VERSION;
293
ggiod.gctl_unit = res->hr_ggateunit;
294
ggiod.gctl_force = 1;
295
if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) {
296
pjdlog_errno(LOG_WARNING,
297
"Unable to destroy hast/%s device",
298
res->hr_provname);
299
}
300
res->hr_ggateunit = -1;
301
}
302
303
/* Restore errno. */
304
errno = rerrno;
305
}
306
307
static __dead2 void
308
primary_exit(int exitcode, const char *fmt, ...)
309
{
310
va_list ap;
311
312
PJDLOG_ASSERT(exitcode != EX_OK);
313
va_start(ap, fmt);
314
pjdlogv_errno(LOG_ERR, fmt, ap);
315
va_end(ap);
316
cleanup(gres);
317
exit(exitcode);
318
}
319
320
static __dead2 void
321
primary_exitx(int exitcode, const char *fmt, ...)
322
{
323
va_list ap;
324
325
va_start(ap, fmt);
326
pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
327
va_end(ap);
328
cleanup(gres);
329
exit(exitcode);
330
}
331
332
static int
333
hast_activemap_flush(struct hast_resource *res) __unlocks(res->hr_amp_lock)
334
{
335
const unsigned char *buf;
336
size_t size;
337
int ret;
338
339
mtx_lock(&res->hr_amp_diskmap_lock);
340
buf = activemap_bitmap(res->hr_amp, &size);
341
mtx_unlock(&res->hr_amp_lock);
342
PJDLOG_ASSERT(buf != NULL);
343
PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
344
ret = 0;
345
if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
346
(ssize_t)size) {
347
pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk");
348
res->hr_stat_activemap_write_error++;
349
ret = -1;
350
}
351
if (ret == 0 && res->hr_metaflush == 1 &&
352
g_flush(res->hr_localfd) == -1) {
353
if (errno == EOPNOTSUPP) {
354
pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.",
355
res->hr_localpath);
356
res->hr_metaflush = 0;
357
} else {
358
pjdlog_errno(LOG_ERR,
359
"Unable to flush disk cache on activemap update");
360
res->hr_stat_activemap_flush_error++;
361
ret = -1;
362
}
363
}
364
mtx_unlock(&res->hr_amp_diskmap_lock);
365
return (ret);
366
}
367
368
static bool
369
real_remote(const struct hast_resource *res)
370
{
371
372
return (strcmp(res->hr_remoteaddr, "none") != 0);
373
}
374
375
static void
376
init_environment(struct hast_resource *res __unused)
377
{
378
struct hio *hio;
379
unsigned int ii, ncomps;
380
381
/*
382
* In the future it might be per-resource value.
383
*/
384
ncomps = HAST_NCOMPONENTS;
385
386
/*
387
* Allocate memory needed by lists.
388
*/
389
hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
390
if (hio_send_list == NULL) {
391
primary_exitx(EX_TEMPFAIL,
392
"Unable to allocate %zu bytes of memory for send lists.",
393
sizeof(hio_send_list[0]) * ncomps);
394
}
395
hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps);
396
if (hio_send_list_size == NULL) {
397
primary_exitx(EX_TEMPFAIL,
398
"Unable to allocate %zu bytes of memory for send list counters.",
399
sizeof(hio_send_list_size[0]) * ncomps);
400
}
401
hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
402
if (hio_send_list_lock == NULL) {
403
primary_exitx(EX_TEMPFAIL,
404
"Unable to allocate %zu bytes of memory for send list locks.",
405
sizeof(hio_send_list_lock[0]) * ncomps);
406
}
407
hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
408
if (hio_send_list_cond == NULL) {
409
primary_exitx(EX_TEMPFAIL,
410
"Unable to allocate %zu bytes of memory for send list condition variables.",
411
sizeof(hio_send_list_cond[0]) * ncomps);
412
}
413
hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
414
if (hio_recv_list == NULL) {
415
primary_exitx(EX_TEMPFAIL,
416
"Unable to allocate %zu bytes of memory for recv lists.",
417
sizeof(hio_recv_list[0]) * ncomps);
418
}
419
hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps);
420
if (hio_recv_list_size == NULL) {
421
primary_exitx(EX_TEMPFAIL,
422
"Unable to allocate %zu bytes of memory for recv list counters.",
423
sizeof(hio_recv_list_size[0]) * ncomps);
424
}
425
hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
426
if (hio_recv_list_lock == NULL) {
427
primary_exitx(EX_TEMPFAIL,
428
"Unable to allocate %zu bytes of memory for recv list locks.",
429
sizeof(hio_recv_list_lock[0]) * ncomps);
430
}
431
hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
432
if (hio_recv_list_cond == NULL) {
433
primary_exitx(EX_TEMPFAIL,
434
"Unable to allocate %zu bytes of memory for recv list condition variables.",
435
sizeof(hio_recv_list_cond[0]) * ncomps);
436
}
437
hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
438
if (hio_remote_lock == NULL) {
439
primary_exitx(EX_TEMPFAIL,
440
"Unable to allocate %zu bytes of memory for remote connections locks.",
441
sizeof(hio_remote_lock[0]) * ncomps);
442
}
443
444
/*
445
* Initialize lists, their counters, locks and condition variables.
446
*/
447
TAILQ_INIT(&hio_free_list);
448
mtx_init(&hio_free_list_lock);
449
cv_init(&hio_free_list_cond);
450
for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
451
TAILQ_INIT(&hio_send_list[ii]);
452
hio_send_list_size[ii] = 0;
453
mtx_init(&hio_send_list_lock[ii]);
454
cv_init(&hio_send_list_cond[ii]);
455
TAILQ_INIT(&hio_recv_list[ii]);
456
hio_recv_list_size[ii] = 0;
457
mtx_init(&hio_recv_list_lock[ii]);
458
cv_init(&hio_recv_list_cond[ii]);
459
rw_init(&hio_remote_lock[ii]);
460
}
461
TAILQ_INIT(&hio_done_list);
462
mtx_init(&hio_done_list_lock);
463
cv_init(&hio_done_list_cond);
464
mtx_init(&metadata_lock);
465
466
/*
467
* Allocate requests pool and initialize requests.
468
*/
469
for (ii = 0; ii < HAST_HIO_MAX; ii++) {
470
hio = malloc(sizeof(*hio));
471
if (hio == NULL) {
472
primary_exitx(EX_TEMPFAIL,
473
"Unable to allocate %zu bytes of memory for hio request.",
474
sizeof(*hio));
475
}
476
refcnt_init(&hio->hio_countdown, 0);
477
hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
478
if (hio->hio_errors == NULL) {
479
primary_exitx(EX_TEMPFAIL,
480
"Unable allocate %zu bytes of memory for hio errors.",
481
sizeof(hio->hio_errors[0]) * ncomps);
482
}
483
hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
484
if (hio->hio_next == NULL) {
485
primary_exitx(EX_TEMPFAIL,
486
"Unable allocate %zu bytes of memory for hio_next field.",
487
sizeof(hio->hio_next[0]) * ncomps);
488
}
489
hio->hio_ggio.gctl_version = G_GATE_VERSION;
490
hio->hio_ggio.gctl_data = malloc(MAXPHYS);
491
if (hio->hio_ggio.gctl_data == NULL) {
492
primary_exitx(EX_TEMPFAIL,
493
"Unable to allocate %zu bytes of memory for gctl_data.",
494
MAXPHYS);
495
}
496
hio->hio_ggio.gctl_length = MAXPHYS;
497
hio->hio_ggio.gctl_error = 0;
498
TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
499
hio_free_list_size++;
500
}
501
}
502
503
static bool
504
init_resuid(struct hast_resource *res)
505
{
506
507
mtx_lock(&metadata_lock);
508
if (res->hr_resuid != 0) {
509
mtx_unlock(&metadata_lock);
510
return (false);
511
} else {
512
/* Initialize unique resource identifier. */
513
arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
514
mtx_unlock(&metadata_lock);
515
if (metadata_write(res) == -1)
516
exit(EX_NOINPUT);
517
return (true);
518
}
519
}
520
521
static void
522
init_local(struct hast_resource *res)
523
{
524
unsigned char *buf;
525
size_t mapsize;
526
527
if (metadata_read(res, true) == -1)
528
exit(EX_NOINPUT);
529
mtx_init(&res->hr_amp_lock);
530
if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
531
res->hr_local_sectorsize, res->hr_keepdirty) == -1) {
532
primary_exit(EX_TEMPFAIL, "Unable to create activemap");
533
}
534
mtx_init(&range_lock);
535
cv_init(&range_regular_cond);
536
if (rangelock_init(&range_regular) == -1)
537
primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
538
cv_init(&range_sync_cond);
539
if (rangelock_init(&range_sync) == -1)
540
primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
541
mapsize = activemap_ondisk_size(res->hr_amp);
542
buf = calloc(1, mapsize);
543
if (buf == NULL) {
544
primary_exitx(EX_TEMPFAIL,
545
"Unable to allocate buffer for activemap.");
546
}
547
if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
548
(ssize_t)mapsize) {
549
primary_exit(EX_NOINPUT, "Unable to read activemap");
550
}
551
activemap_copyin(res->hr_amp, buf, mapsize);
552
free(buf);
553
if (res->hr_resuid != 0)
554
return;
555
/*
556
* We're using provider for the first time. Initialize local and remote
557
* counters. We don't initialize resuid here, as we want to do it just
558
* in time. The reason for this is that we want to inform secondary
559
* that there were no writes yet, so there is no need to synchronize
560
* anything.
561
*/
562
res->hr_primary_localcnt = 0;
563
res->hr_primary_remotecnt = 0;
564
if (metadata_write(res) == -1)
565
exit(EX_NOINPUT);
566
}
567
568
static int
569
primary_connect(struct hast_resource *res, struct proto_conn **connp)
570
{
571
struct proto_conn *conn;
572
int16_t val;
573
574
val = 1;
575
if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) {
576
primary_exit(EX_TEMPFAIL,
577
"Unable to send connection request to parent");
578
}
579
if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) {
580
primary_exit(EX_TEMPFAIL,
581
"Unable to receive reply to connection request from parent");
582
}
583
if (val != 0) {
584
errno = val;
585
pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
586
res->hr_remoteaddr);
587
return (-1);
588
}
589
if (proto_connection_recv(res->hr_conn, true, &conn) == -1) {
590
primary_exit(EX_TEMPFAIL,
591
"Unable to receive connection from parent");
592
}
593
if (proto_connect_wait(conn, res->hr_timeout) == -1) {
594
pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
595
res->hr_remoteaddr);
596
proto_close(conn);
597
return (-1);
598
}
599
/* Error in setting timeout is not critical, but why should it fail? */
600
if (proto_timeout(conn, res->hr_timeout) == -1)
601
pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
602
603
*connp = conn;
604
605
return (0);
606
}
607
608
/*
609
* Function instructs GEOM_GATE to handle reads directly from within the kernel.
610
*/
611
static void
612
enable_direct_reads(struct hast_resource *res)
613
{
614
struct g_gate_ctl_modify ggiomodify;
615
616
bzero(&ggiomodify, sizeof(ggiomodify));
617
ggiomodify.gctl_version = G_GATE_VERSION;
618
ggiomodify.gctl_unit = res->hr_ggateunit;
619
ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET;
620
strlcpy(ggiomodify.gctl_readprov, res->hr_localpath,
621
sizeof(ggiomodify.gctl_readprov));
622
ggiomodify.gctl_readoffset = res->hr_localoff;
623
if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0)
624
pjdlog_debug(1, "Direct reads enabled.");
625
else
626
pjdlog_errno(LOG_WARNING, "Failed to enable direct reads");
627
}
628
629
static int
630
init_remote(struct hast_resource *res, struct proto_conn **inp,
631
struct proto_conn **outp)
632
{
633
struct proto_conn *in, *out;
634
struct nv *nvout, *nvin;
635
const unsigned char *token;
636
unsigned char *map;
637
const char *errmsg;
638
int32_t extentsize;
639
int64_t datasize;
640
uint32_t mapsize;
641
uint8_t version;
642
size_t size;
643
int error;
644
645
PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
646
PJDLOG_ASSERT(real_remote(res));
647
648
in = out = NULL;
649
errmsg = NULL;
650
651
if (primary_connect(res, &out) == -1)
652
return (ECONNREFUSED);
653
654
error = ECONNABORTED;
655
656
/*
657
* First handshake step.
658
* Setup outgoing connection with remote node.
659
*/
660
nvout = nv_alloc();
661
nv_add_string(nvout, res->hr_name, "resource");
662
nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
663
if (nv_error(nvout) != 0) {
664
pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
665
"Unable to allocate header for connection with %s",
666
res->hr_remoteaddr);
667
nv_free(nvout);
668
goto close;
669
}
670
if (hast_proto_send(res, out, nvout, NULL, 0) == -1) {
671
pjdlog_errno(LOG_WARNING,
672
"Unable to send handshake header to %s",
673
res->hr_remoteaddr);
674
nv_free(nvout);
675
goto close;
676
}
677
nv_free(nvout);
678
if (hast_proto_recv_hdr(out, &nvin) == -1) {
679
pjdlog_errno(LOG_WARNING,
680
"Unable to receive handshake header from %s",
681
res->hr_remoteaddr);
682
goto close;
683
}
684
errmsg = nv_get_string(nvin, "errmsg");
685
if (errmsg != NULL) {
686
pjdlog_warning("%s", errmsg);
687
if (nv_exists(nvin, "wait"))
688
error = EBUSY;
689
nv_free(nvin);
690
goto close;
691
}
692
version = nv_get_uint8(nvin, "version");
693
if (version == 0) {
694
/*
695
* If no version is sent, it means this is protocol version 1.
696
*/
697
version = 1;
698
}
699
if (version > HAST_PROTO_VERSION) {
700
pjdlog_warning("Invalid version received (%hhu).", version);
701
nv_free(nvin);
702
goto close;
703
}
704
res->hr_version = version;
705
pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
706
token = nv_get_uint8_array(nvin, &size, "token");
707
if (token == NULL) {
708
pjdlog_warning("Handshake header from %s has no 'token' field.",
709
res->hr_remoteaddr);
710
nv_free(nvin);
711
goto close;
712
}
713
if (size != sizeof(res->hr_token)) {
714
pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
715
res->hr_remoteaddr, size, sizeof(res->hr_token));
716
nv_free(nvin);
717
goto close;
718
}
719
bcopy(token, res->hr_token, sizeof(res->hr_token));
720
nv_free(nvin);
721
722
/*
723
* Second handshake step.
724
* Setup incoming connection with remote node.
725
*/
726
if (primary_connect(res, &in) == -1)
727
goto close;
728
729
nvout = nv_alloc();
730
nv_add_string(nvout, res->hr_name, "resource");
731
nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
732
"token");
733
if (res->hr_resuid == 0) {
734
/*
735
* The resuid field was not yet initialized.
736
* Because we do synchronization inside init_resuid(), it is
737
* possible that someone already initialized it, the function
738
* will return false then, but if we successfully initialized
739
* it, we will get true. True means that there were no writes
740
* to this resource yet and we want to inform secondary that
741
* synchronization is not needed by sending "virgin" argument.
742
*/
743
if (init_resuid(res))
744
nv_add_int8(nvout, 1, "virgin");
745
}
746
nv_add_uint64(nvout, res->hr_resuid, "resuid");
747
nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
748
nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
749
if (nv_error(nvout) != 0) {
750
pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
751
"Unable to allocate header for connection with %s",
752
res->hr_remoteaddr);
753
nv_free(nvout);
754
goto close;
755
}
756
if (hast_proto_send(res, in, nvout, NULL, 0) == -1) {
757
pjdlog_errno(LOG_WARNING,
758
"Unable to send handshake header to %s",
759
res->hr_remoteaddr);
760
nv_free(nvout);
761
goto close;
762
}
763
nv_free(nvout);
764
if (hast_proto_recv_hdr(out, &nvin) == -1) {
765
pjdlog_errno(LOG_WARNING,
766
"Unable to receive handshake header from %s",
767
res->hr_remoteaddr);
768
goto close;
769
}
770
errmsg = nv_get_string(nvin, "errmsg");
771
if (errmsg != NULL) {
772
pjdlog_warning("%s", errmsg);
773
nv_free(nvin);
774
goto close;
775
}
776
datasize = nv_get_int64(nvin, "datasize");
777
if (datasize != res->hr_datasize) {
778
pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
779
(intmax_t)res->hr_datasize, (intmax_t)datasize);
780
nv_free(nvin);
781
goto close;
782
}
783
extentsize = nv_get_int32(nvin, "extentsize");
784
if (extentsize != res->hr_extentsize) {
785
pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
786
(ssize_t)res->hr_extentsize, (ssize_t)extentsize);
787
nv_free(nvin);
788
goto close;
789
}
790
res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
791
res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
792
res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
793
if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY)
794
enable_direct_reads(res);
795
if (nv_exists(nvin, "virgin")) {
796
/*
797
* Secondary was reinitialized, bump localcnt if it is 0 as
798
* only we have the data.
799
*/
800
PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
801
PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
802
803
if (res->hr_primary_localcnt == 0) {
804
PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);
805
806
mtx_lock(&metadata_lock);
807
res->hr_primary_localcnt++;
808
pjdlog_debug(1, "Increasing localcnt to %ju.",
809
(uintmax_t)res->hr_primary_localcnt);
810
(void)metadata_write(res);
811
mtx_unlock(&metadata_lock);
812
}
813
}
814
map = NULL;
815
mapsize = nv_get_uint32(nvin, "mapsize");
816
if (mapsize > 0) {
817
map = malloc(mapsize);
818
if (map == NULL) {
819
pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
820
(uintmax_t)mapsize);
821
nv_free(nvin);
822
goto close;
823
}
824
/*
825
* Remote node have some dirty extents on its own, lets
826
* download its activemap.
827
*/
828
if (hast_proto_recv_data(res, out, nvin, map,
829
mapsize) == -1) {
830
pjdlog_errno(LOG_ERR,
831
"Unable to receive remote activemap");
832
nv_free(nvin);
833
free(map);
834
goto close;
835
}
836
mtx_lock(&res->hr_amp_lock);
837
/*
838
* Merge local and remote bitmaps.
839
*/
840
activemap_merge(res->hr_amp, map, mapsize);
841
free(map);
842
/*
843
* Now that we merged bitmaps from both nodes, flush it to the
844
* disk before we start to synchronize.
845
*/
846
(void)hast_activemap_flush(res);
847
}
848
nv_free(nvin);
849
#ifdef notyet
850
/* Setup directions. */
851
if (proto_send(out, NULL, 0) == -1)
852
pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
853
if (proto_recv(in, NULL, 0) == -1)
854
pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
855
#endif
856
pjdlog_info("Connected to %s.", res->hr_remoteaddr);
857
if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
858
res->hr_version < 2) {
859
pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
860
res->hr_replication = HAST_REPLICATION_FULLSYNC;
861
} else if (res->hr_replication != res->hr_original_replication) {
862
/*
863
* This is in case hastd disconnected and was upgraded.
864
*/
865
res->hr_replication = res->hr_original_replication;
866
}
867
if (inp != NULL && outp != NULL) {
868
*inp = in;
869
*outp = out;
870
} else {
871
res->hr_remotein = in;
872
res->hr_remoteout = out;
873
}
874
event_send(res, EVENT_CONNECT);
875
return (0);
876
close:
877
if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
878
event_send(res, EVENT_SPLITBRAIN);
879
proto_close(out);
880
if (in != NULL)
881
proto_close(in);
882
return (error);
883
}
884
885
static void
886
sync_start(void)
887
{
888
889
mtx_lock(&sync_lock);
890
sync_inprogress = true;
891
mtx_unlock(&sync_lock);
892
cv_signal(&sync_cond);
893
}
894
895
static void
896
sync_stop(void)
897
{
898
899
mtx_lock(&sync_lock);
900
if (sync_inprogress)
901
sync_inprogress = false;
902
mtx_unlock(&sync_lock);
903
}
904
905
static void
906
init_ggate(struct hast_resource *res)
907
{
908
struct g_gate_ctl_create ggiocreate;
909
struct g_gate_ctl_cancel ggiocancel;
910
911
/*
912
* We communicate with ggate via /dev/ggctl. Open it.
913
*/
914
res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
915
if (res->hr_ggatefd == -1)
916
primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
917
/*
918
* Create provider before trying to connect, as connection failure
919
* is not critical, but may take some time.
920
*/
921
bzero(&ggiocreate, sizeof(ggiocreate));
922
ggiocreate.gctl_version = G_GATE_VERSION;
923
ggiocreate.gctl_mediasize = res->hr_datasize;
924
ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
925
ggiocreate.gctl_flags = 0;
926
ggiocreate.gctl_maxcount = 0;
927
ggiocreate.gctl_timeout = 0;
928
ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
929
snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
930
res->hr_provname);
931
if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
932
pjdlog_info("Device hast/%s created.", res->hr_provname);
933
res->hr_ggateunit = ggiocreate.gctl_unit;
934
return;
935
}
936
if (errno != EEXIST) {
937
primary_exit(EX_OSERR, "Unable to create hast/%s device",
938
res->hr_provname);
939
}
940
pjdlog_debug(1,
941
"Device hast/%s already exists, we will try to take it over.",
942
res->hr_provname);
943
/*
944
* If we received EEXIST, we assume that the process who created the
945
* provider died and didn't clean up. In that case we will start from
946
* where he left of.
947
*/
948
bzero(&ggiocancel, sizeof(ggiocancel));
949
ggiocancel.gctl_version = G_GATE_VERSION;
950
ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
951
snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
952
res->hr_provname);
953
if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
954
pjdlog_info("Device hast/%s recovered.", res->hr_provname);
955
res->hr_ggateunit = ggiocancel.gctl_unit;
956
return;
957
}
958
primary_exit(EX_OSERR, "Unable to take over hast/%s device",
959
res->hr_provname);
960
}
961
962
void
963
hastd_primary(struct hast_resource *res)
964
{
965
pthread_t td;
966
pid_t pid;
967
int error, mode, debuglevel;
968
969
/*
970
* Create communication channel for sending control commands from
971
* parent to child.
972
*/
973
if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
974
/* TODO: There's no need for this to be fatal error. */
975
KEEP_ERRNO((void)pidfile_remove(pfh));
976
pjdlog_exit(EX_OSERR,
977
"Unable to create control sockets between parent and child");
978
}
979
/*
980
* Create communication channel for sending events from child to parent.
981
*/
982
if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
983
/* TODO: There's no need for this to be fatal error. */
984
KEEP_ERRNO((void)pidfile_remove(pfh));
985
pjdlog_exit(EX_OSERR,
986
"Unable to create event sockets between child and parent");
987
}
988
/*
989
* Create communication channel for sending connection requests from
990
* child to parent.
991
*/
992
if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
993
/* TODO: There's no need for this to be fatal error. */
994
KEEP_ERRNO((void)pidfile_remove(pfh));
995
pjdlog_exit(EX_OSERR,
996
"Unable to create connection sockets between child and parent");
997
}
998
999
pid = fork();
1000
if (pid == -1) {
1001
/* TODO: There's no need for this to be fatal error. */
1002
KEEP_ERRNO((void)pidfile_remove(pfh));
1003
pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
1004
}
1005
1006
if (pid > 0) {
1007
/* This is parent. */
1008
/* Declare that we are receiver. */
1009
proto_recv(res->hr_event, NULL, 0);
1010
proto_recv(res->hr_conn, NULL, 0);
1011
/* Declare that we are sender. */
1012
proto_send(res->hr_ctrl, NULL, 0);
1013
res->hr_workerpid = pid;
1014
return;
1015
}
1016
1017
gres = res;
1018
res->output_status_aux = output_status_aux;
1019
mode = pjdlog_mode_get();
1020
debuglevel = pjdlog_debug_get();
1021
1022
/* Declare that we are sender. */
1023
proto_send(res->hr_event, NULL, 0);
1024
proto_send(res->hr_conn, NULL, 0);
1025
/* Declare that we are receiver. */
1026
proto_recv(res->hr_ctrl, NULL, 0);
1027
descriptors_cleanup(res);
1028
1029
descriptors_assert(res, mode);
1030
1031
pjdlog_init(mode);
1032
pjdlog_debug_set(debuglevel);
1033
pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
1034
setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
1035
1036
init_local(res);
1037
init_ggate(res);
1038
init_environment(res);
1039
1040
if (drop_privs(res) != 0) {
1041
cleanup(res);
1042
exit(EX_CONFIG);
1043
}
1044
pjdlog_info("Privileges successfully dropped.");
1045
1046
/*
1047
* Create the guard thread first, so we can handle signals from the
1048
* very beginning.
1049
*/
1050
error = pthread_create(&td, NULL, guard_thread, res);
1051
PJDLOG_ASSERT(error == 0);
1052
/*
1053
* Create the control thread before sending any event to the parent,
1054
* as we can deadlock when parent sends control request to worker,
1055
* but worker has no control thread started yet, so parent waits.
1056
* In the meantime worker sends an event to the parent, but parent
1057
* is unable to handle the event, because it waits for control
1058
* request response.
1059
*/
1060
error = pthread_create(&td, NULL, ctrl_thread, res);
1061
PJDLOG_ASSERT(error == 0);
1062
if (real_remote(res)) {
1063
error = init_remote(res, NULL, NULL);
1064
if (error == 0) {
1065
sync_start();
1066
} else if (error == EBUSY) {
1067
time_t start = time(NULL);
1068
1069
pjdlog_warning("Waiting for remote node to become %s for %ds.",
1070
role2str(HAST_ROLE_SECONDARY),
1071
res->hr_timeout);
1072
for (;;) {
1073
sleep(1);
1074
error = init_remote(res, NULL, NULL);
1075
if (error != EBUSY)
1076
break;
1077
if (time(NULL) > start + res->hr_timeout)
1078
break;
1079
}
1080
if (error == EBUSY) {
1081
pjdlog_warning("Remote node is still %s, starting anyway.",
1082
role2str(HAST_ROLE_PRIMARY));
1083
}
1084
}
1085
}
1086
error = pthread_create(&td, NULL, ggate_recv_thread, res);
1087
PJDLOG_ASSERT(error == 0);
1088
error = pthread_create(&td, NULL, local_send_thread, res);
1089
PJDLOG_ASSERT(error == 0);
1090
error = pthread_create(&td, NULL, remote_send_thread, res);
1091
PJDLOG_ASSERT(error == 0);
1092
error = pthread_create(&td, NULL, remote_recv_thread, res);
1093
PJDLOG_ASSERT(error == 0);
1094
error = pthread_create(&td, NULL, ggate_send_thread, res);
1095
PJDLOG_ASSERT(error == 0);
1096
fullystarted = true;
1097
(void)sync_thread(res);
1098
}
1099
1100
static void
1101
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
1102
const char *fmt, ...)
1103
{
1104
char msg[1024];
1105
va_list ap;
1106
1107
va_start(ap, fmt);
1108
(void)vsnprintf(msg, sizeof(msg), fmt, ap);
1109
va_end(ap);
1110
switch (ggio->gctl_cmd) {
1111
case BIO_READ:
1112
(void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
1113
(uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1114
break;
1115
case BIO_DELETE:
1116
(void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
1117
(uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1118
break;
1119
case BIO_FLUSH:
1120
(void)snprlcat(msg, sizeof(msg), "FLUSH.");
1121
break;
1122
case BIO_WRITE:
1123
(void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
1124
(uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1125
break;
1126
default:
1127
(void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
1128
(unsigned int)ggio->gctl_cmd);
1129
break;
1130
}
1131
pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
1132
}
1133
1134
static void
1135
remote_close(struct hast_resource *res, int ncomp)
1136
{
1137
1138
rw_wlock(&hio_remote_lock[ncomp]);
1139
/*
1140
* Check for a race between dropping rlock and acquiring wlock -
1141
* another thread can close connection in-between.
1142
*/
1143
if (!ISCONNECTED(res, ncomp)) {
1144
PJDLOG_ASSERT(res->hr_remotein == NULL);
1145
PJDLOG_ASSERT(res->hr_remoteout == NULL);
1146
rw_unlock(&hio_remote_lock[ncomp]);
1147
return;
1148
}
1149
1150
PJDLOG_ASSERT(res->hr_remotein != NULL);
1151
PJDLOG_ASSERT(res->hr_remoteout != NULL);
1152
1153
pjdlog_debug(2, "Closing incoming connection to %s.",
1154
res->hr_remoteaddr);
1155
proto_close(res->hr_remotein);
1156
res->hr_remotein = NULL;
1157
pjdlog_debug(2, "Closing outgoing connection to %s.",
1158
res->hr_remoteaddr);
1159
proto_close(res->hr_remoteout);
1160
res->hr_remoteout = NULL;
1161
1162
rw_unlock(&hio_remote_lock[ncomp]);
1163
1164
pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);
1165
1166
/*
1167
* Stop synchronization if in-progress.
1168
*/
1169
sync_stop();
1170
1171
event_send(res, EVENT_DISCONNECT);
1172
}
1173
1174
/*
1175
* Acknowledge write completion to the kernel, but don't update activemap yet.
1176
*/
1177
static void
1178
write_complete(struct hast_resource *res, struct hio *hio)
1179
{
1180
struct g_gate_ctl_io *ggio;
1181
unsigned int ncomp;
1182
1183
PJDLOG_ASSERT(!hio->hio_done);
1184
1185
ggio = &hio->hio_ggio;
1186
PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
1187
1188
/*
1189
* Bump local count if this is first write after
1190
* connection failure with remote node.
1191
*/
1192
ncomp = 1;
1193
rw_rlock(&hio_remote_lock[ncomp]);
1194
if (!ISCONNECTED(res, ncomp)) {
1195
mtx_lock(&metadata_lock);
1196
if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
1197
res->hr_primary_localcnt++;
1198
pjdlog_debug(1, "Increasing localcnt to %ju.",
1199
(uintmax_t)res->hr_primary_localcnt);
1200
(void)metadata_write(res);
1201
}
1202
mtx_unlock(&metadata_lock);
1203
}
1204
rw_unlock(&hio_remote_lock[ncomp]);
1205
if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1)
1206
primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
1207
hio->hio_done = true;
1208
}
1209
1210
/*
1211
* Thread receives ggate I/O requests from the kernel and passes them to
1212
* appropriate threads:
1213
* WRITE - always goes to both local_send and remote_send threads
1214
* READ (when the block is up-to-date on local component) -
1215
* only local_send thread
1216
* READ (when the block isn't up-to-date on local component) -
1217
* only remote_send thread
1218
* DELETE - always goes to both local_send and remote_send threads
1219
* FLUSH - always goes to both local_send and remote_send threads
1220
*/
1221
static void *
1222
ggate_recv_thread(void *arg)
1223
{
1224
struct hast_resource *res = arg;
1225
struct g_gate_ctl_io *ggio;
1226
struct hio *hio;
1227
unsigned int ii, ncomp, ncomps;
1228
int error;
1229
1230
for (;;) {
1231
pjdlog_debug(2, "ggate_recv: Taking free request.");
1232
QUEUE_TAKE2(hio, free);
1233
pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
1234
ggio = &hio->hio_ggio;
1235
ggio->gctl_unit = res->hr_ggateunit;
1236
ggio->gctl_length = MAXPHYS;
1237
ggio->gctl_error = 0;
1238
hio->hio_done = false;
1239
hio->hio_replication = res->hr_replication;
1240
pjdlog_debug(2,
1241
"ggate_recv: (%p) Waiting for request from the kernel.",
1242
hio);
1243
if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) {
1244
if (sigexit_received)
1245
pthread_exit(NULL);
1246
primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
1247
}
1248
error = ggio->gctl_error;
1249
switch (error) {
1250
case 0:
1251
break;
1252
case ECANCELED:
1253
/* Exit gracefully. */
1254
if (!sigexit_received) {
1255
pjdlog_debug(2,
1256
"ggate_recv: (%p) Received cancel from the kernel.",
1257
hio);
1258
pjdlog_info("Received cancel from the kernel, exiting.");
1259
}
1260
pthread_exit(NULL);
1261
case ENOMEM:
1262
/*
1263
* Buffer too small? Impossible, we allocate MAXPHYS
1264
* bytes - request can't be bigger than that.
1265
*/
1266
/* FALLTHROUGH */
1267
case ENXIO:
1268
default:
1269
primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
1270
strerror(error));
1271
}
1272
1273
ncomp = 0;
1274
ncomps = HAST_NCOMPONENTS;
1275
1276
for (ii = 0; ii < ncomps; ii++)
1277
hio->hio_errors[ii] = EINVAL;
1278
reqlog(LOG_DEBUG, 2, ggio,
1279
"ggate_recv: (%p) Request received from the kernel: ",
1280
hio);
1281
1282
/*
1283
* Inform all components about new write request.
1284
* For read request prefer local component unless the given
1285
* range is out-of-date, then use remote component.
1286
*/
1287
switch (ggio->gctl_cmd) {
1288
case BIO_READ:
1289
res->hr_stat_read++;
1290
ncomps = 1;
1291
mtx_lock(&metadata_lock);
1292
if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
1293
res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1294
/*
1295
* This range is up-to-date on local component,
1296
* so handle request locally.
1297
*/
1298
/* Local component is 0 for now. */
1299
ncomp = 0;
1300
} else /* if (res->hr_syncsrc ==
1301
HAST_SYNCSRC_SECONDARY) */ {
1302
PJDLOG_ASSERT(res->hr_syncsrc ==
1303
HAST_SYNCSRC_SECONDARY);
1304
/*
1305
* This range is out-of-date on local component,
1306
* so send request to the remote node.
1307
*/
1308
/* Remote component is 1 for now. */
1309
ncomp = 1;
1310
}
1311
mtx_unlock(&metadata_lock);
1312
break;
1313
case BIO_WRITE:
1314
res->hr_stat_write++;
1315
if (res->hr_resuid == 0 &&
1316
res->hr_primary_localcnt == 0) {
1317
/* This is first write. */
1318
res->hr_primary_localcnt = 1;
1319
}
1320
for (;;) {
1321
mtx_lock(&range_lock);
1322
if (rangelock_islocked(range_sync,
1323
ggio->gctl_offset, ggio->gctl_length)) {
1324
pjdlog_debug(2,
1325
"regular: Range offset=%jd length=%zu locked.",
1326
(intmax_t)ggio->gctl_offset,
1327
(size_t)ggio->gctl_length);
1328
range_regular_wait = true;
1329
cv_wait(&range_regular_cond, &range_lock);
1330
range_regular_wait = false;
1331
mtx_unlock(&range_lock);
1332
continue;
1333
}
1334
if (rangelock_add(range_regular,
1335
ggio->gctl_offset, ggio->gctl_length) == -1) {
1336
mtx_unlock(&range_lock);
1337
pjdlog_debug(2,
1338
"regular: Range offset=%jd length=%zu is already locked, waiting.",
1339
(intmax_t)ggio->gctl_offset,
1340
(size_t)ggio->gctl_length);
1341
sleep(1);
1342
continue;
1343
}
1344
mtx_unlock(&range_lock);
1345
break;
1346
}
1347
mtx_lock(&res->hr_amp_lock);
1348
if (activemap_write_start(res->hr_amp,
1349
ggio->gctl_offset, ggio->gctl_length)) {
1350
res->hr_stat_activemap_update++;
1351
(void)hast_activemap_flush(res);
1352
} else {
1353
mtx_unlock(&res->hr_amp_lock);
1354
}
1355
if (ISMEMSYNC(hio)) {
1356
hio->hio_memsyncacked = false;
1357
refcnt_init(&hio->hio_writecount, ncomps);
1358
}
1359
break;
1360
case BIO_DELETE:
1361
res->hr_stat_delete++;
1362
break;
1363
case BIO_FLUSH:
1364
res->hr_stat_flush++;
1365
break;
1366
}
1367
pjdlog_debug(2,
1368
"ggate_recv: (%p) Moving request to the send queues.", hio);
1369
refcnt_init(&hio->hio_countdown, ncomps);
1370
for (ii = ncomp; ii < ncomps; ii++)
1371
QUEUE_INSERT1(hio, send, ii);
1372
}
1373
/* NOTREACHED */
1374
return (NULL);
1375
}
1376
1377
/*
1378
* Thread reads from or writes to local component.
1379
* If local read fails, it redirects it to remote_send thread.
1380
*/
1381
static void *
1382
local_send_thread(void *arg)
1383
{
1384
struct hast_resource *res = arg;
1385
struct g_gate_ctl_io *ggio;
1386
struct hio *hio;
1387
unsigned int ncomp, rncomp;
1388
ssize_t ret;
1389
1390
/* Local component is 0 for now. */
1391
ncomp = 0;
1392
/* Remote component is 1 for now. */
1393
rncomp = 1;
1394
1395
for (;;) {
1396
pjdlog_debug(2, "local_send: Taking request.");
1397
QUEUE_TAKE1(hio, send, ncomp, 0);
1398
pjdlog_debug(2, "local_send: (%p) Got request.", hio);
1399
ggio = &hio->hio_ggio;
1400
switch (ggio->gctl_cmd) {
1401
case BIO_READ:
1402
ret = pread(res->hr_localfd, ggio->gctl_data,
1403
ggio->gctl_length,
1404
ggio->gctl_offset + res->hr_localoff);
1405
if (ret == ggio->gctl_length)
1406
hio->hio_errors[ncomp] = 0;
1407
else if (!ISSYNCREQ(hio)) {
1408
/*
1409
* If READ failed, try to read from remote node.
1410
*/
1411
if (ret == -1) {
1412
reqlog(LOG_WARNING, 0, ggio,
1413
"Local request failed (%s), trying remote node. ",
1414
strerror(errno));
1415
} else if (ret != ggio->gctl_length) {
1416
reqlog(LOG_WARNING, 0, ggio,
1417
"Local request failed (%zd != %jd), trying remote node. ",
1418
ret, (intmax_t)ggio->gctl_length);
1419
}
1420
QUEUE_INSERT1(hio, send, rncomp);
1421
continue;
1422
}
1423
break;
1424
case BIO_WRITE:
1425
ret = pwrite(res->hr_localfd, ggio->gctl_data,
1426
ggio->gctl_length,
1427
ggio->gctl_offset + res->hr_localoff);
1428
if (ret == -1) {
1429
hio->hio_errors[ncomp] = errno;
1430
reqlog(LOG_WARNING, 0, ggio,
1431
"Local request failed (%s): ",
1432
strerror(errno));
1433
} else if (ret != ggio->gctl_length) {
1434
hio->hio_errors[ncomp] = EIO;
1435
reqlog(LOG_WARNING, 0, ggio,
1436
"Local request failed (%zd != %jd): ",
1437
ret, (intmax_t)ggio->gctl_length);
1438
} else {
1439
hio->hio_errors[ncomp] = 0;
1440
if (ISASYNC(hio)) {
1441
ggio->gctl_error = 0;
1442
write_complete(res, hio);
1443
}
1444
}
1445
break;
1446
case BIO_DELETE:
1447
ret = g_delete(res->hr_localfd,
1448
ggio->gctl_offset + res->hr_localoff,
1449
ggio->gctl_length);
1450
if (ret == -1) {
1451
hio->hio_errors[ncomp] = errno;
1452
reqlog(LOG_WARNING, 0, ggio,
1453
"Local request failed (%s): ",
1454
strerror(errno));
1455
} else {
1456
hio->hio_errors[ncomp] = 0;
1457
}
1458
break;
1459
case BIO_FLUSH:
1460
if (!res->hr_localflush) {
1461
ret = -1;
1462
errno = EOPNOTSUPP;
1463
break;
1464
}
1465
ret = g_flush(res->hr_localfd);
1466
if (ret == -1) {
1467
if (errno == EOPNOTSUPP)
1468
res->hr_localflush = false;
1469
hio->hio_errors[ncomp] = errno;
1470
reqlog(LOG_WARNING, 0, ggio,
1471
"Local request failed (%s): ",
1472
strerror(errno));
1473
} else {
1474
hio->hio_errors[ncomp] = 0;
1475
}
1476
break;
1477
}
1478
if (ISMEMSYNCWRITE(hio)) {
1479
if (refcnt_release(&hio->hio_writecount) == 0) {
1480
write_complete(res, hio);
1481
}
1482
}
1483
if (refcnt_release(&hio->hio_countdown) > 0)
1484
continue;
1485
if (ISSYNCREQ(hio)) {
1486
mtx_lock(&sync_lock);
1487
SYNCREQDONE(hio);
1488
mtx_unlock(&sync_lock);
1489
cv_signal(&sync_cond);
1490
} else {
1491
pjdlog_debug(2,
1492
"local_send: (%p) Moving request to the done queue.",
1493
hio);
1494
QUEUE_INSERT2(hio, done);
1495
}
1496
}
1497
/* NOTREACHED */
1498
return (NULL);
1499
}
1500
1501
static void
1502
keepalive_send(struct hast_resource *res, unsigned int ncomp)
1503
{
1504
struct nv *nv;
1505
1506
rw_rlock(&hio_remote_lock[ncomp]);
1507
1508
if (!ISCONNECTED(res, ncomp)) {
1509
rw_unlock(&hio_remote_lock[ncomp]);
1510
return;
1511
}
1512
1513
PJDLOG_ASSERT(res->hr_remotein != NULL);
1514
PJDLOG_ASSERT(res->hr_remoteout != NULL);
1515
1516
nv = nv_alloc();
1517
nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
1518
if (nv_error(nv) != 0) {
1519
rw_unlock(&hio_remote_lock[ncomp]);
1520
nv_free(nv);
1521
pjdlog_debug(1,
1522
"keepalive_send: Unable to prepare header to send.");
1523
return;
1524
}
1525
if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) {
1526
rw_unlock(&hio_remote_lock[ncomp]);
1527
pjdlog_common(LOG_DEBUG, 1, errno,
1528
"keepalive_send: Unable to send request");
1529
nv_free(nv);
1530
remote_close(res, ncomp);
1531
return;
1532
}
1533
1534
rw_unlock(&hio_remote_lock[ncomp]);
1535
nv_free(nv);
1536
pjdlog_debug(2, "keepalive_send: Request sent.");
1537
}
1538
1539
/*
1540
* Thread sends request to secondary node.
1541
*/
1542
static void *
1543
remote_send_thread(void *arg)
1544
{
1545
struct hast_resource *res = arg;
1546
struct g_gate_ctl_io *ggio;
1547
time_t lastcheck, now;
1548
struct hio *hio;
1549
struct nv *nv;
1550
unsigned int ncomp;
1551
bool wakeup;
1552
uint64_t offset, length;
1553
uint8_t cmd;
1554
void *data;
1555
1556
/* Remote component is 1 for now. */
1557
ncomp = 1;
1558
lastcheck = time(NULL);
1559
1560
for (;;) {
1561
pjdlog_debug(2, "remote_send: Taking request.");
1562
QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
1563
if (hio == NULL) {
1564
now = time(NULL);
1565
if (lastcheck + HAST_KEEPALIVE <= now) {
1566
keepalive_send(res, ncomp);
1567
lastcheck = now;
1568
}
1569
continue;
1570
}
1571
pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
1572
ggio = &hio->hio_ggio;
1573
switch (ggio->gctl_cmd) {
1574
case BIO_READ:
1575
cmd = HIO_READ;
1576
data = NULL;
1577
offset = ggio->gctl_offset;
1578
length = ggio->gctl_length;
1579
break;
1580
case BIO_WRITE:
1581
cmd = HIO_WRITE;
1582
data = ggio->gctl_data;
1583
offset = ggio->gctl_offset;
1584
length = ggio->gctl_length;
1585
break;
1586
case BIO_DELETE:
1587
cmd = HIO_DELETE;
1588
data = NULL;
1589
offset = ggio->gctl_offset;
1590
length = ggio->gctl_length;
1591
break;
1592
case BIO_FLUSH:
1593
cmd = HIO_FLUSH;
1594
data = NULL;
1595
offset = 0;
1596
length = 0;
1597
break;
1598
default:
1599
PJDLOG_ABORT("invalid condition");
1600
}
1601
nv = nv_alloc();
1602
nv_add_uint8(nv, cmd, "cmd");
1603
nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
1604
nv_add_uint64(nv, offset, "offset");
1605
nv_add_uint64(nv, length, "length");
1606
if (ISMEMSYNCWRITE(hio))
1607
nv_add_uint8(nv, 1, "memsync");
1608
if (nv_error(nv) != 0) {
1609
hio->hio_errors[ncomp] = nv_error(nv);
1610
pjdlog_debug(2,
1611
"remote_send: (%p) Unable to prepare header to send.",
1612
hio);
1613
reqlog(LOG_ERR, 0, ggio,
1614
"Unable to prepare header to send (%s): ",
1615
strerror(nv_error(nv)));
1616
/* Move failed request immediately to the done queue. */
1617
goto done_queue;
1618
}
1619
/*
1620
* Protect connection from disappearing.
1621
*/
1622
rw_rlock(&hio_remote_lock[ncomp]);
1623
if (!ISCONNECTED(res, ncomp)) {
1624
rw_unlock(&hio_remote_lock[ncomp]);
1625
hio->hio_errors[ncomp] = ENOTCONN;
1626
goto done_queue;
1627
}
1628
/*
1629
* Move the request to recv queue before sending it, because
1630
* in different order we can get reply before we move request
1631
* to recv queue.
1632
*/
1633
pjdlog_debug(2,
1634
"remote_send: (%p) Moving request to the recv queue.",
1635
hio);
1636
mtx_lock(&hio_recv_list_lock[ncomp]);
1637
wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
1638
TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1639
hio_recv_list_size[ncomp]++;
1640
mtx_unlock(&hio_recv_list_lock[ncomp]);
1641
if (hast_proto_send(res, res->hr_remoteout, nv, data,
1642
data != NULL ? length : 0) == -1) {
1643
hio->hio_errors[ncomp] = errno;
1644
rw_unlock(&hio_remote_lock[ncomp]);
1645
pjdlog_debug(2,
1646
"remote_send: (%p) Unable to send request.", hio);
1647
reqlog(LOG_ERR, 0, ggio,
1648
"Unable to send request (%s): ",
1649
strerror(hio->hio_errors[ncomp]));
1650
remote_close(res, ncomp);
1651
} else {
1652
rw_unlock(&hio_remote_lock[ncomp]);
1653
}
1654
nv_free(nv);
1655
if (wakeup)
1656
cv_signal(&hio_recv_list_cond[ncomp]);
1657
continue;
1658
done_queue:
1659
nv_free(nv);
1660
if (ISSYNCREQ(hio)) {
1661
if (refcnt_release(&hio->hio_countdown) > 0)
1662
continue;
1663
mtx_lock(&sync_lock);
1664
SYNCREQDONE(hio);
1665
mtx_unlock(&sync_lock);
1666
cv_signal(&sync_cond);
1667
continue;
1668
}
1669
if (ggio->gctl_cmd == BIO_WRITE) {
1670
mtx_lock(&res->hr_amp_lock);
1671
if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
1672
ggio->gctl_length)) {
1673
(void)hast_activemap_flush(res);
1674
} else {
1675
mtx_unlock(&res->hr_amp_lock);
1676
}
1677
if (ISMEMSYNCWRITE(hio)) {
1678
if (refcnt_release(&hio->hio_writecount) == 0) {
1679
if (hio->hio_errors[0] == 0)
1680
write_complete(res, hio);
1681
}
1682
}
1683
}
1684
if (refcnt_release(&hio->hio_countdown) > 0)
1685
continue;
1686
pjdlog_debug(2,
1687
"remote_send: (%p) Moving request to the done queue.",
1688
hio);
1689
QUEUE_INSERT2(hio, done);
1690
}
1691
/* NOTREACHED */
1692
return (NULL);
1693
}
1694
1695
/*
1696
* Thread receives answer from secondary node and passes it to ggate_send
1697
* thread.
1698
*/
1699
static void *
1700
remote_recv_thread(void *arg)
1701
{
1702
struct hast_resource *res = arg;
1703
struct g_gate_ctl_io *ggio;
1704
struct hio *hio;
1705
struct nv *nv;
1706
unsigned int ncomp;
1707
uint64_t seq;
1708
bool memsyncack;
1709
int error;
1710
1711
/* Remote component is 1 for now. */
1712
ncomp = 1;
1713
1714
for (;;) {
1715
/* Wait until there is anything to receive. */
1716
mtx_lock(&hio_recv_list_lock[ncomp]);
1717
while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1718
pjdlog_debug(2, "remote_recv: No requests, waiting.");
1719
cv_wait(&hio_recv_list_cond[ncomp],
1720
&hio_recv_list_lock[ncomp]);
1721
}
1722
mtx_unlock(&hio_recv_list_lock[ncomp]);
1723
1724
memsyncack = false;
1725
1726
rw_rlock(&hio_remote_lock[ncomp]);
1727
if (!ISCONNECTED(res, ncomp)) {
1728
rw_unlock(&hio_remote_lock[ncomp]);
1729
/*
1730
* Connection is dead, so move all pending requests to
1731
* the done queue (one-by-one).
1732
*/
1733
mtx_lock(&hio_recv_list_lock[ncomp]);
1734
hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1735
PJDLOG_ASSERT(hio != NULL);
1736
TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1737
hio_next[ncomp]);
1738
hio_recv_list_size[ncomp]--;
1739
mtx_unlock(&hio_recv_list_lock[ncomp]);
1740
hio->hio_errors[ncomp] = ENOTCONN;
1741
goto done_queue;
1742
}
1743
if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
1744
pjdlog_errno(LOG_ERR,
1745
"Unable to receive reply header");
1746
rw_unlock(&hio_remote_lock[ncomp]);
1747
remote_close(res, ncomp);
1748
continue;
1749
}
1750
rw_unlock(&hio_remote_lock[ncomp]);
1751
seq = nv_get_uint64(nv, "seq");
1752
if (seq == 0) {
1753
pjdlog_error("Header contains no 'seq' field.");
1754
nv_free(nv);
1755
continue;
1756
}
1757
memsyncack = nv_exists(nv, "received");
1758
mtx_lock(&hio_recv_list_lock[ncomp]);
1759
TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1760
if (hio->hio_ggio.gctl_seq == seq) {
1761
TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1762
hio_next[ncomp]);
1763
hio_recv_list_size[ncomp]--;
1764
break;
1765
}
1766
}
1767
mtx_unlock(&hio_recv_list_lock[ncomp]);
1768
if (hio == NULL) {
1769
pjdlog_error("Found no request matching received 'seq' field (%ju).",
1770
(uintmax_t)seq);
1771
nv_free(nv);
1772
continue;
1773
}
1774
ggio = &hio->hio_ggio;
1775
error = nv_get_int16(nv, "error");
1776
if (error != 0) {
1777
/* Request failed on remote side. */
1778
hio->hio_errors[ncomp] = error;
1779
reqlog(LOG_WARNING, 0, ggio,
1780
"Remote request failed (%s): ", strerror(error));
1781
nv_free(nv);
1782
goto done_queue;
1783
}
1784
switch (ggio->gctl_cmd) {
1785
case BIO_READ:
1786
rw_rlock(&hio_remote_lock[ncomp]);
1787
if (!ISCONNECTED(res, ncomp)) {
1788
rw_unlock(&hio_remote_lock[ncomp]);
1789
nv_free(nv);
1790
goto done_queue;
1791
}
1792
if (hast_proto_recv_data(res, res->hr_remotein, nv,
1793
ggio->gctl_data, ggio->gctl_length) == -1) {
1794
hio->hio_errors[ncomp] = errno;
1795
pjdlog_errno(LOG_ERR,
1796
"Unable to receive reply data");
1797
rw_unlock(&hio_remote_lock[ncomp]);
1798
nv_free(nv);
1799
remote_close(res, ncomp);
1800
goto done_queue;
1801
}
1802
rw_unlock(&hio_remote_lock[ncomp]);
1803
break;
1804
case BIO_WRITE:
1805
case BIO_DELETE:
1806
case BIO_FLUSH:
1807
break;
1808
default:
1809
PJDLOG_ABORT("invalid condition");
1810
}
1811
hio->hio_errors[ncomp] = 0;
1812
nv_free(nv);
1813
done_queue:
1814
if (ISMEMSYNCWRITE(hio)) {
1815
if (!hio->hio_memsyncacked) {
1816
PJDLOG_ASSERT(memsyncack ||
1817
hio->hio_errors[ncomp] != 0);
1818
/* Remote ack arrived. */
1819
if (refcnt_release(&hio->hio_writecount) == 0) {
1820
if (hio->hio_errors[0] == 0)
1821
write_complete(res, hio);
1822
}
1823
hio->hio_memsyncacked = true;
1824
if (hio->hio_errors[ncomp] == 0) {
1825
pjdlog_debug(2,
1826
"remote_recv: (%p) Moving request "
1827
"back to the recv queue.", hio);
1828
mtx_lock(&hio_recv_list_lock[ncomp]);
1829
TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
1830
hio, hio_next[ncomp]);
1831
hio_recv_list_size[ncomp]++;
1832
mtx_unlock(&hio_recv_list_lock[ncomp]);
1833
continue;
1834
}
1835
} else {
1836
PJDLOG_ASSERT(!memsyncack);
1837
/* Remote final reply arrived. */
1838
}
1839
}
1840
if (refcnt_release(&hio->hio_countdown) > 0)
1841
continue;
1842
if (ISSYNCREQ(hio)) {
1843
mtx_lock(&sync_lock);
1844
SYNCREQDONE(hio);
1845
mtx_unlock(&sync_lock);
1846
cv_signal(&sync_cond);
1847
} else {
1848
pjdlog_debug(2,
1849
"remote_recv: (%p) Moving request to the done queue.",
1850
hio);
1851
QUEUE_INSERT2(hio, done);
1852
}
1853
}
1854
/* NOTREACHED */
1855
return (NULL);
1856
}
1857
1858
/*
1859
* Thread sends answer to the kernel.
1860
*/
1861
static void *
1862
ggate_send_thread(void *arg)
1863
{
1864
struct hast_resource *res = arg;
1865
struct g_gate_ctl_io *ggio;
1866
struct hio *hio;
1867
unsigned int ii, ncomps;
1868
1869
ncomps = HAST_NCOMPONENTS;
1870
1871
for (;;) {
1872
pjdlog_debug(2, "ggate_send: Taking request.");
1873
QUEUE_TAKE2(hio, done);
1874
pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
1875
ggio = &hio->hio_ggio;
1876
for (ii = 0; ii < ncomps; ii++) {
1877
if (hio->hio_errors[ii] == 0) {
1878
/*
1879
* One successful request is enough to declare
1880
* success.
1881
*/
1882
ggio->gctl_error = 0;
1883
break;
1884
}
1885
}
1886
if (ii == ncomps) {
1887
/*
1888
* None of the requests were successful.
1889
* Use the error from local component except the
1890
* case when we did only remote request.
1891
*/
1892
if (ggio->gctl_cmd == BIO_READ &&
1893
res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
1894
ggio->gctl_error = hio->hio_errors[1];
1895
else
1896
ggio->gctl_error = hio->hio_errors[0];
1897
}
1898
if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
1899
mtx_lock(&res->hr_amp_lock);
1900
if (activemap_write_complete(res->hr_amp,
1901
ggio->gctl_offset, ggio->gctl_length)) {
1902
res->hr_stat_activemap_update++;
1903
(void)hast_activemap_flush(res);
1904
} else {
1905
mtx_unlock(&res->hr_amp_lock);
1906
}
1907
}
1908
if (ggio->gctl_cmd == BIO_WRITE) {
1909
/*
1910
* Unlock range we locked.
1911
*/
1912
mtx_lock(&range_lock);
1913
rangelock_del(range_regular, ggio->gctl_offset,
1914
ggio->gctl_length);
1915
if (range_sync_wait)
1916
cv_signal(&range_sync_cond);
1917
mtx_unlock(&range_lock);
1918
if (!hio->hio_done)
1919
write_complete(res, hio);
1920
} else {
1921
if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
1922
primary_exit(EX_OSERR,
1923
"G_GATE_CMD_DONE failed");
1924
}
1925
}
1926
if (hio->hio_errors[0]) {
1927
switch (ggio->gctl_cmd) {
1928
case BIO_READ:
1929
res->hr_stat_read_error++;
1930
break;
1931
case BIO_WRITE:
1932
res->hr_stat_write_error++;
1933
break;
1934
case BIO_DELETE:
1935
res->hr_stat_delete_error++;
1936
break;
1937
case BIO_FLUSH:
1938
res->hr_stat_flush_error++;
1939
break;
1940
}
1941
}
1942
pjdlog_debug(2,
1943
"ggate_send: (%p) Moving request to the free queue.", hio);
1944
QUEUE_INSERT2(hio, free);
1945
}
1946
/* NOTREACHED */
1947
return (NULL);
1948
}
1949
1950
/*
1951
* Thread synchronize local and remote components.
1952
*/
1953
static void *
1954
sync_thread(void *arg __unused)
1955
{
1956
struct hast_resource *res = arg;
1957
struct hio *hio;
1958
struct g_gate_ctl_io *ggio;
1959
struct timeval tstart, tend, tdiff;
1960
unsigned int ii, ncomp, ncomps;
1961
off_t offset, length, synced;
1962
bool dorewind, directreads;
1963
int syncext;
1964
1965
ncomps = HAST_NCOMPONENTS;
1966
dorewind = true;
1967
synced = 0;
1968
offset = -1;
1969
directreads = false;
1970
1971
for (;;) {
1972
mtx_lock(&sync_lock);
1973
if (offset >= 0 && !sync_inprogress) {
1974
gettimeofday(&tend, NULL);
1975
timersub(&tend, &tstart, &tdiff);
1976
pjdlog_info("Synchronization interrupted after %#.0T. "
1977
"%NB synchronized so far.", &tdiff,
1978
(intmax_t)synced);
1979
event_send(res, EVENT_SYNCINTR);
1980
}
1981
while (!sync_inprogress) {
1982
dorewind = true;
1983
synced = 0;
1984
cv_wait(&sync_cond, &sync_lock);
1985
}
1986
mtx_unlock(&sync_lock);
1987
/*
1988
* Obtain offset at which we should synchronize.
1989
* Rewind synchronization if needed.
1990
*/
1991
mtx_lock(&res->hr_amp_lock);
1992
if (dorewind)
1993
activemap_sync_rewind(res->hr_amp);
1994
offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1995
if (syncext != -1) {
1996
/*
1997
* We synchronized entire syncext extent, we can mark
1998
* it as clean now.
1999
*/
2000
if (activemap_extent_complete(res->hr_amp, syncext))
2001
(void)hast_activemap_flush(res);
2002
else
2003
mtx_unlock(&res->hr_amp_lock);
2004
} else {
2005
mtx_unlock(&res->hr_amp_lock);
2006
}
2007
if (dorewind) {
2008
dorewind = false;
2009
if (offset == -1)
2010
pjdlog_info("Nodes are in sync.");
2011
else {
2012
pjdlog_info("Synchronization started. %NB to go.",
2013
(intmax_t)(res->hr_extentsize *
2014
activemap_ndirty(res->hr_amp)));
2015
event_send(res, EVENT_SYNCSTART);
2016
gettimeofday(&tstart, NULL);
2017
}
2018
}
2019
if (offset == -1) {
2020
sync_stop();
2021
pjdlog_debug(1, "Nothing to synchronize.");
2022
/*
2023
* Synchronization complete, make both localcnt and
2024
* remotecnt equal.
2025
*/
2026
ncomp = 1;
2027
rw_rlock(&hio_remote_lock[ncomp]);
2028
if (ISCONNECTED(res, ncomp)) {
2029
if (synced > 0) {
2030
int64_t bps;
2031
2032
gettimeofday(&tend, NULL);
2033
timersub(&tend, &tstart, &tdiff);
2034
bps = (int64_t)((double)synced /
2035
((double)tdiff.tv_sec +
2036
(double)tdiff.tv_usec / 1000000));
2037
pjdlog_info("Synchronization complete. "
2038
"%NB synchronized in %#.0lT (%NB/sec).",
2039
(intmax_t)synced, &tdiff,
2040
(intmax_t)bps);
2041
event_send(res, EVENT_SYNCDONE);
2042
}
2043
mtx_lock(&metadata_lock);
2044
if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
2045
directreads = true;
2046
res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
2047
res->hr_primary_localcnt =
2048
res->hr_secondary_remotecnt;
2049
res->hr_primary_remotecnt =
2050
res->hr_secondary_localcnt;
2051
pjdlog_debug(1,
2052
"Setting localcnt to %ju and remotecnt to %ju.",
2053
(uintmax_t)res->hr_primary_localcnt,
2054
(uintmax_t)res->hr_primary_remotecnt);
2055
(void)metadata_write(res);
2056
mtx_unlock(&metadata_lock);
2057
}
2058
rw_unlock(&hio_remote_lock[ncomp]);
2059
if (directreads) {
2060
directreads = false;
2061
enable_direct_reads(res);
2062
}
2063
continue;
2064
}
2065
pjdlog_debug(2, "sync: Taking free request.");
2066
QUEUE_TAKE2(hio, free);
2067
pjdlog_debug(2, "sync: (%p) Got free request.", hio);
2068
/*
2069
* Lock the range we are going to synchronize. We don't want
2070
* race where someone writes between our read and write.
2071
*/
2072
for (;;) {
2073
mtx_lock(&range_lock);
2074
if (rangelock_islocked(range_regular, offset, length)) {
2075
pjdlog_debug(2,
2076
"sync: Range offset=%jd length=%jd locked.",
2077
(intmax_t)offset, (intmax_t)length);
2078
range_sync_wait = true;
2079
cv_wait(&range_sync_cond, &range_lock);
2080
range_sync_wait = false;
2081
mtx_unlock(&range_lock);
2082
continue;
2083
}
2084
if (rangelock_add(range_sync, offset, length) == -1) {
2085
mtx_unlock(&range_lock);
2086
pjdlog_debug(2,
2087
"sync: Range offset=%jd length=%jd is already locked, waiting.",
2088
(intmax_t)offset, (intmax_t)length);
2089
sleep(1);
2090
continue;
2091
}
2092
mtx_unlock(&range_lock);
2093
break;
2094
}
2095
/*
2096
* First read the data from synchronization source.
2097
*/
2098
SYNCREQ(hio);
2099
ggio = &hio->hio_ggio;
2100
ggio->gctl_cmd = BIO_READ;
2101
ggio->gctl_offset = offset;
2102
ggio->gctl_length = length;
2103
ggio->gctl_error = 0;
2104
hio->hio_done = false;
2105
hio->hio_replication = res->hr_replication;
2106
for (ii = 0; ii < ncomps; ii++)
2107
hio->hio_errors[ii] = EINVAL;
2108
reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
2109
hio);
2110
pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2111
hio);
2112
mtx_lock(&metadata_lock);
2113
if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
2114
/*
2115
* This range is up-to-date on local component,
2116
* so handle request locally.
2117
*/
2118
/* Local component is 0 for now. */
2119
ncomp = 0;
2120
} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
2121
PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
2122
/*
2123
* This range is out-of-date on local component,
2124
* so send request to the remote node.
2125
*/
2126
/* Remote component is 1 for now. */
2127
ncomp = 1;
2128
}
2129
mtx_unlock(&metadata_lock);
2130
refcnt_init(&hio->hio_countdown, 1);
2131
QUEUE_INSERT1(hio, send, ncomp);
2132
2133
/*
2134
* Let's wait for READ to finish.
2135
*/
2136
mtx_lock(&sync_lock);
2137
while (!ISSYNCREQDONE(hio))
2138
cv_wait(&sync_cond, &sync_lock);
2139
mtx_unlock(&sync_lock);
2140
2141
if (hio->hio_errors[ncomp] != 0) {
2142
pjdlog_error("Unable to read synchronization data: %s.",
2143
strerror(hio->hio_errors[ncomp]));
2144
goto free_queue;
2145
}
2146
2147
/*
2148
* We read the data from synchronization source, now write it
2149
* to synchronization target.
2150
*/
2151
SYNCREQ(hio);
2152
ggio->gctl_cmd = BIO_WRITE;
2153
for (ii = 0; ii < ncomps; ii++)
2154
hio->hio_errors[ii] = EINVAL;
2155
reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
2156
hio);
2157
pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2158
hio);
2159
mtx_lock(&metadata_lock);
2160
if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
2161
/*
2162
* This range is up-to-date on local component,
2163
* so we update remote component.
2164
*/
2165
/* Remote component is 1 for now. */
2166
ncomp = 1;
2167
} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
2168
PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
2169
/*
2170
* This range is out-of-date on local component,
2171
* so we update it.
2172
*/
2173
/* Local component is 0 for now. */
2174
ncomp = 0;
2175
}
2176
mtx_unlock(&metadata_lock);
2177
2178
pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2179
hio);
2180
refcnt_init(&hio->hio_countdown, 1);
2181
QUEUE_INSERT1(hio, send, ncomp);
2182
2183
/*
2184
* Let's wait for WRITE to finish.
2185
*/
2186
mtx_lock(&sync_lock);
2187
while (!ISSYNCREQDONE(hio))
2188
cv_wait(&sync_cond, &sync_lock);
2189
mtx_unlock(&sync_lock);
2190
2191
if (hio->hio_errors[ncomp] != 0) {
2192
pjdlog_error("Unable to write synchronization data: %s.",
2193
strerror(hio->hio_errors[ncomp]));
2194
goto free_queue;
2195
}
2196
2197
synced += length;
2198
free_queue:
2199
mtx_lock(&range_lock);
2200
rangelock_del(range_sync, offset, length);
2201
if (range_regular_wait)
2202
cv_signal(&range_regular_cond);
2203
mtx_unlock(&range_lock);
2204
pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
2205
hio);
2206
QUEUE_INSERT2(hio, free);
2207
}
2208
/* NOTREACHED */
2209
return (NULL);
2210
}
2211
2212
void
2213
primary_config_reload(struct hast_resource *res, struct nv *nv)
2214
{
2215
unsigned int ii, ncomps;
2216
int modified, vint;
2217
const char *vstr;
2218
2219
pjdlog_info("Reloading configuration...");
2220
2221
PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
2222
PJDLOG_ASSERT(gres == res);
2223
nv_assert(nv, "remoteaddr");
2224
nv_assert(nv, "sourceaddr");
2225
nv_assert(nv, "replication");
2226
nv_assert(nv, "checksum");
2227
nv_assert(nv, "compression");
2228
nv_assert(nv, "timeout");
2229
nv_assert(nv, "exec");
2230
nv_assert(nv, "metaflush");
2231
2232
ncomps = HAST_NCOMPONENTS;
2233
2234
#define MODIFIED_REMOTEADDR 0x01
2235
#define MODIFIED_SOURCEADDR 0x02
2236
#define MODIFIED_REPLICATION 0x04
2237
#define MODIFIED_CHECKSUM 0x08
2238
#define MODIFIED_COMPRESSION 0x10
2239
#define MODIFIED_TIMEOUT 0x20
2240
#define MODIFIED_EXEC 0x40
2241
#define MODIFIED_METAFLUSH 0x80
2242
modified = 0;
2243
2244
vstr = nv_get_string(nv, "remoteaddr");
2245
if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
2246
/*
2247
* Don't copy res->hr_remoteaddr to gres just yet.
2248
* We want remote_close() to log disconnect from the old
2249
* addresses, not from the new ones.
2250
*/
2251
modified |= MODIFIED_REMOTEADDR;
2252
}
2253
vstr = nv_get_string(nv, "sourceaddr");
2254
if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
2255
strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
2256
modified |= MODIFIED_SOURCEADDR;
2257
}
2258
vint = nv_get_int32(nv, "replication");
2259
if (gres->hr_replication != vint) {
2260
gres->hr_replication = vint;
2261
modified |= MODIFIED_REPLICATION;
2262
}
2263
vint = nv_get_int32(nv, "checksum");
2264
if (gres->hr_checksum != vint) {
2265
gres->hr_checksum = vint;
2266
modified |= MODIFIED_CHECKSUM;
2267
}
2268
vint = nv_get_int32(nv, "compression");
2269
if (gres->hr_compression != vint) {
2270
gres->hr_compression = vint;
2271
modified |= MODIFIED_COMPRESSION;
2272
}
2273
vint = nv_get_int32(nv, "timeout");
2274
if (gres->hr_timeout != vint) {
2275
gres->hr_timeout = vint;
2276
modified |= MODIFIED_TIMEOUT;
2277
}
2278
vstr = nv_get_string(nv, "exec");
2279
if (strcmp(gres->hr_exec, vstr) != 0) {
2280
strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
2281
modified |= MODIFIED_EXEC;
2282
}
2283
vint = nv_get_int32(nv, "metaflush");
2284
if (gres->hr_metaflush != vint) {
2285
gres->hr_metaflush = vint;
2286
modified |= MODIFIED_METAFLUSH;
2287
}
2288
2289
/*
2290
* Change timeout for connected sockets.
2291
* Don't bother if we need to reconnect.
2292
*/
2293
if ((modified & MODIFIED_TIMEOUT) != 0 &&
2294
(modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
2295
for (ii = 0; ii < ncomps; ii++) {
2296
if (!ISREMOTE(ii))
2297
continue;
2298
rw_rlock(&hio_remote_lock[ii]);
2299
if (!ISCONNECTED(gres, ii)) {
2300
rw_unlock(&hio_remote_lock[ii]);
2301
continue;
2302
}
2303
rw_unlock(&hio_remote_lock[ii]);
2304
if (proto_timeout(gres->hr_remotein,
2305
gres->hr_timeout) == -1) {
2306
pjdlog_errno(LOG_WARNING,
2307
"Unable to set connection timeout");
2308
}
2309
if (proto_timeout(gres->hr_remoteout,
2310
gres->hr_timeout) == -1) {
2311
pjdlog_errno(LOG_WARNING,
2312
"Unable to set connection timeout");
2313
}
2314
}
2315
}
2316
if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
2317
for (ii = 0; ii < ncomps; ii++) {
2318
if (!ISREMOTE(ii))
2319
continue;
2320
remote_close(gres, ii);
2321
}
2322
if (modified & MODIFIED_REMOTEADDR) {
2323
vstr = nv_get_string(nv, "remoteaddr");
2324
strlcpy(gres->hr_remoteaddr, vstr,
2325
sizeof(gres->hr_remoteaddr));
2326
}
2327
}
2328
#undef MODIFIED_REMOTEADDR
2329
#undef MODIFIED_SOURCEADDR
2330
#undef MODIFIED_REPLICATION
2331
#undef MODIFIED_CHECKSUM
2332
#undef MODIFIED_COMPRESSION
2333
#undef MODIFIED_TIMEOUT
2334
#undef MODIFIED_EXEC
2335
#undef MODIFIED_METAFLUSH
2336
2337
pjdlog_info("Configuration reloaded successfully.");
2338
}
2339
2340
static void
2341
guard_one(struct hast_resource *res, unsigned int ncomp)
2342
{
2343
struct proto_conn *in, *out;
2344
2345
if (!ISREMOTE(ncomp))
2346
return;
2347
2348
rw_rlock(&hio_remote_lock[ncomp]);
2349
2350
if (!real_remote(res)) {
2351
rw_unlock(&hio_remote_lock[ncomp]);
2352
return;
2353
}
2354
2355
if (ISCONNECTED(res, ncomp)) {
2356
PJDLOG_ASSERT(res->hr_remotein != NULL);
2357
PJDLOG_ASSERT(res->hr_remoteout != NULL);
2358
rw_unlock(&hio_remote_lock[ncomp]);
2359
pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
2360
res->hr_remoteaddr);
2361
return;
2362
}
2363
2364
PJDLOG_ASSERT(res->hr_remotein == NULL);
2365
PJDLOG_ASSERT(res->hr_remoteout == NULL);
2366
/*
2367
* Upgrade the lock. It doesn't have to be atomic as no other thread
2368
* can change connection status from disconnected to connected.
2369
*/
2370
rw_unlock(&hio_remote_lock[ncomp]);
2371
pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
2372
res->hr_remoteaddr);
2373
in = out = NULL;
2374
if (init_remote(res, &in, &out) == 0) {
2375
rw_wlock(&hio_remote_lock[ncomp]);
2376
PJDLOG_ASSERT(res->hr_remotein == NULL);
2377
PJDLOG_ASSERT(res->hr_remoteout == NULL);
2378
PJDLOG_ASSERT(in != NULL && out != NULL);
2379
res->hr_remotein = in;
2380
res->hr_remoteout = out;
2381
rw_unlock(&hio_remote_lock[ncomp]);
2382
pjdlog_info("Successfully reconnected to %s.",
2383
res->hr_remoteaddr);
2384
sync_start();
2385
} else {
2386
/* Both connections should be NULL. */
2387
PJDLOG_ASSERT(res->hr_remotein == NULL);
2388
PJDLOG_ASSERT(res->hr_remoteout == NULL);
2389
PJDLOG_ASSERT(in == NULL && out == NULL);
2390
pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
2391
res->hr_remoteaddr);
2392
}
2393
}
2394
2395
/*
2396
* Thread guards remote connections and reconnects when needed, handles
2397
* signals, etc.
2398
*/
2399
static void *
2400
guard_thread(void *arg)
2401
{
2402
struct hast_resource *res = arg;
2403
unsigned int ii, ncomps;
2404
struct timespec timeout;
2405
time_t lastcheck, now;
2406
sigset_t mask;
2407
int signo;
2408
2409
ncomps = HAST_NCOMPONENTS;
2410
lastcheck = time(NULL);
2411
2412
PJDLOG_VERIFY(sigemptyset(&mask) == 0);
2413
PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
2414
PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
2415
2416
timeout.tv_sec = HAST_KEEPALIVE;
2417
timeout.tv_nsec = 0;
2418
signo = -1;
2419
2420
for (;;) {
2421
switch (signo) {
2422
case SIGINT:
2423
case SIGTERM:
2424
sigexit_received = true;
2425
primary_exitx(EX_OK,
2426
"Termination signal received, exiting.");
2427
break;
2428
default:
2429
break;
2430
}
2431
2432
/*
2433
* Don't check connections until we fully started,
2434
* as we may still be looping, waiting for remote node
2435
* to switch from primary to secondary.
2436
*/
2437
if (fullystarted) {
2438
pjdlog_debug(2, "remote_guard: Checking connections.");
2439
now = time(NULL);
2440
if (lastcheck + HAST_KEEPALIVE <= now) {
2441
for (ii = 0; ii < ncomps; ii++)
2442
guard_one(res, ii);
2443
lastcheck = now;
2444
}
2445
}
2446
signo = sigtimedwait(&mask, NULL, &timeout);
2447
}
2448
/* NOTREACHED */
2449
return (NULL);
2450
}
2451
2452