TARAXA
Loading...
Searching...
No Matches
priority_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <libdevcore/RLP.h>
4
5#include <array>
6#include <utility>
7
8#include "logger/logging.hpp"
11#include "packets_queue.hpp"
12
13namespace taraxa {
14class PbftManager;
15}
16
18
20 public:
21 PriorityQueue(size_t tp_workers_count, const std::shared_ptr<PbftManager>& pbft_mgr = nullptr);
22 ~PriorityQueue() = default;
23
24 PriorityQueue(const PriorityQueue&) = delete;
28
33 void pushBack(std::pair<tarcap::TarcapVersion, PacketData>&& packet);
34
38 std::optional<std::pair<tarcap::TarcapVersion, PacketData>> pop();
39
43 bool empty() const;
44
50 void updateDependenciesStart(const PacketData& packet);
51
59 void updateDependenciesFinish(const PacketData& packet, std::mutex& queue_mutex, std::condition_variable& cond_var);
60
68
73 bool isNonBlockingPacket(SubprotocolPacketType packet_type) const;
74
81 bool updateBlockingDependencies(const PacketData& packet, bool unblock_processing = false);
82
83 private:
90 bool canBorrowThread();
91
92 private:
93 // Queues that group packets by their priority.
94 // All packets with PacketPriority::High go to packets_queues_[PacketPriority::High], etc...
95 // TODO: make packets_queues_ const
96 std::array<PacketsQueue, PacketData::PacketPriority::Count> packets_queues_{PacketsQueue(), PacketsQueue(),
97 PacketsQueue()};
98
99 // Mask of all packet types that are currently blocked from processing in other threads due to dependencies, e.g.
100 // syncing packets must be processed synchronously, one by one, etc...
102
103 // How many workers can process packets from all the queues at the same time
105
106 // How many workers are currently processing packets from all the queues at the same time
107 std::atomic<size_t> act_total_workers_count_;
108
110};
111
112} // namespace taraxa::network::threadpool
Definition packet_data.hpp:12
PacketPriority
Definition packet_data.hpp:15
Definition packets_blocking_mask.hpp:16
Definition packets_queue.hpp:12
Definition priority_queue.hpp:19
bool empty() const
Definition priority_queue.cpp:122
void updateDependenciesStart(const PacketData &packet)
Updates blocking dependencies at the start of packet processing.
Definition priority_queue.cpp:126
std::array< PacketsQueue, PacketData::PacketPriority::Count > packets_queues_
Definition priority_queue.hpp:96
PriorityQueue & operator=(const PriorityQueue &)=delete
PacketsBlockingMask blocked_packets_mask_
Definition priority_queue.hpp:101
bool canBorrowThread()
Queue can borrow reserved thread from one of the other priority queues but each queue must have at le...
Definition priority_queue.cpp:41
logger::Logger logger_
Definition priority_queue.hpp:109
const size_t MAX_TOTAL_WORKERS_COUNT
Definition priority_queue.hpp:104
void updateDependenciesFinish(const PacketData &packet, std::mutex &queue_mutex, std::condition_variable &cond_var)
Updates blocking dependencies after packet processing is done.
Definition priority_queue.cpp:134
PriorityQueue(PriorityQueue &&)=delete
std::atomic< size_t > act_total_workers_count_
Definition priority_queue.hpp:107
bool updateBlockingDependencies(const PacketData &packet, bool unblock_processing=false)
Updates packet blocking dependency.
Definition priority_queue.cpp:163
size_t getPrirotityQueueSize(PacketData::PacketPriority priority) const
Returns specified priority queue actual size.
Definition priority_queue.cpp:236
bool isNonBlockingPacket(SubprotocolPacketType packet_type) const
Definition priority_queue.cpp:149
PriorityQueue(const PriorityQueue &)=delete
std::optional< std::pair< tarcap::TarcapVersion, PacketData > > pop()
Definition priority_queue.cpp:56
PriorityQueue & operator=(PriorityQueue &&)=delete
void pushBack(std::pair< tarcap::TarcapVersion, PacketData > &&packet)
Pushes new packet into the priority queue.
Definition priority_queue.cpp:36
std::shared_ptr< spdlog::logger > Logger
Definition logging.hpp:12
Definition node_stats.hpp:18
SubprotocolPacketType
SubprotocolPacketType is used in networking layer to differentiate packet types.
Definition packet_types.hpp:12
Definition app.hpp:16