Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/cpp/linux/scheduler/Scheduler.h
644 views
1
#pragma once
2
3
#include <iomanip>
4
#include <map>
5
6
#include "ctpl_stl.h"
7
8
#include "InterruptableSleep.h"
9
#include "Cron.h"
10
11
namespace Bosma {
12
using Clock = std::chrono::system_clock;
13
14
class Task {
15
public:
16
explicit Task(std::function<void()> &&f, bool recur = false, bool interval = false) :
17
f(std::move(f)), recur(recur), interval(interval) {}
18
19
virtual Clock::time_point get_new_time() const = 0;
20
21
std::function<void()> f;
22
23
bool recur;
24
bool interval;
25
};
26
27
class InTask : public Task {
28
public:
29
explicit InTask(std::function<void()> &&f) : Task(std::move(f)) {}
30
31
// dummy time_point because it's not used
32
Clock::time_point get_new_time() const override { return Clock::time_point(Clock::duration(0)); }
33
};
34
35
class EveryTask : public Task {
36
public:
37
EveryTask(Clock::duration time, std::function<void()> &&f, bool interval = false) :
38
Task(std::move(f), true, interval), time(time) {}
39
40
Clock::time_point get_new_time() const override {
41
return Clock::now() + time;
42
};
43
Clock::duration time;
44
};
45
46
class CronTask : public Task {
47
public:
48
CronTask(const std::string &expression, std::function<void()> &&f) : Task(std::move(f), true),
49
cron(expression) {}
50
51
Clock::time_point get_new_time() const override {
52
return cron.cron_to_next();
53
};
54
Cron cron;
55
};
56
57
inline bool try_parse(std::tm &tm, const std::string &expression, const std::string &format) {
58
std::stringstream ss(expression);
59
return !(ss >> std::get_time(&tm, format.c_str())).fail();
60
}
61
62
class Scheduler {
63
public:
64
explicit Scheduler(unsigned int max_n_tasks = 4) : done(false), threads(max_n_tasks + 1) {
65
threads.push([this](int) {
66
while (!done) {
67
if (tasks.empty()) {
68
sleeper.sleep();
69
} else {
70
auto time_of_first_task = (*tasks.begin()).first;
71
sleeper.sleep_until(time_of_first_task);
72
}
73
manage_tasks();
74
}
75
});
76
}
77
78
Scheduler(const Scheduler &) = delete;
79
80
Scheduler(Scheduler &&) noexcept = delete;
81
82
Scheduler &operator=(const Scheduler &) = delete;
83
84
Scheduler &operator=(Scheduler &&) noexcept = delete;
85
86
~Scheduler() {
87
done = true;
88
sleeper.interrupt();
89
}
90
91
template<typename _Callable, typename... _Args>
92
void in(const Clock::time_point time, _Callable &&f, _Args &&... args) {
93
std::shared_ptr<Task> t = std::make_shared<InTask>(
94
std::bind(std::forward<_Callable>(f), std::forward<_Args>(args)...));
95
add_task(time, std::move(t));
96
}
97
98
template<typename _Callable, typename... _Args>
99
void in(const Clock::duration time, _Callable &&f, _Args &&... args) {
100
in(Clock::now() + time, std::forward<_Callable>(f), std::forward<_Args>(args)...);
101
}
102
103
template<typename _Callable, typename... _Args>
104
void at(const std::string &time, _Callable &&f, _Args &&... args) {
105
// get current time as a tm object
106
auto time_now = Clock::to_time_t(Clock::now());
107
std::tm tm = *std::localtime(&time_now);
108
109
// our final time as a time_point
110
Clock::time_point tp;
111
112
if (try_parse(tm, time, "%H:%M:%S")) {
113
// convert tm back to time_t, then to a time_point and assign to final
114
tp = Clock::from_time_t(std::mktime(&tm));
115
116
// if we've already passed this time, the user will mean next day, so add a day.
117
if (Clock::now() >= tp)
118
tp += std::chrono::hours(24);
119
} else if (try_parse(tm, time, "%Y-%m-%d %H:%M:%S")) {
120
tp = Clock::from_time_t(std::mktime(&tm));
121
} else if (try_parse(tm, time, "%Y/%m/%d %H:%M:%S")) {
122
tp = Clock::from_time_t(std::mktime(&tm));
123
} else {
124
// could not parse time
125
throw std::runtime_error("Cannot parse time string: " + time);
126
}
127
128
in(tp, std::forward<_Callable>(f), std::forward<_Args>(args)...);
129
}
130
131
template<typename _Callable, typename... _Args>
132
void every(const Clock::duration time, _Callable &&f, _Args &&... args) {
133
std::shared_ptr<Task> t = std::make_shared<EveryTask>(time, std::bind(std::forward<_Callable>(f),
134
std::forward<_Args>(args)...));
135
auto next_time = t->get_new_time();
136
add_task(next_time, std::move(t));
137
}
138
139
// expression format:
140
// from https://en.wikipedia.org/wiki/Cron#Overview
141
// ┌───────────── minute (0 - 59)
142
// │ ┌───────────── hour (0 - 23)
143
// │ │ ┌───────────── day of month (1 - 31)
144
// │ │ │ ┌───────────── month (1 - 12)
145
// │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
146
// │ │ │ │ │
147
// │ │ │ │ │
148
// * * * * *
149
template<typename _Callable, typename... _Args>
150
void cron(const std::string &expression, _Callable &&f, _Args &&... args) {
151
std::shared_ptr<Task> t = std::make_shared<CronTask>(expression, std::bind(std::forward<_Callable>(f),
152
std::forward<_Args>(args)...));
153
auto next_time = t->get_new_time();
154
add_task(next_time, std::move(t));
155
}
156
157
template<typename _Callable, typename... _Args>
158
void interval(const Clock::duration time, _Callable &&f, _Args &&... args) {
159
std::shared_ptr<Task> t = std::make_shared<EveryTask>(time, std::bind(std::forward<_Callable>(f),
160
std::forward<_Args>(args)...), true);
161
add_task(Clock::now(), std::move(t));
162
}
163
164
private:
165
std::atomic<bool> done;
166
167
Bosma::InterruptableSleep sleeper;
168
169
std::multimap<Clock::time_point, std::shared_ptr<Task>> tasks;
170
std::mutex lock;
171
ctpl::thread_pool threads;
172
173
void add_task(const Clock::time_point time, std::shared_ptr<Task> t) {
174
std::lock_guard<std::mutex> l(lock);
175
tasks.emplace(time, std::move(t));
176
sleeper.interrupt();
177
}
178
179
void manage_tasks() {
180
std::lock_guard<std::mutex> l(lock);
181
182
auto end_of_tasks_to_run = tasks.upper_bound(Clock::now());
183
184
// if there are any tasks to be run and removed
185
if (end_of_tasks_to_run != tasks.begin()) {
186
// keep track of tasks that will be re-added
187
decltype(tasks) recurred_tasks;
188
189
// for all tasks that have been triggered
190
for (auto i = tasks.begin(); i != end_of_tasks_to_run; ++i) {
191
192
auto &task = (*i).second;
193
194
if (task->interval) {
195
// if it's an interval task, only add the task back after f() is completed
196
threads.push([this, task](int) {
197
task->f();
198
// no risk of race-condition,
199
// add_task() will wait for manage_tasks() to release lock
200
add_task(task->get_new_time(), task);
201
});
202
} else {
203
threads.push([task](int) {
204
task->f();
205
});
206
// calculate time of next run and add the new task to the tasks to be recurred
207
if (task->recur)
208
recurred_tasks.emplace(task->get_new_time(), std::move(task));
209
}
210
}
211
212
// remove the completed tasks
213
tasks.erase(tasks.begin(), end_of_tasks_to_run);
214
215
// re-add the tasks that are recurring
216
for (auto &task : recurred_tasks)
217
tasks.emplace(task.first, std::move(task.second));
218
}
219
}
220
};
221
}
222