Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/model.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
from typing import Iterable, Iterator, List, Optional | from typing import Iterable, Iterator, List, Optional | ||||
from swh.model.hashutil import hash_to_bytes | |||||
from swh.model.identifiers import origin_identifier | |||||
from swh.model.model import Sha1Git | |||||
from .archive import ArchiveInterface | from .archive import ArchiveInterface | ||||
class OriginEntry: | class OriginEntry: | ||||
def __init__(self, url: str, date: datetime, snapshot: bytes): | def __init__(self, url: str, snapshot: Sha1Git): | ||||
self.url = url | self.url = url | ||||
# TODO: this is probably not needed and will be removed! | self.id: Sha1Git = hash_to_bytes(origin_identifier({"url": self.url})) | ||||
# self.date = date | |||||
self.snapshot = snapshot | self.snapshot = snapshot | ||||
self._revisions: Optional[List[RevisionEntry]] = None | self._revisions: Optional[List[RevisionEntry]] = None | ||||
def retrieve_revisions(self, archive: ArchiveInterface): | def retrieve_revisions(self, archive: ArchiveInterface): | ||||
if self._revisions is None: | if self._revisions is None: | ||||
self._revisions = [ | self._revisions = [ | ||||
RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) | RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) | ||||
] | ] | ||||
@property | @property | ||||
def revisions(self) -> Iterator["RevisionEntry"]: | def revisions(self) -> Iterator["RevisionEntry"]: | ||||
if self._revisions is None: | if self._revisions is None: | ||||
raise RuntimeError( | raise RuntimeError( | ||||
"Revisions of this node has not yet been retrieved. " | "Revisions of this node has not yet been retrieved. " | ||||
"Please call retrieve_revisions() before using this property." | "Please call retrieve_revisions() before using this property." | ||||
) | ) | ||||
return (x for x in self._revisions) | return (x for x in self._revisions) | ||||
def __str__(self): | def __str__(self): | ||||
return ( | return f"<MOrigin[{self.id.hex()}] url={self.url}, snap={self.snapshot.hex()}>" | ||||
f"<MOrigin[{self.url}] " | |||||
f"snap={self.snapshot.hex()}, date={self.date.isoformat()}>" | |||||
) | |||||
class RevisionEntry: | class RevisionEntry: | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
id: bytes, | id: Sha1Git, | ||||
date: Optional[datetime] = None, | date: Optional[datetime] = None, | ||||
root: Optional[bytes] = None, | root: Optional[Sha1Git] = None, | ||||
parents: Optional[Iterable[bytes]] = None, | parents: Optional[Iterable[Sha1Git]] = None, | ||||
): | ): | ||||
self.id = id | self.id = id | ||||
self.date = date | self.date = date | ||||
assert self.date is None or self.date.tzinfo is not None | assert self.date is None or self.date.tzinfo is not None | ||||
self.root = root | self.root = root | ||||
self._parents_ids = parents | self._parents_ids = parents | ||||
self._parents_entries: Optional[List[RevisionEntry]] = None | self._parents_entries: Optional[List[RevisionEntry]] = None | ||||
Show All 29 Lines | class RevisionEntry: | ||||
def __str__(self): | def __str__(self): | ||||
return ( | return ( | ||||
f"<MRevision[{self.id.hex()}] " | f"<MRevision[{self.id.hex()}] " | ||||
f"date={self.date.isoformat()}, root={self.root.hex()}>" | f"date={self.date.isoformat()}, root={self.root.hex()}>" | ||||
) | ) | ||||
class DirectoryEntry: | class DirectoryEntry: | ||||
def __init__(self, id: bytes, name: bytes = b""): | def __init__(self, id: Sha1Git, name: bytes = b""): | ||||
self.id = id | self.id = id | ||||
self.name = name | self.name = name | ||||
self._files: Optional[List[FileEntry]] = None | self._files: Optional[List[FileEntry]] = None | ||||
self._dirs: Optional[List[DirectoryEntry]] = None | self._dirs: Optional[List[DirectoryEntry]] = None | ||||
def retrieve_children(self, archive: ArchiveInterface): | def retrieve_children(self, archive: ArchiveInterface): | ||||
if self._files is None and self._dirs is None: | if self._files is None and self._dirs is None: | ||||
self._files = [] | self._files = [] | ||||
Show All 33 Lines | def __eq__(self, other): | ||||
other.name, | other.name, | ||||
) | ) | ||||
def __hash__(self): | def __hash__(self): | ||||
return hash((self.id, self.name)) | return hash((self.id, self.name)) | ||||
class FileEntry: | class FileEntry: | ||||
def __init__(self, id: bytes, name: bytes): | def __init__(self, id: Sha1Git, name: bytes): | ||||
self.id = id | self.id = id | ||||
self.name = name | self.name = name | ||||
def __str__(self): | def __str__(self): | ||||
return f"<MFile[{self.id.hex()}] {self.name}>" | return f"<MFile[{self.id.hex()}] {self.name}>" | ||||
def __eq__(self, other): | def __eq__(self, other): | ||||
return isinstance(other, FileEntry) and (self.id, self.name) == ( | return isinstance(other, FileEntry) and (self.id, self.name) == ( | ||||
other.id, | other.id, | ||||
other.name, | other.name, | ||||
) | ) | ||||
def __hash__(self): | def __hash__(self): | ||||
return hash((self.id, self.name)) | return hash((self.id, self.name)) |