Skip to content

Commit

Permalink
Add templated utility class to centralize common pattern
Browse files Browse the repository at this point in the history
All reading code has a (for threaded reads) need to maintain a set of
contexts and re-use them while dispatching N threads. Create a class to
do that and remove duplicate code.

This also lowers the scope of when the contexts are alive to reduce
memory bloat

Signed-off-by: Kimball Thurston <[email protected]>
  • Loading branch information
kdt3rd committed Sep 23, 2024
1 parent 25c2d77 commit 720cc0c
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 290 deletions.
1 change: 1 addition & 0 deletions src/lib/IlmThread/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ openexr_define_library(IlmThread
IlmThreadMutex.h
IlmThreadNamespace.h
IlmThreadPool.h
IlmThreadProcessGroup.h
IlmThreadSemaphore.h
DEPENDENCIES
OpenEXR::Config
Expand Down
154 changes: 154 additions & 0 deletions src/lib/IlmThread/IlmThreadProcessGroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
//
// SPDX-License-Identifier: BSD-3-Clause
// Copyright (c) Contributors to the OpenEXR Project.
//

#ifndef INCLUDED_ILM_THREAD_PROCESS_GROUP_H
#define INCLUDED_ILM_THREAD_PROCESS_GROUP_H

//-----------------------------------------------------------------------------
//
// Class ProcessGroup is a templated inline helper for constraining
// task contexts to a number of threads. It maintains a list of
// contexts and then can hand them out one at a time, waiting for a
// previous thread request to finish before handing out more,
// preventing over-subscription / allocation of contexts.
//
//-----------------------------------------------------------------------------

#include "IlmThreadConfig.h"
#include "IlmThreadExport.h"
#include "IlmThreadNamespace.h"
#include "IlmThreadSemaphore.h"

#include "Iex.h"

#include <atomic>
#include <string>
#include <type_traits>
#include <vector>

ILMTHREAD_INTERNAL_NAMESPACE_HEADER_ENTER

template <typename P,
std::enable_if_t <
std::is_default_constructible <P>::value &&
std::is_same <decltype (P {}.next), P *>::value, bool> = true>
class ProcessGroup
{
public:
using Process = P;

ProcessGroup (unsigned int numThreads)
: _sem (numThreads)
, _avail_head (nullptr)
, _first_failure (nullptr)
{
_fixed_pool.resize (numThreads);
for ( unsigned int i = 0; i < numThreads; ++i )
{
if (i == (numThreads - 1))
_fixed_pool[i].next = nullptr;
else
_fixed_pool[i].next = &(_fixed_pool[i+1]);
}
_avail_head = &(_fixed_pool[0]);
}

ProcessGroup (const ProcessGroup&) = delete;
ProcessGroup& operator= (const ProcessGroup&) = delete;
ProcessGroup (ProcessGroup&&) = default;
ProcessGroup& operator= (ProcessGroup&&) = delete;
~ProcessGroup()
{
std::string *cur = _first_failure.load ();
delete cur;
}

void push (Process *p)
{
Process* oldhead = _avail_head.load (std::memory_order_relaxed);

do
{
p->next = oldhead;
} while (!_avail_head.compare_exchange_weak (
oldhead, p,
std::memory_order_release,
std::memory_order_relaxed));

// notify someone else there's one available
_sem.post ();
}

// called by the thread dispatching work units, may block
Process* pop ()
{
Process* ret = nullptr;

// we do not have to worry about ABA problems as
// we have a static pool of items we own, we're just
// putting them here and popping them off.

// used for honoring the numThreads, as pop
// should only be called by the one thread
// waiting to submit thread calls
_sem.wait ();

ret = _avail_head.load (std::memory_order_acquire);

Process* newhead;
do
{
if (!ret)
std::cerr << "GACK: serious failure case???" << std::endl;

newhead = ret->next;
} while ( !_avail_head.compare_exchange_weak(
ret, newhead, std::memory_order_acquire));

return ret;
}

void record_failure (const char *e)
{
// should we construct a list of failures if there are
// more than one? seems less confusing to just report
// the first we happened to record

std::string *cur = _first_failure.load ();
if (!cur)
{
std::string *msg = new std::string (e);
if (! _first_failure.compare_exchange_strong (cur, msg))
delete msg;
}
}

void throw_on_failure ()
{
std::string *cur = _first_failure.load ();
_first_failure.store (nullptr);

if (cur)
{
std::string msg (*cur);
delete cur;

throw IEX_NAMESPACE::IoExc (msg);
}
}
private:
Semaphore _sem;

std::vector<Process> _fixed_pool;

std::atomic<Process *> _avail_head;

std::atomic<std::string *> _first_failure;
};


ILMTHREAD_INTERNAL_NAMESPACE_HEADER_EXIT

#endif // INCLUDED_ILM_THREAD_POOL_H
113 changes: 42 additions & 71 deletions src/lib/OpenEXR/ImfDeepScanLineInputFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

#include "IlmThreadPool.h"
#if ILMTHREAD_THREADING_ENABLED
# include "IlmThreadSemaphore.h"
# include "IlmThreadProcessGroup.h"
# include <mutex>
#endif

#include "Iex.h"

#include <algorithm>
#include <limits>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -86,9 +86,13 @@ struct ScanLineProcess
exr_chunk_info_t cinfo;
exr_decode_pipeline_t decoder;

std::shared_ptr<ScanLineProcess> next;
ScanLineProcess* next;
};

#if ILMTHREAD_THREADING_ENABLED
using ScanLineProcessGroup = ILMTHREAD_NAMESPACE::ProcessGroup<ScanLineProcess>;
#endif

} // empty namespace

struct DeepScanLineInputFile::Data
Expand All @@ -97,9 +101,6 @@ struct DeepScanLineInputFile::Data
: _ctxt (ctxt)
, partNumber (pN)
, numThreads (nT)
#if ILMTHREAD_THREADING_ENABLED
, _sem ((unsigned int)nT)
#endif
{}

void initialize ()
Expand Down Expand Up @@ -133,42 +134,16 @@ struct DeepScanLineInputFile::Data
DeepFrameBuffer frameBuffer;
std::vector<DeepSlice> fill_list;

std::shared_ptr<ScanLineProcess> processStack;
std::shared_ptr<ScanLineProcess> getChunkProcess ()
{
std::shared_ptr<ScanLineProcess> retval;
#if ILMTHREAD_THREADING_ENABLED
std::lock_guard<std::mutex> lk (_mx);
#endif
retval = processStack;
if (!retval)
retval = std::make_shared<ScanLineProcess> ();
processStack = retval->next;
retval->next.reset();
return retval;
}
void putChunkProcess (std::shared_ptr<ScanLineProcess> sp)
{
#if ILMTHREAD_THREADING_ENABLED
std::lock_guard<std::mutex> lk (_mx);
#endif
sp->next = processStack;
processStack = sp;
}

#if ILMTHREAD_THREADING_ENABLED
std::mutex _mx;
ILMTHREAD_NAMESPACE::Semaphore _sem;

std::vector<std::string> _failures;
#endif
#if ILMTHREAD_THREADING_ENABLED
class LineBufferTask final : public ILMTHREAD_NAMESPACE::Task
{
public:
LineBufferTask (
ILMTHREAD_NAMESPACE::TaskGroup* group,
Data* ifd,
ScanLineProcessGroup* lineg,
const DeepFrameBuffer* outfb,
const exr_chunk_info_t& cinfo,
int fby,
Expand All @@ -179,17 +154,16 @@ struct DeepScanLineInputFile::Data
, _ifd (ifd)
, _fby (fby)
, _last_fby (endScan)
, _line (ifd->getChunkProcess ())
, _line (lineg->pop ())
, _line_group (lineg)
{
_line->cinfo = cinfo;
_line->counts_only = countsOnly;
}

~LineBufferTask () override
{
if (_line)
_ifd->putChunkProcess (std::move (_line));
_ifd->_sem.post ();
_line_group->push (_line);
}

void execute () override;
Expand All @@ -201,8 +175,8 @@ struct DeepScanLineInputFile::Data
Data* _ifd;
int _fby;
int _last_fby;

std::shared_ptr<ScanLineProcess> _line;
ScanLineProcess* _line;
ScanLineProcessGroup* _line_group;
};
#endif
};
Expand Down Expand Up @@ -301,7 +275,6 @@ DeepScanLineInputFile::setFrameBuffer (const DeepFrameBuffer& frameBuffer)
_data->prepFillList (frameBuffer, _data->fill_list);
_data->frameBuffer = frameBuffer;
_data->frameBufferValid = true;
_data->processStack.reset();
}

const DeepFrameBuffer&
Expand Down Expand Up @@ -539,29 +512,36 @@ DeepScanLineInputFile::Data::readData (

if (nchunks > 1 && numThreads > 1)
{
ILMTHREAD_NAMESPACE::TaskGroup tg;
// we need the lifetime of this to last longer than the
// lifetime of the task group below such that we don't get use
// after free type error, so use scope rules to accomplish
// this
ScanLineProcessGroup sg (numThreads);

for (int y = scanLine1; y <= scanLine2; )
{
if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo))
throw IEX_NAMESPACE::InputExc ("Unable to query scanline information");
ILMTHREAD_NAMESPACE::TaskGroup tg;

// used for honoring the numThreads
_sem.wait ();
for (int y = scanLine1; y <= scanLine2; )
{
if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo))
throw IEX_NAMESPACE::InputExc ("Unable to query scanline information");

ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask (
new LineBufferTask (&tg, this, &fb, cinfo, y, scanLine2, countsOnly) );
ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask (
new LineBufferTask (&tg, this, &sg, &fb, cinfo, y, scanLine2, countsOnly) );

y += scansperchunk - (y - cinfo.start_y);
y += scansperchunk - (y - cinfo.start_y);
}
}

sg.throw_on_failure ();
}
else
#endif
{
auto sp = getChunkProcess ();
bool redo = sp->first || sp->counts_only != countsOnly;
ScanLineProcess sp;
bool redo = true;

sp->counts_only = countsOnly;
sp.counts_only = countsOnly;
for (int y = scanLine1; y <= scanLine2; )
{
if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo))
Expand All @@ -571,10 +551,10 @@ DeepScanLineInputFile::Data::readData (
// re-run the unpack (i.e. people reading 1 scan at a time
// in a multi-scanline chunk)
if (!redo &&
sp->cinfo.idx == cinfo.idx &&
sp->last_decode_err == EXR_ERR_SUCCESS)
sp.cinfo.idx == cinfo.idx &&
sp.last_decode_err == EXR_ERR_SUCCESS)
{
sp->run_unpack (
sp.run_unpack (
*_ctxt,
partNumber,
&fb,
Expand All @@ -584,8 +564,8 @@ DeepScanLineInputFile::Data::readData (
}
else
{
sp->cinfo = cinfo;
sp->run_decode (
sp.cinfo = cinfo;
sp.run_decode (
*_ctxt,
partNumber,
&fb,
Expand All @@ -597,19 +577,7 @@ DeepScanLineInputFile::Data::readData (

y += scansperchunk - (y - cinfo.start_y);
}

putChunkProcess (std::move(sp));
}

#if ILMTHREAD_THREADING_ENABLED
std::lock_guard<std::mutex> lock (_mx);
if (! _failures.empty())
{
std::string fail = _failures[0];
_failures.clear ();
throw IEX_NAMESPACE::IoExc (fail);
}
#endif
}

////////////////////////////////////////
Expand Down Expand Up @@ -727,8 +695,11 @@ void DeepScanLineInputFile::Data::LineBufferTask::execute ()
}
catch (std::exception &e)
{
std::lock_guard<std::mutex> lock (_ifd->_mx);
_ifd->_failures.emplace_back (std::string (e.what()));
_line_group->record_failure (e.what ());
}
catch (...)
{
_line_group->record_failure ("Unknown exception");
}
}
#endif
Expand Down
Loading

0 comments on commit 720cc0c

Please sign in to comment.