Skip to content

Commit

Permalink
capicxx-someip-runtime 3.1.12.17
Browse files Browse the repository at this point in the history
  • Loading branch information
lutzbichler committed Jul 22, 2019
1 parent 64f0cb0 commit c262d86
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 82 deletions.
20 changes: 20 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
Changes
=======

v3.1.12.17
- Fix race condition which could occur when doing calls on an
unavailable proxy. The race could lead to waiting the complete
specified timeout before sending out the request instead of sending
it out directly if the proxy got available in the meantime.

v3.1.12.16
- Fix handling of broken UTF16 surrogate pairs

v3.1.12.14
- Ensure vsomeip message delivery via mainloop thread
on synchronous calls.

v3.1.12.13
- Deregister dispatch sources later on connection destruction.
- Improve handling of spurious wakeups caused by system time
adjustments.
- Improve robustness of Inputstream regarding malformed messages
containing Variants.

v3.1.12.12
- Fix deregistration of events upon service deregistration
- Improve error handling on send errors
Expand Down
1 change: 0 additions & 1 deletion include/CommonAPI/SomeIP/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ class Connection:
std::map<std::tuple<service_id_t, instance_id_t, eventgroup_id_t, event_id_t>,
std::shared_ptr<SubscriptionStatusWrapper>> subscriptionStates_;

std::mutex availabilityCalledMutex_;
std::map<service_id_t, std::map<instance_id_t, bool>> availabilityCalled_;

std::mutex requestedServicesMutex_;
Expand Down
8 changes: 4 additions & 4 deletions include/CommonAPI/SomeIP/InputStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class InputStream: public CommonAPI::InputStream<InputStream> {
bitAlign();

uint32_t serial;
_readBitValue(serial, 32, false);
errorOccurred_ = _readBitValue(serial, 32, false);
if (!hasError()) {
_value = Polymorphic_Struct::create(serial);
_value->template readValue<InputStream>(*this, _depl);
Expand All @@ -200,8 +200,8 @@ class InputStream: public CommonAPI::InputStream<InputStream> {
DeleteVisitor<maxSize> visitor(_value.valueStorage_);
ApplyVoidVisitor<DeleteVisitor<maxSize>,
Variant<Types_...>, Types_... >::visit(visitor, _value);
_value.valueType_ = 0;
}
_value.valueType_ = 0;

uint32_t itsSize;
uint32_t itsType;
Expand All @@ -218,7 +218,7 @@ class InputStream: public CommonAPI::InputStream<InputStream> {
readValue(itsSize, unionLengthWidth, true);
}

if(!itsType)
if (!itsType)
errorOccurred_ = true;

if (!hasError()) {
Expand Down Expand Up @@ -427,7 +427,7 @@ class InputStream: public CommonAPI::InputStream<InputStream> {

_value.clear();

_readBitValue(itsSize, 32, false);
errorOccurred_ = _readBitValue(itsSize, 32, false);

while (itsSize > 0) {
size_t remainingBeforeRead = remaining_;
Expand Down
28 changes: 14 additions & 14 deletions src/CommonAPI/SomeIP/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ void Connection::receive(const std::shared_ptr<vsomeip::message> &_message) {
(_message->get_message_type() < vsomeip::message_type_e::MT_NOTIFICATION ?
commDirectionType::STUBRECEIVE : commDirectionType::PROXYRECEIVE);

// avoid blocking the mainloop
// avoid blocking the mainloop when a synchronous call in a callback was done
bool isSendAndBlockAnswer = false;
{
if (itsDirection == commDirectionType::PROXYRECEIVE) {
std::lock_guard<std::recursive_mutex> itsLock(sendReceiveMutex_);
if(_message->get_message_type() != vsomeip::message_type_e::MT_NOTIFICATION &&
sendAndBlockAnswers_.find(_message->get_session()) != sendAndBlockAnswers_.end()) {
Expand Down Expand Up @@ -175,7 +175,7 @@ void Connection::onConnectionEvent(state_type_e state) {
void Connection::onAvailabilityChange(service_id_t _service, instance_id_t _instance,
bool _is_available) {
{
std::lock_guard<std::mutex> itsLock(availabilityCalledMutex_);
std::lock_guard<std::mutex> itsLock(availabilityMutex_);
auto its_service = availabilityCalled_.find(_service);
if (its_service != availabilityCalled_.end()) {
auto its_instance = its_service->second.find(_instance);
Expand All @@ -184,7 +184,6 @@ void Connection::onAvailabilityChange(service_id_t _service, instance_id_t _inst
queueSubscriptionStatusHandler(_service, _instance);
}
}
availabilityCalled_[_service][_instance] = true;
}
if (auto lockedContext = mainLoopContext_.lock()) {
std::shared_ptr<AvblQueueEntry> avbl_queue_entry = std::make_shared<AvblQueueEntry>(
Expand Down Expand Up @@ -256,7 +255,7 @@ void Connection::handleAvailabilityChange(const service_id_t _service,
std::weak_ptr<Proxy>,
void*>> itsHandlers;
{
std::unique_lock<std::mutex> itsLock(availabilityMutex_);
std::lock_guard<std::mutex> itsLock(availabilityMutex_);
auto foundService = availabilityHandlers_.find(_service);
if (foundService != availabilityHandlers_.end()) {
auto foundInstance = foundService->second.find(_instance);
Expand All @@ -271,14 +270,14 @@ void Connection::handleAvailabilityChange(const service_id_t _service,
itsHandlers.push_back(h.second);
}
}
availabilityCalled_[_service][_instance] = true;
}

for (auto h : itsHandlers) {
if(auto itsProxy = std::get<1>(h).lock())
std::get<0>(h)(itsProxy, _service, _instance, _is_available,
std::get<2>(h));
}

}

void Connection::dispatch() {
Expand Down Expand Up @@ -374,6 +373,10 @@ Connection::Connection(const std::string &_name)
}

Connection::~Connection() {
if (auto lockedContext = mainLoopContext_.lock()) {
lockedContext->deregisterDispatchSource(dispatchSource_);
lockedContext->deregisterWatch(watch_);
}
bool shouldDisconnect(false);
{
std::lock_guard<std::mutex> itsLock(connectionMutex_);
Expand Down Expand Up @@ -422,10 +425,6 @@ bool Connection::connect(bool) {
}

void Connection::doDisconnect() {
if (auto lockedContext = mainLoopContext_.lock()) {
lockedContext->deregisterDispatchSource(dispatchSource_);
lockedContext->deregisterWatch(watch_);
}
if (asyncAnswersCleanupThread_) {
{
std::lock_guard<std::mutex> lg(cleanupMutex_);
Expand Down Expand Up @@ -575,17 +574,18 @@ Message Connection::sendMessageWithReplyAndBlock(
break;
}
waitStatus = sendAndBlockCondition_.wait_until(lock, elapsed);
if (waitStatus == std::cv_status::timeout || itsAnswer.first->second)
if (itsAnswer.first->second || (waitStatus == std::cv_status::timeout && (elapsed < std::chrono::steady_clock::now())))
break;
} while (!itsAnswer.first->second);

// If there was an answer (thus, we did not run into the timeout),
// move it to itsResult
if (waitStatus != std::cv_status::timeout) {
if (itsAnswer.first->second) {
itsResult = std::move(itsAnswer.first->second);
sendAndBlockAnswers_.erase(itsAnswer.first);
}

sendAndBlockAnswers_.erase(itsAnswer.first);

return itsResult;
}

Expand Down Expand Up @@ -736,7 +736,7 @@ Connection::isAvailable(const Address &_address) {
}
{
bool availabilityCalled(false);
std::lock_guard<std::mutex> itsLock(availabilityCalledMutex_);
std::lock_guard<std::mutex> itsLock(availabilityMutex_);
auto its_service = availabilityCalled_.find(_address.getService());
if (its_service != availabilityCalled_.end()) {
auto its_instance = its_service->second.find(_address.getInstance());
Expand Down
68 changes: 29 additions & 39 deletions src/CommonAPI/SomeIP/InputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,19 @@ InputStream& InputStream::readValue(uint32_t &_value, const uint8_t &_width, con
case 1:
{
uint8_t temp;
_readBitValue(temp, 8, false);
errorOccurred_ = _readBitValue(temp, 8, false);
_value = temp;
}
break;
case 2:
{
uint16_t temp;
_readBitValue(temp, 16, false);
errorOccurred_ = _readBitValue(temp, 16, false);
_value = temp;
}
break;
case 4:
_readBitValue(_value, 32, false);
errorOccurred_ = _readBitValue(_value, 32, false);
break;
default:
errorOccurred_ = true;
Expand Down Expand Up @@ -244,46 +244,36 @@ InputStream& InputStream::readValue(std::string &_value, const StringDeployment
size_t length = 0;

if (encoder->checkBom(data, itsSize, _depl->stringEncoding_)) {
switch (_depl->stringEncoding_)
{
case StringEncoding::UTF16BE:
while (itsSize > 1 && (data[itsSize - 1] != 0x00 || data[itsSize - 2] != 0x00))
itsSize--;

if (itsSize % 2 != 0) {
errorOccurred_ = true;
}

if(!hasError()) {
encoder->utf16To8((byte_t *) data, BIG_ENDIAN, itsSize - 2, status, &bytes, length);
const StringEncoding encoding = _depl->stringEncoding_;
if (encoding == StringEncoding::UTF8) {
if (data[itsSize - 1] != 0x00) {
errorOccurred_ = true;
} else {
status = EncodingStatus::SUCCESS;
}

bytes = (byte_t *) data;
} else { // UTF16BE + UTF16LE
while (itsSize > 1 && (data[itsSize - 1] != 0x00 || data[itsSize - 2] != 0x00))
itsSize--;

if (itsSize % 2 != 0) {
errorOccurred_ = true;
}

if(!hasError()) {
const int endianess = (encoding == StringEncoding::UTF16BE) ? BIG_ENDIAN : LITTLE_ENDIAN;
encoder->utf16To8((byte_t *) data, endianess, itsSize - 2, status, &bytes, length);
if (length) {
itsSize = static_cast<uint32_t>(length);
}
break;

case StringEncoding::UTF16LE:
while (itsSize > 1 && (data[itsSize - 1] != 0x00 || data[itsSize - 2] != 0x00))
itsSize--;

if (itsSize % 2 != 0) {
status = EncodingStatus::SUCCESS;
} else {
errorOccurred_ = true;
delete[] bytes;
bytes = NULL;
}

if(!hasError()) {
encoder->utf16To8((byte_t *) data, LITTLE_ENDIAN, itsSize - 2, status, &bytes, length);
itsSize = static_cast<uint32_t>(length);
}
break;

default:
if (data[itsSize - 1] != 0x00) {
errorOccurred_ = true;
}

bytes = (byte_t *) data;
break;
}
}

status = EncodingStatus::SUCCESS;
} else {
status = EncodingStatus::INVALID_BOM;
}
Expand Down
Loading

0 comments on commit c262d86

Please sign in to comment.