Skip to content

bug 1847950 - Trigger upload after scanning pending pings #2621

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* General
* BREAKING CHANGE: Adding `0` to a `counter` or `labeled_counter` metric will be silently ignored instead of raising an `invalid_value` error ([bug 1762859](https://bugzilla.mozilla.org/show_bug.cgi?id=1762859))
* Trigger the uploader thread after scanning the pending pings directory ([bug 1847950](https://bugzilla.mozilla.org/show_bug.cgi?id=1847950))

# v54.0.0 (2023-09-12)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import mozilla.telemetry.glean.delayMetricsPing
import mozilla.telemetry.glean.getContext
import mozilla.telemetry.glean.getMockWebServer
import mozilla.telemetry.glean.getPlainBody
import mozilla.telemetry.glean.getWorkerStatus
import mozilla.telemetry.glean.private.CommonMetricData
import mozilla.telemetry.glean.private.EventMetricType
import mozilla.telemetry.glean.private.Lifetime
Expand All @@ -25,7 +24,7 @@ import okhttp3.mockwebserver.MockWebServer
import org.json.JSONObject
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Rule
import org.junit.Test
Expand Down Expand Up @@ -176,10 +175,14 @@ class CustomPingTest {
val customPing = PingType<NoReasonCodes>(
name = pingName,
includeClientId = true,
sendIfEmpty = false,
sendIfEmpty = true,
reasonCodes = emptyList(),
)

// The PingUploadWorker might be queued for at-init reasons, so to ensure
// init didn't submit this custom ping we submit it deliberately only once,
// and assert that we didn't receive it twice.

// This is equivalent to a consumer calling `Glean.initialize` at startup
resetGlean(
context,
Expand All @@ -190,17 +193,15 @@ class CustomPingTest {
uploadEnabled = true,
)

// There should be no ping upload worker,
// because there is no ping to upload.
assertFalse(getWorkerStatus(context, PingUploadWorker.PING_WORKER_TAG).isEnqueued)

// But if the custom ping is specifically submitted,
// If the custom ping is specifically submitted,
// it should be received.
customPing.submit()
// Trigger work manager once.
// This should launch one worker that handles all pending pings.
triggerWorkManager(context)
var request = server.takeRequest(2L, TimeUnit.SECONDS)!!
// Assert we only got the one:
assertNull(server.takeRequest(2L, TimeUnit.SECONDS))
var docType = request.path!!.split("/")[3]
assertEquals(pingName, docType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ class PingTypeTest {
counter.add()
assertEquals(1, counter.testGetValue())

// We might have some work queued by init that we'll need to clear.
triggerWorkManager(context)

Glean.submitPingByName("unknown")

assertFalse(
Expand Down
10 changes: 8 additions & 2 deletions glean-core/python/tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def test_recording_upload_errors_doesnt_clobber_database(
tmpdir = Path(tmpdir)

Glean._reset()

# Force the ping upload worker into a separate process
monkeypatch.setattr(PingUploadWorker, "process", PingUploadWorker._process)

Glean.initialize(
application_id=GLEAN_APP_ID,
application_version=glean_version,
Expand All @@ -80,9 +84,11 @@ def test_recording_upload_errors_doesnt_clobber_database(

safe_httpserver.serve_content(b"", code=400)

# Force the ping upload worker into a separate process
monkeypatch.setattr(PingUploadWorker, "process", PingUploadWorker._process)
Glean._configuration._server_endpoint = safe_httpserver.url

# There might be an early ping upload process from init. Wait for it.
ProcessDispatcher._wait_for_last_process()

_builtins.pings.baseline.submit()
# `Ping.submit()` is async on the Rust dispatcher.
# We briefly wait to give it a chance to trigger.
Expand Down
5 changes: 3 additions & 2 deletions glean-core/rlb/examples/crashing-threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ mod unix {
// thread 3 = MPS
// thread 4 = uploader for first metrics ping <- this is the one we want to fail
// thread 5 = uploader for prototype ping <- this is the one we want to fail
// thread 6 = shutdown wait thread
// thread 6 = post-init uploader <- this needs to fail, too
// thread 7 = shutdown wait thread
let spawned = ALLOW_THREAD_SPAWNED.fetch_add(1, Ordering::SeqCst);
if spawned == 4 || spawned == 5 {
if spawned == 4 || spawned == 5 || spawned == 6 {
return -1;
}

Expand Down
4 changes: 2 additions & 2 deletions glean-core/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl Glean {
// We only scan the pending ping directories when calling this from a subprocess,
// when calling this from ::new we need to scan the directories after dealing with the upload state.
if scan_directories {
let _scanning_thread = upload_manager.scan_pending_pings_directories();
let _scanning_thread = upload_manager.scan_pending_pings_directories(false);
}

let start_time = local_now_with_offset();
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Glean {
// If upload is disabled, we delete all pending pings files
// and we need to do that **before** scanning the pending pings folder
// to ensure we don't enqueue pings before their files are deleted.
let _scanning_thread = glean.upload_manager.scan_pending_pings_directories();
let _scanning_thread = glean.upload_manager.scan_pending_pings_directories(true);

Ok(glean)
}
Expand Down
8 changes: 8 additions & 0 deletions glean-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ fn global_state() -> &'static Mutex<State> {
STATE.get().unwrap()
}

/// Attempt to get a reference to the global state object.
///
/// If it hasn't been set yet, we return None.
#[track_caller] // If this fails we're interested in the caller.
fn maybe_global_state() -> Option<&'static Mutex<State>> {
STATE.get()
}

/// Set or replace the global bindings State object.
fn setup_state(state: State) {
// The `OnceCell` type wrapping our state is thread-safe and can only be set once.
Expand Down
33 changes: 26 additions & 7 deletions glean-core/src/upload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,37 @@ impl PingUploadManager {
/// # Returns
///
/// The `JoinHandle` to the spawned thread
pub fn scan_pending_pings_directories(&self) -> std::thread::JoinHandle<()> {
pub fn scan_pending_pings_directories(
&self,
trigger_upload: bool,
) -> std::thread::JoinHandle<()> {
let local_manager = self.directory_manager.clone();
let local_cached_pings = self.cached_pings.clone();
let local_flag = self.processed_pending_pings.clone();
thread::Builder::new()
.name("glean.ping_directory_manager.process_dir".to_string())
.spawn(move || {
let mut local_cached_pings = local_cached_pings
.write()
.expect("Can't write to pending pings cache.");
local_cached_pings.extend(local_manager.process_dirs());
local_flag.store(true, Ordering::SeqCst);
{
// Be sure to drop local_cached_pings lock before triggering upload.
let mut local_cached_pings = local_cached_pings
.write()
.expect("Can't write to pending pings cache.");
local_cached_pings.extend(local_manager.process_dirs());
local_flag.store(true, Ordering::SeqCst);
}
if trigger_upload {
crate::dispatcher::launch(|| {
if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok())
{
if let Err(e) = state.callbacks.trigger_upload() {
log::error!(
"Triggering upload after pending ping scan failed. Error: {}",
e
);
}
}
});
}
})
.expect("Unable to spawn thread to process pings directories.")
}
Expand All @@ -280,7 +299,7 @@ impl PingUploadManager {

// When building for tests, always scan the pending pings directories and do it sync.
upload_manager
.scan_pending_pings_directories()
.scan_pending_pings_directories(false)
.join()
.unwrap();

Expand Down