diff --git a/docs/README.rst b/docs/README.rst
index 4e27671..1d0fb58 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -1,30 +1,33 @@
 Software Heritage - Loader foundations
 ======================================

 The Software Heritage Loader Core provides low-level loading utilities and
 helpers used by :term:`loaders <loader>`.

 The main entry points are classes:

-- :class:`swh.loader.core.loader.BaseLoader` for loaders (e.g. svn)
-- :class:`swh.loader.core.loader.DVCSLoader` for DVCS loaders (e.g. hg, git, ...)
+- :class:`swh.loader.core.loader.BaseLoader` for VCS loaders (e.g. svn)
+- :class:`swh.loader.core.loader.DVCSLoader` for DVCS loaders (e.g. git, ...)
+- :class:`swh.loader.core.loader.ContentLoader` for the Content loader
+- :class:`swh.loader.core.loader.DirectoryLoader` for the Directory loader
 - :class:`swh.loader.package.loader.PackageLoader` for Package loaders (e.g. PyPI, Npm, ...)
+- ...

 Package loaders
 ---------------

 This package also implements many package loaders directly, out of
 convenience, as they usually are quite similar and each fits in a single file.

 They all roughly follow these steps, explained in the
 :py:meth:`swh.loader.package.loader.PackageLoader.load` documentation.
 See the :ref:`package-loader-tutorial` for details.

 VCS loaders
 -----------

 Unlike package loaders, VCS loaders remain in separate packages,
 as they often need more advanced conversions and very VCS-specific operations.

 This usually involves getting the branches of a repository and recursively
 loading revisions in the history (and directory trees in these revisions),
 until a known revision is found.
diff --git a/setup.py b/setup.py
index 0c4f97c..93e7f45 100755
--- a/setup.py
+++ b/setup.py
@@ -1,88 +1,90 @@
 #!/usr/bin/env python3
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from io import open
 from os import path

 from setuptools import find_packages, setup

 here = path.abspath(path.dirname(__file__))

 # Get the long description from the README file
 with open(path.join(here, "README.rst"), encoding="utf-8") as f:
     long_description = f.read()


 def parse_requirements(name=None):
     if name:
         reqf = "requirements-%s.txt" % name
     else:
         reqf = "requirements.txt"

     requirements = []
     if not path.exists(reqf):
         return requirements

     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith("#"):
                 continue
             requirements.append(line)
     return requirements


 setup(
     name="swh.loader.core",
     description="Software Heritage Base Loader",
     long_description=long_description,
     long_description_content_type="text/x-rst",
     python_requires=">=3.7",
     author="Software Heritage developers",
     author_email="swh-devel@inria.fr",
     url="https://forge.softwareheritage.org/diffusion/DLDBASE",
     packages=find_packages(),  # package's modules
     scripts=[],  # scripts to package
     install_requires=parse_requirements() + parse_requirements("swh"),
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
     extras_require={"testing": parse_requirements("test")},
     include_package_data=True,
     entry_points="""
         [swh.cli.subcommands]
         loader=swh.loader.cli
         [swh.workers]
+        loader.content=swh.loader.core:register_content
+        loader.directory=swh.loader.core:register_directory
         loader.arch=swh.loader.package.arch:register
         loader.archive=swh.loader.package.archive:register
         loader.aur=swh.loader.package.aur:register
         loader.cpan=swh.loader.package.cpan:register
         loader.cran=swh.loader.package.cran:register
         loader.crates=swh.loader.package.crates:register
         loader.debian=swh.loader.package.debian:register
         loader.deposit=swh.loader.package.deposit:register
         loader.golang=swh.loader.package.golang:register
         loader.nixguix=swh.loader.package.nixguix:register
         loader.npm=swh.loader.package.npm:register
         loader.opam=swh.loader.package.opam:register
         loader.pubdev=swh.loader.package.pubdev:register
         loader.puppet=swh.loader.package.puppet:register
         loader.pypi=swh.loader.package.pypi:register
         loader.maven=swh.loader.package.maven:register
     """,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 5 - Production/Stable",
     ],
     project_urls={
         "Bug Reports": "https://forge.softwareheritage.org/maniphest",
         "Funding": "https://www.softwareheritage.org/donate",
         "Source": "https://forge.softwareheritage.org/source/swh-loader-core",
         "Documentation": "https://docs.softwareheritage.org/devel/swh-loader-core/",
     },
 )
diff --git a/swh/loader/core/__init__.py b/swh/loader/core/__init__.py
index e69de29..6b30de1 100644
--- a/swh/loader/core/__init__.py
+++ b/swh/loader/core/__init__.py
@@ -0,0 +1,27 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from typing import Any, Mapping
+
+
+def register_content() -> Mapping[str, Any]:
+    """Register the current worker module's definition"""
+    from swh.loader.core.loader import ContentLoader
+
+    return {
+        "task_modules": [f"{__name__}.tasks"],
+        "loader": ContentLoader,
+    }
+
+
+def register_directory() -> Mapping[str, Any]:
+    """Register the current worker module's definition"""
+    from swh.loader.core.loader import DirectoryLoader
+
+    return {
+        "task_modules": [f"{__name__}.tasks"],
+        "loader": DirectoryLoader,
+    }
diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
index 0b3f296..90716c5 100644
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -1,882 +1,883 @@
 # Copyright (C) 2015-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import hashlib
 import logging
 import os
 import tempfile
 import time
 from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union
 from urllib.parse import urlparse

 from requests.exceptions import HTTPError
 import sentry_sdk

 from swh.core.config import load_from_envvar
 from swh.core.statsd import Statsd
 from swh.core.tarball import uncompress
 from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister
 from swh.loader.exception import NotFound
 from swh.loader.package.utils import download
 from swh.model import from_disk
 from swh.model.model import (
     BaseContent,
     Content,
     Directory,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     RawExtrinsicMetadata,
     Release,
     Revision,
     Sha1Git,
     SkippedContent,
     Snapshot,
     SnapshotBranch,
     TargetType,
 )
 from swh.storage import get_storage
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.storage.interface import StorageInterface
 from swh.storage.utils import now

 DEFAULT_CONFIG: Dict[str, Any] = {
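     # Default cap on a single content's size: contents larger than 100 MiB
     # end up skipped rather than ingested in full (see the max_content_size
     # plumbing in BaseLoader and DirectoryLoader.fetch_data below)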
"max_content_size": 100 * 1024 * 1024, } SENTRY_ORIGIN_URL_TAG_NAME = "swh.loader.origin_url" SENTRY_VISIT_TYPE_TAG_NAME = "swh.loader.visit_type" class BaseLoader: """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g PyPI, Npm, CRAN, ...) A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... package artifacts), ingests the contents/directories/revisions/releases/snapshot read from those artifacts and send them to the archive through the storage backend. The main entry point for the loader is the :func:`load` function. 2 static methods (:func:`from_config`, :func:`from_configfile`) centralizes and eases the loader instantiation from either configuration dict or configuration file. Some class examples: - :class:`SvnLoader` - :class:`GitLoader` - :class:`PyPILoader` - :class:`NpmLoader` Args: lister_name: Name of the lister which triggered this load. If provided, the loader will try to use the forge's API to retrieve extrinsic metadata lister_instance_name: Name of the lister instance which triggered this load. Must be None iff lister_name is, but it may be the empty string for listers with a single instance. """ visit_type: str origin: Origin loaded_snapshot_id: Optional[Sha1Git] parent_origins: Optional[List[Origin]] """If the given origin is a "forge fork" (ie. created with the "Fork" button of GitHub-like forges), :meth:`build_extrinsic_origin_metadata` sets this to a list of origins it was forked from; closest parent first.""" def __init__( self, storage: StorageInterface, origin_url: str, logging_class: Optional[str] = None, save_data_path: Optional[str] = None, max_content_size: Optional[int] = None, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, metadata_fetcher_credentials: CredentialsType = None, ): if lister_name == "": raise ValueError("lister_name must not be the empty string") if lister_name is None and lister_instance_name is not None: raise ValueError( f"lister_name is None but lister_instance_name is {lister_instance_name!r}" ) if lister_name is not None and lister_instance_name is None: raise ValueError( f"lister_instance_name is None but lister_name is {lister_name!r}" ) self.storage = storage self.origin = Origin(url=origin_url) self.max_content_size = int(max_content_size) if max_content_size else None self.lister_name = lister_name self.lister_instance_name = lister_instance_name self.metadata_fetcher_credentials = metadata_fetcher_credentials or {} if logging_class is None: logging_class = "%s.%s" % ( self.__class__.__module__, self.__class__.__name__, ) self.log = logging.getLogger(logging_class) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) sentry_sdk.set_tag(SENTRY_ORIGIN_URL_TAG_NAME, self.origin.url) sentry_sdk.set_tag(SENTRY_VISIT_TYPE_TAG_NAME, self.visit_type) # possibly overridden in self.prepare method self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) self.loaded_snapshot_id = None if save_data_path: path = save_data_path os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) self.save_data_path = save_data_path self.parent_origins = None self.statsd = Statsd( namespace="swh_loader", constant_tags={"visit_type": self.visit_type} ) @classmethod def from_config(cls, storage: Dict[str, Any], **config: Any): """Instantiate a loader from a configuration dict. This is basically a backwards-compatibility shim for the CLI. 

         Args:
             storage: instantiation config for the storage
             config: the configuration dict for the loader, with the following keys:
                 - credentials (optional): credentials list for the scheduler
                 - any other kwargs passed to the loader.

         Returns:
             the instantiated loader
         """
         # Drop the legacy config keys which aren't used for this generation of loader.
         for legacy_key in ("storage", "celery"):
             config.pop(legacy_key, None)

         # Instantiate the storage
         storage_instance = get_storage(**storage)
         return cls(storage=storage_instance, **config)

     @classmethod
     def from_configfile(cls, **kwargs: Any):
         """Instantiate a loader from the configuration loaded from the
         SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments
         if their value is not None.

         Args:
             kwargs: kwargs passed to the loader instantiation
         """
         config = dict(load_from_envvar(DEFAULT_CONFIG))
         config.update({k: v for k, v in kwargs.items() if v is not None})
         return cls.from_config(**config)

     def save_data(self) -> None:
         """Save the data associated to the current load"""
         raise NotImplementedError

     def get_save_data_path(self) -> str:
         """The path to which we archive the loader's raw data"""
         if not hasattr(self, "__save_data_path"):
             year = str(self.visit_date.year)

             assert self.origin
             url = self.origin.url.encode("utf-8")
             origin_url_hash = hashlib.sha1(url).hexdigest()

             path = "%s/sha1:%s/%s/%s" % (
                 self.save_data_path,
                 origin_url_hash[0:2],
                 origin_url_hash,
                 year,
             )

             os.makedirs(path, exist_ok=True)
             self.__save_data_path = path

         return self.__save_data_path

     def flush(self) -> Dict[str, int]:
         """Flush any potential buffered data not sent to swh-storage.

         Returns the same value as
         :meth:`swh.storage.interface.StorageInterface.flush`.
         """
         return self.storage.flush()

     def cleanup(self) -> None:
         """Last step executed by the loader."""
         raise NotImplementedError

     def _store_origin_visit(self) -> None:
         """Store origin and visit references. Sets the self.visit references."""
         assert self.origin
         self.storage.origin_add([self.origin])

         assert isinstance(self.visit_type, str)
         self.visit = list(
             self.storage.origin_visit_add(
                 [
                     OriginVisit(
                         origin=self.origin.url,
                         date=self.visit_date,
                         type=self.visit_type,
                     )
                 ]
             )
         )[0]

     def prepare(self) -> None:
         """Second step executed by the loader, to prepare the state it needs.

         Raises:
             NotFound: if the origin to ingest is not found
         """
         raise NotImplementedError

     def get_origin(self) -> Origin:
         """Get the origin that is currently being loaded.
         self.origin should be set in the :meth:`prepare` method.

         Returns:
             dict: an origin ready to be sent to storage by
             :func:`origin_add`.
         """
         assert self.origin
         return self.origin

     def fetch_data(self) -> bool:
         """Fetch the data from the source the loader is currently loading
         (e.g. a git/hg/svn/... repository).

         Returns:
             a value that is interpreted as a boolean. If True, fetch_data
             needs to be called again to complete loading.
         """
         raise NotImplementedError

     def process_data(self) -> bool:
         """Run any additional processing between fetching and storing the data

         Returns:
             a value that is interpreted as a boolean. If True, fetch_data
             needs to be called again to complete loading.
             Ignored if ``fetch_data`` already returned :const:`False`.
         """
         return True

     def store_data(self) -> None:
         """Store fetched data in the database.

         Should call the :func:`maybe_load_xyz` methods, which handle the
         bundles sent to storage, rather than sending directly.
         """
         raise NotImplementedError

     def load_status(self) -> Dict[str, str]:
         """Detailed loading status.

         Defaults to logging an eventful load.

         Returns:
             a dictionary that is eventually passed back as the task's result
             to the scheduler, allowing tuning of the task recurrence
             mechanism.
         """
         return {
             "status": "eventful",
         }

     def post_load(self, success: bool = True) -> None:
         """Allow the loader to perform additional actions once the loading
         is done, depending on its status. The ``success`` flag indicates
         whether the loading succeeded.

         Defaults to doing nothing.

         It is up to the implementer of this method to make sure it does not
         break.

         Args:
             success (bool): the success status of the loading
         """
         pass

     def visit_status(self) -> str:
         """Detailed visit status.

         Defaults to logging a full visit.
         """
         return "full"

     def pre_cleanup(self) -> None:
         """As a first step, will try and check for dangling data to clean up.
         This should do its best to avoid raising issues.
         """
         pass

     def load(self) -> Dict[str, str]:
         r"""Loading logic for the loader to follow:

         - Store the actual ``origin_visit`` to storage
         - Call :meth:`prepare` to prepare any eventual state
         - Call :meth:`get_origin` to get the origin we work with and store

         - while True:

           - Call :meth:`fetch_data` to fetch the data to store
           - Call :meth:`process_data` to optionally run processing between
             :meth:`fetch_data` and :meth:`store_data`
           - Call :meth:`store_data` to store the data

         - Call :meth:`cleanup` to clean up any eventual state put in place
           in the :meth:`prepare` method.

         """
         try:
             with self.statsd_timed("pre_cleanup"):
                 self.pre_cleanup()
         except Exception:
             msg = "Cleaning up dangling data failed! Continue loading."
             self.log.warning(msg)
             sentry_sdk.capture_exception()

         self._store_origin_visit()

         assert (
             self.visit.visit
         ), "The method `_store_origin_visit` should set the visit (OriginVisit)"
         self.log.info(
             "Load origin '%s' with type '%s'", self.origin.url, self.visit.type
         )

         try:
             with self.statsd_timed("build_extrinsic_origin_metadata"):
                 metadata = self.build_extrinsic_origin_metadata()
             self.load_metadata_objects(metadata)
         except Exception as e:
             sentry_sdk.capture_exception(e)
             # Do not fail the whole task if this is the only failure
             self.log.exception(
                 "Failure while loading extrinsic origin metadata.",
                 extra={
                     "swh_task_args": [],
                     "swh_task_kwargs": {
                         "origin": self.origin.url,
                         "lister_name": self.lister_name,
                         "lister_instance_name": self.lister_instance_name,
                     },
                 },
             )

         total_time_fetch_data = 0.0
         total_time_process_data = 0.0
         total_time_store_data = 0.0

         # Not a success yet; only set to True once the load actually succeeds
         status = "failed"
         success = False

         try:
             with self.statsd_timed("prepare"):
                 self.prepare()

             while True:
                 t1 = time.monotonic()
                 more_data_to_fetch = self.fetch_data()
                 t2 = time.monotonic()
                 total_time_fetch_data += t2 - t1

                 more_data_to_fetch = self.process_data() and more_data_to_fetch
                 t3 = time.monotonic()
                 total_time_process_data += t3 - t2

                 self.store_data()
                 t4 = time.monotonic()
                 total_time_store_data += t4 - t3
                 if not more_data_to_fetch:
                     break

             self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0)
             self.statsd_timing("process_data", total_time_process_data * 1000.0)
             self.statsd_timing("store_data", total_time_store_data * 1000.0)

             status = self.visit_status()
             visit_status = OriginVisitStatus(
                 origin=self.origin.url,
                 visit=self.visit.visit,
                 type=self.visit_type,
                 date=now(),
                 status=status,
                 snapshot=self.loaded_snapshot_id,
             )
             self.storage.origin_visit_status_add([visit_status])
             success = True
             with self.statsd_timed(
                 "post_load", tags={"success": success, "status": status}
             ):
                 self.post_load()
         except BaseException as e:
             success = False
             if isinstance(e, NotFound):
                 status = "not_found"
                 task_status = "uneventful"
             else:
                 status = "partial" if self.loaded_snapshot_id else "failed"
                 task_status = "failed"

             self.log.exception(
                 "Loading failure, updating to `%s` status",
                 status,
                 extra={
                     "swh_task_args": [],
                     "swh_task_kwargs": {
                         "origin": self.origin.url,
                         "lister_name": self.lister_name,
                         "lister_instance_name": self.lister_instance_name,
                     },
                 },
             )
             if not isinstance(e, (SystemExit, KeyboardInterrupt)):
                 sentry_sdk.capture_exception()
             visit_status = OriginVisitStatus(
                 origin=self.origin.url,
                 visit=self.visit.visit,
                 type=self.visit_type,
                 date=now(),
                 status=status,
                 snapshot=self.loaded_snapshot_id,
             )
             self.storage.origin_visit_status_add([visit_status])
             with self.statsd_timed(
                 "post_load", tags={"success": success, "status": status}
             ):
                 self.post_load(success=success)
             if not isinstance(e, Exception):
                 # e derives from BaseException but not Exception; this is most likely
                 # SystemExit or KeyboardInterrupt, so we should re-raise it.
                 raise
             return {"status": task_status}
         finally:
             with self.statsd_timed(
                 "flush", tags={"success": success, "status": status}
             ):
                 self.flush()
             with self.statsd_timed(
                 "cleanup", tags={"success": success, "status": status}
             ):
                 self.cleanup()

         return self.load_status()

     def load_metadata_objects(
         self, metadata_objects: List[RawExtrinsicMetadata]
     ) -> None:
         if not metadata_objects:
             return

         authorities = {mo.authority for mo in metadata_objects}
         self.storage.metadata_authority_add(list(authorities))

         fetchers = {mo.fetcher for mo in metadata_objects}
         self.storage.metadata_fetcher_add(list(fetchers))

         self.storage.raw_extrinsic_metadata_add(metadata_objects)

     def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]:
         """Builds a list of full RawExtrinsicMetadata objects, using
         metadata fetchers returned by :func:`get_fetchers_for_lister`."""
         if self.lister_name is None:
             self.log.debug(
                 "lister_name not provided, skipping extrinsic origin metadata"
             )
             return []

         assert (
             self.lister_instance_name is not None
         ), "lister_instance_name is None, but lister_name is not"

         metadata = []

         fetcher_classes = get_fetchers_for_lister(self.lister_name)

         self.statsd_average("metadata_fetchers", len(fetcher_classes))

         for cls in fetcher_classes:
             metadata_fetcher = cls(
                 origin=self.origin,
                 lister_name=self.lister_name,
                 lister_instance_name=self.lister_instance_name,
                 credentials=self.metadata_fetcher_credentials,
             )
             with self.statsd_timed(
                 "fetch_one_metadata", tags={"fetcher": cls.FETCHER_NAME}
             ):
                 metadata.extend(metadata_fetcher.get_origin_metadata())
             if self.parent_origins is None:
                 self.parent_origins = metadata_fetcher.get_parent_origins()
                 self.statsd_average(
                     "metadata_parent_origins",
                     len(self.parent_origins),
                     tags={"fetcher": cls.FETCHER_NAME},
                 )
         self.statsd_average("metadata_objects", len(metadata))
         return metadata

     def statsd_timed(self, name: str, tags: Dict[str, Any] = {}) -> ContextManager:
         """
         Wrapper for :meth:`swh.core.statsd.Statsd.timed`, which uses the
         standard metric name and tags for loaders.
         """
         return self.statsd.timed(
             "operation_duration_seconds", tags={"operation": name, **tags}
         )

     def statsd_timing(
         self, name: str, value: float, tags: Dict[str, Any] = {}
     ) -> None:
         """
         Wrapper for :meth:`swh.core.statsd.Statsd.timing`, which uses the
         standard metric name and tags for loaders.
""" self.statsd.timing( "operation_duration_seconds", value, tags={"operation": name, **tags} ) def statsd_average( self, name: str, value: Union[int, float], tags: Dict[str, Any] = {} ) -> None: """Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count`` (by ``1``), allowing to prometheus to compute the average ``value`` over time.""" self.statsd.increment(f"{name}_sum", value, tags=tags) self.statsd.increment(f"{name}_count", tags=tags) class DVCSLoader(BaseLoader): """This base class is a pattern for dvcs loaders (e.g. git, mercurial). Those loaders are able to load all the data in one go. For example, the loader defined in swh-loader-git :class:`BulkUpdater`. For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), inherit directly from :class:`BaseLoader`. """ def cleanup(self) -> None: """Clean up an eventual state installed for computations.""" pass def has_contents(self) -> bool: """Checks whether we need to load contents""" return True def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded""" raise NotImplementedError def has_directories(self) -> bool: """Checks whether we need to load directories""" return True def get_directories(self) -> Iterable[Directory]: """Get the directories that need to be loaded""" raise NotImplementedError def has_revisions(self) -> bool: """Checks whether we need to load revisions""" return True def get_revisions(self) -> Iterable[Revision]: """Get the revisions that need to be loaded""" raise NotImplementedError def has_releases(self) -> bool: """Checks whether we need to load releases""" return True def get_releases(self) -> Iterable[Release]: """Get the releases that need to be loaded""" raise NotImplementedError def get_snapshot(self) -> Snapshot: """Get the snapshot that needs to be loaded""" raise NotImplementedError def eventful(self) -> bool: """Whether the load was eventful""" raise NotImplementedError def store_data(self) -> None: assert self.origin if self.save_data_path: self.save_data() if self.has_contents(): for obj in self.get_contents(): if isinstance(obj, Content): self.storage.content_add([obj]) elif isinstance(obj, SkippedContent): self.storage.skipped_content_add([obj]) else: raise TypeError(f"Unexpected content type: {obj}") if self.has_directories(): for directory in self.get_directories(): self.storage.directory_add([directory]) if self.has_revisions(): for revision in self.get_revisions(): self.storage.revision_add([revision]) if self.has_releases(): for release in self.get_releases(): self.storage.release_add([release]) snapshot = self.get_snapshot() self.storage.snapshot_add([snapshot]) self.flush() self.loaded_snapshot_id = snapshot.id class NodeLoader(BaseLoader): """Common class for :class:`ContentLoader` and :class:`Directoryloader`. The "checksums" field is a dictionary of hex hashes on the object retrieved (content or directory). The multiple "fallback" urls received are mirror urls only used to fetch the object if the main origin is no longer available. Those are not stored. Ingestion is considered eventful on the first ingestion. Subsequent load of the same object should end up being an uneventful visit (matching snapshot). 
""" def __init__( self, - *args, + storage: StorageInterface, + url: str, checksums: Dict[str, str], fallback_urls: List[str] = None, **kwargs, ): - super().__init__(*args, **kwargs) + super().__init__(storage, url, **kwargs) self.snapshot: Optional[Snapshot] = None self.checksums = checksums fallback_urls_ = fallback_urls or [] self.mirror_urls: List[str] = [self.origin.url, *fallback_urls_] def prepare(self) -> None: self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url) def load_status(self) -> Dict[str, Any]: return { "status": "uneventful" if self.last_snapshot == self.snapshot else "eventful" } def cleanup(self) -> None: self.log.debug("cleanup") class ContentLoader(NodeLoader): """Basic loader for edge case content ingestion. The output snapshot is of the form: .. code:: id: branches: HEAD: target_type: content target: """ visit_type = "content" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.content: Optional[Content] = None def fetch_data(self) -> bool: """Retrieve the content file as a Content Object""" for url in self.mirror_urls: url_ = urlparse(url) self.log.debug( "prepare; origin_url=%s fallback=%s scheme=%s path=%s", self.origin.url, url, url_.scheme, url_.path, ) try: with tempfile.TemporaryDirectory() as tmpdir: file_path, _ = download(url, dest=tmpdir, hashes=self.checksums) with open(file_path, "rb") as file: self.content = Content.from_data(file.read()) except HTTPError as http_error: if http_error.response.status_code == 404: self.log.debug( "Not found '%s', continue on next mirror url if any", url ) continue else: return False # no more data to fetch # If we reach this point, we did not find any proper content, consider the # origin not found raise NotFound(f"Unknown origin {self.origin.url}.") def process_data(self) -> bool: """Build the snapshot out of the Content retrieved.""" assert self.content is not None self.snapshot = Snapshot( branches={ b"HEAD": SnapshotBranch( target=self.content.sha1_git, target_type=TargetType.CONTENT, ), } ) return False # no more data to process def store_data(self) -> None: """Store newly retrieved Content and Snapshot.""" assert self.content is not None self.storage.content_add([self.content]) assert self.snapshot is not None self.storage.snapshot_add([self.snapshot]) self.loaded_snapshot_id = self.snapshot.id def visit_status(self): return "full" if self.content and self.snapshot is not None else "partial" class DirectoryLoader(NodeLoader): """Basic loader for edge case directory ingestion (through one tarball). The output snapshot is of the form: .. code:: id: branches: HEAD: target_type: directory target: """ visit_type = "directory" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.directory: Optional[from_disk.Directory] = None self.cnts: List[Content] = None self.skipped_cnts: List[SkippedContent] = None self.dirs: List[Directory] = None def fetch_data(self) -> bool: """Fetch directory as a tarball amongst the self.mirror_urls. 

         Raises NotFound if no tarball is found

         """
         for url in self.mirror_urls:
             url_ = urlparse(url)
             self.log.debug(
                 "fetch_data; origin_url=%s fallback=%s scheme=%s path=%s",
                 self.origin.url,
                 url,
                 url_.scheme,
                 url_.path,
             )
             with tempfile.TemporaryDirectory() as tmpdir:
                 try:
                     tarball_path, extrinsic_metadata = download(
                         url,
                         tmpdir,
                         # Ensure the content received matches the checksums received
                         hashes=self.checksums,
                         extra_request_headers={"Accept-Encoding": "identity"},
                     )
                 except ValueError as e:
                     # Checksum mismatch
                     self.log.debug("Error: %s", e)
                     continue
                 except HTTPError as http_error:
                     if http_error.response.status_code == 404:
                         self.log.debug(
                             "Not found '%s', continue on next mirror url if any", url
                         )
                     # Try the next mirror url on any other HTTP error as well;
                     # falling through here would leave tarball_path unbound below
                     continue

                 directory_path = os.path.join(tmpdir, "src")
                 os.makedirs(directory_path, exist_ok=True)
                 uncompress(tarball_path, dest=directory_path)
                 self.log.debug("uncompressed path to directory: %s", directory_path)

                 self.directory = from_disk.Directory.from_disk(
                     path=directory_path.encode("utf-8"),
                     max_content_length=self.max_content_size,
                 )
                 # Compute the merkle dag from the top-level directory
                 self.cnts, self.skipped_cnts, self.dirs = from_disk.iter_directory(
                     self.directory
                 )

             if self.directory is not None:
                 return False  # no more data to fetch

         # if we reach here, we did not find any proper tarball, so consider the origin
         # not found
         raise NotFound(f"Unknown origin {self.origin.url}.")

     def process_data(self) -> bool:
         """Build the snapshot out of the Directory retrieved."""
         assert self.directory is not None
         # Build the snapshot
         self.snapshot = Snapshot(
             branches={
                 b"HEAD": SnapshotBranch(
                     target=self.directory.hash,
                     target_type=TargetType.DIRECTORY,
                 ),
             }
         )
         return False  # no more data to process

     def store_data(self) -> None:
         """Store newly retrieved Contents, Directories and Snapshot."""
         self.log.debug("Number of skipped contents: %s", len(self.skipped_cnts))
         self.storage.skipped_content_add(self.skipped_cnts)
         self.log.debug("Number of contents: %s", len(self.cnts))
         self.storage.content_add(self.cnts)

         self.log.debug("Number of directories: %s", len(self.dirs))
         self.storage.directory_add(self.dirs)

         assert self.snapshot is not None
         self.storage.snapshot_add([self.snapshot])
         self.loaded_snapshot_id = self.snapshot.id

     def visit_status(self):
         return "full" if self.directory and self.snapshot is not None else "partial"
diff --git a/swh/loader/core/tasks.py b/swh/loader/core/tasks.py
new file mode 100644
index 0000000..c221903
--- /dev/null
+++ b/swh/loader/core/tasks.py
@@ -0,0 +1,20 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from celery import shared_task
+
+from swh.loader.core.loader import ContentLoader, DirectoryLoader
+
+
+@shared_task(name=__name__ + ".LoadContent")
+def load_content(**kwargs):
+    """Load a content from its origin url (and fallback mirror urls)"""
+    return ContentLoader.from_configfile(**kwargs).load()
+
+
+@shared_task(name=__name__ + ".LoadDirectory")
+def load_directory(**kwargs):
+    """Load a directory, fetched as a tarball, from its origin url"""
+    return DirectoryLoader.from_configfile(**kwargs).load()
diff --git a/swh/loader/core/tests/test_tasks.py b/swh/loader/core/tests/test_tasks.py
new file mode 100644
index 0000000..effe27b
--- /dev/null
+++ b/swh/loader/core/tests/test_tasks.py
@@ -0,0 +1,53 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import uuid
+
+import pytest
+
+from swh.scheduler.model import ListedOrigin, Lister
+from swh.scheduler.utils import create_origin_task_dict
+
+NAMESPACE = "swh.loader.core"
+
+
+@pytest.fixture
+def nixguix_lister():
+    return Lister(name="nixguix", instance_name="example", id=uuid.uuid4())
+
+
+@pytest.mark.parametrize("loader_name", ["Content", "Directory"])
+def test_tasks_loader_for_listed_origin(
+    mocker,
+    swh_scheduler_celery_app,
+    swh_scheduler_celery_worker,
+    swh_config,
+    nixguix_lister,
+    loader_name,
+):
+    mock_load = mocker.patch(f"{NAMESPACE}.loader.{loader_name}Loader.load")
+    mock_load.return_value = {"status": "eventful"}
+
+    listed_origin = ListedOrigin(
+        lister_id=nixguix_lister.id,
+        url="https://example.org/artifact/artifact",
+        visit_type=loader_name.lower(),
+        extra_loader_arguments={
+            "fallback_urls": ["https://example.org/mirror/artifact-0.0.1.pkg.xz"],
+            "checksums": {"sha256": "some-valid-checksum"},
+        },
+    )
+
+    task_dict = create_origin_task_dict(listed_origin, nixguix_lister)
+
+    res = swh_scheduler_celery_app.send_task(
+        f"{NAMESPACE}.tasks.Load{loader_name}",
+        kwargs=task_dict["arguments"]["kwargs"],
+    )
+    assert res
+    res.wait()
+    assert res.successful()
+    assert mock_load.called
+    assert res.result == {"status": "eventful"}
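
A minimal sketch of driving the new ContentLoader directly, without going
through Celery (this assumes swh.storage's in-memory backend; the url and
checksum below are hypothetical placeholders):

    from swh.loader.core.loader import ContentLoader
    from swh.storage import get_storage

    storage = get_storage(cls="memory")  # in-memory storage, for local experiments
    loader = ContentLoader(
        storage=storage,
        url="https://example.org/downloads/artifact-0.0.1.tar.gz",  # hypothetical
        checksums={"sha256": "<hex digest of the file>"},  # hypothetical placeholder
    )
    result = loader.load()  # {"status": "eventful"} on a first successful run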