diff --git a/swh/loader/mercurial/from_disk.py b/swh/loader/mercurial/from_disk.py
--- a/swh/loader/mercurial/from_disk.py
+++ b/swh/loader/mercurial/from_disk.py
@@ -8,13 +8,13 @@
 import os
 from shutil import rmtree
 from tempfile import mkdtemp
-from typing import Deque, Dict, Optional, Tuple, TypeVar, Union
+from typing import Deque, Dict, List, Optional, Tuple, TypeVar, Union
 
 from swh.loader.core.loader import BaseLoader
 from swh.loader.core.utils import clean_dangling_folders
 from swh.loader.mercurial.utils import parse_visit_date
 from swh.model.from_disk import Content, DentryPerms, Directory
-from swh.model.hashutil import MultiHash, hash_to_bytehex
+from swh.model.hashutil import MultiHash, hash_to_bytehex, hash_to_bytes
 from swh.model.model import (
     ObjectType,
     Origin,
@@ -29,11 +29,12 @@
     TimestampWithTimezone,
 )
 from swh.model.model import Content as ModelContent
+from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.storage.interface import StorageInterface
 
 from . import hgutil
 from .archive_extract import tmp_extract
-from .hgutil import HgNodeId
+from .hgutil import HgFilteredSet, HgNodeId, HgSpanSet
 
 FLAG_PERMS = {
     b"l": DentryPerms.symlink,
@@ -146,6 +147,12 @@
             content_cache_size,
         )
 
+        # hg node ids of the latest snapshot branch heads
+        # used to find the new revisions since the last snapshot
+        self._latest_heads: List[HgNodeId] = []
+
+        self._load_status = "eventful"
+
     def pre_cleanup(self) -> None:
         """As a first step, will try and check for dangling data to cleanup.
         This should do its best to avoid raising issues.
@@ -176,6 +183,26 @@
         the loader.
 
         """
+        # Set here to allow multiple calls to load on the same loader instance
+        self._latest_heads = []
+
+        latest_snapshot = snapshot_get_latest(self.storage, self.origin_url)
+        if latest_snapshot:
+            snapshot_branches = [
+                branch.target
+                for branch in latest_snapshot.branches.values()
+                if branch.target_type != TargetType.ALIAS
+            ]
+            revisions = [
+                revision
+                for revision in self.storage.revision_get(snapshot_branches)
+                if revision
+            ]
+            self._latest_heads = [
+                hash_to_bytes(revision.metadata["node"])
+                for revision in revisions
+                if revision.metadata
+            ]
 
     def fetch_data(self) -> bool:
         """Fetch the data from the source the loader is currently loading
@@ -205,9 +232,40 @@
 
         return False
 
+    def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]:
+        """Return the hg revision numbers to load."""
+        repo: hgutil.Repository = self._repo  # mypy can't infer that repo is not None
+        if self._latest_heads:
+            existing_heads = []  # heads that still exist in the repository
+            for hg_nodeid in self._latest_heads:
+                try:
+                    rev = repo[hg_nodeid].rev()
+                    existing_heads.append(rev)
+                except KeyError:  # the node does not exist anymore
+                    pass
+
+            # select revisions that are not ancestors of heads
+            # and not the heads themselves
+            new_revs = repo.revs("not ::(%ld)", existing_heads)
+
+            # for now, reload all revisions if there are new commits
+            # otherwise the loader will crash on missing parents
+            # incremental loading will come in later commits
+            if new_revs:
+                return repo.revs("all()")
+            else:
+                return new_revs
+        else:
+            return repo.revs("all()")
+
     def store_data(self):
         """Store fetched data in the database."""
-        for rev in self._repo:
+        revs = self.get_hg_revs_to_load()
+        if not revs:
+            self._load_status = "uneventful"
+            return
+
+        for rev in revs:
             self.store_revision(self._repo[rev])
 
         branch_by_hg_nodeid: Dict[HgNodeId, bytes] = {
@@ -250,6 +308,19 @@
 
         self.flush()
         self.loaded_snapshot_id = snapshot.id
 
+    def load_status(self) -> Dict[str, str]:
+        """Detailed loading status.
+
+        Defaults to logging an eventful load.
+
+        Returns: a dictionary that is eventually passed back as the task's
+            result to the scheduler, allowing tuning of the task recurrence
+            mechanism.
+        """
+        return {
+            "status": self._load_status,
+        }
+
     def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git:
         """Return the swhid of a revision given its hg nodeid.
diff --git a/swh/loader/mercurial/hgutil.py b/swh/loader/mercurial/hgutil.py
--- a/swh/loader/mercurial/hgutil.py
+++ b/swh/loader/mercurial/hgutil.py
@@ -5,13 +5,15 @@
 
 # The internal Mercurial API is not guaranteed to be stable.
 import mercurial.ui  # type: ignore
-from mercurial import context, hg, util
+from mercurial import context, hg, smartset, util
 
 NULLID = mercurial.node.nullid
 HgNodeId = NewType("HgNodeId", bytes)
 Repository = hg.localrepo
 BaseContext = context.basectx
 LRUCacheDict = util.lrucachedict
+HgSpanSet = smartset._spanset
+HgFilteredSet = smartset.filteredset
 
 
 def repository(path: str) -> hg.localrepo:
diff --git a/swh/loader/mercurial/tests/test_from_disk.py b/swh/loader/mercurial/tests/test_from_disk.py
--- a/swh/loader/mercurial/tests/test_from_disk.py
+++ b/swh/loader/mercurial/tests/test_from_disk.py
@@ -246,3 +246,38 @@
     assert len(hg_changesets) > 0
     assert len(transplant_sources) > 0
     assert transplant_sources.issubset(hg_changesets)
+
+
+def test_load_unchanged_repo_should_be_uneventful(
+    swh_config, swh_storage, datadir, tmp_path
+):
+    archive_name = "hello"
+    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
+    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
+    repo_path = repo_url.replace("file://", "")
+
+    loader = HgLoaderFromDisk(swh_storage, repo_path)
+
+    assert loader.load() == {"status": "eventful"}
+    assert get_stats(loader.storage) == {
+        "content": 3,
+        "directory": 3,
+        "origin": 1,
+        "origin_visit": 1,
+        "release": 1,
+        "revision": 3,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
+
+    assert loader.load() == {"status": "uneventful"}
+    assert get_stats(loader.storage) == {
+        "content": 3,
+        "directory": 3,
+        "origin": 1,
+        "origin_visit": 2,
+        "release": 1,
+        "revision": 3,
+        "skipped_content": 0,
+        "snapshot": 1,
+    }
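
A minimal sketch, not part of the patch, of the head-based selection that get_hg_revs_to_load() performs, handy for checking the revset logic against a local clone. The repository path "/tmp/example-repo" and the previous_heads list are made-up placeholders; everything else uses only what the diff shows (hgutil.repository, repo[node].rev(), repo.revs()).

    # Sketch only: stand-alone check of the incremental revset selection.
    # Assumptions: "/tmp/example-repo" is a local Mercurial clone and
    # previous_heads holds hg node ids recorded during an earlier visit.
    from swh.loader.mercurial import hgutil

    repo = hgutil.repository("/tmp/example-repo")
    previous_heads: list = []  # hypothetical: HgNodeId values from the last snapshot

    existing_heads = []  # local revision numbers of heads still present
    for hg_nodeid in previous_heads:
        try:
            existing_heads.append(repo[hg_nodeid].rev())
        except KeyError:  # head was stripped or rewritten since that visit
            pass

    # Everything that is neither an old head nor one of its ancestors,
    # i.e. the changesets added since the previous snapshot.
    new_revs = repo.revs("not ::(%ld)", existing_heads)
    print(f"{len(new_revs)} new revisions since the last snapshot")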