TARAXA
Loading...
Searching...
No Matches
watches.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <chrono>
4#include <queue>
5
6#include "LogFilter.hpp"
8#include "data.hpp"
9
10namespace taraxa::net::rpc::eth {
11
12using time_point = std::chrono::system_clock::time_point;
13
14// TODO taraxa simple exception
15DEV_SIMPLE_EXCEPTION(WatchLimitExceeded);
16
21
22 // do not touch
24};
25
27 uint64_t max_watches = 0;
28 std::chrono::seconds idle_timeout{5 * 60};
29};
30
31using WatchesConfig = std::array<WatchGroupConfig, WatchType::COUNT>;
32
33using WatchID = uint64_t;
34
35GLOBAL_CONST(WatchType, watch_id_type_mask_bits);
36
37struct placeholder_t {};
38template <WatchType type_, typename InputType_, typename OutputType_ = placeholder_t, typename Params = placeholder_t>
40 public:
41 static constexpr auto type = type_;
42 using InputType = InputType_;
43 using OutputType = std::conditional_t<std::is_same_v<OutputType_, placeholder_t>, InputType, OutputType_>;
44 using time_point = std::chrono::high_resolution_clock::time_point;
45 using Updater = std::function<void(Params const&, //
46 InputType const&, //
47 std::function<void(OutputType const&)> const& /*do_update*/)>;
48
55
56 private:
59 mutable std::unordered_map<WatchID, Watch> watches_;
60 mutable std::shared_mutex watches_mu_;
62
63 public:
64 explicit WatchGroup(WatchesConfig const& cfg = {}, Updater&& updater = {})
65 : cfg_(cfg[type]), updater_(std::move(updater)) {
66 assert(cfg_.idle_timeout.count() != 0);
67 if constexpr (std::is_same_v<InputType, OutputType>) {
68 if (!updater_) {
69 updater_ = [](auto const&, auto const& input, auto const& do_update) { do_update(input); };
70 }
71 } else {
72 assert(updater_);
73 }
74 }
75
76 WatchID install_watch(Params&& params = {}) const {
77 std::unique_lock l(watches_mu_);
78 if (cfg_.max_watches && watches_.size() == cfg_.max_watches) {
79 throw WatchLimitExceeded();
80 }
81 auto id = ((++watch_id_seq_) << watch_id_type_mask_bits()) + type;
82 watches_.insert_or_assign(id, Watch{std::move(params), std::chrono::high_resolution_clock::now()});
83 return id;
84 }
85
86 bool uninstall_watch(WatchID watch_id) const {
87 std::unique_lock l(watches_mu_);
88 return watches_.erase(watch_id);
89 }
90
92 std::unique_lock l(watches_mu_);
93 bool did_uninstall = false;
94 for (auto it = watches_.begin(); it != watches_.end();) {
95 if (cfg_.idle_timeout <=
96 duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - it->second.last_touched)) {
97 it = watches_.erase(it);
98 did_uninstall = true;
99 } else {
100 it++;
101 }
102 }
103 if (auto num_buckets = watches_.bucket_count(); did_uninstall && (1 << 10) < num_buckets) {
104 if (size_t desired_num_buckets = 1 << uint(ceil(log2(watches_.size()))); desired_num_buckets != num_buckets) {
105 watches_.rehash(desired_num_buckets);
106 }
107 }
108 }
109
110 std::optional<Params> get_watch_params(WatchID watch_id) const {
111 std::shared_lock l(watches_mu_);
112 if (auto entry = watches_.find(watch_id); entry != watches_.end()) {
113 return entry->second.params;
114 }
115 return {};
116 }
117
118 void process_update(InputType const& obj_in) const {
119 std::shared_lock l(watches_mu_);
120 for (auto& entry : watches_) {
121 auto& watch = entry.second;
122 updater_(watch.params, obj_in, [&](auto const& obj_out) {
123 std::unique_lock l(watch.mu.val);
124 watch.updates.push_back(obj_out);
125 });
126 }
127 }
128
129 auto poll(WatchID watch_id) const {
130 std::vector<OutputType> ret;
131 std::shared_lock l(watches_mu_);
132 if (auto entry = watches_.find(watch_id); entry != watches_.end()) {
133 auto& watch = entry->second;
134 std::unique_lock l1(watch.mu.val);
135 swap(ret, watch.updates);
136 watch.last_touched = std::chrono::high_resolution_clock::now();
137 }
138 return ret;
139 }
140};
141
142class Watches {
143 public:
145
149 std::pair<ExtendedTransactionLocation const&, TransactionReceipt const&>, LocalisedLogEntry,
151 cfg_,
152 [](auto const& log_filter, auto const& input, auto const& do_update) {
153 auto const& [trx_loc, receipt] = input;
154 log_filter.match_one(trx_loc, receipt, do_update);
155 },
156 };
157
158 template <typename Visitor>
159 auto visit(WatchType type, Visitor&& visitor) {
160 switch (type) {
162 return visitor(&new_blocks_);
164 return visitor(&new_transactions_);
165 case WatchType::logs:
166 return visitor(&logs_);
167 default:
168 assert(false);
169 }
170 }
171
172 private:
173 std::condition_variable watch_cleaner_wait_cv_;
174 std::thread watch_cleaner_;
175 std::atomic<bool> destructor_called_ = false;
176
177 public:
178 Watches(WatchesConfig const& _cfg);
179 ~Watches();
180
181 Watches(const Watches&) = delete;
182 Watches(Watches&&) = delete;
183 Watches& operator=(const Watches&) = delete;
185
186 template <typename Visitor>
187 auto visit_by_id(WatchID watch_id, Visitor&& visitor) {
188 if (auto type = WatchType(watch_id & ((1 << watch_id_type_mask_bits()) - 1)); type < COUNT) {
189 return visit(type, std::forward<Visitor>(visitor));
190 }
191 return visitor(decltype (&new_blocks_)(nullptr));
192 }
193};
194
195} // namespace taraxa::net::rpc::eth
Definition watches.hpp:39
std::vector< OutputType > updates
Definition watches.hpp:52
std::shared_mutex watches_mu_
Definition watches.hpp:60
std::conditional_t< std::is_same_v< OutputType_, placeholder_t >, InputType, OutputType_ > OutputType
Definition watches.hpp:43
std::unordered_map< WatchID, Watch > watches_
Definition watches.hpp:59
auto poll(WatchID watch_id) const
Definition watches.hpp:129
Updater updater_
Definition watches.hpp:58
bool uninstall_watch(WatchID watch_id) const
Definition watches.hpp:86
WatchGroupConfig cfg_
Definition watches.hpp:57
std::chrono::high_resolution_clock::time_point time_point
Definition watches.hpp:44
std::function< void(Params const &, InputType const &, std::function< void(OutputType const &)> const &)> Updater
Definition watches.hpp:47
void process_update(InputType const &obj_in) const
Definition watches.hpp:118
void uninstall_stale_watches() const
Definition watches.hpp:91
static constexpr auto type
Definition watches.hpp:41
time_point last_touched
Definition watches.hpp:51
util::DefaultConstructCopyableMovable< std::shared_mutex > mu
Definition watches.hpp:53
WatchID watch_id_seq_
Definition watches.hpp:61
WatchID install_watch(Params &&params={}) const
Definition watches.hpp:76
std::optional< Params > get_watch_params(WatchID watch_id) const
Definition watches.hpp:110
WatchGroup(WatchesConfig const &cfg={}, Updater &&updater={})
Definition watches.hpp:64
InputType_ InputType
Definition watches.hpp:42
Params params
Definition watches.hpp:50
Definition watches.hpp:142
WatchGroup< WatchType::new_blocks, h256 > const new_blocks_
Definition watches.hpp:146
Watches(const Watches &)=delete
WatchGroup< WatchType::new_transactions, h256 > const new_transactions_
Definition watches.hpp:147
Watches(WatchesConfig const &_cfg)
Watches & operator=(const Watches &)=delete
Watches & operator=(Watches &&)=delete
std::atomic< bool > destructor_called_
Definition watches.hpp:175
~Watches()
Definition watches.cpp:41
WatchesConfig const cfg_
Definition watches.hpp:144
Watches(Watches &&)=delete
auto visit(WatchType type, Visitor &&visitor)
Definition watches.hpp:159
std::condition_variable watch_cleaner_wait_cv_
Definition watches.hpp:173
std::thread watch_cleaner_
Definition watches.hpp:174
WatchGroup< WatchType::logs, std::pair< ExtendedTransactionLocation const &, TransactionReceipt const & >, LocalisedLogEntry, LogFilter > const logs_
Definition watches.hpp:150
auto visit_by_id(WatchID watch_id, Visitor &&visitor)
Definition watches.hpp:187
#define GLOBAL_CONST(_type_, _name_)
Definition global_const.hpp:3
#define DEV_SIMPLE_EXCEPTION(X)
Definition Exceptions.h:25
std::hash for asio::address
Definition FixedHash.h:483
Definition data.hpp:5
uint64_t WatchID
Definition watches.hpp:33
uint64_t max_watches
Definition watches.hpp:27
std::chrono::seconds idle_timeout
Definition watches.hpp:28
std::array< WatchGroupConfig, WatchType::COUNT > WatchesConfig
Definition watches.hpp:31
std::chrono::system_clock::time_point time_point
Definition watches.hpp:12
WatchType
Definition watches.hpp:17
@ new_transactions
Definition watches.hpp:19
@ new_blocks
Definition watches.hpp:18
@ COUNT
Definition watches.hpp:23
@ logs
Definition watches.hpp:20
Definition data.hpp:33
Definition watches.hpp:37
Definition LogFilter.hpp:8
void match_one(const TransactionReceipt &r, const std::function< void(size_t)> &cb) const
Definition LogFilter.cpp:86
Definition default_construct_copyable_movable.hpp:13