TARAXA
Loading...
Searching...
No Matches
ws_session.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <json/value.h>
4#include <jsonrpccpp/server/abstractserverconnector.h>
5
6#include <atomic>
7#include <boost/asio/strand.hpp>
8#include <boost/beast/core.hpp>
9#include <boost/beast/websocket.hpp>
10#include <cstdlib>
11#include <memory>
12#include <string>
13
14#include "common/types.hpp"
15#include "final_chain/data.hpp"
16#include "logger/logger.hpp"
18
// Short aliases for the Boost.Beast / Boost.Asio names used by the declarations below
// (`websocket::stream<beast::tcp_stream>`, `tcp::socket`).
// NOTE(review): these appear before `namespace taraxa::net` is opened, i.e. at global scope of a
// header, so every translation unit that includes this file also sees `beast`, `websocket` and
// `tcp`. Confirm no other code relies on that before relocating them.
19namespace beast = boost::beast; // from <boost/beast.hpp>
20namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
21using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
22
23namespace taraxa::net {
24
25class WsServer;
// WsSession: abstract base for a single WebSocket connection.
//
// - Owns the WebSocket stream (`ws_`) built from an accepted TCP socket.
// - Inherits std::enable_shared_from_this, so instances are expected to be managed through
//   std::shared_ptr (presumably so asynchronous handlers can extend the session's lifetime;
//   the handlers themselves live in ws_server.cpp and are not visible here).
// - `processRequest` is pure virtual: a concrete session type must supply the request handling.
//
// NOTE(review): no destructor is declared in the visible listing although the class has a virtual
// function. If a derived session can ever be destroyed through a `WsSession*` that is not held by
// the std::shared_ptr that created it, a `virtual ~WsSession() = default;` is required — confirm.
//
// NOTE(review): this listing is lossy. Its embedded numbering skips lines 53 and 69. The
// constructor initialises `subscriptions_`, whose declaration is not shown; it is presumably the
// missing line 69 (`Subscriptions subscriptions_;`). Line 53 presumably holds the logger
// declaration that pairs with LOG_OBJECTS_CREATE — verify both against the real header.
26class WsSession : public std::enable_shared_from_this<WsSession> {
27 public:
28 // Take ownership of the socket
// Parameters:
//   socket    - accepted TCP socket; moved into `ws_`.
//   node_addr - not referenced by name in the visible body; presumably consumed by the
//               LOG_OBJECTS_CREATE macro (see logger.hpp) — confirm.
//   ws_server - owning server; stored only as a std::weak_ptr (`ws_server_`), so the session
//               does not keep the server alive.
29 explicit WsSession(tcp::socket&& socket, addr_t node_addr, std::shared_ptr<WsServer> ws_server)
30 : ws_(std::move(socket)),
31 ws_server_(ws_server),
// The callback handed to `subscriptions_` captures the raw `this` pointer bound to do_write.
// NOTE(review): safe only while the callback cannot outlive this object — confirm.
32 subscriptions_(std::bind(&WsSession::do_write, this, std::placeholders::_1)),
// Strand created on the same executor as `ws_`.
33 write_strand_(boost::asio::make_strand(ws_.get_executor())) {
34 LOG_OBJECTS_CREATE("WS_SESSION");
35 }
36
37 // Start the asynchronous operation
38 void run();
// Closes the session. `normal` defaults to true; its exact effect is defined in ws_server.cpp.
39 void close(bool normal = true);
// Const query of the closed state (presumably reads `closed_` — confirm in ws_server.cpp).
40 bool is_closed() const;
41
// Pure virtual hook: takes the raw request text and returns the response text.
// Must be implemented by the concrete session type.
42 virtual std::string processRequest(const std::string_view& request) = 0;
43
// Event notification entry points, all defined in ws_server.cpp. Each receives the event payload;
// presumably they forward it to this session's subscriptions — confirm.
44 void newEthBlock(const Json::Value& payload);
45 void newDagBlock(const Json::Value& blk);
46 void newDagBlockFinalized(const Json::Value& payload);
47 void newPbftBlockExecuted(const Json::Value& payload);
48 void newPendingTransaction(const Json::Value& payload);
49 void newPillarBlockData(const Json::Value& payload);
// Note: `trx_hashes` is taken by value, unlike the other arguments.
50 void newLogs(const final_chain::BlockHeader& header, TransactionHashes trx_hashes,
51 const TransactionReceipts& receipts);
52
54 private:
// Internal helpers, defined in ws_server.cpp. The on_* functions take a beast::error_code and are
// presumably the completion handlers for the matching asynchronous operations — confirm.
55 static bool is_normal(const beast::error_code& ec);
56 void on_close(beast::error_code ec);
57 void on_accept(beast::error_code ec);
58 void do_read();
59 void on_read(beast::error_code ec, std::size_t bytes_transferred);
60 void write(std::string&& message);
61
62 protected:
// Members below are accessible to derived session types.
63 void handleRequest();
// Target of the callback bound into `subscriptions_` in the constructor; takes the outgoing message.
64 void do_write(std::string&& message);
65
// The WebSocket stream over the TCP socket received by the constructor.
66 websocket::stream<beast::tcp_stream> ws_;
// Atomic counter, starts at 0 (presumably the source of ids for new subscriptions — confirm).
67 std::atomic<int> subscription_id_ = 0;
// Non-owning reference to the server; must be lock()ed before use.
68 std::weak_ptr<WsServer> ws_server_;
70
71 private:
// Strand built in the constructor from ws_'s executor (presumably serialises writes — confirm).
72 boost::asio::strand<boost::asio::any_io_executor> write_strand_;
73 beast::flat_buffer read_buffer_;
// Atomic closed flag, initially false.
74 std::atomic<bool> closed_ = false;
// Not set by the constructor, so it starts empty; assigned elsewhere, if at all.
75 std::string ip_;
76};
77} // namespace taraxa::net
Definition FixedHash.h:35
Definition subscriptions.hpp:107
Definition ws_session.hpp:26
void on_read(beast::error_code ec, std::size_t bytes_transferred)
Definition ws_server.cpp:68
void newPendingTransaction(const Json::Value &payload)
Definition ws_server.cpp:184
virtual std::string processRequest(const std::string_view &request)=0
beast::flat_buffer read_buffer_
Definition ws_session.hpp:73
std::atomic< int > subscription_id_
Definition ws_session.hpp:67
void on_close(beast::error_code ec)
Definition ws_server.cpp:153
websocket::stream< beast::tcp_stream > ws_
Definition ws_session.hpp:66
std::string ip_
Definition ws_session.hpp:75
bool is_closed() const
Definition ws_server.cpp:165
void newDagBlockFinalized(const Json::Value &payload)
Definition ws_server.cpp:172
std::atomic< bool > closed_
Definition ws_session.hpp:74
void do_read()
Definition ws_server.cpp:63
Subscriptions subscriptions_
Definition ws_session.hpp:69
void newLogs(const final_chain::BlockHeader &header, TransactionHashes trx_hashes, const TransactionReceipts &receipts)
Definition ws_server.cpp:188
void write(std::string &&message)
Definition ws_server.cpp:133
void close(bool normal=true)
Definition ws_server.cpp:145
void newPbftBlockExecuted(const Json::Value &payload)
Definition ws_server.cpp:176
boost::asio::strand< boost::asio::any_io_executor > write_strand_
Definition ws_session.hpp:72
void newEthBlock(const Json::Value &payload)
Definition ws_server.cpp:167
void newPillarBlockData(const Json::Value &payload)
Definition ws_server.cpp:180
std::weak_ptr< WsServer > ws_server_
Definition ws_session.hpp:68
void on_accept(beast::error_code ec)
Definition ws_server.cpp:51
void newDagBlock(const Json::Value &blk)
Definition ws_server.cpp:168
void run()
Definition ws_server.cpp:15
static bool is_normal(const beast::error_code &ec)
Definition ws_server.cpp:161
void handleRequest()
Definition ws_server.cpp:85
void do_write(std::string &&message)
Definition ws_server.cpp:115
WsSession(tcp::socket &&socket, addr_t node_addr, std::shared_ptr< WsServer > ws_server)
Definition ws_session.hpp:29
Definition data.hpp:40
#define LOG_OBJECTS_DEFINE
Definition logger.hpp:60
#define LOG_OBJECTS_CREATE(channel)
Definition logger.hpp:68
Definition Log.h:192
std::hash for asio::address
Definition FixedHash.h:483
Definition http_processor.hpp:11
std::vector< TransactionReceipt > TransactionReceipts
Definition receipt.hpp:35
std::vector< trx_hash_t > TransactionHashes
Definition transaction.hpp:87
boost::asio::ip::tcp tcp
Definition ws_session.hpp:21