TARAXA
tarcap_thread_pool.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <condition_variable>
4 #include <mutex>
5 #include <thread>
6 #include <vector>
7 
8 #include "logger/logger.hpp"
10 #include "priority_queue.hpp"
11 
12 namespace taraxa::network::tarcap {
13 class PacketsHandler;
14 }
15 
17 
22  public:
26  PacketsThreadPool(size_t workers_num = 10, const addr_t& node_addr = {});
28 
33 
39  std::optional<uint64_t> push(std::pair<tarcap::TarcapVersion, PacketData>&& packet_data);
40 
44  void startProcessing();
45 
49  void stopProcessing();
50 
54  void processPacket(size_t worker_id);
55 
61  void setPacketsHandlers(tarcap::TarcapVersion tarcap_version,
62  std::shared_ptr<tarcap::PacketsHandler> packets_handlers);
63 
70  std::tuple<size_t, size_t, size_t> getQueueSize() const;
71 
72  private:
73  // Declare logger instances
75 
76  // Number of workers(threads)
77  const size_t workers_num_;
78 
79  // Common packets handler - each taraxa capability has its own packets handler
80  std::unordered_map<tarcap::TarcapVersion, std::shared_ptr<tarcap::PacketsHandler>> packets_handlers_;
81 
82  // If true, stop processing packets and join all workers threads
83  std::atomic<bool> stopProcessing_{false};
84 
85  // How many packets were pushed into the queue, it also serves for creating packet unique id
86  uint64_t packets_count_{0};
87 
88  // Queue of unprocessed packets
90 
91  // Queue mutex
92  std::mutex queue_mutex_;
93 
94  // Queue condition variable
95  std::condition_variable cond_var_;
96 
97  // Vector of worker threads - should be initialized as the last member
98  std::vector<std::thread> workers_;
99 };
100 
101 } // namespace taraxa::network::threadpool
Definition: FixedHash.h:35
PacketsThreadPool for concurrent packets processing.
Definition: tarcap_thread_pool.hpp:21
std::tuple< size_t, size_t, size_t > getQueueSize() const
Returns actual size of all priority queues (thread-safe)
Definition: tarcap_thread_pool.cpp:148
std::atomic< bool > stopProcessing_
Definition: tarcap_thread_pool.hpp:83
~PacketsThreadPool()
Definition: tarcap_thread_pool.cpp:19
PacketsThreadPool & operator=(PacketsThreadPool &&)=delete
std::condition_variable cond_var_
Definition: tarcap_thread_pool.hpp:95
PacketsThreadPool(PacketsThreadPool &&)=delete
void stopProcessing()
Stop all processing threads.
Definition: tarcap_thread_pool.cpp:72
PacketsThreadPool(size_t workers_num=10, const addr_t &node_addr={})
Definition: tarcap_thread_pool.cpp:7
void setPacketsHandlers(tarcap::TarcapVersion tarcap_version, std::shared_ptr< tarcap::PacketsHandler > packets_handlers)
Sets packet handler.
Definition: tarcap_thread_pool.cpp:140
void startProcessing()
Start all processing threads.
Definition: tarcap_thread_pool.cpp:58
LOG_OBJECTS_DEFINE const size_t workers_num_
Definition: tarcap_thread_pool.hpp:77
PacketsThreadPool(const PacketsThreadPool &)=delete
std::optional< uint64_t > push(std::pair< tarcap::TarcapVersion, PacketData > &&packet_data)
Push the given element value to the end of the queue. Used for r-values.
Definition: tarcap_thread_pool.cpp:34
std::unordered_map< tarcap::TarcapVersion, std::shared_ptr< tarcap::PacketsHandler > > packets_handlers_
Definition: tarcap_thread_pool.hpp:80
std::mutex queue_mutex_
Definition: tarcap_thread_pool.hpp:92
uint64_t packets_count_
Definition: tarcap_thread_pool.hpp:86
PriorityQueue queue_
Definition: tarcap_thread_pool.hpp:89
PacketsThreadPool & operator=(const PacketsThreadPool &)=delete
void processPacket(size_t worker_id)
Threadpool synchronized processing function, which internally calls packet-specific handlers.
Definition: tarcap_thread_pool.cpp:80
std::vector< std::thread > workers_
Definition: tarcap_thread_pool.hpp:98
Definition: priority_queue.hpp:15
#define LOG_OBJECTS_DEFINE
Definition: logger.hpp:60
Definition: vote_manager.hpp:23
unsigned TarcapVersion
Definition: tarcap_version.hpp:4
Definition: node_stats.hpp:17