In [1]:
.rawInput


Using raw input
Out[1]:


In [2]:
#ifndef PERIODIC_THREAD_H
#define PERIODIC_THREAD_H
#include <functional>
#include <memory>
#include <condition_variable>
#include <chrono>
#include <mutex>
#include <thread>
 
namespace ExNs
{
    class periodic_thread
    {
    public:
        periodic_thread();
 
        template<typename Callable, typename... Args>
        explicit periodic_thread(const std::chrono::milliseconds& periode,
                                 Callable&& fct, Args&&... args)
        : periode_(periode),
          is_running_(false),
          is_notified_(false)
        {
            fct_ = std::bind(std::forward<Callable>(fct),
                             std::forward<Args>(args)...);
        }
 
        ~periodic_thread();
 
        periodic_thread(const periodic_thread&) = delete;
        periodic_thread& operator=(const periodic_thread&) = delete;
 
        periodic_thread(periodic_thread&&);
        periodic_thread& operator=(periodic_thread&&);
 
        void start();
 
    private:
        void swap(periodic_thread&&);
        void stop();
        void do_work();
 
        std::function<bool()> fct_;
        std::chrono::milliseconds periode_;
        std::chrono::high_resolution_clock::time_point t_;
        bool is_running_;
        bool is_notified_;
        std::condition_variable thread_cond_;
        std::mutex thread_mutex_;
        std::thread worker_thread_;
    };
}
 
#endif // PERIODIC_THREAD_H


Out[2]:


In [3]:
namespace ExNs
{
    periodic_thread::periodic_thread()
        : is_running_(false),
          is_notified_(false)
    {}
 
    periodic_thread::~periodic_thread()
    {
        stop();
    }
 
    periodic_thread::periodic_thread(periodic_thread&& pt)
    {
        swap(std::move(pt));
    }
 
    periodic_thread& periodic_thread::operator=(periodic_thread&& pt)
    {
        if (this != &pt)
        {
            stop();
            swap(std::move(pt));
        }
        return *this;
    }
 
    void periodic_thread::start()
    {
        std::lock_guard<std::mutex> lock(thread_mutex_);
        t_ = std::chrono::high_resolution_clock::now();
        worker_thread_ = std::thread(&periodic_thread::do_work, this);
        is_running_ = true;
    }
 
    void periodic_thread::stop()
    {
        {
            std::lock_guard<std::mutex> lock(thread_mutex_);
            is_running_ = false;
            is_notified_ = true;
            thread_cond_.notify_one();
        }
        if (worker_thread_.joinable())
        {
            worker_thread_.join();
        }
    }
 
    void periodic_thread::swap(periodic_thread&& pt)
    {
        std::unique_lock<std::mutex> lock_a(thread_mutex_, std::defer_lock);
        std::unique_lock<std::mutex> lock_b(pt.thread_mutex_, std::defer_lock);
        std::lock(lock_a, lock_b);
 
        std::swap(fct_, pt.fct_);
        std::swap(periode_, pt.periode_);
        std::swap(is_notified_, pt.is_notified_);
    }
 
    void periodic_thread::do_work()
    {
        bool stop = false;
        std::unique_lock<std::mutex> lock(thread_mutex_);
        while (is_running_ && !stop)
        {
            // temps suivant
            t_ = t_ + periode_;
 
            // boucle pour empêcher les rêveils intempestifs
            std::cv_status timeOut = std::cv_status::no_timeout;
            while ((!is_notified_) && (timeOut == std::cv_status::no_timeout))
            {
                // Calcul du temps
                std::chrono::high_resolution_clock::time_point t2 =
                    std::chrono::high_resolution_clock::now();
                std::chrono::milliseconds next =
                    std::chrono::duration_cast
                        <std::chrono::milliseconds>(t_ - t2);
                if (next < std::chrono::milliseconds::zero())
                {
                    // Retard de plus d'un cycle
                    next = periode_;
                    t_ = t2 + next;
                }
                else if (next > periode_)
                {
                    // En avance
                    next = periode_;
                    t_ = t2 + next;
                }
                timeOut = thread_cond_.wait_for(lock, next);
            }
            is_notified_ = false;
 
            if (is_running_ && fct_)
            {
                // Tick
                lock.unlock();
                stop = fct_();
                lock.lock();
            }
            else
            {
                stop = true;
            }
        }
    }
}


Out[3]:


In [4]:
#include <iostream>


Out[4]:


In [5]:
.rawInput


Not using raw input
Out[5]:


In [6]:
{
    // Déclaration en tant que membre de classe
    ExNs::periodic_thread thread_;

    // ...
    
    thread_ = ExNs::periodic_thread(std::chrono::milliseconds(1000), []() {
        std::cout << "tick" << std::endl;
        return false;
    });

    thread_.start();
}


Out[6]:

Threads

A partir de C++11, la stl s'est enrichie de nombreuses classes pour la gestion des threads.

Utiliser std::async pour un traitement asynchrone

La fonction std::async simplifie la gestion des traitements asynchrones en encapsulant :

  • la gestion du thread (événement de synchronisation et répartition de charge)
  • la communication inter-thread à l'aide des std::future.

std::async bloque (join) si la fonction asynchrone n'est pas terminée :

  • lorsque le programme est en attente du retour de la fonction asynchrone
  • lorsque le programme se termine Elle évite ainsi la mise en place d'événement de synchronisation pour l'ordonnancement des tâches.

Le variable de retour std::future permet de récupérer :

  • le retour de la fonction asynchrone
  • l'exception lancée dans la fonction asynchrone Elle rend abstrait le partage de variable et de sa gestion.

L'utilisation de std::async peut bénéficier des avantages liées à la technologie et aux algorithmes de répartition de charge (pools de threads,vol de travail, ...) grâce à l'implémenteur de la bibliothèque standard. std::async prends comme paramètre par défaut la stratégie std::launch::async | std::launch::deferred. Selon l'algorithme de répartition de charge implémenté, le traitement sera soit rééllement asynchrone ou seulement retardée jusqu'au moment où le résultat est attendu. L'appelant peut forcer l'une des stratégies.


In [7]:
.rawInput


Using raw input
Out[7]:


In [8]:
#include <future>


Out[8]:


In [9]:
.rawInput


Not using raw input
Out[9]:


In [10]:
// Get Start Time
std::chrono::system_clock::time_point start = std::chrono::system_clock::now();

auto lambda = [](auto&& data)
{
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return std::string("readDB_") + data;
};

std::future<std::string> resultFromDB = std::async(lambda, "Parameter");
 
int i = 5;
while (i > 0)
{
    std::cout << i << "s ... ";
    std::this_thread::sleep_for(std::chrono::seconds(1));
    --i;
}
 
//Fetch Data from DB
// Will block till data is available in future<std::string> object.
std::string dbData = resultFromDB.get();
 
// Get End Time
auto end = std::chrono::system_clock::now();
 
auto diff = std::chrono::duration_cast < std::chrono::seconds > (end - start).count();
std::cout << "Total Time Taken = " << diff << " Seconds" << std::endl;

//Printing Data
std::cout << "Data = " << dbData << std::endl;


IncrementalExecutor::executeFunction: symbol '__emutls_v._ZSt11__once_call' unresolved while linking function '_GLOBAL__sub_I_cling_module_6'!
IncrementalExecutor::executeFunction: symbol '__emutls_v._ZSt15__once_callable' unresolved while linking function '_GLOBAL__sub_I_cling_module_6'!
Total Time Taken = 0 Seconds
Data = 
Out[10]:
(std::basic_ostream<char, std::char_traits<char> >::__ostream_type &) @0x7f562c1a63a0

In [ ]: