nedo

[C++] Generic priority queue, for use with threads


Hi.

Below I present a queue I wrote. It is generic, so it can be used with any type of variable.

What is it used for?

The queue is meant for programs that use a producer/consumer(s) model. One producer thread can add instances of the chosen type to the queue, and one or more consumer threads can take them out without running into deadlocks, livelocks, or race conditions.

In the example below I use two classes, a producer and a consumer, and a total of 5 threads: 1 producer and 4 consumers.

The basic design of this queue is inspired by the priority queue found in the wxWidgets samples.

This implementation uses only standard C++ facilities and has no external dependencies. You do, however, need a modern compiler that supports the C++11 standard, and you must compile with the -std=c++11 flag (for gcc or clang; on Linux you typically also need -pthread when linking programs that use std::thread).

queue.h


template <class T>
class queue_priority
{
public:
enum class priorities{P_HIGH, P_NORMAL, P_LOW};
queue_priority(size_t max_size);
queue_priority();
queue_priority(const queue_priority& other);
queue_priority& operator=(const queue_priority& other);
bool pop(T& t);
bool push(priorities p, T& t);
void suspend();
void resume();


private:
std::multimap<priorities, T> m_map;
std::mutex m_mutex_action;
std::mutex m_mutex_cv_suspend;
std::condition_variable m_cv_suspend;
std::atomic_bool m_bool_suspend;
std::atomic_bool m_bool_full;
std::atomic_bool m_bool_empty;
std::atomic<unsigned int> m_nr_current_items;
size_t MAX;
};

queue.cpp


template <class T>
queue_priority<T>::queue_priority() : MAX(200)
{
m_bool_full = false;
m_bool_suspend = false;
m_bool_empty = true;
m_nr_current_items = 0;
}

template <class T>
queue_priority<T>::queue_priority(size_t max_size) : MAX(max_size)
{
m_bool_full = false;
m_bool_suspend = false;
m_bool_empty = true;
m_nr_current_items = 0;

}
template <class T>
queue_priority<T>::queue_priority(const queue_priority& other) : MAX(other.MAX)
{
m_map = other.m_map; // copy; we cannot move from a const reference
m_bool_full = other.m_bool_full.load();
m_bool_empty = other.m_bool_empty.load();
m_bool_suspend = other.m_bool_suspend.load();
m_nr_current_items = other.m_nr_current_items.load();
}

template <class T>
queue_priority<T>& queue_priority<T>::operator=(const queue_priority<T>& other)
{
m_map = other.m_map;
m_bool_full = other.m_bool_full.load();
m_bool_empty = other.m_bool_empty.load();
m_bool_suspend = other.m_bool_suspend.load();
m_nr_current_items = other.m_nr_current_items.load();
return *this;
}

template <class T>
bool queue_priority<T>::pop(T& t)
{
// if suspension of the queue was requested, wait until we get the signal to resume
if(m_bool_suspend.load())
{
std::unique_lock<std::mutex> suspendLock(m_mutex_cv_suspend);
// the predicate returns true when the wait should END, i.e. once we are no longer suspended
m_cv_suspend.wait(suspendLock, [this] {return !m_bool_suspend.load();});
}
// lock the mutex that protects the internal data (the map holding our items)
std::lock_guard<std::mutex> lock(m_mutex_action);
// since this is a pop, before extracting an element check whether the queue is empty, and return if it is
if(m_bool_empty.load())
{
return false;
}
t = m_map.begin()->second;
m_map.erase(m_map.begin());
// after removing an element the queue can no longer be full;
// a plain store is enough here because we hold the mutex
m_bool_full = false;
// if the queue just became empty, signal it
if(m_map.empty())
{
m_bool_empty = true;
}
return true;
}

template <class T>
bool queue_priority<T>::push(priorities p, T& t)
{
// if suspension of the queue was requested, wait until we get the signal to resume
if(m_bool_suspend.load())
{
std::unique_lock<std::mutex> suspendLock(m_mutex_cv_suspend);
// the predicate returns true when the wait should END, i.e. once we are no longer suspended
m_cv_suspend.wait(suspendLock, [this] {return !m_bool_suspend.load();});
}
// lock the mutex that protects the internal data (the map holding our items)
std::lock_guard<std::mutex> lock(m_mutex_action);
// since this is a push, check whether the queue is full, and bail out if it is
if(m_bool_full.load())
{
return false;
}
// insert the data
m_map.insert(std::make_pair(p, std::move(t)));
// we have inserted an element => the queue is no longer empty;
// a plain store is enough here because we hold the mutex
m_bool_empty = false;
// if we just filled the queue, signal it
if(m_map.size() == MAX)
{
m_bool_full = true;
}
return true;
}


template <class T>
void queue_priority<T>::suspend()
{
m_bool_suspend = true;
}

template <class T>
void queue_priority<T>::resume()
{
{
// take the lock so the flag flip cannot race with a thread that has already
// checked the predicate but has not yet gone to sleep on the condition variable
std::lock_guard<std::mutex> lock(m_mutex_cv_suspend);
m_bool_suspend = false;
}
m_cv_suspend.notify_all();
}

That is about it for the queue class itself. Below I will attach a complete example. At the end of the file queue.cpp you will find an explicit instantiation of queue_priority<Job>. It is needed because of how templates interact with separate compilation: each source file is compiled on its own, so when queue.cpp is compiled it does not know that queue_priority<Job> will ever be used elsewhere, no code is generated for that specialization, and you get a linker error when you try to use the class.

You can find more details about this in this answer.

Below is the code for a complete project: 3 files, main.cpp, queue.h, queue.cpp.

The little program reads a file that contains numbers from 1 to x, one per line (generate the file yourselves; I am not attaching it because it is large) - I used 10000. The producer reads each number and puts it in the queue. The consumers take numbers out of the queue, check whether each one is even or odd, and write it to the corresponding file.

Keep in mind, though, that the model I chose (1 producer and 4 consumers) is not necessarily the most efficient one. Tuning such a model is done through repeated tests and measurements; it is used here only to illustrate how the queue works.

main.cpp


#include "queue.h"
#include <memory>

using namespace std;

int main()
{
for(int i = 0; i < 1000;i++)
{
cout << "Loop iteration " << i << "." << endl;
queue_priority<Job>* cj = new queue_priority<Job>();
Producator prod(cj, std::string("intrare.txt")); // the file with the numbers
auto sp_par = std::make_shared<std::ofstream>("iesire_par.txt"); // output file for even numbers
auto sp_impar = std::make_shared<std::ofstream>("iesire_impar.txt"); // output file for odd numbers
Consummer con1(cj, sp_par, sp_impar);
Consummer con2(cj, sp_par, sp_impar);
Consummer con3(cj, sp_par, sp_impar);
Consummer con4(cj, sp_par, sp_impar);


std::thread trd_con1(std::ref(con1));
std::thread trd_con2(std::ref(con2));
//std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::thread trd_con3(std::ref(con3));
std::thread trd_con4(std::ref(con4));
std::thread trd_prod(std::ref(prod));


trd_con1.join();
trd_con2.join();
trd_prod.join();
trd_con3.join();
trd_con4.join();
delete cj;
}
cout << "Press enter to exit.";
cin.get(); // keep the window open
return 0;
}

queue.h


#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <iostream>
#include <fstream>
#include <map>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <memory> // std::shared_ptr
#include <string>

// static gives these mutexes internal linkage: every translation unit that
// includes this header gets its own private copy. That is fine here because
// they are only used from queue.cpp.
static std::mutex mutex_cout; // protects std::cout
static std::mutex m_mutex_par; // protects the even-numbers output file
static std::mutex m_mutex_impar; // protects the odd-numbers output file


class Data
{
public:
Data(std::string data);
Data() = default;
~Data() = default;
Data& operator=(const Data& other);
std::string GetData();
private:
std::string m_data;
};

class Job
{
public:
enum class commands{J_COM_THREAD_EXIT, J_COM_THREAD_JOB};
Job(Data data, commands cmd = commands::J_COM_THREAD_JOB);
Job() = default;
~Job() = default;
Job& operator=(const Job& other);
Data GetData();
commands GetCommand();
private:
Data m_data;
commands m_cmd;
};

template <class T>
class queue_priority
{
public:
enum class priorities{P_HIGH, P_NORMAL, P_LOW};
queue_priority(size_t max_size);
queue_priority();
queue_priority(const queue_priority& other);
queue_priority& operator=(const queue_priority& other);
bool pop(T& t);
bool push(priorities p, T& t);
void suspend();
void resume();


private:
std::multimap<priorities, T> m_map;
std::mutex m_mutex_action;
std::mutex m_mutex_cv_suspend;
std::condition_variable m_cv_suspend;
std::atomic_bool m_bool_suspend;
std::atomic_bool m_bool_full;
std::atomic_bool m_bool_empty;
size_t MAX;
};

class Producator
{
public:
Producator(queue_priority<Job>* cj, std::string inputFile);
Producator() = default;
~Producator() = default;
void operator()();
private:
queue_priority<Job>* mp_queue;
std::string m_in_file_path;
};


class Consummer
{
public:
Consummer(queue_priority<Job>* cj, std::shared_ptr<std::ofstream> par, std::shared_ptr<std::ofstream> impar);
Consummer() = default;
~Consummer() = default;
Consummer& operator=(const Consummer& other);
void operator()();
private:
queue_priority<Job>* mp_queue;
std::shared_ptr<std::ofstream> m_par;
std::shared_ptr<std::ofstream> m_impar;
};

#endif // QUEUE_H_INCLUDED

queue.cpp


#include "queue.h"


Data::Data(std::string data) : m_data(data)
{

}

Data& Data::operator=(const Data& other)
{
m_data = other.m_data;
return *this;
}

std::string Data::GetData()
{
return m_data;
}

Job::Job(Data data, commands cmd) : m_data(data), m_cmd(cmd)
{

}

Job& Job::operator=(const Job& other)
{
m_data = other.m_data;
m_cmd = other.m_cmd;
return *this;
}

Data Job::GetData()
{
return m_data;
}

Job::commands Job::GetCommand()
{
return m_cmd;
}


Producator::Producator(queue_priority<Job>* cj, std::string inputFile) : mp_queue(cj), m_in_file_path(inputFile)
{

}

void Producator::operator()()
{
if(mp_queue == nullptr)
{
return;
}
std::ifstream fIn(m_in_file_path.c_str());
if(!fIn.is_open())
{
mutex_cout.lock();
std::cout << "Could not open the input file." << std::endl;
mutex_cout.unlock();
// be careful on exit: make sure the exit jobs really were inserted
// the while loop below guarantees that a critical job actually got pushed
Job jExit(Data(""), Job::commands::J_COM_THREAD_EXIT);
for(size_t i = 0; i < 4;i++)
{
while(!mp_queue->push(queue_priority<Job>::priorities::P_LOW, jExit))
{
std::this_thread::yield();
}
}

return;
}
std::string line;
while(std::getline(fIn, line))
{
if(line.empty())
{
mutex_cout.lock();
std::cout << "Found an empty line." << std::endl;
mutex_cout.unlock();
continue;
}
Job jb = Job(Data(line));
while(!mp_queue->push(queue_priority<Job>::priorities::P_NORMAL, jb))
{
std::this_thread::yield();
}
}
// be careful on exit: make sure the exit jobs really were inserted
// the while loop below guarantees that a critical job actually got pushed
Job jExit(Data(""), Job::commands::J_COM_THREAD_EXIT);
for(size_t i = 0; i < 4;i++)
{
while(!mp_queue->push(queue_priority<Job>::priorities::P_LOW, jExit))
{
std::this_thread::yield();
}
}
}

Consummer::Consummer(queue_priority<Job>* cj, std::shared_ptr<std::ofstream> par, std::shared_ptr<std::ofstream> impar) : mp_queue(cj), m_par(par), m_impar(impar)
{

}

Consummer& Consummer::operator=(const Consummer& other)
{
mp_queue = other.mp_queue;
m_par = other.m_par;
m_impar = other.m_impar;
return *this;
}

void Consummer::operator()()
{
if(mp_queue == nullptr)
{
return;
}
Job jb;
while(!mp_queue->pop(jb))
{
std::this_thread::yield();
}
while(jb.GetCommand() != Job::commands::J_COM_THREAD_EXIT)
{
Data data = jb.GetData();
int nr = atoi(data.GetData().c_str());
if(nr == 0)
{
mutex_cout.lock();
std::cout << "Got 0; the line did not contain a valid number." << std::endl;
mutex_cout.unlock();
// fetch the next job before continuing, otherwise we would spin forever on the same one
while(!mp_queue->pop(jb))
{
std::this_thread::yield();
}
continue;
}
if((nr % 2) == 0)
{
m_mutex_par.lock();
*m_par << nr << std::endl;
m_mutex_par.unlock();
}
else
{
m_mutex_impar.lock();
*m_impar << nr << std::endl;
m_mutex_impar.unlock();
}
//std::this_thread::sleep_for(std::chrono::milliseconds(1)); // sleeping to simulate some work
while(!mp_queue->pop(jb))
{
std::this_thread::yield();
}
}
}

template <class T>
queue_priority<T>::queue_priority() : MAX(200)
{
m_bool_full = false;
m_bool_suspend = false;
m_bool_empty = true;
}

template <class T>
queue_priority<T>::queue_priority(size_t max_size) : MAX(max_size)
{
m_bool_full = false;
m_bool_suspend = false;
m_bool_empty = true;
}

template <class T>
queue_priority<T>::queue_priority(const queue_priority& other) : MAX(other.MAX)
{
m_map = other.m_map; // copy; we cannot move from a const reference
m_bool_full = other.m_bool_full.load();
m_bool_empty = other.m_bool_empty.load();
m_bool_suspend = other.m_bool_suspend.load();
}

template <class T>
queue_priority<T>& queue_priority<T>::operator=(const queue_priority<T>& other)
{
m_map = other.m_map;
m_bool_full = other.m_bool_full.load();
m_bool_empty = other.m_bool_empty.load();
m_bool_suspend = other.m_bool_suspend.load();
return *this;
}

template <class T>
bool queue_priority<T>::pop(T& t)
{
// if suspension of the queue was requested, wait until we get the signal to resume
if(m_bool_suspend.load())
{
std::unique_lock<std::mutex> suspendLock(m_mutex_cv_suspend);
// the predicate returns true when the wait should END, i.e. once we are no longer suspended
m_cv_suspend.wait(suspendLock, [this] {return !m_bool_suspend.load();});
}
// lock the mutex that protects the internal data (the map holding our items)
std::lock_guard<std::mutex> lock(m_mutex_action);
// since this is a pop, before extracting an element check whether the queue is empty, and return if it is
if(m_bool_empty.load())
{
return false;
}
t = m_map.begin()->second;
m_map.erase(m_map.begin());
// after removing an element the queue can no longer be full;
// a plain store is enough here because we hold the mutex
m_bool_full = false;
// if the queue just became empty, signal it
if(m_map.empty())
{
m_bool_empty = true;
}
return true;

}

template <class T>
bool queue_priority<T>::push(priorities p, T& t)
{
// if suspension of the queue was requested, wait until we get the signal to resume
if(m_bool_suspend.load())
{
std::unique_lock<std::mutex> suspendLock(m_mutex_cv_suspend);
// the predicate returns true when the wait should END, i.e. once we are no longer suspended
m_cv_suspend.wait(suspendLock, [this] {return !m_bool_suspend.load();});
}
// lock the mutex that protects the internal data (the map holding our items)
std::lock_guard<std::mutex> lock(m_mutex_action);
// since this is a push, check whether the queue is full, and bail out if it is
if(m_bool_full.load())
{
return false;
}
// insert the data
m_map.insert(std::make_pair(p, std::move(t)));
// we have inserted an element => the queue is no longer empty;
// a plain store is enough here because we hold the mutex
m_bool_empty = false;
// if we just filled the queue, signal it
if(m_map.size() == MAX)
{
m_bool_full = true;
}
return true;
}


template <class T>
void queue_priority<T>::suspend()
{
m_bool_suspend = true;
}

template <class T>
void queue_priority<T>::resume()
{
{
// take the lock so the flag flip cannot race with a thread that has already
// checked the predicate but has not yet gone to sleep on the condition variable
std::lock_guard<std::mutex> lock(m_mutex_cv_suspend);
m_bool_suspend = false;
}
m_cv_suspend.notify_all();
}

template class queue_priority<Job>; // explicit instantiation of our class; see the explanation above.

A warning.

Be careful when the producer finishes its work: make sure you have pushed the correct number of exit jobs (one per consumer thread), otherwise, if you use joinable threads, you will find yourselves waiting for them forever at the end. (There is a warning about this in the code.)

If you have any questions, I am happy to answer them.
