TARAXA
Loading...
Searching...
No Matches
tarcap_thread_pool.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <condition_variable>
4#include <mutex>
5#include <thread>
6#include <vector>
7
8#include "logger/logging.hpp"
10#include "priority_queue.hpp"
11
13class PacketsHandler;
14} // namespace taraxa::network::tarcap
15
16namespace taraxa {
17class PbftManager;
18}
19
21
26 public:
30 PacketsThreadPool(size_t workers_num = 10, const std::shared_ptr<PbftManager>& pbft_mgr = nullptr);
32
37
43 std::optional<uint64_t> push(std::pair<tarcap::TarcapVersion, PacketData>&& packet_data);
44
48 void startProcessing();
49
53 void stopProcessing();
54
58 void processPacket(size_t worker_id);
59
65 void setPacketsHandlers(tarcap::TarcapVersion tarcap_version,
66 std::shared_ptr<tarcap::PacketsHandler> packets_handlers);
67
74 std::tuple<size_t, size_t, size_t> getQueueSize() const;
75
76 private:
77 // Number of workers(threads)
78 const size_t workers_num_;
79
80 // Common packets handler - each taraxa capability haits own packets handler
81 std::unordered_map<tarcap::TarcapVersion, std::shared_ptr<tarcap::PacketsHandler>> packets_handlers_;
82
83 // If true, stop processing packets and join all workers threads
84 std::atomic<bool> stopProcessing_{false};
85
86 // How many packets were pushed into the queue, it also serves for creating packet unique id
87 uint64_t packets_count_{0};
88
89 // Queue of unprocessed packets
91
92 // Queue mutex
93 std::mutex queue_mutex_;
94
95 // Queue condition variable
96 std::condition_variable cond_var_;
97
98 // Vector of worker threads - should be initialized as the last member
99 std::vector<std::thread> workers_;
100
102};
103
104} // namespace taraxa::network::threadpool
PacketsThreadPool for concurrent packets processing.
Definition tarcap_thread_pool.hpp:25
std::tuple< size_t, size_t, size_t > getQueueSize() const
Returns actual size of all priority queues (thread-safe)
Definition tarcap_thread_pool.cpp:145
std::atomic< bool > stopProcessing_
Definition tarcap_thread_pool.hpp:84
~PacketsThreadPool()
Definition tarcap_thread_pool.cpp:19
std::condition_variable cond_var_
Definition tarcap_thread_pool.hpp:96
PacketsThreadPool(PacketsThreadPool &&)=delete
void stopProcessing()
Stop all processing threads.
Definition tarcap_thread_pool.cpp:72
void setPacketsHandlers(tarcap::TarcapVersion tarcap_version, std::shared_ptr< tarcap::PacketsHandler > packets_handlers)
Sets packet handler.
Definition tarcap_thread_pool.cpp:137
PacketsThreadPool & operator=(PacketsThreadPool &&)=delete
void startProcessing()
Start all processing threads.
Definition tarcap_thread_pool.cpp:58
PacketsThreadPool(const PacketsThreadPool &)=delete
logger::Logger logger_
Definition tarcap_thread_pool.hpp:101
std::optional< uint64_t > push(std::pair< tarcap::TarcapVersion, PacketData > &&packet_data)
Push the given element value to the end of the queue. Used for r-values.
Definition tarcap_thread_pool.cpp:34
std::unordered_map< tarcap::TarcapVersion, std::shared_ptr< tarcap::PacketsHandler > > packets_handlers_
Definition tarcap_thread_pool.hpp:81
std::mutex queue_mutex_
Definition tarcap_thread_pool.hpp:93
uint64_t packets_count_
Definition tarcap_thread_pool.hpp:87
PriorityQueue queue_
Definition tarcap_thread_pool.hpp:90
const size_t workers_num_
Definition tarcap_thread_pool.hpp:78
void processPacket(size_t worker_id)
Threadpool sycnchronized processing function, which internally calls packet-specific handlers.
Definition tarcap_thread_pool.cpp:80
std::vector< std::thread > workers_
Definition tarcap_thread_pool.hpp:99
PacketsThreadPool & operator=(const PacketsThreadPool &)=delete
Definition priority_queue.hpp:19
std::shared_ptr< spdlog::logger > Logger
Definition logging.hpp:12
Definition vote_manager.hpp:24
unsigned TarcapVersion
Definition tarcap_version.hpp:4
Definition node_stats.hpp:18
Definition app.hpp:16