Skip to content

WIP: ApplicationTime support #4536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bindings/CXX11/adios2/cxx11/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ bool Engine::BetweenStepPairs()
return m_Engine->BetweenStepPairs();
}

void Engine::SetStepApplicationTime(const double ApplicationTime, const double PostStepIncrement)
{
helper::CheckForNullptr(m_Engine, "in call to Engine::SetApplicationStepTime");
m_Engine->SetStepApplicationTime(ApplicationTime, PostStepIncrement);
}

double Engine::GetStepApplicationTime()
{
helper::CheckForNullptr(m_Engine, "in call to Engine::GetStepApplicationTime");
return m_Engine->GetStepApplicationTime();
}

std::vector<double> Engine::AllStepsApplicationTime()
{
helper::CheckForNullptr(m_Engine, "in call to Engine::AllStepsApplicationTime");
return m_Engine->AllStepsApplicationTime();
}

StepStatus Engine::BeginStep(const StepMode mode, const float timeoutSeconds)
{
helper::CheckForNullptr(m_Engine, "in call to Engine::BeginStep(const StepMode, const float)");
Expand Down
28 changes: 27 additions & 1 deletion bindings/CXX11/adios2/cxx11/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,32 @@ class Engine
*/
void EndStep();

/**
* Set "Application Time" for the next step. ADIOS does not
* directly interpret this value, but it is associated with the
* metadata for the next step (used in writer-side EndStep()).
* PostStepIncrement is used to increment the value after each
* step, but repeated calls to SetApplicationStepTime can be made
* to achieve irregular time intervals.
*/
void SetStepApplicationTime(const double ApplicationTime, const double PostStepIncrement = 1.0);

/**
* Get "Application Time" for the next or current step. This is a
* reader-side call and if called outside of BeginStep/EndStep it
* will return the ApplicationTime for the next step, if after
* BeginStep it returns for the current step. Not supported by
* all engines and not available in ReadRandomAccess mode. Next
* step time not possible for some streaming engines.
*/
double GetStepApplicationTime();

/**
* Get "Application Time" for all available steps. Only available
* in ReadRandomAccessRead mode.
*/
std::vector<double> AllStepsApplicationTime();

/**
* Returns True if engine status is between BeginStep()/EndStep() pair,
* False otherwise.
Expand Down Expand Up @@ -516,7 +542,7 @@ class Engine

/**
* @brief Promise that the reader data selections of are fixed and
* will not change in future timesteps. This information, provided
* will not change in future steps. This information, provided
* before the EndStep() representing a fixed read pattern, may be
* utilized by the input Engine to optimize data flow.
*/
Expand Down
16 changes: 16 additions & 0 deletions source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ void Engine::EndStep() { ThrowUp("EndStep"); }
void Engine::PerformPuts() { ThrowUp("PerformPuts"); }
void Engine::PerformGets() { ThrowUp("PerformGets"); }
void Engine::PerformDataWrite() { return; }
void Engine::SetStepApplicationTime(const double ApplicationTime, const double PostStepIncrement)
{
ThrowUp("SetStepApplicationTime");
}

double Engine::GetStepApplicationTime()
{
ThrowUp("GetStepApplicationTime");
return -1.0;
}

std::vector<double> Engine::AllStepsApplicationTime()
{
ThrowUp("AllStepsApplicationTime");
return std::vector<double>();
}

void Engine::Close(const int transportIndex)
{
Expand Down
26 changes: 26 additions & 0 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,32 @@ class Engine
*/
bool BetweenStepPairs() const;

/**
* Set "Application Time" for the next timestep. ADIOS does not
* directly interpret this value, but it is associated with the
* metadata for the next timestep (used in writer-side EndStep()).
* PostStepIncrement is used to increment the value after each
* step, but repeated calls to SetApplicationStepTime can be made
* to achieve irregular time intervals.
*/
void SetStepApplicationTime(const double ApplicationTime, const double PostStepIncrement = 1.0);

/**
* Get "Application Time" for the next or current step. This is a
* reader-side call and if called outside of BeginStep/EndStep it
* will return the ApplicationTime for the next step, if after
* BeginStep it returns for the current step. Not supported by
* all engines and not available in RandomAccessRead mode. Next
* step time not possible for some streaming engines.
*/
double GetStepApplicationTime();

/**
* Get "Application Time" for all available steps. Only available
* in RandomAccessRead mode.
*/
std::vector<double> AllStepsApplicationTime();

/**
* Put signature that pre-allocates a Variable in Buffer returning a Span of
* the payload memory from variable.m_Count
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BP5Engine
4: abs. pos in metadata File for step
*/
std::unordered_map<uint64_t, std::vector<uint64_t>> m_MetadataIndexTable;
std::vector<double> m_ApplicationTimeTable;

struct Minifooter
{
Expand Down Expand Up @@ -77,7 +78,7 @@ class BP5Engine
static constexpr size_t m_VersionTagLength = sizeof(BP5IndexTableHeader().VersionTag);
static constexpr size_t m_HeaderTailPadding = sizeof(BP5IndexTableHeader().unused2);

static constexpr uint8_t m_BP5MinorVersion = 2;
static constexpr uint8_t m_BP5MinorVersion = 3;

/** Index record types */
enum IndexRecord
Expand Down
87 changes: 80 additions & 7 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,14 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds)
if (m_FirstStep)
{
m_FirstStep = false;
// this might have been done earlier
m_NextStepApplicationTime = m_ApplicationTimeTable[0];
}
else
{
++m_CurrentStep;
// this might have been done earlier
m_NextStepApplicationTime = m_ApplicationTimeTable[m_CurrentStep];
}

m_IO.m_EngineStep = m_CurrentStep;
Expand Down Expand Up @@ -289,6 +293,68 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds)

size_t BP5Reader::CurrentStep() const { return m_CurrentStep; }

double BP5Reader::GetStepApplicationTime()
{
if (m_OpenMode != Mode::Read)
{
helper::Throw<std::logic_error>("Engine", "BP5Reader", "EndStep",
"GetStepApplicationTime called in random access mode");
}
if (m_BetweenStepPairs)
return m_NextStepApplicationTime;
else
{
// messier logic
double timeoutSeconds = 1.0; // arbitrary
if (m_FirstStep)
{
if (!m_StepsCount)
{
// not steps was found in Open/Init, check for new steps now
(void)CheckForNewSteps(Seconds(timeoutSeconds));
}
}
else
{
if (m_CurrentStep + 1 >= m_StepsCount)
{
// we processed steps in memory, check for new steps now
(void)CheckForNewSteps(Seconds(timeoutSeconds));
}
}
if (m_StepsCount > m_CurrentStep)
{
/* we have got new steps and new metadata in memory */
m_NextStepApplicationTime = m_ApplicationTimeTable[m_CurrentStep + 1];
}
else
{
if (m_WriterIsActive)
{
// Something better to do here?
helper::Throw<std::logic_error>("Engine", "BP5Reader", "GetStepApplicationTime",
"GetStepApplicationTime finds Writer Not Ready");
}
else
{
helper::Throw<std::logic_error>("Engine", "BP5Reader", "GetStepApplicationTime",
"GetStepApplicationTime finds End of Stream");
}
}
}
return m_NextStepApplicationTime;
}

std::vector<double> BP5Reader::AllStepsApplicationTime()
{
if (m_OpenMode != Mode::ReadRandomAccess)
{
helper::Throw<std::logic_error>("Engine", "BP5Reader", "EndStep",
"GetStepApplicationTime called in random access mode");
}
return m_ApplicationTimeTable;
}

void BP5Reader::EndStep()
{
if (m_OpenMode != Mode::Read)
Expand Down Expand Up @@ -1396,16 +1462,17 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, const size_t
" version");
}

// BP minor version, unused
// BP minor version, important!
position = m_BPMinorVersionPosition;
const uint8_t minorversion =
m_WriterMinorVersion =
helper::ReadValue<uint8_t>(buffer, position, m_Minifooter.IsLittleEndian);
if (minorversion != m_BP5MinorVersion)
if (m_WriterMinorVersion < 2)
{
helper::Throw<std::runtime_error>("Engine", "BP5Reader", "ParseMetadataIndex",
"Current ADIOS2 BP5 Engine only supports version 5." +
std::to_string(m_BP5MinorVersion) + ", found 5." +
std::to_string(minorversion) + " version");
helper::Throw<std::runtime_error>(
"Engine", "BP5Reader", "ParseMetadataIndex",
"Current ADIOS2 BP5 Engine only supports BP5 files between version 5.2 and 5." +
std::to_string(m_BP5MinorVersion) + ", found 5." +
std::to_string(m_WriterMinorVersion) + " version");
}

// Writer active flag
Expand Down Expand Up @@ -1482,6 +1549,10 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, const size_t
}
case IndexRecord::StepRecord: {
std::vector<uint64_t> ptrs;
double MetadataApplicationTime = -1.0;
if (m_WriterMinorVersion >= 3)
MetadataApplicationTime =
helper::ReadValue<double>(buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataPos =
helper::ReadValue<uint64_t>(buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize =
Expand All @@ -1507,6 +1578,8 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, const size_t
// absolute pos in file before read
ptrs.push_back(MetadataPos);
m_MetadataIndexTable[m_StepsCount] = ptrs;
m_ApplicationTimeTable.resize(m_StepsCount + 1);
m_ApplicationTimeTable[m_StepsCount] = MetadataApplicationTime;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
Expand Down
6 changes: 5 additions & 1 deletion source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class BP5Reader : public BP5Engine, public Engine

void EndStep() final;

double GetStepApplicationTime();
std::vector<double> AllStepsApplicationTime();

void PerformGets() final;

MinVarInfo *MinBlocksInfo(const VariableBase &, const size_t Step) const;
Expand Down Expand Up @@ -120,6 +123,7 @@ class BP5Reader : public BP5Engine, public Engine
uint64_t m_LastMapStep = 0; // remember last step that had writer map
uint64_t m_LastWriterCount = 0; // remember writer count in that step
bool m_FirstStep = true;
double m_NextStepApplicationTime = -1.0;

/** used to filter steps */
helper::RangeFilter m_SelectedSteps;
Expand All @@ -128,7 +132,7 @@ class BP5Reader : public BP5Engine, public Engine
std::vector<std::pair<uint64_t, uint64_t>> m_FilteredMetadataInfo;

Minifooter m_Minifooter;

int m_WriterMinorVersion;
bool m_InitialWriterActiveCheckDone = false;
bool m_ReadMetadataFromFile = true;

Expand Down
15 changes: 12 additions & 3 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ void BP5Writer::PerformPuts()
return;
}

void BP5Writer::SetStepApplicationTime(const double ApplicationTime, const double PostStepIncrement)
{
m_ApplicationTimeForNextStep = ApplicationTime;
m_ApplicationTimeIncrement = PostStepIncrement;
}

void BP5Writer::WriteMetaMetadata(
const std::vector<format::BP5Base::MetaMetaInfoBlock> MetaMetaBlocks)
{
Expand Down Expand Up @@ -381,7 +387,7 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos, uint64_t MetaDataSi
{
// bufsize: Step record
size_t bufsize =
1 + (4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) * sizeof(uint64_t);
1 + (5 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) * sizeof(uint64_t);
if (MetaDataPos == 0)
{
// First time, write the headers
Expand Down Expand Up @@ -424,8 +430,10 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos, uint64_t MetaDataSi
// Step record
record = StepRecord;
helper::CopyToBuffer(buf, pos, &record, 1); // record type
d = (3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) * sizeof(uint64_t);
d = (4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) * sizeof(uint64_t);
helper::CopyToBuffer(buf, pos, &d, 1); // record length
helper::CopyToBuffer(buf, pos, (uint64_t *)&m_ApplicationTimeForNextStep, 1);
m_ApplicationTimeForNextStep += m_ApplicationTimeIncrement;
helper::CopyToBuffer(buf, pos, &MetaDataPos, 1);
helper::CopyToBuffer(buf, pos, &MetaDataSize, 1);
d = static_cast<uint64_t>(FlushPosSizeInfo.size());
Expand Down Expand Up @@ -1055,7 +1063,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
break;
}
case IndexRecord::StepRecord: {
position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize
position += 3 * sizeof(uint64_t); // ApplicationTime, MetadataPos, MetadataSize
const uint64_t FlushCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
// jump over the metadata positions
Expand Down Expand Up @@ -1144,6 +1152,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
case IndexRecord::StepRecord: {
m_AppendMetadataIndexPos =
position - sizeof(unsigned char) - sizeof(uint64_t); // pos of RecordID
position += sizeof(uint64_t); // ApplicationTime
const uint64_t MetadataPos =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
position += sizeof(uint64_t); // MetadataSize
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class BP5Writer : public BP5Engine, public core::Engine
~BP5Writer();

StepStatus BeginStep(StepMode mode, const float timeoutSeconds = -1.0) final;
void SetStepApplicationTime(const double ApplicationTime, const double PostStepIncrement = 1.0);
size_t CurrentStep() const final;
void PerformPuts() final;
void PerformDataWrite() final;
Expand All @@ -71,6 +72,8 @@ class BP5Writer : public BP5Engine, public core::Engine

int64_t m_WriterStep = 0;
bool m_IsFirstStep = true; // might not be 0 for append
double m_ApplicationTimeForNextStep = 0.0;
double m_ApplicationTimeIncrement = 1.0;
/*
* Burst buffer variables
*/
Expand Down
Loading
Loading