Changeset View
Changeset View
Standalone View
Standalone View
swh/model/identifiers.py
Show All 23 Lines | from typing import ( | ||||
Union, | Union, | ||||
) | ) | ||||
import urllib.parse | import urllib.parse | ||||
import attr | import attr | ||||
from attrs_strict import type_validator | from attrs_strict import type_validator | ||||
from .exceptions import ValidationError | from .exceptions import ValidationError | ||||
from .hashutil import MultiHash, hash_git_data, hash_to_bytes, hash_to_hex | from .hashutil import MultiHash, _git_header, _new_hash, hash_to_bytes, hash_to_hex | ||||
olasd: Importing underscore functions looks like poor style. I guess the `git_header` function could just be public (and be called `git_object_header`).
class ObjectType(enum.Enum): | class ObjectType(enum.Enum): | ||||
"""Possible object types of a QualifiedSWHID or CoreSWHID. | """Possible object types of a QualifiedSWHID or CoreSWHID. | ||||
The values of each variant is what is used in the SWHID's string representation.""" | The values of each variant is what is used in the SWHID's string representation.""" | ||||
SNAPSHOT = "snp" | SNAPSHOT = "snp" | ||||
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | if isinstance(identifier, bytes): | ||||
return binascii.hexlify(identifier).decode() | return binascii.hexlify(identifier).decode() | ||||
raise ValueError( | raise ValueError( | ||||
"Wrong type for identifier %s, expected bytes or str" | "Wrong type for identifier %s, expected bytes or str" | ||||
% identifier.__class__.__name__ | % identifier.__class__.__name__ | ||||
) | ) | ||||
def _git_object_to_identifier_str(git_object: bytes) -> str: | |||||
h = _new_hash("sha1") | |||||
olasd (unsubmitted, not done): should probably just be `hashlib.new`.
h.update(git_object) | |||||
return identifier_to_str(h.digest()) | |||||
def content_identifier(content: Dict[str, Any]) -> Dict[str, bytes]: | def content_identifier(content: Dict[str, Any]) -> Dict[str, bytes]: | ||||
"""Return the intrinsic identifier for a content. | """Return the intrinsic identifier for a content. | ||||
A content's identifier is the sha1, sha1_git and sha256 checksums of its | A content's identifier is the sha1, sha1_git and sha256 checksums of its | ||||
data. | data. | ||||
Args: | Args: | ||||
content: a content conforming to the Software Heritage schema | content: a content conforming to the Software Heritage schema | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | 2. For each entry of the directory, the following bytes are output: | ||||
- for symbolic links: the blob sha1_git of a file containing the link | - for symbolic links: the blob sha1_git of a file containing the link | ||||
destination | destination | ||||
- for directories: their intrinsic identifier | - for directories: their intrinsic identifier | ||||
- for revisions: their intrinsic identifier | - for revisions: their intrinsic identifier | ||||
(Note that there is no separator between entries) | (Note that there is no separator between entries) | ||||
""" | """ | ||||
manifest = directory_manifest(directory) | git_object = directory_git_object(directory) | ||||
return identifier_to_str(hash_git_data(manifest, "tree")) | return _git_object_to_identifier_str(git_object) | ||||
def directory_manifest(directory: Dict[str, Any]) -> bytes: | def directory_git_object(directory: Dict[str, Any]) -> bytes: | ||||
components = [] | components = [] | ||||
for entry in sorted(directory["entries"], key=directory_entry_sort_key): | for entry in sorted(directory["entries"], key=directory_entry_sort_key): | ||||
components.extend( | components.extend( | ||||
[ | [ | ||||
_perms_to_bytes(entry["perms"]), | _perms_to_bytes(entry["perms"]), | ||||
b"\x20", | b"\x20", | ||||
entry["name"], | entry["name"], | ||||
b"\x00", | b"\x00", | ||||
identifier_to_bytes(entry["target"]), | identifier_to_bytes(entry["target"]), | ||||
] | ] | ||||
) | ) | ||||
return b"".join(components) | return format_list_git_object("tree", components) | ||||
def format_date(date): | def format_date(date): | ||||
"""Convert a date object into an UTC timestamp encoded as ascii bytes. | """Convert a date object into an UTC timestamp encoded as ascii bytes. | ||||
Git stores timestamps as an integer number of seconds since the UNIX epoch. | Git stores timestamps as an integer number of seconds since the UNIX epoch. | ||||
However, Software Heritage stores timestamps as an integer number of | However, Software Heritage stores timestamps as an integer number of | ||||
▲ Show 20 Lines • Show All 140 Lines • ▼ Show 20 Lines | def format_author(author): | ||||
if author["name"] is not None: | if author["name"] is not None: | ||||
ret.append(author["name"]) | ret.append(author["name"]) | ||||
if author["email"] is not None: | if author["email"] is not None: | ||||
ret.append(b"".join([b"<", author["email"], b">"])) | ret.append(b"".join([b"<", author["email"], b">"])) | ||||
return b" ".join(ret) | return b" ".join(ret) | ||||
def format_manifest( | def format_git_object( | ||||
olasd (unsubmitted, not done): Maybe call it `format_kv_git_object` (or `format_git_object_with_headers`), as opposed to the directory object, which is special.
vlorentz (author, unsubmitted, done): I went for `format_git_object_from_headers` instead, for consistency with `format_git_object_from_parts`.
olasd (unsubmitted, not done): Thing is, there are headers *and* a message (which is why I suggested `with`). But hey, I like consistency as much as the next person...
headers: Iterable[Tuple[bytes, bytes]], message: Optional[bytes] = None, | git_type: str, | ||||
headers: Iterable[Tuple[bytes, bytes]], | |||||
message: Optional[bytes] = None, | |||||
) -> bytes: | ) -> bytes: | ||||
"""Format a manifest comprised of a sequence of `headers` and an optional `message`. | """Format a git_object comprised of a git header and a manifest, | ||||
which is itself a sequence of `headers`, and an optional `message`. | |||||
The manifest format, compatible with the git format for tag and commit | The git_object format, compatible with the git format for tag and commit | ||||
objects, is as follows: | objects, is as follows: | ||||
- for each `key`, `value` in `headers`, emit: | - for each `key`, `value` in `headers`, emit: | ||||
- the `key`, literally | - the `key`, literally | ||||
- an ascii space (``\\x20``) | - an ascii space (``\\x20``) | ||||
- the `value`, with newlines escaped using :func:`escape_newlines`, | - the `value`, with newlines escaped using :func:`escape_newlines`, | ||||
- an ascii newline (``\\x0a``) | - an ascii newline (``\\x0a``) | ||||
- if the `message` is not None, emit: | - if the `message` is not None, emit: | ||||
- an ascii newline (``\\x0a``) | - an ascii newline (``\\x0a``) | ||||
- the `message`, literally | - the `message`, literally | ||||
Args: | Args: | ||||
headers: a sequence of key/value headers stored in the manifest; | headers: a sequence of key/value headers stored in the manifest; | ||||
message: an optional message used to trail the manifest. | message: an optional message used to trail the manifest. | ||||
Returns: | Returns: | ||||
the formatted manifest as bytes | the formatted git_object as bytes | ||||
""" | """ | ||||
entries: List[bytes] = [] | entries: List[bytes] = [] | ||||
for key, value in headers: | for key, value in headers: | ||||
entries.extend((key, b" ", escape_newlines(value), b"\n")) | entries.extend((key, b" ", escape_newlines(value), b"\n")) | ||||
if message is not None: | if message is not None: | ||||
entries.extend((b"\n", message)) | entries.extend((b"\n", message)) | ||||
return b"".join(entries) | concatenated_entries = b"".join(entries) | ||||
header = _git_header(git_type, len(concatenated_entries)) | |||||
return header + concatenated_entries | |||||
def format_list_git_object(git_type: str, entries: Iterable[bytes]) -> bytes: | |||||
olasd (unsubmitted, not done): `format_git_object_from_parts`? `from_chunks`?
"""Similar to :func:`format_git_object`, but for manifests made of a flat | |||||
list of entries, instead of key-value + message, ie. trees and snapshots.""" | |||||
concatenated_entries = b"".join(entries) | |||||
header = _git_header(git_type, len(concatenated_entries)) | |||||
return header + concatenated_entries | |||||
def format_author_data(author, date_offset) -> bytes: | def format_author_data(author, date_offset) -> bytes: | ||||
"""Format authorship data according to git standards. | """Format authorship data according to git standards. | ||||
Git authorship data has two components: | Git authorship data has two components: | ||||
- an author specification, usually a name and email, but in practice an | - an author specification, usually a name and email, but in practice an | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | def revision_identifier(revision: Dict[str, Any]) -> str: | ||||
If the message is None, the manifest ends with the last header. Else, the | If the message is None, the manifest ends with the last header. Else, the | ||||
message is appended to the headers after an empty line. | message is appended to the headers after an empty line. | ||||
The checksum of the full manifest is computed using the 'commit' git object | The checksum of the full manifest is computed using the 'commit' git object | ||||
type. | type. | ||||
""" | """ | ||||
manifest = revision_manifest(revision) | git_object = revision_git_object(revision) | ||||
return identifier_to_str(hash_git_data(manifest, "commit")) | return _git_object_to_identifier_str(git_object) | ||||
def revision_manifest(revision: Dict[str, Any]) -> bytes: | def revision_git_object(revision: Dict[str, Any]) -> bytes: | ||||
"""Formats the manifest of a revision. See :func:`revision_identifier` for details | """Formats the git_object of a revision. See :func:`revision_identifier` for details | ||||
on the format.""" | on the format.""" | ||||
headers = [(b"tree", identifier_to_str(revision["directory"]).encode())] | headers = [(b"tree", identifier_to_str(revision["directory"]).encode())] | ||||
for parent in revision["parents"]: | for parent in revision["parents"]: | ||||
if parent: | if parent: | ||||
headers.append((b"parent", identifier_to_str(parent).encode())) | headers.append((b"parent", identifier_to_str(parent).encode())) | ||||
headers.append( | headers.append( | ||||
(b"author", format_author_data(revision["author"], revision["date"])) | (b"author", format_author_data(revision["author"], revision["date"])) | ||||
) | ) | ||||
headers.append( | headers.append( | ||||
( | ( | ||||
b"committer", | b"committer", | ||||
format_author_data(revision["committer"], revision["committer_date"]), | format_author_data(revision["committer"], revision["committer_date"]), | ||||
) | ) | ||||
) | ) | ||||
# Handle extra headers | # Handle extra headers | ||||
metadata = revision.get("metadata") or {} | metadata = revision.get("metadata") or {} | ||||
extra_headers = revision.get("extra_headers", ()) | extra_headers = revision.get("extra_headers", ()) | ||||
if not extra_headers and "extra_headers" in metadata: | if not extra_headers and "extra_headers" in metadata: | ||||
extra_headers = metadata["extra_headers"] | extra_headers = metadata["extra_headers"] | ||||
headers.extend(extra_headers) | headers.extend(extra_headers) | ||||
return format_manifest(headers, revision["message"]) | return format_git_object("commit", headers, revision["message"]) | ||||
def target_type_to_git(target_type: str) -> bytes: | def target_type_to_git(target_type: str) -> bytes: | ||||
"""Convert a software heritage target type to a git object type""" | """Convert a software heritage target type to a git object type""" | ||||
return { | return { | ||||
"content": b"blob", | "content": b"blob", | ||||
"directory": b"tree", | "directory": b"tree", | ||||
"revision": b"commit", | "revision": b"commit", | ||||
"release": b"tag", | "release": b"tag", | ||||
"snapshot": b"refs", | "snapshot": b"refs", | ||||
}[target_type] | }[target_type] | ||||
def release_identifier(release: Dict[str, Any]) -> str: | def release_identifier(release: Dict[str, Any]) -> str: | ||||
"""Return the intrinsic identifier for a release.""" | """Return the intrinsic identifier for a release.""" | ||||
manifest = release_manifest(release) | git_object = release_git_object(release) | ||||
return identifier_to_str(hash_git_data(manifest, "tag")) | return _git_object_to_identifier_str(git_object) | ||||
def release_manifest(release: Dict[str, Any]) -> bytes: | def release_git_object(release: Dict[str, Any]) -> bytes: | ||||
headers = [ | headers = [ | ||||
(b"object", identifier_to_str(release["target"]).encode()), | (b"object", identifier_to_str(release["target"]).encode()), | ||||
(b"type", target_type_to_git(release["target_type"])), | (b"type", target_type_to_git(release["target_type"])), | ||||
(b"tag", release["name"]), | (b"tag", release["name"]), | ||||
] | ] | ||||
if "author" in release and release["author"]: | if "author" in release and release["author"]: | ||||
headers.append( | headers.append( | ||||
(b"tagger", format_author_data(release["author"], release["date"])) | (b"tagger", format_author_data(release["author"], release["date"])) | ||||
) | ) | ||||
return format_manifest(headers, release["message"]) | return format_git_object("tag", headers, release["message"]) | ||||
def snapshot_identifier( | def snapshot_identifier( | ||||
snapshot: Dict[str, Any], *, ignore_unresolved: bool = False | snapshot: Dict[str, Any], *, ignore_unresolved: bool = False | ||||
) -> str: | ) -> str: | ||||
"""Return the intrinsic identifier for a snapshot. | """Return the intrinsic identifier for a snapshot. | ||||
Snapshots are a set of named branches, which are pointers to objects at any | Snapshots are a set of named branches, which are pointers to objects at any | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | Args: | ||||
single entry is needed, ``'branches'``, which is itself a :class:`dict` | single entry is needed, ``'branches'``, which is itself a :class:`dict` | ||||
mapping each branch to its target | mapping each branch to its target | ||||
ignore_unresolved (bool): if `True`, ignore unresolved branch aliases. | ignore_unresolved (bool): if `True`, ignore unresolved branch aliases. | ||||
Returns: | Returns: | ||||
str: the intrinsic identifier for `snapshot` | str: the intrinsic identifier for `snapshot` | ||||
""" | """ | ||||
manifest = snapshot_manifest(snapshot, ignore_unresolved=ignore_unresolved) | git_object = snapshot_git_object(snapshot, ignore_unresolved=ignore_unresolved) | ||||
return identifier_to_str(hash_git_data(manifest, "snapshot")) | return _git_object_to_identifier_str(git_object) | ||||
def snapshot_manifest( | def snapshot_git_object( | ||||
snapshot: Dict[str, Any], *, ignore_unresolved: bool = False | snapshot: Dict[str, Any], *, ignore_unresolved: bool = False | ||||
) -> bytes: | ) -> bytes: | ||||
"""Formats the manifest of a revision. See :func:`snapshot_identifier` for details | """Formats the git_object of a revision. See :func:`snapshot_identifier` for details | ||||
on the format.""" | on the format.""" | ||||
unresolved = [] | unresolved = [] | ||||
lines = [] | lines = [] | ||||
for name, target in sorted(snapshot["branches"].items()): | for name, target in sorted(snapshot["branches"].items()): | ||||
if not target: | if not target: | ||||
target_type = b"dangling" | target_type = b"dangling" | ||||
target_id = b"" | target_id = b"" | ||||
Show All 19 Lines | ) -> bytes: | ||||
if unresolved and not ignore_unresolved: | if unresolved and not ignore_unresolved: | ||||
raise ValueError( | raise ValueError( | ||||
"Branch aliases unresolved: %s" | "Branch aliases unresolved: %s" | ||||
% ", ".join("%r -> %r" % x for x in unresolved), | % ", ".join("%r -> %r" % x for x in unresolved), | ||||
unresolved, | unresolved, | ||||
) | ) | ||||
return b"".join(lines) | return format_list_git_object("snapshot", lines) | ||||
def origin_identifier(origin): | def origin_identifier(origin): | ||||
"""Return the intrinsic identifier for an origin. | """Return the intrinsic identifier for an origin. | ||||
An origin's identifier is the sha1 checksum of the entire origin URL | An origin's identifier is the sha1 checksum of the entire origin URL | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def raw_extrinsic_metadata_identifier(metadata: Dict[str, Any]) -> str: | ||||
Newlines in $Bytes, $Str, and $Iri are escaped as with other git fields, | Newlines in $Bytes, $Str, and $Iri are escaped as with other git fields, | ||||
ie. by adding a space after them. | ie. by adding a space after them. | ||||
Returns: | Returns: | ||||
str: the intrinsic identifier for ``metadata`` | str: the intrinsic identifier for ``metadata`` | ||||
""" | """ | ||||
manifest = raw_extrinsic_metadata_manifest(metadata) | git_object = raw_extrinsic_metadata_git_object(metadata) | ||||
return identifier_to_str(hash_git_data(manifest, "raw_extrinsic_metadata")) | return _git_object_to_identifier_str(git_object) | ||||
def raw_extrinsic_metadata_manifest(metadata: Dict[str, Any]) -> bytes: | def raw_extrinsic_metadata_git_object(metadata: Dict[str, Any]) -> bytes: | ||||
"""Formats the manifest of a raw_extrinsic_metadata object. | """Formats the git_object of a raw_extrinsic_metadata object. | ||||
See :func:`raw_extrinsic_metadata_identifier` for details | See :func:`raw_extrinsic_metadata_identifier` for details | ||||
on the format.""" | on the format.""" | ||||
# equivalent to using math.floor(dt.timestamp()) to round down, | # equivalent to using math.floor(dt.timestamp()) to round down, | ||||
# as int(dt.timestamp()) rounds toward zero, | # as int(dt.timestamp()) rounds toward zero, | ||||
# which would map two seconds on the 0 timestamp. | # which would map two seconds on the 0 timestamp. | ||||
# | # | ||||
# This should never be an issue in practice as Software Heritage didn't | # This should never be an issue in practice as Software Heritage didn't | ||||
# start collecting metadata before 2015. | # start collecting metadata before 2015. | ||||
Show All 32 Lines | ): | ||||
value: bytes | value: bytes | ||||
if key == "path": | if key == "path": | ||||
value = metadata[key] | value = metadata[key] | ||||
else: | else: | ||||
value = str(metadata[key]).encode() | value = str(metadata[key]).encode() | ||||
headers.append((key.encode("ascii"), value)) | headers.append((key.encode("ascii"), value)) | ||||
return format_manifest(headers, metadata["metadata"]) | return format_git_object("raw_extrinsic_metadata", headers, metadata["metadata"]) | ||||
def extid_identifier(extid: Dict[str, Any]) -> str: | def extid_identifier(extid: Dict[str, Any]) -> str: | ||||
"""Return the intrinsic identifier for an ExtID object. | """Return the intrinsic identifier for an ExtID object. | ||||
An ExtID identifier is a salted sha1 (using the git hashing algorithm with | An ExtID identifier is a salted sha1 (using the git hashing algorithm with | ||||
the ``extid`` object type) of a manifest following the format: | the ``extid`` object type) of a manifest following the format: | ||||
Show All 14 Lines | def extid_identifier(extid: Dict[str, Any]) -> str: | ||||
""" | """ | ||||
headers = [ | headers = [ | ||||
(b"extid_type", extid["extid_type"].encode("ascii")), | (b"extid_type", extid["extid_type"].encode("ascii")), | ||||
(b"extid", extid["extid"]), | (b"extid", extid["extid"]), | ||||
(b"target", str(extid["target"]).encode("ascii")), | (b"target", str(extid["target"]).encode("ascii")), | ||||
] | ] | ||||
manifest = format_manifest(headers) | git_object = format_git_object("extid", headers) | ||||
return identifier_to_str(hash_git_data(manifest, "extid")) | return _git_object_to_identifier_str(git_object) | ||||
# type of the "object_type" attribute of the SWHID class; either | # type of the "object_type" attribute of the SWHID class; either | ||||
# ObjectType or ExtendedObjectType | # ObjectType or ExtendedObjectType | ||||
_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType) | _TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType) | ||||
# the SWHID class itself (this is used so that X.from_string() can return X | # the SWHID class itself (this is used so that X.from_string() can return X | ||||
# for all X subclass of _BaseSWHID) | # for all X subclass of _BaseSWHID) | ||||
▲ Show 20 Lines • Show All 379 Lines • Show Last 20 Lines |
olasd: Importing underscore functions looks like poor style. I guess the `git_header` function could just be public (and be called `git_object_header`).