Skip to content

Commit 8180c6e

Browse files
authored
Merge pull request #33729 from teskje/source-exports-running
2 parents fadb0de + ad5c672 commit 8180c6e

File tree

8 files changed

+532
-236
lines changed

8 files changed

+532
-236
lines changed

src/catalog/src/builtin.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4531,17 +4531,6 @@ pub static MZ_SOURCE_STATUSES: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinV
45314531
self_events.status <> 'ceased' AND
45324532
parent_events.status = 'stalled'
45334533
THEN parent_events.source_id
4534-
-- TODO: Remove this once subsources eagerly propagate their status
4535-
-- Subsources move from starting to running lazily once they see
4536-
-- a record flow through, even though they are online and healthy.
4537-
-- This has been repeatedly brought up as confusing by users.
4538-
-- So now, if the parent source is running, and the subsource is
4539-
-- starting, we override its status to running to reflect its healthy
4540-
-- status.
4541-
WHEN
4542-
self_events.status = 'starting' AND
4543-
parent_events.status = 'running'
4544-
THEN parent_events.source_id
45454534
ELSE self_events.source_id
45464535
END AS id_to_use
45474536
FROM

src/storage/src/source/generator.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
243243
let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());
244244

245245
let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
246-
let partition_count = u64::cast_from(config.source_exports.len());
246+
let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
247+
let partition_count = u64::cast_from(export_ids.len());
247248
let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
248249
partition_count,
249250
|((output, data), time, diff): &(
@@ -256,7 +257,7 @@ fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
256257
},
257258
);
258259
let mut data_collections = BTreeMap::new();
259-
for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
260+
for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
260261
data_collections.insert(*id, data_stream.as_collection());
261262
}
262263

@@ -373,14 +374,17 @@ fn render_simple_generator<G: Scope<Timestamp = MzOffset>>(
373374
Event::Progress(Some(offset)) => {
374375
if resume_offset <= offset && health_cap.is_some() {
375376
let health_cap = health_cap.take().expect("known to exist");
376-
health_output.give(
377-
&health_cap,
378-
HealthStatusMessage {
379-
id: None,
380-
namespace: StatusNamespace::Generator,
381-
update: HealthStatusUpdate::running(),
382-
},
383-
);
377+
let export_ids = export_ids.iter().copied();
378+
for id in export_ids.map(Some).chain(None) {
379+
health_output.give(
380+
&health_cap,
381+
HealthStatusMessage {
382+
id,
383+
namespace: StatusNamespace::Generator,
384+
update: HealthStatusUpdate::running(),
385+
},
386+
);
387+
}
384388
}
385389

386390
// If we've reached the requested maximum offset, cease.

src/storage/src/source/generator/key_value.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ pub fn render<G: Scope<Timestamp = MzOffset>>(
5656
let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone());
5757

5858
let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
59-
let partition_count = u64::cast_from(config.source_exports.len());
59+
let export_ids: Vec<_> = config.source_exports.keys().copied().collect();
60+
let partition_count = u64::cast_from(export_ids.len());
6061
let data_streams: Vec<_> = stream.partition::<CapacityContainerBuilder<_>, _, _>(
6162
partition_count,
6263
|((output, data), time, diff): &(
@@ -69,7 +70,7 @@ pub fn render<G: Scope<Timestamp = MzOffset>>(
6970
},
7071
);
7172
let mut data_collections = BTreeMap::new();
72-
for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
73+
for (id, data_stream) in export_ids.iter().zip_eq(data_streams) {
7374
data_collections.insert(*id, data_stream.as_collection());
7475
}
7576

@@ -252,12 +253,17 @@ pub fn render<G: Scope<Timestamp = MzOffset>>(
252253
})
253254
});
254255

255-
let status = [HealthStatusMessage {
256-
id: None,
257-
namespace: StatusNamespace::Generator,
258-
update: HealthStatusUpdate::running(),
259-
}]
260-
.to_stream(scope);
256+
let status = export_ids
257+
.into_iter()
258+
.map(Some)
259+
.chain(None)
260+
.map(|id| HealthStatusMessage {
261+
id,
262+
namespace: StatusNamespace::Generator,
263+
update: HealthStatusUpdate::running(),
264+
})
265+
.collect::<Vec<_>>()
266+
.to_stream(scope);
261267
(
262268
data_collections,
263269
progress_stream,

src/storage/src/source/mysql.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,17 @@ impl SourceRender for MySqlSourceConnection {
196196
data_collections.insert(*id, data_stream.as_collection());
197197
}
198198

199-
let health_init = std::iter::once(HealthStatusMessage {
200-
id: None,
201-
namespace: Self::STATUS_NAMESPACE,
202-
update: HealthStatusUpdate::Running,
203-
})
204-
.to_stream(scope);
199+
let export_ids = config.source_exports.keys().copied();
200+
let health_init = export_ids
201+
.map(Some)
202+
.chain(None)
203+
.map(|id| HealthStatusMessage {
204+
id,
205+
namespace: Self::STATUS_NAMESPACE,
206+
update: HealthStatusUpdate::Running,
207+
})
208+
.collect::<Vec<_>>()
209+
.to_stream(scope);
205210

206211
let health_errs = snapshot_err
207212
.concat(&repl_err)

src/storage/src/source/postgres.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,17 @@ impl SourceRender for PostgresSourceConnection {
214214
data_collections.insert(*id, data_stream.as_collection());
215215
}
216216

217-
let init = std::iter::once(HealthStatusMessage {
218-
id: None,
219-
namespace: Self::STATUS_NAMESPACE,
220-
update: HealthStatusUpdate::Running,
221-
})
222-
.to_stream(scope);
217+
let export_ids = config.source_exports.keys().copied();
218+
let health_init = export_ids
219+
.map(Some)
220+
.chain(None)
221+
.map(|id| HealthStatusMessage {
222+
id,
223+
namespace: Self::STATUS_NAMESPACE,
224+
update: HealthStatusUpdate::Running,
225+
})
226+
.collect::<Vec<_>>()
227+
.to_stream(scope);
223228

224229
// N.B. Note that we don't check ssh tunnel statuses here. We could, but immediately on
225230
// restart we are going to set the status to an ssh error correctly, so we don't do this
@@ -249,7 +254,7 @@ impl SourceRender for PostgresSourceConnection {
249254
}
250255
});
251256

252-
let health = init.concat(&errs);
257+
let health = health_init.concat(&errs);
253258

254259
(
255260
data_collections,

src/storage/src/source/source_reader_pipeline.rs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -365,30 +365,6 @@ where
365365

366366
while let Some((cap, data)) = input.next() {
367367
for (message, _, _) in data.iter() {
368-
let status = match &message {
369-
Ok(_) => HealthStatusUpdate::running(),
370-
// All errors coming into the data stream are definite.
371-
// Downstream consumers of this data will preserve this
372-
// status.
373-
Err(error) => HealthStatusUpdate::stalled(
374-
error.to_string(),
375-
Some(
376-
"retracting the errored value may resume the source"
377-
.to_string(),
378-
),
379-
),
380-
};
381-
382-
let status = HealthStatusMessage {
383-
id: Some(id),
384-
namespace: C::STATUS_NAMESPACE.clone(),
385-
update: status,
386-
};
387-
if last_status.as_ref() != Some(&status) {
388-
last_status = Some(status.clone());
389-
health_output.session(&health_cap).give(status);
390-
}
391-
392368
match message {
393369
Ok(message) => {
394370
source_statistics.inc_messages_received_by(1);
@@ -397,7 +373,27 @@ where
397373
bytes_read_counter.inc_by(key_len + value_len);
398374
source_statistics.inc_bytes_received_by(key_len + value_len);
399375
}
400-
Err(_) => {}
376+
Err(error) => {
377+
// All errors coming into the data stream are definite.
378+
// Downstream consumers of this data will preserve this
379+
// status.
380+
let update = HealthStatusUpdate::stalled(
381+
error.to_string(),
382+
Some(
383+
"retracting the errored value may resume the source"
384+
.to_string(),
385+
),
386+
);
387+
let status = HealthStatusMessage {
388+
id: Some(id),
389+
namespace: C::STATUS_NAMESPACE.clone(),
390+
update,
391+
};
392+
if last_status.as_ref() != Some(&status) {
393+
last_status = Some(status.clone());
394+
health_output.session(&health_cap).give(status);
395+
}
396+
}
401397
}
402398
}
403399
let mut output = output.activate();

src/storage/src/source/sql_server.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,18 @@ impl SourceRender for SqlServerSourceConnection {
189189
data_collections.insert(*id, data_stream.as_collection());
190190
}
191191

192-
let health_init = std::iter::once(HealthStatusMessage {
193-
id: None,
194-
namespace: Self::STATUS_NAMESPACE,
195-
update: HealthStatusUpdate::Running,
196-
})
197-
.to_stream(scope);
192+
let export_ids = config.source_exports.keys().copied();
193+
let health_init = export_ids
194+
.map(Some)
195+
.chain(None)
196+
.map(|id| HealthStatusMessage {
197+
id,
198+
namespace: Self::STATUS_NAMESPACE,
199+
update: HealthStatusUpdate::Running,
200+
})
201+
.collect::<Vec<_>>()
202+
.to_stream(scope);
203+
198204
let health_errs = repl_errs.concat(&progress_errs).map(move |err| {
199205
// This update will cause the dataflow to restart
200206
let err_string = err.display_with_causes().to_string();

0 commit comments

Comments
 (0)