💅 Bundle attestation existence check together

This patch moves said check out of the signing loop and performs the
check early in the process. It is then able to report multiple
problems in a single error.
This commit is contained in:
Sviatoslav Sydorenko 2024-12-10 01:52:29 +01:00
parent 88a4d039d1
commit 72d1032bb0
No known key found for this signature in database
GPG key ID: 9345E8FEA89CA455

View file

@ -69,13 +69,47 @@ def collect_dists(packages_dir: Path) -> list[Path]:
return dist_paths
def attest_dist(dist_path: Path, signer: Signer) -> None:
def assert_attestations_do_not_pre_exist(
dist_to_attestation_map: dict[Path, Path],
) -> None:
existing_attestations = {
f'* {dist !s} -> {dist_attestation !s}'
for dist, dist_attestation in dist_to_attestation_map.items()
if dist_attestation.exists()
}
if not existing_attestations:
return
existing_attestations_list = '\n'.join(map(str, existing_attestations))
error_message = (
'The following distributions already have publish attestations:'
f'{existing_attestations_list}',
)
die(error_message)
def compose_attestation_mapping(dist_paths: list[Path]) -> dict[Path, Path]:
dist_to_attestation_map = {
dist_path: dist_path.with_suffix(
f'{dist_path.suffix}.publish.attestation',
)
for dist_path in dist_paths
}
# We are the publishing step, so there should be no pre-existing publish
# attestation. The presence of one indicates user confusion.
attestation_path = Path(f'{dist_path}.publish.attestation')
if attestation_path.exists():
die(f'{dist_path} already has a publish attestation: {attestation_path}')
# Make sure there's no publish attestations on disk.
# We do this up-front to prevent partial signing.
assert_attestations_do_not_pre_exist(dist_to_attestation_map)
return dist_to_attestation_map
def attest_dist(
dist_path: Path,
attestation_path: Path,
signer: Signer,
) -> None:
dist = Distribution.from_file(dist_path)
attestation = Attestation.sign(signer, dist)
@ -92,7 +126,9 @@ def get_identity_token() -> IdentityToken:
def main() -> None:
packages_dir = Path(sys.argv[1])
dist_to_attestation_map = compose_attestation_mapping(
collect_dists(Path(sys.argv[1])),
)
try:
identity = get_identity_token()
@ -103,12 +139,10 @@ def main() -> None:
# since permissions can't be to blame at this stage.
die(_TOKEN_RETRIEVAL_FAILED_MESSAGE.format(identity_error=identity_error))
dist_paths = collect_dists(packages_dir)
with SigningContext.production().signer(identity, cache=True) as s:
debug(f'attesting to dists: {dist_paths}')
for dist_path in dist_paths:
attest_dist(dist_path, s)
with SigningContext.production().signer(identity, cache=True) as signer:
debug(f'attesting to dists: {dist_to_attestation_map.keys()}')
for dist_path, attestation_path in dist_to_attestation_map.items():
attest_dist(dist_path, attestation_path, signer)
if __name__ == '__main__':