Skip to content

Commit

Permalink
[EN-7588] Implement PublishProfiles sample
Browse files Browse the repository at this point in the history
IsolatedDXPublisherObservableSubscription
  • Loading branch information
ttldtor committed May 20, 2024
1 parent 0f8091e commit 12956c4
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 26 deletions.
1 change: 1 addition & 0 deletions include/dxfeed_graal_cpp_api/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251 4996)
#include "exceptions/GraalException.hpp"

#include "isolated/api/IsolatedDXEndpoint.hpp"
#include "isolated/api/IsolatedDXPublisherObservableSubscription.hpp"
#include "isolated/api/osub/IsolatedObservableSubscriptionChangeListener.hpp"
#include "isolated/internal/IsolatedString.hpp"
#include "isolated/internal/IsolatedTimeFormat.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,35 @@

DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251)

#include "../internal/EventClassList.hpp"
#include "../internal/context/ApiContext.hpp"

#include "../entity/EntityModule.hpp"
#include "osub/ObservableSubscription.hpp"
#include "../event/EventType.hpp"
#include "../event/EventTypeEnum.hpp"
#include "../internal/Common.hpp"
#include "../internal/Handler.hpp"
#include "../internal/JavaObjectHandle.hpp"
#include "../symbols/SymbolWrapper.hpp"
#include "osub/ObservableSubscription.hpp"

#include <concepts>
#include <memory>
#include <type_traits>
#include <unordered_set>

DXFCPP_BEGIN_NAMESPACE

class DXFCPP_EXPORT DXPublisherObservableSubscription : public ObservableSubscription, public SharedEntity {
struct DXFCPP_EXPORT DXPublisherObservableSubscription : RequireMakeShared<DXPublisherObservableSubscription>,
ObservableSubscription {
static constexpr std::size_t FAKE_LISTENER_ID{static_cast<std::size_t>(-1)};

private:
inline static std::atomic<std::size_t> lastListenerId_{};

JavaObjectHandle<DXPublisherObservableSubscription> handle_;
std::unordered_map<std::size_t, std::shared_ptr<ObservableSubscriptionChangeListener>> listeners_;
std::recursive_mutex listenersMutex_{};

public:
DXPublisherObservableSubscription(LockExternalConstructionTag,
JavaObjectHandle<DXPublisherObservableSubscription> &&handle);
~DXPublisherObservableSubscription() override;

static std::shared_ptr<DXPublisherObservableSubscription>
create(JavaObjectHandle<DXPublisherObservableSubscription> &&handle);

bool isClosed() override;
std::unordered_set<EventTypeEnum> getEventTypes() override;
bool containsEventType(const EventTypeEnum &eventType) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@

DXFCXX_DISABLE_MSC_WARNINGS_PUSH(4251)

#include "../../event/IndexedEventSource.hpp"
#include "../../symbols/SymbolWrapper.hpp"

#include <cstdint>
#include <memory>
#include <unordered_set>
#include <utility>

DXFCPP_BEGIN_NAMESPACE

Expand All @@ -29,7 +26,10 @@ struct DXFCPP_EXPORT ObservableSubscriptionChangeListener : RequireMakeShared<Ob
std::function<void(const std::unordered_set<SymbolWrapper> &symbols)> onSymbolsRemoved,
std::function<void()> onSubscriptionClosed);

const JavaObjectHandle<ObservableSubscriptionChangeListener> &getHandle() const;

private:
mutable std::recursive_mutex mutex_{};
JavaObjectHandle<ObservableSubscriptionChangeListener> handle_;
SimpleHandler<void(const std::unordered_set<SymbolWrapper> &symbols)> onSymbolsAdded_{};
SimpleHandler<void(const std::unordered_set<SymbolWrapper> &symbols)> onSymbolsRemoved_{};
Expand Down
57 changes: 45 additions & 12 deletions src/api/DXPublisherObservableSubscription.cpp
Original file line number Diff line number Diff line change
@@ -1,38 +1,71 @@
// Copyright (c) 2024 Devexperts LLC.
// SPDX-License-Identifier: MPL-2.0

#include <dxfg_api.h>

#include <dxfeed_graal_cpp_api/api.hpp>

#include <memory>
#include <typeinfo>

#include <fmt/chrono.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <fmt/std.h>

DXFCPP_BEGIN_NAMESPACE

DXPublisherObservableSubscription::DXPublisherObservableSubscription(
LockExternalConstructionTag, JavaObjectHandle<DXPublisherObservableSubscription> &&handle)
: handle_{std::move(handle)} {
}

DXPublisherObservableSubscription::~DXPublisherObservableSubscription() = default;

std::shared_ptr<DXPublisherObservableSubscription>
DXPublisherObservableSubscription::create(JavaObjectHandle<DXPublisherObservableSubscription> &&handle) {
auto sub = DXPublisherObservableSubscription::createShared(std::move(handle));

ApiContext::getInstance()->getManager<EntityManager<DXPublisherObservableSubscription>>()->registerEntity(sub);

return sub;
}

bool DXPublisherObservableSubscription::isClosed() {
return false;
return isolated::api::IsolatedDXPublisherObservableSubscription::isClosed(handle_);
}

std::unordered_set<EventTypeEnum> DXPublisherObservableSubscription::getEventTypes() {
return {};
return isolated::api::IsolatedDXPublisherObservableSubscription::getEventTypes(handle_);
}

bool DXPublisherObservableSubscription::containsEventType(const EventTypeEnum &eventType) {
return false;
return isolated::api::IsolatedDXPublisherObservableSubscription::containsEventType(handle_, eventType);
}

std::size_t
DXPublisherObservableSubscription::addChangeListener(std::shared_ptr<ObservableSubscriptionChangeListener> listener) {
return Id<ObservableSubscriptionChangeListener>::UNKNOWN.getValue();
isolated::api::IsolatedDXPublisherObservableSubscription::addChangeListener(handle_, listener->getHandle());

std::lock_guard guard{listenersMutex_};

if (lastListenerId_ >= FAKE_LISTENER_ID - 1) {
return FAKE_LISTENER_ID;
}

auto id = ++lastListenerId_;

listeners_.emplace(id, listener);

return id;
}

void DXPublisherObservableSubscription::removeChangeListener(std::size_t id) {
std::lock_guard guard{listenersMutex_};

if (id == FAKE_LISTENER_ID) {
return;
}

if (auto found = listeners_.find(id); found != listeners_.end()) {
auto listener = found->second;

isolated::api::IsolatedDXPublisherObservableSubscription::removeChangeListener(handle_, listener->getHandle());

listeners_.erase(found);
}
}

DXFCPP_END_NAMESPACE
6 changes: 6 additions & 0 deletions src/api/osub/ObservableSubscriptionChangeListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ std::shared_ptr<ObservableSubscriptionChangeListener> ObservableSubscriptionChan
return listener;
}

const JavaObjectHandle<ObservableSubscriptionChangeListener> &ObservableSubscriptionChangeListener::getHandle() const {
std::lock_guard guard{mutex_};

return handle_;
}

DXFCPP_END_NAMESPACE

0 comments on commit 12956c4

Please sign in to comment.