Skip to content

Commit

Permalink
Try to fix macOS hang when building Python 2 (microsoft#1343)
Browse files Browse the repository at this point in the history
  • Loading branch information
BillyONeal authored Feb 15, 2024
1 parent 8a83681 commit 05320e4
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 76 deletions.
8 changes: 8 additions & 0 deletions include/vcpkg/base/files.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ namespace vcpkg
long long tell() const noexcept;
int eof() const noexcept;
std::error_code error() const noexcept;
int error_raw() const noexcept;

const Path& path() const;
ExpectedL<Unit> try_seek_to(long long offset);
Expand All @@ -97,6 +98,9 @@ namespace vcpkg
ExpectedL<char> try_getc();
ExpectedL<Unit> try_read_all_from(long long offset, void* buffer, std::uint32_t size);
std::string read_to_end(std::error_code& ec);
// reads any remaining chunks of the file; used to implement read_to_end
void read_to_end_suffix(
std::string& output, std::error_code& ec, char* buffer, size_t buffer_size, size_t last_read);
};

struct WriteFilePointer : FilePointer
Expand Down Expand Up @@ -135,6 +139,10 @@ namespace vcpkg

ExpectedL<FileContents> try_read_contents(const Path& file_path) const;

// Tries to read `file_path`, and if the file starts with a shebang sequence #!, returns the contents of the
// file. If an I/O error occurs or the file does not start with a shebang sequence, returns an empty string.
virtual std::string best_effort_read_contents_if_shebang(const Path& file_path) const = 0;

virtual Path find_file_recursively_up(const Path& starting_dir,
const Path& filename,
std::error_code& ec) const = 0;
Expand Down
175 changes: 130 additions & 45 deletions include/vcpkg/base/parallel-algorithms.h
Original file line number Diff line number Diff line change
@@ -1,85 +1,170 @@
#pragma once

#if defined(_WIN32)
#include <vcpkg/base/system-headers.h>
#else // ^^^ _WIN32 / !_WIN32 vvv
#include <system_error>
#include <thread>
#include <type_traits>
#include <vector>
#endif // ^^^ _WIN32
#include <vcpkg/base/system.h>

#include <limits.h>

#include <algorithm>
#include <atomic>
#include <future>
#include <vector>

namespace vcpkg
{
template<class F>
inline void execute_in_parallel(size_t work_count, F&& work) noexcept
struct WorkCallbackContext
{
const size_t thread_count = static_cast<size_t>(get_concurrency());
const size_t num_threads = std::max(static_cast<size_t>(1), std::min(thread_count, work_count));
F work;
size_t work_count;
std::atomic<size_t> next_offset;

std::vector<std::future<void>> workers;
workers.reserve(num_threads - 1);
WorkCallbackContext(F init_f, size_t work_count) : work(init_f), work_count(work_count), next_offset(0) { }

for (size_t i = 0; i < num_threads - 1; ++i)
// pre: run() is called at most SIZE_MAX - work_count times
void run()
{
workers.emplace_back(std::async(std::launch::async | std::launch::deferred, [&work]() { work(); }));
for (;;)
{
auto offset = next_offset.fetch_add(1, std::memory_order_relaxed);
if (offset >= work_count)
{
return;
}

work(offset);
}
}
work();
};

for (auto&& w : workers)
#if defined(_WIN32)
struct PtpWork
{
PtpWork(_In_ PTP_WORK_CALLBACK pfnwk, _Inout_opt_ PVOID pv, _In_opt_ PTP_CALLBACK_ENVIRON pcbe)
: ptp_work(CreateThreadpoolWork(pfnwk, pv, pcbe))
{
w.get();
}
}
PtpWork(const PtpWork&) = delete;
PtpWork& operator=(const PtpWork&) = delete;
~PtpWork()
{
if (ptp_work)
{
::WaitForThreadpoolWorkCallbacks(ptp_work, TRUE);
::CloseThreadpoolWork(ptp_work);
}
}

template<class Container, class F>
void parallel_for_each(Container&& c, F cb) noexcept
explicit operator bool() { return ptp_work != nullptr; }

void submit() { ::SubmitThreadpoolWork(ptp_work); }

private:
PTP_WORK ptp_work;
};

template<class F>
inline void execute_in_parallel(size_t work_count, F work) noexcept
{
if (c.size() == 0)
if (work_count == 0)
{
return;
}
if (c.size() == 1)

if (work_count == 1)
{
cb(c[0]);
return;
work(size_t{});
}

std::atomic_size_t next{0};

execute_in_parallel(c.size(), [&]() {
size_t i = 0;
while (i < c.size())
WorkCallbackContext<F> context{work, work_count};
PtpWork ptp_work([](PTP_CALLBACK_INSTANCE,
void* context,
PTP_WORK) noexcept { static_cast<WorkCallbackContext<F>*>(context)->run(); },
&context,
nullptr);
if (ptp_work)
{
auto max_threads = (std::min)(work_count, static_cast<size_t>(get_concurrency()));
max_threads = (std::min)(max_threads, (SIZE_MAX - work_count) + 1u); // to avoid overflow in fetch_add
// start at 1 to account for the running thread
for (size_t i = 1; i < max_threads; ++i)
{
if (next.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
{
cb(c[i]);
}
ptp_work.submit();
}
});
}

context.run();
}
#else // ^^^ _WIN32 / !_WIN32 vvv
struct JThread
{
template<class Arg0, std::enable_if_t<!std::is_same<JThread, std::decay_t<Arg0>>::value, int> = 0>
JThread(Arg0&& arg0) : m_thread(std::forward<Arg0>(arg0))
{
}

template<class Container, class RanItTarget, class F>
void parallel_transform(const Container& c, RanItTarget out_begin, F&& cb) noexcept
~JThread() { m_thread.join(); }

JThread(const JThread&) = delete;
JThread& operator=(const JThread&) = delete;
JThread(JThread&&) = default;
JThread& operator=(JThread&&) = default;

private:
std::thread m_thread;
};

template<class F>
inline void execute_in_parallel(size_t work_count, F work) noexcept
{
if (c.size() == 0)
if (work_count == 0)
{
return;
}
if (c.size() == 1)

if (work_count == 1)
{
*out_begin = cb(c[0]);
work(size_t{});
return;
}

std::atomic_size_t next{0};

execute_in_parallel(c.size(), [&]() {
size_t i = 0;
while (i < c.size())
WorkCallbackContext<F> context{work, work_count};
auto max_threads = std::min(work_count, static_cast<size_t>(get_concurrency()));
max_threads = std::min(max_threads, (SIZE_MAX - work_count) + 1u); // to avoid overflow in fetch_add
auto bg_thread_count = max_threads - 1;
std::vector<JThread> bg_threads;
bg_threads.reserve(bg_thread_count);
for (size_t i = 0; i < bg_thread_count; ++i)
{
try
{
if (next.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
{
*(out_begin + i) = cb(c[i]);
}
bg_threads.emplace_back([&]() { context.run(); });
}
});
catch (const std::system_error&)
{
// ok, just give up trying to create threads
break;
}
}

context.run();
// destroying workers joins
}
#endif // ^^^ !_WIN32

template<class Container, class F>
void parallel_for_each(Container&& c, F cb) noexcept
{
execute_in_parallel(c.size(), [&](size_t offset) { cb(c[offset]); });
}

template<class Container, class RanItTarget, class F>
void parallel_transform(const Container& c, RanItTarget out_begin, F cb) noexcept
{
execute_in_parallel(c.size(), [&](size_t offset) { out_begin[offset] = cb(c[offset]); });
}
}
Loading

0 comments on commit 05320e4

Please sign in to comment.