TARAXA
watches.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <chrono>
4 #include <queue>
5 
6 #include "LogFilter.hpp"
8 #include "data.hpp"
9 
10 namespace taraxa::net::rpc::eth {
11 
13 
14 // TODO taraxa simple exception
15 DEV_SIMPLE_EXCEPTION(WatchLimitExceeded);
16 
17 enum WatchType {
21 
22  // do not touch
24 };
25 
27  uint64_t max_watches = 0;
28  std::chrono::seconds idle_timeout{5 * 60};
29 };
30 
31 using WatchesConfig = std::array<WatchGroupConfig, WatchType::COUNT>;
32 
33 using WatchID = uint64_t;
34 
35 GLOBAL_CONST(WatchType, watch_id_type_mask_bits);
36 
37 struct placeholder_t {};
38 template <WatchType type_, typename InputType_, typename OutputType_ = placeholder_t, typename Params = placeholder_t>
39 class WatchGroup {
40  public:
41  static constexpr auto type = type_;
42  using InputType = InputType_;
43  using OutputType = std::conditional_t<std::is_same_v<OutputType_, placeholder_t>, InputType, OutputType_>;
45  using Updater = std::function<void(Params const&, //
46  InputType const&, //
47  std::function<void(OutputType const&)> const& /*do_update*/)>;
48 
49  struct Watch {
50  Params params;
52  std::vector<OutputType> updates{};
54  };
55 
56  private:
59  mutable std::unordered_map<WatchID, Watch> watches_;
60  mutable std::shared_mutex watches_mu_;
61  mutable WatchID watch_id_seq_ = 0;
62 
63  public:
64  explicit WatchGroup(WatchesConfig const& cfg = {}, Updater&& updater = {})
65  : cfg_(cfg[type]), updater_(std::move(updater)) {
66  assert(cfg_.idle_timeout.count() != 0);
67  if constexpr (std::is_same_v<InputType, OutputType>) {
68  if (!updater_) {
69  updater_ = [](auto const&, auto const& input, auto const& do_update) { do_update(input); };
70  }
71  } else {
72  assert(updater_);
73  }
74  }
75 
76  WatchID install_watch(Params&& params = {}) const {
77  std::unique_lock l(watches_mu_);
78  if (cfg_.max_watches && watches_.size() == cfg_.max_watches) {
79  throw WatchLimitExceeded();
80  }
81  auto id = ((++watch_id_seq_) << watch_id_type_mask_bits()) + type;
82  watches_.insert_or_assign(id, Watch{std::move(params), std::chrono::high_resolution_clock::now()});
83  return id;
84  }
85 
86  bool uninstall_watch(WatchID watch_id) const {
87  std::unique_lock l(watches_mu_);
88  return watches_.erase(watch_id);
89  }
90 
91  void uninstall_stale_watches() const {
92  std::unique_lock l(watches_mu_);
93  bool did_uninstall = false;
94  for (auto it = watches_.begin(); it != watches_.end();) {
95  if (cfg_.idle_timeout <=
96  duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - it->second.last_touched)) {
97  it = watches_.erase(it);
98  did_uninstall = true;
99  } else {
100  it++;
101  }
102  }
103  if (auto num_buckets = watches_.bucket_count(); did_uninstall && (1 << 10) < num_buckets) {
104  if (size_t desired_num_buckets = 1 << uint(ceil(log2(watches_.size()))); desired_num_buckets != num_buckets) {
105  watches_.rehash(desired_num_buckets);
106  }
107  }
108  }
109 
110  std::optional<Params> get_watch_params(WatchID watch_id) const {
111  std::shared_lock l(watches_mu_);
112  if (auto entry = watches_.find(watch_id); entry != watches_.end()) {
113  return entry->second.params;
114  }
115  return {};
116  }
117 
118  void process_update(InputType const& obj_in) const {
119  std::shared_lock l(watches_mu_);
120  for (auto& entry : watches_) {
121  auto& watch = entry.second;
122  updater_(watch.params, obj_in, [&](auto const& obj_out) {
123  std::unique_lock l(watch.mu.val);
124  watch.updates.push_back(obj_out);
125  });
126  }
127  }
128 
129  auto poll(WatchID watch_id) const {
130  std::vector<OutputType> ret;
131  std::shared_lock l(watches_mu_);
132  if (auto entry = watches_.find(watch_id); entry != watches_.end()) {
133  auto& watch = entry->second;
134  std::unique_lock l1(watch.mu.val);
135  swap(ret, watch.updates);
136  watch.last_touched = std::chrono::high_resolution_clock::now();
137  }
138  return ret;
139  }
140 };
141 
142 class Watches {
143  public:
145 
149  std::pair<ExtendedTransactionLocation const&, TransactionReceipt const&>, LocalisedLogEntry,
150  LogFilter> const logs_{
151  cfg_,
152  [](auto const& log_filter, auto const& input, auto const& do_update) {
153  auto const& [trx_loc, receipt] = input;
154  log_filter.match_one(trx_loc, receipt, do_update);
155  },
156  };
157 
158  template <typename Visitor>
159  auto visit(WatchType type, Visitor&& visitor) {
160  switch (type) {
162  return visitor(&new_blocks_);
164  return visitor(&new_transactions_);
165  case WatchType::logs:
166  return visitor(&logs_);
167  default:
168  assert(false);
169  }
170  }
171 
172  private:
173  std::condition_variable watch_cleaner_wait_cv_;
174  std::thread watch_cleaner_;
175  std::atomic<bool> destructor_called_ = false;
176 
177  public:
178  Watches(WatchesConfig const& _cfg);
179  ~Watches();
180 
181  Watches(const Watches&) = delete;
182  Watches(Watches&&) = delete;
183  Watches& operator=(const Watches&) = delete;
184  Watches& operator=(Watches&&) = delete;
185 
186  template <typename Visitor>
187  auto visit_by_id(WatchID watch_id, Visitor&& visitor) {
188  if (auto type = WatchType(watch_id & ((1 << watch_id_type_mask_bits()) - 1)); type < COUNT) {
189  return visit(type, std::forward<Visitor>(visitor));
190  }
191  return visitor(decltype (&new_blocks_)(nullptr));
192  }
193 };
194 
195 } // namespace taraxa::net::rpc::eth
Definition: watches.hpp:39
std::vector< OutputType > updates
Definition: watches.hpp:52
std::shared_mutex watches_mu_
Definition: watches.hpp:60
std::conditional_t< std::is_same_v< OutputType_, placeholder_t >, InputType, OutputType_ > OutputType
Definition: watches.hpp:43
std::unordered_map< WatchID, Watch > watches_
Definition: watches.hpp:59
auto poll(WatchID watch_id) const
Definition: watches.hpp:129
Updater updater_
Definition: watches.hpp:58
bool uninstall_watch(WatchID watch_id) const
Definition: watches.hpp:86
WatchGroupConfig cfg_
Definition: watches.hpp:57
std::chrono::high_resolution_clock::time_point time_point
Definition: watches.hpp:44
std::function< void(Params const &, InputType const &, std::function< void(OutputType const &)> const &)> Updater
Definition: watches.hpp:47
void process_update(InputType const &obj_in) const
Definition: watches.hpp:118
void uninstall_stale_watches() const
Definition: watches.hpp:91
static constexpr auto type
Definition: watches.hpp:41
time_point last_touched
Definition: watches.hpp:51
util::DefaultConstructCopyableMovable< std::shared_mutex > mu
Definition: watches.hpp:53
WatchID watch_id_seq_
Definition: watches.hpp:61
WatchID install_watch(Params &&params={}) const
Definition: watches.hpp:76
WatchGroup(WatchesConfig const &cfg={}, Updater &&updater={})
Definition: watches.hpp:64
InputType_ InputType
Definition: watches.hpp:42
Params params
Definition: watches.hpp:50
std::optional< Params > get_watch_params(WatchID watch_id) const
Definition: watches.hpp:110
Definition: watches.hpp:142
Watches & operator=(const Watches &)=delete
WatchGroup< WatchType::new_blocks, h256 > const new_blocks_
Definition: watches.hpp:146
Watches(const Watches &)=delete
WatchGroup< WatchType::new_transactions, h256 > const new_transactions_
Definition: watches.hpp:147
Watches(WatchesConfig const &_cfg)
Watches & operator=(Watches &&)=delete
std::atomic< bool > destructor_called_
Definition: watches.hpp:175
~Watches()
Definition: watches.cpp:41
WatchesConfig const cfg_
Definition: watches.hpp:144
Watches(Watches &&)=delete
auto visit(WatchType type, Visitor &&visitor)
Definition: watches.hpp:159
std::condition_variable watch_cleaner_wait_cv_
Definition: watches.hpp:173
std::thread watch_cleaner_
Definition: watches.hpp:174
WatchGroup< WatchType::logs, std::pair< ExtendedTransactionLocation const &, TransactionReceipt const & >, LocalisedLogEntry, LogFilter > const logs_
Definition: watches.hpp:150
auto visit_by_id(WatchID watch_id, Visitor &&visitor)
Definition: watches.hpp:187
Definition: data.hpp:5
uint64_t WatchID
Definition: watches.hpp:33
uint64_t max_watches
Definition: watches.hpp:27
std::chrono::seconds idle_timeout
Definition: watches.hpp:28
DEV_SIMPLE_EXCEPTION(WatchLimitExceeded)
std::array< WatchGroupConfig, WatchType::COUNT > WatchesConfig
Definition: watches.hpp:31
std::chrono::system_clock::time_point time_point
Definition: watches.hpp:12
WatchType
Definition: watches.hpp:17
@ new_transactions
Definition: watches.hpp:19
@ new_blocks
Definition: watches.hpp:18
@ COUNT
Definition: watches.hpp:23
@ logs
Definition: watches.hpp:20
GLOBAL_CONST(WatchType, watch_id_type_mask_bits)
Definition: data.hpp:33
Definition: watches.hpp:26
Definition: watches.hpp:37
Definition: LogFilter.hpp:8