Changeset View
Standalone View
swh/loader/mercurial/from_disk.py
# Copyright (C) 2020-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import deque
from datetime import datetime
import os
from shutil import rmtree
from tempfile import mkdtemp
from typing import Deque, Dict, Iterator, List, Optional, Set, Tuple, TypeVar, Union

from swh.core.utils import grouper
from swh.loader.core.loader import BaseLoader
from swh.loader.core.utils import clean_dangling_folders
from swh.loader.mercurial.utils import get_minimum_env, parse_visit_date
from swh.model import identifiers
from swh.model.from_disk import Content, DentryPerms, Directory
from swh.model.hashutil import hash_to_bytehex
from swh.model.model import (
    ExtID,
▲ Show 20 Lines • Show All 263 Lines • ▼ Show 20 Lines | def _get_extids_for_targets(self, targets: List[bytes]) -> List[ExtID]: | |||||||||||||
) | ) | |||||||||||||
extids = [ | extids = [ | |||||||||||||
extid | extid | |||||||||||||
for extid in extids | for extid in extids | |||||||||||||
if extid.target.object_id not in revisions_missing | if extid.target.object_id not in revisions_missing | |||||||||||||
] | ] | |||||||||||||
return extids | return extids | |||||||||||||
def _get_extids_for_hgnodes(self, hgnode_ids: List[bytes]) -> List[ExtID]:
    """Get all Mercurial ExtIDs for the mercurial nodes in the list which point to
    a known revision.

    Args:
        hgnode_ids: Mercurial node identifiers to look up.

    Returns:
        The matching ExtIDs (current EXTID_VERSION only) whose target
        revision is already present in the archive.

    """
    extids = []
    # Batch the lookups so a single storage request stays bounded in size.
    for group_ids in grouper(hgnode_ids, n=1000):
        for extid in self.storage.extid_get_from_extid(EXTID_TYPE, group_ids):
            # Skip extids computed with an outdated mapping version; their
            # targets must be recomputed.
            if extid.extid_version != EXTID_VERSION:
                continue
            extids.append(extid)

    if extids:
        # Filter out dangling extids, we need to load their target again
        revisions_missing = self.storage.revision_missing(
            [extid.target.object_id for extid in extids]
        )
        extids = [
            extid
            for extid in extids
            if extid.target.object_id not in revisions_missing
        ]
    return extids
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | |||||||||||||
"""Fetch the data from the source the loader is currently loading | """Fetch the data from the source the loader is currently loading | |||||||||||||
Returns: | Returns: | |||||||||||||
a value that is interpreted as a boolean. If True, fetch_data needs | a value that is interpreted as a boolean. If True, fetch_data needs | |||||||||||||
to be called again to complete loading. | to be called again to complete loading. | |||||||||||||
""" | """ | |||||||||||||
Show All 12 Lines | def fetch_data(self) -> bool: | |||||||||||||
# Allow to load on disk repository without cloning | # Allow to load on disk repository without cloning | |||||||||||||
# for testing purpose. | # for testing purpose. | |||||||||||||
self._repo_directory = self.directory | self._repo_directory = self.directory | |||||||||||||
self._repo = hgutil.repository(self._repo_directory) | self._repo = hgutil.repository(self._repo_directory) | |||||||||||||
return False | return False | |||||||||||||
def _new_revs(self, heads: List[bytes]) -> Union[HgFilteredSet, HgSpanSet]:
    """Return unseen revisions. That is, filter out revisions that are not
    ancestors of heads.

    Args:
        heads: Mercurial node ids of previously seen heads.

    Returns:
        The repository revision numbers that are neither the given heads
        nor ancestors of them.

    """
    assert self._repo is not None
    existing_heads = []  # heads that still exist in the repository

    for hg_nodeid in heads:
        try:
            rev = self._repo[hg_nodeid].rev()
            existing_heads.append(rev)
        except KeyError:  # the node does not exist anymore
            pass

    # select revisions that are not ancestors of heads
    # and not the heads themselves
    new_revs = self._repo.revs("not ::(%ld)", existing_heads)

    if new_revs:
        self.log.info("New revisions found: %d", len(new_revs))
    return new_revs
def get_hg_revs_to_load(self) -> Iterator[int]:
    """Yield hg revision numbers to load.

    Revisions are filtered in two successive steps:

    1. against the heads recorded in the latest snapshot for this origin,
       so revisions already seen through that snapshot are skipped;
    2. against the ExtID mappings shared across all mercurial origins, so
       revisions already ingested from another origin are skipped too.

    """
    assert self._repo is not None
    repo: hgutil.Repository = self._repo

    seen_revs: Set[int] = set()
    # 1. use snapshot to reuse existing seen heads from it
    if self._latest_heads:
        for rev in self._new_revs(self._latest_heads):
            seen_revs.add(rev)
            yield rev

    # 2. Then filter out remaining revisions through the overall extid
    # mappings across hg origins
    revs_left = repo.revs("all() - ::(%ld)", seen_revs)
    hg_nodeids = [repo[nodeid].node() for nodeid in revs_left]
    yield from self._new_revs(
        [extid.extid for extid in self._get_extids_for_hgnodes(hg_nodeids)]
    )
def store_data(self): | def store_data(self): | |||||||||||||
"""Store fetched data in the database.""" | """Store fetched data in the database.""" | |||||||||||||
revs = self.get_hg_revs_to_load() | revs = self.get_hg_revs_to_load() | |||||||||||||
if not revs: | length_ingested_revs = 0 | |||||||||||||
self._load_status = "uneventful" | ||||||||||||||
return | ||||||||||||||
assert self._repo is not None | assert self._repo is not None | |||||||||||||
repo = self._repo | repo = self._repo | |||||||||||||
ignored_revs: Set[int] = set() | ignored_revs: Set[int] = set() | |||||||||||||
for rev in revs: | for rev in revs: | |||||||||||||
if rev in ignored_revs: | if rev in ignored_revs: | |||||||||||||
continue | continue | |||||||||||||
try: | try: | |||||||||||||
self.store_revision(repo[rev]) | self.store_revision(repo[rev]) | |||||||||||||
length_ingested_revs += 1 | ||||||||||||||
except CorruptedRevision as e: | except CorruptedRevision as e: | |||||||||||||
self._visit_status = "partial" | self._visit_status = "partial" | |||||||||||||
self.log.warning("Corrupted revision %s", e) | self.log.warning("Corrupted revision %s", e) | |||||||||||||
descendents = repo.revs("(%ld)::", [rev]) | descendents = repo.revs("(%ld)::", [rev]) | |||||||||||||
ignored_revs.update(descendents) | ignored_revs.update(descendents) | |||||||||||||
if len(ignored_revs) == len(revs): | if length_ingested_revs == 0: | |||||||||||||
# The repository is completely broken, nothing can be loaded | # The repository has nothing to ingest (either empty or broken repository) | |||||||||||||
self._load_status = "uneventful" | self._load_status = "uneventful" | |||||||||||||
return | return | |||||||||||||
branching_info = hgutil.branching_info(repo, ignored_revs) | branching_info = hgutil.branching_info(repo, ignored_revs) | |||||||||||||
tags_by_name: Dict[bytes, HgNodeId] = repo.tags() | tags_by_name: Dict[bytes, HgNodeId] = repo.tags() | |||||||||||||
snapshot_branches: Dict[bytes, SnapshotBranch] = {} | snapshot_branches: Dict[bytes, SnapshotBranch] = {} | |||||||||||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def store_data(self): | |||||||||||||
default_branch_alias = branching_info.default_branch_alias | default_branch_alias = branching_info.default_branch_alias | |||||||||||||
if default_branch_alias is not None: | if default_branch_alias is not None: | |||||||||||||
snapshot_branches[b"HEAD"] = SnapshotBranch( | snapshot_branches[b"HEAD"] = SnapshotBranch( | |||||||||||||
target=default_branch_alias, target_type=TargetType.ALIAS, | target=default_branch_alias, target_type=TargetType.ALIAS, | |||||||||||||
) | ) | |||||||||||||
snapshot = Snapshot(branches=snapshot_branches) | snapshot = Snapshot(branches=snapshot_branches) | |||||||||||||
self.storage.snapshot_add([snapshot]) | self.storage.snapshot_add([snapshot]) | |||||||||||||
self.flush() | self.flush() | |||||||||||||
Done Inline Actionswhy? vlorentz: why? | ||||||||||||||
Done Inline Actionsyeah, I'm not sure that's appropriate. olasd: yeah, I'm not sure that's appropriate. | ||||||||||||||
Done Inline Actionsit's mentioned in the description, it's already done in the main loop (and it's in a dedicated commit as well). It was a tryout from early this week. ardumont: it's mentioned in the description, it's already done in the main loop (and it's in a dedicated… | ||||||||||||||
self.loaded_snapshot_id = snapshot.id | self.loaded_snapshot_id = snapshot.id | |||||||||||||
def load_status(self) -> Dict[str, str]: | def load_status(self) -> Dict[str, str]: | |||||||||||||
"""Detailed loading status. | """Detailed loading status. | |||||||||||||
Defaults to logging an eventful load. | Defaults to logging an eventful load. | |||||||||||||
Returns: a dictionary that is eventually passed back as the task's | Returns: a dictionary that is eventually passed back as the task's | |||||||||||||
▲ Show 20 Lines • Show All 322 Lines • Show Last 20 Lines |