Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/model.py
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations

from datetime import datetime
from typing import Iterable, Iterator, List, Optional

from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import origin_identifier
from swh.model.model import Sha1Git

from .archive import ArchiveInterface
class OriginEntry:
    """An origin URL paired with one snapshot, plus lazily fetched revisions.

    Nothing is fetched at construction time: call ``retrieve_revisions()``
    before reading the ``revisions`` property.
    """

    def __init__(self, url: str, snapshot: Sha1Git) -> None:
        """Store the URL and snapshot id and derive the origin id.

        Args:
            url: URL of the origin.
            snapshot: identifier of the snapshot associated with this origin.
        """
        self.url = url
        # The origin id is computed from the URL only.
        self.id: Sha1Git = hash_to_bytes(origin_identifier({"url": self.url}))
        self.snapshot = snapshot
        # None means "not retrieved yet"; an empty list is a valid result.
        self._revisions: Optional[List[RevisionEntry]] = None

    def retrieve_revisions(self, archive: ArchiveInterface) -> None:
        """Fetch the heads of ``self.snapshot`` from ``archive`` (first call only).

        Each id returned by ``archive.snapshot_get_heads`` is wrapped in a
        ``RevisionEntry``. Subsequent calls are no-ops.
        """
        if self._revisions is not None:
            return
        self._revisions = [
            RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot)
        ]

    @property
    def revisions(self) -> Iterator[RevisionEntry]:
        """Return a fresh iterator over the retrieved revisions.

        Raises:
            RuntimeError: if ``retrieve_revisions()`` has not been called yet.
        """
        if self._revisions is None:
            raise RuntimeError(
                "Revisions of this node has not yet been retrieved. "
                "Please call retrieve_revisions() before using this property."
            )
        return iter(self._revisions)

    def __str__(self) -> str:
        return f"<MOrigin[{self.id.hex()}] url={self.url}, snap={self.snapshot.hex()}>"
class RevisionEntry:
    """A revision identified by its id, with lazily resolved parent entries.

    Two entries compare equal (and hash alike) when their ids are equal;
    date, root and parents do not take part in the comparison.
    """

    def __init__(
        self,
        id: Sha1Git,
        date: Optional[datetime] = None,
        root: Optional[Sha1Git] = None,
        parents: Optional[Iterable[Sha1Git]] = None,
    ) -> None:
        """Build a revision entry.

        Args:
            id: identifier of the revision.
            date: optional revision date; when given it must be timezone-aware.
            root: optional identifier stored as-is on the entry
                (presumably the revision's root directory -- confirm with callers).
            parents: optional ids of the parent revisions; when omitted they
                are fetched from the archive by ``retrieve_parents()``.
        """
        self.id = id
        self.date = date
        # Naive datetimes are rejected so that dates are always comparable.
        assert (
            self.date is None or self.date.tzinfo is not None
        ), "revision date must be timezone-aware"
        self.root = root
        self._parents_ids = parents
        # None means "not resolved yet"; an empty list is a valid result.
        self._parents_entries: Optional[List[RevisionEntry]] = None

    def retrieve_parents(self, archive: ArchiveInterface) -> None:
        """Resolve the parent entries (first call only).

        Parent ids given at construction time are used when available;
        otherwise they are requested via ``archive.revision_get_parents``.
        """
        if self._parents_entries is not None:
            return
        if self._parents_ids is None:
            self._parents_ids = archive.revision_get_parents(self.id)
        self._parents_entries = [
            RevisionEntry(parent_id) for parent_id in self._parents_ids
        ]

    @property
    def parents(self) -> Iterator[RevisionEntry]:
        """Return a fresh iterator over the resolved parent entries.

        Raises:
            RuntimeError: if ``retrieve_parents()`` has not been called yet.
        """
        if self._parents_entries is None:
            raise RuntimeError(
                "Parents of this node has not yet been retrieved. "
                "Please call retrieve_parents() before using this property."
            )
        return iter(self._parents_entries)

    def __str__(self) -> str:
        return f"<MRevision[{self.id.hex()}]>"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, RevisionEntry) and self.id == other.id

    def __hash__(self) -> int:
        return hash(self.id)
class DirectoryEntry:
    """A directory identified by id and name, with lazily listed children.

    Equality and hashing are based on the ``(id, name)`` pair.
    """

    def __init__(self, id: Sha1Git, name: bytes = b"") -> None:
        """Build a directory entry.

        Args:
            id: identifier of the directory.
            name: name of the directory as bytes; defaults to ``b""``.
        """
        self.id = id
        self.name = name
        # None means "children not retrieved yet".
        self._files: Optional[List[FileEntry]] = None
        self._dirs: Optional[List[DirectoryEntry]] = None

    def retrieve_children(self, archive: ArchiveInterface) -> None:
        """List this directory through ``archive`` (first call only).

        Children whose ``"type"`` is ``"dir"`` become ``DirectoryEntry``
        objects and those of type ``"file"`` become ``FileEntry`` objects;
        any other type is ignored. Subsequent calls are no-ops.
        """
        if self._files is not None or self._dirs is not None:
            return
        # Collect into locals and publish only once the listing completed, so
        # a failure halfway through does not leave a partial result cached.
        files: List[FileEntry] = []
        dirs: List[DirectoryEntry] = []
        for child in archive.directory_ls(self.id):
            kind = child["type"]
            if kind == "dir":
                dirs.append(DirectoryEntry(child["target"], name=child["name"]))
            elif kind == "file":
                files.append(FileEntry(child["target"], child["name"]))
        self._files = files
        self._dirs = dirs

    @property
    def files(self) -> Iterator[FileEntry]:
        """Return a fresh iterator over the file children.

        Raises:
            RuntimeError: if ``retrieve_children()`` has not been called yet.
        """
        if self._files is None:
            raise RuntimeError(
                "Children of this node has not yet been retrieved. "
                "Please call retrieve_children() before using this property."
            )
        return iter(self._files)

    @property
    def dirs(self) -> Iterator[DirectoryEntry]:
        """Return a fresh iterator over the sub-directory children.

        Raises:
            RuntimeError: if ``retrieve_children()`` has not been called yet.
        """
        if self._dirs is None:
            raise RuntimeError(
                "Children of this node has not yet been retrieved. "
                "Please call retrieve_children() before using this property."
            )
        return iter(self._dirs)

    def __str__(self) -> str:
        return f"<MDirectory[{self.id.hex()}] {self.name!r}>"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, DirectoryEntry) and (self.id, self.name) == (
            other.id,
            other.name,
        )

    def __hash__(self) -> int:
        return hash((self.id, self.name))
class FileEntry:
    """A file identified by its id and its name.

    Equality and hashing are based on the ``(id, name)`` pair.
    """

    def __init__(self, id: Sha1Git, name: bytes) -> None:
        """Build a file entry.

        Args:
            id: identifier of the file.
            name: name of the file as bytes.
        """
        self.id = id
        self.name = name

    def __str__(self) -> str:
        # ``!r`` renders the bytes name explicitly (e.g. b'README').
        return f"<MFile[{self.id.hex()}] {self.name!r}>"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, FileEntry) and (self.id, self.name) == (
            other.id,
            other.name,
        )

    def __hash__(self) -> int:
        return hash((self.id, self.name))