Skip to content

Commit

Permalink
Make async_pool immune to stream handle reuse. (#5348)
Browse files Browse the repository at this point in the history
* Make async_pool immune to stream handle reuse (when a stream is destroyed and a new one gets the same handle)
* Fix handling of default streams.
---------

Signed-off-by: Michal Zientkiewicz <[email protected]>
  • Loading branch information
mzient authored Mar 5, 2024
1 parent b3fbc98 commit 3c263df
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 11 deletions.
101 changes: 101 additions & 0 deletions dali/core/mm/stream_id_hint.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <dlfcn.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <cuda.h>
#include <cuda_runtime.h>

#include "dali/core/mm/detail/stream_id_hint.h"
#include "dali/core/cuda_error.h"

using cuStreamGetId_t = CUresult(CUstream, unsigned long long *); // NOLINT(runtime/int)

namespace {

inline int getTID() {
return syscall(SYS_gettid);
}

constexpr uint64_t MakeLegacyStreamId(int dev, int tid) {
return (uint64_t)dev << 32 | tid;
}

CUresult cuStreamGetIdFallback(CUstream stream, unsigned long long *id) { // NOLINT(runtime/int)
// If the stream handle is a pseudohandle, use some special treatment....
if (stream == 0 || stream == CU_STREAM_LEGACY || stream == CU_STREAM_PER_THREAD) {
int dev = -1;
if (cudaGetDevice(&dev) != cudaSuccess)
return CUDA_ERROR_INVALID_CONTEXT;
// If we use a per-thread stream, get TID; otherwise use -1 as a pseudo-tid
*id = MakeLegacyStreamId(dev, stream == CU_STREAM_PER_THREAD ? getTID() : -1);
return CUDA_SUCCESS;
} else {
// Otherwise just use the handle - it's not perfactly safe, but should do.
*id = (uint64_t)stream;
return CUDA_SUCCESS;
}
}

cuStreamGetId_t *getRealStreamIdFunc() {
static cuStreamGetId_t *fn = []() {
void *sym = nullptr;
// If it fails, we'll just return nullptr.
#if CUDA_VERSION >= 12000
(void)cuGetProcAddress("cuStreamGetId", &sym, 12000, CU_GET_PROC_ADDRESS_DEFAULT, nullptr);
#else
(void)cuGetProcAddress("cuStreamGetId", &sym, 12000, CU_GET_PROC_ADDRESS_DEFAULT);
#endif
return reinterpret_cast<cuStreamGetId_t *>(sym);
}();
return fn;
}

inline bool hasPreciseHint() {
static bool ret = getRealStreamIdFunc() != nullptr;
return ret;
}

CUresult cuStreamGetIdBootstrap(CUstream stream, unsigned long long *id); // NOLINT(runtime/int)

cuStreamGetId_t *_cuStreamGetId = cuStreamGetIdBootstrap;

CUresult cuStreamGetIdBootstrap(CUstream stream, unsigned long long *id) { // NOLINT(runtime/int)
cuStreamGetId_t *realFunc = getRealStreamIdFunc();
if (realFunc)
_cuStreamGetId = realFunc;
else
_cuStreamGetId = cuStreamGetIdFallback;

return _cuStreamGetId(stream, id);
}

} // namespace

namespace dali {

DLL_PUBLIC bool stream_id_hint::is_unambiguous() {
return hasPreciseHint();
}

DLL_PUBLIC uint64_t stream_id_hint::from_handle(CUstream stream) {
static auto initResult = cuInit(0);
(void)initResult;
unsigned long long id; // NOLINT(runtime/int)
CUDA_CALL(_cuStreamGetId(stream, &id));
return id;
}

} // namespace dali
53 changes: 42 additions & 11 deletions include/dali/core/mm/async_pool.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
#include "dali/core/mm/pool_resource.h"
#include "dali/core/mm/with_upstream.h"
#include "dali/core/mm/detail/free_list.h"
#include "dali/core/mm/detail/stream_id_hint.h"
#include "dali/core/small_vector.h"
#include "dali/core/cuda_event_pool.h"
#include "dali/core/cuda_stream.h"
Expand Down Expand Up @@ -81,9 +82,11 @@ class async_pool_resource : public async_memory_resource<Kind>,
std::lock_guard<std::mutex> guard(lock_);

// find_first_ready doesn't throw on shutdown - and we want to terminate on other errors
assert(find_first_ready(free_blocks) == free_blocks.free_list.head);
assert(!stream_id_hint::is_unambiguous() ||
find_first_ready(free_blocks) == free_blocks.free_list.head);

for (auto *f = free_blocks.free_list.head; f; ) {
assert(f->ready());
try {
global_pool_.deallocate(f->addr, f->bytes, f->alignment);
} catch (const CUDAError &e) {
Expand Down Expand Up @@ -183,10 +186,10 @@ class async_pool_resource : public async_memory_resource<Kind>,
return nullptr;
adjust_size_and_alignment(bytes, alignment, true);
std::lock_guard<LockType> guard(lock_);
auto it = stream_free_.find(stream.get());
auto it = stream_free_.find(stream_id_hint::from_handle(stream.get()));
void *ptr;
if (it != stream_free_.end()) {
ptr = try_allocate(it->second, bytes, alignment);
ptr = try_allocate(it->second, bytes, alignment, stream);
if (ptr)
return ptr;
}
Expand Down Expand Up @@ -241,7 +244,8 @@ class async_pool_resource : public async_memory_resource<Kind>,
std::lock_guard<LockType> guard(lock_);
char *ptr = static_cast<char*>(mem);
pop_block_padding(ptr, bytes, alignment);
deallocate_async_impl(stream_free_[stream.get()], ptr, bytes, alignment, ctx, std::move(event));
auto stream_id = stream_id_hint::from_handle(stream.get());
deallocate_async_impl(stream_free_[stream_id], ptr, bytes, alignment, ctx, std::move(event));
}

/**
Expand Down Expand Up @@ -327,7 +331,10 @@ class async_pool_resource : public async_memory_resource<Kind>,
*
* If the allocation fails, the function returns nullptr.
*/
void *try_allocate(PerStreamFreeBlocks &from, size_t bytes, size_t alignment) {
void *try_allocate(PerStreamFreeBlocks &from,
size_t bytes,
size_t alignment,
stream_view stream) {
// This value is only used when not splitting - it limits how much memory
// can be wasted for padding - the allowed padding is 1/16 of the allocated size,
// clamped to between 16 bytes and 1 MiB.
Expand Down Expand Up @@ -379,6 +386,13 @@ class async_pool_resource : public async_memory_resource<Kind>,
bool split = supports_splitting && remainder + min_split_remainder < block_end;
size_t orig_alignment = f->alignment;
size_t split_size = block_size;
// If stream_id is ambiguous, we might have blocks from other streams mixed in
if (!stream_id_hint::is_unambiguous()) {
// This check is not necessary for correctness but skipping it resulted in a big
// performance hit in allocator performance tests.
if (!f->ready())
CUDA_CALL(cudaStreamWaitEvent(stream.get(), f->event));
}
if (split) {
// Adjust the pending free `f` so that it contains only what remains after
// the block was split.
Expand Down Expand Up @@ -407,6 +421,10 @@ class async_pool_resource : public async_memory_resource<Kind>,
* @brief Searches per-stream free blocks to find the most recently freed one.
*/
pending_free *find_first_ready(PerStreamFreeBlocks &free) {
// Without a reliable stream id we might have
// blocks from different streams mixed up - there's no
// strict ordering and find_first makes no sense.
assert(stream_id_hint::is_unambiguous());
SmallVector<pending_free *, 128> pending;
int step = 1;
pending_free *f = free.free_list.head;
Expand Down Expand Up @@ -446,10 +464,23 @@ class async_pool_resource : public async_memory_resource<Kind>,
* @brief Returns the memory from completed deallocations to the global pool.
*/
void free_ready(PerStreamFreeBlocks &free) {
auto *f = find_first_ready(free);
while (f) {
global_pool_.deallocate(f->addr, f->bytes, f->alignment);
f = remove_pending_free(free, f);
if (stream_id_hint::is_unambiguous()) {
// strict order available - find the newest ready, all older entries must be ready too
auto *f = find_first_ready(free);
while (f) {
global_pool_.deallocate(f->addr, f->bytes, f->alignment);
f = remove_pending_free(free, f);
}
} else {
// no strict order - go over elements and remove ones that are ready
for (auto *f = free.free_list.head; f; ) {
if (f->ready()) {
global_pool_.deallocate(f->addr, f->bytes, f->alignment);
f = remove_pending_free(free, f);
} else {
f = f->next;
}
}
}
}

Expand Down Expand Up @@ -549,7 +580,7 @@ class async_pool_resource : public async_memory_resource<Kind>,

detail::pooled_map<char *, padded_block, true> padded_;

std::unordered_map<cudaStream_t, PerStreamFreeBlocks> stream_free_;
std::unordered_map<uint64_t, PerStreamFreeBlocks> stream_free_;

using FreeDescAlloc = detail::object_pool_allocator<pending_free>;

Expand Down
30 changes: 30 additions & 0 deletions include/dali/core/mm/detail/stream_id_hint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DALI_CORE_MM_DETAIL_STREAM_ID_HINT_H_
#define DALI_CORE_MM_DETAIL_STREAM_ID_HINT_H_

#include <cuda.h>
#include "dali/core/api_helper.h"

namespace dali {

struct stream_id_hint {
DLL_PUBLIC static bool is_unambiguous();
DLL_PUBLIC static uint64_t from_handle(CUstream handle);
};

} // namespace dali

#endif // DALI_CORE_MM_DETAIL_STREAM_ID_HINT_H_
2 changes: 2 additions & 0 deletions tools/stub_generator/cuda.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"not_found_error":"CUDA_ERROR_SHARED_OBJECT_SYMBOL_NOT_FOUND",
"functions": {
"cuInit": {},
"cuGetProcAddress": {},
"cuGetProcAddress_v2": {},
"cuGetErrorName": {},
"cuGetErrorString": {},
"cuDriverGetVersion": {},
Expand Down

0 comments on commit 3c263df

Please sign in to comment.