WebSocket协议是现代Web开发中不可或缺的一部分,它允许客户端和服务器之间建立持久的连接,实现双向实时通信。与传统的HTTP请求不同,WebSocket提供了一种全双工的通信通道,使得数据可以在任意方向上传输,而无需等待对方请求或者应答。
Boost.Beast:这是一个基于 Boost.Asio 构建的 HTTP 与 WebSocket 库,提供高性能且易用的 API,适合用 C++ 实现对性能和开发效率都有要求的 WebSocket 服务端与客户端。下面是一个基于 Boost.Beast 的多线程异步 WebSocket 广播服务器示例。
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

// Thread-safe FIFO queue guarded by a mutex and a condition variable.
//
// close() lets a consumer blocked in wait_and_pop() wake up and leave, which
// is required for an orderly shutdown of the consumer thread.
template <typename T>
class ConcurrentQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;
    std::condition_variable cond_;
    bool closed_ = false;  // guarded by mutex_

public:
    // Appends a copy of `value` and wakes one waiting consumer.
    void push(T const& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        cond_.notify_one();
    }

    // Pops the front element into `value` without blocking.
    // Returns false when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Blocks until an element is available or the queue has been closed.
    // Returns true when `value` was filled, false when the queue is closed
    // and holds no more elements.
    bool wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Marks the queue as closed and wakes every waiting consumer.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cond_.notify_all();
    }

    // Returns true when the queue currently holds no elements.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
};

// One WebSocket connection.
//
// The socket handed to the constructor is expected to be bound to a strand
// (the server accepts with net::make_strand). Every operation on ws_ runs on
// that strand: close() and do_write() may be called from any thread and hop
// onto the strand with net::post before touching the stream.
//
// Instances must be owned by a std::shared_ptr; pending handlers keep the
// session alive through shared_from_this().
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::multi_buffer buffer_;
    ConcurrentQueue<std::string>& output_queue_;
    std::atomic<bool> is_closing_{ false };
    // Outgoing messages; only touched on the strand. The front element is the
    // message currently being written and stays in place until on_write().
    std::queue<std::string> write_queue_;

    // Largest incoming message accepted from a peer: 16 MiB.
    static constexpr std::size_t MAX_BUFFER_SIZE = 16 * 1024 * 1024;

public:
    WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue)
        : ws_(std::move(socket))
        , output_queue_(output_queue) {
    }

    // No explicit close here: shared_from_this() must not be called from a
    // destructor, and destroying ws_ closes the underlying socket anyway.
    ~WebSocketSession() = default;

    // Access to the underlying WebSocket stream.
    websocket::stream<beast::tcp_stream>& ws() {
        return ws_;
    }

    // True once close() has been requested or the connection has failed.
    bool is_closing() const {
        return is_closing_.load();
    }

    // Starts the session: configures the stream and begins the handshake.
    // May be called from any thread.
    void run() {
        net::dispatch(
            ws_.get_executor(),
            beast::bind_front_handler(
                &WebSocketSession::on_run,
                shared_from_this()));
    }

    // Completion handler of the WebSocket handshake.
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "WebSocket accept error: " << ec.message() << "\n";
            is_closing_ = true;
            return;
        }
        do_read();
    }

    // Requests an orderly shutdown of the connection. Safe to call from any
    // thread and more than once; only the first call has an effect.
    void close(websocket::close_code code = websocket::close_code::normal) {
        if (is_closing_.exchange(true)) {
            return;
        }
        net::post(
            ws_.get_executor(),
            [self = shared_from_this(), code] { self->do_close(code); });
    }

    // Starts reading the next complete message.
    void do_read() {
        if (is_closing_) {
            return;
        }
        // read_message_max() bounds the message size, and async_read only
        // completes once a whole message has arrived.
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()));
    }

    // Completion handler of async_read: forwards the message to the output
    // queue and schedules the next read.
    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            const bool expected =
                ec == websocket::error::closed ||
                ec == net::error::operation_aborted;
            if (!expected) {
                std::cerr << "WebSocket read error: " << ec.message() << "\n";
            }
            close();
            return;
        }
        std::cout << "on_read bytes_transferred:" << bytes_transferred << "\n";

        std::string message = beast::buffers_to_string(buffer_.data());
        buffer_.consume(buffer_.size());
        try {
            output_queue_.push(message);
        } catch (const std::exception& e) {
            std::cerr << "Message queue error: " << e.what() << "\n";
            close(websocket::close_code::internal_error);
            return;
        }
        do_read();
    }

    // Queues `message` for sending. Safe to call from any thread.
    void do_write(const std::string& message) {
        if (is_closing_) {
            return;
        }
        net::post(
            ws_.get_executor(),
            [self = shared_from_this(), msg = message]() mutable {
                self->enqueue_write(std::move(msg));
            });
    }

    // Sends the message at the front of the write queue, if any.
    // Must run on the strand while no other write is in flight.
    void write_loop() {
        if (write_queue_.empty()) {
            return;
        }
        ws_.async_write(
            net::buffer(write_queue_.front()),
            beast::bind_front_handler(
                &WebSocketSession::on_write,
                shared_from_this()));
    }

    // Completion handler of async_write: drops the sent message and
    // continues with the next queued one.
    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            if (ec != net::error::operation_aborted) {
                std::cerr << "WebSocket write error: " << ec.message() << "\n";
            }
            close();
            return;
        }
        std::cout << "on_write bytes_transferred:" << bytes_transferred << "\n";
        write_queue_.pop();
        if (!is_closing_) {
            write_loop();
        }
    }

private:
    // Runs on the strand: applies stream options and starts the handshake.
    void on_run() {
        websocket::stream_base::timeout timeout{
            std::chrono::seconds(30),  // handshake timeout
            std::chrono::seconds(30),  // idle timeout
            true                       // keep-alive pings
        };
        ws_.set_option(timeout);
        ws_.read_message_max(MAX_BUFFER_SIZE);
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res) {
                res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server");
            }));
        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()));
    }

    // Runs on the strand: performs the closing handshake, or drops the socket
    // when the WebSocket is not open (handshake unfinished or already failed).
    void do_close(websocket::close_code code) {
        if (!ws_.is_open()) {
            beast::get_lowest_layer(ws_).close();
            return;
        }
        ws_.async_close(code,
            [self = shared_from_this()](beast::error_code ec) {
                if (ec && ec != net::error::operation_aborted) {
                    std::cerr << "WebSocket close error: " << ec.message() << "\n";
                }
            });
    }

    // Runs on the strand: appends a message and starts writing when idle.
    void enqueue_write(std::string message) {
        // Messages for connections that are not (or no longer) open are dropped.
        if (is_closing_ || !ws_.is_open()) {
            return;
        }
        const bool idle = write_queue_.empty();
        write_queue_.push(std::move(message));
        if (idle) {
            write_loop();
        }
    }
};

// Accepts WebSocket connections and broadcasts every received message to all
// connected clients.
class WebSocketServer {
    net::io_context ioc_;
    // Keeps ioc_.run() from returning while no asynchronous work is pending,
    // e.g. between construction and start().
    net::executor_work_guard<net::io_context::executor_type> work_guard_;
    tcp::acceptor acceptor_;
    std::vector<std::thread> io_threads_;
    std::atomic<bool> started_{ false };
    std::atomic<bool> stopped_{ false };

    // Messages received from clients, consumed by process_output().
    ConcurrentQueue<std::string> output_queue_;

    // Known sessions. Weak references: a session is owned by its pending
    // handlers and disappears from this list once it is gone.
    std::vector<std::weak_ptr<WebSocketSession>> sessions_;
    mutable std::mutex sessions_mutex_;

public:
    // Binds to 0.0.0.0:`port` and launches `thread_count` I/O threads (at
    // least one) plus one thread that broadcasts received messages.
    WebSocketServer(
        unsigned short port,
        int thread_count = static_cast<int>(std::thread::hardware_concurrency()))
        : ioc_(std::max(1, thread_count))
        , work_guard_(net::make_work_guard(ioc_))
        , acceptor_(ioc_, tcp::endpoint{ net::ip::make_address("0.0.0.0"), port }) {
        const int workers = std::max(1, thread_count);
        io_threads_.reserve(static_cast<std::size_t>(workers) + 1);
        for (int i = 0; i < workers; ++i) {
            io_threads_.emplace_back([this] { ioc_.run(); });
        }
        io_threads_.emplace_back([this] { process_output(); });
    }

    ~WebSocketServer() {
        stop();
    }

    // Starts accepting connections. Further calls have no effect.
    void start() {
        if (stopped_ || started_.exchange(true)) {
            return;
        }
        do_accept();
    }

    // Stops the server and joins all worker threads. Must not be called from
    // one of the server's own threads. Closing the sessions is best effort:
    // the I/O context is stopped right after the close requests are posted.
    void stop() {
        if (stopped_.exchange(true)) {
            return;
        }
        // Wake the output thread, which may be blocked waiting for a message.
        output_queue_.close();
        {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            for (auto& weak : sessions_) {
                if (auto session = weak.lock()) {
                    session->close();
                }
            }
            sessions_.clear();
        }
        work_guard_.reset();
        ioc_.stop();
        for (auto& thread : io_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    // Sends `message` to every live session and forgets sessions that are
    // gone or closing.
    void broadcast(const std::string& message) {
        std::lock_guard<std::mutex> lock(sessions_mutex_);
        sessions_.erase(
            std::remove_if(sessions_.begin(), sessions_.end(),
                [](const std::weak_ptr<WebSocketSession>& weak) {
                    const auto session = weak.lock();
                    return !session || session->is_closing();
                }),
            sessions_.end());
        for (auto& weak : sessions_) {
            if (auto session = weak.lock()) {
                session->do_write(message);
            }
        }
    }

private:
    // Each accepted socket gets its own strand so that all handlers of one
    // session are serialized.
    void do_accept() {
        acceptor_.async_accept(
            net::make_strand(ioc_),
            beast::bind_front_handler(
                &WebSocketServer::on_accept,
                this));
    }

    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (ec == net::error::operation_aborted) {
            return;
        }
        if (ec) {
            // A failed accept must not end the accept loop.
            std::cerr << "Accept error: " << ec.message() << "\n";
        } else if (!stopped_) {
            auto session = std::make_shared<WebSocketSession>(
                std::move(socket), output_queue_);
            {
                std::lock_guard<std::mutex> lock(sessions_mutex_);
                sessions_.push_back(session);
            }
            session->run();
        }
        if (!stopped_) {
            do_accept();
        }
    }

    // Body of the output thread: broadcasts queued messages until the queue
    // is closed or the server is stopped.
    void process_output() {
        std::string message;
        while (output_queue_.wait_and_pop(message)) {
            if (stopped_) {
                break;
            }
            broadcast(message);
        }
    }
};

int main(int /*argc*/, char* /*argv*/[]) {
    try {
        constexpr unsigned short port = 8080;
        const int threads = std::max<int>(
            1, static_cast<int>(std::thread::hardware_concurrency()));

        WebSocketServer server{ port, threads };
        server.start();
        std::cout << "WebSocket server is running on port " << port << std::endl;
        std::cout << "Press Enter to exit..." << std::endl;

        // Block until the user presses Enter, then shut down.
        std::cin.get();
        server.stop();
    } catch (std::exception const& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
收藏的用户(0) X
正在加载信息~
推荐阅读
最新回复 (1)
-
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

// Thread-safe FIFO queue guarded by a mutex and a condition variable.
//
// close() lets a consumer blocked in wait_and_pop() wake up and leave, which
// is required for an orderly shutdown of the consumer thread.
template <typename T>
class ConcurrentQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;
    std::condition_variable cond_;
    bool closed_ = false;  // guarded by mutex_

public:
    // Appends a copy of `value` and wakes one waiting consumer.
    void push(T const& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        cond_.notify_one();
    }

    // Pops the front element into `value` without blocking.
    // Returns false when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Blocks until an element is available or the queue has been closed.
    // Returns true when `value` was filled, false when the queue is closed
    // and holds no more elements.
    bool wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Marks the queue as closed and wakes every waiting consumer.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cond_.notify_all();
    }

    // Returns true when the queue currently holds no elements.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
};

// One WebSocket connection with an application-level ping/pong heartbeat.
//
// The socket handed to the constructor is expected to be bound to a strand
// (the server accepts with net::make_strand). Every operation on ws_ and
// timer_ runs on that strand: close() and do_write() may be called from any
// thread and hop onto the strand with net::post before touching them.
//
// Instances must be owned by a std::shared_ptr; pending handlers keep the
// session alive through shared_from_this().
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer read_buffer_;
    ConcurrentQueue<std::string>& output_queue_;

    // Set once a shutdown has been requested or the connection has failed.
    std::atomic<bool> is_closing_{ false };

    // Heartbeat state; only touched on the strand.
    net::steady_timer timer_;
    bool ping_sent_ = false;
    std::chrono::steady_clock::time_point last_pong_time_;

    // Outgoing messages; only touched on the strand. The front element is the
    // message currently being written and stays in place until on_write().
    std::queue<std::string> write_queue_;

    // Largest incoming message accepted from a peer: 10 MiB.
    static constexpr std::size_t MAX_MESSAGE_SIZE = 10 * 1024 * 1024;

public:
    WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue)
        : ws_(std::move(socket))
        , output_queue_(output_queue)
        , timer_(ws_.get_executor()) {
    }

    // No explicit close here: shared_from_this() must not be called from a
    // destructor, and destroying ws_ closes the underlying socket anyway.
    ~WebSocketSession() = default;

    // Access to the underlying WebSocket stream.
    websocket::stream<beast::tcp_stream>& ws() {
        return ws_;
    }

    // True once close() has been requested or the connection has failed.
    bool is_closing() const {
        return is_closing_.load();
    }

    // Starts the session: configures the stream and begins the handshake.
    // May be called from any thread.
    void run() {
        net::dispatch(
            ws_.get_executor(),
            beast::bind_front_handler(
                &WebSocketSession::on_run,
                shared_from_this()));
    }

    // Requests an orderly shutdown of the connection. Safe to call from any
    // thread and more than once; only the first call has an effect.
    void close() {
        if (is_closing_.exchange(true)) {
            return;
        }
        net::post(
            ws_.get_executor(),
            [self = shared_from_this()] { self->do_close(); });
    }

    // Completion handler of the closing handshake.
    void on_close(beast::error_code ec) {
        if (ec && ec != net::error::operation_aborted) {
            std::cerr << "onclose error: " << ec.message() << "\n";
        }
        timer_.cancel();
    }

    // Arms the heartbeat timer to fire in five seconds.
    void start_ping_timer() {
        if (is_closing_) {
            return;
        }
        timer_.expires_after(std::chrono::seconds(5));
        timer_.async_wait(
            beast::bind_front_handler(
                &WebSocketSession::on_ping_timer,
                shared_from_this()));
    }

    // Heartbeat tick: closes the connection when the peer stopped answering,
    // otherwise sends the next ping.
    void on_ping_timer(beast::error_code ec) {
        if (ec == net::error::operation_aborted || is_closing_) {
            return;
        }
        if (ec) {
            std::cerr << "timer: " << ec.message() << "\n";
        }

        const auto now = std::chrono::steady_clock::now();
        if (now - last_pong_time_ > std::chrono::seconds(10)) {
            std::cerr << "No pong received for 10 seconds, closing connection\n";
            close();
            return;
        }
        if (ping_sent_) {
            // The previous ping is still unanswered one interval later.
            std::cerr << "Previous ping not answered, closing connection\n";
            close();
            return;
        }
        ping_sent_ = true;
        ws_.async_ping(websocket::ping_data{},
            beast::bind_front_handler(
                &WebSocketSession::on_ping,
                shared_from_this()));
    }

    // Completion handler of async_ping: schedules the next heartbeat tick.
    void on_ping(beast::error_code ec) {
        if (ec) {
            if (ec != net::error::operation_aborted) {
                std::cerr << "ping error: " << ec.message() << "\n";
            }
            // Do not re-arm the timer: it would keep a dead session alive.
            return;
        }
        start_ping_timer();
    }

    // Completion handler of the WebSocket handshake.
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "WebSocket accept error: " << ec.message() << "\n";
            is_closing_ = true;
            return;
        }
        last_pong_time_ = std::chrono::steady_clock::now();
        start_ping_timer();
        do_read();
    }

    // Starts reading the next complete message.
    void do_read() {
        if (is_closing_) {
            return;
        }
        ws_.async_read(
            read_buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()));
    }

    // Completion handler of async_read: forwards the message to the output
    // queue and schedules the next read.
    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            const bool expected =
                ec == websocket::error::closed ||
                ec == net::error::operation_aborted;
            if (!expected) {
                std::cerr << "WebSocket read error: " << ec.message() << "\n";
            }
            close();
            return;
        }

        std::string message = beast::buffers_to_string(read_buffer_.data());
        read_buffer_.consume(read_buffer_.size());
        std::cout << "on_read bytes_transferred: " << bytes_transferred << std::endl;

        try {
            output_queue_.push(message);
        } catch (const std::exception& e) {
            std::cerr << "Message queue error: " << e.what() << "\n";
            close();
            return;
        }
        do_read();
    }

    // Queues `message` for sending. Safe to call from any thread.
    void do_write(const std::string& message) {
        if (is_closing_) {
            return;
        }
        net::post(
            ws_.get_executor(),
            [self = shared_from_this(), msg = message]() mutable {
                self->enqueue_write(std::move(msg));
            });
    }

    // Completion handler of async_write: drops the sent message and
    // continues with the next queued one.
    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            if (ec != net::error::operation_aborted) {
                std::cerr << "WebSocket write error: " << ec.message() << "\n";
            }
            close();
            return;
        }
        std::cout << "on_write bytes_transferred: " << bytes_transferred << std::endl;
        write_queue_.pop();
        if (!is_closing_) {
            write_next();
        }
    }

private:
    // Runs on the strand: applies stream options and starts the handshake.
    void on_run() {
        ws_.set_option(
            websocket::stream_base::timeout::suggested(
                beast::role_type::server));
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res) {
                res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server");
            }));
        ws_.read_message_max(MAX_MESSAGE_SIZE);

        // The callback is stored inside ws_, which this object owns, so
        // capturing `this` cannot outlive the session.
        ws_.control_callback(
            [this](websocket::frame_type kind, beast::string_view /*payload*/) {
                switch (kind) {
                case websocket::frame_type::ping:
                    // The stream answers pings with a pong on its own.
                    std::cout << "Received ping, auto-replying with pong\n";
                    break;
                case websocket::frame_type::pong:
                    ping_sent_ = false;
                    last_pong_time_ = std::chrono::steady_clock::now();
                    std::cout << "Received pong\n";
                    break;
                case websocket::frame_type::close:
                    std::cout << "Received close frame\n";
                    break;
                }
            });

        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()));
    }

    // Runs on the strand: stops the heartbeat and performs the closing
    // handshake, or drops the socket when the WebSocket is not open
    // (handshake unfinished or already failed).
    void do_close() {
        timer_.cancel();
        if (!ws_.is_open()) {
            beast::get_lowest_layer(ws_).close();
            return;
        }
        ws_.async_close(websocket::close_code::normal,
            beast::bind_front_handler(
                &WebSocketSession::on_close,
                shared_from_this()));
    }

    // Runs on the strand: appends a message and starts writing when idle.
    void enqueue_write(std::string message) {
        // Messages for connections that are not (or no longer) open are dropped.
        if (is_closing_ || !ws_.is_open()) {
            return;
        }
        const bool idle = write_queue_.empty();
        write_queue_.push(std::move(message));
        if (idle) {
            write_next();
        }
    }

    // Runs on the strand: writes the message at the front of the queue.
    // Only one async_write may be in flight at a time.
    void write_next() {
        if (write_queue_.empty()) {
            return;
        }
        ws_.async_write(
            net::buffer(write_queue_.front()),
            beast::bind_front_handler(
                &WebSocketSession::on_write,
                shared_from_this()));
    }
};

// Accepts WebSocket connections and echoes every received message to all
// connected clients.
class WebSocketServer {
    net::io_context ioc_;
    // Keeps ioc_.run() from returning while no asynchronous work is pending,
    // e.g. between construction and start().
    net::executor_work_guard<net::io_context::executor_type> work_guard_;
    tcp::acceptor acceptor_;
    std::vector<std::thread> io_threads_;
    std::atomic<bool> started_{ false };
    std::atomic<bool> stopped_{ false };

    // Messages received from clients, consumed by process_output().
    ConcurrentQueue<std::string> output_queue_;

    // Known sessions. Weak references: a session is owned by its pending
    // handlers and disappears from this list once it is gone.
    std::vector<std::weak_ptr<WebSocketSession>> sessions_;
    mutable std::mutex sessions_mutex_;

public:
    // Binds to 0.0.0.0:`port` and launches `thread_count` I/O threads (at
    // least one) plus one thread that broadcasts received messages.
    WebSocketServer(
        unsigned short port,
        int thread_count = static_cast<int>(std::thread::hardware_concurrency()))
        : ioc_(std::max(1, thread_count))
        , work_guard_(net::make_work_guard(ioc_))
        , acceptor_(ioc_, tcp::endpoint{ net::ip::make_address("0.0.0.0"), port }) {
        const int workers = std::max(1, thread_count);
        io_threads_.reserve(static_cast<std::size_t>(workers) + 1);
        for (int i = 0; i < workers; ++i) {
            io_threads_.emplace_back([this] { ioc_.run(); });
        }
        io_threads_.emplace_back([this] { process_output(); });
    }

    ~WebSocketServer() {
        stop();
    }

    // Starts accepting connections. Further calls have no effect.
    void start() {
        if (stopped_ || started_.exchange(true)) {
            return;
        }
        do_accept();
    }

    // Stops the server and joins all worker threads. Must not be called from
    // one of the server's own threads. Closing the sessions is best effort:
    // the I/O context is stopped right after the close requests are posted.
    void stop() {
        if (stopped_.exchange(true)) {
            return;
        }
        // Wake the output thread, which may be blocked waiting for a message.
        output_queue_.close();
        {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            for (auto& weak : sessions_) {
                if (auto session = weak.lock()) {
                    session->close();
                }
            }
            sessions_.clear();
        }
        work_guard_.reset();
        ioc_.stop();
        for (auto& thread : io_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    // Sends `message` to every live session and forgets sessions that are
    // gone or closing.
    void broadcast(const std::string& message) {
        std::lock_guard<std::mutex> lock(sessions_mutex_);
        sessions_.erase(
            std::remove_if(sessions_.begin(), sessions_.end(),
                [](const std::weak_ptr<WebSocketSession>& weak) {
                    const auto session = weak.lock();
                    return !session || session->is_closing();
                }),
            sessions_.end());
        for (auto& weak : sessions_) {
            if (auto session = weak.lock()) {
                session->do_write(message);
            }
        }
    }

private:
    // Each accepted socket gets its own strand so that all handlers of one
    // session are serialized.
    void do_accept() {
        acceptor_.async_accept(
            net::make_strand(ioc_),
            beast::bind_front_handler(
                &WebSocketServer::on_accept,
                this));
    }

    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (ec == net::error::operation_aborted) {
            return;
        }
        if (ec) {
            // A failed accept must not end the accept loop.
            std::cerr << "Accept error: " << ec.message() << "\n";
        } else if (!stopped_) {
            auto session = std::make_shared<WebSocketSession>(
                std::move(socket), output_queue_);
            {
                std::lock_guard<std::mutex> lock(sessions_mutex_);
                sessions_.push_back(session);
            }
            session->run();
        }
        if (!stopped_) {
            do_accept();
        }
    }

    // Body of the output thread: echoes queued messages to all clients until
    // the queue is closed or the server is stopped.
    void process_output() {
        std::string message;
        while (output_queue_.wait_and_pop(message)) {
            if (stopped_) {
                break;
            }
            const std::string response = "Echo: " + message;
            broadcast(response);
        }
    }
};

int main(int /*argc*/, char* /*argv*/[]) {
    try {
        constexpr unsigned short port = 8080;
        const int threads = std::max<int>(
            1, static_cast<int>(std::thread::hardware_concurrency()));

        WebSocketServer server{ port, threads };
        server.start();
        std::cout << "WebSocket server is running on port " << port << std::endl;
        std::cout << "Press Ctrl+C to exit..." << std::endl;

        // Block the main thread until SIGINT or SIGTERM arrives. A private
        // io_context is used so the wait does not depend on the server's
        // worker threads.
        net::io_context signal_ioc(1);
        net::signal_set signals(signal_ioc, SIGINT, SIGTERM);
        signals.async_wait([](beast::error_code const&, int) {});
        signal_ioc.run();

        server.stop();
    } catch (std::exception const& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
站点信息
- 文章2306
- 用户1336
- 访客11538996
每日一句
The truth is out there, and it's waiting. — The X-Files
真相在外面,它在等待。 —《X档案》
真相在外面,它在等待。 —《X档案》
新会员