From 72d1032bb03fd95cf7a1c29a9c3bf5b92d3c058c Mon Sep 17 00:00:00 2001 From: Sviatoslav Sydorenko Date: Tue, 10 Dec 2024 01:52:29 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=92=85=20Bundle=20attestation=20existence?= =?UTF-8?q?=20check=20together?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch moves said check out of the signing loop and performs the check early in the process. It is then able to report multiple problems in a single error. --- attestations.py | 56 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/attestations.py b/attestations.py index 6972ae2..69cc3fb 100644 --- a/attestations.py +++ b/attestations.py @@ -69,13 +69,47 @@ def collect_dists(packages_dir: Path) -> list[Path]: return dist_paths -def attest_dist(dist_path: Path, signer: Signer) -> None: +def assert_attestations_do_not_pre_exist( + dist_to_attestation_map: dict[Path, Path], +) -> None: + existing_attestations = { + f'* {dist !s} -> {dist_attestation !s}' + for dist, dist_attestation in dist_to_attestation_map.items() + if dist_attestation.exists() + } + if not existing_attestations: + return + + existing_attestations_list = '\n'.join(map(str, existing_attestations)) + error_message = ( + 'The following distributions already have publish attestations:' + f'{existing_attestations_list}', + ) + die(error_message) + + +def compose_attestation_mapping(dist_paths: list[Path]) -> dict[Path, Path]: + dist_to_attestation_map = { + dist_path: dist_path.with_suffix( + f'{dist_path.suffix}.publish.attestation', + ) + for dist_path in dist_paths + } + # We are the publishing step, so there should be no pre-existing publish # attestation. The presence of one indicates user confusion. - attestation_path = Path(f'{dist_path}.publish.attestation') - if attestation_path.exists(): - die(f'{dist_path} already has a publish attestation: {attestation_path}') + # Make sure there's no publish attestations on disk. + # We do this up-front to prevent partial signing. + assert_attestations_do_not_pre_exist(dist_to_attestation_map) + return dist_to_attestation_map + + +def attest_dist( + dist_path: Path, + attestation_path: Path, + signer: Signer, +) -> None: dist = Distribution.from_file(dist_path) attestation = Attestation.sign(signer, dist) @@ -92,7 +126,9 @@ def get_identity_token() -> IdentityToken: def main() -> None: - packages_dir = Path(sys.argv[1]) + dist_to_attestation_map = compose_attestation_mapping( + collect_dists(Path(sys.argv[1])), + ) try: identity = get_identity_token() @@ -103,12 +139,10 @@ def main() -> None: # since permissions can't be to blame at this stage. die(_TOKEN_RETRIEVAL_FAILED_MESSAGE.format(identity_error=identity_error)) - dist_paths = collect_dists(packages_dir) - - with SigningContext.production().signer(identity, cache=True) as s: - debug(f'attesting to dists: {dist_paths}') - for dist_path in dist_paths: - attest_dist(dist_path, s) + with SigningContext.production().signer(identity, cache=True) as signer: + debug(f'attesting to dists: {dist_to_attestation_map.keys()}') + for dist_path, attestation_path in dist_to_attestation_map.items(): + attest_dist(dist_path, attestation_path, signer) if __name__ == '__main__':