Skip to content

Commit

Permalink
[DXFC-401] Create entity managers and API context.
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyKalin committed May 18, 2023
1 parent 29420cc commit c4bddd9
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 53 deletions.
11 changes: 3 additions & 8 deletions include/dxfeed_graal_cpp_api/api/DXEndpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,6 @@ struct DXEndpoint : std::enable_shared_from_this<DXEndpoint> {
static inline std::mutex MTX{};
static std::unordered_map<Role, std::shared_ptr<DXEndpoint>> INSTANCES;

mutable std::recursive_mutex mtx_{};
handler_utils::JavaObjectHandler<DXEndpoint> handler_;
Role role_ = Role::FEED;
std::string name_{};
Expand All @@ -480,7 +479,7 @@ struct DXEndpoint : std::enable_shared_from_this<DXEndpoint> {
void closeImpl();

protected:
DXEndpoint() : mtx_{}, handler_{}, role_{}, feed_{}, publisher_{}, stateChangeListenerHandler_{}, onStateChange_{} {
DXEndpoint() : handler_{}, role_{}, feed_{}, publisher_{}, stateChangeListenerHandler_{}, onStateChange_{} {
if constexpr (isDebug) {
debug("DXEndpoint()");
}
Expand Down Expand Up @@ -751,8 +750,6 @@ struct DXEndpoint : std::enable_shared_from_this<DXEndpoint> {
debug("DXEndpoint{{{}}}::close()", handler_.toString());
}

std::lock_guard guard(mtx_);

closeImpl();
}

Expand Down Expand Up @@ -808,12 +805,12 @@ struct DXEndpoint : std::enable_shared_from_this<DXEndpoint> {
class Builder : public std::enable_shared_from_this<Builder> {
friend DXEndpoint;

mutable std::recursive_mutex mtx_{};
// mutable std::recursive_mutex mtx_{};
handler_utils::JavaObjectHandler<Builder> handler_;
Role role_ = Role::FEED;
std::unordered_map<std::string, std::string> properties_;

Builder() : mtx_{}, handler_{}, properties_{} {
Builder() : handler_{}, properties_{} {
if constexpr (isDebug) {
debug("DXEndpoint::Builder::Builder()");
}
Expand Down Expand Up @@ -897,8 +894,6 @@ struct DXEndpoint : std::enable_shared_from_this<DXEndpoint> {
properties.size());
}

std::lock_guard guard(mtx_);

for (auto &&[k, v] : properties) {
withProperty(k, v);
}
Expand Down
5 changes: 1 addition & 4 deletions include/dxfeed_graal_cpp_api/api/DXFeed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ struct DXFeed : std::enable_shared_from_this<DXFeed> {
friend struct DXEndpoint;

private:
mutable std::recursive_mutex mtx_{};
handler_utils::JavaObjectHandler<DXFeed> handler_;

std::unordered_set<std::shared_ptr<DXFeedSubscription>> subscriptions_{};

static std::shared_ptr<DXFeed> create(void *feedHandle) noexcept;

protected:
DXFeed() noexcept : mtx_(), handler_{} {
DXFeed() noexcept : handler_{} {
if constexpr (isDebug) {
debug("DXFeed()");
}
Expand Down Expand Up @@ -98,8 +97,6 @@ struct DXFeed : std::enable_shared_from_this<DXFeed> {
}

std::string toString() const {
std::lock_guard guard(mtx_);

return fmt::format("DXFeed{{{}}}", handler_.toString());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ template <typename EntityType> struct EntityManager : private NonCopyable<Entity
std::lock_guard lockGuard{mutex_};

if (auto it = entitiesById_.find(id); it != entitiesById_.end()) {
entitiesById_.erase(id);
idsByEntities_.erase(it->second);
entitiesById_.erase(id);

return true;
}
Expand Down
1 change: 0 additions & 1 deletion samples/cpp/PrintQuoteEvents/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ int main() {

auto sub =
endpoint->getFeed()->createSubscription({dxfcpp::EventTypeEnum::QUOTE});
// endpoint->getFeed()->createSubscription({dxfcpp::EventTypeEnum::QUOTE, dxfcpp::EventTypeEnum::CANDLE});

sub->addEventListener([](auto &&events) {
for (const auto &e : events) {
Expand Down
38 changes: 5 additions & 33 deletions src/api/DXEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,14 @@ std::shared_ptr<DXEndpoint> DXEndpoint::create(void *endpointHandle, DXEndpoint:
auto id = Id<DXEndpoint>::from(bit_cast<Id<DXEndpoint>::ValueType>(userData));
auto endpoint = ApiContext::getInstance()->getDxEndpointManager()->getEntity(id);

std::cerr << "onStateChange: id = " + std::to_string(id.getValue()) + ", endpoint = " + ((endpoint) ? endpoint->toString() : "nullptr") + "\n";

if (endpoint) {
endpoint->onStateChange_(graalStateToState(oldState), graalStateToState(newState));
}

if (newState == DXFG_ENDPOINT_STATE_CLOSED) {
ApiContext::getInstance()->getDxEndpointManager()->unregisterEntity(id);
if (newState == DXFG_ENDPOINT_STATE_CLOSED) {
ApiContext::getInstance()->getDxEndpointManager()->unregisterEntity(id);
}
}
};

Expand Down Expand Up @@ -126,8 +128,6 @@ void DXEndpoint::setStateChangeListenerImpl() {
}

DXEndpoint::State DXEndpoint::getState() const {
std::lock_guard guard(mtx_);

return !handler_ ? State::CLOSED
: runIsolatedOrElse(
[handler = bit_cast<dxfg_endpoint_t *>(handler_.get())](auto threadHandle) {
Expand All @@ -154,8 +154,6 @@ std::shared_ptr<DXEndpoint> DXEndpoint::user(const std::string &user) {
debug("DXEndpoint{{{}}}::user(user = {})", handler_.toString(), user);
}

std::lock_guard guard(mtx_);

if (handler_) {
runIsolatedOrElse(
[user = user, handler = bit_cast<dxfg_endpoint_t *>(handler_.get())](auto threadHandle) {
Expand All @@ -173,8 +171,6 @@ std::shared_ptr<DXEndpoint> DXEndpoint::password(const std::string &password) {
debug("DXEndpoint{{{}}}::password(password = {})", handler_.toString(), password);
}

std::lock_guard guard(mtx_);

if (handler_) {
runIsolatedOrElse(
[password = password, handler = bit_cast<dxfg_endpoint_t *>(handler_.get())](auto threadHandle) {
Expand All @@ -192,8 +188,6 @@ std::shared_ptr<DXEndpoint> DXEndpoint::connect(const std::string &address) {
debug("DXEndpoint{{{}}}::connect(address = {})", handler_.toString(), address);
}

std::lock_guard guard(mtx_);

if (handler_) {
runIsolatedOrElse(
[address = address, handler = bit_cast<dxfg_endpoint_t *>(handler_.get())](auto threadHandle) {
Expand All @@ -210,8 +204,6 @@ void DXEndpoint::reconnect() {
debug("DXEndpoint{{{}}}::reconnect()", handler_.toString());
}

std::lock_guard guard(mtx_);

if (!handler_) {
return;
}
Expand All @@ -226,8 +218,6 @@ void DXEndpoint::disconnect() {
debug("DXEndpoint{{{}}}::disconnect()", handler_.toString());
}

std::lock_guard guard(mtx_);

if (!handler_) {
return;
}
Expand All @@ -242,8 +232,6 @@ void DXEndpoint::disconnectAndClear() {
debug("DXEndpoint{{{}}}::disconnectAndClear()", handler_.toString());
}

std::lock_guard guard(mtx_);

if (!handler_) {
return;
}
Expand All @@ -258,8 +246,6 @@ void DXEndpoint::awaitNotConnected() {
debug("DXEndpoint{{{}}}::awaitNotConnected()", handler_.toString());
}

std::lock_guard guard(mtx_);

if (!handler_) {
return;
}
Expand All @@ -274,8 +260,6 @@ void DXEndpoint::awaitProcessed() {
debug("DXEndpoint{{{}}}::awaitProcessed()", handler_.toString());
}

std::lock_guard guard(mtx_);

if (!handler_) {
return;
}
Expand All @@ -290,8 +274,6 @@ void DXEndpoint::closeAndAwaitTermination() {
debug("DXEndpoint{{{}}}::closeAndAwaitTermination()", handler_.toString());
}

std::lock_guard guard(mtx_);

if (!handler_) {
return;
}
Expand All @@ -306,8 +288,6 @@ void DXEndpoint::closeAndAwaitTermination() {
}

std::shared_ptr<DXFeed> DXEndpoint::getFeed() {
std::lock_guard guard(mtx_);

if (!feed_) {
auto feedHandle = !handler_ ? nullptr
: runIsolatedOrElse(
Expand Down Expand Up @@ -388,8 +368,6 @@ std::shared_ptr<DXEndpoint::Builder> DXEndpoint::Builder::withRole(DXEndpoint::R
debug("DXEndpoint::Builder{{{}}}::withRole(role = {})", handler_.toString(), roleToString(role));
}

std::lock_guard guard(mtx_);

role_ = role;

if (handler_) {
Expand All @@ -410,8 +388,6 @@ std::shared_ptr<DXEndpoint::Builder> DXEndpoint::Builder::withProperty(const std
debug("DXEndpoint::Builder{{{}}}::withProperty(key = {}, value = {})", handler_.toString(), key, value);
}

std::lock_guard guard(mtx_);

properties_[key] = value;

if (handler_) {
Expand All @@ -432,8 +408,6 @@ bool DXEndpoint::Builder::supportsProperty(const std::string &key) {
debug("DXEndpoint::Builder{{{}}}::supportsProperty(key = {})", handler_.toString(), key);
}

std::lock_guard guard(mtx_);

if (!handler_) {
return false;
}
Expand All @@ -450,8 +424,6 @@ std::shared_ptr<DXEndpoint> DXEndpoint::Builder::build() {
debug("DXEndpoint::Builder{{{}}}::build()", handler_.toString());
}

std::lock_guard guard(mtx_);

loadDefaultPropertiesImpl();

auto endpointHandle =
Expand Down
6 changes: 0 additions & 6 deletions src/api/DXFeed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ void DXFeed::attachSubscription(std::shared_ptr<DXFeedSubscription> subscription
debug("{}::attachSubscription({})", toString(), subscription->toString());
}

std::lock_guard guard(mtx_);

if (!handler_ || !subscription || !subscription->handler_) {
return;
}
Expand All @@ -46,8 +44,6 @@ void DXFeed::detachSubscription(std::shared_ptr<DXFeedSubscription> subscription
debug("{}::detachSubscription({})", toString(), subscription->toString());
}

std::lock_guard guard(mtx_);

if (!handler_ || !subscription || !subscription->handler_) {
return;
}
Expand All @@ -68,8 +64,6 @@ void DXFeed::detachSubscriptionAndClear(std::shared_ptr<DXFeedSubscription> subs
debug("{}::detachSubscriptionAndClear({})", toString(), subscription->toString());
}

std::lock_guard guard(mtx_);

if (!handler_ || !subscription || !subscription->handler_) {
return;
}
Expand Down

0 comments on commit c4bddd9

Please sign in to comment.