From 3e15b148b8abb6056ca73e90e69b740f80c3dcb7 Mon Sep 17 00:00:00 2001
From: Yuriy Kaminskiy
Date: Fri, 18 Jul 2025 02:48:22 +0300
Subject: [PATCH 1/6] row_dispatcher/simdjson: fix batch size

---
 .../format_handler/parsers/json_parser.cpp | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
index f95c11387545..96de361906f7 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
@@ -419,8 +419,20 @@ class TJsonParser : public TTopicParserBase {
         const auto [values, size] = Buffer.Finish();
         LOG_ROW_DISPATCHER_TRACE("Do parsing, first offset: " << Buffer.Offsets.front() << ", values:\n" << values);
 
+        /*
+           The batch size must be at least as large as the largest document.
+           Since we merge several messages before feeding them to
+           `simdjson::iterate_many`, a document may cross message boundaries,
+           so we must pass Buffer.GetSize() as the batch size.
+           Suppose we batched two messages:
+           '{"a":"bbbbbbbbbbb"'
+           ',"c":"d"}{"e":"f"}'
+           (both 18 bytes long) into the buffer
+           '{"a":"bbbbbbbbbbb","c":"d"}{"e":"f"}'
+           Then, after parsing, the maximum document size will be 27 bytes.
+         */
         simdjson::ondemand::document_stream documents;
-        CHECK_JSON_ERROR(Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE).get(documents)) {
+        CHECK_JSON_ERROR(Parser.iterate_many(values, size, Buffer.GetSize()).get(documents)) {
             return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
         }

From 7bb396cfb0dbcce62e49a95f0515e82e69c83435 Mon Sep 17 00:00:00 2001
From: Yuriy Kaminskiy
Date: Fri, 18 Jul 2025 03:14:05 +0300
Subject: [PATCH 2/6] simplify

---
 .../libs/row_dispatcher/format_handler/parsers/json_parser.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
index 96de361906f7..4d9924e098e1 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
@@ -432,7 +432,7 @@ class TJsonParser : public TTopicParserBase {
            Then, after parsing, the maximum document size will be 27 bytes.
*/ simdjson::ondemand::document_stream documents; - CHECK_JSON_ERROR(Parser.iterate_many(values, size, Buffer.GetSize()).get(documents)) { + CHECK_JSON_ERROR(Parser.iterate_many(values, size, size).get(documents)) { return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size))); } From b323cc336683f8adebf3477a4f40bfcfc0410bd3 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Jul 2025 12:53:27 +0300 Subject: [PATCH 3/6] Add tests (fails with fix reverted) --- ydb/tests/fq/yds/test_row_dispatcher.py | 55 +++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 9860e67f7522..290abd953ba0 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -1147,3 +1147,58 @@ def test_group_by_hop_restart_node(self, kikimr, client): stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) + + @yq_v1 + def test_huge_messages(self, kikimr, client): + client.create_yds_connection( + YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True + ) + self.init_topics("test_huge") + + sql = Rf''' + $data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, sql String NOT NULL, event String NOT NULL)) + WHERE event = "event1" or event = "event2"; + + $data = SELECT time, LENGTH(sql) AS len FROM $data; + + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $data; + ''' + + query_id = start_yds_query(kikimr, client, sql) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + + large_string = "abcdefghjkl1234567890" + huge_string = large_string*(1000*1000//len(large_string) + 2) + assert len(huge_string) > 1000*1000 + + data = [ + '{"time": 101, "sql":"' + large_string + '", "event": "event1"}', + '{"time": 102, "sql":"' + huge_string + '", "event": "event2"}', + '{"time": 103, "sql": "' + large_string + '", "event": "event3"}', + '{"time": 104, "sql": "' + huge_string + '", "event": "event4"}', + '{"time": 105, "sql":"' + large_string + '", "event": "event1"}', + '{"time": 106, "sql":"' + huge_string + '", "event": "event2"}', + ] + + self.write_stream(data) + expected = [ + f'{{"len":{len(large_string)},"time":101}}', + f'{{"len":{len(huge_string)},"time":102}}', + f'{{"len":{len(large_string)},"time":105}}', + f'{{"len":{len(huge_string)},"time":106}}', + ] + # TODO: normalize json and tolerate order mismatch + + received = self.read_stream(len(expected), topic_path=self.output_topic) + + # wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) + stop_yds_query(client, query_id) + + issues = str(client.describe_query(query_id).result.query.transient_issue) + assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues + issues = str(client.describe_query(query_id).result.query.issue) + assert "Failed to parse json message for offset" not in issues, "Incorrect Issues: " + issues + + assert received == expected From 95875f657d518bc28e9f5a3fc5c8fafd542f645a Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Jul 2025 14:09:57 +0300 Subject: [PATCH 4/6] pq rd json parser: improve error messages Include topic, 
partition and buffered message offsets
---
 .../format_handler/parsers/json_parser.cpp   | 18 +++++++++---------
 .../format_handler/parsers/parser_base.cpp   |  2 +-
 .../fq/libs/row_dispatcher/topic_session.cpp |  4 ++++
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
index 4d9924e098e1..348f241136ce 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
@@ -5,6 +5,7 @@
 #include
 #include
+#include <util/string/join.h>

 #include

@@ -121,12 +122,12 @@ class TColumnParser {
         }
     }

-    void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) {
+    void ValidateNumberValues(size_t expectedNumberValues, const TVector<ui64>& offsets) {
         if (Status.IsFail()) {
             return;
         }
         if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) {
-            Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson);
+            Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values in non optional column '" << Name << "' with type " << TypeYson << ", buffered offsets: " << JoinSeq(' ', offsets));
         }
     }

@@ -433,23 +434,23 @@ class TJsonParser : public TTopicParserBase {
         */
         simdjson::ondemand::document_stream documents;
         CHECK_JSON_ERROR(Parser.iterate_many(values, size, size).get(documents)) {
-            return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
+            return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets()));
         }

         size_t rowId = 0;
         for (auto document : documents) {
             if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
-                return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
+                return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets()));
             }

             const ui64 offset = Buffer.Offsets[rowId];
             CHECK_JSON_ERROR(document.error()) {
-                return
TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size))); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets())); } for (auto item : document.get_object()) { CHECK_JSON_ERROR(item.error()) { - return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size))); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets())); } const auto it = ColumnsIndex.find(item.escaped_key().value()); @@ -464,12 +465,11 @@ class TJsonParser : public TTopicParserBase { } if (Y_UNLIKELY(rowId != Buffer.NumberValues)) { - return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size))); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets())); } - const ui64 firstOffset = Buffer.Offsets.front(); for (auto& column : Columns) { - column.ValidateNumberValues(rowId, firstOffset); + column.ValidateNumberValues(rowId, GetOffsets()); } return TStatus::Success(); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp index 7fed54dde825..0659a5746007 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp @@ -139,7 +139,7 @@ TString TruncateString(std::string_view rawString, size_t maxSize) { if (rawString.size() <= maxSize) { return TString(rawString); } - return TStringBuilder() << rawString.substr(0, maxSize) << " truncated..."; + return TStringBuilder() << rawString.substr(0, maxSize) << " truncated (full length was " << rawString.size() << ")..."; } } // namespace NFq::NRowDispatcher diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 02188eb866fe..2d5de6240911 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ 
b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -846,7 +846,11 @@ void TTopicSession::SendSessionError(TActorId readActorId, TStatus status, bool LOG_ROW_DISPATCHER_WARN("SendSessionError to " << readActorId << ", status: " << status.GetErrorMessage()); auto event = std::make_unique(); event->Record.SetStatusCode(status.GetStatus()); + event->Record.SetPartitionId(PartitionId); NYql::IssuesToMessage(status.GetErrorDescription(), event->Record.MutableIssues()); + auto& issue = *event->Record.AddIssues(); + issue.set_message(TStringBuilder() << "Topic " << TopicPathPartition); + issue.set_severity(NYql::TSeverityIds::S_INFO); event->ReadActorId = readActorId; event->IsFatalError = isFatalError; Send(RowDispatcherActorId, event.release()); From 7f4741dd6c8a7007740b34574c31082ce2428c1a Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Jul 2025 16:48:00 +0300 Subject: [PATCH 5/6] [ut] fix expected error messages --- .../libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp | 2 +- .../libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index d1d75e520ce6..2c3ceca2a9b2 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -381,7 +381,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { {GetMessage(firstOffset, R"({"com_col": "event1", "col_second": "str_second"})")}, ClientIds[0], EStatusId::PRECONDITION_FAILED, - TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << firstOffset << " in non optional column 'col_first' with type [DataType; String]" + TStringBuilder() << "Failed to parse json messages, found 1 missing values in non optional column 'col_first' with type [DataType; String], buffered offsets: " << firstOffset ); } } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index b8084c4f600d..859683efa6c6 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -368,7 +368,7 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(MissingFieldsValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[DataType; String]"}, {"a2", "[DataType; Uint64]"}})); - CheckColumnError(R"({"a2": 105, "event": "event1"})", 0, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << FIRST_OFFSET << " in non optional column 'a1' with type [DataType; String]"); + CheckColumnError(R"({"a2": 105, "event": "event1"})", 0, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found 1 missing values in non optional column 'a1' with type [DataType; String], buffered offsets: " << FIRST_OFFSET); CheckColumnError(R"({"a1": "hello1", "a2": null, "event": "event1"})", 1, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a2' with type [DataType; Uint64] subissue: {
<main>: Error: Found unexpected null value, expected non optional data type Uint64 }");
     }

From 747e7f1ba3321b8a6e2bee21983cd7ac09fbba2b Mon Sep 17 00:00:00 2001
From: Yuriy Kaminskiy
Date: Mon, 21 Jul 2025 20:42:34 +0300
Subject: [PATCH 6/6] fq rd simdjson: test with broken stream

Feed it a json_each_row stream with incorrect boundaries: a single json
document is split across two topic messages.
---
 ydb/tests/fq/yds/test_row_dispatcher.py | 53 +++++++++++++++++++++++++
 1 file changed, 53 insertions(+)

diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py
index 290abd953ba0..04819c9a120c 100644
--- a/ydb/tests/fq/yds/test_row_dispatcher.py
+++ b/ydb/tests/fq/yds/test_row_dispatcher.py
@@ -1202,3 +1202,56 @@ def test_huge_messages(self, kikimr, client):
         assert "Failed to parse json message for offset" not in issues, "Incorrect Issues: " + issues
 
         assert received == expected
+
+    @yq_v1
+    def test_huge_splitted(self, kikimr, client):
+        client.create_yds_connection(
+            YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
+        )
+        self.init_topics("test_huge_splitted")
+
+        sql = Rf'''
+            $data = SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}`
+                WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, sql String NOT NULL, event String NOT NULL))
+                WHERE event = "event1" or event = "event2";
+
+            $data = SELECT time, LENGTH(sql) AS len FROM $data;
+
+            INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
+                SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $data;
+            '''
+
+        query_id = start_yds_query(kikimr, client, sql)
+        wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
+
+        large_string = "abcdefghjkl1234567890" * 10
+        huge_string = large_string*(1000*1000//len(large_string) + 2)
+        assert len(huge_string) > 1000*1000
+
+        split_point = 500*1000
+        data = [
+            f'{{"time": 101, "sql":"{large_string}", "event": "event1"}}{{"time": 102, "event": "event2", "sql":"{huge_string[:split_point]}',
+            f'{huge_string[split_point:]}"}}',
+            f'{{"time": 103, "sql":"{large_string}", "event": "event3"}}',
+            f'{{"time": 105, "sql":"{large_string}", "event": "event1"}}'
+        ]
+
+        self.write_stream(data)
+        expected = [
+            f'{{"len":{len(large_string)},"time":101}}',
+            f'{{"len":{len(huge_string)},"time":102}}',
+            f'{{"len":{len(large_string)},"time":105}}',
+        ]
+        # TODO: normalize json and tolerate order mismatch
+
+        received = self.read_stream(len(expected), topic_path=self.output_topic)
+
+        # wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
+        stop_yds_query(client, query_id)
+
+        issues = str(client.describe_query(query_id).result.query.transient_issue)
+        assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues
+        issues = str(client.describe_query(query_id).result.query.issue)
+        assert "Failed to parse json message for offset" not in issues, "Incorrect Issues: " + issues
+
+        assert received == expected
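
Reviewer illustration (not part of the patch series): a minimal standalone sketch of the failure mode PATCH 1 fixes, assuming a reasonably recent simdjson. The file name and build line are hypothetical. Document sizes are chosen comfortably above simdjson's internal minimal batch size, so the CAPACITY failure, not the minimum, is what we observe: once messages are merged, no per-message size bounds the document size, and only the full buffer size is a safe batch size.

    // batch_size_demo.cpp -- illustrative sketch only, not part of the patches.
    // Build (assuming an installed simdjson): c++ -std=c++17 batch_size_demo.cpp -lsimdjson
    #include <simdjson.h>

    #include <cstdio>
    #include <string>

    int main() {
        // One JSON document split across two simulated topic messages:
        // the first document is 110 bytes, larger than either message.
        const std::string doc1 = std::string(R"({"key":")") + std::string(100, 'x') + R"("})";
        const std::string merged = doc1 + R"({"e":"f"})";
        const simdjson::padded_string buffer(merged.data(), merged.size());

        simdjson::ondemand::parser parser;
        for (size_t batch_size : {size_t{64}, merged.size()}) {
            simdjson::ondemand::document_stream docs;
            auto error = parser.iterate_many(buffer, batch_size).get(docs);
            if (error) {
                std::printf("batch %zu: %s\n", batch_size, simdjson::error_message(error));
                continue;
            }
            size_t parsed = 0;
            for (auto doc : docs) {
                if (doc.error()) {
                    // With batch_size = 64 the 110-byte document reports CAPACITY here.
                    std::printf("batch %zu: document error: %s\n", batch_size, simdjson::error_message(doc.error()));
                    break;
                }
                ++parsed;
            }
            std::printf("batch %zu: parsed %zu complete documents\n", batch_size, parsed);
        }
        return 0;
    }

With batch_size = 64 (a per-message bound) the first document overflows the batch and the stream reports CAPACITY; with the full buffer size both documents parse. This is exactly why the fix passes the merged buffer size to `iterate_many` instead of `simdjson::ondemand::DEFAULT_BATCH_SIZE`.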