From 3599e26747423a5ea6da8f8f8c3414e5a28157c8 Mon Sep 17 00:00:00 2001 From: Hans-Joachim Krauch Date: Fri, 14 Jul 2023 11:46:27 -0300 Subject: [PATCH] Update C++ example implementation (#491) ### Public-Facing Changes Update C++ example implementation ### Description - Allows choosing between boost asio and standalone asio (fixes #483) - Adds support for `asset` capability - Some refactoring --------- Co-authored-by: Jacob Bandes-Storch --- .github/workflows/ci.yml | 17 +- cpp/Makefile | 4 + cpp/dev.Dockerfile | 26 +- cpp/examples/conanfile.py | 4 +- cpp/examples/src/base64.hpp | 42 - .../src/example_server_flatbuffers.cpp | 6 +- cpp/examples/src/example_server_protobuf.cpp | 6 +- cpp/foxglove-websocket/CMakeLists.txt | 5 +- cpp/foxglove-websocket/conanfile.py | 7 +- .../include/foxglove/websocket/base64.hpp | 14 + .../foxglove/websocket/callback_queue.hpp | 81 ++ .../include/foxglove/websocket/common.hpp | 22 +- .../include/foxglove/websocket/parameter.hpp | 57 +- .../foxglove/websocket/serialization.hpp | 2 + .../foxglove/websocket/server_interface.hpp | 28 + .../foxglove/websocket/websocket_client.hpp | 6 + .../foxglove/websocket/websocket_logging.hpp | 4 +- .../foxglove/websocket/websocket_server.hpp | 870 +++++++++++------- cpp/foxglove-websocket/src/base64.cpp | 106 +++ cpp/foxglove-websocket/src/parameter.cpp | 77 +- cpp/foxglove-websocket/src/serialization.cpp | 109 ++- 21 files changed, 964 insertions(+), 529 deletions(-) delete mode 100644 cpp/examples/src/base64.hpp create mode 100644 cpp/foxglove-websocket/include/foxglove/websocket/base64.hpp create mode 100644 cpp/foxglove-websocket/include/foxglove/websocket/callback_queue.hpp create mode 100644 cpp/foxglove-websocket/src/base64.cpp diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 583e231d..192432b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -74,19 +74,14 @@ jobs: cpp: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + make_cmd: [format-check, build, build-cpp20, build-boost-asio] defaults: run: working-directory: cpp + name: cpp (${{ matrix.make_cmd }}) steps: - uses: actions/checkout@v3 - - run: make format-check - - run: make build - - cpp-std-20: - runs-on: ubuntu-latest - defaults: - run: - working-directory: cpp - steps: - - uses: actions/checkout@v3 - - run: make build-cpp20 + - run: make ${{ matrix.make_cmd }} diff --git a/cpp/Makefile b/cpp/Makefile index c6ecd02f..c8e3f713 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -4,6 +4,10 @@ default: build build: docker compose build --build-arg CPPSTD=17 +.PHONY: build-boost-asio +build-boost-asio: + docker compose build --build-arg CPPSTD=17 --build-arg ASIO=boost + .PHONY: build-cpp20 build-cpp20: docker compose build --build-arg CPPSTD=20 diff --git a/cpp/dev.Dockerfile b/cpp/dev.Dockerfile index 2f895be1..a2022a2b 100644 --- a/cpp/dev.Dockerfile +++ b/cpp/dev.Dockerfile @@ -24,24 +24,18 @@ WORKDIR /src FROM base as build RUN pip --no-cache-dir install conan RUN conan profile detect --force - -FROM build as build_examples -COPY ./foxglove-websocket/conanfile.py /src/foxglove-websocket/conanfile.py -ARG CPPSTD=17 -RUN conan install foxglove-websocket -s compiler.cppstd=$CPPSTD --build=missing COPY ./foxglove-websocket /src/foxglove-websocket/ -RUN conan create foxglove-websocket -s compiler.cppstd=$CPPSTD -COPY ./examples/conanfile.py /src/examples/conanfile.py -RUN conan install examples --output-folder examples/build --build=missing -s compiler.cppstd=$CPPSTD -s build_type=Debug +ARG CPPSTD=17 +ARG ASIO=standalone +RUN conan create foxglove-websocket -s compiler.cppstd=$CPPSTD --build=missing -o foxglove-websocket*:asio=$ASIO -FROM build_examples AS examples -COPY --from=build_examples /src /src +FROM build as build_examples +COPY --from=build /root/.conan2 /root/.conan2 COPY ./examples /src/examples -COPY --from=build_examples /src/examples/build/ /src/examples/build/ -RUN conan build examples --output-folder examples/ -s compiler.cppstd=$CPPSTD -s build_type=Debug +RUN conan build examples --output-folder examples/ -s compiler.cppstd=$CPPSTD --build=missing -FROM examples AS example_server_protobuf -CMD ["examples/build/Debug/example_server_protobuf"] +FROM build_examples AS example_server_protobuf +CMD ["examples/build/Release/example_server_protobuf"] -FROM examples AS example_server_flatbuffers -CMD ["examples/build/Debug/example_server_flatbuffers", "/src/examples/autogenerated_flatbuffers/SceneUpdate.bfbs"] +FROM build_examples AS example_server_flatbuffers +CMD ["examples/build/Release/example_server_flatbuffers", "/src/examples/autogenerated_flatbuffers/SceneUpdate.bfbs"] diff --git a/cpp/examples/conanfile.py b/cpp/examples/conanfile.py index 36b3f7bd..8ff36d13 100644 --- a/cpp/examples/conanfile.py +++ b/cpp/examples/conanfile.py @@ -4,14 +4,14 @@ class FoxgloveWebSocketExamplesConan(ConanFile): name = "foxglove-websocket-example" - version = "1.0.0" + version = "1.1.0" settings = "os", "compiler", "build_type", "arch" exports_sources = "CMakeLists.txt", "src/*", "proto/*" generators = "CMakeDeps" def requirements(self): self.requires("flatbuffers/23.5.26") - self.requires("foxglove-websocket/1.0.0") + self.requires("foxglove-websocket/1.1.0") self.requires("protobuf/3.21.4") self.requires("zlib/1.2.13") diff --git a/cpp/examples/src/base64.hpp b/cpp/examples/src/base64.hpp deleted file mode 100644 index 612f744b..00000000 --- a/cpp/examples/src/base64.hpp +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include -#include - -// Adapted from: -// https://gist.github.com/tomykaira/f0fd86b6c73063283afe550bc5d77594 -// https://github.com/protocolbuffers/protobuf/blob/01fe22219a0312b178a265e75fe35422ea6afbb1/src/google/protobuf/compiler/csharp/csharp_helpers.cc#L346 -inline std::string Base64Encode(std::string_view input) { - constexpr const char ALPHABET[] = - "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; - std::string result; - // Every 3 bytes of data yields 4 bytes of output - result.reserve((input.size() + (3 - 1 /* round up */)) / 3 * 4); - - // Unsigned values are required for bit-shifts below to work properly - const unsigned char* data = reinterpret_cast(input.data()); - - size_t i = 0; - for (; i + 2 < input.size(); i += 3) { - result.push_back(ALPHABET[data[i] >> 2]); - result.push_back(ALPHABET[((data[i] & 0b11) << 4) | (data[i + 1] >> 4)]); - result.push_back(ALPHABET[((data[i + 1] & 0b1111) << 2) | (data[i + 2] >> 6)]); - result.push_back(ALPHABET[data[i + 2] & 0b111111]); - } - switch (input.size() - i) { - case 2: - result.push_back(ALPHABET[data[i] >> 2]); - result.push_back(ALPHABET[((data[i] & 0b11) << 4) | (data[i + 1] >> 4)]); - result.push_back(ALPHABET[(data[i + 1] & 0b1111) << 2]); - result.push_back('='); - break; - case 1: - result.push_back(ALPHABET[data[i] >> 2]); - result.push_back(ALPHABET[(data[i] & 0b11) << 4]); - result.push_back('='); - result.push_back('='); - break; - } - - return result; -} diff --git a/cpp/examples/src/example_server_flatbuffers.cpp b/cpp/examples/src/example_server_flatbuffers.cpp index 7039d3e6..ad03280e 100644 --- a/cpp/examples/src/example_server_flatbuffers.cpp +++ b/cpp/examples/src/example_server_flatbuffers.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -12,7 +13,6 @@ #include #include "SceneUpdate_generated.h" -#include "base64.hpp" #include "flatbuffers/flatbuffers.h" namespace foxglove { @@ -78,13 +78,13 @@ int main(int argc, char** argv) { .topic = "example_msg", .encoding = "flatbuffer", .schemaName = "foxglove.SceneUpdate", - .schema = Base64Encode(getFileContents(sceneUpdateBfbsPath)), + .schema = foxglove::base64Encode(getFileContents(sceneUpdateBfbsPath)), }}); const auto chanId = channelIds.front(); bool running = true; - asio::signal_set signals(server->getEndpoint().get_io_service(), SIGINT); + websocketpp::lib::asio::signal_set signals(server->getEndpoint().get_io_service(), SIGINT); signals.async_wait([&](std::error_code const& ec, int sig) { if (ec) { std::cerr << "signal error: " << ec.message() << std::endl; diff --git a/cpp/examples/src/example_server_protobuf.cpp b/cpp/examples/src/example_server_protobuf.cpp index 3a6ec65d..d8dc6c8c 100644 --- a/cpp/examples/src/example_server_protobuf.cpp +++ b/cpp/examples/src/example_server_protobuf.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -13,7 +14,6 @@ #include #include -#include "base64.hpp" #include "foxglove/SceneUpdate.pb.h" namespace foxglove { @@ -80,13 +80,13 @@ int main() { .topic = "example_msg", .encoding = "protobuf", .schemaName = foxglove::SceneUpdate::descriptor()->full_name(), - .schema = Base64Encode(SerializeFdSet(foxglove::SceneUpdate::descriptor())), + .schema = foxglove::base64Encode(SerializeFdSet(foxglove::SceneUpdate::descriptor())), }}); const auto chanId = channelIds.front(); bool running = true; - asio::signal_set signals(server->getEndpoint().get_io_service(), SIGINT); + websocketpp::lib::asio::signal_set signals(server->getEndpoint().get_io_service(), SIGINT); signals.async_wait([&](std::error_code const& ec, int sig) { if (ec) { std::cerr << "signal error: " << ec.message() << std::endl; diff --git a/cpp/foxglove-websocket/CMakeLists.txt b/cpp/foxglove-websocket/CMakeLists.txt index 94f609d1..a2011585 100644 --- a/cpp/foxglove-websocket/CMakeLists.txt +++ b/cpp/foxglove-websocket/CMakeLists.txt @@ -3,11 +3,10 @@ project(FoxgloveWebSocket CXX) find_package(nlohmann_json REQUIRED) find_package(websocketpp REQUIRED) -find_package(asio REQUIRED) -add_library(foxglove_websocket src/parameter.cpp src/serialization.cpp) +add_library(foxglove_websocket src/base64.cpp src/parameter.cpp src/serialization.cpp) target_include_directories(foxglove_websocket PUBLIC include) -target_link_libraries(foxglove_websocket nlohmann_json::nlohmann_json websocketpp::websocketpp asio::asio) +target_link_libraries(foxglove_websocket nlohmann_json::nlohmann_json websocketpp::websocketpp) set_target_properties(foxglove_websocket PROPERTIES CXX_STANDARD 17 CXX_STANDARD_REQUIRED ON) install(TARGETS foxglove_websocket) diff --git a/cpp/foxglove-websocket/conanfile.py b/cpp/foxglove-websocket/conanfile.py index 5e00abd5..747b7a6b 100644 --- a/cpp/foxglove-websocket/conanfile.py +++ b/cpp/foxglove-websocket/conanfile.py @@ -5,7 +5,7 @@ class FoxgloveWebSocketConan(ConanFile): name = "foxglove-websocket" - version = "1.0.0" + version = "1.1.0" url = "https://github.com/foxglove/ws-protocol" homepage = "https://github.com/foxglove/ws-protocol" description = "A C++ server implementation of the Foxglove WebSocket Protocol" @@ -15,6 +15,8 @@ class FoxgloveWebSocketConan(ConanFile): settings = ("os", "compiler", "build_type", "arch") generators = "CMakeDeps" exports_sources = "CMakeLists.txt", "LICENSE", "src/*", "include/*" + options = {"asio": ["standalone", "boost"]} + default_options = {"asio": "standalone"} def validate(self): check_min_cppstd(self, "17") @@ -24,7 +26,8 @@ def requirements(self): self.requires("websocketpp/0.8.2", transitive_headers=True) def configure(self): - self.options["websocketpp"].asio = "standalone" + if self.options.asio == "standalone": + self.options["websocketpp"].asio = "standalone" def layout(self): cmake_layout(self) diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/base64.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/base64.hpp new file mode 100644 index 00000000..6ec9bf87 --- /dev/null +++ b/cpp/foxglove-websocket/include/foxglove/websocket/base64.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include +#include +#include +#include + +namespace foxglove { + +std::string base64Encode(const std::string_view& input); + +std::vector base64Decode(const std::string& input); + +} // namespace foxglove diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/callback_queue.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/callback_queue.hpp new file mode 100644 index 00000000..b0ed0194 --- /dev/null +++ b/cpp/foxglove-websocket/include/foxglove/websocket/callback_queue.hpp @@ -0,0 +1,81 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "websocket_logging.hpp" + +namespace foxglove { + +class CallbackQueue { +public: + CallbackQueue(LogCallback logCallback, size_t numThreads = 1) + : _logCallback(logCallback) + , _quit(false) { + for (size_t i = 0; i < numThreads; ++i) { + _workerThreads.push_back(std::thread(&CallbackQueue::doWork, this)); + } + } + + ~CallbackQueue() { + stop(); + } + + void stop() { + _quit = true; + _cv.notify_all(); + for (auto& thread : _workerThreads) { + thread.join(); + } + } + + void addCallback(std::function cb) { + if (_quit) { + return; + } + std::unique_lock lock(_mutex); + _callbackQueue.push_back(cb); + _cv.notify_one(); + } + +private: + void doWork() { + while (!_quit) { + std::unique_lock lock(_mutex); + _cv.wait(lock, [this] { + return (_quit || !_callbackQueue.empty()); + }); + if (_quit) { + break; + } else if (!_callbackQueue.empty()) { + std::function cb = _callbackQueue.front(); + _callbackQueue.pop_front(); + lock.unlock(); + try { + cb(); + } catch (const std::exception& ex) { + // Should never get here if we catch all exceptions in the callbacks. + const std::string msg = + std::string("Caught unhandled exception in calback_queue") + ex.what(); + _logCallback(WebSocketLogLevel::Error, msg.c_str()); + } catch (...) { + _logCallback(WebSocketLogLevel::Error, "Caught unhandled exception in calback_queue"); + } + } + } + } + + LogCallback _logCallback; + std::atomic _quit; + std::mutex _mutex; + std::condition_variable _cv; + std::deque> _callbackQueue; + std::vector _workerThreads; +}; + +} // namespace foxglove diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/common.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/common.hpp index d3ef9af3..29ae852d 100644 --- a/cpp/foxglove-websocket/include/foxglove/websocket/common.hpp +++ b/cpp/foxglove-websocket/include/foxglove/websocket/common.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -15,10 +16,11 @@ constexpr char CAPABILITY_PARAMETERS[] = "parameters"; constexpr char CAPABILITY_PARAMETERS_SUBSCRIBE[] = "parametersSubscribe"; constexpr char CAPABILITY_SERVICES[] = "services"; constexpr char CAPABILITY_CONNECTION_GRAPH[] = "connectionGraph"; +constexpr char CAPABILITY_ASSETS[] = "assets"; -constexpr std::array DEFAULT_CAPABILITIES = { +constexpr std::array DEFAULT_CAPABILITIES = { CAPABILITY_CLIENT_PUBLISH, CAPABILITY_CONNECTION_GRAPH, CAPABILITY_PARAMETERS_SUBSCRIBE, - CAPABILITY_PARAMETERS, CAPABILITY_SERVICES, + CAPABILITY_PARAMETERS, CAPABILITY_SERVICES, CAPABILITY_ASSETS, }; using ChannelId = uint32_t; @@ -30,6 +32,7 @@ enum class BinaryOpcode : uint8_t { MESSAGE_DATA = 1, TIME_DATA = 2, SERVICE_CALL_RESPONSE = 3, + FETCH_ASSET_RESPONSE = 4, }; enum class ClientBinaryOpcode : uint8_t { @@ -50,10 +53,11 @@ struct ChannelWithoutId { std::string encoding; std::string schemaName; std::string schema; + std::optional schemaEncoding; bool operator==(const ChannelWithoutId& other) const { return topic == other.topic && encoding == other.encoding && schemaName == other.schemaName && - schema == other.schema; + schema == other.schema && schemaEncoding == other.schemaEncoding; } }; @@ -143,4 +147,16 @@ struct ServiceResponse { using ServiceRequest = ServiceResponse; +enum class FetchAssetStatus : uint8_t { + Success = 0, + Error = 1, +}; + +struct FetchAssetResponse { + uint32_t requestId; + FetchAssetStatus status; + std::string errorMessage; + std::vector data; +}; + } // namespace foxglove diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/parameter.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/parameter.hpp index 1e088fa8..7a33e44e 100644 --- a/cpp/foxglove-websocket/include/foxglove/websocket/parameter.hpp +++ b/cpp/foxglove-websocket/include/foxglove/websocket/parameter.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include namespace foxglove { @@ -18,45 +19,59 @@ enum class ParameterType { PARAMETER_INTEGER, PARAMETER_DOUBLE, PARAMETER_STRING, - PARAMETER_BOOL_ARRAY, - PARAMETER_INTEGER_ARRAY, - PARAMETER_DOUBLE_ARRAY, - PARAMETER_STRING_ARRAY, + PARAMETER_ARRAY, + PARAMETER_STRUCT, // ROS 1 only + PARAMETER_BYTE_ARRAY, // ROS 2 only +}; + +class ParameterValue { +public: + ParameterValue(); + ParameterValue(bool value); + ParameterValue(int value); + ParameterValue(int64_t value); + ParameterValue(double value); + ParameterValue(const std::string& value); + ParameterValue(const char* value); + ParameterValue(const std::vector& value); + ParameterValue(const std::vector& value); + ParameterValue(const std::unordered_map& value); + + inline ParameterType getType() const { + return _type; + } + + template + inline const T& getValue() const { + return std::any_cast(_value); + } + +private: + ParameterType _type; + std::any _value; }; class Parameter { public: Parameter(); Parameter(const std::string& name); - Parameter(const std::string& name, bool value); - Parameter(const std::string& name, int value); - Parameter(const std::string& name, int64_t value); - Parameter(const std::string& name, double value); - Parameter(const std::string& name, std::string value); - Parameter(const std::string& name, const char* value); - Parameter(const std::string& name, const std::vector& value); - Parameter(const std::string& name, const std::vector& value); - Parameter(const std::string& name, const std::vector& value); - Parameter(const std::string& name, const std::vector& value); - Parameter(const std::string& name, const std::vector& value); + Parameter(const std::string& name, const ParameterValue& value); inline const std::string& getName() const { return _name; } inline ParameterType getType() const { - return _type; + return _value.getType(); } - template - inline const T& getValue() const { - return std::any_cast(_value); + inline const ParameterValue& getValue() const { + return _value; } private: std::string _name; - ParameterType _type; - std::any _value; + ParameterValue _value; }; } // namespace foxglove diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/serialization.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/serialization.hpp index 788de429..87893239 100644 --- a/cpp/foxglove-websocket/include/foxglove/websocket/serialization.hpp +++ b/cpp/foxglove-websocket/include/foxglove/websocket/serialization.hpp @@ -46,6 +46,8 @@ inline uint32_t ReadUint32LE(const uint8_t* buf) { void to_json(nlohmann::json& j, const Channel& c); void from_json(const nlohmann::json& j, Channel& c); +void to_json(nlohmann::json& j, const ParameterValue& p); +void from_json(const nlohmann::json& j, ParameterValue& p); void to_json(nlohmann::json& j, const Parameter& p); void from_json(const nlohmann::json& j, Parameter& p); void to_json(nlohmann::json& j, const Service& p); diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/server_interface.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/server_interface.hpp index 63c49fdc..b9968cf5 100644 --- a/cpp/foxglove-websocket/include/foxglove/websocket/server_interface.hpp +++ b/cpp/foxglove-websocket/include/foxglove/websocket/server_interface.hpp @@ -17,6 +17,31 @@ constexpr size_t DEFAULT_SEND_BUFFER_LIMIT_BYTES = 10000000UL; // 10 MB using MapOfSets = std::unordered_map>; +template +class ExeptionWithId : public std::runtime_error { +public: + explicit ExeptionWithId(IdType id, const std::string& what_arg) + : std::runtime_error(what_arg) + , _id(id) {} + + IdType id() const { + return _id; + } + +private: + IdType _id; +}; + +class ChannelError : public ExeptionWithId { + using ExeptionWithId::ExeptionWithId; +}; +class ClientChannelError : public ExeptionWithId { + using ExeptionWithId::ExeptionWithId; +}; +class ServiceError : public ExeptionWithId { + using ExeptionWithId::ExeptionWithId; +}; + struct ServerOptions { std::vector capabilities; std::vector supportedEncodings; @@ -48,6 +73,7 @@ struct ServerHandlers { parameterSubscriptionHandler; std::function serviceRequestHandler; std::function subscribeConnectionGraphHandler; + std::function fetchAssetHandler; }; template @@ -78,6 +104,8 @@ class ServerInterface { virtual void updateConnectionGraph(const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics, const MapOfSets& advertisedServices) = 0; + virtual void sendFetchAssetResponse(ConnectionHandle clientHandle, + const FetchAssetResponse& response) = 0; virtual uint16_t getPort() = 0; virtual std::string remoteEndpointString(ConnectionHandle clientHandle) = 0; diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/websocket_client.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/websocket_client.hpp index 1d20796d..e9ea254f 100644 --- a/cpp/foxglove-websocket/include/foxglove/websocket/websocket_client.hpp +++ b/cpp/foxglove-websocket/include/foxglove/websocket/websocket_client.hpp @@ -50,6 +50,7 @@ class ClientInterface { const std::optional& requestId) = 0; virtual void subscribeParameterUpdates(const std::vector& parameterNames) = 0; virtual void unsubscribeParameterUpdates(const std::vector& parameterNames) = 0; + virtual void fetchAsset(const std::string& name, uint32_t requestId) = 0; virtual void setTextMessageHandler(TextMessageHandler handler) = 0; virtual void setBinaryMessageHandler(BinaryMessageHandler handler) = 0; @@ -221,6 +222,11 @@ class Client : public ClientInterface { sendText(jsonPayload.dump()); } + void fetchAsset(const std::string& uri, uint32_t requestId) override { + nlohmann::json jsonPayload{{"op", "fetchAsset"}, {"uri", uri}, {"requestId", requestId}}; + sendText(jsonPayload.dump()); + } + void setTextMessageHandler(TextMessageHandler handler) override { std::unique_lock lock(_mutex); _textMessageHandler = std::move(handler); diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/websocket_logging.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/websocket_logging.hpp index 22e463b8..cc9ee6a8 100644 --- a/cpp/foxglove-websocket/include/foxglove/websocket/websocket_logging.hpp +++ b/cpp/foxglove-websocket/include/foxglove/websocket/websocket_logging.hpp @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -11,7 +11,7 @@ namespace foxglove { using LogCallback = std::function; -inline std::string IPAddressToString(const asio::ip::address& addr) { +inline std::string IPAddressToString(const websocketpp::lib::asio::ip::address& addr) { if (addr.is_v6()) { return "[" + addr.to_string() + "]"; } diff --git a/cpp/foxglove-websocket/include/foxglove/websocket/websocket_server.hpp b/cpp/foxglove-websocket/include/foxglove/websocket/websocket_server.hpp index 74d38512..2c70383e 100644 --- a/cpp/foxglove-websocket/include/foxglove/websocket/websocket_server.hpp +++ b/cpp/foxglove-websocket/include/foxglove/websocket/websocket_server.hpp @@ -18,6 +18,7 @@ #include #include +#include "callback_queue.hpp" #include "common.hpp" #include "parameter.hpp" #include "regex_utils.hpp" @@ -37,6 +38,29 @@ } \ } +namespace { + +constexpr uint32_t StringHash(const std::string_view str) { + uint32_t result = 0x811C9DC5; // FNV-1a 32-bit algorithm + for (char c : str) { + result = (static_cast(c) ^ result) * 0x01000193; + } + return result; +} + +constexpr auto SUBSCRIBE = StringHash("subscribe"); +constexpr auto UNSUBSCRIBE = StringHash("unsubscribe"); +constexpr auto ADVERTISE = StringHash("advertise"); +constexpr auto UNADVERTISE = StringHash("unadvertise"); +constexpr auto GET_PARAMETERS = StringHash("getParameters"); +constexpr auto SET_PARAMETERS = StringHash("setParameters"); +constexpr auto SUBSCRIBE_PARAMETER_UPDATES = StringHash("subscribeParameterUpdates"); +constexpr auto UNSUBSCRIBE_PARAMETER_UPDATES = StringHash("unsubscribeParameterUpdates"); +constexpr auto SUBSCRIBE_CONNECTION_GRAPH = StringHash("subscribeConnectionGraph"); +constexpr auto UNSUBSCRIBE_CONNECTION_GRAPH = StringHash("unsubscribeConnectionGraph"); +constexpr auto FETCH_ASSET = StringHash("fetchAsset"); +} // namespace + namespace foxglove { using json = nlohmann::json; @@ -48,14 +72,6 @@ static const websocketpp::log::level APP = websocketpp::log::alevel::app; static const websocketpp::log::level WARNING = websocketpp::log::elevel::warn; static const websocketpp::log::level RECOVERABLE = websocketpp::log::elevel::rerror; -constexpr uint32_t Integer(const std::string_view str) { - uint32_t result = 0x811C9DC5; // FNV-1a 32-bit algorithm - for (char c : str) { - result = (static_cast(c) ^ result) * 0x01000193; - } - return result; -} - /// Map of required capability by client operation (text). const std::unordered_map CAPABILITY_BY_CLIENT_OPERATION = { // {"subscribe", }, // No required capability. @@ -68,6 +84,7 @@ const std::unordered_map CAPABILITY_BY_CLIENT_OPERATIO {"unsubscribeParameterUpdates", CAPABILITY_PARAMETERS_SUBSCRIBE}, {"subscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH}, {"unsubscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH}, + {"fetchAsset", CAPABILITY_ASSETS}, }; /// Map of required capability by client operation (binary). @@ -82,16 +99,16 @@ enum class StatusLevel : uint8_t { Error = 2, }; -constexpr const char* StatusLevelToString(StatusLevel level) { +constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level) { switch (level) { case StatusLevel::Info: - return "INFO"; + return APP; case StatusLevel::Warning: - return "WARN"; + return WARNING; case StatusLevel::Error: - return "ERROR"; + return RECOVERABLE; default: - return "UNKNOWN"; + return RECOVERABLE; } } @@ -132,6 +149,7 @@ class Server final : public ServerInterface { void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse& response) override; void updateConnectionGraph(const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics, const MapOfSets& advertisedServices) override; + void sendFetchAssetResponse(ConnHandle clientHandle, const FetchAssetResponse& response) override; uint16_t getPort() override; std::string remoteEndpointString(ConnHandle clientHandle) override; @@ -164,6 +182,7 @@ class Server final : public ServerInterface { ServerOptions _options; ServerType _server; std::unique_ptr _serverThread; + std::unique_ptr _handlerCallbackQueue; uint32_t _nextChannelId = 0; std::map> _clients; @@ -195,17 +214,30 @@ class Server final : public ServerInterface { void handleConnectionOpened(ConnHandle hdl); void handleConnectionClosed(ConnHandle hdl); void handleMessage(ConnHandle hdl, MessagePtr msg); - void handleTextMessage(ConnHandle hdl, const std::string& msg); - void handleBinaryMessage(ConnHandle hdl, const uint8_t* msg, size_t length); + void handleTextMessage(ConnHandle hdl, MessagePtr msg); + void handleBinaryMessage(ConnHandle hdl, MessagePtr msg); void sendJson(ConnHandle hdl, json&& payload); void sendJsonRaw(ConnHandle hdl, const std::string& payload); void sendBinary(ConnHandle hdl, const uint8_t* payload, size_t payloadSize); - void sendStatus(ConnHandle clientHandle, const StatusLevel level, const std::string& message); + void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level, + const std::string& message); void unsubscribeParamsWithoutSubscriptions(ConnHandle hdl, const std::unordered_set& paramNames); bool isParameterSubscribed(const std::string& paramName) const; bool hasCapability(const std::string& capability) const; + bool hasHandler(uint32_t op) const; + void handleSubscribe(const nlohmann::json& payload, ConnHandle hdl); + void handleUnsubscribe(const nlohmann::json& payload, ConnHandle hdl); + void handleAdvertise(const nlohmann::json& payload, ConnHandle hdl); + void handleUnadvertise(const nlohmann::json& payload, ConnHandle hdl); + void handleGetParameters(const nlohmann::json& payload, ConnHandle hdl); + void handleSetParameters(const nlohmann::json& payload, ConnHandle hdl); + void handleSubscribeParameterUpdates(const nlohmann::json& payload, ConnHandle hdl); + void handleUnsubscribeParameterUpdates(const nlohmann::json& payload, ConnHandle hdl); + void handleSubscribeConnectionGraph(ConnHandle hdl); + void handleUnsubscribeConnectionGraph(ConnHandle hdl); + void handleFetchAsset(const nlohmann::json& payload, ConnHandle hdl); }; template @@ -218,7 +250,7 @@ inline Server::Server(std::string name, LogCallback logger, _server.get_alog().set_callback(_logger); _server.get_elog().set_callback(_logger); - std::error_code ec; + websocketpp::lib::error_code ec; _server.init_asio(ec); if (ec) { throw std::runtime_error("Failed to initialize websocket server: " + ec.message()); @@ -236,6 +268,9 @@ inline Server::Server(std::string name, LogCallback logger, std::bind(&Server::handleMessage, this, std::placeholders::_1, std::placeholders::_2)); _server.set_reuse_addr(true); _server.set_listen_backlog(128); + + // Callback queue for handling client requests. + _handlerCallbackQueue = std::make_unique(_logger, /*numThreads=*/1ul); } template @@ -243,7 +278,7 @@ inline Server::~Server() {} template inline void Server::socketInit(ConnHandle hdl) { - std::error_code ec; + websocketpp::lib::asio::error_code ec; _server.get_con_from_hdl(hdl)->get_raw_socket().set_option(Tcp::no_delay(true), ec); if (ec) { _server.get_elog().write(RECOVERABLE, "Failed to set TCP_NODELAY: " + ec.message()); @@ -343,7 +378,14 @@ inline void Server::handleConnectionClosed(ConnHandle hdl) _server.get_alog().write(APP, "Client " + clientName + " unadvertising channel " + std::to_string(clientChannelId) + " due to disconnect"); if (_handlers.clientUnadvertiseHandler) { - _handlers.clientUnadvertiseHandler(clientChannelId, hdl); + try { + _handlers.clientUnadvertiseHandler(clientChannelId, hdl); + } catch (const std::exception& ex) { + _server.get_elog().write( + RECOVERABLE, "Exception caught when closing connection: " + std::string(ex.what())); + } catch (...) { + _server.get_elog().write(RECOVERABLE, "Exception caught when closing connection"); + } } } @@ -356,7 +398,14 @@ inline void Server::handleConnectionClosed(ConnHandle hdl) if (_handlers.unsubscribeHandler) { for (const auto& [chanId, subs] : oldSubscriptionsByChannel) { (void)subs; - _handlers.unsubscribeHandler(chanId, hdl); + try { + _handlers.unsubscribeHandler(chanId, hdl); + } catch (const std::exception& ex) { + _server.get_elog().write( + RECOVERABLE, "Exception caught when closing connection: " + std::string(ex.what())); + } catch (...) { + _server.get_elog().write(RECOVERABLE, "Exception caught when closing connection"); + } } } @@ -374,7 +423,14 @@ inline void Server::handleConnectionClosed(ConnHandle hdl) _connectionGraph.subscriptionCount--; if (_connectionGraph.subscriptionCount == 0 && _handlers.subscribeConnectionGraphHandler) { _server.get_alog().write(APP, "Unsubscribing from connection graph updates."); - _handlers.subscribeConnectionGraphHandler(false); + try { + _handlers.subscribeConnectionGraphHandler(false); + } catch (const std::exception& ex) { + _server.get_elog().write( + RECOVERABLE, "Exception caught when closing connection: " + std::string(ex.what())); + } catch (...) { + _server.get_elog().write(RECOVERABLE, "Exception caught when closing connection"); + } } } @@ -392,7 +448,7 @@ inline void Server::stop() { } _server.get_alog().write(APP, "Stopping WebSocket server"); - std::error_code ec; + websocketpp::lib::error_code ec; _server.stop_perpetual(); @@ -469,7 +525,7 @@ inline void Server::start(const std::string& host, uint16_t throw std::runtime_error("Server already started"); } - std::error_code ec; + websocketpp::lib::error_code ec; _server.listen(host, std::to_string(port), ec); if (ec) { @@ -492,8 +548,9 @@ inline void Server::start(const std::string& host, uint16_t throw std::runtime_error("WebSocket server failed to listen on port " + std::to_string(port)); } - auto endpoint = _server.get_local_endpoint(ec); - if (ec) { + websocketpp::lib::asio::error_code asioEc; + auto endpoint = _server.get_local_endpoint(asioEc); + if (asioEc) { throw std::runtime_error("Failed to resolve the local endpoint: " + ec.message()); } @@ -533,13 +590,15 @@ inline void Server::sendBinary(ConnHandle hdl, const uint8_ } template -inline void Server::sendStatus(ConnHandle clientHandle, - const StatusLevel level, - const std::string& message) { +inline void Server::sendStatusAndLogMsg(ConnHandle clientHandle, + const StatusLevel level, + const std::string& message) { const std::string endpoint = remoteEndpointString(clientHandle); - const std::string logMessage = - "sendStatus(" + endpoint + ", " + StatusLevelToString(level) + ", " + message + ")"; - _server.get_elog().write(RECOVERABLE, logMessage); + const std::string logMessage = endpoint + ": " + message; + const auto logLevel = StatusLevelToLogLevel(level); + auto logger = level == StatusLevel::Info ? _server.get_alog() : _server.get_elog(); + logger.write(logLevel, logMessage); + sendJson(clientHandle, json{ {"op", "status"}, {"level", static_cast(level)}, @@ -550,324 +609,144 @@ inline void Server::sendStatus(ConnHandle clientHandle, template inline void Server::handleMessage(ConnHandle hdl, MessagePtr msg) { const OpCode op = msg->get_opcode(); - - try { - switch (op) { - case OpCode::TEXT: { - handleTextMessage(hdl, msg->get_payload()); - } break; - case OpCode::BINARY: { - const auto& payload = msg->get_payload(); - handleBinaryMessage(hdl, reinterpret_cast(payload.data()), payload.size()); - } break; - default: - break; + _handlerCallbackQueue->addCallback([this, hdl, msg, op]() { + try { + if (op == OpCode::TEXT) { + handleTextMessage(hdl, msg); + } else if (op == OpCode::BINARY) { + handleBinaryMessage(hdl, msg); + } + } catch (const std::exception& e) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what()); + } catch (...) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Exception occurred when executing message handler"); } - } catch (std::exception const& ex) { - sendStatus(hdl, StatusLevel::Error, std::string{"Error parsing message: "} + ex.what()); - } + }); } template -inline void Server::handleTextMessage(ConnHandle hdl, const std::string& msg) { - const json payload = json::parse(msg); +inline void Server::handleTextMessage(ConnHandle hdl, MessagePtr msg) { + const json payload = json::parse(msg->get_payload()); const std::string& op = payload.at("op").get(); const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_OPERATION.find(op); if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_OPERATION.end() && !hasCapability(requiredCapabilityIt->second)) { - sendStatus(hdl, StatusLevel::Error, - "Operation '" + op + "' not supported as server capability '" + - requiredCapabilityIt->second + "' is missing"); + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Operation '" + op + "' not supported as server capability '" + + requiredCapabilityIt->second + "' is missing"); return; } - std::shared_lock clientsLock(_clientsMutex); - auto& clientInfo = _clients.at(hdl); - - const auto findSubscriptionBySubId = [&clientInfo](SubscriptionId subId) { - return std::find_if(clientInfo.subscriptionsByChannel.begin(), - clientInfo.subscriptionsByChannel.end(), [&subId](const auto& mo) { - return mo.second == subId; - }); - }; - - constexpr auto SUBSCRIBE = Integer("subscribe"); - constexpr auto UNSUBSCRIBE = Integer("unsubscribe"); - constexpr auto ADVERTISE = Integer("advertise"); - constexpr auto UNADVERTISE = Integer("unadvertise"); - constexpr auto GET_PARAMETERS = Integer("getParameters"); - constexpr auto SET_PARAMETERS = Integer("setParameters"); - constexpr auto SUBSCRIBE_PARAMETER_UPDATES = Integer("subscribeParameterUpdates"); - constexpr auto UNSUBSCRIBE_PARAMETER_UPDATES = Integer("unsubscribeParameterUpdates"); - constexpr auto SUBSCRIBE_CONNECTION_GRAPH = Integer("subscribeConnectionGraph"); - constexpr auto UNSUBSCRIBE_CONNECTION_GRAPH = Integer("unsubscribeConnectionGraph"); - - switch (Integer(op)) { - case SUBSCRIBE: { - for (const auto& sub : payload.at("subscriptions")) { - SubscriptionId subId = sub.at("id"); - ChannelId channelId = sub.at("channelId"); - if (findSubscriptionBySubId(subId) != clientInfo.subscriptionsByChannel.end()) { - sendStatus(hdl, StatusLevel::Error, - "Client subscription id " + std::to_string(subId) + - " was already used; ignoring subscription"); - continue; - } - const auto& channelIt = _channels.find(channelId); - if (channelIt == _channels.end()) { - sendStatus( - hdl, StatusLevel::Warning, - "Channel " + std::to_string(channelId) + " is not available; ignoring subscription"); - continue; - } - clientInfo.subscriptionsByChannel.emplace(channelId, subId); - if (_handlers.subscribeHandler) { - _handlers.subscribeHandler(channelId, hdl); - } - } - } break; - case UNSUBSCRIBE: { - for (const auto& subIdJson : payload.at("subscriptionIds")) { - SubscriptionId subId = subIdJson; - const auto& sub = findSubscriptionBySubId(subId); - if (sub == clientInfo.subscriptionsByChannel.end()) { - sendStatus(hdl, StatusLevel::Warning, - "Client subscription id " + std::to_string(subId) + - " did not exist; ignoring unsubscription"); - continue; - } - ChannelId chanId = sub->first; - clientInfo.subscriptionsByChannel.erase(sub); - if (_handlers.unsubscribeHandler) { - _handlers.unsubscribeHandler(chanId, hdl); - } - } - } break; - case ADVERTISE: { - std::unique_lock clientChannelsLock(_clientChannelsMutex); - auto [clientPublicationsIt, isFirstPublication] = - _clientChannels.emplace(hdl, std::unordered_map()); - - auto& clientPublications = clientPublicationsIt->second; - - for (const auto& chan : payload.at("channels")) { - ClientChannelId channelId = chan.at("id"); - if (!isFirstPublication && clientPublications.find(channelId) != clientPublications.end()) { - sendStatus(hdl, StatusLevel::Error, - "Channel " + std::to_string(channelId) + " was already advertised"); - continue; - } + if (!hasHandler(StringHash(op))) { + sendStatusAndLogMsg( + hdl, StatusLevel::Error, + "Operation '" + op + "' not supported as server handler function is missing"); + return; + } - const auto topic = chan.at("topic").get(); - if (!isWhitelisted(topic, _options.clientTopicWhitelistPatterns)) { - sendStatus(hdl, StatusLevel::Error, - "Can't advertise channel " + std::to_string(channelId) + ", topic '" + topic + - "' not whitelisted"); - continue; - } - ClientAdvertisement advertisement{}; - advertisement.channelId = channelId; - advertisement.topic = topic; - advertisement.encoding = chan.at("encoding").get(); - advertisement.schemaName = chan.at("schemaName").get(); - clientPublications.emplace(channelId, advertisement); - clientInfo.advertisedChannels.emplace(channelId); - if (_handlers.clientAdvertiseHandler) { - _handlers.clientAdvertiseHandler(advertisement, hdl); - } - } - } break; - case UNADVERTISE: { - std::unique_lock clientChannelsLock(_clientChannelsMutex); - auto clientPublicationsIt = _clientChannels.find(hdl); - if (clientPublicationsIt == _clientChannels.end()) { - sendStatus(hdl, StatusLevel::Error, "Client has no advertised channels"); + try { + switch (StringHash(op)) { + case SUBSCRIBE: + handleSubscribe(payload, hdl); break; - } - - auto& clientPublications = clientPublicationsIt->second; - - for (const auto& chanIdJson : payload.at("channelIds")) { - ClientChannelId channelId = chanIdJson.get(); - const auto& channelIt = clientPublications.find(channelId); - if (channelIt == clientPublications.end()) { - continue; - } - clientPublications.erase(channelIt); - if (const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId) != - clientInfo.advertisedChannels.end()) { - clientInfo.advertisedChannels.erase(advertisedChannelIt); - } - - if (_handlers.clientUnadvertiseHandler) { - _handlers.clientUnadvertiseHandler(channelId, hdl); - } - } - } break; - case GET_PARAMETERS: { - if (!_handlers.parameterRequestHandler) { - return; - } - - const auto paramNames = payload.at("parameterNames").get>(); - const auto requestId = payload.find("id") == payload.end() - ? std::nullopt - : std::optional(payload["id"].get()); - _handlers.parameterRequestHandler(paramNames, requestId, hdl); - } break; - case SET_PARAMETERS: { - if (!_handlers.parameterChangeHandler) { - return; - } - - const auto parameters = payload.at("parameters").get>(); - const auto requestId = payload.find("id") == payload.end() - ? std::nullopt - : std::optional(payload["id"].get()); - _handlers.parameterChangeHandler(parameters, requestId, hdl); - } break; - case SUBSCRIBE_PARAMETER_UPDATES: { - if (!_handlers.parameterSubscriptionHandler) { - return; - } - - const auto paramNames = payload.at("parameterNames").get>(); - std::vector paramsToSubscribe; - { - // Only consider parameters that are not subscribed yet (by this or by other clients) - std::lock_guard lock(_clientParamSubscriptionsMutex); - std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToSubscribe), - [this](const std::string& paramName) { - return !isParameterSubscribed(paramName); - }); - - // Update the client's parameter subscriptions. - auto& clientSubscribedParams = _clientParamSubscriptions[hdl]; - clientSubscribedParams.insert(paramNames.begin(), paramNames.end()); - } - - if (!paramsToSubscribe.empty()) { - _handlers.parameterSubscriptionHandler(paramsToSubscribe, - ParameterSubscriptionOperation::SUBSCRIBE, hdl); - } - } break; - case UNSUBSCRIBE_PARAMETER_UPDATES: { - if (!_handlers.parameterSubscriptionHandler) { - return; - } - - const auto paramNames = payload.at("parameterNames").get>(); - { - std::lock_guard lock(_clientParamSubscriptionsMutex); - auto& clientSubscribedParams = _clientParamSubscriptions[hdl]; - for (const auto& paramName : paramNames) { - clientSubscribedParams.erase(paramName); - } - } - - unsubscribeParamsWithoutSubscriptions(hdl, paramNames); - } break; - case SUBSCRIBE_CONNECTION_GRAPH: { - std::unique_lock lock(_connectionGraphMutex); - _connectionGraph.subscriptionCount++; - - if (_connectionGraph.subscriptionCount == 1 && _handlers.subscribeConnectionGraphHandler) { - // First subscriber, let the handler know that we are interested in updates. - _server.get_alog().write(APP, "Subscribing to connection graph updates."); - _handlers.subscribeConnectionGraphHandler(true); - clientInfo.subscribedToConnectionGraph = true; - } - - json::array_t publishedTopicsJson, subscribedTopicsJson, advertisedServicesJson; - for (const auto& [name, ids] : _connectionGraph.publishedTopics) { - publishedTopicsJson.push_back(nlohmann::json{{"name", name}, {"publisherIds", ids}}); - } - for (const auto& [name, ids] : _connectionGraph.subscribedTopics) { - subscribedTopicsJson.push_back(nlohmann::json{{"name", name}, {"subscriberIds", ids}}); - } - for (const auto& [name, ids] : _connectionGraph.advertisedServices) { - advertisedServicesJson.push_back(nlohmann::json{{"name", name}, {"providerIds", ids}}); - } - - const json jsonMsg = { - {"op", "connectionGraphUpdate"}, - {"publishedTopics", publishedTopicsJson}, - {"subscribedTopics", subscribedTopicsJson}, - {"advertisedServices", advertisedServicesJson}, - {"removedTopics", json::array()}, - {"removedServices", json::array()}, - }; - - sendJsonRaw(hdl, jsonMsg.dump()); - } break; - case UNSUBSCRIBE_CONNECTION_GRAPH: { - if (clientInfo.subscribedToConnectionGraph) { - clientInfo.subscribedToConnectionGraph = false; - std::unique_lock lock(_connectionGraphMutex); - _connectionGraph.subscriptionCount--; - if (_connectionGraph.subscriptionCount == 0 && _handlers.subscribeConnectionGraphHandler) { - _server.get_alog().write(APP, "Unsubscribing from connection graph updates."); - _handlers.subscribeConnectionGraphHandler(false); - } - } else { - sendStatus(hdl, StatusLevel::Error, - "Client was not subscribed to connection graph updates"); - } - } break; - default: { - sendStatus(hdl, StatusLevel::Error, "Unrecognized client opcode \"" + op + "\""); - } break; + case UNSUBSCRIBE: + handleUnsubscribe(payload, hdl); + break; + case ADVERTISE: + handleAdvertise(payload, hdl); + break; + case UNADVERTISE: + handleUnadvertise(payload, hdl); + break; + case GET_PARAMETERS: + handleGetParameters(payload, hdl); + break; + case SET_PARAMETERS: + handleSetParameters(payload, hdl); + break; + case SUBSCRIBE_PARAMETER_UPDATES: + handleSubscribeParameterUpdates(payload, hdl); + break; + case UNSUBSCRIBE_PARAMETER_UPDATES: + handleUnsubscribeParameterUpdates(payload, hdl); + break; + case SUBSCRIBE_CONNECTION_GRAPH: + handleSubscribeConnectionGraph(hdl); + break; + case UNSUBSCRIBE_CONNECTION_GRAPH: + handleUnsubscribeConnectionGraph(hdl); + break; + case FETCH_ASSET: + handleFetchAsset(payload, hdl); + break; + default: + sendStatusAndLogMsg(hdl, StatusLevel::Error, "Unrecognized client opcode \"" + op + "\""); + break; + } + } catch (const ChannelError& e) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what()); + } catch (...) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler"); } -} +} // namespace foxglove template -inline void Server::handleBinaryMessage(ConnHandle hdl, const uint8_t* msg, - size_t length) { +inline void Server::handleBinaryMessage(ConnHandle hdl, MessagePtr msg) { + const auto& payload = msg->get_payload(); + const uint8_t* data = reinterpret_cast(payload.data()); + const size_t length = payload.size(); + if (length < 1) { - sendStatus(hdl, StatusLevel::Error, "Received an empty binary message"); + sendStatusAndLogMsg(hdl, StatusLevel::Error, "Received an empty binary message"); return; } - const auto op = static_cast(msg[0]); + const auto op = static_cast(data[0]); const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_BINARY_OPERATION.find(op); if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_BINARY_OPERATION.end() && !hasCapability(requiredCapabilityIt->second)) { - sendStatus(hdl, StatusLevel::Error, - "Binary operation '" + std::to_string(static_cast(op)) + - "' not supported as server capability '" + requiredCapabilityIt->second + - "' is missing"); + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Binary operation '" + std::to_string(static_cast(op)) + + "' not supported as server capability '" + requiredCapabilityIt->second + + "' is missing"); return; } switch (op) { case ClientBinaryOpcode::MESSAGE_DATA: { + if (!_handlers.clientMessageHandler) { + return; + } + if (length < 5) { - sendStatus(hdl, StatusLevel::Error, "Invalid message length " + std::to_string(length)); + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Invalid message length " + std::to_string(length)); return; } const auto timestamp = std::chrono::duration_cast( std::chrono::high_resolution_clock::now().time_since_epoch()) .count(); - const ClientChannelId channelId = *reinterpret_cast(msg + 1); + const ClientChannelId channelId = *reinterpret_cast(data + 1); std::shared_lock lock(_clientChannelsMutex); auto clientPublicationsIt = _clientChannels.find(hdl); if (clientPublicationsIt == _clientChannels.end()) { - sendStatus(hdl, StatusLevel::Error, "Client has no advertised channels"); + sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels"); return; } auto& clientPublications = clientPublicationsIt->second; const auto& channelIt = clientPublications.find(channelId); if (channelIt == clientPublications.end()) { - sendStatus(hdl, StatusLevel::Error, - "Channel " + std::to_string(channelId) + " is not advertised"); + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Channel " + std::to_string(channelId) + " is not advertised"); return; } - if (_handlers.clientMessageHandler) { + try { const auto& advertisement = channelIt->second; const uint32_t sequence = 0; const ClientMessage clientMessage{static_cast(timestamp), @@ -875,25 +754,30 @@ inline void Server::handleBinaryMessage(ConnHandle hdl, con sequence, advertisement, length, - msg}; + data}; _handlers.clientMessageHandler(clientMessage, hdl); + } catch (const ServiceError& e) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what()); + } catch (...) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, "callService: Failed to execute handler"); } } break; case ClientBinaryOpcode::SERVICE_CALL_REQUEST: { ServiceRequest request; if (length < request.size()) { - sendStatus(hdl, StatusLevel::Error, - "Invalid service call request length " + std::to_string(length)); + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Invalid service call request length " + std::to_string(length)); return; } - request.read(msg + 1, length - 1); + request.read(data + 1, length - 1); { std::shared_lock lock(_servicesMutex); if (_services.find(request.serviceId) == _services.end()) { - sendStatus(hdl, StatusLevel::Error, - "Service " + std::to_string(request.serviceId) + " is not advertised"); + sendStatusAndLogMsg( + hdl, StatusLevel::Error, + "Service " + std::to_string(request.serviceId) + " is not advertised"); return; } } @@ -903,8 +787,8 @@ inline void Server::handleBinaryMessage(ConnHandle hdl, con } } break; default: { - sendStatus(hdl, StatusLevel::Error, - "Unrecognized client opcode " + std::to_string(uint8_t(op))); + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Unrecognized client opcode " + std::to_string(uint8_t(op))); } break; } } @@ -1070,20 +954,18 @@ template inline void Server::sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp, const uint8_t* payload, size_t payloadSize) { - std::error_code ec; + websocketpp::lib::error_code ec; const auto con = _server.get_con_from_hdl(clientHandle, ec); if (ec || !con) { return; } const auto bufferSizeinBytes = con->get_buffered_amount(); - if (bufferSizeinBytes >= _options.sendBufferLimitBytes) { - FOXGLOVE_DEBOUNCE( - [this]() { - _server.get_elog().write( - WARNING, "Connection send buffer limit reached, messages will be dropped..."); - }, - 2500); + if (bufferSizeinBytes + payloadSize >= _options.sendBufferLimitBytes) { + const auto logFn = [this, clientHandle]() { + sendStatusAndLogMsg(clientHandle, StatusLevel::Warning, "Send buffer limit reached"); + }; + FOXGLOVE_DEBOUNCE(logFn, 2500); return; } @@ -1142,7 +1024,7 @@ inline void Server::sendServiceResponse(ConnHandle clientHa template inline uint16_t Server::getPort() { - std::error_code ec; + websocketpp::lib::asio::error_code ec; auto endpoint = _server.get_local_endpoint(ec); if (ec) { throw std::runtime_error("Server not listening on any port. Has it been started before?"); @@ -1231,7 +1113,7 @@ inline void Server::updateConnectionGraph( template inline std::string Server::remoteEndpointString(ConnHandle clientHandle) { - std::error_code ec; + websocketpp::lib::error_code ec; const auto con = _server.get_con_from_hdl(clientHandle, ec); return con ? con->get_remote_endpoint() : "(unknown)"; } @@ -1261,8 +1143,16 @@ inline void Server::unsubscribeParamsWithoutSubscriptions( for (const auto& param : paramsToUnsubscribe) { _server.get_alog().write(APP, "Unsubscribing from parameter '" + param + "'."); } - _handlers.parameterSubscriptionHandler(paramsToUnsubscribe, - ParameterSubscriptionOperation::UNSUBSCRIBE, hdl); + + try { + _handlers.parameterSubscriptionHandler(paramsToUnsubscribe, + ParameterSubscriptionOperation::UNSUBSCRIBE, hdl); + } catch (const std::exception& e) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what()); + } catch (...) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Failed to unsubscribe from one more more parameters"); + } } } @@ -1272,4 +1162,344 @@ inline bool Server::hasCapability(const std::string& capabi _options.capabilities.end(); } +template +inline bool Server::hasHandler(uint32_t op) const { + switch (op) { + case SUBSCRIBE: + return bool(_handlers.subscribeHandler); + case UNSUBSCRIBE: + return bool(_handlers.unsubscribeHandler); + case ADVERTISE: + return bool(_handlers.clientAdvertiseHandler); + case UNADVERTISE: + return bool(_handlers.clientUnadvertiseHandler); + case GET_PARAMETERS: + return bool(_handlers.parameterRequestHandler); + case SET_PARAMETERS: + return bool(_handlers.parameterChangeHandler); + case SUBSCRIBE_PARAMETER_UPDATES: + case UNSUBSCRIBE_PARAMETER_UPDATES: + return bool(_handlers.parameterSubscriptionHandler); + case SUBSCRIBE_CONNECTION_GRAPH: + case UNSUBSCRIBE_CONNECTION_GRAPH: + return bool(_handlers.subscribeConnectionGraphHandler); + case FETCH_ASSET: + return bool(_handlers.fetchAssetHandler); + default: + throw std::runtime_error("Unknown operation: " + std::to_string(op)); + } +} + +template +void Server::handleSubscribe(const nlohmann::json& payload, ConnHandle hdl) { + std::unordered_map clientSubscriptionsByChannel; + { + std::shared_lock clientsLock(_clientsMutex); + clientSubscriptionsByChannel = _clients.at(hdl).subscriptionsByChannel; + } + + const auto findSubscriptionBySubId = + [](const std::unordered_map& subscriptionsByChannel, + SubscriptionId subId) { + return std::find_if(subscriptionsByChannel.begin(), subscriptionsByChannel.end(), + [&subId](const auto& mo) { + return mo.second == subId; + }); + }; + + for (const auto& sub : payload.at("subscriptions")) { + SubscriptionId subId = sub.at("id"); + ChannelId channelId = sub.at("channelId"); + if (findSubscriptionBySubId(clientSubscriptionsByChannel, subId) != + clientSubscriptionsByChannel.end()) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Client subscription id " + std::to_string(subId) + + " was already used; ignoring subscription"); + continue; + } + const auto& channelIt = _channels.find(channelId); + if (channelIt == _channels.end()) { + sendStatusAndLogMsg( + hdl, StatusLevel::Warning, + "Channel " + std::to_string(channelId) + " is not available; ignoring subscription"); + continue; + } + + _handlers.subscribeHandler(channelId, hdl); + std::unique_lock clientsLock(_clientsMutex); + _clients.at(hdl).subscriptionsByChannel.emplace(channelId, subId); + } +} + +template +void Server::handleUnsubscribe(const nlohmann::json& payload, ConnHandle hdl) { + std::unordered_map clientSubscriptionsByChannel; + { + std::shared_lock clientsLock(_clientsMutex); + clientSubscriptionsByChannel = _clients.at(hdl).subscriptionsByChannel; + } + + const auto findSubscriptionBySubId = + [](const std::unordered_map& subscriptionsByChannel, + SubscriptionId subId) { + return std::find_if(subscriptionsByChannel.begin(), subscriptionsByChannel.end(), + [&subId](const auto& mo) { + return mo.second == subId; + }); + }; + + for (const auto& subIdJson : payload.at("subscriptionIds")) { + SubscriptionId subId = subIdJson; + const auto& sub = findSubscriptionBySubId(clientSubscriptionsByChannel, subId); + if (sub == clientSubscriptionsByChannel.end()) { + sendStatusAndLogMsg(hdl, StatusLevel::Warning, + "Client subscription id " + std::to_string(subId) + + " did not exist; ignoring unsubscription"); + continue; + } + + ChannelId chanId = sub->first; + _handlers.unsubscribeHandler(chanId, hdl); + std::unique_lock clientsLock(_clientsMutex); + _clients.at(hdl).subscriptionsByChannel.erase(sub); + } +} + +template +void Server::handleAdvertise(const nlohmann::json& payload, ConnHandle hdl) { + std::unique_lock clientChannelsLock(_clientChannelsMutex); + auto [clientPublicationsIt, isFirstPublication] = + _clientChannels.emplace(hdl, std::unordered_map()); + + auto& clientPublications = clientPublicationsIt->second; + + for (const auto& chan : payload.at("channels")) { + ClientChannelId channelId = chan.at("id"); + if (!isFirstPublication && clientPublications.find(channelId) != clientPublications.end()) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Channel " + std::to_string(channelId) + " was already advertised"); + continue; + } + + const auto topic = chan.at("topic").get(); + if (!isWhitelisted(topic, _options.clientTopicWhitelistPatterns)) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Can't advertise channel " + std::to_string(channelId) + ", topic '" + + topic + "' not whitelisted"); + continue; + } + ClientAdvertisement advertisement{}; + advertisement.channelId = channelId; + advertisement.topic = topic; + advertisement.encoding = chan.at("encoding").get(); + advertisement.schemaName = chan.at("schemaName").get(); + + _handlers.clientAdvertiseHandler(advertisement, hdl); + std::unique_lock clientsLock(_clientsMutex); + _clients.at(hdl).advertisedChannels.emplace(channelId); + clientPublications.emplace(channelId, advertisement); + } +} + +template +void Server::handleUnadvertise(const nlohmann::json& payload, ConnHandle hdl) { + std::unique_lock clientChannelsLock(_clientChannelsMutex); + auto clientPublicationsIt = _clientChannels.find(hdl); + if (clientPublicationsIt == _clientChannels.end()) { + sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels"); + return; + } + + auto& clientPublications = clientPublicationsIt->second; + + for (const auto& chanIdJson : payload.at("channelIds")) { + ClientChannelId channelId = chanIdJson.get(); + const auto& channelIt = clientPublications.find(channelId); + if (channelIt == clientPublications.end()) { + continue; + } + + _handlers.clientUnadvertiseHandler(channelId, hdl); + std::unique_lock clientsLock(_clientsMutex); + auto& clientInfo = _clients.at(hdl); + clientPublications.erase(channelIt); + const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId); + if (advertisedChannelIt != clientInfo.advertisedChannels.end()) { + clientInfo.advertisedChannels.erase(advertisedChannelIt); + } + } +} + +template +void Server::handleGetParameters(const nlohmann::json& payload, + ConnHandle hdl) { + const auto paramNames = payload.at("parameterNames").get>(); + const auto requestId = payload.find("id") == payload.end() + ? std::nullopt + : std::optional(payload["id"].get()); + _handlers.parameterRequestHandler(paramNames, requestId, hdl); +} + +template +void Server::handleSetParameters(const nlohmann::json& payload, + ConnHandle hdl) { + const auto parameters = payload.at("parameters").get>(); + const auto requestId = payload.find("id") == payload.end() + ? std::nullopt + : std::optional(payload["id"].get()); + _handlers.parameterChangeHandler(parameters, requestId, hdl); +} + +template +void Server::handleSubscribeParameterUpdates(const nlohmann::json& payload, + ConnHandle hdl) { + const auto paramNames = payload.at("parameterNames").get>(); + std::vector paramsToSubscribe; + { + // Only consider parameters that are not subscribed yet (by this or by other clients) + std::lock_guard lock(_clientParamSubscriptionsMutex); + std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToSubscribe), + [this](const std::string& paramName) { + return !isParameterSubscribed(paramName); + }); + + // Update the client's parameter subscriptions. + auto& clientSubscribedParams = _clientParamSubscriptions[hdl]; + clientSubscribedParams.insert(paramNames.begin(), paramNames.end()); + } + + if (!paramsToSubscribe.empty()) { + _handlers.parameterSubscriptionHandler(paramsToSubscribe, + ParameterSubscriptionOperation::SUBSCRIBE, hdl); + } +} + +template +void Server::handleUnsubscribeParameterUpdates(const nlohmann::json& payload, + ConnHandle hdl) { + const auto paramNames = payload.at("parameterNames").get>(); + { + std::lock_guard lock(_clientParamSubscriptionsMutex); + auto& clientSubscribedParams = _clientParamSubscriptions[hdl]; + for (const auto& paramName : paramNames) { + clientSubscribedParams.erase(paramName); + } + } + + unsubscribeParamsWithoutSubscriptions(hdl, paramNames); +} + +template +void Server::handleSubscribeConnectionGraph(ConnHandle hdl) { + bool subscribeToConnnectionGraph = false; + { + std::unique_lock lock(_connectionGraphMutex); + _connectionGraph.subscriptionCount++; + subscribeToConnnectionGraph = _connectionGraph.subscriptionCount == 1; + } + + if (subscribeToConnnectionGraph) { + // First subscriber, let the handler know that we are interested in updates. + _server.get_alog().write(APP, "Subscribing to connection graph updates."); + _handlers.subscribeConnectionGraphHandler(true); + std::unique_lock clientsLock(_clientsMutex); + _clients.at(hdl).subscribedToConnectionGraph = true; + } + + json::array_t publishedTopicsJson, subscribedTopicsJson, advertisedServicesJson; + { + std::shared_lock lock(_connectionGraphMutex); + for (const auto& [name, ids] : _connectionGraph.publishedTopics) { + publishedTopicsJson.push_back(nlohmann::json{{"name", name}, {"publisherIds", ids}}); + } + for (const auto& [name, ids] : _connectionGraph.subscribedTopics) { + subscribedTopicsJson.push_back(nlohmann::json{{"name", name}, {"subscriberIds", ids}}); + } + for (const auto& [name, ids] : _connectionGraph.advertisedServices) { + advertisedServicesJson.push_back(nlohmann::json{{"name", name}, {"providerIds", ids}}); + } + } + + const json jsonMsg = { + {"op", "connectionGraphUpdate"}, + {"publishedTopics", publishedTopicsJson}, + {"subscribedTopics", subscribedTopicsJson}, + {"advertisedServices", advertisedServicesJson}, + {"removedTopics", json::array()}, + {"removedServices", json::array()}, + }; + + sendJsonRaw(hdl, jsonMsg.dump()); +} + +template +void Server::handleUnsubscribeConnectionGraph(ConnHandle hdl) { + bool clientWasSubscribed = false; + { + std::unique_lock clientsLock(_clientsMutex); + auto& clientInfo = _clients.at(hdl); + if (clientInfo.subscribedToConnectionGraph) { + clientWasSubscribed = true; + clientInfo.subscribedToConnectionGraph = false; + } + } + + if (clientWasSubscribed) { + bool unsubscribeFromConnnectionGraph = false; + { + std::unique_lock lock(_connectionGraphMutex); + _connectionGraph.subscriptionCount--; + unsubscribeFromConnnectionGraph = _connectionGraph.subscriptionCount == 0; + } + if (unsubscribeFromConnnectionGraph) { + _server.get_alog().write(APP, "Unsubscribing from connection graph updates."); + _handlers.subscribeConnectionGraphHandler(false); + } + } else { + sendStatusAndLogMsg(hdl, StatusLevel::Error, + "Client was not subscribed to connection graph updates"); + } +} + +template +void Server::handleFetchAsset(const nlohmann::json& payload, ConnHandle hdl) { + const auto uri = payload.at("uri").get(); + const auto requestId = payload.at("requestId").get(); + _handlers.fetchAssetHandler(uri, requestId, hdl); +} + +template +inline void Server::sendFetchAssetResponse( + ConnHandle clientHandle, const FetchAssetResponse& response) { + websocketpp::lib::error_code ec; + const auto con = _server.get_con_from_hdl(clientHandle, ec); + if (ec || !con) { + return; + } + + const size_t errMsgSize = + response.status == FetchAssetStatus::Error ? response.errorMessage.size() : 0ul; + const size_t dataSize = response.status == FetchAssetStatus::Success ? response.data.size() : 0ul; + const size_t messageSize = 1 + 4 + 1 + 4 + errMsgSize + dataSize; + + auto message = con->get_message(OpCode::BINARY, messageSize); + + const auto op = BinaryOpcode::FETCH_ASSET_RESPONSE; + message->append_payload(&op, 1); + + std::array uint32Data; + foxglove::WriteUint32LE(uint32Data.data(), response.requestId); + message->append_payload(uint32Data.data(), uint32Data.size()); + + const uint8_t status = static_cast(response.status); + message->append_payload(&status, 1); + + foxglove::WriteUint32LE(uint32Data.data(), response.errorMessage.size()); + message->append_payload(uint32Data.data(), uint32Data.size()); + message->append_payload(response.errorMessage.data(), errMsgSize); + + message->append_payload(response.data.data(), dataSize); + con->send(message); +} + } // namespace foxglove diff --git a/cpp/foxglove-websocket/src/base64.cpp b/cpp/foxglove-websocket/src/base64.cpp new file mode 100644 index 00000000..0b7fc7f7 --- /dev/null +++ b/cpp/foxglove-websocket/src/base64.cpp @@ -0,0 +1,106 @@ +#include + +#include + +namespace foxglove { + +// Adapted from: +// https://gist.github.com/tomykaira/f0fd86b6c73063283afe550bc5d77594 +// https://github.com/protocolbuffers/protobuf/blob/01fe22219a0/src/google/protobuf/compiler/csharp/csharp_helpers.cc#L346 +std::string base64Encode(const std::string_view& input) { + constexpr const char ALPHABET[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + std::string result; + // Every 3 bytes of data yields 4 bytes of output + result.reserve((input.size() + (3 - 1 /* round up */)) / 3 * 4); + + // Unsigned values are required for bit-shifts below to work properly + const unsigned char* data = reinterpret_cast(input.data()); + + size_t i = 0; + for (; i + 2 < input.size(); i += 3) { + result.push_back(ALPHABET[data[i] >> 2]); + result.push_back(ALPHABET[((data[i] & 0b11) << 4) | (data[i + 1] >> 4)]); + result.push_back(ALPHABET[((data[i + 1] & 0b1111) << 2) | (data[i + 2] >> 6)]); + result.push_back(ALPHABET[data[i + 2] & 0b111111]); + } + switch (input.size() - i) { + case 2: + result.push_back(ALPHABET[data[i] >> 2]); + result.push_back(ALPHABET[((data[i] & 0b11) << 4) | (data[i + 1] >> 4)]); + result.push_back(ALPHABET[(data[i + 1] & 0b1111) << 2]); + result.push_back('='); + break; + case 1: + result.push_back(ALPHABET[data[i] >> 2]); + result.push_back(ALPHABET[(data[i] & 0b11) << 4]); + result.push_back('='); + result.push_back('='); + break; + } + + return result; +} + +// Adapted from: +// https://github.com/mvorbrodt/blog/blob/cd46051e180/src/base64.hpp#L55-L110 +std::vector base64Decode(const std::string& input) { + if (input.length() % 4) { + throw std::runtime_error("Invalid base64 length!"); + } + + constexpr char kPadCharacter = '='; + + std::size_t padding{}; + + if (input.length()) { + if (input[input.length() - 1] == kPadCharacter) padding++; + if (input[input.length() - 2] == kPadCharacter) padding++; + } + + std::vector decoded; + decoded.reserve(((input.length() / 4) * 3) - padding); + + std::uint32_t temp{}; + auto it = input.begin(); + + while (it < input.end()) { + for (std::size_t i = 0; i < 4; ++i) { + temp <<= 6; + if (*it >= 0x41 && *it <= 0x5A) + temp |= *it - 0x41; + else if (*it >= 0x61 && *it <= 0x7A) + temp |= *it - 0x47; + else if (*it >= 0x30 && *it <= 0x39) + temp |= *it + 0x04; + else if (*it == 0x2B) + temp |= 0x3E; + else if (*it == 0x2F) + temp |= 0x3F; + else if (*it == kPadCharacter) { + switch (input.end() - it) { + case 1: + decoded.push_back((temp >> 16) & 0x000000FF); + decoded.push_back((temp >> 8) & 0x000000FF); + return decoded; + case 2: + decoded.push_back((temp >> 10) & 0x000000FF); + return decoded; + default: + throw std::runtime_error("Invalid padding in base64!"); + } + } else + throw std::runtime_error("Invalid character in base64!"); + + ++it; + } + + decoded.push_back((temp >> 16) & 0x000000FF); + decoded.push_back((temp >> 8) & 0x000000FF); + decoded.push_back((temp)&0x000000FF); + } + + return decoded; +} + +} // namespace foxglove diff --git a/cpp/foxglove-websocket/src/parameter.cpp b/cpp/foxglove-websocket/src/parameter.cpp index b35f0d30..deee0be1 100644 --- a/cpp/foxglove-websocket/src/parameter.cpp +++ b/cpp/foxglove-websocket/src/parameter.cpp @@ -2,65 +2,42 @@ namespace foxglove { -Parameter::Parameter() - : _name("") - , _type(ParameterType::PARAMETER_NOT_SET) - , _value() {} - -Parameter::Parameter(const std::string& name) - : _name(name) - , _type(ParameterType::PARAMETER_NOT_SET) - , _value() {} - -Parameter::Parameter(const std::string& name, bool value) - : _name(name) - , _type(ParameterType::PARAMETER_BOOL) +ParameterValue::ParameterValue() + : _type(ParameterType::PARAMETER_NOT_SET) {} +ParameterValue::ParameterValue(bool value) + : _type(ParameterType::PARAMETER_BOOL) , _value(value) {} - -Parameter::Parameter(const std::string& name, int value) - : Parameter(name, static_cast(value)) {} - -Parameter::Parameter(const std::string& name, int64_t value) - : _name(name) - , _type(ParameterType::PARAMETER_INTEGER) +ParameterValue::ParameterValue(int value) + : _type(ParameterType::PARAMETER_INTEGER) + , _value(static_cast(value)) {} +ParameterValue::ParameterValue(int64_t value) + : _type(ParameterType::PARAMETER_INTEGER) , _value(value) {} - -Parameter::Parameter(const std::string& name, double value) - : _name(name) - , _type(ParameterType::PARAMETER_DOUBLE) +ParameterValue::ParameterValue(double value) + : _type(ParameterType::PARAMETER_DOUBLE) , _value(value) {} - -Parameter::Parameter(const std::string& name, const char* value) - : Parameter(name, std::string(value)) {} - -Parameter::Parameter(const std::string& name, std::string value) - : _name(name) - , _type(ParameterType::PARAMETER_STRING) +ParameterValue::ParameterValue(const std::string& value) + : _type(ParameterType::PARAMETER_STRING) , _value(value) {} - -Parameter::Parameter(const std::string& name, const std::vector& value) - : _name(name) - , _type(ParameterType::PARAMETER_BOOL_ARRAY) +ParameterValue::ParameterValue(const char* value) + : _type(ParameterType::PARAMETER_STRING) + , _value(std::string(value)) {} +ParameterValue::ParameterValue(const std::vector& value) + : _type(ParameterType::PARAMETER_BYTE_ARRAY) , _value(value) {} - -Parameter::Parameter(const std::string& name, const std::vector& value) - : _name(name) - , _type(ParameterType::PARAMETER_INTEGER_ARRAY) - , _value(std::vector(value.begin(), value.end())) {} - -Parameter::Parameter(const std::string& name, const std::vector& value) - : _name(name) - , _type(ParameterType::PARAMETER_INTEGER_ARRAY) +ParameterValue::ParameterValue(const std::vector& value) + : _type(ParameterType::PARAMETER_ARRAY) , _value(value) {} - -Parameter::Parameter(const std::string& name, const std::vector& value) - : _name(name) - , _type(ParameterType::PARAMETER_DOUBLE_ARRAY) +ParameterValue::ParameterValue(const std::unordered_map& value) + : _type(ParameterType::PARAMETER_STRUCT) , _value(value) {} -Parameter::Parameter(const std::string& name, const std::vector& value) +Parameter::Parameter() {} +Parameter::Parameter(const std::string& name) + : _name(name) + , _value(ParameterValue()) {} +Parameter::Parameter(const std::string& name, const ParameterValue& value) : _name(name) - , _type(ParameterType::PARAMETER_STRING_ARRAY) , _value(value) {} } // namespace foxglove diff --git a/cpp/foxglove-websocket/src/serialization.cpp b/cpp/foxglove-websocket/src/serialization.cpp index a53f80c6..1058b278 100644 --- a/cpp/foxglove-websocket/src/serialization.cpp +++ b/cpp/foxglove-websocket/src/serialization.cpp @@ -1,3 +1,4 @@ +#include #include namespace foxglove { @@ -10,40 +11,72 @@ void to_json(nlohmann::json& j, const Channel& c) { {"schemaName", c.schemaName}, {"schema", c.schema}, }; + + if (c.schemaEncoding.has_value()) { + j["schemaEncoding"] = c.schemaEncoding.value(); + } } void from_json(const nlohmann::json& j, Channel& c) { - ChannelWithoutId channelWithoutId{ - j["topic"].get(), - j["encoding"].get(), - j["schemaName"].get(), - j["schema"].get(), - }; + const auto it = j.find("schemaEncoding"); + const auto schemaEncoding = it == j.end() ? std::optional(std::nullopt) + : std::optional(it->get()); + + ChannelWithoutId channelWithoutId{j["topic"].get(), j["encoding"].get(), + j["schemaName"].get(), + j["schema"].get(), schemaEncoding}; c = Channel(j["id"].get(), channelWithoutId); } -void to_json(nlohmann::json& j, const Parameter& p) { +void to_json(nlohmann::json& j, const ParameterValue& p) { const auto paramType = p.getType(); if (paramType == ParameterType::PARAMETER_BOOL) { - j["value"] = p.getValue(); + j = p.getValue(); } else if (paramType == ParameterType::PARAMETER_INTEGER) { - j["value"] = p.getValue(); + j = p.getValue(); } else if (paramType == ParameterType::PARAMETER_DOUBLE) { - j["value"] = p.getValue(); + j = p.getValue(); } else if (paramType == ParameterType::PARAMETER_STRING) { - j["value"] = p.getValue(); - } else if (paramType == ParameterType::PARAMETER_BOOL_ARRAY) { - j["value"] = p.getValue>(); - } else if (paramType == ParameterType::PARAMETER_INTEGER_ARRAY) { - j["value"] = p.getValue>(); - } else if (paramType == ParameterType::PARAMETER_DOUBLE_ARRAY) { - j["value"] = p.getValue>(); - } else if (paramType == ParameterType::PARAMETER_STRING_ARRAY) { - j["value"] = p.getValue>(); + j = p.getValue(); + } else if (paramType == ParameterType::PARAMETER_BYTE_ARRAY) { + const auto& paramValue = p.getValue>(); + const std::string_view strValue(reinterpret_cast(paramValue.data()), + paramValue.size()); + j = base64Encode(strValue); + } else if (paramType == ParameterType::PARAMETER_STRUCT) { + j = p.getValue>(); + } else if (paramType == ParameterType::PARAMETER_ARRAY) { + j = p.getValue>(); } else if (paramType == ParameterType::PARAMETER_NOT_SET) { // empty value. } +} + +void from_json(const nlohmann::json& j, ParameterValue& p) { + const auto jsonType = j.type(); + + if (jsonType == nlohmann::detail::value_t::string) { + p = ParameterValue(j.get()); + } else if (jsonType == nlohmann::detail::value_t::boolean) { + p = ParameterValue(j.get()); + } else if (jsonType == nlohmann::detail::value_t::number_integer) { + p = ParameterValue(j.get()); + } else if (jsonType == nlohmann::detail::value_t::number_unsigned) { + p = ParameterValue(j.get()); + } else if (jsonType == nlohmann::detail::value_t::number_float) { + p = ParameterValue(j.get()); + } else if (jsonType == nlohmann::detail::value_t::object) { + p = ParameterValue(j.get>()); + } else if (jsonType == nlohmann::detail::value_t::array) { + p = ParameterValue(j.get>()); + } +} +void to_json(nlohmann::json& j, const Parameter& p) { + to_json(j["value"], p.getValue()); j["name"] = p.getName(); + if (p.getType() == ParameterType::PARAMETER_BYTE_ARRAY) { + j["type"] = "byte_array"; + } } void from_json(const nlohmann::json& j, Parameter& p) { @@ -54,40 +87,14 @@ void from_json(const nlohmann::json& j, Parameter& p) { return; } - const auto value = j["value"]; - const auto jsonType = j["value"].type(); + ParameterValue pValue; + from_json(j["value"], pValue); - if (jsonType == nlohmann::detail::value_t::string) { - p = Parameter(name, value.get()); - } else if (jsonType == nlohmann::detail::value_t::boolean) { - p = Parameter(name, value.get()); - } else if (jsonType == nlohmann::detail::value_t::number_integer) { - p = Parameter(name, value.get()); - } else if (jsonType == nlohmann::detail::value_t::number_unsigned) { - p = Parameter(name, value.get()); - } else if (jsonType == nlohmann::detail::value_t::number_float) { - p = Parameter(name, value.get()); - } else if (jsonType == nlohmann::detail::value_t::array) { - if (value.empty()) { - // We do not know the type when an empty array is received. - throw std::runtime_error("Setting empty arrays is currently unsupported."); - } - - if (value.front().is_string()) { - p = Parameter(name, value.get>()); - } else if (value.front().is_boolean()) { - p = Parameter(name, value.get>()); - } else if (value.front().is_number_integer()) { - p = Parameter(name, value.get>()); - } else if (value.front().is_number_unsigned()) { - p = Parameter(name, value.get>()); - } else if (value.front().is_number_float()) { - p = Parameter(name, value.get>()); - } else { - throw std::runtime_error("Unsupported array type"); - } + if (j.find("type") != j.end() && j["type"] == "byte_array" && + pValue.getType() == ParameterType::PARAMETER_STRING) { + p = Parameter(name, base64Decode(pValue.getValue())); } else { - throw std::runtime_error("Unsupported type"); + p = Parameter(name, pValue); } }