Changeset View
Standalone View
swh/loader/mercurial/from_disk.py
# Copyright (C) 2020-2021 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import deque | from collections import deque | ||||
from datetime import datetime | from datetime import datetime | ||||
import os | import os | ||||
from shutil import rmtree | from shutil import rmtree | ||||
from tempfile import mkdtemp | from tempfile import mkdtemp | ||||
from typing import Deque, Dict, List, Optional, Tuple, TypeVar, Union | from typing import Deque, Dict, List, Optional, Set, Tuple, TypeVar, Union | ||||
from swh.loader.core.loader import BaseLoader | from swh.loader.core.loader import BaseLoader | ||||
from swh.loader.core.utils import clean_dangling_folders | from swh.loader.core.utils import clean_dangling_folders | ||||
from swh.loader.mercurial.utils import get_minimum_env, parse_visit_date | from swh.loader.mercurial.utils import get_minimum_env, parse_visit_date | ||||
from swh.model import identifiers | from swh.model import identifiers | ||||
from swh.model.from_disk import Content, DentryPerms, Directory | from swh.model.from_disk import Content, DentryPerms, Directory | ||||
from swh.model.hashutil import hash_to_bytehex | from swh.model.hashutil import hash_to_bytehex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
▲ Show 20 Lines • Show All 142 Lines • ▼ Show 20 Lines | ): | ||||
# Cache the content hash across revisions to avoid recalculation. | # Cache the content hash across revisions to avoid recalculation. | ||||
self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( | self._content_hash_cache: hgutil.LRUCacheDict = hgutil.LRUCacheDict( | ||||
content_cache_size, | content_cache_size, | ||||
) | ) | ||||
# hg node id of the latest snapshot branch heads | # hg node id of the latest snapshot branch heads | ||||
# used to find what are the new revisions since last snapshot | # used to find what are the new revisions since last snapshot | ||||
self._latest_heads: List[bytes] = [] | self._latest_heads: List[bytes] = [] | ||||
# hg node ids of all the tags recorded on previous runs | |||||
# Used to compute which tags need to be added, even across incremental runs | |||||
# that might separate a changeset introducing a tag from its target. | |||||
self._saved_tags: Set[bytes] = set() | |||||
self._load_status = "eventful" | self._load_status = "eventful" | ||||
# If set, will override the default value | # If set, will override the default value | ||||
self._visit_status = None | self._visit_status = None | ||||
if not in_testing_environment(): | if not in_testing_environment(): | ||||
# Do not execute in tests, since the tests already have a clean environment. | # Do not execute in tests, since the tests already have a clean environment. | ||||
# Loading multiple loaders in a test will cause other tests that rely on | # Loading multiple loaders in a test will cause other tests that rely on | ||||
Show All 40 Lines | def prepare(self) -> None: | ||||
the loader. | the loader. | ||||
""" | """ | ||||
# Set here to allow multiple calls to load on the same loader instance | # Set here to allow multiple calls to load on the same loader instance | ||||
self._latest_heads = [] | self._latest_heads = [] | ||||
latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) | latest_snapshot = snapshot_get_latest(self.storage, self.origin_url) | ||||
if latest_snapshot: | if latest_snapshot: | ||||
self._set_latest_heads(latest_snapshot) | self._set_recorded_state(latest_snapshot) | ||||
def _set_latest_heads(self, latest_snapshot: Snapshot) -> None: | def _set_recorded_state(self, latest_snapshot: Snapshot) -> None: | ||||
""" | """ | ||||
Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, | Looks up the nodeid for all revisions in the snapshot via extid_get_from_target, | ||||
and adds them to self._latest_heads. | and adds them to `self._latest_heads`. | ||||
""" | Also looks up the currently saved releases ("tags" in Mercurial speak). | ||||
# TODO: add support for releases | The tags are all listed for easy comparison at the end, while only the latest | ||||
snapshot_branches = [ | heads are needed for revisions. | ||||
branch.target | """ | ||||
for branch in latest_snapshot.branches.values() | snapshot_data: Dict[str, List[bytes]] = { | ||||
if branch.target_type == TargetType.REVISION | "heads": [], | ||||
] | "tags": [], | ||||
} | |||||
vlorentz: What about two variables, instead? (and move the `for` loop's body in a function) | |||||
AlphareAuthorUnsubmitted Done Inline ActionsSure! Alphare: Sure! | |||||
# Get all ExtIDs for revisions in the latest snapshot | for branch in latest_snapshot.branches.values(): | ||||
extids = self.storage.extid_get_from_target( | if branch.target_type == TargetType.REVISION: | ||||
identifiers.ObjectType.REVISION, snapshot_branches | snapshot_data["heads"].append(branch.target) | ||||
) | elif branch.target_type == TargetType.RELEASE: | ||||
snapshot_data["tags"].append(branch.target) | |||||
# Filter out extids not specific to Mercurial | for target, data in snapshot_data.items(): | ||||
extids = [extid for extid in extids if extid.extid_type == EXTID_TYPE] | # Get all Mercurial ExtIDs for the targets in the latest snapshot | ||||
extids = [ | |||||
extid | |||||
for extid in self.storage.extid_get_from_target( | |||||
identifiers.ObjectType.REVISION, data | |||||
) | |||||
if extid.extid_type == EXTID_TYPE | |||||
] | |||||
if extids: | if extids: | ||||
# Filter out dangling extids, we need to load their target again | # Filter out dangling extids, we need to load their target again | ||||
revisions_missing = self.storage.revision_missing( | revisions_missing = self.storage.revision_missing( | ||||
[extid.target.object_id for extid in extids] | [extid.target.object_id for extid in extids] | ||||
) | ) | ||||
extids = [ | extids = [ | ||||
extid | extid | ||||
for extid in extids | for extid in extids | ||||
if extid.target.object_id not in revisions_missing | if extid.target.object_id not in revisions_missing | ||||
] | ] | ||||
if target == "heads": | |||||
# Add the found nodeids to self.latest_heads | |||||
self._latest_heads.extend(extid.extid for extid in extids) | self._latest_heads.extend(extid.extid for extid in extids) | ||||
elif target == "tags": | |||||
self._saved_tags.update(extid.extid for extid in extids) | |||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||
"""Fetch the data from the source the loader is currently loading | """Fetch the data from the source the loader is currently loading | ||||
Returns: | Returns: | ||||
a value that is interpreted as a boolean. If True, fetch_data needs | a value that is interpreted as a boolean. If True, fetch_data needs | ||||
to be called again to complete loading. | to be called again to complete loading. | ||||
Show All 30 Lines | def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]: | ||||
existing_heads.append(rev) | existing_heads.append(rev) | ||||
except KeyError: # the node does not exist anymore | except KeyError: # the node does not exist anymore | ||||
pass | pass | ||||
# select revisions that are not ancestors of heads | # select revisions that are not ancestors of heads | ||||
# and not the heads themselves | # and not the heads themselves | ||||
new_revs = repo.revs("not ::(%ld)", existing_heads) | new_revs = repo.revs("not ::(%ld)", existing_heads) | ||||
# for now, reload all revisions if there are new commits | |||||
# otherwise the loader will crash on missing parents | |||||
# incremental loading will come in next commits | |||||
if new_revs: | if new_revs: | ||||
return repo.revs("all()") | self.log.info(f"New revisions found: {len(new_revs)}") | ||||
vlorentzUnsubmitted Done Inline Actionsnitpick: self.log.info("New revisions found: %d", len(new_revs)), so it only formats the string if the log level is high enough for it to be printed vlorentz: nitpick: `self.log.info("New revisions found: %d", len(new_revs))`, so it only formats the… | |||||
else: | |||||
return new_revs | return new_revs | ||||
else: | else: | ||||
return repo.revs("all()") | return repo.revs("all()") | ||||
def store_data(self): | def store_data(self): | ||||
"""Store fetched data in the database.""" | """Store fetched data in the database.""" | ||||
revs = self.get_hg_revs_to_load() | revs = self.get_hg_revs_to_load() | ||||
if not revs: | if not revs: | ||||
self._load_status = "uneventful" | self._load_status = "uneventful" | ||||
Show All 13 Lines | def store_data(self): | ||||
self.log.warning("Corrupted revision %s", e) | self.log.warning("Corrupted revision %s", e) | ||||
descendents = repo.revs("(%ld)::", [rev]) | descendents = repo.revs("(%ld)::", [rev]) | ||||
blacklisted_revs.extend(descendents) | blacklisted_revs.extend(descendents) | ||||
branch_by_hg_nodeid: Dict[HgNodeId, bytes] = { | branch_by_hg_nodeid: Dict[HgNodeId, bytes] = { | ||||
hg_nodeid: name for name, hg_nodeid in hgutil.branches(repo).items() | hg_nodeid: name for name, hg_nodeid in hgutil.branches(repo).items() | ||||
} | } | ||||
tags_by_name: Dict[bytes, HgNodeId] = repo.tags() | tags_by_name: Dict[bytes, HgNodeId] = repo.tags() | ||||
tags_by_hg_nodeid: Dict[HgNodeId, bytes] = { | |||||
hg_nodeid: name for name, hg_nodeid in tags_by_name.items() | |||||
} | |||||
snapshot_branches: Dict[bytes, SnapshotBranch] = {} | snapshot_branches: Dict[bytes, SnapshotBranch] = {} | ||||
extids = [] | for tag_name, hg_nodeid in tags_by_name.items(): | ||||
if tag_name == b"tip": | |||||
for hg_nodeid, revision_sha1git in self._revision_nodeid_to_sha1git.items(): | |||||
tag_name = tags_by_hg_nodeid.get(hg_nodeid) | |||||
# tip is listed in the tags by the mercurial api | # tip is listed in the tags by the mercurial api | ||||
# but its not a tag defined by the user in `.hgtags` | # but its not a tag defined by the user in `.hgtags` | ||||
if tag_name and tag_name != b"tip": | continue | ||||
if hg_nodeid not in self._saved_tags: | |||||
try: | |||||
revision_sha1git = self.get_revision_id_from_hg_nodeid(hg_nodeid) | |||||
except hgutil.error.RevlogError: | |||||
self.log.warning("Tag points to unknown revision %s", hg_nodeid) | |||||
vlorentzUnsubmitted Not Done Inline ActionsWould it make sense to add a test for this? vlorentz: Would it make sense to add a test for this? | |||||
AlphareAuthorUnsubmitted Done Inline ActionsThis was me being (overly?) cautious, I don't think I've ever seen it in the wild. But it wouldn't be hard to test I think, so I'll give it a shot. Alphare: This was me being (overly?) cautious, I don't think I've ever seen it in the wild. But it… | |||||
else: | |||||
snapshot_branches[tag_name] = SnapshotBranch( | snapshot_branches[tag_name] = SnapshotBranch( | ||||
target=self.store_release(tag_name, revision_sha1git), | target=self.store_release(tag_name, revision_sha1git), | ||||
target_type=TargetType.RELEASE, | target_type=TargetType.RELEASE, | ||||
) | ) | ||||
extids = [] | |||||
for hg_nodeid, revision_sha1git in self._revision_nodeid_to_sha1git.items(): | |||||
if hg_nodeid in branch_by_hg_nodeid: | if hg_nodeid in branch_by_hg_nodeid: | ||||
name = branch_by_hg_nodeid[hg_nodeid] | name = branch_by_hg_nodeid[hg_nodeid] | ||||
snapshot_branches[name] = SnapshotBranch( | snapshot_branches[name] = SnapshotBranch( | ||||
target=revision_sha1git, target_type=TargetType.REVISION, | target=revision_sha1git, target_type=TargetType.REVISION, | ||||
) | ) | ||||
# The tip is mapped to `HEAD` to match | # The tip is mapped to `HEAD` to match | ||||
# the historical implementation | # the historical implementation | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def get_revision_id_from_hg_nodeid(self, hg_nodeid: HgNodeId) -> Sha1Git: | ||||
"""Return the git sha1 of a revision given its hg nodeid. | """Return the git sha1 of a revision given its hg nodeid. | ||||
Args: | Args: | ||||
hg_nodeid: the hg nodeid of the revision. | hg_nodeid: the hg nodeid of the revision. | ||||
Returns: | Returns: | ||||
the sha1_git of the revision. | the sha1_git of the revision. | ||||
""" | """ | ||||
return self._revision_nodeid_to_sha1git[hg_nodeid] | |||||
from_cache = self._revision_nodeid_to_sha1git.get(hg_nodeid) | |||||
if from_cache is not None: | |||||
return from_cache | |||||
# The parent was not loaded in this run, get it from storage | |||||
from_storage = self.storage.extid_get_from_extid(EXTID_TYPE, ids=[hg_nodeid]) | |||||
if not from_storage: | |||||
# This can happen with corrupted revisions in otherwise working repositories | |||||
# like Pypy. Said revision will point to a non-existing parent. | |||||
# We don't raise a `CorruptedRevision` here since this one just does not | |||||
# exist, the *child* revision is corrupted. | |||||
msg = "Node not found in cache or storage: %r" % hg_nodeid | |||||
raise hgutil.error.RevlogError(msg) | |||||
vlorentzUnsubmitted Not Done Inline ActionsHmm that's inconvenient. What happens after this exception is raised? And what does the non-incremental loader do when a parent is missing? vlorentz: Hmm that's inconvenient. What happens after this exception is raised?
And what does the non… | |||||
AlphareAuthorUnsubmitted Done Inline ActionsIt's caught in get_revision_parents, and raises CorruptedRevision for the parent, which causes the whole descendants chain to be blacklisted from the run. The non-incremental loader already handled PyPy fine now that I think about it. I was going off Antoine's comment, but maybe this can't actually happen since we're catching corruptions earlier. I'll try loading PyPy and see if we come through here. Alphare: It's caught in `get_revision_parents`, and raises `CorruptedRevision` for the parent, which… | |||||
msg = "Multiple matches from storage for hg node %r" % hg_nodeid | |||||
assert len(from_storage) == 1, msg | |||||
return from_storage[0].target.object_id | |||||
def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: | def get_revision_parents(self, rev_ctx: hgutil.BaseContext) -> Tuple[Sha1Git, ...]: | ||||
"""Return the git sha1 of the parent revisions. | """Return the git sha1 of the parent revisions. | ||||
Args: | Args: | ||||
hg_nodeid: the hg nodeid of the revision. | hg_nodeid: the hg nodeid of the revision. | ||||
Returns: | Returns: | ||||
the sha1_git of the parent revisions. | the sha1_git of the parent revisions. | ||||
""" | """ | ||||
parents = [] | parents = [] | ||||
for parent_ctx in rev_ctx.parents(): | for parent_ctx in rev_ctx.parents(): | ||||
parent_hg_nodeid = parent_ctx.node() | parent_hg_nodeid = parent_ctx.node() | ||||
# nullid is the value of a parent that does not exist | # nullid is the value of a parent that does not exist | ||||
if parent_hg_nodeid == hgutil.NULLID: | if parent_hg_nodeid == hgutil.NULLID: | ||||
continue | continue | ||||
parents.append(self.get_revision_id_from_hg_nodeid(parent_hg_nodeid)) | try: | ||||
revision_id = self.get_revision_id_from_hg_nodeid(parent_hg_nodeid) | |||||
except hgutil.error.RevlogError as e: | |||||
# This node points to a non-existing parent, it's corrupted. | |||||
raise CorruptedRevision(rev_ctx.node()) from e | |||||
parents.append(revision_id) | |||||
return tuple(parents) | return tuple(parents) | ||||
def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: | def store_revision(self, rev_ctx: hgutil.BaseContext) -> None: | ||||
"""Store a revision given its hg nodeid. | """Store a revision given its hg nodeid. | ||||
Args: | Args: | ||||
rev_ctx: the he revision context. | rev_ctx: the he revision context. | ||||
▲ Show 20 Lines • Show All 245 Lines • Show Last 20 Lines |
What about two variables, instead? (and move the for loop's body in a function)