Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/loader.py
Show First 20 Lines • Show All 154 Lines • ▼ Show 20 Lines | def __init__(self, storage: StorageInterface, url: str, **kwargs: Any): | ||||
configuration is missing (cf. fn:`check` method). | configuration is missing (cf. fn:`check` method). | ||||
Args: | Args: | ||||
storage: Storage instance | storage: Storage instance | ||||
url: Origin url to load data from | url: Origin url to load data from | ||||
""" | """ | ||||
super().__init__(storage=storage, origin_url=url, **kwargs) | super().__init__(storage=storage, origin_url=url, **kwargs) | ||||
self.status_load = "" | |||||
self.status_visit = "" | |||||
def load_status(self) -> Dict[str, str]: | |||||
"""Detailed loading status.""" | |||||
return { | |||||
"status": self.status_load, | |||||
} | |||||
def visit_status(self) -> str: | |||||
"""Detailed visit status.""" | |||||
return self.status_visit | |||||
def get_versions(self) -> Sequence[str]: | def get_versions(self) -> Sequence[str]: | ||||
"""Return the list of all published package versions. | """Return the list of all published package versions. | ||||
Raises: | Raises: | ||||
class:`swh.loader.exception.NotFound` error when failing to read the | class:`swh.loader.exception.NotFound` error when failing to read the | ||||
published package versions. | published package versions. | ||||
▲ Show 20 Lines • Show All 272 Lines • ▼ Show 20 Lines | class PackageLoader(BaseLoader, Generic[TPackageInfo]): | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
"""Finalize the visit: | """Finalize the visit: | ||||
- flush eventual unflushed data to storage | - flush eventual unflushed data to storage | ||||
- update origin visit's status | - update origin visit's status | ||||
- return the task's status | - return the task's status | ||||
""" | """ | ||||
self.status_load = status_load | |||||
self.status_visit = status_visit | |||||
self.storage.flush() | self.storage.flush() | ||||
snapshot_id: Optional[bytes] = None | snapshot_id: Optional[bytes] = None | ||||
if snapshot and snapshot.id: # to prevent the snapshot.id to b"" | if snapshot and snapshot.id: # to prevent the snapshot.id to b"" | ||||
snapshot_id = snapshot.id | snapshot_id = snapshot.id | ||||
assert visit.visit | assert visit.visit | ||||
visit_status = OriginVisitStatus( | visit_status = OriginVisitStatus( | ||||
origin=self.origin.url, | origin=self.origin.url, | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | ||||
8. Generate and load the snapshot for the visit | 8. Generate and load the snapshot for the visit | ||||
Using the revisions/releases collected at step 7., and the branch | Using the revisions/releases collected at step 7., and the branch | ||||
information from step 2., generate a snapshot and load it into the | information from step 2., generate a snapshot and load it into the | ||||
Software Heritage archive | Software Heritage archive | ||||
""" | """ | ||||
status_load = "uneventful" # either: eventful, uneventful, failed | self.status_load = "uneventful" # either: eventful, uneventful, failed | ||||
status_visit = "full" # see swh.model.model.OriginVisitStatus | self.status_visit = "full" # see swh.model.model.OriginVisitStatus | ||||
snapshot = None | snapshot = None | ||||
failed_branches: List[str] = [] | failed_branches: List[str] = [] | ||||
# Prepare origin and origin_visit | # Prepare origin and origin_visit | ||||
origin = Origin(url=self.origin.url) | origin = Origin(url=self.origin.url) | ||||
try: | try: | ||||
self.storage.origin_add([origin]) | self.storage.origin_add([origin]) | ||||
visit = list( | visit = list( | ||||
self.storage.origin_visit_add( | self.storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=self.origin.url, | origin=self.origin.url, | ||||
date=self.visit_date, | date=self.visit_date, | ||||
type=self.visit_type, | type=self.visit_type, | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
)[0] | )[0] | ||||
except Exception as e: | except Exception as e: | ||||
logger.exception( | logger.exception( | ||||
"Failed to initialize origin_visit for %s", self.origin.url | "Failed to initialize origin_visit for %s", self.origin.url | ||||
) | ) | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
self.status_load = self.status_visit = "failed" | |||||
return {"status": "failed"} | return {"status": "failed"} | ||||
# Get the previous snapshot for this origin. It is then used to see which | # Get the previous snapshot for this origin. It is then used to see which | ||||
# of the package's versions are already loaded in the archive. | # of the package's versions are already loaded in the archive. | ||||
try: | try: | ||||
last_snapshot = self.last_snapshot() | last_snapshot = self.last_snapshot() | ||||
logger.debug("last snapshot: %s", last_snapshot) | logger.debug("last snapshot: %s", last_snapshot) | ||||
except Exception as e: | except Exception as e: | ||||
▲ Show 20 Lines • Show All 90 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | ||||
if res: | if res: | ||||
(release_id, directory_id) = res | (release_id, directory_id) = res | ||||
assert release_id | assert release_id | ||||
assert directory_id | assert directory_id | ||||
self._load_extrinsic_directory_metadata( | self._load_extrinsic_directory_metadata( | ||||
p_info, release_id, directory_id | p_info, release_id, directory_id | ||||
) | ) | ||||
self.storage.flush() | self.storage.flush() | ||||
status_load = "eventful" | self.status_load = "eventful" | ||||
except Exception as e: | except Exception as e: | ||||
self.storage.clear_buffers() | self.storage.clear_buffers() | ||||
load_exceptions.append(e) | load_exceptions.append(e) | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
error = f"Failed to load branch {branch_name} for {self.origin.url}" | error = f"Failed to load branch {branch_name} for {self.origin.url}" | ||||
logger.exception(error) | logger.exception(error) | ||||
failed_branches.append(branch_name) | failed_branches.append(branch_name) | ||||
errors.append(f"{error}: {e}") | errors.append(f"{error}: {e}") | ||||
Show All 38 Lines | def load(self) -> Dict: | ||||
extid=extid, | extid=extid, | ||||
target=release_swhid, | target=release_swhid, | ||||
) | ) | ||||
) | ) | ||||
tmp_releases[p_info.version].append((branch_name, release_id)) | tmp_releases[p_info.version].append((branch_name, release_id)) | ||||
if load_exceptions: | if load_exceptions: | ||||
status_visit = "partial" | self.status_visit = "partial" | ||||
if not tmp_releases: | if not tmp_releases: | ||||
# We could not load any releases; fail completely | # We could not load any releases; fail completely | ||||
logger.error("Failed to load any release for %s", self.origin.url) | logger.error("Failed to load any release for %s", self.origin.url) | ||||
return self.finalize_visit( | return self.finalize_visit( | ||||
snapshot=snapshot, | snapshot=snapshot, | ||||
visit=visit, | visit=visit, | ||||
failed_branches=failed_branches, | failed_branches=failed_branches, | ||||
Show All 14 Lines | def load(self) -> Dict: | ||||
default_version, tmp_releases, extra_branches | default_version, tmp_releases, extra_branches | ||||
) | ) | ||||
self.storage.flush() | self.storage.flush() | ||||
except Exception as e: | except Exception as e: | ||||
error = f"Failed to build snapshot for origin {self.origin.url}" | error = f"Failed to build snapshot for origin {self.origin.url}" | ||||
logger.exception(error) | logger.exception(error) | ||||
errors.append(f"{error}: {e}") | errors.append(f"{error}: {e}") | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
status_visit = "failed" | self.status_visit = "failed" | ||||
status_load = "failed" | self.status_load = "failed" | ||||
if snapshot: | if snapshot: | ||||
try: | try: | ||||
metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id) | metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id) | ||||
self.load_metadata_objects(metadata_objects) | self.load_metadata_objects(metadata_objects) | ||||
except Exception as e: | except Exception as e: | ||||
error = ( | error = ( | ||||
f"Failed to load extrinsic snapshot metadata for {self.origin.url}" | f"Failed to load extrinsic snapshot metadata for {self.origin.url}" | ||||
) | ) | ||||
logger.exception(error) | logger.exception(error) | ||||
errors.append(f"{error}: {e}") | errors.append(f"{error}: {e}") | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
status_visit = "partial" | self.status_visit = "partial" | ||||
status_load = "failed" | self.status_load = "failed" | ||||
try: | try: | ||||
metadata_objects = self.build_extrinsic_origin_metadata() | metadata_objects = self.build_extrinsic_origin_metadata() | ||||
self.load_metadata_objects(metadata_objects) | self.load_metadata_objects(metadata_objects) | ||||
except Exception as e: | except Exception as e: | ||||
error = f"Failed to load extrinsic origin metadata for {self.origin.url}" | error = f"Failed to load extrinsic origin metadata for {self.origin.url}" | ||||
logger.exception(error) | logger.exception(error) | ||||
errors.append(f"{error}: {e}") | errors.append(f"{error}: {e}") | ||||
sentry_sdk.capture_exception(e) | sentry_sdk.capture_exception(e) | ||||
status_visit = "partial" | self.status_visit = "partial" | ||||
status_load = "failed" | self.status_load = "failed" | ||||
if status_load != "failed": | if self.status_load != "failed": | ||||
self._load_extids(new_extids) | self._load_extids(new_extids) | ||||
return self.finalize_visit( | return self.finalize_visit( | ||||
snapshot=snapshot, | snapshot=snapshot, | ||||
visit=visit, | visit=visit, | ||||
failed_branches=failed_branches, | failed_branches=failed_branches, | ||||
status_visit=status_visit, | status_visit=self.status_visit, | ||||
status_load=status_load, | status_load=self.status_load, | ||||
errors=errors, | errors=errors, | ||||
) | ) | ||||
def _load_directory( | def _load_directory( | ||||
self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str | self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str | ||||
) -> Tuple[str, from_disk.Directory]: | ) -> Tuple[str, from_disk.Directory]: | ||||
uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | ||||
logger.debug("uncompressed_path: %s", uncompressed_path) | logger.debug("uncompressed_path: %s", uncompressed_path) | ||||
▲ Show 20 Lines • Show All 300 Lines • Show Last 20 Lines |