TARAXA
Loading...
Searching...
No Matches
tarcap_thread_pool.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <condition_variable>
4#include <mutex>
5#include <thread>
6#include <vector>
7
8#include "logger/logger.hpp"
10#include "priority_queue.hpp"
11
13class PacketsHandler;
14} // namespace taraxa::network::tarcap
15
16namespace taraxa {
17class PbftManager;
18}
19
21
26 public:
30 PacketsThreadPool(size_t workers_num = 10, const std::shared_ptr<PbftManager>& pbft_mgr = nullptr,
31 const addr_t& node_addr = {});
33
38
44 std::optional<uint64_t> push(std::pair<tarcap::TarcapVersion, PacketData>&& packet_data);
45
49 void startProcessing();
50
54 void stopProcessing();
55
59 void processPacket(size_t worker_id);
60
66 void setPacketsHandlers(tarcap::TarcapVersion tarcap_version,
67 std::shared_ptr<tarcap::PacketsHandler> packets_handlers);
68
75 std::tuple<size_t, size_t, size_t> getQueueSize() const;
76
77 private:
78 // Declare logger instances
80
81 // Number of workers(threads)
82 const size_t workers_num_;
83
84 // Common packets handler - each taraxa capability haits own packets handler
85 std::unordered_map<tarcap::TarcapVersion, std::shared_ptr<tarcap::PacketsHandler>> packets_handlers_;
86
87 // If true, stop processing packets and join all workers threads
88 std::atomic<bool> stopProcessing_{false};
89
90 // How many packets were pushed into the queue, it also serves for creating packet unique id
91 uint64_t packets_count_{0};
92
93 // Queue of unprocessed packets
95
96 // Queue mutex
97 std::mutex queue_mutex_;
98
99 // Queue condition variable
100 std::condition_variable cond_var_;
101
102 // Vector of worker threads - should be initialized as the last member
103 std::vector<std::thread> workers_;
104};
105
106} // namespace taraxa::network::threadpool
Definition FixedHash.h:35
PacketsThreadPool for concurrent packets processing.
Definition tarcap_thread_pool.hpp:25
std::tuple< size_t, size_t, size_t > getQueueSize() const
Returns actual size of all priority queues (thread-safe)
Definition tarcap_thread_pool.cpp:150
std::atomic< bool > stopProcessing_
Definition tarcap_thread_pool.hpp:88
~PacketsThreadPool()
Definition tarcap_thread_pool.cpp:21
std::condition_variable cond_var_
Definition tarcap_thread_pool.hpp:100
PacketsThreadPool(PacketsThreadPool &&)=delete
void stopProcessing()
Stop all processing threads.
Definition tarcap_thread_pool.cpp:74
void setPacketsHandlers(tarcap::TarcapVersion tarcap_version, std::shared_ptr< tarcap::PacketsHandler > packets_handlers)
Sets packet handler.
Definition tarcap_thread_pool.cpp:142
PacketsThreadPool & operator=(PacketsThreadPool &&)=delete
void startProcessing()
Start all processing threads.
Definition tarcap_thread_pool.cpp:60
LOG_OBJECTS_DEFINE const size_t workers_num_
Definition tarcap_thread_pool.hpp:82
PacketsThreadPool(const PacketsThreadPool &)=delete
std::optional< uint64_t > push(std::pair< tarcap::TarcapVersion, PacketData > &&packet_data)
Push the given element value to the end of the queue. Used for r-values.
Definition tarcap_thread_pool.cpp:36
std::unordered_map< tarcap::TarcapVersion, std::shared_ptr< tarcap::PacketsHandler > > packets_handlers_
Definition tarcap_thread_pool.hpp:85
std::mutex queue_mutex_
Definition tarcap_thread_pool.hpp:97
uint64_t packets_count_
Definition tarcap_thread_pool.hpp:91
PriorityQueue queue_
Definition tarcap_thread_pool.hpp:94
void processPacket(size_t worker_id)
Threadpool sycnchronized processing function, which internally calls packet-specific handlers.
Definition tarcap_thread_pool.cpp:82
std::vector< std::thread > workers_
Definition tarcap_thread_pool.hpp:103
PacketsThreadPool & operator=(const PacketsThreadPool &)=delete
Definition priority_queue.hpp:19
#define LOG_OBJECTS_DEFINE
Definition logger.hpp:60
Definition vote_manager.hpp:23
unsigned TarcapVersion
Definition tarcap_version.hpp:4
Definition node_stats.hpp:18
Definition app.hpp:16