WebSocket协议是现代Web开发中不可或缺的一部分,它允许客户端和服务器之间建立持久的连接,实现双向实时通信。与传统的HTTP请求不同,WebSocket提供了一种全双工的通信通道,使得数据可以在任意方向上传输,而无需等待对方请求或者应答。
Boost.Beast:这是一个基于Boost库的WebSocket库,提供了高性能和易用的API来实现WebSocket通信。Boost.Beast适用于需要高性能和易用性的场景。
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
// Thread-safe FIFO queue guarded by a mutex, with a condition variable so a
// consumer can block until an element is available.
//
// Elements are moved out of the queue when popped, so T only needs to be
// move-assignable for the pop functions and movable for push(T&&).
template<typename T>
class ConcurrentQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;
    std::condition_variable cond_;
public:
    // Appends a copy of `value` and wakes one waiting consumer.
    void push(T const& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        cond_.notify_one();
    }
    // Appends `value` by move and wakes one waiting consumer.
    void push(T&& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cond_.notify_one();
    }
    // Non-blocking pop. Returns false and leaves `value` untouched when the
    // queue is empty; otherwise moves the oldest element into `value`.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }
    // Blocks until the queue is non-empty, then moves the oldest element into
    // `value`. There is no timeout: a caller that must shut down has to be
    // woken by pushing an element.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        value = std::move(queue_.front());
        queue_.pop();
    }
    // Returns whether the queue was empty at the moment of the call.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
};
// WebSocket 会话类
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::multi_buffer buffer_;
    ConcurrentQueue<std::string>& output_queue_;
    std::atomic<bool> is_closing_{ false };
    std::atomic<bool> writing_{ false };
    std::queue<std::string> write_queue_;
    std::mutex write_mutex_;
    std::vector<char> write_buffer_;
    static constexpr size_t MAX_BUFFER_SIZE = 128 * 1024 * 1024; // 16MB
public:
    WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue)
        : ws_(std::move(socket))
        , output_queue_(output_queue)
        , write_buffer_(MAX_BUFFER_SIZE){
    }
    ~WebSocketSession() {
        close();
    }
    // 获取底层 WebSocket 流
    websocket::stream<beast::tcp_stream>& ws() {
        return ws_;
    }
    void run() {
        // 设置更合理的超时选项
        websocket::stream_base::timeout timeout{
            std::chrono::seconds(30),   // 握手超时
            std::chrono::seconds(30),   // 空闲超时
            true                       // 启用 keep-alive pings
        };
        ws_.set_option(timeout);
        // 设置消息大小限制
        ws_.read_message_max(MAX_BUFFER_SIZE);
        // 设置 decorator
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res) {
                res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                    " websocket-server");
            }));
        // 异步接受 WebSocket 握手
        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()));
    }
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "WebSocket accept error: " << ec.message() << "\n";
            return;
        }
        // 开始读取消息
        do_read();
    }
    void close(websocket::close_code code = websocket::close_code::normal) {
        if (is_closing_.exchange(true)) {
            return;
        }
        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
        ws_.async_close(code,
            [self = shared_from_this()](beast::error_code ec) {
                if (ec) {
                    std::cerr << "WebSocket close error: " << ec.message() << "\n";
                }
            });
    }
    void do_read() {
        if (is_closing_) {
            return;
        }
        // 检查缓冲区大小
        if (buffer_.size() >= MAX_BUFFER_SIZE) {
            std::cerr << "Buffer size exceeded, closing connection\n";
            close(websocket::close_code::too_big);
            return;
        }
        // 异步读取
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()));
    }
    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        std::cout << "on_read bytes_transferred:" << bytes_transferred << "\n";
        if (ec == websocket::error::closed) {
            close();
            return;
        }
        if (ec) {
            std::cerr << "WebSocket read error: " << ec.message() << "\n";
            close();
            return;
        }
        // 检查消息是否完整
        if (!ws_.is_message_done()) {
            do_read();
            return;
        }
        // 处理完整消息
        std::string message = beast::buffers_to_string(buffer_.data());
        buffer_.consume(buffer_.size());
        try {
            output_queue_.push(message);
        }
        catch (const std::exception& e) {
            std::cerr << "Message queue error: " << e.what() << "\n";
            close(websocket::close_code::internal_error);
            return;
        }
        // 继续读取
        do_read();
    }
    void do_write(const std::string& message) {
        if (is_closing_) {
            return;
        }
        // 添加到写队列
        {
            std::lock_guard<std::mutex> lock(write_mutex_);
            write_queue_.push(message);
            if (writing_) {
                return;
            }
            writing_ = true;
        }
        write_loop();
    }
    void write_loop() {
        {
            std::lock_guard<std::mutex> lock(write_mutex_);
            if (write_queue_.empty()) {
                writing_ = false;
                return;
            }
            std::string msg = std::move(write_queue_.front());
            write_buffer_.assign(msg.begin(), msg.end());
            write_queue_.pop();
        }
        ws_.async_write(
            net::buffer(&write_buffer_[0], write_buffer_.size()),
            beast::bind_front_handler(
                &WebSocketSession::on_write,
                shared_from_this()));
    }
    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cerr << "WebSocket write error: " << ec.message() << "\n";
            close();
            return;
        }
        std::cout << "on_write bytes_transferred:" << bytes_transferred << "\n";
        write_loop();
    }
};
// WebSocket 服务器类
class WebSocketServer {
    net::io_context ioc_;
    tcp::acceptor acceptor_;
    std::vector<std::thread> io_threads_;
    std::atomic<bool> stopped_{ false };
    // 输出消息队列
    ConcurrentQueue<std::string> output_queue_;
    // 活动会话列表
    std::vector<std::shared_ptr<WebSocketSession>> sessions_;
    mutable std::mutex sessions_mutex_;
public:
    WebSocketServer(
        unsigned short port,
        int thread_count = std::thread::hardware_concurrency())
        : ioc_(thread_count)
        , acceptor_(ioc_, { net::ip::make_address("0.0.0.0"), port })
        {
        // 启动 I/O 线程池
        for (int i = 0; i < thread_count; ++i) {
            io_threads_.emplace_back([this] {
                ioc_.run();
                });
        }
        // 启动输出处理线程
        io_threads_.emplace_back([this] {
            process_output();
            });
    }
    ~WebSocketServer() {
        stop();
    }
    // 启动服务器
    void start() {
        if (!stopped_) {
            do_accept();
        }
    }
    // 停止服务器
    void stop() {
        if (stopped_.exchange(true)) {
            return;
        }
        // 关闭所有会话
        {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            for (auto& session : sessions_) {
                session->close();
            }
            sessions_.clear();
        }
        // 停止 I/O 上下文
        ioc_.stop();
        // 等待所有线程结束
        for (auto& thread : io_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }
    // 广播消息给所有客户端
    void broadcast(const std::string& message) {
        std::lock_guard<std::mutex> lock(sessions_mutex_);
        for (auto& session : sessions_) {
            session->do_write(message);
        }
    }
private:
    void do_accept() {
        acceptor_.async_accept(
            net::make_strand(ioc_),
            beast::bind_front_handler(
                &WebSocketServer::on_accept,
                this));
    }
    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (ec) {
            if (ec != net::error::operation_aborted) {
                std::cerr << "Accept error: " << ec.message() << "\n";
            }
            return;
        }
        // 创建新会话并添加到会话列表
        auto session = std::make_shared<WebSocketSession>(
            std::move(socket), output_queue_);
        {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            sessions_.push_back(session);
        }
        // 启动会话
        session->run();
        // 继续接受新连接
        if (!stopped_) {
            do_accept();
        }
    }
    // 处理输出消息
    void process_output() {
        while (!stopped_) {
            std::string message;
            output_queue_.wait_and_pop(message);
            broadcast(message);
        }
    }
};
int main(int argc, char* argv[]) {
    try {
        auto const port = 8080;
        auto const threads = std::max<int>(1, std::thread::hardware_concurrency());
        // 创建并启动服务器
        WebSocketServer server{ port, threads };
        server.start();
        std::cout << "WebSocket server is running on port " << port << std::endl;
        std::cout << "Press Enter to exit..." << std::endl;
        // 等待用户输入以停止服务器
        std::cin.get();
        server.stop();
    }
    catch (std::exception const& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
} 收藏的用户(0) X 
 正在加载信息~
 推荐阅读
  最新回复 (1) 
 -   
  // WebSocket broadcast server variant with an application-level ping/pong heartbeat.
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

// Thread-safe FIFO queue guarded by a mutex, with a condition variable so a
// consumer can block until an element is available. Elements are moved out
// when popped.
template<typename T>
class ConcurrentQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;
    std::condition_variable cond_;
public:
    // Appends a copy of `value` and wakes one waiting consumer.
    void push(T const& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        cond_.notify_one();
    }
    // Appends `value` by move and wakes one waiting consumer.
    void push(T&& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cond_.notify_one();
    }
    // Non-blocking pop; returns false when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }
    // Blocks until an element is available, then moves it into `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        value = std::move(queue_.front());
        queue_.pop();
    }
    // Returns whether the queue was empty at the moment of the call.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
};

// WebSocket session: owns one accepted client connection, pushes received
// messages onto the shared output queue, sends queued outgoing messages one at
// a time, and closes the connection when pings go unanswered.
//
// Threading: run(), close() and do_write() hand their work to the stream's
// executor, so they may be called from other threads. This assumes that
// executor serializes its handlers (the server below accepts each socket on
// its own strand). Every other member function runs on that executor only.
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer read_buffer_;
    // Outgoing messages; front() is the one currently being written and must
    // stay in place until its write completes.
    std::queue<std::string> write_queue_;
    ConcurrentQueue<std::string>& output_queue_;
    // Set once when the session starts closing; read from any thread.
    std::atomic<bool> is_closing_{ false };
    net::steady_timer timer_;
    bool ping_sent_ = false;
    std::chrono::steady_clock::time_point last_pong_time_;
    // Largest incoming message accepted from a client (10 MiB).
    static constexpr std::size_t MAX_MESSAGE_SIZE = 10 * 1024 * 1024;
public:
    // Takes ownership of an accepted socket. `output_queue` must outlive the
    // session because only a reference to it is stored.
    WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue)
        : ws_(std::move(socket))
        , output_queue_(output_queue)
        , timer_(ws_.get_executor()) {
    }
    // shared_from_this() is unusable during destruction, so no asynchronous
    // close is started here; destroying ws_ releases the socket.
    ~WebSocketSession() = default;
    // Access to the underlying WebSocket stream.
    websocket::stream<beast::tcp_stream>& ws() {
        return ws_;
    }
    // Starts the session on the stream's executor.
    void run() {
        net::dispatch(ws_.get_executor(),
            beast::bind_front_handler(
                &WebSocketSession::on_run,
                shared_from_this()));
    }
    // Starts the closing handshake and stops the heartbeat. Only the first
    // call has any effect, so async_close is never started twice.
    void close() {
        if (is_closing_.exchange(true)) {
            return;
        }
        net::dispatch(ws_.get_executor(),
            [self = shared_from_this()] {
                self->timer_.cancel();
                self->ws_.async_close(websocket::close_code::normal,
                    [self](beast::error_code ec) {
                        if (ec) {
                            std::cerr << "WebSocket close error: " << ec.message() << "\n";
                        }
                    });
            });
    }
    // Close-completion handler kept for callers that bind it directly: logs
    // the error, if any, and cancels the heartbeat timer.
    void on_close(beast::error_code ec) {
        if (ec) {
            std::cerr << "onclose error: " << ec.message() << "\n";
        }
        timer_.cancel();
    }
    // Arms the heartbeat timer to fire in 5 seconds.
    void start_ping_timer() {
        timer_.expires_after(std::chrono::seconds(5));
        timer_.async_wait(
            beast::bind_front_handler(
                &WebSocketSession::on_ping_timer,
                shared_from_this()));
    }
    // Heartbeat tick: closes the connection if no pong arrived for 10 seconds
    // or the previous ping is still unanswered; otherwise sends a new ping.
    void on_ping_timer(beast::error_code ec) {
        if (ec == net::error::operation_aborted || is_closing_) {
            return;
        }
        if (ec) {
            std::cerr << "timer: " << ec.message() << "\n";
        }
        const auto now = std::chrono::steady_clock::now();
        if (now - last_pong_time_ > std::chrono::seconds(10)) {
            std::cerr << "No pong received for 10 seconds, closing connection\n";
            close();
            return;
        }
        if (ping_sent_) {
            std::cerr << "Previous ping not answered, closing connection\n";
            close();
            return;
        }
        ping_sent_ = true;
        ws_.async_ping({},
            beast::bind_front_handler(
                &WebSocketSession::on_ping,
                shared_from_this()));
    }
    // Ping write completion: re-arms the heartbeat timer unless closing.
    void on_ping(beast::error_code ec) {
        if (ec) {
            std::cerr << "ping error: " << ec.message() << "\n";
        }
        if (is_closing_) {
            return;
        }
        start_ping_timer();
    }
    // Handshake completion: starts the heartbeat and the read loop.
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "WebSocket accept error: " << ec.message() << "\n";
            return;
        }
        last_pong_time_ = std::chrono::steady_clock::now();
        start_ping_timer();
        do_read();
    }
    // Requests the next complete message unless the session is closing.
    void do_read() {
        if (is_closing_) {
            return;
        }
        ws_.async_read(
            read_buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()));
    }
    // Read completion: forwards the message to the output queue and re-arms
    // the read.
    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec == websocket::error::closed) {
            // The closing handshake already finished; stop the heartbeat and
            // refuse further writes.
            is_closing_ = true;
            timer_.cancel();
            return;
        }
        if (ec) {
            std::cerr << "WebSocket read error: " << ec.message() << "\n";
            close();
            return;
        }
        std::string message = beast::buffers_to_string(read_buffer_.data());
        read_buffer_.consume(read_buffer_.size());
        std::cout << "on_read bytes_transferred: " << bytes_transferred << std::endl;
        output_queue_.push(std::move(message));
        do_read();
    }
    // Queues `message` for sending. The copy is handed to the stream's
    // executor, so writes never overlap and the stream is not touched from
    // the calling thread.
    void do_write(const std::string& message) {
        if (is_closing_) {
            return;
        }
        net::post(ws_.get_executor(),
            [self = shared_from_this(), msg = message]() mutable {
                self->enqueue_write(std::move(msg));
            });
    }
    // Write completion: drops the sent message and writes the next one.
    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cerr << "WebSocket write error: " << ec.message() << "\n";
            close();
            return;
        }
        std::cout << "on_write bytes_transferred: " << bytes_transferred << std::endl;
        write_queue_.pop();
        if (!write_queue_.empty() && !is_closing_) {
            write_front();
        }
    }
private:
    // Configures the stream and starts the asynchronous WebSocket handshake.
    void on_run() {
        ws_.set_option(
            websocket::stream_base::timeout::suggested(
                beast::role_type::server));
        // Add a Server field to the handshake response.
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res) {
                res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                    " websocket-server");
            }));
        ws_.read_message_max(MAX_MESSAGE_SIZE);
        // The callback is stored inside ws_, a member of this object, so
        // capturing `this` cannot outlive the session.
        ws_.control_callback(
            [this](websocket::frame_type kind, beast::string_view /*payload*/) {
                switch (kind) {
                case websocket::frame_type::ping:
                    std::cout << "Received ping, auto-replying with pong\n";
                    break;
                case websocket::frame_type::pong:
                    ping_sent_ = false;
                    last_pong_time_ = std::chrono::steady_clock::now();
                    std::cout << "Received pong\n";
                    break;
                case websocket::frame_type::close:
                    std::cout << "Received close frame\n";
                    break;
                }
            });
        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()));
    }
    // Appends a message and starts writing if no write is in flight.
    void enqueue_write(std::string message) {
        if (is_closing_) {
            return;
        }
        write_queue_.push(std::move(message));
        if (write_queue_.size() == 1) {
            write_front();
        }
    }
    // Starts writing the message at the front of the queue.
    void write_front() {
        ws_.async_write(
            net::buffer(write_queue_.front()),
            beast::bind_front_handler(
                &WebSocketSession::on_write,
                shared_from_this()));
    }
};

// WebSocket server: accepts TCP connections, creates one WebSocketSession per
// client, and echoes every received message to all registered sessions.
//
// NOTE(review): sessions are only removed from sessions_ in stop(), so
// disconnected clients stay registered until shutdown.
class WebSocketServer {
    net::io_context ioc_;
    // Keeps ioc_.run() from returning while no asynchronous operation is
    // pending (the worker threads start before start() posts the first accept).
    net::executor_work_guard<net::io_context::executor_type> work_guard_;
    tcp::acceptor acceptor_;
    std::vector<std::thread> io_threads_;
    std::atomic<bool> stopped_{ false };
    // Messages received by the sessions, waiting to be broadcast.
    ConcurrentQueue<std::string> output_queue_;
    // Registered sessions.
    std::vector<std::shared_ptr<WebSocketSession>> sessions_;
    mutable std::mutex sessions_mutex_;
public:
    // Binds the acceptor to 0.0.0.0:`port` and starts the worker threads.
    // `thread_count` values below 1 are treated as 1.
    WebSocketServer(
        unsigned short port,
        int thread_count = std::thread::hardware_concurrency())
        : ioc_(std::max(1, thread_count))
        , work_guard_(net::make_work_guard(ioc_))
        , acceptor_(ioc_, { net::ip::make_address("0.0.0.0"), port })
    {
        const int workers = std::max(1, thread_count);
        io_threads_.reserve(static_cast<std::size_t>(workers) + 1);
        // I/O thread pool.
        for (int i = 0; i < workers; ++i) {
            io_threads_.emplace_back([this] { ioc_.run(); });
        }
        // Thread that drains the output queue.
        io_threads_.emplace_back([this] { process_output(); });
    }
    ~WebSocketServer() {
        stop();
    }
    // Begins accepting connections (no effect after stop()).
    void start() {
        if (!stopped_) {
            do_accept();
        }
    }
    // Stops the server: closes all sessions, stops the io_context and joins
    // every thread. Only the first call has any effect.
    void stop() {
        if (stopped_.exchange(true)) {
            return;
        }
        {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            for (auto& session : sessions_) {
                session->close();
            }
            sessions_.clear();
        }
        // The output thread may be blocked in wait_and_pop(); push a dummy
        // message so it wakes up, sees stopped_ and exits.
        output_queue_.push(std::string{});
        work_guard_.reset();
        ioc_.stop();
        for (auto& thread : io_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }
    // Queues `message` for sending on every registered session.
    void broadcast(const std::string& message) {
        std::lock_guard<std::mutex> lock(sessions_mutex_);
        for (auto& session : sessions_) {
            session->do_write(message);
        }
    }
private:
    // Arms one asynchronous accept; each new socket gets its own strand.
    void do_accept() {
        acceptor_.async_accept(
            net::make_strand(ioc_),
            beast::bind_front_handler(
                &WebSocketServer::on_accept,
                this));
    }
    // Accept completion: registers and starts a session, then re-arms the
    // accept. An accept error is logged but does not stop further accepts.
    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (ec) {
            if (ec == net::error::operation_aborted || stopped_) {
                return;
            }
            std::cerr << "Accept error: " << ec.message() << "\n";
        }
        else {
            auto session = std::make_shared<WebSocketSession>(
                std::move(socket), output_queue_);
            {
                std::lock_guard<std::mutex> lock(sessions_mutex_);
                // stop() sets stopped_ before it clears sessions_, so checking
                // under the lock avoids registering a session after shutdown.
                if (stopped_) {
                    return;
                }
                sessions_.push_back(session);
            }
            session->run();
        }
        if (!stopped_) {
            do_accept();
        }
    }
    // Output thread body: echoes queued messages to all clients until the
    // server stops.
    void process_output() {
        while (!stopped_) {
            std::string message;
            output_queue_.wait_and_pop(message);
            if (stopped_) {
                break;  // woken by stop(); the popped message is only a wake-up
            }
            broadcast("Echo: " + message);
        }
    }
};

namespace {
// Set from the signal handler when SIGINT or SIGTERM is delivered.
volatile std::sig_atomic_t g_stop_requested = 0;

void handle_stop_signal(int) {
    g_stop_requested = 1;
}
}  // namespace

// Entry point: runs the server on port 8080 until SIGINT or SIGTERM arrives.
int main(int argc, char* argv[]) {
    try {
        auto const port = 8080;
        auto const threads = std::max<int>(1, std::thread::hardware_concurrency());
        WebSocketServer server{ port, threads };
        server.start();
        std::cout << "WebSocket server is running on port " << port << std::endl;
        std::cout << "Press Ctrl+C to exit..." << std::endl;
        // Wait for a stop signal. A condition-variable wait without a
        // predicate could return spuriously and shut the server down.
        std::signal(SIGINT, handle_stop_signal);
        std::signal(SIGTERM, handle_stop_signal);
        while (!g_stop_requested) {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
        server.stop();
    }
    catch (std::exception const& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
站点信息
 -  文章2313
 -  用户1336
 -  访客11760899
 
 每日一句 
 Pride in your steps to dreams.
为追梦的每一步而自豪。
 为追梦的每一步而自豪。
新会员