Changeset View
Standalone View
swh/loader/mercurial/from_disk.py
Show First 20 Lines • Show All 283 Lines • ▼ Show 20 Lines | def _get_extids_for_targets(self, targets: List[bytes]) -> List[ExtID]: | |||||||||||||
) | ) | |||||||||||||
extids = [ | extids = [ | |||||||||||||
extid | extid | |||||||||||||
for extid in extids | for extid in extids | |||||||||||||
if extid.target.object_id not in revisions_missing | if extid.target.object_id not in revisions_missing | |||||||||||||
] | ] | |||||||||||||
return extids | return extids | |||||||||||||
def _get_extids_for_hgnodes(self, hgnode_ids: List[bytes]) -> List[ExtID]:
    """Get all Mercurial ExtIDs for the mercurial nodes in the list.

    Only ExtIDs whose version matches the current ``EXTID_VERSION`` are
    kept.  Dangling ExtIDs — those whose target revision is missing from
    the storage — are dropped as well, so that their targets get loaded
    again.
    """
    candidates = self.storage.extid_get_from_extid(EXTID_TYPE, hgnode_ids)
    extids = [e for e in candidates if e.extid_version == EXTID_VERSION]
    if not extids:
        return extids
    # Filter out dangling extids, we need to load their target again
    missing = self.storage.revision_missing(
        [e.target.object_id for e in extids]
    )
    return [e for e in extids if e.target.object_id not in missing]
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | |||||||||||||
"""Fetch the data from the source the loader is currently loading | """Fetch the data from the source the loader is currently loading | |||||||||||||
Returns: | Returns: | |||||||||||||
a value that is interpreted as a boolean. If True, fetch_data needs | a value that is interpreted as a boolean. If True, fetch_data needs | |||||||||||||
to be called again to complete loading. | to be called again to complete loading. | |||||||||||||
""" | """ | |||||||||||||
Show All 12 Lines | def fetch_data(self) -> bool: | |||||||||||||
# Allow to load on disk repository without cloning | # Allow to load on disk repository without cloning | |||||||||||||
# for testing purpose. | # for testing purpose. | |||||||||||||
self._repo_directory = self.directory | self._repo_directory = self.directory | |||||||||||||
self._repo = hgutil.repository(self._repo_directory) | self._repo = hgutil.repository(self._repo_directory) | |||||||||||||
return False | return False | |||||||||||||
def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]: | def _new_revs(self, heads: List[bytes]): | |||||||||||||
"""Return the hg revision numbers to load.""" | """Return unseen revisions. That is, filter out revisions that are not ancestors of | |||||||||||||
heads""" | ||||||||||||||
assert self._repo is not None | assert self._repo is not None | |||||||||||||
repo: hgutil.Repository = self._repo | existing_heads = [] | |||||||||||||
if self._latest_heads: | ||||||||||||||
existing_heads = [] # heads that still exist in the repository | for hg_nodeid in heads: | |||||||||||||
for hg_nodeid in self._latest_heads: | ||||||||||||||
try: | try: | |||||||||||||
rev = repo[hg_nodeid].rev() | rev = self._repo[hg_nodeid].rev() | |||||||||||||
existing_heads.append(rev) | existing_heads.append(rev) | |||||||||||||
except KeyError: # the node does not exist anymore | except KeyError: # the node does not exist anymore | |||||||||||||
pass | pass | |||||||||||||
# select revisions that are not ancestors of heads | # select revisions that are not ancestors of heads | |||||||||||||
# and not the heads themselves | # and not the heads themselves | |||||||||||||
new_revs = repo.revs("not ::(%ld)", existing_heads) | new_revs = self._repo.revs("not ::(%ld)", existing_heads) | |||||||||||||
if new_revs: | if new_revs: | |||||||||||||
self.log.info("New revisions found: %d", len(new_revs)) | self.log.info("New revisions found: %d", len(new_revs)) | |||||||||||||
return new_revs | return new_revs | |||||||||||||
def get_hg_revs_to_load(self) -> Union[HgFilteredSet, HgSpanSet]: | ||||||||||||||
"""Return the hg revision numbers to load.""" | ||||||||||||||
Not Done Inline Actionswhy did you remove the return type? vlorentz: why did you remove the return type? | ||||||||||||||
Done Inline Actionsa mistake ;) ardumont: a mistake ;)
it gets removed, well transformed, in the next though. | ||||||||||||||
Done Inline ActionsNope, it's transformed here. ardumont: Nope, it's transformed here.
I'll add it back nonetheless, thx. | ||||||||||||||
assert self._repo is not None | ||||||||||||||
repo: hgutil.Repository = self._repo | ||||||||||||||
if self._latest_heads: | ||||||||||||||
# Reuse existing seen heads from last snapshot | ||||||||||||||
new_revs = self._new_revs(self._latest_heads) | ||||||||||||||
else: | else: | |||||||||||||
return repo.revs("all()") | # otherwise, try to filter out across origins already seen revisions | |||||||||||||
all_revs = repo.revs("all()") | ||||||||||||||
Done Inline ActionsDoes this fetch extids for *all* revisions in the repo? vlorentz: Does this fetch extids for *all* revisions in the repo? | ||||||||||||||
Done Inline ActionsIt should, yes. The idea being to avoid reprocessing revisions (extracting their directories, etc.) that we've already loaded. However, I would suggest doing this filtering /after/ having done the original history filtering that the loader used to do (i.e., using extids to filter what the loader used to return as new_revs). olasd: It should, yes. The idea being to avoid reprocessing revisions (extracting their directories… | ||||||||||||||
Done Inline Actions
Is it that costly? We don't check revision_missing for every git commit, why would we do the (costlier) equivalent for mercurial commits? vlorentz: > The idea being to avoid reprocessing revisions (extracting their directories, etc.) that… | ||||||||||||||
Done Inline Actions
Well, there exist origins that take tens of hours (CPU time) to run (some more than 150h without finishing, plus some were already started before). Most likely or hopefully, if the ingestion could use those extids and skip those revisions (provided it got reached initially), that'd go faster.
I'm not sure that we don't; we are using at least the filter proxy, which does use it. And I suppose it will be the other way around: we will make the git loader work the same way at some point (when we'll have the extids).
Right, I'll adapt. ardumont: > Is it that costly?
Well, there exists origins that takes 10th of hours (cpu times) to run… | ||||||||||||||
Done Inline Actions
However, i don't clearly see how to do that. /me needs more thinking, walking in the rain might help ardumont: >> However, I would suggest doing this filtering /after/ having done the original history… | ||||||||||||||
Done Inline Actions@olasd Heads up, i've kept the old behavior as i currently don't see how to do what you Maybe the current implementation is enough already. It uses best of both world. If there Otherwise, let's fallback to filter over all extids for the default case. What do you think of ^? I did not yet work on grouping the extid reading calls though (@vlorentz's suggestion), ardumont: @olasd Heads up, i've kept the old behavior as i currently don't see how to do what you… | ||||||||||||||
Done Inline ActionsI've tried to make it 1. then 2. instead of 1. or 2.
ardumont: I've tried to make it 1. then 2. instead of 1. or 2.
1. being using the snapshot.
2. being… | ||||||||||||||
hg_nodeids = [repo[nodeid].node() for nodeid in all_revs] | ||||||||||||||
new_revs = self._new_revs( | ||||||||||||||
[extid.extid for extid in self._get_extids_for_hgnodes(hg_nodeids)] | ||||||||||||||
) | ||||||||||||||
Not Done Inline Actionsseen_revs will typically be very large; does Mercurial handle this query nicely? vlorentz: `seen_revs` will typically be very large; does Mercurial handle this query nicely? | ||||||||||||||
Done Inline ActionsWell, i don't know but it was already that way before. How can I check this? I'm planning (to land) and check that on staging first on overall large repositories. to land or test it in a venv, i'm not sure yet. ardumont: Well, i don't know but it was already that way before.
It just now does some more subtraction… | ||||||||||||||
Not Done Inline ActionsI don't know; load a large repo vlorentz: I don't know; load a large repo | ||||||||||||||
Done Inline Actionsyes, but i thought you add another idea ;) [1] ardumont: yes, but i thought you add another idea ;)
The venv is giving me a hard time though [1] so i'm… | ||||||||||||||
Done Inline Actionsah that's a new deps grown out of the new deps from storage on swh.counter in the 0.37 storage. ardumont: ah that's a new deps grown out of the new deps from storage on swh.counter in the 0.37 storage. | ||||||||||||||
Done Inline Actionspip hell unstuck, fixed. I've made a pass on the origin https://foss.heptapod.net/mercurial/tortoisehg/thg which qualifies ardumont: pip hell unstuck, fixed.
I've made a pass on the origin https://foss.heptapod. | ||||||||||||||
return new_revs | ||||||||||||||
def store_data(self): | def store_data(self): | |||||||||||||
"""Store fetched data in the database.""" | """Store fetched data in the database.""" | |||||||||||||
Not Done Inline Actions
vlorentz: | ||||||||||||||
Done Inline Actionsi'm not a huge fan of this writing but ok. ardumont: i'm not a huge fan of this writing but ok. | ||||||||||||||
Done Inline ActionsNote, that's not it. i'd prefer yield from self._new... then. ardumont: Note, that's not it.
I'm not returning a list, i'm yielding stuff.
i'd prefer `yield from self. | ||||||||||||||
revs = self.get_hg_revs_to_load() | revs = self.get_hg_revs_to_load() | |||||||||||||
if not revs: | if not revs: | |||||||||||||
self._load_status = "uneventful" | self._load_status = "uneventful" | |||||||||||||
return | return | |||||||||||||
assert self._repo is not None | assert self._repo is not None | |||||||||||||
repo = self._repo | repo = self._repo | |||||||||||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | def store_data(self): | |||||||||||||
# If the repo is broken enough or if it has none of the "normal" default | # If the repo is broken enough or if it has none of the "normal" default | |||||||||||||
# mechanisms, we ignore `HEAD`. | # mechanisms, we ignore `HEAD`. | |||||||||||||
default_branch_alias = branching_info.default_branch_alias | default_branch_alias = branching_info.default_branch_alias | |||||||||||||
if default_branch_alias is not None: | if default_branch_alias is not None: | |||||||||||||
snapshot_branches[b"HEAD"] = SnapshotBranch( | snapshot_branches[b"HEAD"] = SnapshotBranch( | |||||||||||||
target=default_branch_alias, target_type=TargetType.ALIAS, | target=default_branch_alias, target_type=TargetType.ALIAS, | |||||||||||||
) | ) | |||||||||||||
snapshot = Snapshot(branches=snapshot_branches) | snapshot = Snapshot(branches=snapshot_branches) | |||||||||||||
self.storage.snapshot_add([snapshot]) | self.storage.snapshot_add([snapshot]) | |||||||||||||
self.flush() | self.flush() | |||||||||||||
Done Inline Actionswhy? vlorentz: why? | ||||||||||||||
Done Inline Actionsyeah, I'm not sure that's appropriate. olasd: yeah, I'm not sure that's appropriate. | ||||||||||||||
Done Inline Actionsit's mentioned in the description, it's already done in the main loop (and it's in a dedicated commit as well). It was a tryout from early this week. ardumont: it's mentioned in the description, it's already done in the main loop (and it's in a dedicated… | ||||||||||||||
self.loaded_snapshot_id = snapshot.id | self.loaded_snapshot_id = snapshot.id | |||||||||||||
def load_status(self) -> Dict[str, str]: | def load_status(self) -> Dict[str, str]: | |||||||||||||
"""Detailed loading status. | """Detailed loading status. | |||||||||||||
Defaults to logging an eventful load. | Defaults to logging an eventful load. | |||||||||||||
Returns: a dictionary that is eventually passed back as the task's | Returns: a dictionary that is eventually passed back as the task's | |||||||||||||
▲ Show 20 Lines • Show All 322 Lines • Show Last 20 Lines |