Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/loader.py
Show All 16 Lines | from typing import ( | |||||||||||||
Dict, | Dict, | |||||||||||||
Generic, | Generic, | |||||||||||||
Iterable, | Iterable, | |||||||||||||
Iterator, | Iterator, | |||||||||||||
List, | List, | |||||||||||||
Mapping, | Mapping, | |||||||||||||
Optional, | Optional, | |||||||||||||
Sequence, | Sequence, | |||||||||||||
Set, | ||||||||||||||
Tuple, | Tuple, | |||||||||||||
TypeVar, | TypeVar, | |||||||||||||
) | ) | |||||||||||||
import attr | import attr | |||||||||||||
import sentry_sdk | import sentry_sdk | |||||||||||||
from swh.core.tarball import uncompress | from swh.core.tarball import uncompress | |||||||||||||
▲ Show 20 Lines • Show All 249 Lines • ▼ Show 20 Lines | ) -> Optional[Sha1Git]: | |||||||||||||
for rev_id, known_artifact in known_artifacts.items(): | for rev_id, known_artifact in known_artifacts.items(): | |||||||||||||
known_extid = self.known_artifact_to_extid(known_artifact) | known_extid = self.known_artifact_to_extid(known_artifact) | |||||||||||||
if new_extid == known_extid: | if new_extid == known_extid: | |||||||||||||
return rev_id | return rev_id | |||||||||||||
return None | return None | |||||||||||||
def _get_known_extids( | ||||||||||||||
self, packages_info: List[TPackageInfo] | ||||||||||||||
) -> Dict[PartialExtID, List[CoreSWHID]]: | ||||||||||||||
"""Compute the ExtIDs from new PackageInfo objects, searches which are already | ||||||||||||||
loaded in the archive, and returns them if any.""" | ||||||||||||||
ardumontUnsubmitted Done Inline Actions
ardumont: | ||||||||||||||
# Compute the ExtIDs of all the new packages, grouped by extid type | ||||||||||||||
new_extids: Dict[str, List[bytes]] = {} | ||||||||||||||
for p_info in packages_info: | ||||||||||||||
res = p_info.extid() | ||||||||||||||
if res is not None: | ||||||||||||||
(extid_type, extid_extid) = res | ||||||||||||||
new_extids.setdefault(extid_type, []).append(extid_extid) | ||||||||||||||
# For each extid type, call extid_get_from_extid() with all the extids of | ||||||||||||||
# that type, and store them in the '(type, extid) -> target' map. | ||||||||||||||
known_extids: Dict[PartialExtID, List[CoreSWHID]] = {} | ||||||||||||||
for (extid_type, extids) in new_extids.items(): | ||||||||||||||
for extid in self.storage.extid_get_from_extid(extid_type, extids): | ||||||||||||||
if extid is not None: | ||||||||||||||
key = (extid.extid_type, extid.extid) | ||||||||||||||
known_extids.setdefault(key, []).append(extid.target) | ||||||||||||||
return known_extids | ||||||||||||||
def resolve_revision_from_extids(
    self,
    known_extids: Dict[PartialExtID, List[CoreSWHID]],
    p_info: TPackageInfo,
    revision_whitelist: Set[Sha1Git],
) -> Optional[Sha1Git]:
    """Resolve the revision from known ExtIDs and a package info object.

    If the artifact has already been downloaded, this will return the
    existing revision targeting that uncompressed artifact directory.
    Otherwise, this returns None.

    Args:
        known_extids: Dict built from a list of ExtID, with the target as value
        p_info: Package information
        revision_whitelist: Any ExtID with target not in this set is filtered out

    Returns:
        None or revision identifier

    """
    new_extid = p_info.extid()
    if new_extid is None:
        return None

    for extid_target in known_extids.get(new_extid, []):
        if extid_target.object_id not in revision_whitelist:
            # There is a known ExtID for this package, but its target is not
            # in the snapshot.
            # This can happen for three reasons:
            #
            # 1. a loader crashed after writing the ExtID, but before writing
            #    the snapshot
            # 2. some other loader loaded the same artifact, but produced
            #    a different revision, causing an additional ExtID object
            #    to be written. We will probably find this loader's ExtID
            #    in a future iteration of this loop.
            #    Note that for now, this is impossible, as each loader has a
            #    completely different extid_type, but this is an implementation
            #    detail of each loader.
            # 3. we took a snapshot, then the package disappeared,
            #    then we took another snapshot, and the package reappeared
            #
            # In case of 1, we must actually load the package now,
            # so let's do it.
            # TODO: detect when we are in case 3 using revision_missing instead
            # of the snapshot.
            continue

        if extid_target.object_type != ObjectType.REVISION:
            # We only support revisions for now.
            # Note that this case should never be reached unless there is a
            # collision between a revision hash and some non-revision object's
            # hash, but better safe than sorry.
            logger.warning(
                "%s is in the revision whitelist, but is not a revision.",
                # Log the offending hash: object_type is an enum member, not
                # a hash, so it cannot be rendered by hash_to_hex().
                hash_to_hex(extid_target.object_id),
            )
            continue

        return extid_target.object_id

    return None
def download_package( | def download_package( | |||||||||||||
self, p_info: TPackageInfo, tmpdir: str | self, p_info: TPackageInfo, tmpdir: str | |||||||||||||
) -> List[Tuple[str, Mapping]]: | ) -> List[Tuple[str, Mapping]]: | |||||||||||||
"""Download artifacts for a specific package. All downloads happen in | """Download artifacts for a specific package. All downloads happen in | |||||||||||||
in the tmpdir folder. | in the tmpdir folder. | |||||||||||||
Default implementation expects the artifacts package info to be | Default implementation expects the artifacts package info to be | |||||||||||||
about one artifact per package. | about one artifact per package. | |||||||||||||
▲ Show 20 Lines • Show All 138 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | |||||||||||||
] | ] | |||||||||||||
) | ) | |||||||||||||
)[0] | )[0] | |||||||||||||
except Exception as e: | except Exception as e: | |||||||||||||
logger.exception("Failed to initialize origin_visit for %s", self.url) | logger.exception("Failed to initialize origin_visit for %s", self.url) | |||||||||||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | |||||||||||||
return {"status": "failed"} | return {"status": "failed"} | |||||||||||||
# Get the previous snapshot for this origin. It is then used to see which | ||||||||||||||
# of the package's versions are already loaded in the archive. | ||||||||||||||
try: | try: | |||||||||||||
last_snapshot = self.last_snapshot() | last_snapshot = self.last_snapshot() | |||||||||||||
logger.debug("last snapshot: %s", last_snapshot) | logger.debug("last snapshot: %s", last_snapshot) | |||||||||||||
known_artifacts = self.known_artifacts(last_snapshot) | known_artifacts = self.known_artifacts(last_snapshot) | |||||||||||||
logger.debug("known artifacts: %s", known_artifacts) | logger.debug("known artifacts: %s", known_artifacts) | |||||||||||||
except Exception as e: | except Exception as e: | |||||||||||||
logger.exception("Failed to get previous state for %s", self.url) | logger.exception("Failed to get previous state for %s", self.url) | |||||||||||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | |||||||||||||
return self.finalize_visit( | return self.finalize_visit( | |||||||||||||
snapshot=snapshot, | snapshot=snapshot, | |||||||||||||
visit=visit, | visit=visit, | |||||||||||||
failed_branches=failed_branches, | failed_branches=failed_branches, | |||||||||||||
status_visit="failed", | status_visit="failed", | |||||||||||||
status_load="failed", | status_load="failed", | |||||||||||||
) | ) | |||||||||||||
load_exceptions: List[Exception] = [] | load_exceptions: List[Exception] = [] | |||||||||||||
# Get the list of all version names | ||||||||||||||
try: | try: | |||||||||||||
versions = self.get_versions() | versions = self.get_versions() | |||||||||||||
except NotFound: | except NotFound: | |||||||||||||
return self.finalize_visit( | return self.finalize_visit( | |||||||||||||
snapshot=snapshot, | snapshot=snapshot, | |||||||||||||
visit=visit, | visit=visit, | |||||||||||||
failed_branches=failed_branches, | failed_branches=failed_branches, | |||||||||||||
status_visit="not_found", | status_visit="not_found", | |||||||||||||
status_load="failed", | status_load="failed", | |||||||||||||
) | ) | |||||||||||||
except Exception: | except Exception: | |||||||||||||
return self.finalize_visit( | return self.finalize_visit( | |||||||||||||
snapshot=snapshot, | snapshot=snapshot, | |||||||||||||
visit=visit, | visit=visit, | |||||||||||||
failed_branches=failed_branches, | failed_branches=failed_branches, | |||||||||||||
status_visit="failed", | status_visit="failed", | |||||||||||||
status_load="failed", | status_load="failed", | |||||||||||||
) | ) | |||||||||||||
packages_info = [ | # Get the metadata of each version's package | |||||||||||||
packages_info: List[Tuple[str, str, TPackageInfo]] = [ | ||||||||||||||
(version, branch_name, p_info) | (version, branch_name, p_info) | |||||||||||||
for version in versions | for version in versions | |||||||||||||
for (branch_name, p_info) in self.get_package_info(version) | for (branch_name, p_info) in self.get_package_info(version) | |||||||||||||
] | ] | |||||||||||||
# Compute the ExtID of each of these packages | ||||||||||||||
known_extids = self._get_known_extids( | ||||||||||||||
[p_info for (_, _, p_info) in packages_info] | ||||||||||||||
) | ||||||||||||||
if last_snapshot is None: | ||||||||||||||
last_snapshot_targets: Set[Sha1Git] = set() | ||||||||||||||
else: | ||||||||||||||
last_snapshot_targets = { | ||||||||||||||
branch.target for branch in last_snapshot.branches.values() | ||||||||||||||
} | ||||||||||||||
tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = { | tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = { | |||||||||||||
version: [] for version in versions | version: [] for version in versions | |||||||||||||
} | } | |||||||||||||
for (version, branch_name, p_info) in packages_info: | for (version, branch_name, p_info) in packages_info: | |||||||||||||
logger.debug("package_info: %s", p_info) | logger.debug("package_info: %s", p_info) | |||||||||||||
revision_id = self.resolve_revision_from_artifacts(known_artifacts, p_info) | ||||||||||||||
# Check if the package was already loaded, using its ExtID | ||||||||||||||
revision_id = self.resolve_revision_from_extids( | ||||||||||||||
known_extids, p_info, last_snapshot_targets | ||||||||||||||
) | ||||||||||||||
if revision_id is None: | ||||||||||||||
# No existing revision found from an acceptable ExtID, | ||||||||||||||
# search in the artifact data instead. | ||||||||||||||
# TODO: remove this after we finished migrating to ExtIDs. | ||||||||||||||
revision_id = self.resolve_revision_from_artifacts( | ||||||||||||||
known_artifacts, p_info | ||||||||||||||
) | ||||||||||||||
if revision_id is None: | if revision_id is None: | |||||||||||||
# No matching revision found in the last snapshot, load it. | ||||||||||||||
try: | try: | |||||||||||||
res = self._load_revision(p_info, origin) | res = self._load_revision(p_info, origin) | |||||||||||||
if res: | if res: | |||||||||||||
(revision_id, directory_id) = res | (revision_id, directory_id) = res | |||||||||||||
assert revision_id | assert revision_id | |||||||||||||
assert directory_id | assert directory_id | |||||||||||||
self._load_extrinsic_directory_metadata( | self._load_extrinsic_directory_metadata( | |||||||||||||
p_info, revision_id, directory_id | p_info, revision_id, directory_id | |||||||||||||
▲ Show 20 Lines • Show All 384 Lines • Show Last 20 Lines |