38 template <WatchType type_,
typename InputType_,
typename OutputType_ = placeholder_t,
typename Params = placeholder_t>
41 static constexpr
auto type = type_;
43 using OutputType = std::conditional_t<std::is_same_v<OutputType_, placeholder_t>,
InputType, OutputType_>;
45 using Updater = std::function<void(Params
const&,
47 std::function<
void(
OutputType const&)>
const& )>;
59 mutable std::unordered_map<WatchID, Watch>
watches_;
67 if constexpr (std::is_same_v<InputType, OutputType>) {
69 updater_ = [](
auto const&,
auto const& input,
auto const& do_update) { do_update(input); };
79 throw WatchLimitExceeded();
82 watches_.insert_or_assign(
id, Watch{std::move(params), std::chrono::high_resolution_clock::now()});
93 bool did_uninstall =
false;
96 duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - it->second.last_touched)) {
103 if (
auto num_buckets =
watches_.bucket_count(); did_uninstall && (1 << 10) < num_buckets) {
104 if (
size_t desired_num_buckets = 1 << uint(ceil(log2(
watches_.size()))); desired_num_buckets != num_buckets) {
105 watches_.rehash(desired_num_buckets);
113 return entry->second.params;
121 auto& watch = entry.second;
122 updater_(watch.params, obj_in, [&](
auto const& obj_out) {
123 std::unique_lock l(watch.mu.val);
124 watch.updates.push_back(obj_out);
130 std::vector<OutputType> ret;
133 auto& watch = entry->second;
134 std::unique_lock l1(watch.mu.val);
135 swap(ret, watch.updates);
136 watch.last_touched = std::chrono::high_resolution_clock::now();
149 std::pair<ExtendedTransactionLocation const&, TransactionReceipt const&>,
LocalisedLogEntry,
152 [](
auto const& log_filter,
auto const& input,
auto const& do_update) {
153 auto const& [trx_loc, receipt] = input;
154 log_filter.match_one(trx_loc, receipt, do_update);
158 template <
typename Visitor>
166 return visitor(&
logs_);
186 template <
typename Visitor>
188 if (
auto type =
WatchType(watch_id & ((1 << watch_id_type_mask_bits()) - 1)); type <
COUNT) {
189 return visit(type, std::forward<Visitor>(visitor));
Definition: watches.hpp:39
std::vector< OutputType > updates
Definition: watches.hpp:52
std::shared_mutex watches_mu_
Definition: watches.hpp:60
std::conditional_t< std::is_same_v< OutputType_, placeholder_t >, InputType, OutputType_ > OutputType
Definition: watches.hpp:43
std::unordered_map< WatchID, Watch > watches_
Definition: watches.hpp:59
auto poll(WatchID watch_id) const
Definition: watches.hpp:129
Updater updater_
Definition: watches.hpp:58
bool uninstall_watch(WatchID watch_id) const
Definition: watches.hpp:86
WatchGroupConfig cfg_
Definition: watches.hpp:57
std::chrono::high_resolution_clock::time_point time_point
Definition: watches.hpp:44
std::function< void(Params const &, InputType const &, std::function< void(OutputType const &)> const &)> Updater
Definition: watches.hpp:47
void process_update(InputType const &obj_in) const
Definition: watches.hpp:118
void uninstall_stale_watches() const
Definition: watches.hpp:91
static constexpr auto type
Definition: watches.hpp:41
time_point last_touched
Definition: watches.hpp:51
util::DefaultConstructCopyableMovable< std::shared_mutex > mu
Definition: watches.hpp:53
WatchID watch_id_seq_
Definition: watches.hpp:61
WatchID install_watch(Params &¶ms={}) const
Definition: watches.hpp:76
WatchGroup(WatchesConfig const &cfg={}, Updater &&updater={})
Definition: watches.hpp:64
InputType_ InputType
Definition: watches.hpp:42
Params params
Definition: watches.hpp:50
std::optional< Params > get_watch_params(WatchID watch_id) const
Definition: watches.hpp:110
Definition: watches.hpp:49
Definition: watches.hpp:142
Watches & operator=(const Watches &)=delete
WatchGroup< WatchType::new_blocks, h256 > const new_blocks_
Definition: watches.hpp:146
Watches(const Watches &)=delete
WatchGroup< WatchType::new_transactions, h256 > const new_transactions_
Definition: watches.hpp:147
Watches(WatchesConfig const &_cfg)
Watches & operator=(Watches &&)=delete
std::atomic< bool > destructor_called_
Definition: watches.hpp:175
~Watches()
Definition: watches.cpp:41
WatchesConfig const cfg_
Definition: watches.hpp:144
Watches(Watches &&)=delete
auto visit(WatchType type, Visitor &&visitor)
Definition: watches.hpp:159
std::condition_variable watch_cleaner_wait_cv_
Definition: watches.hpp:173
std::thread watch_cleaner_
Definition: watches.hpp:174
WatchGroup< WatchType::logs, std::pair< ExtendedTransactionLocation const &, TransactionReceipt const & >, LocalisedLogEntry, LogFilter > const logs_
Definition: watches.hpp:150
auto visit_by_id(WatchID watch_id, Visitor &&visitor)
Definition: watches.hpp:187
uint64_t WatchID
Definition: watches.hpp:33
uint64_t max_watches
Definition: watches.hpp:27
std::chrono::seconds idle_timeout
Definition: watches.hpp:28
DEV_SIMPLE_EXCEPTION(WatchLimitExceeded)
std::array< WatchGroupConfig, WatchType::COUNT > WatchesConfig
Definition: watches.hpp:31
std::chrono::system_clock::time_point time_point
Definition: watches.hpp:12
WatchType
Definition: watches.hpp:17
@ new_transactions
Definition: watches.hpp:19
@ new_blocks
Definition: watches.hpp:18
@ COUNT
Definition: watches.hpp:23
@ logs
Definition: watches.hpp:20
GLOBAL_CONST(WatchType, watch_id_type_mask_bits)
Definition: watches.hpp:26
Definition: watches.hpp:37
Definition: LogFilter.hpp:8