Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
official-stockfish
GitHub Repository: official-stockfish/Stockfish
Path: blob/master/src/numa.h
634 views
1
/*
2
Stockfish, a UCI chess playing engine derived from Glaurung 2.1
3
Copyright (C) 2004-2026 The Stockfish developers (see AUTHORS file)
4
5
Stockfish is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
Stockfish is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <http://www.gnu.org/licenses/>.
17
*/
18
19
#ifndef NUMA_H_INCLUDED
20
#define NUMA_H_INCLUDED
21
22
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <iterator>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>

#include "shm.h"
41
42
// We support Linux very well, but we explicitly do NOT support Android,
// because there are no affected systems and it is not worth maintaining.
44
#if defined(__linux__) && !defined(__ANDROID__)
45
#if !defined(_GNU_SOURCE)
46
#define _GNU_SOURCE
47
#endif
48
#include <sched.h>
49
#elif defined(_WIN64)
50
51
#if _WIN32_WINNT < 0x0601
52
#undef _WIN32_WINNT
53
#define _WIN32_WINNT 0x0601 // Force to include needed API prototypes
54
#endif
55
56
// On Windows each processor group can have up to 64 processors.
57
// https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
58
static constexpr size_t WIN_PROCESSOR_GROUP_SIZE = 64;
59
60
#if !defined(NOMINMAX)
61
#define NOMINMAX
62
#endif
63
#include <windows.h>
64
#if defined small
65
#undef small
66
#endif
67
68
// https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadselectedcpusetmasks
69
using SetThreadSelectedCpuSetMasks_t = BOOL (*)(HANDLE, PGROUP_AFFINITY, USHORT);
70
71
// https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-getthreadselectedcpusetmasks
72
using GetThreadSelectedCpuSetMasks_t = BOOL (*)(HANDLE, PGROUP_AFFINITY, USHORT, PUSHORT);
73
74
#endif
75
76
#include "misc.h"
77
78
namespace Stockfish {
79
80
using CpuIndex  = size_t;
using NumaIndex = size_t;

// Returns the number of logical processors the system exposes.
// std::thread::hardware_concurrency() alone is not sufficient on Windows,
// where it only counts the processors of the first processor group (the
// only ones available to std::thread), so there the count is widened with
// GetActiveProcessorCount over all processor groups.
inline CpuIndex get_hardware_concurrency() {
    auto count = static_cast<CpuIndex>(std::thread::hardware_concurrency());

#ifdef _WIN64
    const CpuIndex allGroups = GetActiveProcessorCount(ALL_PROCESSOR_GROUPS);
    if (allGroups > count)
        count = allGroups;
#endif

    return count;
}

// Total number of usable processors, clamped to at least 1 because
// hardware_concurrency() may legitimately report 0 when it cannot tell.
inline const CpuIndex SYSTEM_THREADS_NB = std::max<CpuIndex>(1, get_hardware_concurrency());
97
98
#if defined(_WIN64)
99
100
// Holds the process affinity as seen through the two Windows affinity APIs,
// which are not guaranteed to agree with each other. nullopt means "no
// affinity restriction", i.e. all processors allowed.
struct WindowsAffinity {
    // Affinity as determined through the legacy API (GetProcessAffinityMask).
    std::optional<std::set<CpuIndex>> oldApi;
    // Affinity as determined through GetThreadSelectedCpuSetMasks
    // (Windows 11 / Server 2022 onwards).
    std::optional<std::set<CpuIndex>> newApi;

    // We also provide diagnostic for when the affinity is set to nullopt
    // whether it was due to being indeterminate. If affinity is indeterminate
    // it is best to assume it is not set at all, so consistent with the meaning
    // of the nullopt affinity.
    bool isNewDeterminate = true;
    bool isOldDeterminate = true;

    // Combines both views: if only one API reported an affinity, that one is
    // used; if both did, the intersection is taken (a processor is only
    // usable if both APIs allow it).
    std::optional<std::set<CpuIndex>> get_combined() const {
        if (!oldApi.has_value())
            return newApi;
        if (!newApi.has_value())
            return oldApi;

        std::set<CpuIndex> intersect;
        std::set_intersection(oldApi->begin(), oldApi->end(), newApi->begin(), newApi->end(),
                              std::inserter(intersect, intersect.begin()));
        return intersect;
    }

    // Since Windows 11 and Windows Server 2022 thread affinities can span
    // processor groups and can be set as such by a new WinAPI function. However,
    // we may need to force using the old API if we detect that the process has
    // affinity set by the old API already and we want to override that. Due to the
    // limitations of the old API we cannot detect its use reliably. There will be
    // cases where we detect no use but it has actually been used and vice versa.
    bool likely_used_old_api() const { return oldApi.has_value() || !isOldDeterminate; }
};
132
133
// Returns, as (status, groups), the processor groups the current process has
// affinity on. A zero status indicates failure.
inline std::pair<BOOL, std::vector<USHORT>> get_process_group_affinity() {

    // GetProcessGroupAffinity requires the GroupArray argument to be
    // aligned to 4 bytes instead of just 2.
    static constexpr size_t GroupArrayMinimumAlignment = 4;
    static_assert(GroupArrayMinimumAlignment >= alignof(USHORT));

    // The function should succeed the second time, but it may fail if the group
    // affinity has changed between GetProcessGroupAffinity calls. In such case
    // we consider this a hard error, as we cannot work with unstable affinities
    // anyway.
    static constexpr int MAX_TRIES = 2;
    USHORT GroupCount = 1;
    for (int i = 0; i < MAX_TRIES; ++i)
    {
        // Over-allocate by one USHORT so the pointer can be bumped up to the
        // required 4-byte alignment without running out of storage.
        auto GroupArray = std::make_unique<USHORT[]>(
          GroupCount + (GroupArrayMinimumAlignment / alignof(USHORT) - 1));

        USHORT* GroupArrayAligned = align_ptr_up<GroupArrayMinimumAlignment>(GroupArray.get());

        const BOOL status =
          GetProcessGroupAffinity(GetCurrentProcess(), &GroupCount, GroupArrayAligned);

        // Any failure other than an undersized buffer is a hard error.
        if (status == 0 && GetLastError() != ERROR_INSUFFICIENT_BUFFER)
        {
            break;
        }

        if (status != 0)
        {
            return std::make_pair(status,
                                  std::vector(GroupArrayAligned, GroupArrayAligned + GroupCount));
        }

        // Otherwise GroupCount was updated to the required size; retry once.
    }

    return std::make_pair(0, std::vector<USHORT>());
}
170
171
// On Windows there are two ways to set affinity, and therefore 2 ways to get it.
// These are not consistent, so we have to check both. In some cases it is actually
// not possible to determine affinity. For example when two different threads have
// affinity on different processor groups, set using SetThreadAffinityMask, we cannot
// retrieve the actual affinities.
// From documentation on GetProcessAffinityMask:
// > If the calling process contains threads in multiple groups,
// > the function returns zero for both affinity masks.
// In such cases we just give up and assume we have affinity for all processors.
// nullopt means no affinity is set, that is, all processors are allowed
inline WindowsAffinity get_process_affinity() {
    // The new API only exists on Windows 11 / Server 2022 onwards, so it is
    // resolved dynamically rather than linked against.
    HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll"));
    auto GetThreadSelectedCpuSetMasks_f = GetThreadSelectedCpuSetMasks_t(
      (void (*)()) GetProcAddress(k32, "GetThreadSelectedCpuSetMasks"));

    BOOL status = 0;

    WindowsAffinity affinity;

    if (GetThreadSelectedCpuSetMasks_f != nullptr)
    {
        // First call with a null buffer queries the required mask count.
        USHORT RequiredMaskCount;
        status = GetThreadSelectedCpuSetMasks_f(GetCurrentThread(), nullptr, 0, &RequiredMaskCount);

        // We expect ERROR_INSUFFICIENT_BUFFER from GetThreadSelectedCpuSetMasks,
        // but other failure is an actual error.
        if (status == 0 && GetLastError() != ERROR_INSUFFICIENT_BUFFER)
        {
            affinity.isNewDeterminate = false;
        }
        else if (RequiredMaskCount > 0)
        {
            // A RequiredMaskCount of 0 would mean these affinities were never
            // set. That is not consistent with the old API, though:
            // GetProcessAffinityMask may still return some affinity.
            auto groupAffinities = std::make_unique<GROUP_AFFINITY[]>(RequiredMaskCount);

            status = GetThreadSelectedCpuSetMasks_f(GetCurrentThread(), groupAffinities.get(),
                                                    RequiredMaskCount, &RequiredMaskCount);

            if (status == 0)
            {
                affinity.isNewDeterminate = false;
            }
            else
            {
                // Flatten the per-group masks into global CPU indices,
                // group g covering indices [g*64, g*64+63].
                std::set<CpuIndex> cpus;

                for (USHORT i = 0; i < RequiredMaskCount; ++i)
                {
                    const size_t procGroupIndex = groupAffinities[i].Group;

                    for (size_t j = 0; j < WIN_PROCESSOR_GROUP_SIZE; ++j)
                    {
                        if (groupAffinities[i].Mask & (KAFFINITY(1) << j))
                            cpus.insert(procGroupIndex * WIN_PROCESSOR_GROUP_SIZE + j);
                    }
                }

                affinity.newApi = std::move(cpus);
            }
        }
    }

    // NOTE: There is no way to determine full affinity using the old API if
    // individual threads set affinity on different processor groups.

    DWORD_PTR proc, sys;
    status = GetProcessAffinityMask(GetCurrentProcess(), &proc, &sys);

    // If proc == 0 then we cannot determine affinity because it spans processor groups.
    // On Windows 11 and Server 2022 it will instead
    // > If, however, hHandle specifies a handle to the current process, the function
    // > always uses the calling thread's primary group (which by default is the same
    // > as the process' primary group) in order to set the
    // > lpProcessAffinityMask and lpSystemAffinityMask.
    // So it will never be indeterminate here. We can only make assumptions later.
    if (status == 0 || proc == 0)
    {
        affinity.isOldDeterminate = false;
        return affinity;
    }

    // If SetProcessAffinityMask was never called the affinity must span
    // all processor groups, but if it was called it must only span one.

    std::vector<USHORT> groupAffinity;  // We need to capture this later and capturing
                                        // from structured bindings requires c++20.

    std::tie(status, groupAffinity) = get_process_group_affinity();
    if (status == 0)
    {
        affinity.isOldDeterminate = false;
        return affinity;
    }

    if (groupAffinity.size() == 1)
    {
        // We detect the case when affinity is set to all processors and correctly
        // leave affinity.oldApi as nullopt.
        if (GetActiveProcessorGroupCount() != 1 || proc != sys)
        {
            std::set<CpuIndex> cpus;

            const size_t procGroupIndex = groupAffinity[0];

            const uint64_t mask = static_cast<uint64_t>(proc);
            for (size_t j = 0; j < WIN_PROCESSOR_GROUP_SIZE; ++j)
            {
                if (mask & (KAFFINITY(1) << j))
                    cpus.insert(procGroupIndex * WIN_PROCESSOR_GROUP_SIZE + j);
            }

            affinity.oldApi = std::move(cpus);
        }
    }
    else
    {
        // If we got here it means that either SetProcessAffinityMask was never set
        // or we're on Windows 11/Server 2022.

        // Since Windows 11 and Windows Server 2022 the behaviour of
        // GetProcessAffinityMask changed:
        // > If, however, hHandle specifies a handle to the current process,
        // > the function always uses the calling thread's primary group
        // > (which by default is the same as the process' primary group)
        // > in order to set the lpProcessAffinityMask and lpSystemAffinityMask.
        // In which case we can actually retrieve the full affinity.

        if (GetThreadSelectedCpuSetMasks_f != nullptr)
        {
            // Probe from a scratch thread so rebinding does not disturb the
            // calling thread's own group affinity.
            std::thread th([&]() {
                std::set<CpuIndex> cpus;
                bool isAffinityFull = true;

                for (auto procGroupIndex : groupAffinity)
                {
                    const int numActiveProcessors =
                      GetActiveProcessorCount(static_cast<WORD>(procGroupIndex));

                    // We have to schedule to two different processors
                    // and & the affinities we get. Otherwise our processor
                    // choice could influence the resulting affinity.
                    // We assume the processor IDs within the group are
                    // filled sequentially from 0.
                    uint64_t procCombined = std::numeric_limits<uint64_t>::max();
                    uint64_t sysCombined  = std::numeric_limits<uint64_t>::max();

                    for (int i = 0; i < std::min(numActiveProcessors, 2); ++i)
                    {
                        GROUP_AFFINITY GroupAffinity;
                        std::memset(&GroupAffinity, 0, sizeof(GROUP_AFFINITY));
                        GroupAffinity.Group = static_cast<WORD>(procGroupIndex);

                        GroupAffinity.Mask = static_cast<KAFFINITY>(1) << i;

                        status =
                          SetThreadGroupAffinity(GetCurrentThread(), &GroupAffinity, nullptr);
                        if (status == 0)
                        {
                            affinity.isOldDeterminate = false;
                            return;
                        }

                        // Yield so the scheduler actually moves us onto the
                        // newly selected processor before we query the mask.
                        SwitchToThread();

                        DWORD_PTR proc2, sys2;
                        status = GetProcessAffinityMask(GetCurrentProcess(), &proc2, &sys2);
                        if (status == 0)
                        {
                            affinity.isOldDeterminate = false;
                            return;
                        }

                        procCombined &= static_cast<uint64_t>(proc2);
                        sysCombined &= static_cast<uint64_t>(sys2);
                    }

                    if (procCombined != sysCombined)
                        isAffinityFull = false;

                    for (size_t j = 0; j < WIN_PROCESSOR_GROUP_SIZE; ++j)
                    {
                        if (procCombined & (KAFFINITY(1) << j))
                            cpus.insert(procGroupIndex * WIN_PROCESSOR_GROUP_SIZE + j);
                    }
                }

                // We have to detect the case where the affinity was not set,
                // or is set to all processors, so that we correctly produce a
                // std::nullopt result.
                if (!isAffinityFull)
                {
                    affinity.oldApi = std::move(cpus);
                }
            });

            th.join();
        }
    }

    return affinity;
}
373
374
// Type machinery used to emulate Cache->GroupCount

// Detects at compile time whether this SDK's cache-relationship structure
// exposes GroupCount/GroupMasks members (newer SDKs) or only the single
// GroupMask member (older SDKs).
template<typename T, typename = void>
struct HasGroupCount: std::false_type {};

template<typename T>
struct HasGroupCount<T, std::void_t<decltype(std::declval<T>().Cache.GroupCount)>>: std::true_type {
};

// Collects the allowed CPU indices sharing the cache described by `info`.
// This overload handles structures where the cache may span multiple
// processor groups (GroupCount/GroupMasks members present).
template<typename T, typename Pred, std::enable_if_t<HasGroupCount<T>::value, bool> = true>
std::set<CpuIndex> readCacheMembers(const T* info, Pred&& is_cpu_allowed) {
    std::set<CpuIndex> cpus;
    // On Windows 10 this will read a 0 because GroupCount doesn't exist
    int groupCount = std::max(info->Cache.GroupCount, WORD(1));
    for (WORD procGroup = 0; procGroup < groupCount; ++procGroup)
    {
        for (BYTE number = 0; number < WIN_PROCESSOR_GROUP_SIZE; ++number)
        {
            // Map (group, bit) to the flat CPU index used everywhere else.
            WORD groupNumber = info->Cache.GroupMasks[procGroup].Group;
            const CpuIndex c = static_cast<CpuIndex>(groupNumber) * WIN_PROCESSOR_GROUP_SIZE
                             + static_cast<CpuIndex>(number);
            if (!(info->Cache.GroupMasks[procGroup].Mask & (1ULL << number)) || !is_cpu_allowed(c))
                continue;
            cpus.insert(c);
        }
    }
    return cpus;
}

// Fallback overload for structures where the cache is described by a single
// GroupMask, i.e. it cannot span processor groups.
template<typename T, typename Pred, std::enable_if_t<!HasGroupCount<T>::value, bool> = true>
std::set<CpuIndex> readCacheMembers(const T* info, Pred&& is_cpu_allowed) {
    std::set<CpuIndex> cpus;
    for (BYTE number = 0; number < WIN_PROCESSOR_GROUP_SIZE; ++number)
    {
        WORD groupNumber = info->Cache.GroupMask.Group;
        const CpuIndex c = static_cast<CpuIndex>(groupNumber) * WIN_PROCESSOR_GROUP_SIZE
                         + static_cast<CpuIndex>(number);
        if (!(info->Cache.GroupMask.Mask & (1ULL << number)) || !is_cpu_allowed(c))
            continue;
        cpus.insert(c);
    }
    return cpus;
}
417
418
#endif
419
420
#if defined(__linux__) && !defined(__ANDROID__)
421
422
// Returns the set of CPU indices the current process is allowed to run on,
// as reported by sched_getaffinity(). Exits the process on hard failure.
inline std::set<CpuIndex> get_process_affinity() {

    std::set<CpuIndex> cpus;

    // For unsupported systems, or in case of a soft error, we may assume
    // all processors are available for use.
    [[maybe_unused]] auto set_to_all_cpus = [&]() {
        for (CpuIndex c = 0; c < SYSTEM_THREADS_NB; ++c)
            cpus.insert(c);
    };

    // cpu_set_t by default holds 1024 entries. This may not be enough soon,
    // but there is no easy way to determine how many threads there actually
    // is. In this case we just choose a reasonable upper bound.
    static constexpr CpuIndex MaxNumCpus = 1024 * 64;

    // Use the dynamically-sized CPU set API (CPU_ALLOC/CPU_*_S) so we can
    // query up to MaxNumCpus processors.
    cpu_set_t* mask = CPU_ALLOC(MaxNumCpus);
    if (mask == nullptr)
        std::exit(EXIT_FAILURE);

    const size_t masksize = CPU_ALLOC_SIZE(MaxNumCpus);

    CPU_ZERO_S(masksize, mask);

    const int status = sched_getaffinity(0, masksize, mask);

    if (status != 0)
    {
        CPU_FREE(mask);
        std::exit(EXIT_FAILURE);
    }

    // Flatten the mask into a set of CPU indices.
    for (CpuIndex c = 0; c < MaxNumCpus; ++c)
        if (CPU_ISSET_S(c, masksize, mask))
            cpus.insert(c);

    CPU_FREE(mask);

    return cpus;
}
462
463
#endif
464
465
#if defined(__linux__) && !defined(__ANDROID__)

// Snapshot of the process affinity taken at startup, so that affinity
// changes made later by this process do not alter our own behaviour.
inline static const auto STARTUP_PROCESSOR_AFFINITY = get_process_affinity();

#elif defined(_WIN64)

// Snapshot of the process affinity taken at startup, so that affinity
// changes made later by this process do not alter our own behaviour.
inline static const auto STARTUP_PROCESSOR_AFFINITY = get_process_affinity();
// Whether the legacy (single processor group) affinity API appears to have
// been used on this process; if so we must keep it consistent when binding.
inline static const auto STARTUP_USE_OLD_AFFINITY_API =
  STARTUP_PROCESSOR_AFFINITY.likely_used_old_api();

#endif
476
477
// We want to abstract the purpose of storing the numa node index somewhat.
478
// Whoever is using this does not need to know the specifics of the replication
479
// machinery to be able to access NUMA replicated memory.
480
class NumaReplicatedAccessToken {
481
public:
482
NumaReplicatedAccessToken() :
483
n(0) {}
484
485
explicit NumaReplicatedAccessToken(NumaIndex idx) :
486
n(idx) {}
487
488
NumaIndex get_numa_index() const { return n; }
489
490
private:
491
NumaIndex n;
492
};
493
494
// Describes one L3 cache domain: the system NUMA node it belongs to and the
// set of CPUs sharing that L3 cache.
struct L3Domain {
    NumaIndex systemNumaIndex{};
    std::set<CpuIndex> cpus{};
};
498
499
// Use system NUMA nodes
struct SystemNumaPolicy {};
// Use system-reported L3 domains
struct L3DomainsPolicy {};
// Group system-reported L3 domains until they reach bundleSize
struct BundledL3Policy {
    // Target size of a bundled node. NOTE(review): presumably measured in
    // CPUs per bundle — confirm against try_get_l3_aware_config.
    size_t bundleSize;
};

// Selects how NumaConfig::from_system partitions the machine into logical nodes.
using NumaAutoPolicy = std::variant<SystemNumaPolicy, L3DomainsPolicy, BundledL3Policy>;
509
510
// Designed as immutable, because there is no good reason to alter an already
511
// existing config in a way that doesn't require recreating it completely, and
512
// it would be complex and expensive to maintain class invariants.
513
// The CPU (processor) numbers always correspond to the actual numbering used
514
// by the system. The NUMA node numbers MAY NOT correspond to the system's
515
// numbering of the NUMA nodes. In particular, by default, if the processor has
516
// non-uniform cache access within a NUMA node (i.e., a non-unified L3 cache structure),
517
// then L3 domains within a system NUMA node will be used to subdivide it
518
// into multiple logical NUMA nodes in the config. Additionally, empty nodes may
519
// be removed, or the user may create custom nodes.
520
//
521
// As a special case, when performing system-wide replication of read-only data
522
// (i.e., LazyNumaReplicatedSystemWide), the system NUMA node is used, rather than
523
// custom or L3-aware nodes. See that class's get_discriminator() function.
524
//
525
// It is guaranteed that NUMA nodes are NOT empty: every node exposed by NumaConfig
526
// has at least one processor assigned.
527
//
528
// We use startup affinities so as not to modify its own behaviour in time.
529
//
530
// Since Stockfish doesn't support exceptions all places where an exception
531
// should be thrown are replaced by std::exit.
532
class NumaConfig {
533
public:
534
// Default configuration: all CPUs the system reports, assigned to a single
// logical node 0 (via the range helper; presumably [0, numCpus-1] inclusive).
NumaConfig() :
    highestCpuIndex(0),
    customAffinity(false) {
    const auto numCpus = SYSTEM_THREADS_NB;
    add_cpu_range_to_node(NumaIndex{0}, CpuIndex{0}, numCpus - 1);
}
540
541
// This function gets a NumaConfig based on the system's provided information.
// The available policies are documented above.
// When respectProcessAffinity is true (default), only CPUs present in the
// startup affinity snapshot are considered; opting out marks the resulting
// config as a custom affinity.
static NumaConfig from_system([[maybe_unused]] const NumaAutoPolicy& policy,
                              bool respectProcessAffinity = true) {
    NumaConfig cfg = empty();

#if !((defined(__linux__) && !defined(__ANDROID__)) || defined(_WIN64))
    // Fallback for unsupported systems.
    for (CpuIndex c = 0; c < SYSTEM_THREADS_NB; ++c)
        cfg.add_cpu_to_node(NumaIndex{0}, c);
#else

#if defined(_WIN64)

    std::optional<std::set<CpuIndex>> allowedCpus;

    if (respectProcessAffinity)
        allowedCpus = STARTUP_PROCESSOR_AFFINITY.get_combined();

    // The affinity cannot be determined in all cases on Windows,
    // but we at least guarantee that the number of allowed processors
    // is >= number of processors in the affinity mask. In case the user
    // is not satisfied they must set the processor numbers explicitly.
    auto is_cpu_allowed = [&allowedCpus](CpuIndex c) {
        return !allowedCpus.has_value() || allowedCpus->count(c) == 1;
    };

#elif defined(__linux__) && !defined(__ANDROID__)

    std::set<CpuIndex> allowedCpus;

    if (respectProcessAffinity)
        allowedCpus = STARTUP_PROCESSOR_AFFINITY;

    auto is_cpu_allowed = [respectProcessAffinity, &allowedCpus](CpuIndex c) {
        return !respectProcessAffinity || allowedCpus.count(c) == 1;
    };

#endif

    // Try the L3-aware layout first unless the user asked for plain system
    // NUMA nodes; fall back to system NUMA nodes if it is unavailable.
    bool l3Success = false;
    if (!std::holds_alternative<SystemNumaPolicy>(policy))
    {
        size_t l3BundleSize = 0;
        if (const auto* v = std::get_if<BundledL3Policy>(&policy))
        {
            l3BundleSize = v->bundleSize;
        }
        if (auto l3Cfg =
              try_get_l3_aware_config(respectProcessAffinity, l3BundleSize, is_cpu_allowed))
        {
            cfg = std::move(*l3Cfg);
            l3Success = true;
        }
    }
    if (!l3Success)
        cfg = from_system_numa(respectProcessAffinity, is_cpu_allowed);

#if defined(_WIN64)
    // Split the NUMA nodes to be contained within a group if necessary.
    // This is needed between Windows 10 Build 20348 and Windows 11, because
    // the new NUMA allocation behaviour was introduced while there was
    // still no way to set thread affinity spanning multiple processor groups.
    // See https://learn.microsoft.com/en-us/windows/win32/procthread/numa-support
    // We also do this if we need to force the old API for some reason.
    //
    // 2024-08-26: It appears that we need to actually always force this behaviour.
    // While Windows allows this to work now, such assignments have bad interaction
    // with the scheduler - in particular it still prefers scheduling on the thread's
    // "primary" node, even if it means scheduling SMT processors first.
    // See https://github.com/official-stockfish/Stockfish/issues/5551
    // See https://learn.microsoft.com/en-us/windows/win32/procthread/processor-groups
    //
    // Each process is assigned a primary group at creation, and by default all
    // of its threads' primary group is the same. Each thread's ideal processor
    // is in the thread's primary group, so threads will preferentially be
    // scheduled to processors on their primary group, but they are able to
    // be scheduled to processors on any other group.
    //
    // used to be guarded by if (STARTUP_USE_OLD_AFFINITY_API)
    {
        NumaConfig splitCfg = empty();

        NumaIndex splitNodeIndex = 0;
        for (const auto& cpus : cfg.nodes)
        {
            if (cpus.empty())
                continue;

            // cpus is an ordered set, so iteration walks processor groups in
            // ascending order; start a fresh logical node at each boundary.
            size_t lastProcGroupIndex = *(cpus.begin()) / WIN_PROCESSOR_GROUP_SIZE;
            for (CpuIndex c : cpus)
            {
                const size_t procGroupIndex = c / WIN_PROCESSOR_GROUP_SIZE;
                if (procGroupIndex != lastProcGroupIndex)
                {
                    splitNodeIndex += 1;
                    lastProcGroupIndex = procGroupIndex;
                }
                splitCfg.add_cpu_to_node(splitNodeIndex, c);
            }
            splitNodeIndex += 1;
        }

        cfg = std::move(splitCfg);
    }
#endif

#endif

    // We have to ensure no empty NUMA nodes persist.
    cfg.remove_empty_numa_nodes();

    // If the user explicitly opts out from respecting the current process affinity
    // then it may be inconsistent with the current affinity (obviously), so we
    // consider it custom.
    if (!respectProcessAffinity)
        cfg.customAffinity = true;

    return cfg;
}
661
662
// ':'-separated numa nodes
663
// ','-separated cpu indices
664
// supports "first-last" range syntax for cpu indices
665
// For example "0-15,128-143:16-31,144-159:32-47,160-175:48-63,176-191"
666
static NumaConfig from_string(const std::string& s) {
667
NumaConfig cfg = empty();
668
669
NumaIndex n = 0;
670
for (auto&& nodeStr : split(s, ":"))
671
{
672
auto indices = indices_from_shortened_string(std::string(nodeStr));
673
if (!indices.empty())
674
{
675
for (auto idx : indices)
676
{
677
if (!cfg.add_cpu_to_node(n, CpuIndex(idx)))
678
std::exit(EXIT_FAILURE);
679
}
680
681
n += 1;
682
}
683
}
684
685
cfg.customAffinity = true;
686
687
return cfg;
688
}
689
690
// Movable but not copyable: the config is designed as immutable, so there
// is no reason to duplicate one, and copies would be wasteful.
NumaConfig(const NumaConfig&) = delete;
NumaConfig(NumaConfig&&) = default;
NumaConfig& operator=(const NumaConfig&) = delete;
NumaConfig& operator=(NumaConfig&&) = default;
694
695
// True if cpu n is assigned to some node in this config.
bool is_cpu_assigned(CpuIndex n) const { return nodeByCpu.count(n) == 1; }

// Number of logical NUMA nodes in this config.
NumaIndex num_numa_nodes() const { return nodes.size(); }

// Number of CPUs assigned to node n. Requires n < num_numa_nodes().
CpuIndex num_cpus_in_numa_node(NumaIndex n) const {
    assert(n < nodes.size());
    return nodes[n].size();
}

// Total number of CPUs assigned across all nodes.
CpuIndex num_cpus() const { return nodeByCpu.size(); }

// Replicated memory is only worthwhile when threads may be bound, i.e. with
// a custom affinity or more than one logical node.
bool requires_memory_replication() const { return customAffinity || nodes.size() > 1; }
707
708
// Produces the inverse of from_string: nodes separated by ':', CPUs by ',',
// with runs of consecutive CPUs compressed into "first-last" ranges.
std::string to_string() const {
    std::string str;

    bool isFirstNode = true;
    for (auto&& cpus : nodes)
    {
        if (!isFirstNode)
            str += ":";

        bool isFirstSet = true;
        auto rangeStart = cpus.begin();
        for (auto it = cpus.begin(); it != cpus.end(); ++it)
        {
            auto next = std::next(it);
            // A run ends here if this is the last CPU in the (ordered) set
            // or the next CPU is not consecutive.
            if (next == cpus.end() || *next != *it + 1)
            {
                // cpus[i] is at the end of the range (may be of size 1)
                if (!isFirstSet)
                    str += ",";

                const CpuIndex last = *it;

                if (it != rangeStart)
                {
                    // Multi-element run: emit "first-last".
                    const CpuIndex first = *rangeStart;

                    str += std::to_string(first);
                    str += "-";
                    str += std::to_string(last);
                }
                else
                    str += std::to_string(last);

                rangeStart = next;
                isFirstSet = false;
            }
        }

        isFirstNode = false;
    }

    return str;
}
751
752
// Heuristic: should numThreads threads be distributed among nodes and bound?
bool suggests_binding_threads(CpuIndex numThreads) const {
    // If we can reasonably determine that the threads cannot be contained
    // by the OS within the first NUMA node then we advise distributing
    // and binding threads. When the threads are not bound we can only use
    // NUMA memory replicated objects from the first node, so when the OS
    // has to schedule on other nodes we lose performance. We also suggest
    // binding if there's enough threads to distribute among nodes with minimal
    // disparity. We try to ignore small nodes, in particular the empty ones.

    // If the affinity set by the user does not match the affinity given by
    // the OS then binding is necessary to ensure the threads are running on
    // correct processors.
    if (customAffinity)
        return true;

    // We obviously cannot distribute a single thread, so a single thread
    // should never be bound.
    if (numThreads <= 1)
        return false;

    size_t largestNodeSize = 0;
    for (auto&& cpus : nodes)
        if (cpus.size() > largestNodeSize)
            largestNodeSize = cpus.size();

    // A node counts as "small" when it has at most 60% of the CPUs of the
    // largest node.
    auto is_node_small = [largestNodeSize](const std::set<CpuIndex>& node) {
        static constexpr double SmallNodeThreshold = 0.6;
        return static_cast<double>(node.size()) / static_cast<double>(largestNodeSize)
            <= SmallNodeThreshold;
    };

    size_t numNotSmallNodes = 0;
    for (auto&& cpus : nodes)
        if (!is_node_small(cpus))
            numNotSmallNodes += 1;

    // Bind only on a genuinely multi-node config, and only when the thread
    // count either overflows half the largest node or comfortably covers
    // the non-small nodes.
    return (numThreads > largestNodeSize / 2 || numThreads >= numNotSmallNodes * 4)
        && nodes.size() > 1;
}
791
792
// Returns, for each of numThreads threads, the node index it should be bound
// to, greedily assigning each thread to the node with the lowest relative
// fill (assigned threads / node capacity).
std::vector<NumaIndex> distribute_threads_among_numa_nodes(CpuIndex numThreads) const {
    std::vector<NumaIndex> ns;

    if (nodes.size() == 1)
    {
        // Special case for when there's no NUMA nodes. This doesn't buy us
        // much, but let's keep the default path simple.
        ns.resize(numThreads, NumaIndex{0});
    }
    else
    {
        // occupation[n] counts threads already assigned to node n.
        std::vector<size_t> occupation(nodes.size(), 0);
        for (CpuIndex c = 0; c < numThreads; ++c)
        {
            NumaIndex bestNode{0};
            float bestNodeFill = std::numeric_limits<float>::max();
            for (NumaIndex n = 0; n < nodes.size(); ++n)
            {
                // Fill ratio if this thread were placed on node n.
                float fill =
                  static_cast<float>(occupation[n] + 1) / static_cast<float>(nodes[n].size());
                // NOTE: Do we want to perhaps fill the first available node
                // up to 50% first before considering other nodes?
                // Probably not, because it would interfere with running
                // multiple instances. We basically shouldn't favor any
                // particular node.
                if (fill < bestNodeFill)
                {
                    bestNode = n;
                    bestNodeFill = fill;
                }
            }
            ns.emplace_back(bestNode);
            occupation[bestNode] += 1;
        }
    }

    return ns;
}
830
831
// Restricts the calling thread to the CPUs of logical node n and returns an
// access token for that node. Exits the process on an invalid node index or
// when the OS rejects the affinity change.
NumaReplicatedAccessToken bind_current_thread_to_numa_node(NumaIndex n) const {
    if (n >= nodes.size() || nodes[n].size() == 0)
        std::exit(EXIT_FAILURE);

#if defined(__linux__) && !defined(__ANDROID__)

    // Build a dynamically-sized mask large enough for the highest CPU index
    // ever assigned in this config.
    cpu_set_t* mask = CPU_ALLOC(highestCpuIndex + 1);
    if (mask == nullptr)
        std::exit(EXIT_FAILURE);

    const size_t masksize = CPU_ALLOC_SIZE(highestCpuIndex + 1);

    CPU_ZERO_S(masksize, mask);

    for (CpuIndex c : nodes[n])
        CPU_SET_S(c, masksize, mask);

    const int status = sched_setaffinity(0, masksize, mask);

    CPU_FREE(mask);

    if (status != 0)
        std::exit(EXIT_FAILURE);

    // We yield this thread just to be sure it gets rescheduled.
    // This is defensive, allowed because this code is not performance critical.
    sched_yield();

#elif defined(_WIN64)

    // Requires Windows 11. No good way to set thread affinity spanning
    // processor groups before that.
    HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll"));
    auto SetThreadSelectedCpuSetMasks_f = SetThreadSelectedCpuSetMasks_t(
      (void (*)()) GetProcAddress(k32, "SetThreadSelectedCpuSetMasks"));

    // We ALWAYS set affinity with the new API if available, because
    // there's no downsides, and we forcibly keep it consistent with
    // the old API should we need to use it. I.e. we always keep this
    // as a superset of what we set with SetThreadGroupAffinity.
    if (SetThreadSelectedCpuSetMasks_f != nullptr)
    {
        // Only available on Windows 11 and Windows Server 2022 onwards
        const USHORT numProcGroups = USHORT(
          ((highestCpuIndex + 1) + WIN_PROCESSOR_GROUP_SIZE - 1) / WIN_PROCESSOR_GROUP_SIZE);
        auto groupAffinities = std::make_unique<GROUP_AFFINITY[]>(numProcGroups);
        std::memset(groupAffinities.get(), 0, sizeof(GROUP_AFFINITY) * numProcGroups);
        for (WORD i = 0; i < numProcGroups; ++i)
            groupAffinities[i].Group = i;

        // Translate flat CPU indices back to (group, bit) positions.
        for (CpuIndex c : nodes[n])
        {
            const size_t procGroupIndex = c / WIN_PROCESSOR_GROUP_SIZE;
            const size_t idxWithinProcGroup = c % WIN_PROCESSOR_GROUP_SIZE;
            groupAffinities[procGroupIndex].Mask |= KAFFINITY(1) << idxWithinProcGroup;
        }

        HANDLE hThread = GetCurrentThread();

        const BOOL status =
          SetThreadSelectedCpuSetMasks_f(hThread, groupAffinities.get(), numProcGroups);
        if (status == 0)
            std::exit(EXIT_FAILURE);

        // We yield this thread just to be sure it gets rescheduled.
        // This is defensive, allowed because this code is not performance critical.
        SwitchToThread();
    }

    // Sometimes we need to force the old API, but do not use it unless necessary.
    if (SetThreadSelectedCpuSetMasks_f == nullptr || STARTUP_USE_OLD_AFFINITY_API)
    {
        // On earlier windows version (since windows 7) we cannot run a single thread
        // on multiple processor groups, so we need to restrict the group.
        // We assume the group of the first processor listed for this node.
        // Processors from outside this group will not be assigned for this thread.
        // Normally this won't be an issue because windows used to assign NUMA nodes
        // such that they cannot span processor groups. However, since Windows 10
        // Build 20348 the behaviour changed, so there's a small window of versions
        // between this and Windows 11 that might exhibit problems with not all
        // processors being utilized.
        //
        // We handle this in NumaConfig::from_system by manually splitting the
        // nodes when we detect that there is no function to set affinity spanning
        // processor nodes. This is required because otherwise our thread distribution
        // code may produce suboptimal results.
        //
        // See https://learn.microsoft.com/en-us/windows/win32/procthread/numa-support
        GROUP_AFFINITY affinity;
        std::memset(&affinity, 0, sizeof(GROUP_AFFINITY));
        // We use an ordered set to be sure to get the smallest cpu number here.
        const size_t forcedProcGroupIndex = *(nodes[n].begin()) / WIN_PROCESSOR_GROUP_SIZE;
        affinity.Group = static_cast<WORD>(forcedProcGroupIndex);
        for (CpuIndex c : nodes[n])
        {
            const size_t procGroupIndex = c / WIN_PROCESSOR_GROUP_SIZE;
            const size_t idxWithinProcGroup = c % WIN_PROCESSOR_GROUP_SIZE;
            // We skip processors that are not in the same processor group.
            // If everything was set up correctly this will never be an issue,
            // but we have to account for bad NUMA node specification.
            if (procGroupIndex != forcedProcGroupIndex)
                continue;

            affinity.Mask |= KAFFINITY(1) << idxWithinProcGroup;
        }

        HANDLE hThread = GetCurrentThread();

        const BOOL status = SetThreadGroupAffinity(hThread, &affinity, nullptr);
        if (status == 0)
            std::exit(EXIT_FAILURE);

        // We yield this thread just to be sure it gets rescheduled. This is
        // defensive, allowed because this code is not performance critical.
        SwitchToThread();
    }

#endif

    return NumaReplicatedAccessToken(n);
}
952
953
template<typename FuncT>
954
void execute_on_numa_node(NumaIndex n, FuncT&& f) const {
955
std::thread th([this, &f, n]() {
956
bind_current_thread_to_numa_node(n);
957
std::forward<FuncT>(f)();
958
});
959
960
th.join();
961
}
962
963
    // Per-node CPU assignment; nodes[n] is the set of CPU indices on NUMA node n.
    std::vector<std::set<CpuIndex>> nodes;
    // Reverse lookup: maps a CPU index to the NUMA node it is assigned to.
    std::map<CpuIndex, NumaIndex> nodeByCpu;

   private:
    // Largest CPU index present in any node; maintained by add_cpu_to_node
    // and add_cpu_range_to_node.
    CpuIndex highestCpuIndex;

    // NOTE(review): set outside this view; presumably true when the node layout
    // was supplied by the user rather than discovered from the system — confirm.
    bool customAffinity;
    // Factory for a configuration with no nodes and no CPUs, used as the
    // starting point before CPUs are added one by one.
    static NumaConfig empty() { return NumaConfig(EmptyNodeTag{}); }

    // Tag type selecting the private "construct empty" constructor below.
    struct EmptyNodeTag {};

    NumaConfig(EmptyNodeTag) :
        highestCpuIndex(0),
        customAffinity(false) {}
void remove_empty_numa_nodes() {
980
std::vector<std::set<CpuIndex>> newNodes;
981
for (auto&& cpus : nodes)
982
if (!cpus.empty())
983
newNodes.emplace_back(std::move(cpus));
984
nodes = std::move(newNodes);
985
}
986
987
// Returns true if successful
988
// Returns false if failed, i.e. when the cpu is already present
989
// strong guarantee, the structure remains unmodified
990
bool add_cpu_to_node(NumaIndex n, CpuIndex c) {
991
if (is_cpu_assigned(c))
992
return false;
993
994
while (nodes.size() <= n)
995
nodes.emplace_back();
996
997
nodes[n].insert(c);
998
nodeByCpu[c] = n;
999
1000
if (c > highestCpuIndex)
1001
highestCpuIndex = c;
1002
1003
return true;
1004
}
1005
1006
// Returns true if successful
1007
// Returns false if failed, i.e. when any of the cpus is already present
1008
// strong guarantee, the structure remains unmodified
1009
bool add_cpu_range_to_node(NumaIndex n, CpuIndex cfirst, CpuIndex clast) {
1010
for (CpuIndex c = cfirst; c <= clast; ++c)
1011
if (is_cpu_assigned(c))
1012
return false;
1013
1014
while (nodes.size() <= n)
1015
nodes.emplace_back();
1016
1017
for (CpuIndex c = cfirst; c <= clast; ++c)
1018
{
1019
nodes[n].insert(c);
1020
nodeByCpu[c] = n;
1021
}
1022
1023
if (clast > highestCpuIndex)
1024
highestCpuIndex = clast;
1025
1026
return true;
1027
}
1028
1029
static std::vector<size_t> indices_from_shortened_string(const std::string& s) {
1030
std::vector<size_t> indices;
1031
1032
if (s.empty())
1033
return indices;
1034
1035
for (const auto& ss : split(s, ","))
1036
{
1037
if (ss.empty())
1038
continue;
1039
1040
auto parts = split(ss, "-");
1041
if (parts.size() == 1)
1042
{
1043
const CpuIndex c = CpuIndex{str_to_size_t(std::string(parts[0]))};
1044
indices.emplace_back(c);
1045
}
1046
else if (parts.size() == 2)
1047
{
1048
const CpuIndex cfirst = CpuIndex{str_to_size_t(std::string(parts[0]))};
1049
const CpuIndex clast = CpuIndex{str_to_size_t(std::string(parts[1]))};
1050
for (size_t c = cfirst; c <= clast; ++c)
1051
{
1052
indices.emplace_back(c);
1053
}
1054
}
1055
}
1056
1057
return indices;
1058
}
1059
1060
    // This function queries the system for the mapping of processors to NUMA nodes.
    // On Linux we read from standardized kernel sysfs, with a fallback to single NUMA
    // node. On Windows we utilize GetNumaProcessorNodeEx, which has its quirks, see
    // comment for Windows implementation of get_process_affinity.
    //
    // Note: respectProcessAffinity is not consulted in this body; filtering is
    // done solely through the is_cpu_allowed predicate supplied by the caller.
    template<typename Pred>
    static NumaConfig from_system_numa([[maybe_unused]] bool respectProcessAffinity,
                                       [[maybe_unused]] Pred&& is_cpu_allowed) {
        NumaConfig cfg = empty();

#if defined(__linux__) && !defined(__ANDROID__)

        // On Linux things are straightforward, since there's no processor groups and
        // any thread can be scheduled on all processors.
        // We try to gather this information from the sysfs first
        // https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-devices-node

        // On any sysfs read failure we discard partial results and later place
        // every allowed CPU on a single node 0 (see the useFallback branch).
        bool useFallback = false;
        auto fallback = [&]() {
            useFallback = true;
            cfg = empty();
        };

        // /sys/devices/system/node/online contains information about active NUMA nodes
        auto nodeIdsStr = read_file_to_string("/sys/devices/system/node/online");
        if (!nodeIdsStr.has_value() || nodeIdsStr->empty())
        {
            fallback();
        }
        else
        {
            remove_whitespace(*nodeIdsStr);
            for (size_t n : indices_from_shortened_string(*nodeIdsStr))
            {
                // /sys/devices/system/node/node.../cpulist
                std::string path =
                    std::string("/sys/devices/system/node/node") + std::to_string(n) + "/cpulist";
                auto cpuIdsStr = read_file_to_string(path);
                // Now, we only bail if the file does not exist. Some nodes may be
                // empty, that's fine. An empty node still has a file that appears
                // to have some whitespace, so we need to handle that.
                if (!cpuIdsStr.has_value())
                {
                    fallback();
                    break;
                }
                else
                {
                    remove_whitespace(*cpuIdsStr);
                    for (size_t c : indices_from_shortened_string(*cpuIdsStr))
                    {
                        if (is_cpu_allowed(c))
                            cfg.add_cpu_to_node(n, c);
                    }
                }
            }
        }

        if (useFallback)
        {
            // Single-node fallback: all allowed CPUs go to NUMA node 0.
            for (CpuIndex c = 0; c < SYSTEM_THREADS_NB; ++c)
                if (is_cpu_allowed(c))
                    cfg.add_cpu_to_node(NumaIndex{0}, c);
        }

#elif defined(_WIN64)

        // Enumerate every (group, number) slot and ask the OS for its node.
        WORD numProcGroups = GetActiveProcessorGroupCount();
        for (WORD procGroup = 0; procGroup < numProcGroups; ++procGroup)
        {
            for (BYTE number = 0; number < WIN_PROCESSOR_GROUP_SIZE; ++number)
            {
                PROCESSOR_NUMBER procnum;
                procnum.Group = procGroup;
                procnum.Number = number;
                procnum.Reserved = 0;
                USHORT nodeNumber;

                const BOOL status = GetNumaProcessorNodeEx(&procnum, &nodeNumber);
                // Flatten (group, number) into a single global CPU index.
                const CpuIndex c = static_cast<CpuIndex>(procGroup) * WIN_PROCESSOR_GROUP_SIZE
                                 + static_cast<CpuIndex>(number);
                // Per the GetNumaProcessorNodeEx API, 0xFFFF marks a processor
                // slot with no associated NUMA node, so skip those.
                if (status != 0 && nodeNumber != std::numeric_limits<USHORT>::max()
                    && is_cpu_allowed(c))
                {
                    cfg.add_cpu_to_node(nodeNumber, c);
                }
            }
        }

#else

        abort(); // should not reach here

#endif

        return cfg;
    }
    // Attempts to build a NumaConfig in which each L3 cache domain (possibly
    // bundled with neighbours, see from_l3_info) acts as its own virtual NUMA
    // node. Returns std::nullopt when no L3 topology information could be
    // obtained from the system.
    template<typename Pred>
    static std::optional<NumaConfig> try_get_l3_aware_config(
        bool respectProcessAffinity, size_t bundleSize, [[maybe_unused]] Pred&& is_cpu_allowed) {
        // Get the normal system configuration so we know to which NUMA node
        // each L3 domain belongs.
        NumaConfig systemConfig =
            NumaConfig::from_system(SystemNumaPolicy{}, respectProcessAffinity);
        std::vector<L3Domain> l3Domains;

#if defined(__linux__) && !defined(__ANDROID__)

        // Walk the CPUs in ascending order; each shared_cpu_list read covers a
        // whole L3 domain at once, so mark all its members as seen.
        std::set<CpuIndex> seenCpus;
        auto nextUnseenCpu = [&seenCpus]() {
            for (CpuIndex i = 0;; ++i)
                if (!seenCpus.count(i))
                    return i;
        };

        while (true)
        {
            CpuIndex next = nextUnseenCpu();
            auto siblingsStr =
                read_file_to_string("/sys/devices/system/cpu/cpu" + std::to_string(next)
                                    + "/cache/index3/shared_cpu_list");

            if (!siblingsStr.has_value() || siblingsStr->empty())
            {
                break; // we have read all available CPUs
            }

            L3Domain domain;
            for (size_t c : indices_from_shortened_string(*siblingsStr))
            {
                if (is_cpu_allowed(c))
                {
                    domain.systemNumaIndex = systemConfig.nodeByCpu.at(c);
                    domain.cpus.insert(c);
                }
                seenCpus.insert(c);
            }
            if (!domain.cpus.empty())
            {
                l3Domains.emplace_back(std::move(domain));
            }
        }

#elif defined(_WIN64)

        // Two-call idiom: first query the required buffer size, then fetch the
        // variable-length RelationCache records.
        DWORD bufSize = 0;
        GetLogicalProcessorInformationEx(RelationCache, nullptr, &bufSize);
        if (GetLastError() != ERROR_INSUFFICIENT_BUFFER)
            return std::nullopt;

        std::vector<char> buffer(bufSize);
        auto info = reinterpret_cast<PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX>(buffer.data());
        if (!GetLogicalProcessorInformationEx(RelationCache, info, &bufSize))
            return std::nullopt;

        while (reinterpret_cast<char*>(info) < buffer.data() + bufSize)
        {
            info = std::launder(info);
            if (info->Relationship == RelationCache && info->Cache.Level == 3)
            {
                L3Domain domain{};
                domain.cpus = readCacheMembers(info, is_cpu_allowed);
                if (!domain.cpus.empty())
                {
                    // All members of an L3 domain share one system NUMA node,
                    // so any representative CPU determines it.
                    domain.systemNumaIndex = systemConfig.nodeByCpu.at(*domain.cpus.begin());
                    l3Domains.push_back(std::move(domain));
                }
            }
            // Variable length data structure, advance to next
            info = reinterpret_cast<PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX>(
                reinterpret_cast<char*>(info) + info->Size);
        }
#endif

        if (!l3Domains.empty())
            return {NumaConfig::from_l3_info(std::move(l3Domains), bundleSize)};

        return std::nullopt;
    }
    // Builds a NumaConfig in which every (possibly merged) L3 domain becomes
    // its own virtual NUMA node. Domains belonging to the same system NUMA
    // node are greedily merged pairwise as long as the combined CPU count does
    // not exceed bundleSize; domains from different system nodes never merge.
    static NumaConfig from_l3_info(std::vector<L3Domain>&& domains, size_t bundleSize) {
        assert(!domains.empty());

        // Group the domains by the system NUMA node they belong to.
        std::map<NumaIndex, std::vector<L3Domain>> list;
        for (auto& d : domains)
            list[d.systemNumaIndex].emplace_back(std::move(d));

        NumaConfig cfg = empty();
        NumaIndex n = 0;
        for (auto& [_, ds] : list)
        {
            bool changed;
            // Scan through pairs and merge them. With roughly equal L3 sizes, should give
            // a decent distribution.
            do
            {
                changed = false;
                for (size_t j = 0; j + 1 < ds.size(); ++j)
                {
                    if (ds[j].cpus.size() + ds[j + 1].cpus.size() <= bundleSize)
                    {
                        changed = true;
                        // The domains' CPU sets are disjoint, so merge moves
                        // every element of ds[j + 1] into ds[j].
                        ds[j].cpus.merge(ds[j + 1].cpus);
                        ds.erase(ds.begin() + j + 1);
                    }
                }
                // ds.size() has decreased if changed is true, so this loop will terminate
            } while (changed);
            // Each surviving domain gets the next virtual node index.
            for (const L3Domain& d : ds)
            {
                const NumaIndex dn = n++;
                for (CpuIndex cpu : d.cpus)
                {
                    cfg.add_cpu_to_node(dn, cpu);
                }
            }
        }
        return cfg;
    }
};
1281
1282
class NumaReplicationContext;
1283
1284
// Instances of this class are tracked by the NumaReplicationContext instance.
// NumaReplicationContext informs all tracked instances when NUMA configuration changes.
class NumaReplicatedBase {
   public:
    NumaReplicatedBase(NumaReplicationContext& ctx);

    // Copying is forbidden: the context tracks objects by address.
    NumaReplicatedBase(const NumaReplicatedBase&) = delete;
    NumaReplicatedBase(NumaReplicatedBase&& other) noexcept;

    NumaReplicatedBase& operator=(const NumaReplicatedBase&) = delete;
    NumaReplicatedBase& operator=(NumaReplicatedBase&& other) noexcept;

    // Invoked by the context from set_numa_config(); derived classes
    // re-replicate their data for the new node layout.
    virtual void on_numa_config_changed() = 0;
    virtual ~NumaReplicatedBase();

    const NumaConfig& get_numa_config() const;

   private:
    // Owning context; becomes nullptr after this object is moved-from.
    NumaReplicationContext* context;
};
// We force boxing with a unique_ptr. If this becomes an issue due to added
1306
// indirection we may need to add an option for a custom boxing type. When the
1307
// NUMA config changes the value stored at the index 0 is replicated to other nodes.
1308
template<typename T>
1309
class NumaReplicated: public NumaReplicatedBase {
1310
public:
1311
using ReplicatorFuncType = std::function<T(const T&)>;
1312
1313
NumaReplicated(NumaReplicationContext& ctx) :
1314
NumaReplicatedBase(ctx) {
1315
replicate_from(T{});
1316
}
1317
1318
NumaReplicated(NumaReplicationContext& ctx, T&& source) :
1319
NumaReplicatedBase(ctx) {
1320
replicate_from(std::move(source));
1321
}
1322
1323
NumaReplicated(const NumaReplicated&) = delete;
1324
NumaReplicated(NumaReplicated&& other) noexcept :
1325
NumaReplicatedBase(std::move(other)),
1326
instances(std::exchange(other.instances, {})) {}
1327
1328
NumaReplicated& operator=(const NumaReplicated&) = delete;
1329
NumaReplicated& operator=(NumaReplicated&& other) noexcept {
1330
NumaReplicatedBase::operator=(*this, std::move(other));
1331
instances = std::exchange(other.instances, {});
1332
1333
return *this;
1334
}
1335
1336
NumaReplicated& operator=(T&& source) {
1337
replicate_from(std::move(source));
1338
1339
return *this;
1340
}
1341
1342
~NumaReplicated() override = default;
1343
1344
const T& operator[](NumaReplicatedAccessToken token) const {
1345
assert(token.get_numa_index() < instances.size());
1346
return *(instances[token.get_numa_index()]);
1347
}
1348
1349
const T& operator*() const { return *(instances[0]); }
1350
1351
const T* operator->() const { return instances[0].get(); }
1352
1353
template<typename FuncT>
1354
void modify_and_replicate(FuncT&& f) {
1355
auto source = std::move(instances[0]);
1356
std::forward<FuncT>(f)(*source);
1357
replicate_from(std::move(*source));
1358
}
1359
1360
void on_numa_config_changed() override {
1361
// Use the first one as the source. It doesn't matter which one we use,
1362
// because they all must be identical, but the first one is guaranteed to exist.
1363
auto source = std::move(instances[0]);
1364
replicate_from(std::move(*source));
1365
}
1366
1367
private:
1368
std::vector<std::unique_ptr<T>> instances;
1369
1370
void replicate_from(T&& source) {
1371
instances.clear();
1372
1373
const NumaConfig& cfg = get_numa_config();
1374
if (cfg.requires_memory_replication())
1375
{
1376
for (NumaIndex n = 0; n < cfg.num_numa_nodes(); ++n)
1377
{
1378
cfg.execute_on_numa_node(
1379
n, [this, &source]() { instances.emplace_back(std::make_unique<T>(source)); });
1380
}
1381
}
1382
else
1383
{
1384
assert(cfg.num_numa_nodes() == 1);
1385
// We take advantage of the fact that replication is not required
1386
// and reuse the source value, avoiding one copy operation.
1387
instances.emplace_back(std::make_unique<T>(std::move(source)));
1388
}
1389
}
1390
};
1391
1392
// We force boxing with a unique_ptr. If this becomes an issue due to added
1393
// indirection we may need to add an option for a custom boxing type.
1394
template<typename T>
1395
class LazyNumaReplicated: public NumaReplicatedBase {
1396
public:
1397
using ReplicatorFuncType = std::function<T(const T&)>;
1398
1399
LazyNumaReplicated(NumaReplicationContext& ctx) :
1400
NumaReplicatedBase(ctx) {
1401
prepare_replicate_from(T{});
1402
}
1403
1404
LazyNumaReplicated(NumaReplicationContext& ctx, T&& source) :
1405
NumaReplicatedBase(ctx) {
1406
prepare_replicate_from(std::move(source));
1407
}
1408
1409
LazyNumaReplicated(const LazyNumaReplicated&) = delete;
1410
LazyNumaReplicated(LazyNumaReplicated&& other) noexcept :
1411
NumaReplicatedBase(std::move(other)),
1412
instances(std::exchange(other.instances, {})) {}
1413
1414
LazyNumaReplicated& operator=(const LazyNumaReplicated&) = delete;
1415
LazyNumaReplicated& operator=(LazyNumaReplicated&& other) noexcept {
1416
NumaReplicatedBase::operator=(*this, std::move(other));
1417
instances = std::exchange(other.instances, {});
1418
1419
return *this;
1420
}
1421
1422
LazyNumaReplicated& operator=(T&& source) {
1423
prepare_replicate_from(std::move(source));
1424
1425
return *this;
1426
}
1427
1428
~LazyNumaReplicated() override = default;
1429
1430
const T& operator[](NumaReplicatedAccessToken token) const {
1431
assert(token.get_numa_index() < instances.size());
1432
ensure_present(token.get_numa_index());
1433
return *(instances[token.get_numa_index()]);
1434
}
1435
1436
const T& operator*() const { return *(instances[0]); }
1437
1438
const T* operator->() const { return instances[0].get(); }
1439
1440
template<typename FuncT>
1441
void modify_and_replicate(FuncT&& f) {
1442
auto source = std::move(instances[0]);
1443
std::forward<FuncT>(f)(*source);
1444
prepare_replicate_from(std::move(*source));
1445
}
1446
1447
void on_numa_config_changed() override {
1448
// Use the first one as the source. It doesn't matter which one we use,
1449
// because they all must be identical, but the first one is guaranteed to exist.
1450
auto source = std::move(instances[0]);
1451
prepare_replicate_from(std::move(*source));
1452
}
1453
1454
private:
1455
mutable std::vector<std::unique_ptr<T>> instances;
1456
mutable std::mutex mutex;
1457
1458
void ensure_present(NumaIndex idx) const {
1459
assert(idx < instances.size());
1460
1461
if (instances[idx] != nullptr)
1462
return;
1463
1464
assert(idx != 0);
1465
1466
std::unique_lock<std::mutex> lock(mutex);
1467
// Check again for races.
1468
if (instances[idx] != nullptr)
1469
return;
1470
1471
const NumaConfig& cfg = get_numa_config();
1472
cfg.execute_on_numa_node(
1473
idx, [this, idx]() { instances[idx] = std::make_unique<T>(*instances[0]); });
1474
}
1475
1476
void prepare_replicate_from(T&& source) {
1477
instances.clear();
1478
1479
const NumaConfig& cfg = get_numa_config();
1480
if (cfg.requires_memory_replication())
1481
{
1482
assert(cfg.num_numa_nodes() > 0);
1483
1484
// We just need to make sure the first instance is there.
1485
// Note that we cannot move here as we need to reallocate the data
1486
// on the correct NUMA node.
1487
cfg.execute_on_numa_node(
1488
0, [this, &source]() { instances.emplace_back(std::make_unique<T>(source)); });
1489
1490
// Prepare others for lazy init.
1491
instances.resize(cfg.num_numa_nodes());
1492
}
1493
else
1494
{
1495
assert(cfg.num_numa_nodes() == 1);
1496
// We take advantage of the fact that replication is not required
1497
// and reuse the source value, avoiding one copy operation.
1498
instances.emplace_back(std::make_unique<T>(std::move(source)));
1499
}
1500
}
1501
};
1502
1503
// Utilizes shared memory.
1504
template<typename T>
1505
class LazyNumaReplicatedSystemWide: public NumaReplicatedBase {
1506
public:
1507
using ReplicatorFuncType = std::function<T(const T&)>;
1508
1509
LazyNumaReplicatedSystemWide(NumaReplicationContext& ctx) :
1510
NumaReplicatedBase(ctx) {
1511
prepare_replicate_from(std::make_unique<T>());
1512
}
1513
1514
LazyNumaReplicatedSystemWide(NumaReplicationContext& ctx, std::unique_ptr<T>&& source) :
1515
NumaReplicatedBase(ctx) {
1516
prepare_replicate_from(std::move(source));
1517
}
1518
1519
LazyNumaReplicatedSystemWide(const LazyNumaReplicatedSystemWide&) = delete;
1520
LazyNumaReplicatedSystemWide(LazyNumaReplicatedSystemWide&& other) noexcept :
1521
NumaReplicatedBase(std::move(other)),
1522
instances(std::exchange(other.instances, {})) {}
1523
1524
LazyNumaReplicatedSystemWide& operator=(const LazyNumaReplicatedSystemWide&) = delete;
1525
LazyNumaReplicatedSystemWide& operator=(LazyNumaReplicatedSystemWide&& other) noexcept {
1526
NumaReplicatedBase::operator=(*this, std::move(other));
1527
instances = std::exchange(other.instances, {});
1528
1529
return *this;
1530
}
1531
1532
LazyNumaReplicatedSystemWide& operator=(std::unique_ptr<T>&& source) {
1533
prepare_replicate_from(std::move(source));
1534
1535
return *this;
1536
}
1537
1538
~LazyNumaReplicatedSystemWide() override = default;
1539
1540
const T& operator[](NumaReplicatedAccessToken token) const {
1541
assert(token.get_numa_index() < instances.size());
1542
ensure_present(token.get_numa_index());
1543
return *(instances[token.get_numa_index()]);
1544
}
1545
1546
const T& operator*() const { return *(instances[0]); }
1547
1548
const T* operator->() const { return &*instances[0]; }
1549
1550
std::vector<std::pair<SystemWideSharedConstantAllocationStatus, std::optional<std::string>>>
1551
get_status_and_errors() const {
1552
std::vector<std::pair<SystemWideSharedConstantAllocationStatus, std::optional<std::string>>>
1553
status;
1554
status.reserve(instances.size());
1555
1556
for (const auto& instance : instances)
1557
{
1558
status.emplace_back(instance.get_status(), instance.get_error_message());
1559
}
1560
1561
return status;
1562
}
1563
1564
template<typename FuncT>
1565
void modify_and_replicate(FuncT&& f) {
1566
auto source = std::make_unique<T>(*instances[0]);
1567
std::forward<FuncT>(f)(*source);
1568
prepare_replicate_from(std::move(source));
1569
}
1570
1571
void on_numa_config_changed() override {
1572
// Use the first one as the source. It doesn't matter which one we use,
1573
// because they all must be identical, but the first one is guaranteed to exist.
1574
auto source = std::make_unique<T>(*instances[0]);
1575
prepare_replicate_from(std::move(source));
1576
}
1577
1578
private:
1579
mutable std::vector<SystemWideSharedConstant<T>> instances;
1580
mutable std::mutex mutex;
1581
1582
std::size_t get_discriminator(NumaIndex idx) const {
1583
const NumaConfig& cfg = get_numa_config();
1584
const NumaConfig& cfg_sys = NumaConfig::from_system(SystemNumaPolicy{}, false);
1585
// as a discriminator, locate the hardware/system numadomain this cpuindex belongs to
1586
CpuIndex cpu = *cfg.nodes[idx].begin(); // get a CpuIndex from NumaIndex
1587
NumaIndex sys_idx = cfg_sys.is_cpu_assigned(cpu) ? cfg_sys.nodeByCpu.at(cpu) : 0;
1588
std::string s = cfg_sys.to_string() + "$" + std::to_string(sys_idx);
1589
return static_cast<std::size_t>(hash_string(s));
1590
}
1591
1592
void ensure_present(NumaIndex idx) const {
1593
assert(idx < instances.size());
1594
1595
if (instances[idx] != nullptr)
1596
return;
1597
1598
assert(idx != 0);
1599
1600
std::unique_lock<std::mutex> lock(mutex);
1601
// Check again for races.
1602
if (instances[idx] != nullptr)
1603
return;
1604
1605
const NumaConfig& cfg = get_numa_config();
1606
cfg.execute_on_numa_node(idx, [this, idx]() {
1607
instances[idx] = SystemWideSharedConstant<T>(*instances[0], get_discriminator(idx));
1608
});
1609
}
1610
1611
void prepare_replicate_from(std::unique_ptr<T>&& source) {
1612
instances.clear();
1613
1614
const NumaConfig& cfg = get_numa_config();
1615
// We just need to make sure the first instance is there.
1616
// Note that we cannot move here as we need to reallocate the data
1617
// on the correct NUMA node.
1618
// Even in the case of a single NUMA node we have to copy since it's shared memory.
1619
if (cfg.requires_memory_replication())
1620
{
1621
assert(cfg.num_numa_nodes() > 0);
1622
1623
cfg.execute_on_numa_node(0, [this, &source]() {
1624
instances.emplace_back(SystemWideSharedConstant<T>(*source, get_discriminator(0)));
1625
});
1626
1627
// Prepare others for lazy init.
1628
instances.resize(cfg.num_numa_nodes());
1629
}
1630
else
1631
{
1632
assert(cfg.num_numa_nodes() == 1);
1633
instances.emplace_back(SystemWideSharedConstant<T>(*source, get_discriminator(0)));
1634
}
1635
}
1636
};
1637
1638
class NumaReplicationContext {
1639
public:
1640
NumaReplicationContext(NumaConfig&& cfg) :
1641
config(std::move(cfg)) {}
1642
1643
NumaReplicationContext(const NumaReplicationContext&) = delete;
1644
NumaReplicationContext(NumaReplicationContext&&) = delete;
1645
1646
NumaReplicationContext& operator=(const NumaReplicationContext&) = delete;
1647
NumaReplicationContext& operator=(NumaReplicationContext&&) = delete;
1648
1649
~NumaReplicationContext() {
1650
// The context must outlive replicated objects
1651
if (!trackedReplicatedObjects.empty())
1652
std::exit(EXIT_FAILURE);
1653
}
1654
1655
void attach(NumaReplicatedBase* obj) {
1656
assert(trackedReplicatedObjects.count(obj) == 0);
1657
trackedReplicatedObjects.insert(obj);
1658
}
1659
1660
void detach(NumaReplicatedBase* obj) {
1661
assert(trackedReplicatedObjects.count(obj) == 1);
1662
trackedReplicatedObjects.erase(obj);
1663
}
1664
1665
// oldObj may be invalid at this point
1666
void move_attached([[maybe_unused]] NumaReplicatedBase* oldObj, NumaReplicatedBase* newObj) {
1667
assert(trackedReplicatedObjects.count(oldObj) == 1);
1668
assert(trackedReplicatedObjects.count(newObj) == 0);
1669
trackedReplicatedObjects.erase(oldObj);
1670
trackedReplicatedObjects.insert(newObj);
1671
}
1672
1673
void set_numa_config(NumaConfig&& cfg) {
1674
config = std::move(cfg);
1675
for (auto&& obj : trackedReplicatedObjects)
1676
obj->on_numa_config_changed();
1677
}
1678
1679
const NumaConfig& get_numa_config() const { return config; }
1680
1681
private:
1682
NumaConfig config;
1683
1684
// std::set uses std::less by default, which is required for pointer comparison
1685
std::set<NumaReplicatedBase*> trackedReplicatedObjects;
1686
};
1687
1688
// Registers this object with the context so it receives notifications when
// the NUMA configuration changes.
inline NumaReplicatedBase::NumaReplicatedBase(NumaReplicationContext& ctx) :
    context(&ctx) {
    context->attach(this);
}
// Move construction: steal the context pointer (leaving other detached, with
// context == nullptr) and re-point the context's registry entry at this object.
inline NumaReplicatedBase::NumaReplicatedBase(NumaReplicatedBase&& other) noexcept :
    context(std::exchange(other.context, nullptr)) {
    context->move_attached(&other, this);
}
// Move assignment: take over other's context and registry entry.
// NOTE(review): the previous context of *this is not detached here, so
// move-assigning into an object that is still attached elsewhere would leave
// a stale registry entry in its old context — presumably callers only assign
// into fresh or moved-from objects; confirm.
inline NumaReplicatedBase& NumaReplicatedBase::operator=(NumaReplicatedBase&& other) noexcept {
    context = std::exchange(other.context, nullptr);

    context->move_attached(&other, this);

    return *this;
}
// Deregisters from the context unless this object was moved-from (in which
// case the registry entry already belongs to the move target).
inline NumaReplicatedBase::~NumaReplicatedBase() {
    if (context != nullptr)
        context->detach(this);
}
// Forwards to the owning context's current configuration.
inline const NumaConfig& NumaReplicatedBase::get_numa_config() const {
    return context->get_numa_config();
}
} // namespace Stockfish
1716
1717
1718
#endif // #ifndef NUMA_H_INCLUDED
1719
1720