diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ca82db4d..2784a615 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,12 +25,24 @@ jobs: xcode: '15.0' cc: 'clang' cxx: 'clang++' - - name: ubuntu-22.04-gcc-13 - image: ubuntu-22.04 + - name: ubuntu-24.04-clang-18 + image: ubuntu-24.04 + os: linux + cores: 4 + cc: 'clang-18' + cxx: 'clang++-18' + - name: ubuntu-24.04-gcc-14 + image: ubuntu-24.04 + os: linux + cores: 4 + cc: 'gcc-14' + cxx: 'g++-14' + - name: ubuntu-24.04-gcc-12 + image: ubuntu-24.04 os: linux cores: 4 - cc: 'gcc-13' - cxx: 'g++-13' + cc: 'gcc-12' + cxx: 'g++-12' # - name: ubuntu-22.04-gcc-9 # image: ubuntu-22.04 # os: linux diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index db5992e8..a63601b3 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -68,12 +68,12 @@ jobs: xcode: '15.0' cc: 'clang' cxx: 'clang++' - - name: ubuntu-22.04-gcc-13 - image: ubuntu-22.04 + - name: ubuntu-24.04-gcc-14 + image: ubuntu-24.04 os: linux cores: 4 - cc: 'gcc-13' - cxx: 'g++-13' + cc: 'gcc-14' + cxx: 'g++-14' - name: macos-14-xcode-15 image: macos-14 os: macos diff --git a/CMakeLists.txt b/CMakeLists.txt index e6edde8a..16422d2a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ project(dxFeedGraalCxxApi) set(DXFCXX_VERSION "v2.0.0-rc3" CACHE STRING "The dxFeed Graal CXX API package version") -set(DXFEED_GRAAL_NATIVE_SDK_VERSION "1.1.12" CACHE STRING "") +set(DXFEED_GRAAL_NATIVE_SDK_VERSION "1.1.16" CACHE STRING "") set(FMTLIB_VERSION "10.2.1") set(BOOST_VERSION "1.84.0") set(UTFCPP_VERSION "3.2.3") @@ -191,6 +191,9 @@ set(dxFeedGraalCxxApi_Exceptions_Sources set(dxFeedGraalCxxApi_Isolated_Sources src/isolated/api/IsolatedDXEndpoint.cpp + src/isolated/api/IsolatedDXPublisher.cpp + src/isolated/api/IsolatedDXPublisherObservableSubscription.cpp + src/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.cpp src/isolated/internal/IsolatedString.cpp src/isolated/internal/IsolatedTimeFormat.cpp ) @@ -199,6 +202,7 @@ set(dxFeedGraalCxxApi_Api_Sources src/api/DXEndpoint.cpp src/api/DXFeed.cpp src/api/DXFeedSubscription.cpp + src/api/DXPublisherObservableSubscription.cpp src/api/DXPublisher.cpp ) @@ -206,6 +210,7 @@ set(dxFeedGraalCxxApi_ApiOsub_Sources src/api/osub/WildcardSymbol.cpp src/api/osub/TimeSeriesSubscriptionSymbol.cpp src/api/osub/IndexedEventSubscriptionSymbol.cpp + src/api/osub/ObservableSubscriptionChangeListener.cpp ) set(dxFeedGraalCxxApi_Ipf_Sources @@ -461,6 +466,7 @@ endif () if (DXFCXX_BUILD_SAMPLES) add_subdirectory(samples/cpp/PrintQuoteEvents) + add_subdirectory(samples/cpp/PublishProfiles) add_subdirectory(samples/cpp/DxFeedSample) add_subdirectory(samples/cpp/WriteTapeFile) add_subdirectory(samples/cpp/ConvertTapeFile) diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 2a76daed..b3f35226 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -2,7 +2,7 @@ ## Compile-time -- [dxFeed Graal Native SDK](https://github.com/dxFeed/dxfeed-graal-native-sdk) v1.1.6 +- [dxFeed Graal Native SDK](https://github.com/dxFeed/dxfeed-graal-native-sdk) v1.1.16 - [Boost](https://github.com/boostorg/boost) v1.84.0 - Boost.Stacktrace 1.0 - [utfcpp](https://github.com/nemtrif/utfcpp) v3.2.3 @@ -24,7 +24,7 @@ - addr2line \[opt] (Diagnostic backtraces) ## Run-time -- [dxFeed Graal Native SDK](https://github.com/dxFeed/dxfeed-graal-native-sdk) v1.1.6 +- [dxFeed Graal Native SDK](https://github.com/dxFeed/dxfeed-graal-native-sdk) v1.1.16 - [doctest](https://github.com/doctest/doctest) v2.4.11 (Tests) diff --git a/README.md b/README.md index b5483383..7e65d43a 100644 --- a/README.md +++ b/README.md @@ -350,6 +350,7 @@ versions): is a simple demonstration of how to get Instrument Profiles - [x] [DxFeedLiveIpfSample](https://github.com/dxFeed/dxfeed-graal-cxx-api/blob/main/samples/cpp/DxFeedLiveIpfSample/src/main.cpp) is a simple demonstration of how to get live updates for Instrument Profiles +- [x] [PublishProfiles](https://github.com/dxFeed/dxfeed-graal-cxx-api/blob/main/samples/cpp/PublishProfiles/src/main.cpp) is a simple demonstration of how to publish market events - [x] [ScheduleSample](https://github.com/dxFeed/dxfeed-graal-cxx-api/blob/main/samples/cpp/ScheduleSample/src/main.cpp) is a simple demonstration of how to get various scheduling information for instruments - [x] [OnDemandSample](https://github.com/dxFeed/dxfeed-graal-cxx-api/blob/main/samples/cpp/OnDemandSample/src/main.cpp) diff --git a/include/dxfeed_graal_cpp_api/api.hpp b/include/dxfeed_graal_cpp_api/api.hpp index e7a19018..f38bc16a 100644 --- a/include/dxfeed_graal_cpp_api/api.hpp +++ b/include/dxfeed_graal_cpp_api/api.hpp @@ -53,6 +53,9 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251 4996) #include "exceptions/GraalException.hpp" #include "isolated/api/IsolatedDXEndpoint.hpp" +#include "isolated/api/IsolatedDXPublisher.hpp" +#include "isolated/api/IsolatedDXPublisherObservableSubscription.hpp" +#include "isolated/api/osub/IsolatedObservableSubscriptionChangeListener.hpp" #include "isolated/internal/IsolatedString.hpp" #include "isolated/internal/IsolatedTimeFormat.hpp" #include "isolated/internal/IsolatedTools.hpp" diff --git a/include/dxfeed_graal_cpp_api/api/ApiModule.hpp b/include/dxfeed_graal_cpp_api/api/ApiModule.hpp index 056c3ca3..4a317161 100644 --- a/include/dxfeed_graal_cpp_api/api/ApiModule.hpp +++ b/include/dxfeed_graal_cpp_api/api/ApiModule.hpp @@ -10,6 +10,7 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) #include "DXEndpoint.hpp" #include "DXFeed.hpp" #include "DXFeedSubscription.hpp" +#include "DXPublisherObservableSubscription.hpp" #include "DXPublisher.hpp" #include "FilteredSubscriptionSymbol.hpp" #include "osub/OsubModule.hpp" diff --git a/include/dxfeed_graal_cpp_api/api/DXFeed.hpp b/include/dxfeed_graal_cpp_api/api/DXFeed.hpp index 46b257c3..c264d79e 100644 --- a/include/dxfeed_graal_cpp_api/api/DXFeed.hpp +++ b/include/dxfeed_graal_cpp_api/api/DXFeed.hpp @@ -122,9 +122,6 @@ struct DXFCPP_EXPORT DXFeed : SharedEntity { private: JavaObjectHandle handle_; - - std::unordered_set> subscriptions_{}; - static std::shared_ptr create(void *feedHandle); protected: diff --git a/include/dxfeed_graal_cpp_api/api/DXFeedSubscription.hpp b/include/dxfeed_graal_cpp_api/api/DXFeedSubscription.hpp index d0ba71bb..79174fa8 100644 --- a/include/dxfeed_graal_cpp_api/api/DXFeedSubscription.hpp +++ b/include/dxfeed_graal_cpp_api/api/DXFeedSubscription.hpp @@ -16,9 +16,7 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) #include "../internal/Common.hpp" #include "../internal/Handler.hpp" #include "../internal/JavaObjectHandle.hpp" -#include "../symbols/StringSymbol.hpp" #include "../symbols/SymbolWrapper.hpp" -#include "osub/WildcardSymbol.hpp" #include #include @@ -693,6 +691,7 @@ class DXFCPP_EXPORT DXFeedSubscription : public SharedEntity { /** * Returns `true` if this subscription contains the corresponding event type. * + * @param eventType The type of event that is checked. * @return `true` if this subscription contains the corresponding event type. * * @see DXFeedSubscription::getEventTypes() diff --git a/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp b/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp index 8cf7944a..b2434998 100644 --- a/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp +++ b/include/dxfeed_graal_cpp_api/api/DXPublisher.hpp @@ -25,7 +25,7 @@ DXFCPP_BEGIN_NAMESPACE struct DXEndpoint; class EventTypeEnum; -class ObservableSubscription; +struct ObservableSubscription; /** * Provides API for publishing of events to local or remote DXFeed. @@ -69,13 +69,15 @@ struct DXFCPP_EXPORT DXPublisher : SharedEntity { friend struct DXEndpoint; private: + mutable std::recursive_mutex mutex_{}; JavaObjectHandle handle_; + std::shared_ptr subscription_{}; static std::shared_ptr create(void *handle); void publishEventsImpl(void *graalEventsList) const noexcept; protected: - DXPublisher() noexcept : handle_{} { + DXPublisher() noexcept { if constexpr (Debugger::isDebug) { Debugger::debug("DXPublisher()"); } @@ -184,7 +186,7 @@ struct DXFCPP_EXPORT DXPublisher : SharedEntity { /** * Publishes events to the corresponding feed. * - * @param collection The collection of events to publish. + * @param events The collection of events to publish. */ void publishEvents(std::initializer_list> events) noexcept { publishEvents(std::begin(events), std::end(events)); @@ -217,7 +219,30 @@ struct DXFCPP_EXPORT DXPublisher : SharedEntity { EventMapper::freeGraalList(list); } - std::shared_ptr getSubscription(const EventTypeEnum &); + /** + * Returns observable set of subscribed symbols for the specified event type. + * Note, that subscription is represented by SymbolWrapper symbols. Check the type of each symbol + * in ObservableSubscription using SymbolWrapper::isStringSymbol(), SymbolWrapper::isWildcardSymbol(), + * SymbolWrapper::isIndexedEventSubscriptionSymbol(), SymbolWrapper::isTimeSeriesSubscriptionSymbol(), + * SymbolWrapper::isCandleSymbol(). + * + *

The set of subscribed symbols contains WildcardSymbol::ALL if and + * only if there is a subscription to this wildcard symbol. + * + *

If DXFeedTimeSeriesSubscription is used to subscribe to time-service of the events of this type, then + * instances of TimeSeriesSubscriptionSymbol class represent the corresponding subscription item. + * + *

The resulting observable subscription can generate repeated ObservableSubscriptionChangeListener::onSymbolsAdded_ notifications to + * its listeners for the same symbols without the corresponding ObservableSubscriptionChangeListener::onSymbolsRemoved_ + * notifications in between them. It happens when subscription disappears, cached data is lost, and subscription + * reappears again. On each ObservableSubscriptionChangeListener::onSymbolsAdded_ + * notification data provider shall @ref DXPublisher::publishEvents() "publish" the most recent events for + * the corresponding symbols. + * + * @param eventType The event type. + * @return Observable subscription for the specified event type. + */ + std::shared_ptr getSubscription(const EventTypeEnum &eventType); std::string toString() const noexcept override; }; diff --git a/include/dxfeed_graal_cpp_api/api/DXPublisherObservableSubscription.hpp b/include/dxfeed_graal_cpp_api/api/DXPublisherObservableSubscription.hpp new file mode 100644 index 00000000..01f5b5d3 --- /dev/null +++ b/include/dxfeed_graal_cpp_api/api/DXPublisherObservableSubscription.hpp @@ -0,0 +1,48 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +#include "../internal/Conf.hpp" + +DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) + +#include "../entity/EntityModule.hpp" +#include "../event/EventTypeEnum.hpp" +#include "../internal/JavaObjectHandle.hpp" +#include "osub/ObservableSubscription.hpp" + +#include +#include + +DXFCPP_BEGIN_NAMESPACE + +struct DXFCPP_EXPORT DXPublisherObservableSubscription : RequireMakeShared, + ObservableSubscription { + static constexpr std::size_t FAKE_LISTENER_ID{static_cast(-1)}; + + private: + inline static std::atomic lastListenerId_{}; + + JavaObjectHandle handle_; + std::unordered_map> listeners_; + std::recursive_mutex listenersMutex_{}; + + public: + DXPublisherObservableSubscription(LockExternalConstructionTag, + JavaObjectHandle &&handle); + ~DXPublisherObservableSubscription() override; + + static std::shared_ptr + create(JavaObjectHandle &&handle); + + bool isClosed() override; + std::unordered_set getEventTypes() override; + bool containsEventType(const EventTypeEnum &eventType) override; + std::size_t addChangeListener(std::shared_ptr listener) override; + void removeChangeListener(std::size_t id) override; +}; + +DXFCPP_END_NAMESPACE + +DXFCXX_DISABLE_MSC_WARNINGS_POP() \ No newline at end of file diff --git a/include/dxfeed_graal_cpp_api/api/osub/ObservableSubscription.hpp b/include/dxfeed_graal_cpp_api/api/osub/ObservableSubscription.hpp index 00800fa8..39e97347 100644 --- a/include/dxfeed_graal_cpp_api/api/osub/ObservableSubscription.hpp +++ b/include/dxfeed_graal_cpp_api/api/osub/ObservableSubscription.hpp @@ -9,14 +9,66 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) #include "../../event/IndexedEventSource.hpp" #include "../../symbols/SymbolWrapper.hpp" +#include "ObservableSubscriptionChangeListener.hpp" #include #include +#include #include DXFCPP_BEGIN_NAMESPACE -class ObservableSubscription {}; +/** + * Observable set of subscription symbols. + */ +struct DXFCPP_EXPORT ObservableSubscription { + virtual ~ObservableSubscription() = default; + + /** + * @return `true` if this subscription is closed. + * The observable subscription that is returned as a result of DXPublisher::getSubscription() is closed when the + * corresponding DXEndpoint is closed. + */ + virtual bool isClosed() = 0; + + /** + * @return a set of event types for this subscription. The resulting set cannot be modified. + * The observable subscription that is returned as a result of DXPublisher::getSubscription() has a singleton set + * of event types. + */ + virtual std::unordered_set getEventTypes() = 0; + + /** + * Returns `true` if this subscription contains the corresponding event type. + * + * @param eventType The type of event that is checked. + * @return `true` if this subscription contains the corresponding event type. + * @see #getEventTypes() + */ + virtual bool containsEventType(const EventTypeEnum &eventType) = 0; + + /** + * Adds subscription change listener. This method does nothing if subscription is closed. + * Otherwise, it installs the corresponding listener and immediately + * invokes ObservableSubscriptionChangeListener::symbolsAdded() on the given listener while holding the lock for + * this subscription. This way the given listener synchronously receives existing subscription state and is + * synchronously notified on all changes in subscription afterwards. + * + * @param listener The subscription change listener. + * @return The listener id + */ + virtual std::size_t addChangeListener(std::shared_ptr listener) = 0; + + /** + * Removes subscription change listener by id. This method does nothing if the listener with the given id was not + * installed or was already removed as subscription change listener for this subscription. Otherwise it removes the + * corresponding listener and immediately invokes ObservableSubscriptionChangeListener::subscriptionClosed() on the + * given listener while holding the lock for this subscription. + * + * @param id The subscription change listener id. + */ + virtual void removeChangeListener(std::size_t id) = 0; +}; DXFCPP_END_NAMESPACE diff --git a/include/dxfeed_graal_cpp_api/api/osub/ObservableSubscriptionChangeListener.hpp b/include/dxfeed_graal_cpp_api/api/osub/ObservableSubscriptionChangeListener.hpp new file mode 100644 index 00000000..d28974eb --- /dev/null +++ b/include/dxfeed_graal_cpp_api/api/osub/ObservableSubscriptionChangeListener.hpp @@ -0,0 +1,45 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +#include "../../internal/Conf.hpp" + +DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) + +#include "../../symbols/SymbolWrapper.hpp" + +#include +#include + +DXFCPP_BEGIN_NAMESPACE + +struct DXFCPP_EXPORT ObservableSubscriptionChangeListener : RequireMakeShared { + explicit ObservableSubscriptionChangeListener(LockExternalConstructionTag); + ~ObservableSubscriptionChangeListener() noexcept override; + + static std::shared_ptr + create(std::function &symbols)> onSymbolsAdded); + + static std::shared_ptr + create(std::function &symbols)> onSymbolsAdded, + std::function &symbols)> onSymbolsRemoved, + std::function onSubscriptionClosed); + + const JavaObjectHandle &getHandle() const; + + private: + mutable std::recursive_mutex mutex_{}; + JavaObjectHandle handle_; + SimpleHandler &symbols)> onSymbolsAdded_{}; + SimpleHandler &symbols)> onSymbolsRemoved_{}; + SimpleHandler onSubscriptionClosed_{}; + + struct Impl; + + std::unique_ptr impl_; +}; + +DXFCPP_END_NAMESPACE + +DXFCXX_DISABLE_MSC_WARNINGS_POP() \ No newline at end of file diff --git a/include/dxfeed_graal_cpp_api/api/osub/OsubModule.hpp b/include/dxfeed_graal_cpp_api/api/osub/OsubModule.hpp index 8c89589d..95365bfc 100644 --- a/include/dxfeed_graal_cpp_api/api/osub/OsubModule.hpp +++ b/include/dxfeed_graal_cpp_api/api/osub/OsubModule.hpp @@ -8,6 +8,7 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) #include "IndexedEventSubscriptionSymbol.hpp" +#include "ObservableSubscriptionChangeListener.hpp" #include "ObservableSubscription.hpp" #include "TimeSeriesSubscriptionSymbol.hpp" #include "WildcardSymbol.hpp" diff --git a/include/dxfeed_graal_cpp_api/event/candle/CandleAlignment.hpp b/include/dxfeed_graal_cpp_api/event/candle/CandleAlignment.hpp index 4484c5fe..6e30218e 100644 --- a/include/dxfeed_graal_cpp_api/event/candle/CandleAlignment.hpp +++ b/include/dxfeed_graal_cpp_api/event/candle/CandleAlignment.hpp @@ -141,7 +141,7 @@ struct DXFCPP_EXPORT CandleAlignment : public CandleSymbolAttribute { * @param symbol The candle symbol string. * @return candle symbol string with the normalized representation of the the candle alignment attribute. */ - static DXFCPP_CXX20_CONSTEXPR_STRING std::string + static std::string normalizeAttributeForSymbol(const dxfcpp::StringLikeWrapper &symbol) { auto a = MarketEventSymbols::getAttributeStringByKey(symbol, ATTRIBUTE_KEY); diff --git a/include/dxfeed_graal_cpp_api/event/candle/CandlePeriod.hpp b/include/dxfeed_graal_cpp_api/event/candle/CandlePeriod.hpp index ee6132ef..c0835a3c 100644 --- a/include/dxfeed_graal_cpp_api/event/candle/CandlePeriod.hpp +++ b/include/dxfeed_graal_cpp_api/event/candle/CandlePeriod.hpp @@ -215,7 +215,7 @@ struct DXFCPP_EXPORT CandlePeriod : public CandleSymbolAttribute { * @param symbol candle symbol string. * @return candle symbol string with the normalized representation of the the candle period attribute. */ - static DXFCPP_CXX20_CONSTEXPR_STRING std::string + static std::string normalizeAttributeForSymbol(const dxfcpp::StringLikeWrapper &symbol) { auto a = MarketEventSymbols::getAttributeStringByKey(symbol, ATTRIBUTE_KEY); diff --git a/include/dxfeed_graal_cpp_api/event/candle/CandlePrice.hpp b/include/dxfeed_graal_cpp_api/event/candle/CandlePrice.hpp index 7d830e64..b547e114 100644 --- a/include/dxfeed_graal_cpp_api/event/candle/CandlePrice.hpp +++ b/include/dxfeed_graal_cpp_api/event/candle/CandlePrice.hpp @@ -163,7 +163,7 @@ struct DXFCPP_EXPORT CandlePrice : public CandleSymbolAttribute { * @param symbol candle symbol string. * @return candle symbol string with the normalized representation of the the candle price type attribute. */ - static DXFCPP_CXX20_CONSTEXPR_STRING std::string + static std::string normalizeAttributeForSymbol(const dxfcpp::StringLikeWrapper &symbol) { auto a = MarketEventSymbols::getAttributeStringByKey(symbol, ATTRIBUTE_KEY); diff --git a/include/dxfeed_graal_cpp_api/event/candle/CandlePriceLevel.hpp b/include/dxfeed_graal_cpp_api/event/candle/CandlePriceLevel.hpp index 05934fde..a41bdfdb 100644 --- a/include/dxfeed_graal_cpp_api/event/candle/CandlePriceLevel.hpp +++ b/include/dxfeed_graal_cpp_api/event/candle/CandlePriceLevel.hpp @@ -145,7 +145,7 @@ struct DXFCPP_EXPORT CandlePriceLevel : public CandleSymbolAttribute { * @param symbol candle symbol string. * @return candle symbol string with the normalized representation of the the candle price level attribute. */ - static DXFCPP_CXX20_CONSTEXPR_STRING std::string + static std::string normalizeAttributeForSymbol(const dxfcpp::StringLikeWrapper &symbol) { auto a = MarketEventSymbols::getAttributeStringByKey(symbol, ATTRIBUTE_KEY); diff --git a/include/dxfeed_graal_cpp_api/event/candle/CandleSession.hpp b/include/dxfeed_graal_cpp_api/event/candle/CandleSession.hpp index 251539bf..8e0d6b11 100644 --- a/include/dxfeed_graal_cpp_api/event/candle/CandleSession.hpp +++ b/include/dxfeed_graal_cpp_api/event/candle/CandleSession.hpp @@ -159,7 +159,7 @@ struct DXFCPP_EXPORT CandleSession : public CandleSymbolAttribute { * @param symbol candle symbol string. * @return candle symbol string with the normalized representation of the the candle session attribute. */ - static DXFCPP_CXX20_CONSTEXPR_STRING std::string normalizeAttributeForSymbol(const dxfcpp::StringLikeWrapper &symbol) noexcept { + static std::string normalizeAttributeForSymbol(const dxfcpp::StringLikeWrapper &symbol) noexcept { auto a = MarketEventSymbols::getAttributeStringByKey(symbol, ATTRIBUTE_KEY); if (!a) { diff --git a/include/dxfeed_graal_cpp_api/internal/Common.hpp b/include/dxfeed_graal_cpp_api/internal/Common.hpp index a9e64ed0..e672160f 100644 --- a/include/dxfeed_graal_cpp_api/internal/Common.hpp +++ b/include/dxfeed_graal_cpp_api/internal/Common.hpp @@ -280,7 +280,7 @@ constexpr static std::int32_t getYearMonthDayByDayId(std::int32_t dayId) { return yyyy >= 0 ? yyyymmdd : -yyyymmdd; } -constexpr static std::int32_t getDayIdByYearMonthDay(std::int32_t year, std::int32_t month, std::int32_t day) { +static std::int32_t getDayIdByYearMonthDay(std::int32_t year, std::int32_t month, std::int32_t day) { if (month < 1 || month > 12) { throw std::invalid_argument("invalid month " + std::to_string(month)); } diff --git a/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp new file mode 100644 index 00000000..97fadeed --- /dev/null +++ b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp @@ -0,0 +1,48 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +#include "../../internal/Conf.hpp" + +DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) + +#include "../../api/DXPublisher.hpp" + +DXFCPP_BEGIN_NAMESPACE + +struct DXPublisherObservableSubscription; + +namespace isolated::api::IsolatedDXPublisher { + +/** + * Calls the Graal SDK function `dxfg_DXPublisher_publishEvents` in isolation. + * + * @param publisher The publisher's handle. + * @param events Events to be published. + * @throws std::invalid_argument if publisher handle is invalid or events' pointer is nullptr. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +void /* int32_t */ publishEvents(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_type_list * */ void *events); + +/** + * Calls the Graal SDK function `dxfg_DXPublisher_getSubscription` in isolation. + * + * @param publisher The publisher's handle. + * @param eventType The event type. + * @throws std::invalid_argument if publisher handle is invalid. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + * @return The underlying observable subscription's handle. + */ +JavaObjectHandle /* dxfg_observable_subscription_t * */ +getSubscription(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_clazz_t */ const EventTypeEnum &eventType); + +} // namespace isolated::api::IsolatedDXPublisher + +DXFCPP_END_NAMESPACE + +DXFCXX_DISABLE_MSC_WARNINGS_POP() \ No newline at end of file diff --git a/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisherObservableSubscription.hpp b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisherObservableSubscription.hpp new file mode 100644 index 00000000..8bf26d94 --- /dev/null +++ b/include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisherObservableSubscription.hpp @@ -0,0 +1,88 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +#include "../../internal/Conf.hpp" + +DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) + +#include + +#include "../../api/DXPublisherObservableSubscription.hpp" + +DXFCPP_BEGIN_NAMESPACE + +namespace isolated::api::IsolatedDXPublisherObservableSubscription { +/** + * Calls the Graal SDK function `dxfg_ObservableSubscription_isClosed` in isolation. + * + * @param sub The subscription's handle. + * @return Returns `true` if this subscription is closed. + * @throws std::invalid_argument if DXPublisherObservableSubscription's handle is invalid. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +bool /* int32_t */ +isClosed(/* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub); + +/** + * Calls the Graal SDK function `dxfg_ObservableSubscription_getEventTypes` in isolation. + * + * @param sub The subscription's handle. + * @return The event types set. + * @throws std::invalid_argument if DXPublisherObservableSubscription's handle is invalid. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +std::unordered_set /* dxfg_event_clazz_list_t* */ getEventTypes( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub); + +/** + * Calls the Graal SDK function `dxfg_ObservableSubscription_containsEventType` in isolation. + * + * @param sub The subscription's handle. + * @param eventType The type of event that is checked. + * @return `true` if this subscription contains the corresponding event type. + * @throws std::invalid_argument if DXPublisherObservableSubscription's handle is invalid. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +bool /* int32_t */ containsEventType( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub, + /* dxfg_event_clazz_t */ const EventTypeEnum & eventType); + +/** + * Calls the Graal SDK function `dxfg_ObservableSubscription_addChangeListener` in isolation. + * + * @param sub The subscription's handle. + * @param listener The listener's handle. + * @throws std::invalid_argument if DXPublisherObservableSubscription's or ObservableSubscriptionChangeListener's handle + * is invalid. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +void /* int32_t */ addChangeListener( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub, + /* dxfg_observable_subscription_change_listener_t * */ const JavaObjectHandle + &listener); + +/** + * Calls the Graal SDK function `dxfg_ObservableSubscription_removeChangeListener` in isolation. + * + * @param sub The subscription's handle. + * @param listener The listener's handle. + * @throws std::invalid_argument if DXPublisherObservableSubscription's or ObservableSubscriptionChangeListener's handle + * is invalid. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +void /* int32_t */ removeChangeListener( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub, + /* dxfg_observable_subscription_change_listener_t * */ const JavaObjectHandle + &listener); +} // namespace isolated::api::IsolatedDXPublisherObservableSubscription + +DXFCPP_END_NAMESPACE + +DXFCXX_DISABLE_MSC_WARNINGS_POP() \ No newline at end of file diff --git a/include/dxfeed_graal_cpp_api/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.hpp b/include/dxfeed_graal_cpp_api/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.hpp new file mode 100644 index 00000000..ca3a2c15 --- /dev/null +++ b/include/dxfeed_graal_cpp_api/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.hpp @@ -0,0 +1,40 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +#include "../../../internal/Conf.hpp" + +#include +#include + +DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251) + +#include "../../../api/osub/ObservableSubscriptionChangeListener.hpp" + +DXFCPP_BEGIN_NAMESPACE + +namespace isolated::api::IsolatedObservableSubscriptionChangeListener { +/** + * Calls the Graal SDK function `dxfg_ObservableSubscriptionChangeListener_new` in isolation. + * @param functionSymbolsAdded A user function that is used as a `symbolsAdded` callback for the listener. + * @param functionSymbolsRemoved A user function that is used as a `symbolsRemoved` callback for the listener. + * @param functionSubscriptionClosed A user function that is used as a `subscriptionClosed` callback for the listener. + * @param userData User data, which is placed each time as a callback parameter when called from listener. + * @return The ObservableSubscriptionChangeListener's handle. + * + * @throws std::invalid_argument if functionSymbolsAdded or functionSymbolsRemoved or functionSubscriptionClosed is + * nullptr. + * @throws JavaException if something happened with the dxFeed API backend. + * @throws GraalException if something happened with the GraalVM. + */ +/* dxfg_observable_subscription_change_listener_t* */ JavaObjectHandle +create(/* dxfg_ObservableSubscriptionChangeListener_function_symbolsAdded */ void *functionSymbolsAdded, + /* dxfg_ObservableSubscriptionChangeListener_function_symbolsRemoved */ void *functionSymbolsRemoved, + /* dxfg_ObservableSubscriptionChangeListener_function_subscriptionClosed */ void *functionSubscriptionClosed, + void *userData); +} // namespace isolated::api::IsolatedObservableSubscriptionChangeListener + +DXFCPP_END_NAMESPACE + +DXFCXX_DISABLE_MSC_WARNINGS_POP() \ No newline at end of file diff --git a/samples/cpp/PublishProfiles/CMakeLists.txt b/samples/cpp/PublishProfiles/CMakeLists.txt new file mode 100644 index 00000000..62248eef --- /dev/null +++ b/samples/cpp/PublishProfiles/CMakeLists.txt @@ -0,0 +1,56 @@ +# Copyright (c) 2024 Devexperts LLC. +# SPDX-License-Identifier: MPL-2.0 + +cmake_minimum_required(VERSION 3.21) + +if (POLICY CMP0092) + cmake_policy(SET CMP0092 NEW) +endif () + +if (POLICY CMP0135) + cmake_policy(SET CMP0135 NEW) +endif () + +project(PublishProfiles LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_C_STANDARD 11) +set(CXX_EXTENSIONS OFF) +set(C_EXTENSIONS OFF) + +if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") + set(CMAKE_MACOSX_RPATH ON) + set(CMAKE_SKIP_BUILD_RPATH ON) + set(CMAKE_BUILD_WITH_INSTALL_RPATH ON) + set(CMAKE_INSTALL_RPATH_USE_LINK_PATH OFF) + set(CMAKE_BUILD_RPATH_USE_ORIGIN ON) + set(CMAKE_INSTALL_RPATH "@loader_path/../${CMAKE_INSTALL_LIBDIR};@loader_path;@executable_path;@executable_path/../Frameworks") +elseif (UNIX) + set(CMAKE_SKIP_BUILD_RPATH ON) + set(CMAKE_BUILD_WITH_INSTALL_RPATH ON) + set(CMAKE_INSTALL_RPATH_USE_LINK_PATH OFF) + set(CMAKE_BUILD_RPATH_USE_ORIGIN ON) + set(CMAKE_INSTALL_RPATH "$ORIGIN/../${CMAKE_INSTALL_LIBDIR}:$ORIGIN/../lib64:$ORIGIN/../lib:$ORIGIN") +endif () + +add_executable(${PROJECT_NAME} src/main.cpp) + +target_include_directories(${PROJECT_NAME} PRIVATE ../../../third_party/range-v3-0.12/include) + +target_link_libraries(${PROJECT_NAME} PRIVATE dxfcxx::static) + +if (DXFCXX_FEATURE_STACKTRACE) + LinkStacktrace(${PROJECT_NAME}) +endif () + +add_custom_command(TARGET ${PROJECT_NAME} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ + $) + +if (DXFCXX_INSTALL AND DXFCXX_INSTALL_SAMPLES) + install(TARGETS ${PROJECT_NAME} DESTINATION ${CMAKE_INSTALL_BINDIR}) + + if (WIN32) + install(FILES $ DESTINATION ${CMAKE_INSTALL_BINDIR}) + endif () +endif () \ No newline at end of file diff --git a/samples/cpp/PublishProfiles/src/main.cpp b/samples/cpp/PublishProfiles/src/main.cpp new file mode 100644 index 00000000..c3e36c16 --- /dev/null +++ b/samples/cpp/PublishProfiles/src/main.cpp @@ -0,0 +1,65 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include + +#include + +using namespace dxfcpp; +using namespace dxfcpp::literals; +using namespace std::literals; + +/* + * Using address like ":7700" it starts a server on a specified port where it provides Profile + * event for any symbol ending with ":TEST" suffix. + */ +int main(int argc, char *argv[]) { + try { + if (argc < 2) { + return 0; + } + + // Parse args. + std::string address = argv[1]; + + // Create publisher endpoint and connect it to the specified address + auto endpoint = DXEndpoint::create(DXEndpoint::Role::PUBLISHER)->connect(address); + auto publisher = endpoint->getPublisher(); + + // Observe Profile subscriptions and publish profiles for "xxx:TEST" symbols + publisher->getSubscription(Profile::TYPE) + ->addChangeListener( + ObservableSubscriptionChangeListener::create([publisher](auto &&symbols) /* symbolsAdded */ { + std::vector> events{}; + events.reserve(symbols.size()); + + for (auto &&symbol : symbols) { + if (symbol.isStringSymbol()) { + auto s = symbol.asStringSymbol(); + + if (s.size() > 5 && s.rfind(":TEST") == s.size() - 5) { + auto profile = std::make_shared(s); + + profile->setDescription("Test symbol"); + events.push_back(profile); + } + } + } + + events.shrink_to_fit(); + publisher->publishEvents(events); + })); + + std::cin.get(); + } catch (const JavaException &e) { + std::cerr << e.what() << '\n'; + std::cerr << e.getStackTrace() << '\n'; + } catch (const GraalException &e) { + std::cerr << e.what() << '\n'; + std::cerr << e.getStackTrace() << '\n'; + } + + return 0; +} \ No newline at end of file diff --git a/src/api/DXFeed.cpp b/src/api/DXFeed.cpp index 82bc46c2..087c05fc 100644 --- a/src/api/DXFeed.cpp +++ b/src/api/DXFeed.cpp @@ -40,8 +40,6 @@ void DXFeed::attachSubscription(std::shared_ptr subscription subHandle) == 0; }, false)) { - - subscriptions_.emplace(subscription); } } @@ -61,8 +59,6 @@ void DXFeed::detachSubscription(std::shared_ptr subscription subHandle) == 0; }, false)) { - - subscriptions_.erase(subscription); } } @@ -82,8 +78,6 @@ void DXFeed::detachSubscriptionAndClear(std::shared_ptr subs handle, subHandle) == 0; }, false)) { - - subscriptions_.erase(subscription); } } diff --git a/src/api/DXPublisher.cpp b/src/api/DXPublisher.cpp index 1630b5b7..77fbe466 100644 --- a/src/api/DXPublisher.cpp +++ b/src/api/DXPublisher.cpp @@ -39,17 +39,26 @@ std::shared_ptr DXPublisher::create(void *handle) { return publisher; } +std::shared_ptr DXPublisher::getSubscription(const EventTypeEnum &eventType) { + std::lock_guard guard{mutex_}; + + if (subscription_) { + return subscription_; + } + + subscription_ = DXPublisherObservableSubscription::create(isolated::api::IsolatedDXPublisher::getSubscription(handle_, eventType)); + + return subscription_; +} + std::string DXPublisher::toString() const noexcept { return fmt::format("DXPublisher{{{}}}", handle_.toString()); } void DXPublisher::publishEventsImpl(void *graalEventsList) const noexcept { - runIsolatedOrElse( - [handle = static_cast(handle_.get()), graalEventsList](auto threadHandle) { - return dxfg_DXPublisher_publishEvents(static_cast(threadHandle), handle, - static_cast(graalEventsList)) == 0; - }, - false); + std::lock_guard guard{mutex_}; + + isolated::api::IsolatedDXPublisher::publishEvents(handle_, graalEventsList); } DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/src/api/DXPublisherObservableSubscription.cpp b/src/api/DXPublisherObservableSubscription.cpp new file mode 100644 index 00000000..77b2b972 --- /dev/null +++ b/src/api/DXPublisherObservableSubscription.cpp @@ -0,0 +1,71 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include + +DXFCPP_BEGIN_NAMESPACE + +DXPublisherObservableSubscription::DXPublisherObservableSubscription( + LockExternalConstructionTag, JavaObjectHandle &&handle) + : handle_{std::move(handle)} { +} + +DXPublisherObservableSubscription::~DXPublisherObservableSubscription() = default; + +std::shared_ptr +DXPublisherObservableSubscription::create(JavaObjectHandle &&handle) { + auto sub = DXPublisherObservableSubscription::createShared(std::move(handle)); + + ApiContext::getInstance()->getManager>()->registerEntity(sub); + + return sub; +} + +bool DXPublisherObservableSubscription::isClosed() { + return isolated::api::IsolatedDXPublisherObservableSubscription::isClosed(handle_); +} + +std::unordered_set DXPublisherObservableSubscription::getEventTypes() { + return isolated::api::IsolatedDXPublisherObservableSubscription::getEventTypes(handle_); +} + +bool DXPublisherObservableSubscription::containsEventType(const EventTypeEnum &eventType) { + return isolated::api::IsolatedDXPublisherObservableSubscription::containsEventType(handle_, eventType); +} + +std::size_t +DXPublisherObservableSubscription::addChangeListener(std::shared_ptr listener) { + isolated::api::IsolatedDXPublisherObservableSubscription::addChangeListener(handle_, listener->getHandle()); + + std::lock_guard guard{listenersMutex_}; + + if (lastListenerId_ >= FAKE_LISTENER_ID - 1) { + return FAKE_LISTENER_ID; + } + + auto id = ++lastListenerId_; + + listeners_.emplace(id, listener); + + return id; +} + +void DXPublisherObservableSubscription::removeChangeListener(std::size_t id) { + std::lock_guard guard{listenersMutex_}; + + if (id == FAKE_LISTENER_ID) { + return; + } + + if (auto found = listeners_.find(id); found != listeners_.end()) { + auto listener = found->second; + + isolated::api::IsolatedDXPublisherObservableSubscription::removeChangeListener(handle_, listener->getHandle()); + + listeners_.erase(found); + } +} + +DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/src/api/osub/ObservableSubscriptionChangeListener.cpp b/src/api/osub/ObservableSubscriptionChangeListener.cpp new file mode 100644 index 00000000..d0b3b87e --- /dev/null +++ b/src/api/osub/ObservableSubscriptionChangeListener.cpp @@ -0,0 +1,109 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include + +DXFCPP_BEGIN_NAMESPACE + +struct ObservableSubscriptionChangeListener::Impl { + static void onSymbolsAdded(graal_isolatethread_t * /* thread */, dxfg_symbol_list *symbols, void *userData) { + if (!symbols) { + return; + } + + auto id = Id::from(userData); + auto listener = + ApiContext::getInstance()->getManager>()->getEntity(id); + + if (!listener) { + return; + } + + auto symbolList = SymbolWrapper::SymbolListUtils::fromGraalList(symbols); + listener->onSymbolsAdded_({symbolList.begin(), symbolList.end()}); + } + + static void onSymbolsRemoved(graal_isolatethread_t * /* thread */, dxfg_symbol_list *symbols, void *userData) { + if (!symbols) { + return; + } + + auto id = Id::from(userData); + auto listener = + ApiContext::getInstance()->getManager>()->getEntity(id); + + if (!listener) { + return; + } + + auto symbolList = SymbolWrapper::SymbolListUtils::fromGraalList(symbols); + listener->onSymbolsRemoved_({symbolList.begin(), symbolList.end()}); + } + + static void onSubscriptionClosed(graal_isolatethread_t * /* thread */, void *userData) { + auto id = Id::from(userData); + auto listener = + ApiContext::getInstance()->getManager>()->getEntity(id); + + if (!listener) { + return; + } + + listener->onSubscriptionClosed_(); + } +}; + +ObservableSubscriptionChangeListener::ObservableSubscriptionChangeListener(LockExternalConstructionTag) + : handle_{}, impl_{std::make_unique()} { +} + +ObservableSubscriptionChangeListener::~ObservableSubscriptionChangeListener() noexcept { +} + +std::shared_ptr ObservableSubscriptionChangeListener::create( + std::function &symbols)> onSymbolsAdded) { + auto listener = ObservableSubscriptionChangeListener::createShared(); + auto id = + ApiContext::getInstance()->getManager>()->registerEntity( + listener); + + listener->handle_ = isolated::api::IsolatedObservableSubscriptionChangeListener::create( + dxfcpp::bit_cast(&ObservableSubscriptionChangeListener::Impl::onSymbolsAdded), + dxfcpp::bit_cast(&ObservableSubscriptionChangeListener::Impl::onSymbolsRemoved), + dxfcpp::bit_cast(&ObservableSubscriptionChangeListener::Impl::onSubscriptionClosed), + dxfcpp::bit_cast(id.getValue())); + listener->onSymbolsAdded_ += std::move(onSymbolsAdded); + + return listener; +} + +std::shared_ptr ObservableSubscriptionChangeListener::create( + std::function &symbols)> onSymbolsAdded, + std::function &symbols)> onSymbolsRemoved, + std::function onSubscriptionClosed) { + auto listener = ObservableSubscriptionChangeListener::createShared(); + auto id = + ApiContext::getInstance()->getManager>()->registerEntity( + listener); + + listener->handle_ = isolated::api::IsolatedObservableSubscriptionChangeListener::create( + dxfcpp::bit_cast(&ObservableSubscriptionChangeListener::Impl::onSymbolsAdded), + dxfcpp::bit_cast(&ObservableSubscriptionChangeListener::Impl::onSymbolsRemoved), + dxfcpp::bit_cast(&ObservableSubscriptionChangeListener::Impl::onSubscriptionClosed), + dxfcpp::bit_cast(id.getValue())); + listener->onSymbolsAdded_ += std::move(onSymbolsAdded); + listener->onSymbolsRemoved_ += std::move(onSymbolsRemoved); + listener->onSubscriptionClosed_ += std::move(onSubscriptionClosed); + + return listener; +} + +const JavaObjectHandle &ObservableSubscriptionChangeListener::getHandle() const { + std::lock_guard guard{mutex_}; + + return handle_; +} + +DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/src/internal/JavaObjectHandle.cpp b/src/internal/JavaObjectHandle.cpp index 1ce5840a..21893ddc 100644 --- a/src/internal/JavaObjectHandle.cpp +++ b/src/internal/JavaObjectHandle.cpp @@ -56,4 +56,7 @@ template struct JavaObjectHandle; template struct JavaObjectHandle; +template struct JavaObjectHandle; +template struct JavaObjectHandle; + DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/src/isolated/api/IsolatedDXPublisher.cpp b/src/isolated/api/IsolatedDXPublisher.cpp new file mode 100644 index 00000000..76139e83 --- /dev/null +++ b/src/isolated/api/IsolatedDXPublisher.cpp @@ -0,0 +1,46 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include +#include +#include + +DXFCPP_BEGIN_NAMESPACE + +namespace isolated::api::IsolatedDXPublisher { + +void /* int32_t */ publishEvents(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_type_list * */ void *events) { + if (!publisher) { + throw std::invalid_argument( + "Unable to execute function `dxfg_DXPublisher_publishEvents`. The `publisher` handle is invalid"); + } + + if (!events) { + throw std::invalid_argument( + "Unable to execute function `dxfg_DXPublisher_publishEvents`. The `events` is nullptr"); + } + + runGraalFunctionAndThrowIfLessThanZero(dxfg_DXPublisher_publishEvents, + static_cast(publisher.get()), + static_cast(events)); +} + +JavaObjectHandle /* dxfg_observable_subscription_t * */ +getSubscription(/* dxfg_publisher_t * */ const JavaObjectHandle &publisher, + /* dxfg_event_clazz_t */ const EventTypeEnum &eventType) { + if (!publisher) { + throw std::invalid_argument( + "Unable to execute function `dxfg_DXPublisher_getSubscription`. The `publisher` handle is invalid"); + } + + return JavaObjectHandle(runGraalFunctionAndThrowIfNullptr( + dxfg_DXPublisher_getSubscription, static_cast(publisher.get()), + static_cast(eventType.getId()))); +} + +} // namespace isolated::api::IsolatedDXPublisher + +DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/src/isolated/api/IsolatedDXPublisherObservableSubscription.cpp b/src/isolated/api/IsolatedDXPublisherObservableSubscription.cpp new file mode 100644 index 00000000..6a3cc3b4 --- /dev/null +++ b/src/isolated/api/IsolatedDXPublisherObservableSubscription.cpp @@ -0,0 +1,105 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include +#include + +DXFCPP_BEGIN_NAMESPACE + +namespace isolated::api::IsolatedDXPublisherObservableSubscription { + +bool /* int32_t */ +isClosed(/* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub) { + if (!sub) { + throw std::invalid_argument( + "Unable to execute function `dxfg_ObservableSubscription_isClosed`. The `sub` handle is invalid"); + } + + return runGraalFunctionAndThrowIfLessThanZero(dxfg_ObservableSubscription_isClosed, + static_cast(sub.get())) == 1; +} + +std::unordered_set /* dxfg_event_clazz_list_t* */ getEventTypes( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub) { + if (!sub) { + throw std::invalid_argument( + "Unable to execute function `dxfg_ObservableSubscription_getEventTypes`. The `sub` handle is invalid"); + } + + std::unordered_set result{}; + + dxfg_event_clazz_list_t *eventTypesList = runGraalFunctionAndThrowIfNullptr( + dxfg_ObservableSubscription_getEventTypes, static_cast(sub.get())); + + if (eventTypesList->size > 0) { + for (auto i = 0; i < eventTypesList->size; i++) { + dxfg_event_clazz_t ec = *eventTypesList->elements[static_cast(i)]; + + if (auto etIt = EventTypeEnum::ALL_BY_ID.find(static_cast(ec)); + etIt != EventTypeEnum::ALL_BY_ID.end()) { + result.emplace(etIt->second.get()); + } + } + } + + runGraalFunctionAndThrowIfLessThanZero(dxfg_CList_EventClazz_release, eventTypesList); + + return result; +} + +bool /* int32_t */ containsEventType( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub, + /* dxfg_event_clazz_t */ const EventTypeEnum &eventType) { + if (!sub) { + throw std::invalid_argument( + "Unable to execute function `dxfg_ObservableSubscription_containsEventType`. The `sub` handle is invalid"); + } + + return runGraalFunctionAndThrowIfLessThanZero(dxfg_ObservableSubscription_containsEventType, + static_cast(sub.get()), + static_cast(eventType.getId())) == 1; +} + +void /* int32_t */ addChangeListener( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub, + /* dxfg_observable_subscription_change_listener_t * */ const JavaObjectHandle + &listener) { + if (!sub) { + throw std::invalid_argument( + "Unable to execute function `dxfg_ObservableSubscription_addChangeListener`. The `sub` handle is invalid"); + } + + if (!listener) { + throw std::invalid_argument("Unable to execute function `dxfg_ObservableSubscription_addChangeListener`. The " + "`listener` handle is invalid"); + } + + runGraalFunctionAndThrowIfLessThanZero( + dxfg_ObservableSubscription_addChangeListener, static_cast(sub.get()), + static_cast(listener.get())); +} + +void /* int32_t */ removeChangeListener( + /* dxfg_observable_subscription_t * */ const JavaObjectHandle &sub, + /* dxfg_observable_subscription_change_listener_t * */ const JavaObjectHandle + &listener) { + if (!sub) { + throw std::invalid_argument("Unable to execute function `dxfg_ObservableSubscription_removeChangeListener`. " + "The `sub` handle is invalid"); + } + + if (!listener) { + throw std::invalid_argument( + "Unable to execute function `dxfg_ObservableSubscription_removeChangeListener`. The " + "`listener` handle is invalid"); + } + + runGraalFunctionAndThrowIfLessThanZero( + dxfg_ObservableSubscription_removeChangeListener, static_cast(sub.get()), + static_cast(listener.get())); +} +} // namespace isolated::api::IsolatedDXPublisherObservableSubscription + +DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/src/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.cpp b/src/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.cpp new file mode 100644 index 00000000..45b2f735 --- /dev/null +++ b/src/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.cpp @@ -0,0 +1,44 @@ +// Copyright (c) 2024 Devexperts LLC. +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include +#include + +DXFCPP_BEGIN_NAMESPACE + +namespace isolated::api::IsolatedObservableSubscriptionChangeListener { + +JavaObjectHandle +create(/* dxfg_ObservableSubscriptionChangeListener_function_symbolsAdded */ void *functionSymbolsAdded, + /* dxfg_ObservableSubscriptionChangeListener_function_symbolsRemoved */ void *functionSymbolsRemoved, + /* dxfg_ObservableSubscriptionChangeListener_function_subscriptionClosed */ void *functionSubscriptionClosed, + void *userData) { + if (!functionSymbolsAdded) { + throw std::invalid_argument( + "Unable to create ObservableSubscriptionChangeListener. The `functionSymbolsAdded` parameter is nullptr"); + } + + if (!functionSymbolsRemoved) { + throw std::invalid_argument( + "Unable to create ObservableSubscriptionChangeListener. The `functionSymbolsRemoved` parameter is nullptr"); + } + + if (!functionSubscriptionClosed) { + throw std::invalid_argument("Unable to create ObservableSubscriptionChangeListener. The " + "`functionSubscriptionClosed` parameter is nullptr"); + } + + return JavaObjectHandle(runGraalFunctionAndThrowIfNullptr( + dxfg_ObservableSubscriptionChangeListener_new, + dxfcpp::bit_cast(functionSymbolsAdded), + dxfcpp::bit_cast(functionSymbolsRemoved), + dxfcpp::bit_cast( + functionSubscriptionClosed), + userData)); +} + +} // namespace isolated::api::IsolatedObservableSubscriptionChangeListener + +DXFCPP_END_NAMESPACE \ No newline at end of file diff --git a/tests/api/DataIntegrityTest.cpp b/tests/api/DataIntegrityTest.cpp index 1aa98008..a547e0ac 100644 --- a/tests/api/DataIntegrityTest.cpp +++ b/tests/api/DataIntegrityTest.cpp @@ -34,6 +34,8 @@ struct DataIntegrityTestFixture { endpoint = DXEndpoint::newBuilder() ->withRole(DXEndpoint::Role::LOCAL_HUB) ->withProperty(DXEndpoint::DXFEED_WILDCARD_ENABLE_PROPERTY, "true") + ->withProperty(DXEndpoint::DXENDPOINT_EVENT_TIME_PROPERTY, "true") + ->withProperty(DXEndpoint::DXSCHEME_NANO_TIME_PROPERTY, "true") ->build(); feed = endpoint->getFeed(); pub = endpoint->getPublisher(); @@ -52,14 +54,10 @@ struct DataIntegrityLocalAddressTestFixture { DXFeed::Ptr feed{}; DataIntegrityLocalAddressTestFixture() { - pubEndpoint = DXEndpoint::newBuilder() - ->withRole(DXEndpoint::Role::PUBLISHER) - ->build(); + pubEndpoint = DXEndpoint::newBuilder()->withRole(DXEndpoint::Role::PUBLISHER)->build(); pubEndpoint->connect(":7766"); pub = pubEndpoint->getPublisher(); - feedEndpoint = DXEndpoint::newBuilder() - ->withRole(DXEndpoint::Role::FEED) - ->build(); + feedEndpoint = DXEndpoint::newBuilder()->withRole(DXEndpoint::Role::FEED)->build(); feedEndpoint->connect("127.0.0.1:7766"); feed = feedEndpoint->getFeed(); } @@ -89,13 +87,12 @@ struct DataIntegrityRemoteTestFixture { } }; -TEST_CASE_FIXTURE(DataIntegrityTestFixture, "Test Message" * doctest::should_fail() ) { +TEST_CASE_FIXTURE(DataIntegrityTestFixture, "Test Message" * doctest::should_fail()) { auto sub = feed->createSubscription(Message::TYPE); - auto message = std::make_shared("TEST", "Attachment2"); - sub->addEventListener([message = message](const std::vector> & messages) { - for (auto&& m : messages) { + sub->addEventListener([message = message](const std::vector> &messages) { + for (auto &&m : messages) { fmt::println("{}", m->toString()); REQUIRE(message->getAttachment() == m->getAttachment()); @@ -103,8 +100,78 @@ TEST_CASE_FIXTURE(DataIntegrityTestFixture, "Test Message" * doctest::should_fai }); sub->addSymbols("TEST"); - pub->publishEvents(message); std::this_thread::sleep_for(2s); } + +TEST_CASE_FIXTURE(DataIntegrityTestFixture, "dxFeed :: Test attach & detach sub") { + std::mutex ioMutex{}; + + auto println = [&ioMutex](auto s) { + std::lock_guard lock{ioMutex}; + std::cout << s << std::endl; + }; + + auto tnsSubAAA = DXFeedSubscription::create({TimeAndSale::TYPE}); + auto tnsSubBBB = DXFeedSubscription::create({TimeAndSale::TYPE}); + + tnsSubAAA->addSymbols("AAA"); + tnsSubBBB->addSymbols("BBB"); + + tnsSubAAA->addEventListener([&println](const auto &timeAndSales) { + for (auto &&tns : timeAndSales) { + println(fmt::format("tnsSubAAA: {}", tns->toString())); + } + }); + + tnsSubBBB->addEventListener([&println](const auto &timeAndSales) { + for (auto &&tns : timeAndSales) { + println(fmt::format("tnsSubBBB: {}", tns->toString())); + } + }); + + std::atomic publishAAA{true}; + std::atomic publishBBB{true}; + std::atomic stop{false}; + + auto t = std::thread([pub = pub, &publishAAA, &publishBBB, &stop]() { + auto tnsAAA = std::make_shared("AAA"); + auto tnsBBB = std::make_shared("BBB"); + + while (!stop) { + if (publishAAA) { + tnsAAA->setTime(tnsAAA->getTime() + 1000); + pub->publishEvents(tnsAAA); + } + + if (publishBBB) { + tnsBBB->setTime(tnsBBB->getTime() + 1000); + pub->publishEvents(tnsBBB); + } + + std::this_thread::sleep_for(100ms); + } + }); + + std::this_thread::sleep_for(1ms); + + println("Attach tnsSubAAA"); + feed->attachSubscription(tnsSubAAA); + println("Attach tnsSubBBB"); + feed->attachSubscription(tnsSubBBB); + + std::this_thread::sleep_for(5s); + + println("Detach tnsSubAAA"); + feed->detachSubscription(tnsSubAAA); + + std::this_thread::sleep_for(5s); + + println("Detach tnsSubBBB"); + feed->detachSubscription(tnsSubBBB); + + std::this_thread::sleep_for(5s); + stop = true; + t.join(); +}