TARAXA
priority_queue.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <libdevcore/RLP.h>
4 
5 #include <array>
6 #include <utility>
7 
8 #include "logger/logger.hpp"
11 #include "packets_queue.hpp"
12 
14 
16  public:
17  PriorityQueue(size_t tp_workers_count, const addr_t& node_addr = {});
18 
23  void pushBack(std::pair<tarcap::TarcapVersion, PacketData>&& packet);
24 
28  std::optional<std::pair<tarcap::TarcapVersion, PacketData>> pop();
29 
33  bool empty() const;
34 
40  void updateDependenciesStart(const PacketData& packet);
41 
49  void updateDependenciesFinish(const PacketData& packet, std::mutex& queue_mutex, std::condition_variable& cond_var);
50 
58 
63  bool isNonBlockingPacket(SubprotocolPacketType packet_type) const;
64 
71  bool updateBlockingDependencies(const PacketData& packet, bool unblock_processing = false);
72 
73  private:
80  bool canBorrowThread();
81 
82  private:
83  // Declare logger instances
85 
86  // Queues that group packets by it's priority.
87  // All packets with PacketPriority::High go to packets_queues_[PacketPriority::High], etc...
88  // TODO: make packets_queues_ const
89  std::array<PacketsQueue, PacketData::PacketPriority::Count> packets_queues_{PacketsQueue(), PacketsQueue(),
90  PacketsQueue()};
91 
92  // Mask with all packets types that are currently blocked for processing in another threads due to dependencies, e.g.
93  // syncing packets must be processed synchronously one by one, etc...
95 
96  // How many workers can process packets from all the queues at the same time
98 
99  // How many workers are currently processing packets from all the queues at the same time
100  std::atomic<size_t> act_total_workers_count_;
101 };
102 
103 } // namespace taraxa::network::threadpool
Definition: FixedHash.h:35
Definition: packet_data.hpp:12
PacketPriority
Definition: packet_data.hpp:15
Definition: packets_blocking_mask.hpp:12
Definition: packets_queue.hpp:12
Definition: priority_queue.hpp:15
bool empty() const
Definition: priority_queue.cpp:119
void updateDependenciesStart(const PacketData &packet)
Updates blocking dependencies at the start of packet processing.
Definition: priority_queue.cpp:123
PacketsBlockingMask blocked_packets_mask_
Definition: priority_queue.hpp:94
bool canBorrowThread()
Queue can borrow reserved thread from one of the other priority queues but each queue must have at le...
Definition: priority_queue.cpp:39
const size_t MAX_TOTAL_WORKERS_COUNT
Definition: priority_queue.hpp:97
PriorityQueue(size_t tp_workers_count, const addr_t &node_addr={})
Definition: priority_queue.cpp:5
void updateDependenciesFinish(const PacketData &packet, std::mutex &queue_mutex, std::condition_variable &cond_var)
Updates blocking dependencies after packet processing is done.
Definition: priority_queue.cpp:131
std::atomic< size_t > act_total_workers_count_
Definition: priority_queue.hpp:100
bool updateBlockingDependencies(const PacketData &packet, bool unblock_processing=false)
Updates packet blocking dependency.
Definition: priority_queue.cpp:160
size_t getPrirotityQueueSize(PacketData::PacketPriority priority) const
Returns specified priority queue actual size.
Definition: priority_queue.cpp:226
bool isNonBlockingPacket(SubprotocolPacketType packet_type) const
Definition: priority_queue.cpp:146
LOG_OBJECTS_DEFINE std::array< PacketsQueue, PacketData::PacketPriority::Count > packets_queues_
Definition: priority_queue.hpp:89
std::optional< std::pair< tarcap::TarcapVersion, PacketData > > pop()
Definition: priority_queue.cpp:54
void pushBack(std::pair< tarcap::TarcapVersion, PacketData > &&packet)
Pushes new packet into the priority queue.
Definition: priority_queue.cpp:34
#define LOG_OBJECTS_DEFINE
Definition: logger.hpp:60
Definition: node_stats.hpp:17
SubprotocolPacketType
SubprotocolPacketType is used in networking layer to differentiate packet types.
Definition: packet_types.hpp:12