Skip to content

Commit 01118b5

Browse files
committed
Rework minblkinfo logic, kill racy OnDemand leak
1 parent 9863a11 commit 01118b5

File tree

9 files changed

+81
-23
lines changed

9 files changed

+81
-23
lines changed

bindings/C/adios2/c/adios2_c_engine.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -632,14 +632,14 @@ adios2_varinfo *adios2_inquire_blockinfo(adios2_engine *engine, adios2_variable
632632
if (minBlocksInfo->WasLocalValue)
633633
{
634634
varinfo->Shape = (size_t *)malloc(sizeof(size_t));
635-
varinfo->Shape[0] = (intptr_t)minBlocksInfo->Shape;
635+
varinfo->Shape[0] = minBlocksInfo->Shape[0];
636636
}
637637
else
638638
{
639-
if (minBlocksInfo->Shape)
639+
if (minBlocksInfo->Shape.size())
640640
{
641641
varinfo->Shape = (size_t *)malloc(sizeof(size_t) * minBlocksInfo->Dims);
642-
memcpy(varinfo->Shape, minBlocksInfo->Shape,
642+
memcpy(varinfo->Shape, minBlocksInfo->Shape.data(),
643643
sizeof(size_t) * minBlocksInfo->Dims);
644644
}
645645
}

bindings/CXX11/adios2/cxx11/Variable.tcc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,21 +154,21 @@ Variable<T>::ToBlocksInfoMin(const MinVarInfo *coreVarInfo) const
154154
typename Variable<T>::Info blockInfo;
155155

156156
blockInfo.Step = Step;
157-
if (coreVarInfo->Shape)
157+
if (coreVarInfo->Shape.size())
158158
{
159159
blockInfo.Start.reserve(coreVarInfo->Dims);
160160
blockInfo.Count.reserve(coreVarInfo->Dims);
161161
if (coreVarInfo->WasLocalValue)
162162
{
163-
/* Start and count are really values, not pointers */
164163
blockInfo.Start.push_back((size_t)coreBlockInfo.Start);
165164
blockInfo.Count.push_back((size_t)coreBlockInfo.Count);
166165
}
167166
else
168167
{
169168
for (int i = 0; i < coreVarInfo->Dims; i++)
170169
{
171-
blockInfo.Start.push_back(coreBlockInfo.Start[i]);
170+
if (coreBlockInfo.Start)
171+
blockInfo.Start.push_back(coreBlockInfo.Start[i]);
172172
blockInfo.Count.push_back(coreBlockInfo.Count[i]);
173173
}
174174
}

source/adios2/common/ADIOSTypes.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ static void PrintMBI(std::ostream &os, const MinBlockInfo &blk, int Dims)
478478
void PrintMVI(std::ostream &os, const MinVarInfo &mvi)
479479
{
480480
os << "Step: " << mvi.Step << " Dims: " << mvi.Dims << " Shape: {";
481-
if ((mvi.Dims == 0) || (mvi.Shape == NULL))
481+
if ((mvi.Dims == 0) || (mvi.Shape.size() == 0))
482482
os << "NULL";
483483
else
484484
{

source/adios2/common/ADIOSTypes.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,18 @@ struct MinVarInfo
227227
size_t Step;
228228
bool WasLocalValue; // writer: localValue -> reader: 1D global array
229229
int Dims;
230-
const size_t *Shape;
230+
std::vector<size_t> Shape;
231231
bool IsValue = false;
232232
bool IsReverseDims = false;
233233
std::vector<struct MinBlockInfo> BlocksInfo;
234234
MinVarInfo(const int D, const size_t *S)
235-
: Dims(D), Shape(S), IsValue(false), IsReverseDims(false), BlocksInfo({})
235+
: Dims(D), Shape(D), IsValue(false), IsReverseDims(false), BlocksInfo({})
236236
{
237+
if (S)
238+
{
239+
for (int i = 0; i < D; i++)
240+
Shape[i] = S[i];
241+
}
237242
}
238243
};
239244

source/adios2/toolkit/format/bp5/BP5Deserializer.cpp

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2315,6 +2315,7 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
23152315
auto PossiblyAddValueBlocks = [this](MinVarInfo *MV, BP5VarRec *VarRec, size_t &Id,
23162316
const size_t AbsStep) {
23172317
const size_t writerCohortSize = WriterCohortSize(AbsStep);
2318+
size_t LocalValueCount = 0;
23182319
for (size_t WriterRank = 0; WriterRank < writerCohortSize; WriterRank++)
23192320
{
23202321
MetaArrayRec *writer_meta_base =
@@ -2330,8 +2331,10 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
23302331
Blk.Count = NULL;
23312332
if (VarRec->OrigShapeID == ShapeID::LocalValue)
23322333
{
2334+
// we are abusing this a bit, not pointers, but holding values
23332335
Blk.Count = (size_t *)1;
2334-
Blk.Start = (size_t *)WriterRank;
2336+
Blk.Start = (size_t *)LocalValueCount++;
2337+
MV->Shape[0]++;
23352338
}
23362339
if (writer_meta_base)
23372340
{
@@ -2366,24 +2369,27 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
23662369
size_t Id = 0;
23672370
MV->Step = RelStep;
23682371
MV->Dims = (int)VarRec->DimCount;
2369-
MV->Shape = NULL;
2372+
size_t *ShapeFromRankMetadata = NULL;
2373+
23702374
MV->IsReverseDims = ((MV->Dims > 1) && (m_WriterIsRowMajor != m_ReaderIsRowMajor));
23712375

23722376
MV->WasLocalValue = (VarRec->OrigShapeID == ShapeID::LocalValue);
2373-
MV->WasLocalValue |= (VarRec->OrigShapeID == ShapeID::JoinedArray);
23742377
if ((VarRec->OrigShapeID == ShapeID::LocalValue) ||
23752378
(VarRec->OrigShapeID == ShapeID::GlobalValue))
23762379
{
2380+
// This code path doesn't need to loop through blocks
23772381
const size_t writerCohortSize = WriterCohortSize(AbsStep);
23782382
if (VarRec->OrigShapeID == ShapeID::LocalValue)
23792383
{
23802384
// appear as an array locally
23812385
MV->IsValue = false;
23822386
MV->Dims = 1;
2383-
MV->Shape = (size_t *)writerCohortSize;
2387+
MV->Shape.resize(1);
2388+
MV->Shape[0] = 0;
23842389
}
23852390
else
23862391
{
2392+
// Global value, leave dims = 0, Shape vector uninitialized
23872393
MV->IsValue = true;
23882394
}
23892395
MV->BlocksInfo.reserve(writerCohortSize);
@@ -2394,6 +2400,7 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
23942400
}
23952401
return MV;
23962402
}
2403+
// This code path loops through blocks
23972404
for (size_t Step = StepLoopStart; Step < StepLoopEnd; Step++)
23982405
{
23992406
const size_t writerCohortSize = WriterCohortSize(Step);
@@ -2407,7 +2414,7 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
24072414
// and should be immaterial otherise
24082415
if (writer_meta_base->Shape != NULL)
24092416
{
2410-
MV->Shape = writer_meta_base->Shape;
2417+
ShapeFromRankMetadata = writer_meta_base->Shape;
24112418
}
24122419
size_t WriterBlockCount =
24132420
writer_meta_base->Dims ? writer_meta_base->DBCount / writer_meta_base->Dims : 1;
@@ -2416,6 +2423,16 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
24162423
}
24172424
}
24182425
MV->BlocksInfo.reserve(Id);
2426+
if (ShapeFromRankMetadata)
2427+
{
2428+
MV->Shape.resize(VarRec->DimCount);
2429+
for (int i = 0; i < MV->Dims; i++)
2430+
MV->Shape[i] = ShapeFromRankMetadata[i];
2431+
if (VarRec->OrigShapeID == ShapeID::JoinedArray)
2432+
{
2433+
MV->Shape[VarRec->JoinedDimen] = 0;
2434+
}
2435+
}
24192436

24202437
Id = 0;
24212438
for (size_t Step = StepLoopStart; Step < StepLoopEnd; Step++)
@@ -2448,6 +2465,8 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
24482465
Blk.Start = Offsets;
24492466
Blk.Count = Count;
24502467
Blk.MinMax.Init(VarRec->Type);
2468+
if (VarRec->OrigShapeID == ShapeID::JoinedArray)
2469+
MV->Shape[VarRec->JoinedDimen] += writer_meta_base->Count[VarRec->JoinedDimen];
24512470
if (MMs)
24522471
{
24532472

@@ -2492,28 +2511,38 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
24922511
size_t Id = 0;
24932512
MV->Step = RelStep;
24942513
MV->Dims = (int)VarRec->DimCount;
2495-
MV->Shape = NULL;
2514+
size_t *ShapeFromRankMetadata = NULL;
24962515
MV->IsReverseDims = ((MV->Dims > 1) && (m_WriterIsRowMajor != m_ReaderIsRowMajor));
24972516

24982517
MV->WasLocalValue = (VarRec->OrigShapeID == ShapeID::LocalValue);
24992518
if ((VarRec->OrigShapeID == ShapeID::LocalValue) ||
25002519
(VarRec->OrigShapeID == ShapeID::GlobalValue))
25012520
{
2502-
// Throw
2521+
// Throw??
25032522
}
25042523
for (size_t Step = StepLoopStart; Step < StepLoopEnd; Step++)
25052524
{
25062525
MetaArrayRec *writer_meta_base = (MetaArrayRec *)GetMetadataBase(VarRec, Step, WriterRank);
25072526
if (writer_meta_base)
25082527
{
2509-
if (MV->Shape == NULL)
2528+
if (ShapeFromRankMetadata == NULL)
25102529
{
2511-
MV->Shape = writer_meta_base->Shape;
2530+
ShapeFromRankMetadata = writer_meta_base->Shape;
25122531
}
25132532
Id += 1; // one block
25142533
}
25152534
}
25162535
MV->BlocksInfo.reserve(Id);
2536+
if (ShapeFromRankMetadata)
2537+
{
2538+
MV->Shape.resize(VarRec->DimCount);
2539+
for (int i = 0; i < MV->Dims; i++)
2540+
MV->Shape[i] = ShapeFromRankMetadata[i];
2541+
if (VarRec->OrigShapeID == ShapeID::JoinedArray)
2542+
{
2543+
MV->Shape[VarRec->JoinedDimen] = 0;
2544+
}
2545+
}
25172546

25182547
Id = BlockID;
25192548
for (size_t Step = StepLoopStart; Step < StepLoopEnd; Step++)
@@ -2544,6 +2573,13 @@ MinVarInfo *BP5Deserializer::MinBlocksInfo(const VariableBase &Var, size_t RelSt
25442573
Blk.Start = Offsets;
25452574
Blk.Count = Count;
25462575
Blk.MinMax.Init(VarRec->Type);
2576+
if (VarRec->OrigShapeID == ShapeID::JoinedArray)
2577+
{
2578+
if (ShapeFromRankMetadata)
2579+
{
2580+
MV->Shape[VarRec->JoinedDimen] = Count[VarRec->JoinedDimen];
2581+
}
2582+
}
25472583
if (MMs)
25482584
{
25492585
char *BlockMinAddr = (((char *)MMs) + 2 * BlockID * VarRec->ElementSize);

source/adios2/toolkit/sst/cp/cp_common.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,12 @@ extern void SstStreamDestroy(SstStream Stream)
11321132
Stream->Timesteps = Next;
11331133
}
11341134

1135+
while (Stream->StepRequestQueue)
1136+
{
1137+
StepRequest Request = Stream->StepRequestQueue;
1138+
Stream->StepRequestQueue = Request->Next;
1139+
free(Request);
1140+
}
11351141
if (Stream->DP_Stream)
11361142
{
11371143
STREAM_MUTEX_UNLOCK(Stream);

source/adios2/toolkit/sst/cp/cp_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ struct _SstStream
198198
char *AbsoluteFilename;
199199
int GlobalOpRequired;
200200
StepRequest StepRequestQueue;
201+
int CloseMessagesSent;
201202

202203
/* writer side marshal info */
203204
void *WriterMarshalData;

source/adios2/toolkit/sst/cp/cp_writer.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,7 @@ static void SendCloseMsgs(SstStream Stream)
14861486

14871487
sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat, &Msg,
14881488
&Msg.RS_Stream);
1489+
Stream->CloseMessagesSent = 1;
14891490
}
14901491

14911492
/*
@@ -2530,6 +2531,11 @@ void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, void *Msg_v, vo
25302531

25312532
STREAM_MUTEX_LOCK(CP_WSR_Stream->ParentStream);
25322533
CPTimestepList List = Stream->QueuedTimesteps;
2534+
if (Stream->CloseMessagesSent)
2535+
{
2536+
CP_verbose(Stream, TraceVerbose, "In RequestStepHandler, stream closing, ignore\n");
2537+
STREAM_MUTEX_UNLOCK(CP_WSR_Stream->ParentStream);
2538+
}
25332539
int RequestingReader = -1;
25342540
for (int i = 0; i < Stream->ReaderCount; i++)
25352541
{
@@ -2538,6 +2544,13 @@ void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, void *Msg_v, vo
25382544
RequestingReader = i;
25392545
}
25402546
}
2547+
if (RequestingReader == -1)
2548+
{
2549+
CP_verbose(Stream, TraceVerbose,
2550+
"In RequestStepHandler, RequestingReader not found, ignore\n");
2551+
STREAM_MUTEX_UNLOCK(CP_WSR_Stream->ParentStream);
2552+
}
2553+
25412554
while (List)
25422555
{
25432556
size_t NextTS = Stream->LastDemandTimestep + 1;
@@ -2580,7 +2593,6 @@ void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, void *Msg_v, vo
25802593
}
25812594

25822595
CP_verbose(Stream, TraceVerbose, "In RequestStepHandler, queueing request\n");
2583-
assert(RequestingReader != -1);
25842596
StepRequest Request = calloc(sizeof(*Request), 1);
25852597
Request->RequestingReader = RequestingReader;
25862598
if (!Stream->StepRequestQueue)

source/utils/bpls/bpls.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3177,13 +3177,11 @@ Dims get_global_array_signature(core::Engine *fp, core::IO *io, core::Variable<T
31773177
{
31783178
minBlocks = fp->MinBlocksInfo(*variable, step);
31793179
}
3180-
if (minBlocks->Shape)
3180+
if (minBlocks->Shape.size())
31813181
{
31823182
for (size_t k = 0; k < ndim; k++)
31833183
{
3184-
size_t n =
3185-
(minBlocks->WasLocalValue ? reinterpret_cast<size_t>(minBlocks->Shape)
3186-
: minBlocks->Shape[k]);
3184+
size_t n = minBlocks->Shape[k];
31873185
if (firstStep)
31883186
{
31893187
dims[k] = n;

0 commit comments

Comments
 (0)