Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/src/utils/threadpool/TaskQueue.h
169678 views
1
/****************************************************************************/
2
// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
// Copyright (C) 2020-2025 German Aerospace Center (DLR) and others.
4
// This program and the accompanying materials are made available under the
5
// terms of the Eclipse Public License 2.0 which is available at
6
// https://www.eclipse.org/legal/epl-2.0/
7
// This Source Code may also be made available under the following Secondary
8
// Licenses when the conditions for such availability set forth in the Eclipse
9
// Public License 2.0 are satisfied: GNU General Public License, version 2
10
// or later which is available at
11
// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
/****************************************************************************/
14
/// @file TaskQueue.h
15
/// @author Michael Behrisch
16
/// @date 2020-09-09
17
///
18
// Threadpool implementation,
19
// based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20
/****************************************************************************/
21
#pragma once
22
#include <config.h>
23
24
#include <condition_variable>
25
#include <functional>
26
#include <queue>
27
#ifdef _MSC_VER
28
#pragma warning(push)
29
#pragma warning(disable: 4355 5204 5220) // mask warnings in MSVCs ppl-stdlib
30
#endif
31
#include <future>
32
#ifdef _MSC_VER
33
#pragma warning(pop)
34
#endif
35
36
37
template <typename C>
38
class TaskBase {
39
public:
40
virtual ~TaskBase() = default;
41
virtual void exec(const C& context) = 0;
42
};
43
44
template <typename T, typename C>
45
class Task : public TaskBase<C> {
46
public:
47
Task(T&& t) : task(std::move(t)) {}
48
void exec(const C& context) override {
49
task(context);
50
}
51
52
T task;
53
};
54
55
template <typename C>
56
class TaskQueue {
57
using LockType = std::unique_lock<std::mutex>;
58
59
public:
60
using TaskPtrType = std::unique_ptr<TaskBase<C> >;
61
TaskQueue() = default;
62
~TaskQueue() = default;
63
64
void setEnabled(bool enabled) {
65
{
66
LockType lock{ myMutex };
67
myEnabled = enabled;
68
}
69
if (!enabled) {
70
myReady.notify_all();
71
}
72
}
73
74
bool isEnabled() const {
75
LockType lock{ myMutex };
76
return myEnabled;
77
}
78
79
bool waitAndPop(TaskPtrType& task) {
80
LockType lock{ myMutex };
81
myReady.wait(lock, [this] { return !myEnabled || !myQueue.empty(); });
82
if (myEnabled && !myQueue.empty()) {
83
task = std::move(myQueue.front());
84
myQueue.pop();
85
return true;
86
}
87
return false;
88
}
89
90
template <typename TaskT>
91
auto push(TaskT&& task) -> std::future<decltype(task(std::declval<C>()))> {
92
using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
93
auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
94
auto future = job->task.get_future();
95
{
96
LockType lock{ myMutex };
97
myQueue.emplace(std::move(job));
98
}
99
100
myReady.notify_one();
101
return future;
102
}
103
104
bool tryPop(TaskPtrType& task) {
105
LockType lock{ myMutex, std::try_to_lock };
106
if (!lock || !myEnabled || myQueue.empty()) {
107
return false;
108
}
109
task = std::move(myQueue.front());
110
myQueue.pop();
111
return true;
112
}
113
114
template <typename TaskT>
115
auto tryPush(TaskT&& task, bool& success) -> std::future<decltype(task(std::declval<C>()))> {
116
std::future<decltype(task(std::declval<C>()))> future;
117
success = false;
118
{
119
LockType lock{ myMutex, std::try_to_lock };
120
if (!lock) {
121
return future;
122
}
123
using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
124
auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
125
future = job->task.get_future();
126
success = true;
127
myQueue.emplace(std::move(job));
128
}
129
130
myReady.notify_one();
131
return future;
132
}
133
134
private:
135
TaskQueue(const TaskQueue&) = delete;
136
TaskQueue& operator=(const TaskQueue&) = delete;
137
138
std::queue<TaskPtrType> myQueue;
139
bool myEnabled = true;
140
mutable std::mutex myMutex;
141
std::condition_variable myReady;
142
};
143
144