boost::beast websocket 实例

Home / C++ MrLee 3天前 26

WebSocket协议是现代Web开发中不可或缺的一部分,它允许客户端和服务器之间建立持久的连接,实现双向实时通信。与传统的HTTP请求不同,WebSocket提供了一种全双工的通信通道,使得数据可以在任意方向上传输,而无需等待对方请求或者应答。

  • Boost.Beast:这是一个构建在 Boost.Asio 之上的 HTTP/WebSocket 库,提供了高性能且易用的 API 来实现 WebSocket 通信,适用于同时需要高性能和易用性的场景。

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/config.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
// 线程安全的消息队列
// Thread-safe FIFO queue.
//
// Every member function takes the internal mutex, so the queue can be shared
// between any number of producer and consumer threads. Elements are moved out
// on pop, which avoids a copy and allows move-only element types.
template<typename T>
class ConcurrentQueue {
    std::queue<T> queue_;
    mutable std::mutex mutex_;
    std::condition_variable cond_;
public:
    // Appends a copy of `value` and wakes one waiting consumer.
    void push(T const& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        cond_.notify_one();
    }
    // Appends `value` by move and wakes one waiting consumer.
    void push(T&& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cond_.notify_one();
    }
    // Non-blocking pop. Returns false (leaving `value` untouched) when the
    // queue is empty; otherwise moves the oldest element into `value`.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }
    // Blocking pop: waits until an element is available, then moves the
    // oldest element into `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        value = std::move(queue_.front());
        queue_.pop();
    }
    // Returns whether the queue was empty at the moment of the call.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }
};
// WebSocket 会话类
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::multi_buffer buffer_;
    ConcurrentQueue<std::string>& output_queue_;
    std::atomic<bool> is_closing_{ false };
    std::atomic<bool> writing_{ false };
    std::queue<std::string> write_queue_;
    std::mutex write_mutex_;
    std::vector<char> write_buffer_;
    static constexpr size_t MAX_BUFFER_SIZE = 128 * 1024 * 1024; // 16MB
public:
    WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue)
        : ws_(std::move(socket))
        , output_queue_(output_queue)
        , write_buffer_(MAX_BUFFER_SIZE){
    }
    ~WebSocketSession() {
        close();
    }
    // 获取底层 WebSocket 流
    websocket::stream<beast::tcp_stream>& ws() {
        return ws_;
    }
    void run() {
        // 设置更合理的超时选项
        websocket::stream_base::timeout timeout{
            std::chrono::seconds(30),   // 握手超时
            std::chrono::seconds(30),   // 空闲超时
            true                       // 启用 keep-alive pings
        };
        ws_.set_option(timeout);
        // 设置消息大小限制
        ws_.read_message_max(MAX_BUFFER_SIZE);
        // 设置 decorator
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res) {
                res.set(http::field::server,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                    " websocket-server");
            }));
        // 异步接受 WebSocket 握手
        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()));
    }
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "WebSocket accept error: " << ec.message() << "\n";
            return;
        }
        // 开始读取消息
        do_read();
    }
    void close(websocket::close_code code = websocket::close_code::normal) {
        if (is_closing_.exchange(true)) {
            return;
        }
        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
        ws_.async_close(code,
            [self = shared_from_this()](beast::error_code ec) {
                if (ec) {
                    std::cerr << "WebSocket close error: " << ec.message() << "\n";
                }
            });
    }
    void do_read() {
        if (is_closing_) {
            return;
        }
        // 检查缓冲区大小
        if (buffer_.size() >= MAX_BUFFER_SIZE) {
            std::cerr << "Buffer size exceeded, closing connection\n";
            close(websocket::close_code::too_big);
            return;
        }
        // 异步读取
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()));
    }
    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        std::cout << "on_read bytes_transferred:" << bytes_transferred << "\n";
        if (ec == websocket::error::closed) {
            close();
            return;
        }
        if (ec) {
            std::cerr << "WebSocket read error: " << ec.message() << "\n";
            close();
            return;
        }
        // 检查消息是否完整
        if (!ws_.is_message_done()) {
            do_read();
            return;
        }
        // 处理完整消息
        std::string message = beast::buffers_to_string(buffer_.data());
        buffer_.consume(buffer_.size());
        try {
            output_queue_.push(message);
        }
        catch (const std::exception& e) {
            std::cerr << "Message queue error: " << e.what() << "\n";
            close(websocket::close_code::internal_error);
            return;
        }
        // 继续读取
        do_read();
    }
    void do_write(const std::string& message) {
        if (is_closing_) {
            return;
        }
        // 添加到写队列
        {
            std::lock_guard<std::mutex> lock(write_mutex_);
            write_queue_.push(message);
            if (writing_) {
                return;
            }
            writing_ = true;
        }
        write_loop();
    }
    void write_loop() {
        {
            std::lock_guard<std::mutex> lock(write_mutex_);
            if (write_queue_.empty()) {
                writing_ = false;
                return;
            }
            std::string msg = std::move(write_queue_.front());
            write_buffer_.assign(msg.begin(), msg.end());
            write_queue_.pop();
        }
        ws_.async_write(
            net::buffer(&write_buffer_[0], write_buffer_.size()),
            beast::bind_front_handler(
                &WebSocketSession::on_write,
                shared_from_this()));
    }
    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            std::cerr << "WebSocket write error: " << ec.message() << "\n";
            close();
            return;
        }
        std::cout << "on_write bytes_transferred:" << bytes_transferred << "\n";
        write_loop();
    }
};
// WebSocket 服务器类
class WebSocketServer {
    net::io_context ioc_;
    tcp::acceptor acceptor_;
    std::vector<std::thread> io_threads_;
    std::atomic<bool> stopped_{ false };
    // 输出消息队列
    ConcurrentQueue<std::string> output_queue_;
    // 活动会话列表
    std::vector<std::shared_ptr<WebSocketSession>> sessions_;
    mutable std::mutex sessions_mutex_;
public:
    WebSocketServer(
        unsigned short port,
        int thread_count = std::thread::hardware_concurrency())
        : ioc_(thread_count)
        , acceptor_(ioc_, { net::ip::make_address("0.0.0.0"), port })
        {
        // 启动 I/O 线程池
        for (int i = 0; i < thread_count; ++i) {
            io_threads_.emplace_back([this] {
                ioc_.run();
                });
        }
        // 启动输出处理线程
        io_threads_.emplace_back([this] {
            process_output();
            });
    }
    ~WebSocketServer() {
        stop();
    }
    // 启动服务器
    void start() {
        if (!stopped_) {
            do_accept();
        }
    }
    // 停止服务器
    void stop() {
        if (stopped_.exchange(true)) {
            return;
        }
        // 关闭所有会话
        {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            for (auto& session : sessions_) {
                session->close();
            }
            sessions_.clear();
        }
        // 停止 I/O 上下文
        ioc_.stop();
        // 等待所有线程结束
        for (auto& thread : io_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }
    // 广播消息给所有客户端
    void broadcast(const std::string& message) {
        std::lock_guard<std::mutex> lock(sessions_mutex_);
        for (auto& session : sessions_) {
            session->do_write(message);
        }
    }
private:
    void do_accept() {
        acceptor_.async_accept(
            net::make_strand(ioc_),
            beast::bind_front_handler(
                &WebSocketServer::on_accept,
                this));
    }
    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (ec) {
            if (ec != net::error::operation_aborted) {
                std::cerr << "Accept error: " << ec.message() << "\n";
            }
            return;
        }
        // 创建新会话并添加到会话列表
        auto session = std::make_shared<WebSocketSession>(
            std::move(socket), output_queue_);
        {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            sessions_.push_back(session);
        }
        // 启动会话
        session->run();
        // 继续接受新连接
        if (!stopped_) {
            do_accept();
        }
    }
    // 处理输出消息
    void process_output() {
        while (!stopped_) {
            std::string message;
            output_queue_.wait_and_pop(message);
            broadcast(message);
        }
    }
};
int main(int argc, char* argv[]) {
    try {
        auto const port = 8080;
        auto const threads = std::max<int>(1, std::thread::hardware_concurrency());
        // 创建并启动服务器
        WebSocketServer server{ port, threads };
        server.start();
        std::cout << "WebSocket server is running on port " << port << std::endl;
        std::cout << "Press Enter to exit..." << std::endl;
        // 等待用户输入以停止服务器
        std::cin.get();
        server.stop();
    }
    catch (std::exception const& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}


本文链接:https://www.it72.com/12788.htm

推荐阅读
最新回复 (1)
  • MrLee 2天前
    引用 2
    #include <boost/beast/core.hpp>
    #include <boost/beast/websocket.hpp>
    #include <boost/asio/ip/tcp.hpp>
    #include <boost/asio/strand.hpp>
    #include <boost/config.hpp>
    #include <algorithm>
    #include <cstdlib>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <thread>
    #include <vector>
    #include <atomic>
    #include <mutex>
    #include <queue>
    #include <condition_variable>
    namespace beast = boost::beast;
    namespace http = beast::http;
    namespace websocket = beast::websocket;
    namespace net = boost::asio;
    using tcp = boost::asio::ip::tcp;
    // 线程安全的消息队列
    // Thread-safe FIFO queue.
    //
    // Every member function takes the internal mutex, so the queue can be
    // shared between producer and consumer threads. Elements are moved out on
    // pop, which avoids a copy and allows move-only element types.
    template<typename T>
    class ConcurrentQueue {
        std::queue<T> queue_;
        mutable std::mutex mutex_;
        std::condition_variable cond_;
    public:
        // Appends a copy of `value` and wakes one waiting consumer.
        void push(T const& value) {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                queue_.push(value);
            }
            // Notify after releasing the lock so the woken thread can take it.
            cond_.notify_one();
        }
        // Appends `value` by move and wakes one waiting consumer.
        void push(T&& value) {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                queue_.push(std::move(value));
            }
            cond_.notify_one();
        }
        // Non-blocking pop. Returns false (leaving `value` untouched) when the
        // queue is empty; otherwise moves the oldest element into `value`.
        bool try_pop(T& value) {
            std::lock_guard<std::mutex> lock(mutex_);
            if (queue_.empty()) {
                return false;
            }
            value = std::move(queue_.front());
            queue_.pop();
            return true;
        }
        // Blocking pop: waits until an element is available, then moves the
        // oldest element into `value`.
        void wait_and_pop(T& value) {
            std::unique_lock<std::mutex> lock(mutex_);
            cond_.wait(lock, [this] { return !queue_.empty(); });
            value = std::move(queue_.front());
            queue_.pop();
        }
        // Returns whether the queue was empty at the moment of the call.
        bool empty() const {
            std::lock_guard<std::mutex> lock(mutex_);
            return queue_.empty();
        }
    };
    // WebSocket 会话类
    class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
        websocket::stream<beast::tcp_stream> ws_;
        // 读写入缓冲空
        beast::flat_buffer read_buffer_;
        std::vector<char> write_buffer_;
        ConcurrentQueue<std::string>& output_queue_;
        // 关闭处理标志
        std::atomic<bool> is_closing_{ false };
        net::steady_timer timer_;
        bool ping_sent_ = false;
        std::chrono::steady_clock::time_point last_pong_time_;
    public:
        WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue)
            : ws_(std::move(socket))
            , output_queue_(output_queue)
            , write_buffer_(10 * 1024 * 1024)
            , timer_(ws_.get_executor()) {
        }
        ~WebSocketSession() {
            close();
        }
        // 获取底层 WebSocket 流
        websocket::stream<beast::tcp_stream>& ws() {
            return ws_;
        }
        // 启动 WebSocket 会话
        void run() {
            // 设置建议的 timeout 选项
            ws_.set_option(
                websocket::stream_base::timeout::suggested(
                    beast::role_type::server));
            // 设置 decorator 修改握手响应
            ws_.set_option(websocket::stream_base::decorator(
                [](websocket::response_type& res) {
                    res.set(http::field::server,
                        std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-server");
                }));
            ws_.read_message_max(10 * 1024 * 1024);
            ws_.control_callback([&](websocket::frame_type kind, beast::string_view payload) {
                switch (kind)
                {
                case websocket::frame_type::ping:
                    // 收到Ping,自动回复Pong
                    std::cout << "Received ping, auto-replying with pong\n";
                    break;
                case websocket::frame_type::pong:
                    // 收到Pong响应
                    ping_sent_ = false;
                    last_pong_time_ = std::chrono::steady_clock::now();
                    std::cout << "Received pong\n";
                    break;
                case websocket::frame_type::close:
                    std::cout << "Received close frame\n";
                    break;
                }
            });
            // 异步接受 WebSocket 握手
            ws_.async_accept(
                beast::bind_front_handler(
                    &WebSocketSession::on_accept,
                    shared_from_this()));
        }
        // 关闭连接
        void close() {
            if (is_closing_.exchange(true)) {
                return; // 已经在关闭过程中
            }
            // 安全地关闭 WebSocket 连接
            beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
            ws_.async_close(websocket::close_code::normal,
                [self = shared_from_this()](beast::error_code ec) {
                    if (ec) {
                        std::cerr << "WebSocket close error: " << ec.message() << "\n";
                    }
                });
        }
        void on_close(beast::error_code ec)
        {
            if (ec) std::cerr << "onclose error: " << ec.message() << "\n";
            // 取消定时器
            timer_.cancel();
        }
        void start_ping_timer()
        {
            timer_.expires_after(std::chrono::seconds(5));
            timer_.async_wait(
                beast::bind_front_handler(
                    &WebSocketSession::on_ping_timer,
                    shared_from_this()));
        }
        void on_ping_timer(beast::error_code ec)
        {
            if (ec == net::error::operation_aborted)
                return;
            if (ec)std::cerr << "timer: " << ec.message() << "\n";
            // 检查上次Pong响应时间
            auto now = std::chrono::steady_clock::now();
            if (now - last_pong_time_ > std::chrono::seconds(10))
            {
                std::cerr << "No pong received for 10 seconds, closing connection\n";
                return ws_.async_close(websocket::close_code::normal,
                    beast::bind_front_handler(
                        &WebSocketSession::on_close,
                        shared_from_this()));
            }
            // 发送Ping
            if (!ping_sent_)
            {
                ping_sent_ = true;
                ws_.async_ping("",
                    beast::bind_front_handler(
                        &WebSocketSession::on_ping,
                        shared_from_this()));
            }
            else
            {
                // 已经发送过Ping但未收到Pong,可能是连接问题
                std::cerr << "Previous ping not answered, closing connection\n";
                return ws_.async_close(websocket::close_code::normal,
                    beast::bind_front_handler(
                        &WebSocketSession::on_close,
                        shared_from_this()));
            }
        }
        void on_ping(beast::error_code ec)
        {
            if (ec) std::cerr << "ping error: " << ec.message() << "\n";
            // 重置定时器
            start_ping_timer();
        }
        void on_accept(beast::error_code ec) {
            if (ec) {
                std::cerr << "WebSocket accept error: " << ec.message() << "\n";
                return;
            }
            last_pong_time_ = std::chrono::steady_clock::now();
            // 开始心跳检测
            start_ping_timer();
            // 开始读取消息
            do_read();
        }
        void do_read() {
            if (is_closing_) {
                return;
            }
            // 读取 WebSocket 消息
            ws_.async_read(
                read_buffer_,
                beast::bind_front_handler(
                    &WebSocketSession::on_read,
                    shared_from_this()));
        }
        void on_read(beast::error_code ec, std::size_t bytes_transferred) {
            boost::ignore_unused(bytes_transferred);
            if (ec == websocket::error::closed) {
                // 客户端正常关闭连接
                close();
                return;
            }
            if (ec) {
                std::cerr << "WebSocket read error: " << ec.message() << "\n";
                close();
                return;
            }
            // 处理接收到的消息
            std::string message = beast::buffers_to_string(read_buffer_.data());
            read_buffer_.consume(read_buffer_.size());
            std::cout << "on_read bytes_transferred: " << bytes_transferred << std::endl;
            // 将消息放入输出队列
            output_queue_.push(message);
            // 继续读取下一条消息
            do_read();
        }
        void do_write(const std::string& message) {
            if (is_closing_) {
                return;
            }
            write_buffer_.assign(message.begin(), message.end());
            // 异步写入消息
            ws_.async_write(
                net::buffer(write_buffer_),
                beast::bind_front_handler(
                    &WebSocketSession::on_write,
                    shared_from_this()));
        }
        void on_write(beast::error_code ec, std::size_t bytes_transferred) {
            boost::ignore_unused(bytes_transferred);
            if (ec) {
                std::cerr << "WebSocket write error: " << ec.message() << "\n";
                close();
                return;
            }
            std::cout << "on_write bytes_transferred: " << bytes_transferred << std::endl;
        }
    };
    // WebSocket 服务器类
    class WebSocketServer {
        net::io_context ioc_;
        tcp::acceptor acceptor_;
        std::vector<std::thread> io_threads_;
        std::atomic<bool> stopped_{ false };
        // 输出消息队列
        ConcurrentQueue<std::string> output_queue_;
        // 活动会话列表
        std::vector<std::shared_ptr<WebSocketSession>> sessions_;
        mutable std::mutex sessions_mutex_;
    public:
        WebSocketServer(
            unsigned short port,
            int thread_count = std::thread::hardware_concurrency())
            : ioc_(thread_count)
            , acceptor_(ioc_, { net::ip::make_address("0.0.0.0"), port }) {
            // 启动 I/O 线程池
            for (int i = 0; i < thread_count; ++i) {
                io_threads_.emplace_back([this] {
                    ioc_.run();
                    });
            }
            // 启动输出处理线程
            io_threads_.emplace_back([this] {
                process_output();
                });
        }
        ~WebSocketServer() {
            stop();
        }
        // 启动服务器
        void start() {
            if (!stopped_) {
                do_accept();
            }
        }
        // 停止服务器
        void stop() {
            if (stopped_.exchange(true)) {
                return;
            }
            // 关闭所有会话
            {
                std::lock_guard<std::mutex> lock(sessions_mutex_);
                for (auto& session : sessions_) {
                    session->close();
                }
                sessions_.clear();
            }
            // 停止 I/O 上下文
            ioc_.stop();
            // 等待所有线程结束
            for (auto& thread : io_threads_) {
                if (thread.joinable()) {
                    thread.join();
                }
            }
        }
        // 广播消息给所有客户端
        void broadcast(const std::string& message) {
            std::lock_guard<std::mutex> lock(sessions_mutex_);
            for (auto& session : sessions_) {
                session->do_write(message);
            }
        }
    private:
        void do_accept() {
            acceptor_.async_accept(
                net::make_strand(ioc_),
                beast::bind_front_handler(
                    &WebSocketServer::on_accept,
                    this));
        }
        void on_accept(beast::error_code ec, tcp::socket socket) {
            if (ec) {
                if (ec != net::error::operation_aborted) {
                    std::cerr << "Accept error: " << ec.message() << "\n";
                }
                return;
            }
            // 创建新会话并添加到会话列表
            auto session = std::make_shared<WebSocketSession>(
                std::move(socket), output_queue_);
            {
                std::lock_guard<std::mutex> lock(sessions_mutex_);
                sessions_.push_back(session);
            }
            // 启动会话
            session->run();
            // 继续接受新连接
            if (!stopped_) {
                do_accept();
            }
        }
        // 处理输出消息
        void process_output() {
            while (!stopped_) {
                std::string message;
                output_queue_.wait_and_pop(message);
                // 示例:回显消息给所有客户端
                std::string response = "Echo: " + message;
                broadcast(response);
            }
        }
    };
    int main(int argc, char* argv[]) {
        try {
            auto const port = 8080;
            auto const threads = std::max<int>(1, std::thread::hardware_concurrency());
            // 创建并启动服务器
            WebSocketServer server{ port, threads };
            server.start();
            std::cout << "WebSocket server is running on port " << port << std::endl;
            std::cout << "Press Enter to exit..." << std::endl;
            // 等待停止信号
            std::mutex mtx;
            std::condition_variable cv;
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock);
            server.stop();
        }
        catch (std::exception const& e) {
            std::cerr << "Error: " << e.what() << std::endl;
            return EXIT_FAILURE;
        }
        return EXIT_SUCCESS;
    }


返回