Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] RFC: corba: allow selecting dispatchers per-channel #227

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 114 additions & 6 deletions rtt/transports/corba/CorbaDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,120 @@


#include "CorbaDispatcher.hpp"
#include <boost/lexical_cast.hpp>

namespace RTT {
using namespace corba;
CorbaDispatcher::DispatchMap CorbaDispatcher::DispatchI;
RTT_CORBA_API os::Mutex* CorbaDispatcher::mlock = 0;
using namespace RTT;
using namespace RTT::corba;

int CorbaDispatcher::defaultScheduler = ORO_SCHED_RT;
int CorbaDispatcher::defaultPriority = os::LowestPriority;
CorbaDispatcher::DispatchMap CorbaDispatcher::DispatchI;
RTT_CORBA_API os::Mutex* CorbaDispatcher::mlock = 0;

int CorbaDispatcher::defaultScheduler = ORO_SCHED_RT;
int CorbaDispatcher::defaultPriority = os::LowestPriority;

CorbaDispatcher::DispatchEntry& CorbaDispatcher::Get(std::string const& name, int scheduler, int priority)
{
DispatchMap::iterator result = DispatchI.find(name);
if ( result != DispatchI.end() )
return result->second;

CorbaDispatcher* dispatcher = new CorbaDispatcher( name, scheduler, priority );
dispatcher->start();
return (DispatchI[name] = DispatchEntry(dispatcher));
}

CorbaDispatcher* CorbaDispatcher::Instance(DataFlowInterface* iface, int scheduler, int priority) {
return Instance(defaultDispatcherName(iface), scheduler, priority);
}

std::string CorbaDispatcher::defaultDispatcherName(DataFlowInterface* iface)
{
std::string name;
if ( iface == 0 || iface->getOwner() == 0)
name = "Global";
else
name = iface->getOwner()->getName();
return name + ".CorbaDispatch." + boost::lexical_cast<std::string>(reinterpret_cast<uint64_t>(iface));
}

/**
* Create a new dispatcher and registers it under a certain name
*
* @param name the dispatcher registration name
* @return
*/
CorbaDispatcher* CorbaDispatcher::Instance(std::string const& name, int scheduler, int priority) {
if (!mlock) mlock = new os::Mutex();
os::MutexLock lock(*mlock);

return Get(name, scheduler, priority).dispatcher;
}

CorbaDispatcher* CorbaDispatcher::Acquire(RTT::DataFlowInterface* interface, int scheduler, int priority) {
return Acquire(defaultDispatcherName(interface), scheduler, priority);
}

CorbaDispatcher* CorbaDispatcher::Acquire(std::string const& name, int scheduler, int priority) {
if (!mlock) mlock = new os::Mutex();
os::MutexLock lock(*mlock);

DispatchEntry& entry = Get(name, scheduler, priority);
entry.refcount.inc();
return entry.dispatcher;
}

void CorbaDispatcher::Release(CorbaDispatcher* dispatcher) {
return Release(dispatcher->getName());
}

void CorbaDispatcher::Release(std::string const& name) {
if (!mlock) return;
os::MutexLock lock(*mlock);

DispatchMap::iterator result = DispatchI.find(name);
if ( result != DispatchI.end() ) {
if (result->second.refcount.dec_and_test())
{
delete result->second.dispatcher;
DispatchI.erase(result);
}
}
}

void CorbaDispatcher::hasElement(base::ChannelElementBase::shared_ptr c0, base::ChannelElementBase::shared_ptr c1, bool& result)
{
result = result || (c0 == c1);
}

void CorbaDispatcher::dispatchChannel( base::ChannelElementBase::shared_ptr chan ) {
bool has_element = false;
RClist.apply(boost::bind(&CorbaDispatcher::hasElement, _1, chan, boost::ref(has_element)));
if (!has_element)
RClist.append( chan );
this->trigger();
}

void CorbaDispatcher::cancelChannel( base::ChannelElementBase::shared_ptr chan ) {
RClist.erase( chan );
}

bool CorbaDispatcher::initialize() {
log(Info) <<"Started " << this->getName() << "." <<endlog();
do_exit = false;
return true;
}

void CorbaDispatcher::loop() {
while ( !RClist.empty() && !do_exit) {
base::ChannelElementBase::shared_ptr chan = RClist.front();
CRemoteChannelElement_i* rbase = dynamic_cast<CRemoteChannelElement_i*>(chan.get());
if (rbase)
rbase->transferSamples();
RClist.erase( chan );
}
}

bool CorbaDispatcher::breakLoop() {
do_exit = true;
return true;
}
126 changes: 36 additions & 90 deletions rtt/transports/corba/CorbaDispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,16 @@ namespace RTT {
*/
class CorbaDispatcher : public Activity
{
typedef std::map<DataFlowInterface*,CorbaDispatcher*> DispatchMap;
struct DispatchEntry {
os::AtomicInt refcount;
CorbaDispatcher* dispatcher;

DispatchEntry()
: dispatcher(0) {}
explicit DispatchEntry(CorbaDispatcher* dispatcher)
: dispatcher(dispatcher) {}
};
typedef std::map<std::string, DispatchEntry> DispatchMap;
RTT_CORBA_API static DispatchMap DispatchI;

typedef internal::List<base::ChannelElementBase::shared_ptr> RCList;
Expand Down Expand Up @@ -85,106 +94,43 @@ namespace RTT {
this->stop();
}

/** Internal access and auto-creation of dispatch entries
*
* It is a helper method, and does NOT acquire the locking mutex.
* Callers MUST acquire it before calling
*/
static DispatchEntry& Get(std::string const& name, int scheduler = defaultScheduler, int priority = defaultPriority);

public:
static std::string defaultDispatcherName(DataFlowInterface* iface);

static CorbaDispatcher* Instance(DataFlowInterface* iface, int scheduler = defaultScheduler, int priority = defaultPriority);

/**
* Create a new dispatcher for a given data flow interface.
* This method will only lock and allocate when a new dispatcher must be created,
* otherwise, the access is lock-free and real-time.
* One dispatcher per \a iface is created.
* @param iface The interface to dispatch data flow messages for.
* Create a new dispatcher and registers it under a certain name
*
* @param name the dispatcher registration name
* @return
*/
static CorbaDispatcher* Instance(DataFlowInterface* iface, int scheduler = defaultScheduler, int priority = defaultPriority) {
if (!mlock)
mlock = new os::Mutex();
DispatchMap::iterator result = DispatchI.find(iface);
if ( result == DispatchI.end() ) {
os::MutexLock lock(*mlock);
// re-try to find (avoid race):
result = DispatchI.find(iface);
if ( result != DispatchI.end() )
return result->second;
// *really* not found, let's create it.
std::string name;
if ( iface == 0 || iface->getOwner() == 0)
name = "Global";
else
name = iface->getOwner()->getName();
name += ".CorbaDispatch";
DispatchI[iface] = new CorbaDispatcher( name, scheduler, priority );
DispatchI[iface]->start();
return DispatchI[iface];
}
return result->second;
}
static CorbaDispatcher* Instance(std::string const& name, int scheduler = defaultScheduler, int priority = defaultPriority);

/**
* Releases and cleans up a specific interface from dispatching.
* @param iface
*/
static void Release(DataFlowInterface* iface) {
DispatchMap::iterator result = DispatchI.find(iface);
if ( result != DispatchI.end() ) {
os::MutexLock lock(*mlock);
delete result->second;
DispatchI.erase(result);
}
if ( DispatchI.empty() )
delete mlock;
mlock = 0;
}
static CorbaDispatcher* Acquire(DataFlowInterface* interface, int scheduler = defaultScheduler, int priority = defaultPriority);

/**
* May be called during program termination to clean up all resources.
*/
static void ReleaseAll() {
DispatchMap::iterator result = DispatchI.begin();
while ( result != DispatchI.end() ) {
delete result->second;
DispatchI.erase(result);
result = DispatchI.begin();
}
delete mlock;
mlock = 0;
}
static CorbaDispatcher* Acquire(std::string const& name, int scheduler = defaultScheduler, int priority = defaultPriority);

static void hasElement(base::ChannelElementBase::shared_ptr c0, base::ChannelElementBase::shared_ptr c1, bool& result)
{
result = result || (c0 == c1);
}

void dispatchChannel( base::ChannelElementBase::shared_ptr chan ) {
bool has_element = false;
RClist.apply(boost::bind(&CorbaDispatcher::hasElement, _1, chan, boost::ref(has_element)));
if (!has_element)
RClist.append( chan );
this->trigger();
}
static void Release(std::string const& name);
static void Release(CorbaDispatcher* dispatcher);

void cancelChannel( base::ChannelElementBase::shared_ptr chan ) {
RClist.erase( chan );
}
static void hasElement(base::ChannelElementBase::shared_ptr c0, base::ChannelElementBase::shared_ptr c1, bool& result);

bool initialize() {
log(Info) <<"Started " << this->getName() << "." <<endlog();
do_exit = false;
return true;
}
void dispatchChannel( base::ChannelElementBase::shared_ptr chan );

void loop() {
while ( !RClist.empty() && !do_exit) {
base::ChannelElementBase::shared_ptr chan = RClist.front();
CRemoteChannelElement_i* rbase = dynamic_cast<CRemoteChannelElement_i*>(chan.get());
if (rbase)
rbase->transferSamples();
RClist.erase( chan );
}
}
void cancelChannel( base::ChannelElementBase::shared_ptr chan );

bool breakLoop() {
do_exit = true;
return true;
}
bool initialize();

void loop();
bool breakLoop();
};
}
}
Expand Down
8 changes: 7 additions & 1 deletion rtt/transports/corba/CorbaLib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ namespace RTT {

virtual base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy) const { return 0; }

virtual CRemoteChannelElement_i* createChannelElement_i(DataFlowInterface*, ::PortableServer::POA* poa, bool) const {
virtual CRemoteChannelElement_i* createOutputChannelElement_i(std::string const&, ::PortableServer::POA* poa, bool) const {
Logger::In in("CorbaFallBackProtocol");
log(Error) << "Could create Channel : data type not known to CORBA Transport." <<Logger::endl;
return 0;

}
virtual CRemoteChannelElement_i* createInputChannelElement_i(::PortableServer::POA* poa, bool) const {
Logger::In in("CorbaFallBackProtocol");
log(Error) << "Could create Channel : data type not known to CORBA Transport." <<Logger::endl;
return 0;
Expand Down
7 changes: 5 additions & 2 deletions rtt/transports/corba/CorbaTemplateProtocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ namespace RTT
*/
typedef typename Property<T>::DataSourceType PropertyType;

CRemoteChannelElement_i* createChannelElement_i(DataFlowInterface* sender,PortableServer::POA_ptr poa, bool is_pull) const
{ return new RemoteChannelElement<T>(*this, sender, poa, is_pull); }
CRemoteChannelElement_i* createOutputChannelElement_i(std::string const& dispatcherName,PortableServer::POA_ptr poa, bool is_pull) const
{ return new RemoteChannelElement<T>(dispatcherName, *this, poa, is_pull); }

CRemoteChannelElement_i* createInputChannelElement_i(PortableServer::POA_ptr poa, bool is_pull) const
{ return new RemoteChannelElement<T>(*this, poa, is_pull); }

/**
* Create an transportable object for a \a protocol which contains the value of \a source.
Expand Down
10 changes: 9 additions & 1 deletion rtt/transports/corba/CorbaTypeTransporter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,15 @@ namespace RTT {
* @param poa The POA to manage the server code.
* @return the created CChannelElement_i.
*/
virtual CRemoteChannelElement_i* createChannelElement_i(DataFlowInterface* sender, ::PortableServer::POA* poa, bool is_pull) const = 0;
virtual CRemoteChannelElement_i* createOutputChannelElement_i(std::string const& dispatcherName, ::PortableServer::POA* poa, bool is_pull) const = 0;

/**
* Builds a channel element for remote transport in both directions.
* @param sender The data flow interface which will be sending or receiving this channel.
* @param poa The POA to manage the server code.
* @return the created CChannelElement_i.
*/
virtual CRemoteChannelElement_i* createInputChannelElement_i(::PortableServer::POA* poa, bool is_pull) const = 0;

/**
* The CORBA transport does not support creating 'CORBA' streams.
Expand Down
Loading