Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/fly_qgc_mission/fly_qgc_mission.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ using std::this_thread::sleep_for;

void usage(const std::string& bin_name)
{
std::cerr << "Usage : " << bin_name << " <connection_url>\n"
std::cerr << "Usage : " << bin_name << " <connection_url> <mission_plan_file>\n"
<< "Connection URL format should be :\n"
<< " For TCP server: tcpin://<our_ip>:<port>\n"
<< " For TCP client: tcpout://<remote_ip>:<port>\n"
Expand Down
2 changes: 1 addition & 1 deletion examples/logfile_download/logfile_download.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void usage(const std::string& bin_name)

int main(int argc, char** argv)
{
if (argc > 3) {
if (argc < 3) {
usage(argv[0]);
return 1;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/set_actuator/set_actuator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using namespace mavsdk;

void usage(const std::string& bin_name)
{
std::cerr << "Usage : " << bin_name << " <connection_url>\n"
std::cerr << "Usage : " << bin_name << " <connection_url> <index> <value>\n"
<< "Connection URL format should be :\n"
<< " For TCP server: tcpin://<our_ip>:<port>\n"
<< " For TCP client: tcpout://<remote_ip>:<port>\n"
Expand Down
207 changes: 173 additions & 34 deletions src/mavsdk/plugins/log_files/log_files_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void LogFilesImpl::get_entries_async(LogFiles::GetEntriesCallback callback)
_log_entries.clear();
_entries_user_callback = callback;
_total_entries = 0;
_entries_retry_count = 0;

_entries_timeout_cookie = _system_impl->register_timeout_handler(
[this]() { entries_timeout(); }, _system_impl->timeout_s() * 10.0);
Expand Down Expand Up @@ -170,6 +171,13 @@ void LogFilesImpl::process_log_entry(const mavlink_message_t& message)
return;
}

// Initialize vector if this is the first entry
if (_total_entries != msg.num_logs) {
_total_entries = msg.num_logs;
_log_entries.clear();
_log_entries.resize(_total_entries);
}

LogFiles::Entry new_entry;
new_entry.id = msg.id;

Expand All @@ -181,37 +189,74 @@ void LogFilesImpl::process_log_entry(const mavlink_message_t& message)
new_entry.date = buf;
new_entry.size_bytes = msg.size;

_log_entries[new_entry.id] = new_entry;
_total_entries = msg.num_logs;
// Store entry at direct index
_log_entries[msg.id] = new_entry;

// Check if all entries are received
if (_log_entries.size() == _total_entries) {
bool all_received =
std::all_of(_log_entries.begin(), _log_entries.end(), [](const auto& entry_opt) {
return entry_opt.has_value();
});

if (all_received) {
_system_impl->unregister_timeout_handler(_entries_timeout_cookie);

// Copy map entries into list to return
std::vector<LogFiles::Entry> entry_list{};
for (unsigned i = 0; i < _log_entries.size(); i++) {
entry_list.push_back(_log_entries[i]);
}
// Build result list from vector (safe since all entries are present)
std::vector<LogFiles::Entry> entry_list;
entry_list.reserve(_log_entries.size());
std::transform(
_log_entries.begin(),
_log_entries.end(),
std::back_inserter(entry_list),
[](const auto& entry_opt) { return entry_opt.value(); });

const auto cb = _entries_user_callback;
if (cb) {
_system_impl->call_user_callback(
[cb, entry_list]() { cb(LogFiles::Result::Success, entry_list); });
}
} else {
// Check for missing entries when we might have all entries
// This handles: receiving the last expected entry AND receiving retried entries that might
// complete the set
if (_total_entries > 0) {
const uint32_t expected_last_entry_id = _total_entries - 1;
// Check if this could be the last entry we need (either the expected last one, or a
// retried one)
if (msg.id == expected_last_entry_id ||
_log_entries[expected_last_entry_id].has_value()) {
check_and_request_missing_entries();
}
}
}
}

void LogFilesImpl::entries_timeout()
{
std::lock_guard<std::mutex> lock(_entries_mutex);

LogDebug() << "Request entries timeout! Retry count: " << _entries_retry_count;

constexpr uint32_t MAX_RETRIES = 5;

if (_entries_retry_count < MAX_RETRIES) {
// Try to request missing entries
check_and_request_missing_entries();
_entries_retry_count++;

// Reset timeout for another attempt (use normal timeout, not * 10.0)
_entries_timeout_cookie = _system_impl->register_timeout_handler(
[this]() { entries_timeout(); }, _system_impl->timeout_s());

// Don't call user callback yet - give it another chance
return;
}

// Max retries exceeded - give up and call user with timeout error
const auto cb = _entries_user_callback;
if (cb) {
_system_impl->call_user_callback([cb]() {
LogDebug() << "Request entries timeout!";
cb(LogFiles::Result::Timeout, std::vector<LogFiles::Entry>());
});
_system_impl->call_user_callback(
[cb]() { cb(LogFiles::Result::Timeout, std::vector<LogFiles::Entry>()); });
}
}

Expand All @@ -237,9 +282,8 @@ void LogFilesImpl::download_log_file_async(
{
std::lock_guard<std::mutex> lock(_entries_mutex);

auto entry_it = _log_entries.find(entry.id);
bool error = entry_it == _log_entries.end() || fs::is_directory(fs::path(file_path)) ||
fs::exists(file_path);
bool error = entry.id >= _log_entries.size() || !_log_entries[entry.id].has_value() ||
fs::is_directory(fs::path(file_path)) || fs::exists(file_path);

if (error) {
LogErr() << "error: download_log_file_async failed";
Expand All @@ -252,7 +296,21 @@ void LogFilesImpl::download_log_file_async(
return;
}

_download_data = LogData(entry_it->second, file_path, callback);
const auto& found_entry = _log_entries[entry.id].value();

// Check for zero-sized file and abort early
if (found_entry.size_bytes == 0) {
LogErr() << "Cannot download zero-sized log file";

if (callback) {
_system_impl->call_user_callback([callback]() {
callback(LogFiles::Result::InvalidArgument, LogFiles::ProgressData());
});
}
return;
}

_download_data = LogData(found_entry, file_path, callback);

if (!_download_data.file_is_open()) {
if (callback) {
Expand All @@ -264,7 +322,7 @@ void LogFilesImpl::download_log_file_async(
}

_download_data.timeout_cookie = _system_impl->register_timeout_handler(
[this]() { LogFilesImpl::data_timeout(); }, _system_impl->timeout_s() * 1.0);
[this]() { LogFilesImpl::data_timeout(); }, _system_impl->timeout_s());

// Request the first chunk
request_log_data(_download_data.entry.id, 0, _download_data.current_chunk_size());
Expand Down Expand Up @@ -329,20 +387,23 @@ void LogFilesImpl::process_log_data(const mavlink_message_t& message)

_download_data.file.write((char*)msg.data, msg.count);

// Quietly ignore duplicate packets -- we don't want to record the bytes_written twice
if (!_download_data.chunk_bin_table[bin]) {
_download_data.chunk_bytes_written += msg.count;
_download_data.chunk_bin_table[bin] = true;
}
// Mark bin as received (quietly ignore duplicates)
_download_data.chunk_bin_table[bin] = true;

bool chunk_complete = _download_data.chunk_bytes_written == _download_data.current_chunk_size();
// Check if all bins in the chunk have been received
const uint32_t bins_in_chunk = _download_data.bins_in_chunk();
bool chunk_complete = std::all_of(
_download_data.chunk_bin_table.begin(),
_download_data.chunk_bin_table.begin() + bins_in_chunk,
[](bool received) { return received; });

if (chunk_complete) {
uint32_t chunk_bytes = _download_data.current_chunk_size();

auto result = LogFiles::Result::Next;

_download_data.current_chunk++;
_download_data.total_bytes_written += _download_data.chunk_bytes_written;
_download_data.chunk_bytes_written = 0;
_download_data.total_bytes_written += chunk_bytes;
_download_data.chunk_bin_table = std::vector<bool>(_download_data.bins_in_chunk(), false);

bool log_complete = _download_data.total_bytes_written == _download_data.entry.size_bytes;
Expand Down Expand Up @@ -370,6 +431,18 @@ void LogFilesImpl::process_log_data(const mavlink_message_t& message)
_system_impl->call_user_callback(
[cb, progress_data, result]() { cb(result, progress_data); });
}
} else {
// Check for missing bins when we might have all bins for this chunk
// This handles: receiving the last expected bin AND receiving retried bins that might
// complete the chunk
if (bins_in_chunk > 0) {
const uint32_t expected_last_bin = bins_in_chunk - 1;
// Check if this could be the last bin we need (either the expected last one, or a
// retried one)
if (bin == expected_last_bin || _download_data.chunk_bin_table[expected_last_bin]) {
check_and_request_missing_bins();
}
}
}
}

Expand All @@ -381,17 +454,83 @@ void LogFilesImpl::data_timeout()
LogErr() << "Requesting missing chunk:\t" << _download_data.current_chunk << "/"
<< _download_data.total_chunks();

// Reset chunk data
_download_data.chunk_bytes_written = 0;
_download_data.chunk_bin_table = std::vector<bool>(_download_data.bins_in_chunk(), false);

request_log_data(
_download_data.entry.id,
_download_data.current_chunk * CHUNK_SIZE,
_download_data.current_chunk_size());
// Don't reset chunk data - preserve what we've received
// Instead, request missing ranges based on bin table
check_and_request_missing_bins();

_download_data.timeout_cookie = _system_impl->register_timeout_handler(
[this]() { LogFilesImpl::data_timeout(); }, _system_impl->timeout_s() * 1.0);
[this]() { LogFilesImpl::data_timeout(); }, _system_impl->timeout_s());
}

void LogFilesImpl::check_and_request_missing_bins()
{
// Note: This function assumes _download_data_mutex is already locked by caller

// Find missing ranges and request the first one
const uint32_t chunk_start = _download_data.current_chunk * CHUNK_SIZE;
const uint32_t bins_in_chunk = _download_data.bins_in_chunk();

uint32_t range_start = 0;
bool in_missing_range = false;
bool requested_something = false;

for (uint32_t bin = 0; bin <= bins_in_chunk; ++bin) {
bool is_missing = (bin < bins_in_chunk) ? !_download_data.chunk_bin_table[bin] : false;

if (is_missing && !in_missing_range) {
// Start of a missing range
range_start = bin;
in_missing_range = true;
} else if (!is_missing && in_missing_range) {
// End of a missing range, request it (but only request the first one)
if (!requested_something) {
const uint32_t missing_start =
chunk_start + (range_start * MAVLINK_MSG_LOG_DATA_FIELD_DATA_LEN);
const uint32_t missing_count =
(bin - range_start) * MAVLINK_MSG_LOG_DATA_FIELD_DATA_LEN;

LogDebug() << "Requesting missing range from offset " << missing_start << " count "
<< missing_count;
request_log_data(_download_data.entry.id, missing_start, missing_count);
requested_something = true;
}
in_missing_range = false;
}
}
}

void LogFilesImpl::check_and_request_missing_entries()
{
// Note: This function assumes _entries_mutex is already locked by caller

// Find missing entry ranges and request them
std::vector<uint16_t> missing_ids;
for (uint16_t i = 0; i < _log_entries.size(); ++i) {
if (!_log_entries[i].has_value()) {
missing_ids.push_back(i);
}
}

if (missing_ids.empty()) {
return;
}

// Group consecutive missing IDs into ranges for efficient requests
// For now, request the first missing range (similar to bins approach)
uint16_t range_start = missing_ids[0];
uint16_t range_end = range_start;

// Find end of first consecutive range
for (size_t i = 1; i < missing_ids.size(); ++i) {
if (missing_ids[i] == missing_ids[i - 1] + 1) {
range_end = missing_ids[i];
} else {
break;
}
}

LogDebug() << "Requesting missing log entries from " << range_start << " to " << range_end;
request_log_list(range_start, range_end);
}

void LogFilesImpl::request_log_data(unsigned id, unsigned start, unsigned count)
Expand Down
11 changes: 8 additions & 3 deletions src/mavsdk/plugins/log_files/log_files_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
#include "plugin_impl_base.h"
#include "system.h"
#include <fstream>
#include <optional>

namespace mavsdk {

static constexpr uint32_t TABLE_BINS = 512;
// We need to keep this smaller, otherwise we end up running over the UDP buffer in Linux when PX4
// sends too much all at once.
static constexpr uint32_t TABLE_BINS = 128;
static constexpr uint32_t CHUNK_SIZE = (TABLE_BINS * MAVLINK_MSG_LOG_DATA_FIELD_DATA_LEN);

struct LogData {
Expand All @@ -31,7 +34,6 @@ struct LogData {
uint32_t current_chunk{};
std::vector<bool> chunk_bin_table{};

uint32_t chunk_bytes_written{};
uint32_t total_bytes_written{};

TimeoutHandler::Cookie timeout_cookie{};
Expand Down Expand Up @@ -70,15 +72,18 @@ class LogFilesImpl : public PluginImplBase {

void process_log_data(const mavlink_message_t& message);
void data_timeout();
void check_and_request_missing_bins();
void check_and_request_missing_entries();

void request_log_list(uint16_t index_min, uint16_t index_max);
void request_log_data(unsigned id, unsigned start, unsigned count);

void request_end();

std::mutex _entries_mutex;
std::unordered_map<uint16_t, LogFiles::Entry> _log_entries;
std::vector<std::optional<LogFiles::Entry>> _log_entries;
uint32_t _total_entries{0};
uint32_t _entries_retry_count{0};
TimeoutHandler::Cookie _entries_timeout_cookie{};
LogFiles::GetEntriesCallback _entries_user_callback{};

Expand Down
Loading