disk_io.py

import os
import os.path
import pathlib
import tempfile
import threading
import weakref
from typing import Callable, Iterable, Mapping, Optional, Union

import numpy as np

import dask
from distributed.utils import nbytes

_new_cuda_buffer: Optional[Callable[[int], object]] = None


def get_new_cuda_buffer() -> Callable[[int], object]:
    """Return a function to create an empty CUDA buffer"""
    global _new_cuda_buffer
    if _new_cuda_buffer is not None:
        return _new_cuda_buffer

    try:
        import rmm

        _new_cuda_buffer = lambda n: rmm.DeviceBuffer(size=n)
        return _new_cuda_buffer
    except ImportError:
        pass

    try:
        import cupy

        _new_cuda_buffer = lambda n: cupy.empty((n,), dtype="u1")
        return _new_cuda_buffer
    except ImportError:
        pass

    try:
        import numba.cuda

        def numba_device_array(n):
            a = numba.cuda.device_array((n,), dtype="u1")
            weakref.finalize(a, numba.cuda.current_context)
            return a

        _new_cuda_buffer = numba_device_array
        return _new_cuda_buffer
    except ImportError:
        pass

    raise RuntimeError("GPUDirect Storage requires RMM, CuPy, or Numba")
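

# Illustrative sketch (not part of the original module): how the allocator
# returned by ``get_new_cuda_buffer()`` might be used. The helper name and
# size below are made up; this assumes a CUDA device plus at least one of
# RMM, CuPy, or Numba is available.
def _example_allocate_device_buffer(num_bytes: int = 1024):
    allocate = get_new_cuda_buffer()
    # Returns an empty, uninitialized device buffer of ``num_bytes`` bytes.
    return allocate(num_bytes)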


class SpillToDiskFile:
    """File path that gets removed on destruction

    When spilling to disk, we have to delay the removal of the file
    until no more proxies are pointing to the file.
    """

    path: str

    def __init__(self, path: str) -> None:
        self.path = path

    def __del__(self):
        os.remove(self.path)

    def __str__(self) -> str:
        return self.path

    def exists(self) -> bool:
        return os.path.exists(self.path)

    def __deepcopy__(self, memo) -> str:
        """A deep copy is simply the path as a string.

        In order to avoid multiple instances of SpillToDiskFile pointing
        to the same file, we do not allow a direct copy.
        """
        return self.path

    def __copy__(self):
        raise RuntimeError("Cannot copy or pickle a SpillToDiskFile")

    def __reduce__(self):
        # Pickling goes through __reduce__; delegate to __copy__, which raises.
        self.__copy__()
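

# Illustrative sketch (not part of the original module): the file backing a
# ``SpillToDiskFile`` is removed once the last reference to it is dropped.
# The path below is hypothetical and must already exist on disk.
def _example_spill_file_lifetime(path: str = "/tmp/spill-0001") -> None:
    spill_file = SpillToDiskFile(path)
    assert spill_file.exists()
    del spill_file  # __del__ removes the file from disk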


class SpillToDiskProperties:
    gds_enabled: bool
    shared_filesystem: bool
    root_dir: pathlib.Path
    tmpdir: tempfile.TemporaryDirectory

    def __init__(
        self,
        root_dir: Union[str, os.PathLike],
        shared_filesystem: Optional[bool] = None,
        gds: Optional[bool] = None,
    ):
        """
        Parameters
        ----------
        root_dir : os.PathLike
            Path to the root directory to write serialized data.
        shared_filesystem : bool or None, default None
            Whether `root_dir` is shared between all workers or not.
            If ``None``, the "jit-unspill-shared-fs" config value is used,
            which defaults to ``False``.
        gds : bool or None, default None
            Enable the use of GPUDirect Storage. If ``None``, the
            "gds-spilling" config value is used, which defaults to ``False``.
        """
        self.lock = threading.Lock()
        self.counter = 0
        self.root_dir = pathlib.Path(root_dir)
        os.makedirs(self.root_dir, exist_ok=True)
        self.tmpdir = tempfile.TemporaryDirectory(dir=self.root_dir)

        # Fall back to the Dask config only when an argument is None, so that
        # an explicit ``False`` is respected.
        if shared_filesystem is None:
            shared_filesystem = dask.config.get("jit-unspill-shared-fs", default=False)
        self.shared_filesystem = bool(shared_filesystem)

        if gds is None:
            gds = dask.config.get("gds-spilling", default=False)
        self.gds_enabled = bool(gds)

        if self.gds_enabled:
            try:
                import kvikio  # noqa: F401
            except ImportError:
                raise ImportError(
                    "GPUDirect Storage requires the kvikio Python package"
                )
            else:
                self.gds_enabled = kvikio.DriverProperties().is_gds_available

    def gen_file_path(self) -> str:
        """Generate a unique file path"""
        with self.lock:
            self.counter += 1
            return str(
                pathlib.Path(self.tmpdir.name) / pathlib.Path("%04d" % self.counter)
            )
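

# Illustrative sketch (not part of the original module): creating spill
# properties rooted at a fresh temporary directory and generating unique
# per-spill file paths.
def _example_spill_properties() -> None:
    props = SpillToDiskProperties(root_dir=tempfile.mkdtemp(), gds=False)
    path1 = props.gen_file_path()  # e.g. "<tmpdir>/0001"
    path2 = props.gen_file_path()  # e.g. "<tmpdir>/0002"
    assert path1 != path2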


def disk_write(
    path: str, frames: Iterable, shared_filesystem: bool, gds: bool = False
) -> dict:
    """Write frames to disk

    Parameters
    ----------
    path : str
        File path
    frames : Iterable
        The frames to write to disk
    shared_filesystem : bool
        Whether the target filesystem is shared between all workers or not.
        If True, the filesystem must support the `os.link()` operation.
    gds : bool
        Enable the use of GPUDirect Storage. Note that the subsequent
        `disk_read()` must enable GDS as well.

    Returns
    -------
    header : dict
        A dict of metadata
    """
    cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames)
    frame_lengths = tuple(map(nbytes, frames))

    if gds and any(cuda_frames):
        import kvikio

        # Write the frames back-to-back, advancing the file offset by the
        # length of each frame.
        with kvikio.CuFile(path, "w") as f:
            file_offset = 0
            for frame, length in zip(frames, frame_lengths):
                f.pwrite(
                    buf=frame, count=length, file_offset=file_offset, buf_offset=0
                ).get()
                file_offset += length
    else:
        with open(path, "wb") as f:
            for frame in frames:
                f.write(frame)
    return {
        "method": "stdio",
        "path": SpillToDiskFile(path),
        "frame-lengths": frame_lengths,
        "shared-filesystem": shared_filesystem,
        "cuda-frames": cuda_frames,
    }
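

# Illustrative sketch (not part of the original module): for two small host
# frames the header returned by ``disk_write()`` would look roughly like
#
#     {
#         "method": "stdio",
#         "path": SpillToDiskFile("<tmpdir>/0001"),
#         "frame-lengths": (5, 5),
#         "shared-filesystem": False,
#         "cuda-frames": (False, False),
#     }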


def disk_read(header: Mapping, gds: bool = False) -> list:
    """Read frames from disk

    Parameters
    ----------
    header : Mapping
        The metadata of the frames to read
    gds : bool
        Enable the use of GPUDirect Storage. Note that this must match
        the GDS option set by the prior `disk_write()` call.

    Returns
    -------
    frames : list
        List of read frames
    """
    ret = []
    if gds:
        import kvikio  # isort:skip

        with kvikio.CuFile(str(header["path"]), "rb") as f:
            file_offset = 0
            for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"]):
                # CUDA frames are read directly into device memory; host
                # frames go into a NumPy byte buffer.
                if is_cuda:
                    buf = get_new_cuda_buffer()(length)
                else:
                    buf = np.empty((length,), dtype="u1")
                f.pread(
                    buf=buf, count=length, file_offset=file_offset, buf_offset=0
                ).get()
                file_offset += length
                ret.append(buf)
    else:
        with open(str(header["path"]), "rb") as f:
            for length in header["frame-lengths"]:
                ret.append(f.read(length))
    return ret
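

# Illustrative sketch (not part of the original module): a host-memory round
# trip through ``disk_write()`` and ``disk_read()``. The frames are made up
# for demonstration and GDS is left disabled.
def _example_round_trip() -> None:
    props = SpillToDiskProperties(root_dir=tempfile.mkdtemp(), gds=False)
    frames = [b"hello", b"world"]
    header = disk_write(props.gen_file_path(), frames, shared_filesystem=False)
    assert disk_read(header) == frames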