Skip to content

Commit

Permalink
fix(job_attachments): Use files' last modification time to identify o…
Browse files Browse the repository at this point in the history
…utput files to be synced (#211)

Signed-off-by: Gahyun Suh <[email protected]>
  • Loading branch information
gahyusuh authored Mar 19, 2024
1 parent 24fe21c commit 1688c5b
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 55 deletions.
34 changes: 27 additions & 7 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ def __init__(
self.manifest_model: Type[BaseManifestModel] = ManifestModelRegistry.get_manifest_model(
version=manifest_version
)

# A dictionary mapping absolute file paths to their last modification times in microseconds.
# This is used to determine if an asset has been modified since it was last synced.
self.synced_assets_mtime: dict[str, int] = dict()

self.hash_alg: HashAlgorithm = self.manifest_model.AssetManifest.get_default_hash_alg()

def _upload_output_files_to_s3(
Expand Down Expand Up @@ -187,7 +192,6 @@ def _get_output_files(
manifest_properties: ManifestProperties,
s3_settings: JobAttachmentS3Settings,
local_root: Path,
start_time: float,
) -> List[OutputFile]:
"""
Walks the output directories for this asset root for any output files that have been created or modified
Expand Down Expand Up @@ -216,11 +220,20 @@ def _get_output_files(

# Get all files in this directory (includes sub-directories)
for file_path in output_root.glob("**/*"):
if (
not file_path.is_dir()
and file_path.exists()
and file_path.lstat().st_mtime >= start_time
):
# Files that are new or have been modified since the last sync will be added to the output list.
mtime_when_synced = self.synced_assets_mtime.get(str(file_path), None)
file_mtime = file_path.stat().st_mtime_ns
is_modified = False
if mtime_when_synced:
if file_mtime > int(mtime_when_synced):
# This file has been modified during this session action.
is_modified = True
else:
# This is a new file created during this session action.
self.synced_assets_mtime[str(file_path)] = int(file_mtime)
is_modified = True

if not file_path.is_dir() and file_path.exists() and is_modified:
file_size = file_path.lstat().st_size
file_hash = hash_file(str(file_path), self.hash_alg)
s3_key = f"{file_hash}.{self.hash_alg.value}"
Expand Down Expand Up @@ -445,6 +458,14 @@ def sync_inputs(
else:
raise

# Record the mapping of downloaded files' absolute paths to their last modification time
# (in microseconds). This is used to later determine which files have been modified or
# newly created during the session and need to be uploaded as output.
for local_root, merged_manifest in merged_manifests_by_root.items():
for manifest_path in merged_manifest.paths:
abs_path = str(Path(local_root) / manifest_path.path)
self.synced_assets_mtime[abs_path] = Path(abs_path).stat().st_mtime_ns

return (
download_summary_statistics.convert_to_summary_statistics(),
list(pathmapping_rules.values()),
Expand Down Expand Up @@ -501,7 +522,6 @@ def sync_outputs(
manifest_properties,
s3_settings,
local_root,
start_time,
)
if output_files:
output_manifest = self._generate_output_manifest(output_files)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Although it is in the output directory, it is actually an input file. It should be downloaded (to the worker's session working directory) during sync_inputs, and should not be captured as an output file when sync_outputs.
80 changes: 41 additions & 39 deletions test/integ/deadline_job_attachments/test_job_attachments.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class JobAttachmentTest:
SCENE_MA_HASH = hash_file(str(SCENE_MA_PATH), HashAlgorithm.XXH128)
BRICK_PNG_PATH = INPUT_PATH / "textures" / "brick.png"
CLOTH_PNG_PATH = INPUT_PATH / "textures" / "cloth.png"
INPUT_IN_OUTPUT_DIR_PATH = OUTPUT_PATH / "not_for_sync_outputs.txt"
FIRST_RENDER_OUTPUT_PATH = Path("outputs/render0000.exr")
SECOND_RENDER_OUTPUT_PATH = Path("outputs/render0001.exr")
MOV_FILE_OUTPUT_PATH = Path("outputs/end.mov")
Expand All @@ -64,17 +65,17 @@ def __init__(
manifest_version: ManifestVersion,
):
"""
Sets ups resource that these integration tests will need.
Sets up resource that these integration tests will need.
"""
self.job_attachment_resources = deploy_job_attachment_resources

if self.job_attachment_resources.farm is None:
raise TypeError("The Farm was not properly created when initalizing resources.")
raise TypeError("The Farm was not properly created when initializing resources.")
if (
self.job_attachment_resources.queue is None
or self.job_attachment_resources.queue_with_no_settings is None
):
raise TypeError("The Queues were not properly created when initalizing resources.")
raise TypeError("The Queues were not properly created when initializing resources.")

self.farm_id = self.job_attachment_resources.farm.id
self.queue_id = self.job_attachment_resources.queue.id
Expand Down Expand Up @@ -208,6 +209,7 @@ def upload_input_files_one_asset_in_cas(
str(job_attachment_test.SCENE_MA_PATH),
str(job_attachment_test.BRICK_PNG_PATH),
str(job_attachment_test.CLOTH_PNG_PATH),
str(job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH),
]

scene_ma_s3_path = (
Expand Down Expand Up @@ -244,22 +246,29 @@ def upload_input_files_one_asset_in_cas(
# THEN
brick_png_hash = hash_file(str(job_attachment_test.BRICK_PNG_PATH), HashAlgorithm.XXH128)
cloth_png_hash = hash_file(str(job_attachment_test.CLOTH_PNG_PATH), HashAlgorithm.XXH128)
input_in_output_dir_hash = hash_file(
str(job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH), HashAlgorithm.XXH128
)

brick_png_s3_path = f"{job_attachment_settings.full_cas_prefix()}/{brick_png_hash}.xxh128"
cloth_png_s3_path = f"{job_attachment_settings.full_cas_prefix()}/{cloth_png_hash}.xxh128"
input_in_output_dir_s3_path = (
f"{job_attachment_settings.full_cas_prefix()}/{input_in_output_dir_hash}.xxh128"
)

object_summary_iterator = job_attachment_test.bucket.objects.filter(
Prefix=f"{job_attachment_settings.full_cas_prefix()}/",
)

s3_objects = {obj.key: obj for obj in object_summary_iterator}

assert {brick_png_s3_path, cloth_png_s3_path} <= set(
assert {brick_png_s3_path, cloth_png_s3_path, input_in_output_dir_s3_path} <= set(
map(lambda x: x.key, object_summary_iterator)
)

assert brick_png_s3_path in s3_objects
assert cloth_png_s3_path in s3_objects
assert input_in_output_dir_s3_path in s3_objects
# Make sure that the file hasn't been modified/reuploaded
assert s3_objects[scene_ma_s3_path].last_modified == scene_ma_upload_time

Expand Down Expand Up @@ -295,6 +304,7 @@ def test_upload_input_files_all_assets_in_cas(
str(job_attachment_test.SCENE_MA_PATH),
str(job_attachment_test.BRICK_PNG_PATH),
str(job_attachment_test.CLOTH_PNG_PATH),
str(job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH),
]

# This file has already been uploaded
Expand Down Expand Up @@ -404,6 +414,7 @@ def on_downloading_files(*args, **kwargs):
assert Path(session_dir / dest_dir / job_attachment_test.SCENE_MA_PATH).exists()
assert Path(session_dir / dest_dir / job_attachment_test.BRICK_PNG_PATH).exists()
assert Path(session_dir / dest_dir / job_attachment_test.CLOTH_PNG_PATH).exists()
assert Path(session_dir / dest_dir / job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH).exists()

return SyncInputsOutputs(
session_dir=session_dir,
Expand Down Expand Up @@ -733,11 +744,6 @@ def sync_outputs(
stepId=step1_id,
)["tasks"][0]["taskId"]

Path(sync_inputs.session_dir / sync_inputs.dest_dir / "outputs").mkdir()

file_not_to_be_synced = (
sync_inputs.session_dir / sync_inputs.dest_dir / "outputs" / "don't sync me"
)
file_to_be_synced_step0_task0_base = job_attachment_test.FIRST_RENDER_OUTPUT_PATH
file_to_be_synced_step0_task1_base = job_attachment_test.SECOND_RENDER_OUTPUT_PATH
file_to_be_synced_step1_task0_base = job_attachment_test.MOV_FILE_OUTPUT_PATH
Expand All @@ -752,16 +758,8 @@ def sync_outputs(
sync_inputs.session_dir / sync_inputs.dest_dir / file_to_be_synced_step1_task0_base
)

# Create files before the render start time in the output dir, these shouldn't be synced
with open(file_not_to_be_synced, "w") as f:
f.write("don't sync me")

render_start_time = time.time()

# If we create the file too quickly after taking the time, there's high likelyhood that the time stamp will be
# the same.
time.sleep(1)

# WHEN
mock_on_uploading_files = MagicMock(return_value=True)

Expand All @@ -770,7 +768,7 @@ def sync_outputs(
with open(file_to_be_synced_step0_task0, "w") as f:
f.write("this is the first render")

sync_inputs.asset_syncer.sync_outputs(
summary_stats = sync_inputs.asset_syncer.sync_outputs(
s3_settings=job_attachment_settings,
attachments=sync_inputs.attachments,
queue_id=job_attachment_test.queue_id,
Expand All @@ -782,15 +780,16 @@ def sync_outputs(
session_dir=sync_inputs.session_dir,
on_uploading_files=mock_on_uploading_files,
)
# There should be one synced output for this task, Step 0 - Task 0
assert summary_stats.total_files == 1

render_start_time = time.time()
time.sleep(1)

# First step and second task
with open(file_to_be_synced_step0_task1, "w") as f:
f.write("this is a second render")

sync_inputs.asset_syncer.sync_outputs(
summary_stats = sync_inputs.asset_syncer.sync_outputs(
s3_settings=job_attachment_settings,
attachments=sync_inputs.attachments,
queue_id=job_attachment_test.queue_id,
Expand All @@ -802,15 +801,16 @@ def sync_outputs(
session_dir=sync_inputs.session_dir,
on_uploading_files=mock_on_uploading_files,
)
# There should be one synced output for this task, Step 0 - Task 1
assert summary_stats.total_files == 1

render_start_time = time.time()
time.sleep(1)

# Second step and first task
with open(file_to_be_synced_step1_task0, "w") as f:
f.write("this is a comp")

sync_inputs.asset_syncer.sync_outputs(
summary_stats = sync_inputs.asset_syncer.sync_outputs(
s3_settings=job_attachment_settings,
attachments=sync_inputs.attachments,
queue_id=job_attachment_test.queue_id,
Expand All @@ -822,6 +822,8 @@ def sync_outputs(
session_dir=sync_inputs.session_dir,
on_uploading_files=mock_on_uploading_files,
)
# There should be one synced output for this task, Step 1 - Task 0
assert summary_stats.total_files == 1

# THEN
object_summary_iterator = job_attachment_test.bucket.objects.filter(
Expand All @@ -834,10 +836,6 @@ def sync_outputs(
f"{job_attachment_settings.full_cas_prefix()}/{hash_file(str(file_to_be_synced_step0_task0), HashAlgorithm.XXH128)}.xxh128"
in object_key_set
)
assert (
f"{job_attachment_settings.full_cas_prefix()}/{hash_file(str(file_not_to_be_synced), HashAlgorithm.XXH128)}.xxh128"
not in object_key_set
)

return SyncOutputsOutput(
step0_id=step0_id,
Expand Down Expand Up @@ -899,10 +897,11 @@ def on_downloading_files(*args, **kwargs):
dest_dir = _get_unique_dest_dir_name(str(job_attachment_test.ASSET_ROOT))

# THEN
# Check if the inputs specified in job settings were downlownded
# Check if the inputs specified in job settings were downloaded
assert Path(session_dir / dest_dir / job_attachment_test.SCENE_MA_PATH).exists()
assert Path(session_dir / dest_dir / job_attachment_test.BRICK_PNG_PATH).exists()
assert Path(session_dir / dest_dir / job_attachment_test.CLOTH_PNG_PATH).exists()
assert Path(session_dir / dest_dir / job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH).exists()
# Check if the outputs from step0_id ("custom-step") were downloaded
assert Path(session_dir / dest_dir / job_attachment_test.FIRST_RENDER_OUTPUT_PATH).exists()
assert Path(session_dir / dest_dir / job_attachment_test.SECOND_RENDER_OUTPUT_PATH).exists()
Expand Down Expand Up @@ -942,7 +941,7 @@ def test_download_outputs_with_job_id_step_id_task_id_and_download_directory(
# THEN
assert Path(job_attachment_test.ASSET_ROOT / sync_outputs.step0_task0_output_file).exists()
finally:
shutil.rmtree(job_attachment_test.OUTPUT_PATH)
_cleanup_outputs_dir(job_attachment_test)


@pytest.mark.integ
Expand Down Expand Up @@ -978,7 +977,7 @@ def test_download_outputs_with_job_id_step_id_and_download_directory(
assert Path(job_attachment_test.ASSET_ROOT / sync_outputs.step0_task0_output_file).exists()
assert Path(job_attachment_test.ASSET_ROOT / sync_outputs.step0_task1_output_file).exists()
finally:
shutil.rmtree(job_attachment_test.OUTPUT_PATH)
_cleanup_outputs_dir(job_attachment_test)


@pytest.mark.integ
Expand Down Expand Up @@ -1015,12 +1014,19 @@ def test_download_outputs_with_job_id_and_download_directory(
assert Path(job_attachment_test.ASSET_ROOT / sync_outputs.step0_task1_output_file).exists()
assert Path(job_attachment_test.ASSET_ROOT / sync_outputs.step1_task0_output_file).exists()
finally:
shutil.rmtree(job_attachment_test.OUTPUT_PATH)


@dataclass
class UploadInputFilesWithJobAssetsOuput:
attachments: Attachments
_cleanup_outputs_dir(job_attachment_test)


def _cleanup_outputs_dir(job_attachment_test: JobAttachmentTest) -> None:
shutil.rmtree(job_attachment_test.OUTPUT_PATH)
# Revive the INPUT_IN_OUTPUT_DIR_PATH file.
job_attachment_test.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
with open(job_attachment_test.INPUT_IN_OUTPUT_DIR_PATH, "w") as f:
f.write(
"Although it is in the output directory, it is actually an input file. It should be"
" downloaded (to the worker's session working directory) during sync_inputs, and"
" should not be captured as an output file when sync_outputs."
)


@dataclass
Expand Down Expand Up @@ -1371,10 +1377,6 @@ def test_sync_outputs_bucket_wrong_account(

render_start_time = time.time()

# If we create the file too quickly after taking the time, there's high likelyhood that the time stamp will be
# the same.
time.sleep(1)

# WHEN

# First step and task
Expand Down
Loading

0 comments on commit 1688c5b

Please sign in to comment.