Skip to content

Commit a697c18

Browse files
ivanmorozov333 and ivanmorozov333 authored
scan corrections (#21337)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
1 parent 387d991 commit a697c18

File tree

25 files changed

+138
-82
lines changed

25 files changed

+138
-82
lines changed

ydb/core/formats/arrow/program/execution.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,10 @@ class TProcessorContext {
342342
return result;
343343
}
344344

345-
TProcessorContext(const std::weak_ptr<IDataSource>& dataSource, const std::shared_ptr<NAccessor::TAccessorsCollection>& resources,
345+
TProcessorContext(std::weak_ptr<IDataSource>&& dataSource, const std::shared_ptr<NAccessor::TAccessorsCollection>& resources,
346346
const std::optional<ui32> limit, const bool reverse)
347347
: Resources(resources)
348-
, DataSource(dataSource)
348+
, DataSource(std::move(dataSource))
349349
, Limit(limit)
350350
, Reverse(reverse) {
351351
}

ydb/core/formats/arrow/program/graph_execute.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ std::shared_ptr<TCompiledGraph::TIterator> TCompiledGraph::BuildIterator(const s
268268
auto result = std::make_shared<TIterator>(visitor);
269269
auto rootNodes = FilterRoot;
270270
rootNodes.emplace_back(ResultRoot);
271-
result->Reset(rootNodes).DetachResult();
271+
result->Reset(std::move(rootNodes)).DetachResult();
272272
return result;
273273
}
274274

@@ -362,8 +362,8 @@ TConclusion<bool> TCompiledGraph::TIterator::GlobalInitialize() {
362362
return IsValid();
363363
}
364364

365-
TConclusion<bool> TCompiledGraph::TIterator::Reset(const std::vector<std::shared_ptr<TCompiledGraph::TNode>>& graphNodes) {
366-
GraphNodes = graphNodes;
365+
TConclusion<bool> TCompiledGraph::TIterator::Reset(std::vector<std::shared_ptr<TCompiledGraph::TNode>>&& graphNodes) {
366+
GraphNodes = std::move(graphNodes);
367367
GraphNodes.erase(std::remove_if(GraphNodes.begin(), GraphNodes.end(),
368368
[](const std::shared_ptr<TCompiledGraph::TNode>& item) {
369369
return !item;

ydb/core/formats/arrow/program/graph_execute.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ class TCompiledGraph {
201201
return *CurrentNode;
202202
}
203203

204-
[[nodiscard]] TConclusion<bool> Reset(const std::vector<std::shared_ptr<TCompiledGraph::TNode>>& graphNodes);
204+
[[nodiscard]] TConclusion<bool> Reset(std::vector<std::shared_ptr<TCompiledGraph::TNode>>&& graphNodes);
205205

206206
bool IsValid() const {
207207
return !!CurrentNode;

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -234,14 +234,14 @@ bool TColumnShardScan::ProduceResults() noexcept {
234234
return true;
235235
}
236236

237-
auto& shardedBatch = result.GetShardedBatch();
238-
auto batch = shardedBatch.GetRecordBatch();
239-
int numRows = batch->num_rows();
240-
int numColumns = batch->num_columns();
241-
ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("columns", numColumns)("rows", result.GetRecordsCount());
242-
243-
AFL_VERIFY(DataFormat == NKikimrDataEvents::FORMAT_ARROW);
244237
{
238+
auto shardedBatch = result.ExtractShardedBatch();
239+
auto batch = shardedBatch.ExtractRecordBatch();
240+
ACFL_DEBUG("stage", "ready result")("iterator", ScanIterator->DebugString())("columns", batch->num_columns())(
241+
"rows", batch->num_rows());
242+
243+
AFL_VERIFY(DataFormat == NKikimrDataEvents::FORMAT_ARROW);
244+
245245
MakeResult(0);
246246
if (shardedBatch.IsSharded()) {
247247
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "compute_sharding_success")(
@@ -254,12 +254,12 @@ bool TColumnShardScan::ProduceResults() noexcept {
254254
}
255255
}
256256
TMemoryProfileGuard mGuard("SCAN_PROFILE::RESULT::TO_KQP", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
257-
Result->ArrowBatch = shardedBatch.GetRecordBatch();
258257
Rows += batch->num_rows();
259-
Bytes += NArrow::GetTableDataSize(Result->ArrowBatch);
258+
Bytes += NArrow::GetTableDataSize(batch);
260259

261-
ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetTableDataSize(Result->ArrowBatch))("num_rows", numRows)(
260+
ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetTableDataSize(Result->ArrowBatch))("num_rows", batch->num_rows())(
262261
"batch_columns", JoinSeq(",", batch->schema()->field_names()));
262+
Result->ArrowBatch = std::move(batch);
263263
}
264264
if (CurrentLastReadKey && result.GetScanCursor()->GetPKCursor() && CurrentLastReadKey->GetPKCursor()) {
265265
auto pNew = result.GetScanCursor()->GetPKCursor();

ydb/core/tx/columnshard/engines/reader/common/conveyor_task.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ void IDataTasksProcessor::ITask::DoExecute(const std::shared_ptr<NConveyor::ITas
99
if (result.IsFail()) {
1010
NActors::TActivationContext::AsActorContext().Send(
1111
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(result, std::move(Guard)));
12-
} else {
12+
} else if (*result) {
1313
NActors::TActivationContext::AsActorContext().Send(OwnerId,
1414
new NColumnShard::TEvPrivate::TEvTaskProcessedResult(static_pointer_cast<IDataTasksProcessor::ITask>(taskPtr), std::move(Guard)));
1515
}

ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class IDataTasksProcessor {
2828
using TBase = NConveyor::ITask;
2929
const NActors::TActorId OwnerId;
3030
NColumnShard::TCounterGuard Guard;
31-
virtual TConclusionStatus DoExecuteImpl() = 0;
31+
virtual TConclusion<bool> DoExecuteImpl() = 0;
3232

3333
protected:
3434
virtual void DoExecute(const std::shared_ptr<NConveyor::ITask>& taskPtr) override final;

ydb/core/tx/columnshard/engines/reader/common/result.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ std::vector<std::shared_ptr<TPartialReadResult>> TPartialReadResult::SplitResult
5252
return result;
5353
}
5454

55-
TPartialReadResult::TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
56-
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
57-
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
55+
TPartialReadResult::TPartialReadResult(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>&& resourceGuards,
56+
std::shared_ptr<NGroupedMemoryManager::TGroupGuard>&& gGuard, NArrow::TShardedRecordBatch&& batch,
57+
std::shared_ptr<IScanCursor>&& scanCursor, const std::shared_ptr<TReadContext>& context,
5858
const std::optional<TPartialSourceAddress> notFinishedInterval)
59-
: ResourceGuards(resourceGuards)
60-
, GroupGuard(gGuard)
61-
, ResultBatch(batch)
62-
, ScanCursor(scanCursor)
59+
: ResourceGuards(std::move(resourceGuards))
60+
, GroupGuard(std::move(gGuard))
61+
, ResultBatch(std::move(batch))
62+
, ScanCursor(std::move(scanCursor))
6363
, NotFinishedInterval(notFinishedInterval)
6464
, Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) {
6565
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());

ydb/core/tx/columnshard/engines/reader/common/result.h

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,48 +37,50 @@ class TPartialReadResult: public TNonCopyable {
3737
std::shared_ptr<IScanCursor> ScanCursor;
3838
YDB_READONLY_DEF(std::optional<TPartialSourceAddress>, NotFinishedInterval);
3939
const NColumnShard::TCounterGuard Guard;
40+
bool Extracted = false;
4041

4142
public:
4243
void Cut(const ui32 limit) {
44+
AFL_VERIFY(!Extracted);
4345
ResultBatch.Cut(limit);
4446
}
4547

4648
const arrow::Table& GetResultBatch() const {
49+
AFL_VERIFY(!Extracted);
4750
return *ResultBatch.GetRecordBatch();
4851
}
4952

50-
const std::shared_ptr<arrow::Table>& GetResultBatchPtrVerified() const {
51-
AFL_VERIFY(ResultBatch.GetRecordBatch());
52-
return ResultBatch.GetRecordBatch();
53-
}
54-
55-
ui64 GetMemorySize() const {
56-
return ResultBatch.GetMemorySize();
57-
}
58-
5953
ui64 GetRecordsCount() const {
54+
AFL_VERIFY(!Extracted);
6055
return ResultBatch.GetRecordsCount();
6156
}
6257

58+
std::shared_ptr<arrow::Schema> GetResultSchema() const {
59+
AFL_VERIFY(!Extracted);
60+
return ResultBatch.GetResultSchema();
61+
}
62+
6363
static std::vector<std::shared_ptr<TPartialReadResult>> SplitResults(
6464
std::vector<std::shared_ptr<TPartialReadResult>>&& resultsExt, const ui32 maxRecordsInResult);
6565

66-
const NArrow::TShardedRecordBatch& GetShardedBatch() const {
67-
return ResultBatch;
66+
NArrow::TShardedRecordBatch ExtractShardedBatch() {
67+
AFL_VERIFY(!Extracted);
68+
Extracted = true;
69+
return std::move(ResultBatch);
6870
}
6971

7072
const std::shared_ptr<IScanCursor>& GetScanCursor() const {
7173
return ScanCursor;
7274
}
7375

74-
explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
75-
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
76-
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
76+
explicit TPartialReadResult(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>&& resourceGuards,
77+
std::shared_ptr<NGroupedMemoryManager::TGroupGuard>&& gGuard, NArrow::TShardedRecordBatch&& batch,
78+
std::shared_ptr<IScanCursor>&& scanCursor, const std::shared_ptr<TReadContext>& context,
7779
const std::optional<TPartialSourceAddress> notFinishedInterval);
7880

79-
explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
81+
explicit TPartialReadResult(NArrow::TShardedRecordBatch&& batch, std::shared_ptr<IScanCursor>&& scanCursor,
8082
const std::shared_ptr<TReadContext>& context, const std::optional<TPartialSourceAddress> notFinishedInterval)
81-
: TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedInterval) {
83+
: TPartialReadResult({}, nullptr, std::move(batch), std::move(scanCursor), context, notFinishedInterval) {
8284
}
8385
};
8486

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ class TSourceChunkToReply {
198198
return Table;
199199
}
200200

201+
std::shared_ptr<arrow::Table>&& ExtractTable() {
202+
AFL_VERIFY(Table);
203+
return std::move(Table);
204+
}
205+
201206
bool HasData() const {
202207
return !!Table && Table->num_rows();
203208
}

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,31 @@
1414
namespace NKikimr::NOlap::NReader::NCommon {
1515

1616
bool TStepAction::DoApply(IDataReader& owner) const {
17-
if (FinishedFlag) {
18-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply");
19-
Source->StartSyncSection();
20-
Source->OnSourceFetchingFinishedSafe(owner, Source);
21-
}
17+
AFL_VERIFY(FinishedFlag);
18+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "apply");
19+
Source->StartSyncSection();
20+
Source->OnSourceFetchingFinishedSafe(owner, Source);
2221
return true;
2322
}
2423

25-
TConclusionStatus TStepAction::DoExecuteImpl() {
24+
TConclusion<bool> TStepAction::DoExecuteImpl() {
2625
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Source->AddEvent("step_action"));
2726
if (Source->GetContext()->IsAborted()) {
28-
return TConclusionStatus::Success();
27+
AFL_VERIFY(!FinishedFlag);
28+
FinishedFlag = true;
29+
return true;
2930
}
3031
auto executeResult = Cursor.Execute(Source);
3132
if (executeResult.IsFail()) {
33+
AFL_VERIFY(!FinishedFlag);
34+
FinishedFlag = true;
3235
return executeResult;
3336
}
3437
if (*executeResult) {
38+
AFL_VERIFY(!FinishedFlag);
3539
FinishedFlag = true;
3640
}
37-
return TConclusionStatus::Success();
41+
return FinishedFlag;
3842
}
3943

4044
TStepAction::TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId,

0 commit comments

Comments
 (0)