Skip to content

Commit

Permalink
Add propagateNeedsSignal function.
Browse files Browse the repository at this point in the history
Add propagateNeedsSignal function to the ChannelElement interface.
This function allow an input port to tell if it needs signaling
upon new data availability.
  • Loading branch information
ressac authored and Peter Soetens committed Dec 11, 2012
1 parent 8486fe6 commit c512431
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 3 deletions.
8 changes: 8 additions & 0 deletions rtt/base/ChannelElementBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ namespace RTT { namespace base {
*/
void setOutput(shared_ptr output);

/** Tell if the output needs to be signaled when new data are
* available.
* By default, the channel element forwards the call to its input
*
* @param true_false true if need signaling.
*/
virtual void propagateNeedsSignaling(bool);

/** Signals that there is new data available on this channel
* By default, the channel element forwards the call to its output
*
Expand Down
8 changes: 8 additions & 0 deletions rtt/base/ChannelInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ void ChannelElementBase::clear()
input->clear();
}

void ChannelElementBase::propagateNeedsSignaling(bool true_false)
{
// we go against the data stream
shared_ptr input = getInput();
if (input)
input->propagateNeedsSignaling(true_false);
}

bool ChannelElementBase::signal()
{
shared_ptr output = getOutput();
Expand Down
6 changes: 6 additions & 0 deletions rtt/base/InputPortInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ void InputPortInterface::signal()
void InputPortInterface::signalInterface(bool true_false)
{
msignal_interface = true_false;
// propagate the information to the output end
std::list<internal::ConnectionManager::ChannelDescriptor> channels = cmanager.getChannels();
std::list<internal::ConnectionManager::ChannelDescriptor>::iterator it;
for(it = channels.begin(); it != channels.end(); ++it) {
it->get<1>()->propagateNeedsSignaling(true_false);
}
}
#endif
FlowStatus InputPortInterface::read(DataSourceBase::shared_ptr source, bool copy_old_data)
Expand Down
8 changes: 8 additions & 0 deletions rtt/transports/corba/DataFlow.idl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ module RTT
* side is such that they can relay a signal or disconnect.
*/
void setRemoteSide(in CRemoteChannelElement other);

/**
* Used by the 'remote' side to tell that it needs signaling
* when new data is available for reading.
* @param true_false true if the remote side is an "eventPort".
*/
void remotePropagateNeedsSignaling(in boolean true_false);

/**
* Used by the 'remote' side to inform this channel element that new data
* is available for reading.
Expand Down
37 changes: 34 additions & 3 deletions rtt/transports/corba/RemoteChannelElement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ namespace RTT {
* In pull mode, we don't send data, just signal it and remote must read it back.
*/
bool pull;
/**
* In pull mode, we don't send data neither signal if not needed.
*/
bool need_signal;

/** This is used on to read the channel */
typename base::ChannelElement<T>::value_t sample;
Expand All @@ -95,7 +99,7 @@ namespace RTT {
value_data_source(new internal::ValueDataSource<T>),
ref_data_source(new internal::LateReferenceDataSource<T>),
const_ref_data_source(new internal::LateConstReferenceDataSource<T>),
valid(true), pull(is_pull),
valid(true), pull(is_pull), need_signal(false,
msender(sender),
write_any(new CORBA::Any)
{
Expand Down Expand Up @@ -123,6 +127,30 @@ namespace RTT {
{ this->deref(); }


/**
* CORBA IDL function.
* CRemoteChannelElement_i propagateNeedsSignaling implementation.
*/
void remotePropagateNeedsSignaling(CORBA::Boolean true_false) ACE_THROW_SPEC ((
CORBA::SystemException
))
{
// forward too.
base::ChannelElementBase::propagateNeedsSignaling(true_false);
// update the cache
need_signal = true_false;
}

/**
* base::ChannelElement<T> propagateNeedsSignaling implementation.
*/
void propagateNeedsSignaling(bool true_false)
{
if ( !CORBA::is_nil(remote_side.in()) )
// FIXME should we care about the RT here, as in signal ?
remote_side->remotePropagateNeedsSignaling(true_false);
}

/**
* CORBA IDL function.
*/
Expand All @@ -138,6 +166,9 @@ namespace RTT {
// intercept signal if no remote side set.
if ( CORBA::is_nil(remote_side.in()) )
return true;
// intercept signal if no signal needed.
if ( !need_signal )
return true;
// Remember that signal() is called in the context of the one
// that wrote the data, so we must decouple here to keep hard-RT happy.
// the dispatch thread must read the data and send it over by calling transferSample().
Expand All @@ -151,7 +182,7 @@ namespace RTT {
return;
//log(Debug) <<"transfering..." <<endlog();
// in push mode, transfer all data, in pull mode, only signal once for each sample.
if ( pull ) {
if ( pull && need_signal ) {
try
{ valid = remote_side->remoteSignal(); }
#ifdef CORBA_IS_OMNIORB
Expand All @@ -166,7 +197,7 @@ namespace RTT {
log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
valid = false;
}
} else {
} else if (!pull) {
//log(Debug) <<"...read..."<<endlog();
while ( this->read(sample, false) == NewData && valid) {
//log(Debug) <<"...write..."<<endlog();
Expand Down

0 comments on commit c512431

Please sign in to comment.