38template <WatchType type_, 
typename InputType_, 
typename OutputType_ = placeholder_t, 
typename Params = placeholder_t>
 
   41  static constexpr auto type = type_;
 
   43  using OutputType = std::conditional_t<std::is_same_v<OutputType_, placeholder_t>, 
InputType, OutputType_>;
 
   44  using time_point = std::chrono::high_resolution_clock::time_point;
 
   45  using Updater = std::function<void(Params 
const&,     
 
   47                                     std::function<
void(
OutputType const&)> 
const& )>;
 
   59  mutable std::unordered_map<WatchID, Watch> 
watches_;
 
   67    if constexpr (std::is_same_v<InputType, OutputType>) {
 
   69        updater_ = [](
auto const&, 
auto const& input, 
auto const& do_update) { do_update(input); };
 
 
   79      throw WatchLimitExceeded();
 
   82    watches_.insert_or_assign(
id, Watch{std::move(params), std::chrono::high_resolution_clock::now()});
 
 
   93    bool did_uninstall = 
false;
 
   96          duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - it->second.last_touched)) {
 
  103    if (
auto num_buckets = 
watches_.bucket_count(); did_uninstall && (1 << 10) < num_buckets) {
 
  104      if (
size_t desired_num_buckets = 1 << uint(ceil(log2(
watches_.size()))); desired_num_buckets != num_buckets) {
 
  105        watches_.rehash(desired_num_buckets);
 
 
  113      return entry->second.params;
 
 
  121      auto& watch = entry.second;
 
  122      updater_(watch.params, obj_in, [&](
auto const& obj_out) {
 
  123        std::unique_lock l(watch.mu.val);
 
  124        watch.updates.push_back(obj_out);
 
 
  130    std::vector<OutputType> ret;
 
  133      auto& watch = entry->second;
 
  134      std::unique_lock l1(watch.mu.val);
 
  135      swap(ret, watch.updates);
 
  136      watch.last_touched = std::chrono::high_resolution_clock::now();
 
 
 
  149             std::pair<ExtendedTransactionLocation const&, TransactionReceipt const&>, 
LocalisedLogEntry,
 
  152      [](
auto const& log_filter, 
auto const& input, 
auto const& do_update) {
 
  153        auto const& [trx_loc, receipt] = input;
 
  154        log_filter.
match_one(trx_loc, receipt, do_update);
 
 
  158  template <
typename Visitor>
 
  166        return visitor(&
logs_);
 
 
  186  template <
typename Visitor>
 
  188    if (
auto type = 
WatchType(watch_id & ((1 << watch_id_type_mask_bits()) - 1)); type < 
COUNT) {
 
  189      return visit(type, std::forward<Visitor>(visitor));
 
 
 
Definition watches.hpp:39
 
std::vector< OutputType > updates
Definition watches.hpp:52
 
std::shared_mutex watches_mu_
Definition watches.hpp:60
 
std::conditional_t< std::is_same_v< OutputType_, placeholder_t >, InputType, OutputType_ > OutputType
Definition watches.hpp:43
 
std::unordered_map< WatchID, Watch > watches_
Definition watches.hpp:59
 
auto poll(WatchID watch_id) const
Definition watches.hpp:129
 
Updater updater_
Definition watches.hpp:58
 
bool uninstall_watch(WatchID watch_id) const
Definition watches.hpp:86
 
WatchGroupConfig cfg_
Definition watches.hpp:57
 
std::chrono::high_resolution_clock::time_point time_point
Definition watches.hpp:44
 
std::function< void(Params const  &, InputType const  &, std::function< void(OutputType const  &)> const  &)> Updater
Definition watches.hpp:47
 
void process_update(InputType const &obj_in) const
Definition watches.hpp:118
 
void uninstall_stale_watches() const
Definition watches.hpp:91
 
static constexpr auto type
Definition watches.hpp:41
 
time_point last_touched
Definition watches.hpp:51
 
util::DefaultConstructCopyableMovable< std::shared_mutex > mu
Definition watches.hpp:53
 
WatchID watch_id_seq_
Definition watches.hpp:61
 
WatchID install_watch(Params &¶ms={}) const
Definition watches.hpp:76
 
std::optional< Params > get_watch_params(WatchID watch_id) const
Definition watches.hpp:110
 
WatchGroup(WatchesConfig const &cfg={}, Updater &&updater={})
Definition watches.hpp:64
 
InputType_ InputType
Definition watches.hpp:42
 
Params params
Definition watches.hpp:50
 
Definition watches.hpp:49
 
Definition watches.hpp:142
 
WatchGroup< WatchType::new_blocks, h256 > const new_blocks_
Definition watches.hpp:146
 
Watches(const Watches &)=delete
 
WatchGroup< WatchType::new_transactions, h256 > const new_transactions_
Definition watches.hpp:147
 
Watches(WatchesConfig const &_cfg)
 
Watches & operator=(const Watches &)=delete
 
Watches & operator=(Watches &&)=delete
 
std::atomic< bool > destructor_called_
Definition watches.hpp:175
 
~Watches()
Definition watches.cpp:41
 
WatchesConfig const cfg_
Definition watches.hpp:144
 
Watches(Watches &&)=delete
 
auto visit(WatchType type, Visitor &&visitor)
Definition watches.hpp:159
 
std::condition_variable watch_cleaner_wait_cv_
Definition watches.hpp:173
 
std::thread watch_cleaner_
Definition watches.hpp:174
 
WatchGroup< WatchType::logs, std::pair< ExtendedTransactionLocation const  &, TransactionReceipt const  & >, LocalisedLogEntry, LogFilter > const logs_
Definition watches.hpp:150
 
auto visit_by_id(WatchID watch_id, Visitor &&visitor)
Definition watches.hpp:187
 
#define GLOBAL_CONST(_type_, _name_)
Definition global_const.hpp:3
 
#define DEV_SIMPLE_EXCEPTION(X)
Definition Exceptions.h:25
 
std::hash for asio::adress
Definition FixedHash.h:483
 
uint64_t WatchID
Definition watches.hpp:33
 
uint64_t max_watches
Definition watches.hpp:27
 
std::chrono::seconds idle_timeout
Definition watches.hpp:28
 
std::array< WatchGroupConfig, WatchType::COUNT > WatchesConfig
Definition watches.hpp:31
 
std::chrono::system_clock::time_point time_point
Definition watches.hpp:12
 
WatchType
Definition watches.hpp:17
 
@ new_transactions
Definition watches.hpp:19
 
@ new_blocks
Definition watches.hpp:18
 
@ COUNT
Definition watches.hpp:23
 
@ logs
Definition watches.hpp:20
 
Definition watches.hpp:26
 
Definition watches.hpp:37
 
Definition LogFilter.hpp:8
 
void match_one(const TransactionReceipt &r, const std::function< void(size_t)> &cb) const
Definition LogFilter.cpp:86
 
Definition default_construct_copyable_movable.hpp:13