Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/from_disk.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||||||
import os | import os | ||||||||||||||||||||
from collections import deque | from collections import deque | ||||||||||||||||||||
from datetime import datetime, timezone | from datetime import datetime, timezone | ||||||||||||||||||||
from shutil import rmtree | from shutil import rmtree | ||||||||||||||||||||
from tempfile import mkdtemp | from tempfile import mkdtemp | ||||||||||||||||||||
from typing import Any, Deque, Dict, Optional, Tuple, TypeVar, Union | from typing import Any, Deque, Dict, List, Optional, Tuple, TypeVar, Union | ||||||||||||||||||||
import dateutil | import dateutil | ||||||||||||||||||||
from swh.core.config import merge_configs | from swh.core.config import merge_configs | ||||||||||||||||||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||||||||||||||||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||||||||||||||||||
from swh.model.from_disk import Content, DentryPerms, Directory | from swh.model.from_disk import Content, DentryPerms, Directory | ||||||||||||||||||||
from swh.model.hashutil import MultiHash, hash_to_bytehex | from swh.model.hashutil import MultiHash, hash_to_bytehex, hash_to_bytes | ||||||||||||||||||||
from swh.model.model import Content as ModelContent | from swh.model.model import Content as ModelContent | ||||||||||||||||||||
from swh.model.model import ( | from swh.model.model import ( | ||||||||||||||||||||
ObjectType, | ObjectType, | ||||||||||||||||||||
Origin, | Origin, | ||||||||||||||||||||
Person, | Person, | ||||||||||||||||||||
Release, | Release, | ||||||||||||||||||||
Revision, | Revision, | ||||||||||||||||||||
RevisionType, | RevisionType, | ||||||||||||||||||||
Sha1Git, | Sha1Git, | ||||||||||||||||||||
Snapshot, | Snapshot, | ||||||||||||||||||||
SnapshotBranch, | SnapshotBranch, | ||||||||||||||||||||
TargetType, | TargetType, | ||||||||||||||||||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
from swh.storage.algos.snapshot import snapshot_get_latest | |||||||||||||||||||||
from . import hgutil | from . import hgutil | ||||||||||||||||||||
from .archive_extract import tmp_extract | from .archive_extract import tmp_extract | ||||||||||||||||||||
from .hgutil import HgNodeId | from .hgutil import HgNodeId | ||||||||||||||||||||
FLAG_PERMS = { | FLAG_PERMS = { | ||||||||||||||||||||
b"l": DentryPerms.symlink, | b"l": DentryPerms.symlink, | ||||||||||||||||||||
b"x": DentryPerms.executable_content, | b"x": DentryPerms.executable_content, | ||||||||||||||||||||
▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | ): | ||||||||||||||||||||
# it is used for differential tree update by store_directories | # it is used for differential tree update by store_directories | ||||||||||||||||||||
self._last_root = HgDirectory() | self._last_root = HgDirectory() | ||||||||||||||||||||
# Cache the content hash across revisions to avoid recalculation. | # Cache the content hash across revisions to avoid recalculation. | ||||||||||||||||||||
self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( | self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( | ||||||||||||||||||||
self.config["content_cache_size"], | self.config["content_cache_size"], | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
# hg node id of the latest snapshot branch heads | |||||||||||||||||||||
# used to find what are the new revisions since last snapshot | |||||||||||||||||||||
self._latest_heads: List[HgNodeId] = [] | |||||||||||||||||||||
self._load_status = "eventful" | |||||||||||||||||||||
def pre_cleanup(self) -> None: | def pre_cleanup(self) -> None: | ||||||||||||||||||||
"""As a first step, will try and check for dangling data to cleanup. | """As a first step, will try and check for dangling data to cleanup. | ||||||||||||||||||||
This should do its best to avoid raising issues. | This should do its best to avoid raising issues. | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
clean_dangling_folders( | clean_dangling_folders( | ||||||||||||||||||||
self._temp_directory, | self._temp_directory, | ||||||||||||||||||||
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | pattern_check=TEMPORARY_DIR_PREFIX_PATTERN, | ||||||||||||||||||||
Show All 14 Lines | def prepare_origin_visit(self, *args, **kwargs) -> None: | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
self.origin = Origin(url=self.origin_url) | self.origin = Origin(url=self.origin_url) | ||||||||||||||||||||
def prepare(self, *args, **kwargs) -> None: | def prepare(self, *args, **kwargs) -> None: | ||||||||||||||||||||
"""Second step executed by the loader to prepare some state needed by | """Second step executed by the loader to prepare some state needed by | ||||||||||||||||||||
the loader. | the loader. | ||||||||||||||||||||
""" | """ | ||||||||||||||||||||
# Set here to allow multiple calls to load on the same loader instance | |||||||||||||||||||||
self._latest_heads = [] | |||||||||||||||||||||
latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) | |||||||||||||||||||||
if latest_snapshot: | |||||||||||||||||||||
snapshot_branches = [ | |||||||||||||||||||||
branch.target | |||||||||||||||||||||
for branch in latest_snapshot.branches.values() | |||||||||||||||||||||
if branch.target_type != TargetType.ALIAS | |||||||||||||||||||||
] | |||||||||||||||||||||
revisions = [ | |||||||||||||||||||||
revision | |||||||||||||||||||||
for revision in self.storage.revision_get(snapshot_branches) | |||||||||||||||||||||
if revision | |||||||||||||||||||||
] | |||||||||||||||||||||
self._latest_heads = [ | |||||||||||||||||||||
hash_to_bytes(revision.metadata["node"]) | |||||||||||||||||||||
for revision in revisions | |||||||||||||||||||||
if revision.metadata | |||||||||||||||||||||
ardumont: can't we make that one for comprehension instead of 2?
```
self._latest_heads =… | |||||||||||||||||||||
Done Inline ActionsMuch better, thanks. I saw the patch was accepted and just rebased it, I should have paid more attention. :) Alphare: Much better, thanks. I saw the patch was accepted and just rebased it, I should have paid more… | |||||||||||||||||||||
Not Done Inline Actions
same proposal with the diff format... (neat trick) the phabricator suggestion edit. ardumont: same proposal with the diff format...
(neat trick) the phabricator suggestion edit. | |||||||||||||||||||||
] | |||||||||||||||||||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||||||||||||||||||
"""Fetch the data from the source the loader is currently loading | """Fetch the data from the source the loader is currently loading | ||||||||||||||||||||
Returns: | Returns: | ||||||||||||||||||||
a value that is interpreted as a boolean. If True, fetch_data needs | a value that is interpreted as a boolean. If True, fetch_data needs | ||||||||||||||||||||
to be called again to complete loading. | to be called again to complete loading. | ||||||||||||||||||||
Show All 13 Lines | def fetch_data(self) -> bool: | ||||||||||||||||||||
# Allow to load on disk repository without cloning | # Allow to load on disk repository without cloning | ||||||||||||||||||||
# for testing purpose. | # for testing purpose. | ||||||||||||||||||||
self._repo_directory = self.directory | self._repo_directory = self.directory | ||||||||||||||||||||
self._repo = hgutil.repository(self._repo_directory) | self._repo = hgutil.repository(self._repo_directory) | ||||||||||||||||||||
return False | return False | ||||||||||||||||||||
def get_hg_revs_to_load(self): | |||||||||||||||||||||
douarddaUnsubmitted Done Inline Actionsshould add type annotation douardda: should add type annotation | |||||||||||||||||||||
"""Return the hg revision numbers to load.""" | |||||||||||||||||||||
Not Done Inline Actions
ardumont: | |||||||||||||||||||||
if self._latest_heads: | |||||||||||||||||||||
existing_heads = [] # heads that still exist in the repository | |||||||||||||||||||||
for hg_nodeid in self._latest_heads: | |||||||||||||||||||||
try: | |||||||||||||||||||||
rev = self._repo[hg_nodeid].rev() | |||||||||||||||||||||
existing_heads.append(rev) | |||||||||||||||||||||
except KeyError: # the node does not exists anymore | |||||||||||||||||||||
Not Done Inline Actions
ardumont: | |||||||||||||||||||||
pass | |||||||||||||||||||||
# select revisions that are not ancestors of heads | |||||||||||||||||||||
# and not the heads themselves | |||||||||||||||||||||
new_revs = self._repo.revs("not ::(%ld)", existing_heads) | |||||||||||||||||||||
# for now, reload all revisions if there are new commits | |||||||||||||||||||||
# otherwise the loader will crash on missing parents | |||||||||||||||||||||
# incremental loading will come in next commits | |||||||||||||||||||||
if len(new_revs) == 0: | |||||||||||||||||||||
return new_revs | |||||||||||||||||||||
else: | |||||||||||||||||||||
return self._repo.revs("all()") | |||||||||||||||||||||
else: | |||||||||||||||||||||
return self._repo.revs("all()") | |||||||||||||||||||||
def store_data(self): | def store_data(self): | ||||||||||||||||||||
"""Store fetched data in the database.""" | """Store fetched data in the database.""" | ||||||||||||||||||||
for rev in self._repo: | revs = self.get_hg_revs_to_load() | ||||||||||||||||||||
if len(revs) == 0: | |||||||||||||||||||||
douarddaUnsubmitted Done Inline Actionsnit: why not if not revs: ? douardda: nit: why not `if not revs:` ? | |||||||||||||||||||||
acezarUnsubmitted Done Inline ActionsWas not sure __bool__ / __nonzero__ was implemented by the class acezar: Was not sure `__bool__` / ` __nonzero__` was implemented by the class | |||||||||||||||||||||
self._load_status = "uneventful" | |||||||||||||||||||||
return | |||||||||||||||||||||
for rev in revs: | |||||||||||||||||||||
self.store_revision(self._repo[rev]) | self.store_revision(self._repo[rev]) | ||||||||||||||||||||
branch_by_hg_nodeid: Dict[HgNodeId, bytes] = { | branch_by_hg_nodeid: Dict[HgNodeId, bytes] = { | ||||||||||||||||||||
hg_nodeid: name for name, hg_nodeid in hgutil.branches(self._repo).items() | hg_nodeid: name for name, hg_nodeid in hgutil.branches(self._repo).items() | ||||||||||||||||||||
} | } | ||||||||||||||||||||
tags_by_name: Dict[bytes, HgNodeId] = self._repo.tags() | tags_by_name: Dict[bytes, HgNodeId] = self._repo.tags() | ||||||||||||||||||||
tags_by_hg_nodeid: Dict[HgNodeId, bytes] = { | tags_by_hg_nodeid: Dict[HgNodeId, bytes] = { | ||||||||||||||||||||
hg_nodeid: name for name, hg_nodeid in tags_by_name.items() | hg_nodeid: name for name, hg_nodeid in tags_by_name.items() | ||||||||||||||||||||
Show All 26 Lines | def store_data(self): | ||||||||||||||||||||
) | ) | ||||||||||||||||||||
snapshot = Snapshot(branches=snapshot_branches) | snapshot = Snapshot(branches=snapshot_branches) | ||||||||||||||||||||
self.storage.snapshot_add([snapshot]) | self.storage.snapshot_add([snapshot]) | ||||||||||||||||||||
self.flush() | self.flush() | ||||||||||||||||||||
self.loaded_snapshot_id = snapshot.id | self.loaded_snapshot_id = snapshot.id | ||||||||||||||||||||
def load_status(self) -> Dict[str, str]: | |||||||||||||||||||||
"""Detailed loading status. | |||||||||||||||||||||
Defaults to logging an eventful load. | |||||||||||||||||||||
Returns: a dictionary that is eventually passed back as the task's | |||||||||||||||||||||
result to the scheduler, allowing tuning of the task recurrence | |||||||||||||||||||||
mechanism. | |||||||||||||||||||||
""" | |||||||||||||||||||||
return { | |||||||||||||||||||||
"status": self._load_status, | |||||||||||||||||||||
} | |||||||||||||||||||||
def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: | def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: | ||||||||||||||||||||
"""Return the swhid of a revision given its hg nodeid. | """Return the swhid of a revision given its hg nodeid. | ||||||||||||||||||||
Args: | Args: | ||||||||||||||||||||
hg_nodeid: the hg nodeid of the revision. | hg_nodeid: the hg nodeid of the revision. | ||||||||||||||||||||
Returns: | Returns: | ||||||||||||||||||||
the swhid of the revision. | the swhid of the revision. | ||||||||||||||||||||
▲ Show 20 Lines • Show All 254 Lines • Show Last 20 Lines |
can't we make that one for comprehension instead of 2?