Path: blob/master/cpp/linux/scheduler/Scheduler.h
644 views
#pragma once12#include <iomanip>3#include <map>45#include "ctpl_stl.h"67#include "InterruptableSleep.h"8#include "Cron.h"910namespace Bosma {11using Clock = std::chrono::system_clock;1213class Task {14public:15explicit Task(std::function<void()> &&f, bool recur = false, bool interval = false) :16f(std::move(f)), recur(recur), interval(interval) {}1718virtual Clock::time_point get_new_time() const = 0;1920std::function<void()> f;2122bool recur;23bool interval;24};2526class InTask : public Task {27public:28explicit InTask(std::function<void()> &&f) : Task(std::move(f)) {}2930// dummy time_point because it's not used31Clock::time_point get_new_time() const override { return Clock::time_point(Clock::duration(0)); }32};3334class EveryTask : public Task {35public:36EveryTask(Clock::duration time, std::function<void()> &&f, bool interval = false) :37Task(std::move(f), true, interval), time(time) {}3839Clock::time_point get_new_time() const override {40return Clock::now() + time;41};42Clock::duration time;43};4445class CronTask : public Task {46public:47CronTask(const std::string &expression, std::function<void()> &&f) : Task(std::move(f), true),48cron(expression) {}4950Clock::time_point get_new_time() const override {51return cron.cron_to_next();52};53Cron cron;54};5556inline bool try_parse(std::tm &tm, const std::string &expression, const std::string &format) {57std::stringstream ss(expression);58return !(ss >> std::get_time(&tm, format.c_str())).fail();59}6061class Scheduler {62public:63explicit Scheduler(unsigned int max_n_tasks = 4) : done(false), threads(max_n_tasks + 1) {64threads.push([this](int) {65while (!done) {66if (tasks.empty()) {67sleeper.sleep();68} else {69auto time_of_first_task = (*tasks.begin()).first;70sleeper.sleep_until(time_of_first_task);71}72manage_tasks();73}74});75}7677Scheduler(const Scheduler &) = delete;7879Scheduler(Scheduler &&) noexcept = delete;8081Scheduler &operator=(const Scheduler &) = delete;8283Scheduler &operator=(Scheduler &&) noexcept = delete;8485~Scheduler() {86done = true;87sleeper.interrupt();88}8990template<typename _Callable, typename... _Args>91void in(const Clock::time_point time, _Callable &&f, _Args &&... args) {92std::shared_ptr<Task> t = std::make_shared<InTask>(93std::bind(std::forward<_Callable>(f), std::forward<_Args>(args)...));94add_task(time, std::move(t));95}9697template<typename _Callable, typename... _Args>98void in(const Clock::duration time, _Callable &&f, _Args &&... args) {99in(Clock::now() + time, std::forward<_Callable>(f), std::forward<_Args>(args)...);100}101102template<typename _Callable, typename... _Args>103void at(const std::string &time, _Callable &&f, _Args &&... args) {104// get current time as a tm object105auto time_now = Clock::to_time_t(Clock::now());106std::tm tm = *std::localtime(&time_now);107108// our final time as a time_point109Clock::time_point tp;110111if (try_parse(tm, time, "%H:%M:%S")) {112// convert tm back to time_t, then to a time_point and assign to final113tp = Clock::from_time_t(std::mktime(&tm));114115// if we've already passed this time, the user will mean next day, so add a day.116if (Clock::now() >= tp)117tp += std::chrono::hours(24);118} else if (try_parse(tm, time, "%Y-%m-%d %H:%M:%S")) {119tp = Clock::from_time_t(std::mktime(&tm));120} else if (try_parse(tm, time, "%Y/%m/%d %H:%M:%S")) {121tp = Clock::from_time_t(std::mktime(&tm));122} else {123// could not parse time124throw std::runtime_error("Cannot parse time string: " + time);125}126127in(tp, std::forward<_Callable>(f), std::forward<_Args>(args)...);128}129130template<typename _Callable, typename... _Args>131void every(const Clock::duration time, _Callable &&f, _Args &&... args) {132std::shared_ptr<Task> t = std::make_shared<EveryTask>(time, std::bind(std::forward<_Callable>(f),133std::forward<_Args>(args)...));134auto next_time = t->get_new_time();135add_task(next_time, std::move(t));136}137138// expression format:139// from https://en.wikipedia.org/wiki/Cron#Overview140// ┌───────────── minute (0 - 59)141// │ ┌───────────── hour (0 - 23)142// │ │ ┌───────────── day of month (1 - 31)143// │ │ │ ┌───────────── month (1 - 12)144// │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)145// │ │ │ │ │146// │ │ │ │ │147// * * * * *148template<typename _Callable, typename... _Args>149void cron(const std::string &expression, _Callable &&f, _Args &&... args) {150std::shared_ptr<Task> t = std::make_shared<CronTask>(expression, std::bind(std::forward<_Callable>(f),151std::forward<_Args>(args)...));152auto next_time = t->get_new_time();153add_task(next_time, std::move(t));154}155156template<typename _Callable, typename... _Args>157void interval(const Clock::duration time, _Callable &&f, _Args &&... args) {158std::shared_ptr<Task> t = std::make_shared<EveryTask>(time, std::bind(std::forward<_Callable>(f),159std::forward<_Args>(args)...), true);160add_task(Clock::now(), std::move(t));161}162163private:164std::atomic<bool> done;165166Bosma::InterruptableSleep sleeper;167168std::multimap<Clock::time_point, std::shared_ptr<Task>> tasks;169std::mutex lock;170ctpl::thread_pool threads;171172void add_task(const Clock::time_point time, std::shared_ptr<Task> t) {173std::lock_guard<std::mutex> l(lock);174tasks.emplace(time, std::move(t));175sleeper.interrupt();176}177178void manage_tasks() {179std::lock_guard<std::mutex> l(lock);180181auto end_of_tasks_to_run = tasks.upper_bound(Clock::now());182183// if there are any tasks to be run and removed184if (end_of_tasks_to_run != tasks.begin()) {185// keep track of tasks that will be re-added186decltype(tasks) recurred_tasks;187188// for all tasks that have been triggered189for (auto i = tasks.begin(); i != end_of_tasks_to_run; ++i) {190191auto &task = (*i).second;192193if (task->interval) {194// if it's an interval task, only add the task back after f() is completed195threads.push([this, task](int) {196task->f();197// no risk of race-condition,198// add_task() will wait for manage_tasks() to release lock199add_task(task->get_new_time(), task);200});201} else {202threads.push([task](int) {203task->f();204});205// calculate time of next run and add the new task to the tasks to be recurred206if (task->recur)207recurred_tasks.emplace(task->get_new_time(), std::move(task));208}209}210211// remove the completed tasks212tasks.erase(tasks.begin(), end_of_tasks_to_run);213214// re-add the tasks that are recurring215for (auto &task : recurred_tasks)216tasks.emplace(task.first, std::move(task.second));217}218}219};220}221222