Skip to content

Commit

Permalink
Capture base_seed as part of main snapshot and initialize worker loop, add test for it
Browse files (browse the repository at this point in the history)

Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
gokulavasan committed Apr 13, 2024
1 parent 761a7cf commit 0c862b1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 20 deletions.
28 changes: 8 additions & 20 deletions test/stateful_dataloader/test_state_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,39 +92,22 @@ def __init__(self, size, shuffle):
self.size = size
self.data = [{"id": i, "strcol": f"strcol_{i}", "listcol": [i, i + 1, i + 2]} for i in range(size)]
self.shuffle = shuffle
self.g = torch.Generator()
self.g.manual_seed(1)

def __getstate__(self):
"""Return a picklable copy of the instance state.

Pickling generators fails on Windows and macOS; this makes sure
unit tests can proceed on those platforms. The generator attribute
``g`` is removed from the copy and its state is stored under the
``g_state`` key instead.
"""
# Copy into a new dict so deleting "g" below does not touch the live instance.
state = dict(self.__dict__)
del state["g"]
state["g_state"] = self.g.get_state()
return state

def __setstate__(self, state):
"""Rebuild the instance from a pickled state dict.

Expects ``state`` to contain a ``g_state`` entry; that entry is used to
re-create the ``g`` generator and every remaining entry becomes the
instance ``__dict__``.
"""
# Pop the saved generator state first so it is not installed as an attribute.
g_state = state.pop("g_state")
self.__dict__ = state
# Create a fresh generator, then set it to the saved state.
self.g = torch.Generator()
self.g.set_state(g_state)

def __len__(self):
"""Return the ``size`` value stored on the instance."""
return self.size

def __getitem__(self, i):
"""Return ``self.data[i]``; when ``self.shuffle`` is set, ``i`` is replaced by a random index in ``[0, size)``."""
if self.shuffle:
# NOTE(review): the two assignments below look like the removed and added lines of
# the diff rendered together (draw from the per-dataset generator ``self.g`` vs. draw
# from torch's default RNG) — presumably only one exists in each version; confirm.
i = torch.randint(self.size, (1,), generator=self.g).item()
i = torch.randint(self.size, (1,)).item()
return self.data[i]

def state_dict(self):
"""Return a dict holding RNG state under the key ``"g"``."""
return {
# NOTE(review): both entries use the key "g"; they look like the removed and added
# lines of the diff rendered together (state of ``self.g`` vs. torch's global RNG
# state) — confirm which one the file actually contains.
"g": self.g.get_state(),
"g": torch.get_rng_state(),
}

def load_state_dict(self, state_dict):
"""Restore RNG state from ``state_dict["g"]``."""
# NOTE(review): the two calls below look like the removed and added lines of the diff
# rendered together (restore into ``self.g`` vs. restore torch's global RNG state) —
# presumably only one exists in each version; confirm.
self.g.set_state(state_dict["g"])
torch.set_rng_state(state_dict["g"])


def identity(x):
Expand Down Expand Up @@ -695,26 +678,31 @@ def test_map(self):
every_n_steps = 10
for pw, bs in itertools.product([False, True], [None, 4]):
dataset = DummyMapDataset(100, shuffle=True)
generator = torch.Generator()
generator.manual_seed(15)
dl = StatefulDataLoader(
dataset=dataset,
num_workers=num_workers,
collate_fn=identity,
snapshot_every_n_steps=every_n_steps,
persistent_workers=pw,
batch_size=bs,
generator=generator,
)
list(dl)
state_end = dl.state_dict()
exp = list(dl)

dataset = DummyMapDataset(100, shuffle=True)
generator.manual_seed(15)
dl = StatefulDataLoader(
dataset=dataset,
num_workers=num_workers,
collate_fn=identity,
snapshot_every_n_steps=every_n_steps,
persistent_workers=pw,
batch_size=bs,
generator=generator,
)
dl.load_state_dict(state_end)
batches = list(dl)
Expand Down
4 changes: 4 additions & 0 deletions torchdata/stateful_dataloader/stateful_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ def __init__(self, loader, next_iter_state):
assert set(range(len(wstates))) == set(wstates.keys()), (len(wstates), wstates.keys())
for wid, sd in wstates.items():
worker_states[wid] = sd
self._base_seed = next_iter_state["snapshot"]["main_snapshot"].get("_base_seed", self._base_seed)
self._shared_seed = next_iter_state["snapshot"]["main_snapshot"].get("_shared_seed", self._shared_seed)

for i in range(self._num_workers):
# No certainty which module multiprocessing_context is
Expand Down Expand Up @@ -1162,6 +1164,7 @@ def _get_main_state(self):
"_sampler_iter_yielded": self._sampler_iter_yielded,
"_IterableDataset_len_called": self._IterableDataset_len_called,
"_shared_seed": self._shared_seed,
"_base_seed": self._base_seed,
}

def _restore_main_state(self, state_dict):
Expand All @@ -1179,6 +1182,7 @@ def _restore_main_state(self, state_dict):
self._sampler_iter = itertools.islice(self._index_sampler, self._sampler_iter_yielded, None)
self._IterableDataset_len_called = state_dict["_IterableDataset_len_called"]
self._shared_seed = state_dict["_shared_seed"]
self._base_seed = state_dict["_base_seed"]

def _try_put_index(self):
assert self._tasks_outstanding < self._prefetch_factor * self._num_workers
Expand Down

0 comments on commit 0c862b1

Please sign in to comment.