Skip to content

Commit

Permalink
bug 1847950 - Trigger upload after scanning pending pings
Browse files Browse the repository at this point in the history
This requires changing the thread crashing test as we now launch another
uploader thread that needs to be crashed (and the indices of threads needing
crashing has changed).

Continues the questionable trend of calling Global STATE Callbacks from inside
glean-core specifically to trigger upload if possible.
  • Loading branch information
chutten committed Sep 25, 2023
1 parent c098e58 commit 3fe7b72
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* General
* BREAKING CHANGE: Adding `0` to a `counter` or `labeled_counter` metric will be silently ignored instead of raising an `invalid_value` error ([bug 1762859](https://bugzilla.mozilla.org/show_bug.cgi?id=1762859))
* Trigger the uploader thread after scanning the pending pings directory ([bug 1847950](https://bugzilla.mozilla.org/show_bug.cgi?id=1847950))

# v54.0.0 (2023-09-12)

Expand Down
6 changes: 4 additions & 2 deletions glean-core/python/tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def test_recording_upload_errors_doesnt_clobber_database(
tmpdir = Path(tmpdir)

Glean._reset()

# Force the ping upload worker into a separate process
monkeypatch.setattr(PingUploadWorker, "process", PingUploadWorker._process)

Glean.initialize(
application_id=GLEAN_APP_ID,
application_version=glean_version,
Expand All @@ -80,8 +84,6 @@ def test_recording_upload_errors_doesnt_clobber_database(

safe_httpserver.serve_content(b"", code=400)

# Force the ping upload worker into a separate process
monkeypatch.setattr(PingUploadWorker, "process", PingUploadWorker._process)
Glean._configuration._server_endpoint = safe_httpserver.url
_builtins.pings.baseline.submit()
# `Ping.submit()` is async on the Rust dispatcher.
Expand Down
11 changes: 6 additions & 5 deletions glean-core/rlb/examples/crashing-threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ mod unix {

// thread 1 = glean.initialize
// thread 2 = ping directory processor
// thread 3 = MPS
// thread 4 = uploader for first metrics ping <- this is the one we want to fail
// thread 5 = uploader for prototype ping <- this is the one we want to fail
// thread 6 = shutdown wait thread
// thread 3 = uploader for after ping directory processor <-- want to fail
// thread 4 = MPS
// thread 5 = uploader for first metrics ping <- this is the one we want to fail
// thread 6 = uploader for prototype ping <- this is the one we want to fail
// thread 7 = shutdown wait thread
let spawned = ALLOW_THREAD_SPAWNED.fetch_add(1, Ordering::SeqCst);
if spawned == 4 || spawned == 5 {
if spawned == 3 || spawned == 5 || spawned == 6 {
return -1;
}

Expand Down
4 changes: 2 additions & 2 deletions glean-core/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Glean {
// We only scan the pending ping directories when calling this from a subprocess,
// when calling this from ::new we need to scan the directories after dealing with the upload state.
if scan_directories {
let _scanning_thread = upload_manager.scan_pending_pings_directories();
let _scanning_thread = upload_manager.scan_pending_pings_directories(false);
}

let start_time = local_now_with_offset();
Expand Down Expand Up @@ -281,7 +281,7 @@ impl Glean {
// If upload is disabled, we delete all pending pings files
// and we need to do that **before** scanning the pending pings folder
// to ensure we don't enqueue pings before their files are deleted.
let _scanning_thread = glean.upload_manager.scan_pending_pings_directories();
let _scanning_thread = glean.upload_manager.scan_pending_pings_directories(true);

Ok(glean)
}
Expand Down
8 changes: 8 additions & 0 deletions glean-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ fn global_state() -> &'static Mutex<State> {
STATE.get().unwrap()
}

/// Attempt to get a reference to the global state object.
///
/// If it hasn't been set yet, we return None.
#[track_caller] // If this fails we're interested in the caller.
fn maybe_global_state() -> Option<&'static Mutex<State>> {
STATE.get()
}

/// Set or replace the global bindings State object.
fn setup_state(state: State) {
// The `OnceCell` type wrapping our state is thread-safe and can only be set once.
Expand Down
30 changes: 23 additions & 7 deletions glean-core/src/upload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,34 @@ impl PingUploadManager {
/// # Returns
///
/// The `JoinHandle` to the spawned thread
pub fn scan_pending_pings_directories(&self) -> std::thread::JoinHandle<()> {
pub fn scan_pending_pings_directories(
&self,
trigger_upload: bool,
) -> std::thread::JoinHandle<()> {
let local_manager = self.directory_manager.clone();
let local_cached_pings = self.cached_pings.clone();
let local_flag = self.processed_pending_pings.clone();
thread::Builder::new()
.name("glean.ping_directory_manager.process_dir".to_string())
.spawn(move || {
let mut local_cached_pings = local_cached_pings
.write()
.expect("Can't write to pending pings cache.");
local_cached_pings.extend(local_manager.process_dirs());
local_flag.store(true, Ordering::SeqCst);
{
// Be sure to drop local_cached_pings lock before triggering upload.
let mut local_cached_pings = local_cached_pings
.write()
.expect("Can't write to pending pings cache.");
local_cached_pings.extend(local_manager.process_dirs());
local_flag.store(true, Ordering::SeqCst);
}
if trigger_upload {
if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok()) {
if let Err(e) = state.callbacks.trigger_upload() {
log::error!(
"Triggering upload after pending ping scan failed. Error: {}",
e
);
}
}
}
})
.expect("Unable to spawn thread to process pings directories.")
}
Expand All @@ -280,7 +296,7 @@ impl PingUploadManager {

// When building for tests, always scan the pending pings directories and do it sync.
upload_manager
.scan_pending_pings_directories()
.scan_pending_pings_directories(false)
.join()
.unwrap();

Expand Down

0 comments on commit 3fe7b72

Please sign in to comment.