From d52ed3c18225997ced83458c63cd5c62f05133ae Mon Sep 17 00:00:00 2001 From: Ruben Perez Date: Tue, 2 Dec 2025 11:23:44 +0100 Subject: [PATCH] Initial sketch --- example/CMakeLists.txt | 5 +- example/cpp20_subscriber.cpp | 34 ++++++- include/boost/redis/pubsub_response.hpp | 123 ++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 6 deletions(-) create mode 100644 include/boost/redis/pubsub_response.hpp diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 7f77b4f10..86d5cbbb7 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -10,9 +10,10 @@ macro(make_example EXAMPLE_NAME STANDARD) if (${STANDARD} STREQUAL "20") target_link_libraries(${EXAMPLE_NAME} PRIVATE examples_main) endif() - if (${EXAMPLE_NAME} STREQUAL "cpp20_json") + # TODO: this + # if (${EXAMPLE_NAME} STREQUAL "cpp20_json") target_link_libraries(${EXAMPLE_NAME} PRIVATE Boost::json Boost::container_hash) - endif() + # endif() endmacro() macro(make_testable_example EXAMPLE_NAME STANDARD) diff --git a/example/cpp20_subscriber.cpp b/example/cpp20_subscriber.cpp index 1f0dd86fe..567fff0a0 100644 --- a/example/cpp20_subscriber.cpp +++ b/example/cpp20_subscriber.cpp @@ -5,6 +5,8 @@ */ #include +#include +#include #include #include @@ -13,8 +15,12 @@ #include #include #include +#include +#include +#include #include +#include #if defined(BOOST_ASIO_HAS_CO_AWAIT) @@ -25,6 +31,7 @@ using boost::redis::generic_flat_response; using boost::redis::config; using boost::system::error_code; using boost::redis::connection; +using boost::redis::pubsub_message; using asio::signal_set; /* This example will subscribe and read pushes indefinitely. @@ -43,13 +50,31 @@ using asio::signal_set; * > CLIENT kill TYPE pubsub */ +// Struct that will be stored in Redis using json serialization. +struct user { + std::string name; + std::string age; + std::string country; +}; + +// The type must be described for serialization to work. +BOOST_DESCRIBE_STRUCT(user, (), (name, age, country)) + +void boost_redis_from_bulk( + user& u, + boost::redis::resp3::node_view const& node, + boost::system::error_code&) +{ + u = boost::json::value_to(boost::json::parse(node.value)); +} + // Receives server pushes. auto receiver(std::shared_ptr conn) -> asio::awaitable { request req; req.push("SUBSCRIBE", "channel"); - generic_flat_response resp; + std::vector> resp; conn->set_receive_response(resp); // Loop while reconnection is enabled @@ -66,12 +91,13 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable // The response must be consumed without suspending the // coroutine i.e. without the use of async operations. - for (auto const& elem: resp.value().get_view()) - std::cout << elem.value << "\n"; + for (auto const& elem : resp) + std::cout << elem.channel << ": age=" << elem.payload.age + << ", name=" << elem.payload.name << "\n"; std::cout << std::endl; - resp.value().clear(); + resp.clear(); } } } diff --git a/include/boost/redis/pubsub_response.hpp b/include/boost/redis/pubsub_response.hpp new file mode 100644 index 000000000..875582b56 --- /dev/null +++ b/include/boost/redis/pubsub_response.hpp @@ -0,0 +1,123 @@ +/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_PUBSUB_RESPONSE_HPP +#define BOOST_REDIS_PUBSUB_RESPONSE_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace boost::redis { + +template +struct pubsub_message { + std::string channel; + T payload; +}; + +namespace adapter::detail { + +template +struct pubsub_adapter { + std::vector>* vec; + int resume_point{0}; + bool ignore{false}; + std::string channel_name{}; + + void on_init() + { + resume_point = 0; + ignore = false; + } + void on_done() { } + + system::error_code on_node_impl(resp3::node_view const& nd) + { + if (ignore) + return system::error_code(); + + switch (resume_point) { + BOOST_REDIS_CORO_INITIAL + + // The root node should be a push + if (nd.data_type != resp3::type::push) { + ignore = true; + return system::error_code(); + } + + BOOST_REDIS_YIELD(resume_point, 1, system::error_code()) + + // The next element should be the string "message" + if (nd.data_type != resp3::type::blob_string || nd.value != "message") { + ignore = true; + return system::error_code(); + } + + BOOST_REDIS_YIELD(resume_point, 2, system::error_code()) + + // The next element is the channel name + if (nd.data_type != resp3::type::blob_string) { + ignore = true; + return system::error_code(); + } + + channel_name = nd.value; + + BOOST_REDIS_YIELD(resume_point, 3, system::error_code()) + + // The last element is the payload + if (nd.data_type != resp3::type::blob_string) { + ignore = true; + return system::error_code(); + } + + // Try to deserialize the payload, if required + T elm; + system::error_code ec; + boost_redis_from_bulk(elm, nd, ec); + + if (ec) + return ec; + + vec->push_back(pubsub_message{std::move(channel_name), std::move(elm)}); + + return system::error_code(); + } + + return system::error_code(); + } + + template + void on_node(resp3::basic_node const& nd, system::error_code& ec) + { + ec = on_node_impl(nd); + } +}; + +template +struct response_traits>> { + using response_type = std::vector>; + using adapter_type = pubsub_adapter; + + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +} // namespace adapter::detail + +} // namespace boost::redis + +#endif