Skip to content

Commit

Permalink
[EN-7588] Implement PublishProfiles sample
Browse files Browse the repository at this point in the history
IsolatedDXPublisher
  • Loading branch information
ttldtor committed May 21, 2024
1 parent 12956c4 commit 122bd02
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 32 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ set(dxFeedGraalCxxApi_Exceptions_Sources

set(dxFeedGraalCxxApi_Isolated_Sources
src/isolated/api/IsolatedDXEndpoint.cpp
src/isolated/api/IsolatedDXPublisher.cpp
src/isolated/api/IsolatedDXPublisherObservableSubscription.cpp
src/isolated/api/osub/IsolatedObservableSubscriptionChangeListener.cpp
src/isolated/internal/IsolatedString.cpp
Expand Down
1 change: 1 addition & 0 deletions include/dxfeed_graal_cpp_api/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251 4996)
#include "exceptions/GraalException.hpp"

#include "isolated/api/IsolatedDXEndpoint.hpp"
#include "isolated/api/IsolatedDXPublisher.hpp"
#include "isolated/api/IsolatedDXPublisherObservableSubscription.hpp"
#include "isolated/api/osub/IsolatedObservableSubscriptionChangeListener.hpp"
#include "isolated/internal/IsolatedString.hpp"
Expand Down
29 changes: 27 additions & 2 deletions include/dxfeed_graal_cpp_api/api/DXPublisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ struct DXFCPP_EXPORT DXPublisher : SharedEntity {
friend struct DXEndpoint;

private:
mutable std::recursive_mutex mutex_{};
JavaObjectHandle<DXPublisher> handle_;
std::shared_ptr<ObservableSubscription> subscription_{};

static std::shared_ptr<DXPublisher> create(void *handle);
void publishEventsImpl(void *graalEventsList) const noexcept;

protected:
DXPublisher() noexcept : handle_{} {
DXPublisher() noexcept {
if constexpr (Debugger::isDebug) {
Debugger::debug("DXPublisher()");
}
Expand Down Expand Up @@ -217,7 +219,30 @@ struct DXFCPP_EXPORT DXPublisher : SharedEntity {
EventMapper::freeGraalList(list);
}

std::shared_ptr<ObservableSubscription> getSubscription(const EventTypeEnum &);
/**
* Returns observable set of subscribed symbols for the specified event type.
* Note, that subscription is represented by SymbolWrapper symbols. Check the type of each symbol
* in ObservableSubscription using SymbolWrapper::isStringSymbol(), SymbolWrapper::isWildcardSymbol(),
* SymbolWrapper::isIndexedEventSubscriptionSymbol(), SymbolWrapper::isTimeSeriesSubscriptionSymbol(),
* SymbolWrapper::isCandleSymbol().
*
* <p> The set of subscribed symbols contains WildcardSymbol::ALL if and
* only if there is a subscription to this wildcard symbol.
*
* <p> If DXFeedTimeSeriesSubscription is used to subscribe to time-service of the events of this type, then
* instances of TimeSeriesSubscriptionSymbol class represent the corresponding subscription item.
*
* <p> The resulting observable subscription can generate repeated ObservableSubscriptionChangeListener::onSymbolsAdded_ notifications to
* its listeners for the same symbols without the corresponding ObservableSubscriptionChangeListener::onSymbolsRemoved_
* notifications in between them. It happens when subscription disappears, cached data is lost, and subscription
* reappears again. On each ObservableSubscriptionChangeListener::onSymbolsAdded_
* notification data provider shall @ref DXPublisher::publishEvents() "publish" the most recent events for
* the corresponding symbols.
*
* @param eventType The event type.
* @return Observable subscription for the specified event type.
*/
std::shared_ptr<ObservableSubscription> getSubscription(const EventTypeEnum &eventType);

std::string toString() const noexcept override;
};
Expand Down
48 changes: 48 additions & 0 deletions include/dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2024 Devexperts LLC.
// SPDX-License-Identifier: MPL-2.0

#pragma once

#include "../../internal/Conf.hpp"

DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251)

#include "../../api/DXPublisher.hpp"

DXFCPP_BEGIN_NAMESPACE

struct DXPublisherObservableSubscription;

namespace isolated::api::IsolatedDXPublisher {

/**
* Calls the Graal SDK function `dxfg_DXPublisher_publishEvents` in isolation.
*
* @param publisher The publisher's handle.
* @param events Events to be published.
* @throws std::invalid_argument if publisher handle is invalid or events' pointer is nullptr.
* @throws JavaException if something happened with the dxFeed API backend.
* @throws GraalException if something happened with the GraalVM.
*/
void /* int32_t */ publishEvents(/* dxfg_publisher_t * */ const JavaObjectHandle<dxfcpp::DXPublisher> &publisher,
/* dxfg_event_type_list * */ void *events);

/**
* Calls the Graal SDK function `dxfg_DXPublisher_getSubscription` in isolation.
*
* @param publisher The publisher's handle.
* @param eventType The event type.
* @throws std::invalid_argument if publisher handle is invalid.
* @throws JavaException if something happened with the dxFeed API backend.
* @throws GraalException if something happened with the GraalVM.
* @return The underlying observable subscription's handle.
*/
JavaObjectHandle<DXPublisherObservableSubscription> /* dxfg_observable_subscription_t * */
getSubscription(/* dxfg_publisher_t * */ const JavaObjectHandle<dxfcpp::DXPublisher> &publisher,
/* dxfg_event_clazz_t */ const EventTypeEnum &eventType);

} // namespace isolated::api::IsolatedDXPublisher

DXFCPP_END_NAMESPACE

DXFCXX_DISABLE_MSC_WARNINGS_POP()
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

#include "../../internal/Conf.hpp"

#include <unordered_set>

DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251)

#include <unordered_set>

#include "../../api/DXPublisherObservableSubscription.hpp"

DXFCPP_BEGIN_NAMESPACE
Expand Down
44 changes: 22 additions & 22 deletions samples/cpp/PublishProfiles/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,28 @@ int main(int argc, char *argv[]) {
auto publisher = endpoint->getPublisher();

// Observe Profile subscriptions and publish profiles for "xxx:TEST" symbols
// publisher->getSubscription(Profile::TYPE)->addChangeListener(ObservableSubscriptionChangeListener::create(
// [publisher](auto&& symbols) /* symbolsAdded */ {
// std::vector<std::shared_ptr<Profile>> events{};
// events.reserve(symbols.sise());
//
// for (auto&& symbol : symbols) {
// if (symbol.isStringSymbol()) {
// auto s = symbol.asStringSymbol();
//
// if (s.size() > 5 && s.rfind(":TEST") == s.size() - 5) {
// auto profile = std::make_shared<Profile>(s);
//
// profile->setDescription("Test symbol");
// events.push_back(profile);
// }
// }
// }
//
// events.shrink_to_fit();
// publisher->publishEvents(events);
// }
// ));
publisher->getSubscription(Profile::TYPE)
->addChangeListener(
ObservableSubscriptionChangeListener::create([publisher](auto &&symbols) /* symbolsAdded */ {
std::vector<std::shared_ptr<Profile>> events{};
events.reserve(symbols.size());

for (auto &&symbol : symbols) {
if (symbol.isStringSymbol()) {
auto s = symbol.asStringSymbol();

if (s.size() > 5 && s.rfind(":TEST") == s.size() - 5) {
auto profile = std::make_shared<Profile>(s);

profile->setDescription("Test symbol");
events.push_back(profile);
}
}
}

events.shrink_to_fit();
publisher->publishEvents(events);
}));

std::cin.get();
} catch (const JavaException &e) {
Expand Down
21 changes: 15 additions & 6 deletions src/api/DXPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,26 @@ std::shared_ptr<DXPublisher> DXPublisher::create(void *handle) {
return publisher;
}

std::shared_ptr<ObservableSubscription> DXPublisher::getSubscription(const EventTypeEnum &eventType) {
std::lock_guard guard{mutex_};

if (subscription_) {
return subscription_;
}

subscription_ = DXPublisherObservableSubscription::create(isolated::api::IsolatedDXPublisher::getSubscription(handle_, eventType));

return subscription_;
}

std::string DXPublisher::toString() const noexcept {
return fmt::format("DXPublisher{{{}}}", handle_.toString());
}

void DXPublisher::publishEventsImpl(void *graalEventsList) const noexcept {
runIsolatedOrElse(
[handle = static_cast<dxfg_publisher_t *>(handle_.get()), graalEventsList](auto threadHandle) {
return dxfg_DXPublisher_publishEvents(static_cast<graal_isolatethread_t *>(threadHandle), handle,
static_cast<dxfg_event_type_list *>(graalEventsList)) == 0;
},
false);
std::lock_guard guard{mutex_};

isolated::api::IsolatedDXPublisher::publishEvents(handle_, graalEventsList);
}

DXFCPP_END_NAMESPACE
46 changes: 46 additions & 0 deletions src/isolated/api/IsolatedDXPublisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2024 Devexperts LLC.
// SPDX-License-Identifier: MPL-2.0

#include <dxfg_api.h>

#include <dxfeed_graal_cpp_api/api/DXPublisherObservableSubscription.hpp>
#include <dxfeed_graal_cpp_api/isolated/IsolatedCommon.hpp>
#include <dxfeed_graal_cpp_api/isolated/api/IsolatedDXPublisher.hpp>

DXFCPP_BEGIN_NAMESPACE

namespace isolated::api::IsolatedDXPublisher {

void /* int32_t */ publishEvents(/* dxfg_publisher_t * */ const JavaObjectHandle<dxfcpp::DXPublisher> &publisher,
/* dxfg_event_type_list * */ void *events) {
if (!publisher) {
throw std::invalid_argument(
"Unable to execute function `dxfg_DXPublisher_publishEvents`. The `publisher` handle is invalid");
}

if (!events) {
throw std::invalid_argument(
"Unable to execute function `dxfg_DXPublisher_publishEvents`. The `events` is nullptr");
}

runGraalFunctionAndThrowIfLessThanZero(dxfg_DXPublisher_publishEvents,
static_cast<dxfg_publisher_t *>(publisher.get()),
static_cast<dxfg_event_type_list *>(events));
}

JavaObjectHandle<DXPublisherObservableSubscription> /* dxfg_observable_subscription_t * */
getSubscription(/* dxfg_publisher_t * */ const JavaObjectHandle<dxfcpp::DXPublisher> &publisher,
/* dxfg_event_clazz_t */ const EventTypeEnum &eventType) {
if (!publisher) {
throw std::invalid_argument(
"Unable to execute function `dxfg_DXPublisher_getSubscription`. The `publisher` handle is invalid");
}

return JavaObjectHandle<DXPublisherObservableSubscription>(runGraalFunctionAndThrowIfNullptr(
dxfg_DXPublisher_getSubscription, static_cast<dxfg_publisher_t *>(publisher.get()),
static_cast<dxfg_event_clazz_t>(eventType.getId())));
}

} // namespace isolated::api::IsolatedDXPublisher

DXFCPP_END_NAMESPACE

0 comments on commit 122bd02

Please sign in to comment.