TARAXA
Loading...
Searching...
No Matches
priority_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <libdevcore/RLP.h>
4
5#include <array>
6#include <utility>
7
8#include "logger/logger.hpp"
11#include "packets_queue.hpp"
12
13namespace taraxa {
14class PbftManager;
15}
16
18
20 public:
21 PriorityQueue(size_t tp_workers_count, const std::shared_ptr<PbftManager>& pbft_mgr = nullptr,
22 const addr_t& node_addr = {});
23 ~PriorityQueue() = default;
24
25 PriorityQueue(const PriorityQueue&) = delete;
29
34 void pushBack(std::pair<tarcap::TarcapVersion, PacketData>&& packet);
35
39 std::optional<std::pair<tarcap::TarcapVersion, PacketData>> pop();
40
44 bool empty() const;
45
51 void updateDependenciesStart(const PacketData& packet);
52
60 void updateDependenciesFinish(const PacketData& packet, std::mutex& queue_mutex, std::condition_variable& cond_var);
61
69
74 bool isNonBlockingPacket(SubprotocolPacketType packet_type) const;
75
82 bool updateBlockingDependencies(const PacketData& packet, bool unblock_processing = false);
83
84 private:
91 bool canBorrowThread();
92
93 private:
94 // Declare logger instances
96
97 // Queues that group packets by it's priority.
98 // All packets with PacketPriority::High go to packets_queues_[PacketPriority::High], etc...
99 // TODO: make packets_queues_ const
100 std::array<PacketsQueue, PacketData::PacketPriority::Count> packets_queues_{PacketsQueue(), PacketsQueue(),
101 PacketsQueue()};
102
103 // Mask with all packets types that are currently blocked for processing in another threads due to dependencies, e.g.
104 // syncing packets must be processed synchronously one by one, etc...
106
107 // How many workers can process packets from all the queues at the same time
109
110 // How many workers are currently processing packets from all the queues at the same time
111 std::atomic<size_t> act_total_workers_count_;
112};
113
114} // namespace taraxa::network::threadpool
Definition FixedHash.h:35
Definition packet_data.hpp:12
PacketPriority
Definition packet_data.hpp:15
Definition packets_blocking_mask.hpp:16
Definition packets_queue.hpp:12
Definition priority_queue.hpp:19
bool empty() const
Definition priority_queue.cpp:121
void updateDependenciesStart(const PacketData &packet)
Updates blocking dependencies at the start of packet processing.
Definition priority_queue.cpp:125
PriorityQueue & operator=(const PriorityQueue &)=delete
PacketsBlockingMask blocked_packets_mask_
Definition priority_queue.hpp:105
bool canBorrowThread()
Queue can borrow reserved thread from one of the other priority queues but each queue must have at le...
Definition priority_queue.cpp:41
const size_t MAX_TOTAL_WORKERS_COUNT
Definition priority_queue.hpp:108
void updateDependenciesFinish(const PacketData &packet, std::mutex &queue_mutex, std::condition_variable &cond_var)
Updates blocking dependencies after packet processing is done.
Definition priority_queue.cpp:133
PriorityQueue(PriorityQueue &&)=delete
std::atomic< size_t > act_total_workers_count_
Definition priority_queue.hpp:111
bool updateBlockingDependencies(const PacketData &packet, bool unblock_processing=false)
Updates packet blocking dependency.
Definition priority_queue.cpp:162
size_t getPrirotityQueueSize(PacketData::PacketPriority priority) const
Returns specified priority queue actual size.
Definition priority_queue.cpp:235
bool isNonBlockingPacket(SubprotocolPacketType packet_type) const
Definition priority_queue.cpp:148
PriorityQueue(const PriorityQueue &)=delete
LOG_OBJECTS_DEFINE std::array< PacketsQueue, PacketData::PacketPriority::Count > packets_queues_
Definition priority_queue.hpp:100
std::optional< std::pair< tarcap::TarcapVersion, PacketData > > pop()
Definition priority_queue.cpp:56
PriorityQueue & operator=(PriorityQueue &&)=delete
void pushBack(std::pair< tarcap::TarcapVersion, PacketData > &&packet)
Pushes new packet into the priority queue.
Definition priority_queue.cpp:36
#define LOG_OBJECTS_DEFINE
Definition logger.hpp:60
Definition node_stats.hpp:18
SubprotocolPacketType
SubprotocolPacketType is used in networking layer to differentiate packet types.
Definition packet_types.hpp:12
Definition app.hpp:16