diff --git a/attestations.py b/attestations.py index 6972ae2..69cc3fb 100644 --- a/attestations.py +++ b/attestations.py @@ -69,13 +69,47 @@ def collect_dists(packages_dir: Path) -> list[Path]: return dist_paths -def attest_dist(dist_path: Path, signer: Signer) -> None: +def assert_attestations_do_not_pre_exist( + dist_to_attestation_map: dict[Path, Path], +) -> None: + existing_attestations = { + f'* {dist !s} -> {dist_attestation !s}' + for dist, dist_attestation in dist_to_attestation_map.items() + if dist_attestation.exists() + } + if not existing_attestations: + return + + existing_attestations_list = '\n'.join(map(str, existing_attestations)) + error_message = ( + 'The following distributions already have publish attestations:' + f'{existing_attestations_list}', + ) + die(error_message) + + +def compose_attestation_mapping(dist_paths: list[Path]) -> dict[Path, Path]: + dist_to_attestation_map = { + dist_path: dist_path.with_suffix( + f'{dist_path.suffix}.publish.attestation', + ) + for dist_path in dist_paths + } + # We are the publishing step, so there should be no pre-existing publish # attestation. The presence of one indicates user confusion. - attestation_path = Path(f'{dist_path}.publish.attestation') - if attestation_path.exists(): - die(f'{dist_path} already has a publish attestation: {attestation_path}') + # Make sure there's no publish attestations on disk. + # We do this up-front to prevent partial signing. + assert_attestations_do_not_pre_exist(dist_to_attestation_map) + return dist_to_attestation_map + + +def attest_dist( + dist_path: Path, + attestation_path: Path, + signer: Signer, +) -> None: dist = Distribution.from_file(dist_path) attestation = Attestation.sign(signer, dist) @@ -92,7 +126,9 @@ def get_identity_token() -> IdentityToken: def main() -> None: - packages_dir = Path(sys.argv[1]) + dist_to_attestation_map = compose_attestation_mapping( + collect_dists(Path(sys.argv[1])), + ) try: identity = get_identity_token() @@ -103,12 +139,10 @@ def main() -> None: # since permissions can't be to blame at this stage. die(_TOKEN_RETRIEVAL_FAILED_MESSAGE.format(identity_error=identity_error)) - dist_paths = collect_dists(packages_dir) - - with SigningContext.production().signer(identity, cache=True) as s: - debug(f'attesting to dists: {dist_paths}') - for dist_path in dist_paths: - attest_dist(dist_path, s) + with SigningContext.production().signer(identity, cache=True) as signer: + debug(f'attesting to dists: {dist_to_attestation_map.keys()}') + for dist_path, attestation_path in dist_to_attestation_map.items(): + attest_dist(dist_path, attestation_path, signer) if __name__ == '__main__':