TARAXA
ws_server.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <jsonrpccpp/server/abstractserverconnector.h>
4 
5 #include <algorithm>
6 #include <atomic>
7 #include <boost/asio/strand.hpp>
8 #include <boost/beast/core.hpp>
9 #include <boost/beast/websocket.hpp>
10 #include <cstdlib>
11 #include <functional>
12 #include <iostream>
13 #include <memory>
14 #include <queue>
15 #include <string>
16 #include <thread>
17 #include <vector>
18 
19 #include "config/config.hpp"
20 #include "dag/dag_block.hpp"
21 #include "final_chain/data.hpp"
22 #include "pbft/pbft_chain.hpp"
24 
25 namespace taraxa::net {
26 
// Short aliases for the Boost.Beast / Boost.Asio names used by the declarations below.
// NOTE(review): `http` is not referenced anywhere else in this visible listing — confirm it is still needed.
27 namespace beast = boost::beast; // from <boost/beast.hpp>
28 namespace http = beast::http; // from <boost/beast/http.hpp>
29 namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
30 using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
31 
// Forward declaration: WsSession names WsServer (constructor argument and ws_server_ member) before it is defined.
32 class WsServer;
// One WebSocket connection. Abstract: a subclass must implement processRequest().
// NOTE(review): this rendered listing skips header lines 61 and 71-77. The member index further down this page
// lists new_heads_subscription_, new_dag_blocks_subscription_, new_transactions_subscription_,
// new_dag_block_finalized_subscription_, new_pbft_block_executed_subscription_, new_pillar_block_subscription_
// and include_pillar_block_signatures at lines 71-77, so the real class has more members than shown here.
33 class WsSession : public std::enable_shared_from_this<WsSession> {
34  public:
    // Moves `socket` into ws_, creates write_strand_ from ws_'s executor, and stores `ws_server` in the
    // weak_ptr member ws_server_ (so the session itself does not keep the server alive).
    // NOTE(review): `node_addr` is not named in the visible body — presumably consumed by LOG_OBJECTS_CREATE;
    // confirm in logger.hpp.
35  // Take ownership of the socket
36  explicit WsSession(tcp::socket&& socket, addr_t node_addr, std::shared_ptr<WsServer> ws_server)
37  : ws_(std::move(socket)), write_strand_(boost::asio::make_strand(ws_.get_executor())) {
38  LOG_OBJECTS_CREATE("WS_SESSION");
39  ws_server_ = ws_server;
40  }
41 
42  // Start the asynchronous operation
43  void run();
    // `normal` defaults to true. NOTE(review): presumably selects a normal vs. abnormal close — confirm in
    // ws_server.cpp.
44  void close(bool normal = true);
45 
    // Declared here, defined in ws_server.cpp. NOTE(review): the (error_code[, bytes]) signatures look like
    // asynchronous completion handlers — confirm against the definitions.
46  void on_accept(beast::error_code ec);
47  void do_read();
48  void on_read(beast::error_code ec, std::size_t bytes_transferred);
49 
    // Pure virtual: implemented by subclasses. Takes the request text and returns a string
    // (presumably the reply to send back — confirm in ws_server.cpp).
50  virtual std::string processRequest(const std::string_view& request) = 0;
51 
    // Event notifications, one per event type. WsServer declares functions with the same names; note that
    // newPbftBlockExecuted takes a Json::Value here, whereas WsServer's overload takes a PbftBlock plus DAG hashes.
52  void newEthBlock(const ::taraxa::final_chain::BlockHeader& payload, const TransactionHashes& trx_hashes);
53  void newDagBlock(const DagBlock& blk);
54  void newDagBlockFinalized(const blk_hash_t& blk, uint64_t period);
55  void newPbftBlockExecuted(const Json::Value& payload);
56  void newPendingTransaction(const trx_hash_t& trx_hash);
57  void newPillarBlockData(const pillar_chain::PillarBlockData& pillar_block_data);
    // Returns the current value of the atomic closed_ flag.
58  bool is_closed() const { return closed_; }
59  bool is_normal(const beast::error_code& ec) const;
60  void on_write(beast::error_code ec, std::size_t bytes_transferred);
62 
63  protected:
64  void processAsync();
    // NOTE(review): presumably writeAsync schedules writeImpl on write_strand_ — confirm in ws_server.cpp.
65  void writeAsync(std::string&& message);
66  void writeImpl(std::string&& message);
    // WebSocket stream layered over a beast::tcp_stream; constructed from the socket passed to the constructor.
67  websocket::stream<beast::tcp_stream> ws_;
    // Strand built from ws_'s executor in the constructor.
68  boost::asio::strand<boost::asio::any_io_executor> write_strand_;
69  beast::flat_buffer read_buffer_;
    // Atomic counter, initialised to 0.
70  std::atomic<int> subscription_id_ = 0;
    // Initialised to false; exposed through is_closed().
78  std::atomic<bool> closed_ = false;
    // Non-owning (weak) reference to the server, assigned in the constructor.
79  std::weak_ptr<WsServer> ws_server_;
80 };
81 
82 //------------------------------------------------------------------------------
83 
84 // Accepts incoming connections and launches the sessions
// Abstract base: subclasses must provide createSession(). Copy and move operations are deleted.
// NOTE(review): this rendered listing skips header lines 113 and 121. The member index further down this page
// lists `const addr_t node_addr_` at line 121 and shows LOG_OBJECTS_DEFINE immediately before ioc_, so the real
// class has declarations that are not shown here.
85 class WsServer : public std::enable_shared_from_this<WsServer>, public jsonrpc::AbstractServerConnector {
86  public:
87  WsServer(boost::asio::io_context& ioc, tcp::endpoint endpoint, addr_t node_addr);
88  virtual ~WsServer();
89 
    // Non-copyable and non-movable.
90  WsServer(const WsServer&) = delete;
91  WsServer(WsServer&&) = delete;
92  WsServer& operator=(const WsServer&) = delete;
93  WsServer& operator=(WsServer&&) = delete;
94 
95  // Start accepting incoming connections
96  void run();
    // Event notifications; WsSession declares functions with the same names.
    // NOTE(review): presumably each call is forwarded to every session in `sessions` — confirm in ws_server.cpp.
97  void newEthBlock(const ::taraxa::final_chain::BlockHeader& payload, const TransactionHashes& trx_hashes);
98  void newDagBlock(const DagBlock& blk);
99  void newDagBlockFinalized(const blk_hash_t& blk, uint64_t period);
100  void newPbftBlockExecuted(const PbftBlock& sche_blk, const std::vector<blk_hash_t>& finalized_dag_blk_hashes);
101  void newPendingTransaction(const trx_hash_t& trx_hash);
102  void newPillarBlockData(const pillar_chain::PillarBlockData& pillar_block_data);
103  uint32_t numberOfSessions();
104 
    // Pure virtual factory: a subclass builds the concrete WsSession for a newly obtained socket.
105  virtual std::shared_ptr<WsSession> createSession(tcp::socket&& socket) = 0;
106 
    // Both unconditionally return true. NOTE(review): presumably these implement the
    // jsonrpc::AbstractServerConnector interface — confirm against that header (no `override` is written here).
107  virtual bool StartListening() { return true; }
108  virtual bool StopListening() { return true; }
109 
110  private:
111  void do_accept();
112  void on_accept(beast::error_code ec, tcp::socket socket);
114  boost::asio::io_context& ioc_;
115  tcp::acceptor acceptor_;
    // Sessions are held by shared_ptr. NOTE(review): sessions_mtx_ presumably guards this list — confirm in
    // ws_server.cpp. No <list> include appears in the visible include block; presumably it arrives transitively.
116  std::list<std::shared_ptr<WsSession>> sessions;
    // Initialised to false.
117  std::atomic<bool> stopped_ = false;
118  boost::shared_mutex sessions_mtx_;
119 
    // This section looks empty only because the listing omits header line 121 (see the note above the class).
120  protected:
122 };
123 
124 } // namespace taraxa::net
Definition: FixedHash.h:35
DagBlock class is a DAG block class whose main data is a list of transaction hashes included in the b...
Definition: dag_block.hpp:16
Definition: ws_server.hpp:85
void on_accept(beast::error_code ec, tcp::socket socket)
Definition: ws_server.cpp:255
WsServer(boost::asio::io_context &ioc, tcp::endpoint endpoint, addr_t node_addr)
Definition: ws_server.cpp:201
virtual ~WsServer()
Definition: ws_server.cpp:239
void newEthBlock(const ::taraxa::final_chain::BlockHeader &payload, const TransactionHashes &trx_hashes)
Definition: ws_server.cpp:304
WsServer & operator=(const WsServer &)=delete
void newDagBlockFinalized(const blk_hash_t &blk, uint64_t period)
Definition: ws_server.cpp:288
const addr_t node_addr_
Definition: ws_server.hpp:121
virtual bool StopListening()
Definition: ws_server.hpp:108
void newDagBlock(const DagBlock &blk)
Definition: ws_server.cpp:281
boost::shared_mutex sessions_mtx_
Definition: ws_server.hpp:118
virtual bool StartListening()
Definition: ws_server.hpp:107
uint32_t numberOfSessions()
Definition: ws_server.cpp:325
WsServer(const WsServer &)=delete
void newPillarBlockData(const pillar_chain::PillarBlockData &pillar_block_data)
Definition: ws_server.cpp:318
virtual std::shared_ptr< WsSession > createSession(tcp::socket &&socket)=0
void newPbftBlockExecuted(const PbftBlock &sche_blk, const std::vector< blk_hash_t > &finalized_dag_blk_hashes)
Definition: ws_server.cpp:295
void do_accept()
Definition: ws_server.cpp:250
void newPendingTransaction(const trx_hash_t &trx_hash)
Definition: ws_server.cpp:311
LOG_OBJECTS_DEFINE boost::asio::io_context & ioc_
Definition: ws_server.hpp:114
tcp::acceptor acceptor_
Definition: ws_server.hpp:115
WsServer & operator=(WsServer &&)=delete
std::atomic< bool > stopped_
Definition: ws_server.hpp:117
void run()
Definition: ws_server.cpp:237
WsServer(WsServer &&)=delete
std::list< std::shared_ptr< WsSession > > sessions
Definition: ws_server.hpp:116
Definition: ws_server.hpp:33
void on_read(beast::error_code ec, std::size_t bytes_transferred)
Definition: ws_server.cpp:42
void processAsync()
Definition: ws_server.cpp:59
bool is_normal(const beast::error_code &ec) const
Definition: ws_server.cpp:194
virtual std::string processRequest(const std::string_view &request)=0
void newEthBlock(const ::taraxa::final_chain::BlockHeader &payload, const TransactionHashes &trx_hashes)
Definition: ws_server.cpp:100
beast::flat_buffer read_buffer_
Definition: ws_server.hpp:69
void writeAsync(std::string &&message)
Definition: ws_server.cpp:75
int new_dag_blocks_subscription_
Definition: ws_server.hpp:72
std::atomic< int > subscription_id_
Definition: ws_server.hpp:70
websocket::stream< beast::tcp_stream > ws_
Definition: ws_server.hpp:67
int new_dag_block_finalized_subscription_
Definition: ws_server.hpp:74
int new_pillar_block_subscription_
Definition: ws_server.hpp:76
bool is_closed() const
Definition: ws_server.hpp:58
void newDagBlockFinalized(const blk_hash_t &blk, uint64_t period)
Definition: ws_server.cpp:132
std::atomic< bool > closed_
Definition: ws_server.hpp:78
bool include_pillar_block_signatures
Definition: ws_server.hpp:77
void newDagBlock(const DagBlock &blk)
Definition: ws_server.cpp:114
void do_read()
Definition: ws_server.cpp:37
void writeImpl(std::string &&message)
Definition: ws_server.cpp:89
void close(bool normal=true)
Definition: ws_server.cpp:187
void newPillarBlockData(const pillar_chain::PillarBlockData &pillar_block_data)
Definition: ws_server.cpp:161
void newPbftBlockExecuted(const Json::Value &payload)
Definition: ws_server.cpp:147
boost::asio::strand< boost::asio::any_io_executor > write_strand_
Definition: ws_server.hpp:68
int new_heads_subscription_
Definition: ws_server.hpp:71
std::weak_ptr< WsServer > ws_server_
Definition: ws_server.hpp:79
int new_pbft_block_executed_subscription_
Definition: ws_server.hpp:75
void newPendingTransaction(const trx_hash_t &trx_hash)
Definition: ws_server.cpp:174
void on_accept(beast::error_code ec)
Definition: ws_server.cpp:27
void on_write(beast::error_code ec, std::size_t bytes_transferred)
void run()
Definition: ws_server.cpp:14
int new_transactions_subscription_
Definition: ws_server.hpp:73
WsSession(tcp::socket &&socket, addr_t node_addr, std::shared_ptr< WsServer > ws_server)
Definition: ws_server.hpp:36
The PbftBlock class is a PBFT block class that includes PBFT block hash, previous PBFT block hash,...
Definition: pbft_block.hpp:21
Definition: pillar_block.hpp:143
#define LOG_OBJECTS_DEFINE
Definition: logger.hpp:60
#define LOG_OBJECTS_CREATE(channel)
Definition: logger.hpp:76
Definition: Log.h:192
std::hash for asio::address
Definition: FixedHash.h:483
Definition: http_processor.hpp:11
boost::asio::ip::tcp tcp
Definition: ws_server.hpp:30
std::vector< trx_hash_t > TransactionHashes
Definition: transaction.hpp:82