diff --git a/CMakeLists.txt b/CMakeLists.txt index b899458e..14be7357 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -191,6 +191,7 @@ set(dxFeedGraalCxxApi_Exceptions_Sources set(dxFeedGraalCxxApi_Isolated_Sources src/isolated/api/IsolatedDXEndpoint.cpp + src/isolated/api/IsolatedDXPublisher.cpp src/isolated/api/IsolatedDXPublisherObservableSubscription.cpp src/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.cpp src/isolated/internal/IsolatedString.cpp diff --git a/include/dxfeed_graal_cpp_api/api.hpp b/include/dxfeed_graal_cpp_api/api.hpp index f322a38d..f38bc16a 100644 --- a/include/dxfeed_graal_cpp_api/api.hpp +++ b/include/dxfeed_graal_cpp_api/api.hpp @@ -53,6 +53,7 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251 4996) #include "exceptions/GraalException.hpp" #include "isolated/api/IsolatedDXEndpoint.hpp" +#include "isolated/api/IsolatedDXPublisher.hpp" #include "isolated/api/IsolatedDXPublisherObservableSubscription.hpp" #include "isolated/api/osub/IsolatedObservableSubscriptionChangeListener.hpp" #include "isolated/internal/IsolatedString.hpp" diff --git a/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp b/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp index 2d319a30..b2434998 100644 --- a/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp +++ b/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp @@ -69,13 +69,15 @@ struct DXFCPP_EXPORT DXPublisher : SharedEntity { friend struct DXEndpoint; private: + mutable std::recursive_mutex mutex_{}; JavaObjectHandle handle_; + std::shared_ptr subscription_{}; static std::shared_ptr create(void *handle); void publishEventsImpl(void *graalEventsList) const noexcept; protected: - DXPublisher() noexcept : handle_{} { + DXPublisher() noexcept { if constexpr (Debugger::isDebug) { Debugger::debug("DXPublisher()"); } @@ -217,7 +219,30 @@ struct DXFCPP_EXPORT DXPublisher : SharedEntity { EventMapper::freeGraalList(list); } - std::shared_ptr getSubscription(const EventTypeEnum &); + /** + * Returns observable set of subscribed symbols for the specified event type. + * Note, that subscription is represented by SymbolWrapper symbols. Check the type of each symbol + * in ObservableSubscription using SymbolWrapper::isStringSymbol(), SymbolWrapper::isWildcardSymbol(), + * SymbolWrapper::isIndexedEventSubscriptionSymbol(), SymbolWrapper::isTimeSeriesSubscriptionSymbol(), + * SymbolWrapper::isCandleSymbol(). + * + *

The set of subscribed symbols contains WildcardSymbol::ALL if and + * only if there is a subscription to this wildcard symbol. + * + *

If DXFeedTimeSeriesSubscription is used to subscribe to time-service of the events of this type, then + * instances of TimeSeriesSubscriptionSymbol class represent the corresponding subscription item. + * + *

The resulting observable subscription can generate repeated ObservableSubscriptionChangeListener::onSymbolsAdded_ notifications to + * its listeners for the same symbols without the corresponding ObservableSubscriptionChangeListener::onSymbolsRemoved_ + * notifications in between them. It happens when subscription disappears, cached data is lost, and subscription + * reappears again. On each ObservableSubscriptionChangeListener::onSymbolsAdded_ + * notification data provider shall @ref DXPublisher::publishEvents() "publish" the most recent events for + * the corresponding symbols. + * + * @param eventType The event type. + * @return Observable subscription for the specified event type. + */ + std::shared_ptr getSubscription(const EventTypeEnum &eventType); std::string toString() const noexcept override; }; diff --git a/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp new file mode 100644 index 00000000..97fadeed --- /dev/null +++ b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp @@ -0,0 +1,48 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +#include "../../internal/Conf.hpp" + +DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) + +#include "../../api/DXPublisher.hpp" + +DXFCPP_BEGIN_NAMESPACE + +struct DXPublisherObservableSubscription; + +namespace isolated::api::IsolatedDXPublisher { + +/** + * Calls the Graal SDK function `dxfg_DXPublisher_publishEvents` in isolation. + * + * @param publisher The publisher's handle. + * @param events Events to be published. + * @throws std::invalid_argument if publisher handle is invalid or events' pointer is nullptr. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +void /* int32_t */ publishEvents(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_type_list * */ void *events); + +/** + * Calls the Graal SDK function `dxfg_DXPublisher_getSubscription` in isolation. + * + * @param publisher The publisher's handle. + * @param eventType The event type. + * @throws std::invalid_argument if publisher handle is invalid. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + * @return The underlying observable subscription's handle. + */ +JavaObjectHandle /* dxfg_observable_subscription_t * */ +getSubscription(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_clazz_t */ const EventTypeEnum &eventType); + +} // namespace isolated::api::IsolatedDXPublisher + +DXFCPP_END_NAMESPACE + +DXFCXX_DISABLE_MSC_WARNINGS_POP() \ No newline at end of file diff --git a/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisherObservableSubscription.hpp b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisherObservableSubscription.hpp index d6727a67..8bf26d94 100644 --- a/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisherObservableSubscription.hpp +++ b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisherObservableSubscription.hpp @@ -5,10 +5,10 @@ #include "../../internal/Conf.hpp" -#include - DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) +#include + #include "../../api/DXPublisherObservableSubscription.hpp" DXFCPP_BEGIN_NAMESPACE diff --git a/samples/cpp/PublishProfiles/src/main.cpp b/samples/cpp/PublishProfiles/src/main.cpp index d2e61bb3..c3e36c16 100644 --- a/samples/cpp/PublishProfiles/src/main.cpp +++ b/samples/cpp/PublishProfiles/src/main.cpp @@ -29,28 +29,28 @@ int main(int argc, char *argv[]) { auto publisher = endpoint->getPublisher(); // Observe Profile subscriptions and publish profiles for "xxx:TEST" symbols - // publisher->getSubscription(Profile::TYPE)->addChangeListener(ObservableSubscriptionChangeListener::create( - // [publisher](auto&& symbols) /* symbolsAdded */ { - // std::vector> events{}; - // events.reserve(symbols.sise()); - // - // for (auto&& symbol : symbols) { - // if (symbol.isStringSymbol()) { - // auto s = symbol.asStringSymbol(); - // - // if (s.size() > 5 && s.rfind(":TEST") == s.size() - 5) { - // auto profile = std::make_shared(s); - // - // profile->setDescription("Test symbol"); - // events.push_back(profile); - // } - // } - // } - // - // events.shrink_to_fit(); - // publisher->publishEvents(events); - // } - // )); + publisher->getSubscription(Profile::TYPE) + ->addChangeListener( + ObservableSubscriptionChangeListener::create([publisher](auto &&symbols) /* symbolsAdded */ { + std::vector> events{}; + events.reserve(symbols.size()); + + for (auto &&symbol : symbols) { + if (symbol.isStringSymbol()) { + auto s = symbol.asStringSymbol(); + + if (s.size() > 5 && s.rfind(":TEST") == s.size() - 5) { + auto profile = std::make_shared(s); + + profile->setDescription("Test symbol"); + events.push_back(profile); + } + } + } + + events.shrink_to_fit(); + publisher->publishEvents(events); + })); std::cin.get(); } catch (const JavaException &e) { diff --git a/src/api/DXPublisher.cpp b/src/api/DXPublisher.cpp index 1630b5b7..77fbe466 100644 --- a/src/api/DXPublisher.cpp +++ b/src/api/DXPublisher.cpp @@ -39,17 +39,26 @@ std::shared_ptr DXPublisher::create(void *handle) { return publisher; } +std::shared_ptr DXPublisher::getSubscription(const EventTypeEnum &eventType) { + std::lock_guard guard{mutex_}; + + if (subscription_) { + return subscription_; + } + + subscription_ = DXPublisherObservableSubscription::create(isolated::api::IsolatedDXPublisher::getSubscription(handle_, eventType)); + + return subscription_; +} + std::string DXPublisher::toString() const noexcept { return fmt::format("DXPublisher{{{}}}", handle_.toString()); } void DXPublisher::publishEventsImpl(void *graalEventsList) const noexcept { - runIsolatedOrElse( - [handle = static_cast(handle_.get()), graalEventsList](auto threadHandle) { - return dxfg_DXPublisher_publishEvents(static_cast(threadHandle), handle, - static_cast(graalEventsList)) == 0; - }, - false); + std::lock_guard guard{mutex_}; + + isolated::api::IsolatedDXPublisher::publishEvents(handle_, graalEventsList); } DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/src/isolated/api/IsolatedDXPublisher.cpp b/src/isolated/api/IsolatedDXPublisher.cpp new file mode 100644 index 00000000..76139e83 --- /dev/null +++ b/src/isolated/api/IsolatedDXPublisher.cpp @@ -0,0 +1,46 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include +#include +#include + +DXFCPP_BEGIN_NAMESPACE + +namespace isolated::api::IsolatedDXPublisher { + +void /* int32_t */ publishEvents(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_type_list * */ void *events) { + if (!publisher) { + throw std::invalid_argument( + "Unable to execute function `dxfg_DXPublisher_publishEvents`. The `publisher` handle is invalid"); + } + + if (!events) { + throw std::invalid_argument( + "Unable to execute function `dxfg_DXPublisher_publishEvents`. The `events` is nullptr"); + } + + runGraalFunctionAndThrowIfLessThanZero(dxfg_DXPublisher_publishEvents, + static_cast(publisher.get()), + static_cast(events)); +} + +JavaObjectHandle /* dxfg_observable_subscription_t * */ +getSubscription(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_clazz_t */ const EventTypeEnum &eventType) { + if (!publisher) { + throw std::invalid_argument( + "Unable to execute function `dxfg_DXPublisher_getSubscription`. The `publisher` handle is invalid"); + } + + return JavaObjectHandle(runGraalFunctionAndThrowIfNullptr( + dxfg_DXPublisher_getSubscription, static_cast(publisher.get()), + static_cast(eventType.getId()))); +} + +} // namespace isolated::api::IsolatedDXPublisher + +DXFCPP_END_NAMESPACE \ No newline at end of file