Skip to content

Commit e5e29c2

Browse files
kardymondsyumkamCopilotGrigoriyPA
authored
YQ-4735 Shared reading: skipping json errors (#27401)
Co-authored-by: Yuriy Kaminskiy <[email protected]> Co-authored-by: Copilot <[email protected]> Co-authored-by: Pisarenko Grigoriy <[email protected]>
1 parent a22d8b8 commit e5e29c2

File tree

9 files changed

+437
-57
lines changed

9 files changed

+437
-57
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ message TJsonParserConfig {
2323
uint64 BatchSizeBytes = 1; // default 1 MiB
2424
uint64 BatchCreationTimeoutMs = 2;
2525
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
26+
bool SkipErrors = 4;
2627
}
2728

2829
message TCompileServiceConfig {

ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ namespace NFq {
99

1010
TRowDispatcherSettings::TJsonParserSettings::TJsonParserSettings(const NConfig::TJsonParserConfig& config)
1111
: BatchCreationTimeout(TDuration::MilliSeconds(config.GetBatchCreationTimeoutMs()))
12+
, SkipErrors(config.GetSkipErrors())
1213
{
1314
if (config.GetBatchSizeBytes()) {
1415
BatchSizeBytes = config.GetBatchSizeBytes();

ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class TRowDispatcherSettings {
3333
YDB_ACCESSOR(ui64, BatchSizeBytes, 1_MB);
3434
YDB_ACCESSOR(TDuration, BatchCreationTimeout, TDuration::Seconds(1));
3535
YDB_ACCESSOR(ui64, BufferCellCount, 1000'000);
36+
YDB_ACCESSOR(bool, SkipErrors, false);
3637
};
3738

3839
class TCompileServiceSettings {

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp

Lines changed: 220 additions & 43 deletions
Large diffs are not rendered by default.

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ struct TJsonParserConfig {
1717
ui64 BatchSize = 1_MB;
1818
TDuration LatencyLimit;
1919
ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit
20+
bool SkipErrors = false;
2021
};
2122

2223
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters);

ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ void CheckSuccess(const TStatus& status) {
247247
}
248248

249249
void CheckError(const TStatus& status, TStatusCode expectedStatusCode, const TString& expectedMessage) {
250-
UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, "Expected error status " << NYql::NDqProto::StatusIds_StatusCode_Name(expectedStatusCode) << ", but got: " << status.GetErrorMessage());
250+
UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, "Expected error status " << NYql::NDqProto::StatusIds_StatusCode_Name(expectedStatusCode) << ", but got: " << NYql::NDqProto::StatusIds_StatusCode_Name(status.GetStatus()) << " ("<< status.GetErrorMessage() << ")");
251251
UNIT_ASSERT_STRING_CONTAINS_C(status.GetErrorMessage(), expectedMessage, "Unexpected error message, Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(status.GetStatus()));
252252
}
253253

ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ class TBaseParserFixture : public TBaseFixture {
1818
using TPtr = TIntrusivePtr<TParsedDataConsumer>;
1919

2020
public:
21-
TParsedDataConsumer(const TBaseParserFixture& self, const TVector<TSchemaColumn>& columns, TCallback callback)
21+
TParsedDataConsumer(const TBaseParserFixture& self, const TVector<TSchemaColumn>& columns, TCallback callback, bool checkOffsets = true)
2222
: Self(self)
2323
, Columns(columns)
2424
, Callback(callback)
25+
, CheckOffsets(checkOffsets)
2526
{}
2627

2728
void ExpectColumnError(ui64 columnId, TStatusCode statusCode, const TString& message) {
@@ -55,9 +56,11 @@ class TBaseParserFixture : public TBaseFixture {
5556

5657
const auto& offsets = Self.Parser->GetOffsets();
5758
UNIT_ASSERT_VALUES_EQUAL_C(offsets.size(), numberRows, "Unexpected offsets size");
58-
for (const ui64 offset : offsets) {
59-
UNIT_ASSERT_VALUES_EQUAL_C(offset, CurrentOffset, "Unexpected offset");
60-
CurrentOffset++;
59+
if (CheckOffsets) {
60+
for (const ui64 offset : offsets) {
61+
UNIT_ASSERT_VALUES_EQUAL_C(offset, CurrentOffset, "Unexpected offset");
62+
CurrentOffset++;
63+
}
6164
}
6265

6366
TVector<std::span<NYql::NUdf::TUnboxedValue>> result(Columns.size());
@@ -80,6 +83,7 @@ class TBaseParserFixture : public TBaseFixture {
8083
const TBaseParserFixture& Self;
8184
const TVector<TSchemaColumn> Columns;
8285
const TCallback Callback;
86+
const bool CheckOffsets;
8387

8488
std::optional<std::pair<TStatusCode, TString>> ExpectedCommonError;
8589
std::unordered_map<ui64, std::pair<TStatusCode, TString>> ExpectedErrors;
@@ -97,8 +101,8 @@ class TBaseParserFixture : public TBaseFixture {
97101
}
98102

99103
public:
100-
TStatus MakeParser(TVector<TSchemaColumn> columns, TCallback callback) {
101-
ParserHandler = MakeIntrusive<TParsedDataConsumer>(*this, columns, callback);
104+
TStatus MakeParser(TVector<TSchemaColumn> columns, TCallback callback, bool checkOffsets = true) {
105+
ParserHandler = MakeIntrusive<TParsedDataConsumer>(*this, columns, callback, checkOffsets);
102106

103107
auto parserStatus = CreateParser();
104108
if (parserStatus.IsFail()) {
@@ -109,12 +113,12 @@ class TBaseParserFixture : public TBaseFixture {
109113
return TStatus::Success();
110114
}
111115

112-
TStatus MakeParser(TVector<TString> columnNames, TString columnType, TCallback callback) {
116+
TStatus MakeParser(TVector<TString> columnNames, TString columnType, TCallback callback, bool checkOffsets = true) {
113117
TVector<TSchemaColumn> columns;
114118
for (const auto& columnName : columnNames) {
115119
columns.push_back({.Name = columnName, .TypeYson = columnType});
116120
}
117-
return MakeParser(columns, callback);
121+
return MakeParser(columns, callback, checkOffsets);
118122
}
119123

120124
TStatus MakeParser(TVector<TString> columnNames, TString columnType) {
@@ -151,17 +155,19 @@ class TBaseParserFixture : public TBaseFixture {
151155
ui64 ExpectedBatches = 0;
152156
};
153157

154-
class TJsonParserFixture : public TBaseParserFixture {
158+
template<bool SkipErrors = false>
159+
class TJsonParserBaseFixture : public TBaseParserFixture {
155160
using TBase = TBaseParserFixture;
156161

157162
public:
158-
TJsonParserFixture()
163+
TJsonParserBaseFixture()
159164
: TBase()
160165
, Config({
161166
.FunctionRegistry = FunctionRegistry,
162167
.BatchSize = 1_MB,
163168
.LatencyLimit = TDuration::Zero(),
164-
.BufferCellCount = 1000
169+
.BufferCellCount = 1000,
170+
.SkipErrors = SkipErrors
165171
})
166172
{}
167173

@@ -174,6 +180,9 @@ class TJsonParserFixture : public TBaseParserFixture {
174180
TJsonParserConfig Config;
175181
};
176182

183+
using TJsonParserFixture = TJsonParserBaseFixture<false>;
184+
using TJsonParserFixtureSkipErrors = TJsonParserBaseFixture<true>;
185+
177186
class TRawParserFixture : public TBaseParserFixture {
178187
protected:
179188
TValueStatus<ITopicParser::TPtr> CreateParser() override {
@@ -476,6 +485,85 @@ Y_UNIT_TEST_SUITE(TestJsonParser) {
476485
CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {\"a1\": \"x\"} {\"a1\": \"y\"}");
477486
CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {");
478487
}
488+
489+
Y_UNIT_TEST_F(SkipErrors_Simple1, TJsonParserFixtureSkipErrors) {
490+
CheckSuccess(MakeParser({{"a1", "[DataType; String]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [](ui64 numberRows, TVector<std::span<NYql::NUdf::TUnboxedValue>> result) {
491+
UNIT_ASSERT_VALUES_EQUAL(1, numberRows);
492+
UNIT_ASSERT_VALUES_EQUAL(2, result.size());
493+
UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef()));
494+
UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get<ui64>());
495+
}));
496+
PushToParser(FIRST_OFFSET, R"({"a1": "hello1", "a2": 101, "event": "event1"})");
497+
}
498+
499+
Y_UNIT_TEST_F(SkipErrors_StringValidation, TJsonParserFixtureSkipErrors) {
500+
ExpectedBatches = 1;
501+
CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector<std::span<NYql::NUdf::TUnboxedValue>> result) {
502+
UNIT_ASSERT_VALUES_EQUAL(2, numberRows);
503+
UNIT_ASSERT_VALUES_EQUAL(2, result.size());
504+
for (size_t i = 0; i < numberRows; ++i) {
505+
UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i);
506+
UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i);
507+
}
508+
}, false));
509+
510+
Parser->ParseMessages({
511+
GetMessage(FIRST_OFFSET, R"({"a1": "hello1", "a2": "101", "event": "event1"})"),
512+
GetMessage(FIRST_OFFSET + 1, R"({"a1": "hello1", "a2": 999, "event": "event2"})"),
513+
GetMessage(FIRST_OFFSET + 2, R"({"a2": "101", "a1": "hello1", "event": "event3"})")
514+
});
515+
}
516+
517+
Y_UNIT_TEST_F(SkipErrors_NoField, TJsonParserFixtureSkipErrors) {
518+
ExpectedBatches = 1;
519+
CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector<std::span<NYql::NUdf::TUnboxedValue>> result) {
520+
UNIT_ASSERT_VALUES_EQUAL(1, numberRows);
521+
UNIT_ASSERT_VALUES_EQUAL(2, result.size());
522+
for (size_t i = 0; i < numberRows; ++i) {
523+
UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i);
524+
UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i);
525+
}
526+
}, false));
527+
528+
Parser->ParseMessages({
529+
GetMessage(FIRST_OFFSET, R"({"a1": "hello1", "event": "event1"})"),
530+
GetMessage(FIRST_OFFSET + 1, R"({"a1": "hello1", "a2": "101", "event": "event2"})")
531+
});
532+
}
533+
534+
Y_UNIT_TEST_F(SkipErrors_NoJson, TJsonParserFixtureSkipErrors) {
535+
ExpectedBatches = 1;
536+
CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector<std::span<NYql::NUdf::TUnboxedValue>> /*result*/) {
537+
UNIT_ASSERT_VALUES_EQUAL(2, numberRows);
538+
}, false));
539+
540+
Parser->ParseMessages({
541+
GetMessage(FIRST_OFFSET, R"({"a1": "hello0", "a2": "100"})"),
542+
GetMessage(FIRST_OFFSET + 1, "\x80"),
543+
GetMessage(FIRST_OFFSET + 2, R"(})"),
544+
GetMessage(FIRST_OFFSET + 3, R"(lalala)"),
545+
GetMessage(FIRST_OFFSET + 4, R"({"a2": "hello2", "a2": "102"})"),
546+
GetMessage(FIRST_OFFSET + 5, "\x80"),
547+
});
548+
}
549+
550+
Y_UNIT_TEST_F(SkipErrors_Optional, TJsonParserFixtureSkipErrors) {
551+
ExpectedBatches = 1;
552+
CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}, {"a2", "[OptionalType; [DataType; String]]"}}, [&](ui64 numberRows, TVector<std::span<NYql::NUdf::TUnboxedValue>> result) {
553+
UNIT_ASSERT_VALUES_EQUAL(2, numberRows);
554+
UNIT_ASSERT_VALUES_EQUAL(2, result.size());
555+
UNIT_ASSERT_VALUES_EQUAL("hello0", TString(result[0][0].AsStringRef()));
556+
UNIT_ASSERT_VALUES_EQUAL("100", TString(result[1][0].AsStringRef()));
557+
UNIT_ASSERT(!result[0][1]);
558+
UNIT_ASSERT_VALUES_EQUAL("102", TString(result[1][1].AsStringRef()));
559+
}, false));
560+
561+
Parser->ParseMessages({
562+
GetMessage(FIRST_OFFSET, R"({"a1": "hello0", "a2": "100"})"),
563+
GetMessage(FIRST_OFFSET + 1, R"({"a1": "hello1", "a2": 101})"),
564+
GetMessage(FIRST_OFFSET + 2, R"({"a2": "102"})")
565+
});
566+
}
479567
}
480568

481569
Y_UNIT_TEST_SUITE(TestRawParser) {

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@ class TFixture : public NTests::TBaseFixture {
4949
RowDispatcherActorId = Runtime.AllocateEdgeActor();
5050
}
5151

52-
void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits<ui64>::max()) {
52+
void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits<ui64>::max(), bool skipErrors = false) {
5353
TopicPath = topicPath;
5454
Config.SetTimeoutBeforeStartSessionSec(TimeoutBeforeStartSessionSec);
5555
Config.SetMaxSessionUsedMemory(maxSessionUsedMemory);
5656
Config.SetSendStatusPeriodSec(2);
5757
Config.SetWithoutConsumer(false);
58+
Config.MutableJsonParser()->SetSkipErrors(skipErrors);
59+
Config.MutableJsonParser()->SetBatchCreationTimeoutMs(100);
5860

5961
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
6062
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
@@ -146,7 +148,7 @@ class TFixture : public NTests::TBaseFixture {
146148
Runtime.Send(new IEventHandle(TopicSession, readActorId, event.release()));
147149
}
148150

149-
void ExpectMessageBatch(NActors::TActorId readActorId, const TBatch& expected) {
151+
void ExpectMessageBatch(NActors::TActorId readActorId, const TBatch& expected, const std::vector<ui64>& expectedLastOffset = {}) {
150152
Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch()));
151153

152154
auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvMessageBatch>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
@@ -156,6 +158,12 @@ class TFixture : public NTests::TBaseFixture {
156158

157159
NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(0);
158160
UNIT_ASSERT_VALUES_EQUAL(message.OffsetsSize(), expected.Rows.size());
161+
if (!expectedLastOffset.empty()) {
162+
UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset.size(), message.OffsetsSize());
163+
for (size_t i =0; i < expectedLastOffset.size(); ++i) {
164+
UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset[i], message.GetOffsets().Get(i));
165+
}
166+
}
159167
CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expected);
160168
}
161169

@@ -619,6 +627,58 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
619627

620628
PassAway();
621629
}
630+
631+
Y_UNIT_TEST_F(WrongJson, TRealTopicFixture) {
632+
const TString topicName = "wrong_json";
633+
PQCreateStream(topicName);
634+
Init(topicName, std::numeric_limits<ui64>::max(), true);
635+
auto source = BuildSource();
636+
StartSession(ReadActorId1, source);
637+
638+
auto writeRead = [&](const std::vector<TString>& input, const TBatch& output) {
639+
PQWrite(input);
640+
if (output.Rows.empty()) {
641+
return;
642+
}
643+
ExpectNewDataArrived({ReadActorId1});
644+
ExpectMessageBatch(ReadActorId1, output);
645+
};
646+
647+
auto test = [&](const TString& wrongJson) {
648+
writeRead({ wrongJson }, { });
649+
Sleep(TDuration::MilliSeconds(100));
650+
writeRead({ Json1, wrongJson, Json3 }, { JsonMessage(1), JsonMessage(3) });
651+
writeRead({ wrongJson, Json2, Json3 }, { JsonMessage(2), JsonMessage(3) });
652+
writeRead({ Json1, Json2 , wrongJson }, { JsonMessage(1), JsonMessage(2) });
653+
writeRead({ Json1, wrongJson, wrongJson, Json3 }, { JsonMessage(1), JsonMessage(3) });
654+
};
655+
656+
test("wrong"); // not json
657+
test("{\"dt\":100,\"value\"}"); // empty value
658+
test("{\"dt\":100}"); // no field
659+
test("{\"dt\":400,\"value\":777}"); // wrong value type
660+
test("{}\x80");
661+
test("}");
662+
test("{");
663+
writeRead({ "{\"dt\":100}", "{}\x80", Json3 }, { JsonMessage(3) });
664+
writeRead({Json1 + Json1, Json3 }, { JsonMessage(1), JsonMessage(1) }); // not checked
665+
writeRead({Json1.substr(0, 3), Json1.substr(3), Json2, Json3 }, { JsonMessage(1), JsonMessage(2), JsonMessage(3) });
666+
PassAway();
667+
}
668+
669+
Y_UNIT_TEST_F(WrongJsonOffset, TRealTopicFixture) {
670+
const TString topicName = "wrong_json_offset";
671+
PQCreateStream(topicName);
672+
Init(topicName, std::numeric_limits<ui64>::max(), true);
673+
auto source = BuildSource();
674+
StartSession(ReadActorId1, source);
675+
676+
TString wrongJson{"wrong"};
677+
PQWrite({ Json1, wrongJson, wrongJson, Json3 });
678+
ExpectNewDataArrived({ReadActorId1});
679+
ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(3) }, {0, 3});
680+
PassAway();
681+
}
622682
}
623683

624684
} // namespace NFq::NRowDispatcher::NTests

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@
2525
COMPUTE_NODE_COUNT = 3
2626

2727

28+
class Param(object):
29+
def __init__(
30+
self,
31+
skip_errors=False,
32+
):
33+
self.skip_errors = skip_errors
34+
35+
2836
@pytest.fixture
2937
def kikimr(request):
3038
kikimr_conf = StreamingOverKikimrConfig(
@@ -33,6 +41,12 @@ def kikimr(request):
3341
kikimr = StreamingOverKikimr(kikimr_conf)
3442
kikimr.compute_plane.fq_config['row_dispatcher']['enabled'] = True
3543
kikimr.compute_plane.fq_config['row_dispatcher']['without_consumer'] = True
44+
kikimr.compute_plane.fq_config['row_dispatcher']['json_parser'] = {}
45+
46+
skip_errors = False
47+
if hasattr(request, "param") and isinstance(request.param, Param):
48+
skip_errors = request.param.skip_errors
49+
kikimr.compute_plane.fq_config['row_dispatcher']['json_parser']['skip_errors'] = skip_errors
3650
kikimr.start_mvp_mock_server()
3751
kikimr.start()
3852
yield kikimr
@@ -1175,3 +1189,40 @@ def test_huge_messages(self, kikimr, client):
11751189
assert "Failed to parse json message for offset" not in issues, "Incorrect Issues: " + issues
11761190

11771191
assert received == expected
1192+
1193+
@yq_v1
1194+
@pytest.mark.parametrize(
1195+
"kikimr", [Param(skip_errors=True)], indirect=["kikimr"]
1196+
)
1197+
def test_json_errors(self, kikimr, client):
1198+
self.init(client, "test_json_errors")
1199+
sql = Rf'''
1200+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
1201+
SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
1202+
WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));'''
1203+
1204+
query_id = start_yds_query(kikimr, client, sql)
1205+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
1206+
1207+
data = [
1208+
'{"time": 101, "data": "hello1"}',
1209+
'{"time": 102, "data": 7777}',
1210+
'{"time": 103, "data": "hello2"}'
1211+
]
1212+
1213+
self.write_stream(data)
1214+
expected = ['hello1', 'hello2']
1215+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
1216+
1217+
deadline = time.time() + 30
1218+
while True:
1219+
count = 0
1220+
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
1221+
value = kikimr.compute_plane.get_sensors(node_index, "yq").find_sensor(
1222+
{"subsystem": "row_dispatcher", "partition": "0", "format": "json_each_row", "sensor": "ParsingErrors"})
1223+
count += value if value is not None else 0
1224+
if count > 0:
1225+
break
1226+
assert time.time() < deadline, f"Waiting sensor ParsingErrors value failed, current count {count}"
1227+
time.sleep(1)
1228+
stop_yds_query(client, query_id)

0 commit comments

Comments
 (0)