diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 64b2bd7722..a9bce8f833 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -80,21 +80,6 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: def setUp(self) -> None: self.trusted_set = TrustedMetadataSet(self.metadata["root"]) - def _root_updated_and_update_timestamp( - self, timestamp_bytes: Optional[bytes] = None - ) -> None: - """Finsh root update and update timestamp with passed timestamp_bytes. - - Args: - timestamp_bytes: - Bytes used when calling trusted_set.update_timestamp(). - Default self.metadata["timestamp"]. - - """ - timestamp_bytes = timestamp_bytes or self.metadata["timestamp"] - self.trusted_set.root_update_finished() - self.trusted_set.update_timestamp(timestamp_bytes) - def _update_all_besides_targets( self, @@ -112,13 +97,14 @@ def _update_all_besides_targets( Default self.metadata["snapshot"]. """ - self._root_updated_and_update_timestamp(timestamp_bytes) + + timestamp_bytes = timestamp_bytes or self.metadata["timestamp"] + self.trusted_set.update_timestamp(timestamp_bytes) snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) def test_update(self): - self.trusted_set.root_update_finished() self.trusted_set.update_timestamp(self.metadata["timestamp"]) self.trusted_set.update_snapshot(self.metadata["snapshot"]) self.trusted_set.update_targets(self.metadata["targets"]) @@ -139,24 +125,16 @@ def test_update(self): self.assertTrue(count, 6) def test_out_of_order_ops(self): - # Update timestamp before root is finished - with self.assertRaises(RuntimeError): - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - - self.trusted_set.root_update_finished() - with self.assertRaises(RuntimeError): - self.trusted_set.root_update_finished() - - # Update root after a previous successful root update - with self.assertRaises(RuntimeError): - self.trusted_set.update_root(self.metadata["root"]) - # Update snapshot before timestamp with self.assertRaises(RuntimeError): self.trusted_set.update_snapshot(self.metadata["snapshot"]) self.trusted_set.update_timestamp(self.metadata["timestamp"]) + # Update root after timestamp + with self.assertRaises(RuntimeError): + self.trusted_set.update_root(self.metadata["root"]) + # Update targets before snapshot with self.assertRaises(RuntimeError): self.trusted_set.update_targets(self.metadata["targets"]) @@ -198,8 +176,6 @@ def test_update_with_invalid_json(self): with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_root(self.metadata["snapshot"]) - self.trusted_set.root_update_finished() - top_level_md = [ (self.metadata["timestamp"], self.trusted_set.update_timestamp), (self.metadata["snapshot"], self.trusted_set.update_snapshot), @@ -241,15 +217,16 @@ def test_update_root_new_root_ver_same_as_trusted_root_ver(self): with self.assertRaises(exceptions.ReplayedMetadataError): self.trusted_set.update_root(self.metadata["root"]) - def test_root_update_finished_expired(self): + def test_root_expired_final_root(self): def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) + # intermediate root can be expired root = self.modify_metadata("root", root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) - # call root_update_finished when trusted root has expired + # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - tmp_trusted_set.root_update_finished() + tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): @@ -258,7 +235,7 @@ def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 timestamp = self.modify_metadata("timestamp", version_modifier) - self._root_updated_and_update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): self.trusted_set.update_timestamp(self.metadata["timestamp"]) @@ -268,21 +245,24 @@ def bump_snapshot_version(timestamp: Timestamp) -> None: # set current known snapshot.json version to 2 timestamp = self.modify_metadata("timestamp", bump_snapshot_version) - self._root_updated_and_update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp) # newtimestamp.meta["snapshot.json"].version < trusted_timestamp.meta["snapshot.json"].version with self.assertRaises(exceptions.ReplayedMetadataError): self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_expired(self): - self.trusted_set.root_update_finished() # new_timestamp has expired def timestamp_expired_modifier(timestamp: Timestamp) -> None: timestamp.expires = datetime(1970, 1, 1) + # intermediate timestamp is allowed to be expired timestamp = self.modify_metadata("timestamp", timestamp_expired_modifier) + self.trusted_set.update_timestamp(timestamp) + + # update snapshot to trigger final timestamp expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_timestamp(timestamp) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_length_or_hash_mismatch(self): def modify_snapshot_length(timestamp: Timestamp) -> None: @@ -290,13 +270,13 @@ def modify_snapshot_length(timestamp: Timestamp) -> None: # set known snapshot.json length to 1 timestamp = self.modify_metadata("timestamp", modify_snapshot_length) - self._root_updated_and_update_timestamp(timestamp) + self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): - self._root_updated_and_update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): @@ -307,46 +287,70 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.meta["snapshot.json"].version = 2 timestamp = self.modify_metadata("timestamp", timestamp_version_modifier) - self._root_updated_and_update_timestamp(timestamp) - # new_snapshot.version != trusted timestamp.meta["snapshot"].version - def snapshot_version_modifier(snapshot: Snapshot) -> None: - snapshot.version = 3 + self.trusted_set.update_timestamp(timestamp) + + #intermediate snapshot is allowed to not match meta version + self.trusted_set.update_snapshot(self.metadata["snapshot"]) - snapshot = self.modify_metadata("snapshot", snapshot_version_modifier) + # final snapshot must match meta version with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(snapshot) + self.trusted_set.update_targets(self.metadata["targets"]) + - def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): + def test_update_snapshot_file_removed_from_meta(self): self._update_all_besides_targets(self.metadata["timestamp"]) - # Test removing a meta_file in new_snapshot compared to the old snapshot - def no_meta_modifier(snapshot: Snapshot) -> None: - snapshot.meta = {} + def remove_file_from_meta(snapshot: Snapshot) -> None: + del snapshot.meta["targets.json"] - snapshot = self.modify_metadata("snapshot", no_meta_modifier) + # Test removing a meta_file in new_snapshot compared to the old snapshot + snapshot = self.modify_metadata("snapshot", remove_file_from_meta) with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(snapshot) - def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self): - self._root_updated_and_update_timestamp(self.metadata["timestamp"]) - # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version + def test_update_snapshot_meta_version_decreases(self): + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + def version_meta_modifier(snapshot: Snapshot) -> None: - for metafile_path in snapshot.meta.keys(): - snapshot.meta[metafile_path].version += 1 + snapshot.meta["targets.json"].version += 1 snapshot = self.modify_metadata("snapshot", version_meta_modifier) self.trusted_set.update_snapshot(snapshot) + with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_expired_new_snapshot(self): - self._root_updated_and_update_timestamp(self.metadata["timestamp"]) - # new_snapshot has expired + self.trusted_set.update_timestamp(self.metadata["timestamp"]) def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) + # intermediate snapshot is allowed to be expired snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier) + self.trusted_set.update_snapshot(snapshot) + + # update targets to trigger final snapshot expiry check with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(snapshot) + self.trusted_set.update_targets(self.metadata["targets"]) + + def test_update_snapshot_successful_rollback_checks(self): + def meta_version_bump(timestamp: Timestamp) -> None: + timestamp.meta["snapshot.json"].version += 1 + + def version_bump(snapshot: Snapshot) -> None: + snapshot.version += 1 + + # load a "local" timestamp, then update to newer one: + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + new_timestamp = self.modify_metadata("timestamp", meta_version_bump) + self.trusted_set.update_timestamp(new_timestamp) + + # load a "local" snapshot, then update to newer one: + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + new_snapshot = self.modify_metadata("snapshot", version_bump) + self.trusted_set.update_snapshot(new_snapshot) + + # update targets to trigger final snapshot meta version check + self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_no_meta_in_snapshot(self): def no_meta_modifier(snapshot: Snapshot) -> None: diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index 369b5255ff..608889fb2a 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -30,8 +30,6 @@ >>> # update root from remote until no more are available >>> with download("root", trusted_set.root.signed.version + 1) as f: >>> trusted_set.update_root(f.read()) ->>> # ... ->>> trusted_set.root_update_finished() >>> >>> # load local timestamp, then update from remote >>> try: @@ -58,9 +56,6 @@ a generic RepositoryError that covers every issue that server provided metadata could inflict (other errors would be user errors), but this is not yet the case - * usefulness of root_update_finished() can be debated: it could be done - in the beginning of load_timestamp()... - * some metadata interactions might work better in Metadata itself * Progress through Specification update process should be documented (not sure yet how: maybe a spec_logger that logs specification events?) """ @@ -99,7 +94,6 @@ def __init__(self, root_data: bytes): """ self._trusted_set = {} # type: Dict[str: Metadata] self.reference_time = datetime.utcnow() - self._root_update_finished = False # Load and validate the local root metadata. Valid initial trusted root # metadata is required @@ -144,7 +138,7 @@ def update_root(self, data: bytes): """Verifies and loads 'data' as new root metadata. Note that an expired intermediate root is considered valid: expiry is - only checked for the final root in root_update_finished(). + only checked for the final root in update_timestamp(). Args: data: unverified new root metadata as bytes @@ -153,10 +147,8 @@ def update_root(self, data: bytes): RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. """ - if self._root_update_finished: - raise RuntimeError( - "Cannot update root after root update is finished" - ) + if self.timestamp is not None: + raise RuntimeError("Cannot update root after timestamp") logger.debug("Updating root") try: @@ -183,29 +175,13 @@ def update_root(self, data: bytes): self._trusted_set["root"] = new_root logger.debug("Updated root") - def root_update_finished(self): - """Marks root metadata as final and verifies it is not expired - - Raises: - ExpiredMetadataError: The final root metadata is expired. - """ - if self._root_update_finished: - raise RuntimeError("Root update is already finished") - - if self.root.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError("New root.json is expired") - - # No need to delete timestamp/snapshot here as specification instructs - # for fast-forward attack recovery: timestamp/snapshot can not be - # loaded at this point and when loaded later they will be verified - # with current root keys. - - self._root_update_finished = True - logger.debug("Verified final root.json") - def update_timestamp(self, data: bytes): """Verifies and loads 'data' as new timestamp metadata. + Note that an expired intermediate timestamp is considered valid so it + can be used for rollback checks on newer, final timestamp. Expiry is + only checked for the final timestamp in update_snapshot(). + Args: data: unverified new timestamp metadata as bytes @@ -213,11 +189,15 @@ def update_timestamp(self, data: bytes): RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. """ - if not self._root_update_finished: - raise RuntimeError("Cannot update timestamp before root") if self.snapshot is not None: raise RuntimeError("Cannot update timestamp after snapshot") + # client workflow 5.3.10: Make sure final root is not expired. + if self.root.signed.is_expired(self.reference_time): + raise exceptions.ExpiredMetadataError("Final root.json is expired") + # No need to check for 5.3.11 (fast forward attack recovery): + # timestamp/snapshot can not yet be loaded at this point + try: new_timestamp = Metadata.from_bytes(data) except DeserializationError as e: @@ -251,8 +231,8 @@ def update_timestamp(self, data: bytes): self.timestamp.signed.meta["snapshot.json"].version, ) - if new_timestamp.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError("New timestamp is expired") + # expiry not checked to allow old timestamp to be used for rollback + # protection of new timestamp: expiry is checked in update_snapshot() self._trusted_set["timestamp"] = new_timestamp logger.debug("Updated timestamp") @@ -260,6 +240,12 @@ def update_timestamp(self, data: bytes): def update_snapshot(self, data: bytes): """Verifies and loads 'data' as new snapshot metadata. + Note that intermediate snapshot is considered valid even if it is + expired or the version does not match the timestamp meta version. This + means the intermediate snapshot can be used for rollback checks on + newer, final snapshot. Expiry and meta version are only checked for + the final snapshot in update_delegated_targets(). + Args: data: unverified new snapshot metadata as bytes @@ -274,6 +260,11 @@ def update_snapshot(self, data: bytes): raise RuntimeError("Cannot update snapshot after targets") logger.debug("Updating snapshot") + # Local timestamp was allowed to be expired to allow for rollback + # checks on new timestamp but now timestamp must not be expired + if self.timestamp.signed.is_expired(self.reference_time): + raise exceptions.ExpiredMetadataError("timestamp.json is expired") + meta = self.timestamp.signed.meta["snapshot.json"] # Verify against the hashes in timestamp, if any @@ -296,18 +287,10 @@ def update_snapshot(self, data: bytes): self.root.verify_delegate("snapshot", new_snapshot) - if ( - new_snapshot.signed.version - != self.timestamp.signed.meta["snapshot.json"].version - ): - raise exceptions.BadVersionNumberError( - f"Expected snapshot version " - f"{self.timestamp.signed.meta['snapshot.json'].version}, " - f"got {new_snapshot.signed.version}" - ) + # version not checked against meta version to allow old snapshot to be + # used in rollback protection: it is checked when targets is updated - # If an existing trusted snapshot is updated, - # check for a rollback attack + # If an existing trusted snapshot is updated, check for rollback attack if self.snapshot is not None: for filename, fileinfo in self.snapshot.signed.meta.items(): new_fileinfo = new_snapshot.signed.meta.get(filename) @@ -325,12 +308,26 @@ def update_snapshot(self, data: bytes): f"{new_fileinfo.version}, got {fileinfo.version}." ) - if new_snapshot.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError("New snapshot is expired") + # expiry not checked to allow old snapshot to be used for rollback + # protection of new snapshot: it is checked when targets is updated self._trusted_set["snapshot"] = new_snapshot logger.debug("Updated snapshot") + def _check_final_snapshot(self): + if self.snapshot.signed.is_expired(self.reference_time): + raise exceptions.ExpiredMetadataError("snapshot.json is expired") + + if ( + self.snapshot.signed.version + != self.timestamp.signed.meta["snapshot.json"].version + ): + raise exceptions.BadVersionNumberError( + f"Expected snapshot version " + f"{self.timestamp.signed.meta['snapshot.json'].version}, " + f"got {self.snapshot.signed.version}" + ) + def update_targets(self, data: bytes): """Verifies and loads 'data' as new top-level targets metadata. @@ -360,6 +357,11 @@ def update_delegated_targets( if self.snapshot is None: raise RuntimeError("Cannot load targets before snapshot") + # Local snapshot was allowed to be expired and to not match meta + # version to allow for rollback checks on new snapshot but now + # snapshot must not be expired and must match meta version + self._check_final_snapshot() + delegator: Optional[Metadata] = self.get(delegator_name) if delegator is None: raise RuntimeError("Cannot load targets before delegator") diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index ec5a852d74..40c57e7331 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -320,9 +320,6 @@ def _load_root(self) -> None: # 404/403 means current root is newest available break - # Verify final root - self._trusted_set.root_update_finished() - def _load_timestamp(self) -> None: """Load local and remote timestamp metadata""" try: