diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py index a492b5e..afa682c 100644 --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -1,430 +1,430 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import hashlib import logging import os from abc import ABCMeta, abstractmethod from typing import Any, Dict, Iterable, Optional, Tuple from swh.core import config from swh.model.model import ( BaseContent, Content, SkippedContent, Directory, Origin, OriginVisit, OriginVisitStatus, Revision, Release, Sha1Git, Snapshot, ) from swh.storage import get_storage from swh.storage.utils import now class BaseLoader(config.SWHConfig, metaclass=ABCMeta): """Mixin base class for loader. To use this class, you must: - inherit from this class - and implement the @abstractmethod methods: - :func:`prepare`: First step executed by the loader to prepare some state needed by the `func`:load method. - :func:`get_origin`: Retrieve the origin that is currently being loaded. - :func:`fetch_data`: Fetch the data is actually the method to implement to compute data to inject in swh (through the store_data method) - :func:`store_data`: Store data fetched. - :func:`visit_status`: Explicit status of the visit ('partial' or 'full') - :func:`load_status`: Explicit status of the loading, for use by the scheduler (eventful/uneventful/temporary failure/permanent failure). - :func:`cleanup`: Last step executed by the loader. The entry point for the resulting loader is :func:`load`. You can take a look at some example classes: - :class:`BaseSvnLoader` """ CONFIG_BASE_FILENAME = None # type: Optional[str] DEFAULT_CONFIG = { "storage": ("dict", {"cls": "remote", "url": "http://localhost:5002/",}), "max_content_size": ("int", 100 * 1024 * 1024), "save_data": ("bool", False), "save_data_path": ("str", ""), } # type: Dict[str, Tuple[str, Any]] ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] def __init__( self, logging_class: Optional[str] = None, config: Dict[str, Any] = {} ): if config: self.config = config else: self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG] ) self.storage = get_storage(**self.config["storage"]) if logging_class is None: logging_class = "%s.%s" % ( self.__class__.__module__, self.__class__.__name__, ) self.log = logging.getLogger(logging_class) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) self.max_content_size = self.config["max_content_size"] # possibly overridden in self.prepare method self.visit_date: Optional[datetime.datetime] = None self.origin: Optional[Origin] = None if not hasattr(self, "visit_type"): self.visit_type: Optional[str] = None self.origin_metadata: Dict[str, Any] = {} self.loaded_snapshot_id: Optional[Sha1Git] = None # Make sure the config is sane save_data = self.config.get("save_data") if save_data: path = self.config["save_data_path"] os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) def save_data(self) -> None: """Save the data associated to the current load""" raise NotImplementedError def get_save_data_path(self) -> str: """The path to which we archive the loader's raw data""" if not hasattr(self, "__save_data_path"): year = str(self.visit_date.year) # type: ignore assert self.origin url = self.origin.url.encode("utf-8") origin_url_hash = hashlib.sha1(url).hexdigest() path = "%s/sha1:%s/%s/%s" % ( self.config["save_data_path"], origin_url_hash[0:2], origin_url_hash, year, ) os.makedirs(path, exist_ok=True) self.__save_data_path = path return self.__save_data_path def flush(self) -> None: """Flush any potential buffered data not sent to swh-storage. """ self.storage.flush() @abstractmethod def cleanup(self) -> None: """Last step executed by the loader. """ pass @abstractmethod def prepare_origin_visit(self, *args, **kwargs) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ pass def _store_origin_visit(self) -> None: """Store origin and visit references. Sets the self.visit references. """ assert self.origin - self.storage.origin_add_one(self.origin) + self.storage.origin_add([self.origin]) if not self.visit_date: # now as default visit_date if not provided self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) assert isinstance(self.visit_date, datetime.datetime) assert isinstance(self.visit_type, str) self.visit = self.storage.origin_visit_add( [ OriginVisit( origin=self.origin.url, date=self.visit_date, type=self.visit_type, ) ] )[0] @abstractmethod def prepare(self, *args, **kwargs) -> None: """Second step executed by the loader to prepare some state needed by the loader. """ pass def get_origin(self) -> Origin: """Get the origin that is currently being loaded. self.origin should be set in :func:`prepare_origin` Returns: dict: an origin ready to be sent to storage by :func:`origin_add_one`. """ assert self.origin return self.origin @abstractmethod def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading (ex: git/hg/svn/... repository). Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ pass @abstractmethod def store_data(self): """Store fetched data in the database. Should call the :func:`maybe_load_xyz` methods, which handle the bundles sent to storage, rather than send directly. """ pass def store_metadata(self) -> None: """Store fetched metadata in the database. For more information, see implementation in :class:`DepositLoader`. """ pass def load_status(self) -> Dict[str, str]: """Detailed loading status. Defaults to logging an eventful load. Returns: a dictionary that is eventually passed back as the task's result to the scheduler, allowing tuning of the task recurrence mechanism. """ return { "status": "eventful", } def post_load(self, success: bool = True) -> None: """Permit the loader to do some additional actions according to status after the loading is done. The flag success indicates the loading's status. Defaults to doing nothing. This is up to the implementer of this method to make sure this does not break. Args: success (bool): the success status of the loading """ pass def visit_status(self) -> str: """Detailed visit status. Defaults to logging a full visit. """ return "full" def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ pass def load(self, *args, **kwargs) -> Dict[str, str]: r"""Loading logic for the loader to follow: - 1. Call :meth:`prepare_origin_visit` to prepare the origin and visit we will associate loading data to - 2. Store the actual ``origin_visit`` to storage - 3. Call :meth:`prepare` to prepare any eventual state - 4. Call :meth:`get_origin` to get the origin we work with and store - while True: - 5. Call :meth:`fetch_data` to fetch the data to store - 6. Call :meth:`store_data` to store the data - 7. Call :meth:`cleanup` to clean up any eventual state put in place in :meth:`prepare` method. """ try: self.pre_cleanup() except Exception: msg = "Cleaning up dangling data failed! Continue loading." self.log.warning(msg) self.prepare_origin_visit(*args, **kwargs) self._store_origin_visit() assert self.origin try: self.prepare(*args, **kwargs) while True: more_data_to_fetch = self.fetch_data() self.store_data() if not more_data_to_fetch: break self.store_metadata() visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, date=now(), status=self.visit_status(), snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) self.post_load() except Exception: self.log.exception( "Loading failure, updating to `partial` status", extra={"swh_task_args": args, "swh_task_kwargs": kwargs,}, ) visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, date=now(), status="partial", snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) self.post_load(success=False) return {"status": "failed"} finally: self.flush() self.cleanup() return self.load_status() class DVCSLoader(BaseLoader): """This base class is a pattern for dvcs loaders (e.g. git, mercurial). Those loaders are able to load all the data in one go. For example, the loader defined in swh-loader-git :class:`BulkUpdater`. For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), inherit directly from :class:`BaseLoader`. """ ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] def cleanup(self) -> None: """Clean up an eventual state installed for computations.""" pass def has_contents(self) -> bool: """Checks whether we need to load contents""" return True def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded""" raise NotImplementedError def has_directories(self) -> bool: """Checks whether we need to load directories""" return True def get_directories(self) -> Iterable[Directory]: """Get the directories that need to be loaded""" raise NotImplementedError def has_revisions(self) -> bool: """Checks whether we need to load revisions""" return True def get_revisions(self) -> Iterable[Revision]: """Get the revisions that need to be loaded""" raise NotImplementedError def has_releases(self) -> bool: """Checks whether we need to load releases""" return True def get_releases(self) -> Iterable[Release]: """Get the releases that need to be loaded""" raise NotImplementedError def get_snapshot(self) -> Snapshot: """Get the snapshot that needs to be loaded""" raise NotImplementedError def eventful(self) -> bool: """Whether the load was eventful""" raise NotImplementedError def store_data(self) -> None: assert self.origin if self.config.get("save_data"): self.save_data() if self.has_contents(): contents = [] skipped_contents = [] for obj in self.get_contents(): if isinstance(obj, Content): contents.append(obj) elif isinstance(obj, SkippedContent): skipped_contents.append(obj) else: raise TypeError(f"Unexpected content type: {obj}") self.storage.skipped_content_add(skipped_contents) self.storage.content_add(contents) if self.has_directories(): self.storage.directory_add(self.get_directories()) if self.has_revisions(): self.storage.revision_add(self.get_revisions()) if self.has_releases(): self.storage.release_add(self.get_releases()) snapshot = self.get_snapshot() self.storage.snapshot_add([snapshot]) self.flush() self.loaded_snapshot_id = snapshot.id diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py index 3d13e88..8390396 100644 --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -1,224 +1,224 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import hashlib import logging from swh.model.model import Origin, OriginVisit, Snapshot from swh.loader.core.loader import BaseLoader, DVCSLoader from swh.loader.tests.common import assert_last_visit_matches ORIGIN = Origin(url="some-url") class DummyLoader: def cleanup(self): pass def prepare(self, *args, **kwargs): pass def fetch_data(self): pass def get_snapshot_id(self): return None def prepare_origin_visit(self, *args, **kwargs): self.origin = ORIGIN self.origin_url = ORIGIN.url self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) self.visit_type = "git" - origin_url = self.storage.origin_add_one(ORIGIN) + self.storage.origin_add([ORIGIN]) visit = OriginVisit( - origin=origin_url, date=self.visit_date, type=self.visit_type, + origin=self.origin_url, date=self.visit_date, type=self.visit_type, ) self.visit = self.storage.origin_visit_add([visit])[0] class DummyDVCSLoader(DummyLoader, DVCSLoader): """Unbuffered loader will send directly to storage new data """ def parse_config_file(self, *args, **kwargs): return { "max_content_size": 100 * 1024 * 1024, "storage": { "cls": "pipeline", "steps": [{"cls": "retry",}, {"cls": "filter",}, {"cls": "memory",},], }, } def get_contents(self): return [] def get_directories(self): return [] def get_revisions(self): return [] def get_releases(self): return [] def get_snapshot(self): return Snapshot(branches={}) def eventful(self): return False class DummyBaseLoader(DummyLoader, BaseLoader): """Buffered loader will send new data when threshold is reached """ def parse_config_file(self, *args, **kwargs): return { "max_content_size": 100 * 1024 * 1024, "storage": { "cls": "pipeline", "steps": [ {"cls": "retry",}, {"cls": "filter",}, { "cls": "buffer", "min_batch_size": { "content": 2, "content_bytes": 8, "directory": 2, "revision": 2, "release": 2, }, }, {"cls": "memory",}, ], }, } def store_data(self): pass def test_base_loader(): loader = DummyBaseLoader() result = loader.load() assert result == {"status": "eventful"} def test_dvcs_loader(): loader = DummyDVCSLoader() result = loader.load() assert result == {"status": "eventful"} def test_loader_logger_default_name(): loader = DummyBaseLoader() assert isinstance(loader.log, logging.Logger) assert loader.log.name == "swh.loader.core.tests.test_loader.DummyBaseLoader" loader = DummyDVCSLoader() assert isinstance(loader.log, logging.Logger) assert loader.log.name == "swh.loader.core.tests.test_loader.DummyDVCSLoader" def test_loader_logger_with_name(): loader = DummyBaseLoader("some.logger.name") assert isinstance(loader.log, logging.Logger) assert loader.log.name == "some.logger.name" def test_loader_save_data_path(tmp_path): loader = DummyBaseLoader("some.logger.name.1") url = "http://bitbucket.org/something" loader.origin = Origin(url=url) loader.visit_date = datetime.datetime(year=2019, month=10, day=1) loader.config = { "save_data_path": tmp_path, } hash_url = hashlib.sha1(url.encode("utf-8")).hexdigest() expected_save_path = "%s/sha1:%s/%s/2019" % (str(tmp_path), hash_url[0:2], hash_url) save_path = loader.get_save_data_path() assert save_path == expected_save_path def _check_load_failure(caplog, loader, exc_class, exc_text): """Check whether a failed load properly logged its exception, and that the snapshot didn't get referenced in storage""" for record in caplog.records: if record.levelname != "ERROR": continue assert "Loading failure" in record.message assert record.exc_info exc = record.exc_info[1] assert isinstance(exc, exc_class) assert exc_text in exc.args[0] # Check that the get_snapshot operation would have succeeded assert loader.get_snapshot() is not None # But that the snapshot didn't get loaded assert loader.loaded_snapshot_id is None # And confirm that the visit doesn't reference a snapshot visit = assert_last_visit_matches(loader.storage, ORIGIN.url, status="partial") assert visit.snapshot is None class DummyDVCSLoaderExc(DummyDVCSLoader): """A loader which raises an exception when loading some contents""" def get_contents(self): raise RuntimeError("Failed to get contents!") def test_dvcs_loader_exc_partial_visit(caplog): logger_name = "dvcsloaderexc" caplog.set_level(logging.ERROR, logger=logger_name) loader = DummyDVCSLoaderExc(logging_class=logger_name) result = loader.load() assert result == {"status": "failed"} _check_load_failure(caplog, loader, RuntimeError, "Failed to get contents!") class BrokenStorageProxy: def __init__(self, storage): self.storage = storage def __getattr__(self, attr): return getattr(self.storage, attr) def snapshot_add(self, snapshots): raise RuntimeError("Failed to add snapshot!") class DummyDVCSLoaderStorageExc(DummyDVCSLoader): """A loader which raises an exception when loading some contents""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.storage = BrokenStorageProxy(self.storage) def test_dvcs_loader_storage_exc_partial_visit(caplog): logger_name = "dvcsloaderexc" caplog.set_level(logging.ERROR, logger=logger_name) loader = DummyDVCSLoaderStorageExc(logging_class=logger_name) result = loader.load() assert result == {"status": "failed"} _check_load_failure(caplog, loader, RuntimeError, "Failed to add snapshot!") diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py index 46d8f3c..38877b3 100644 --- a/swh/loader/package/loader.py +++ b/swh/loader/package/loader.py @@ -1,498 +1,498 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import tempfile import os from typing import Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple import attr import sentry_sdk from swh.core.tarball import uncompress from swh.core.config import SWHConfig from swh.model import from_disk from swh.model.hashutil import hash_to_hex from swh.model.model import ( BaseModel, Sha1Git, Content, SkippedContent, Directory, Revision, TargetType, Snapshot, Origin, OriginVisit, OriginVisitStatus, ) from swh.storage import get_storage from swh.storage.utils import now from swh.storage.algos.snapshot import snapshot_get_latest from swh.loader.package.utils import download logger = logging.getLogger(__name__) class PackageLoader: # Origin visit type (str) set by the loader visit_type = "" def __init__(self, url): """Loader's constructor. This raises exception if the minimal required configuration is missing (cf. fn:`check` method). Args: url (str): Origin url to load data from """ # This expects to use the environment variable SWH_CONFIG_FILENAME self.config = SWHConfig.parse_config_file() self._check_configuration() self.storage = get_storage(**self.config["storage"]) self.url = url self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) self.max_content_size = self.config["max_content_size"] def _check_configuration(self): """Checks the minimal configuration required is set for the loader. If some required configuration is missing, exception detailing the issue is raised. """ if "storage" not in self.config: raise ValueError("Misconfiguration, at least the storage key should be set") def get_versions(self) -> Sequence[str]: """Return the list of all published package versions. Returns: Sequence of published versions """ return [] def get_package_info( self, version: str ) -> Generator[Tuple[str, Mapping[str, Any]], None, None]: """Given a release version of a package, retrieve the associated package information for such version. Args: version: Package version Returns: (branch name, package metadata) """ yield from {} def build_revision( self, a_metadata: Dict, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: """Build the revision from the archive metadata (extrinsic artifact metadata) and the intrinsic metadata. Args: a_metadata: Artifact metadata uncompressed_path: Artifact uncompressed path on disk Returns: SWH data dict """ raise NotImplementedError("build_revision") def get_default_version(self) -> str: """Retrieve the latest release version if any. Returns: Latest version """ return "" def last_snapshot(self) -> Optional[Snapshot]: """Retrieve the last snapshot out of the last visit. """ return snapshot_get_latest(self.storage, self.url) def known_artifacts(self, snapshot: Optional[Snapshot]) -> Dict[Sha1Git, BaseModel]: """Retrieve the known releases/artifact for the origin. Args snapshot: snapshot for the visit Returns: Dict of keys revision id (bytes), values a metadata Dict. """ if not snapshot: return {} # retrieve only revisions (e.g the alias we do not want here) revs = [ rev.target for rev in snapshot.branches.values() if rev and rev.target_type == TargetType.REVISION ] known_revisions = self.storage.revision_get(revs) ret = {} for revision in known_revisions: if not revision: # revision_get can return None continue ret[revision["id"]] = revision["metadata"] return ret def resolve_revision_from( self, known_artifacts: Dict, artifact_metadata: Dict ) -> Optional[bytes]: """Resolve the revision from a snapshot and an artifact metadata dict. If the artifact has already been downloaded, this will return the existing revision targeting that uncompressed artifact directory. Otherwise, this returns None. Args: snapshot: Snapshot artifact_metadata: Information dict Returns: None or revision identifier """ return None def download_package( self, p_info: Mapping[str, Any], tmpdir: str ) -> List[Tuple[str, Mapping]]: """Download artifacts for a specific package. All downloads happen in in the tmpdir folder. Default implementation expects the artifacts package info to be about one artifact per package. Note that most implementation have 1 artifact per package. But some implementation have multiple artifacts per package (debian), some have none, the package is the artifact (gnu). Args: artifacts_package_info: Information on the package artifacts to download (url, filename, etc...) tmpdir: Location to retrieve such artifacts Returns: List of (path, computed hashes) """ a_uri = p_info["url"] filename = p_info.get("filename") return [download(a_uri, dest=tmpdir, filename=filename)] def uncompress( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str ) -> str: """Uncompress the artifact(s) in the destination folder dest. Optionally, this could need to use the p_info dict for some more information (debian). """ uncompressed_path = os.path.join(dest, "src") for a_path, _ in dl_artifacts: uncompress(a_path, dest=uncompressed_path) return uncompressed_path def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]: """Return an extra dict of branches that are used to update the set of branches. """ return {} def load(self) -> Dict: """Load for a specific origin the associated contents. for each package version of the origin 1. Fetch the files for one package version By default, this can be implemented as a simple HTTP request. Loaders with more specific requirements can override this, e.g.: the PyPI loader checks the integrity of the downloaded files; the Debian loader has to download and check several files for one package version. 2. Extract the downloaded files By default, this would be a universal archive/tarball extraction. Loaders for specific formats can override this method (for instance, the Debian loader uses dpkg-source -x). 3. Convert the extracted directory to a set of Software Heritage objects Using swh.model.from_disk. 4. Extract the metadata from the unpacked directories This would only be applicable for "smart" loaders like npm (parsing the package.json), PyPI (parsing the PKG-INFO file) or Debian (parsing debian/changelog and debian/control). On "minimal-metadata" sources such as the GNU archive, the lister should provide the minimal set of metadata needed to populate the revision/release objects (authors, dates) as an argument to the task. 5. Generate the revision/release objects for the given version. From the data generated at steps 3 and 4. end for each 6. Generate and load the snapshot for the visit Using the revisions/releases collected at step 5., and the branch information from step 0., generate a snapshot and load it into the Software Heritage archive """ status_load = "uneventful" # either: eventful, uneventful, failed status_visit = "full" # either: partial, full tmp_revisions = {} # type: Dict[str, List] snapshot = None def finalize_visit() -> Dict[str, Any]: """Finalize the visit: - flush eventual unflushed data to storage - update origin visit's status - return the task's status """ self.storage.flush() snapshot_id: Optional[bytes] = None if snapshot and snapshot.id: # to prevent the snapshot.id to b"" snapshot_id = snapshot.id assert visit.visit visit_status = OriginVisitStatus( origin=self.url, visit=visit.visit, date=now(), status=status_visit, snapshot=snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) result: Dict[str, Any] = { "status": status_load, } if snapshot_id: result["snapshot_id"] = hash_to_hex(snapshot_id) return result # Prepare origin and origin_visit origin = Origin(url=self.url) try: - self.storage.origin_add_one(origin) + self.storage.origin_add([origin]) visit = self.storage.origin_visit_add( [ OriginVisit( origin=self.url, date=self.visit_date, type=self.visit_type, ) ] )[0] except Exception as e: logger.exception("Failed to initialize origin_visit for %s", self.url) sentry_sdk.capture_exception(e) return {"status": "failed"} try: last_snapshot = self.last_snapshot() logger.debug("last snapshot: %s", last_snapshot) known_artifacts = self.known_artifacts(last_snapshot) logger.debug("known artifacts: %s", known_artifacts) except Exception as e: logger.exception("Failed to get previous state for %s", self.url) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" return finalize_visit() load_exceptions: List[Exception] = [] for version in self.get_versions(): # for each logger.debug("version: %s", version) tmp_revisions[version] = [] # `p_` stands for `package_` for branch_name, p_info in self.get_package_info(version): logger.debug("package_info: %s", p_info) revision_id = self.resolve_revision_from(known_artifacts, p_info["raw"]) if revision_id is None: try: revision_id = self._load_revision(p_info, origin) self.storage.flush() status_load = "eventful" except Exception as e: self.storage.clear_buffers() load_exceptions.append(e) sentry_sdk.capture_exception(e) logger.exception( "Failed loading branch %s for %s", branch_name, self.url ) continue if revision_id is None: continue tmp_revisions[version].append((branch_name, revision_id)) if load_exceptions: status_visit = "partial" if not tmp_revisions: # We could not load any revisions; fail completely status_visit = "partial" status_load = "failed" return finalize_visit() try: # Retrieve the default release version (the "latest" one) default_version = self.get_default_version() logger.debug("default version: %s", default_version) # Retrieve extra branches extra_branches = self.extra_branches() logger.debug("extra branches: %s", extra_branches) snapshot = self._load_snapshot( default_version, tmp_revisions, extra_branches ) except Exception as e: logger.exception("Failed to build snapshot for origin %s", self.url) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" return finalize_visit() def _load_revision(self, p_info, origin) -> Optional[Sha1Git]: """Does all the loading of a revision itself: * downloads a package and uncompresses it * loads it from disk * adds contents, directories, and revision to self.storage * returns (revision_id, loaded) Raises exception when unable to download or uncompress artifacts """ with tempfile.TemporaryDirectory() as tmpdir: dl_artifacts = self.download_package(p_info, tmpdir) uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) logger.debug("uncompressed_path: %s", uncompressed_path) directory = from_disk.Directory.from_disk( path=uncompressed_path.encode("utf-8"), max_content_length=self.max_content_size, ) contents: List[Content] = [] skipped_contents: List[SkippedContent] = [] directories: List[Directory] = [] for obj in directory.iter_tree(): obj = obj.to_model() obj_type = obj.object_type if obj_type in ("content", "content_file"): # FIXME: read the data from disk later (when the # storage buffer is flushed). obj = obj.with_data() contents.append(obj) elif obj_type == "skipped_content": skipped_contents.append(obj) elif obj_type == "directory": directories.append(obj) else: raise TypeError(f"Unexpected content type from disk: {obj}") logger.debug("Number of skipped contents: %s", len(skipped_contents)) self.storage.skipped_content_add(skipped_contents) logger.debug("Number of contents: %s", len(contents)) self.storage.content_add(contents) logger.debug("Number of directories: %s", len(directories)) self.storage.directory_add(directories) # FIXME: This should be release. cf. D409 revision = self.build_revision( p_info["raw"], uncompressed_path, directory=directory.hash ) if not revision: # Some artifacts are missing intrinsic metadata # skipping those return None metadata = revision.metadata or {} metadata.update( {"original_artifact": [hashes for _, hashes in dl_artifacts],} ) revision = attr.evolve(revision, metadata=metadata) logger.debug("Revision: %s", revision) self.storage.revision_add([revision]) return revision.id def _load_snapshot( self, default_version: str, revisions: Dict[str, List[Tuple[str, bytes]]], extra_branches: Dict[bytes, Mapping[str, Any]], ) -> Optional[Snapshot]: """Build snapshot out of the current revisions stored and extra branches. Then load it in the storage. """ logger.debug("revisions: %s", revisions) # Build and load the snapshot branches = {} # type: Dict[bytes, Mapping[str, Any]] for version, branch_name_revisions in revisions.items(): if version == default_version and len(branch_name_revisions) == 1: # only 1 branch (no ambiguity), we can create an alias # branch 'HEAD' branch_name, _ = branch_name_revisions[0] # except for some corner case (deposit) if branch_name != "HEAD": branches[b"HEAD"] = { "target_type": "alias", "target": branch_name.encode("utf-8"), } for branch_name, target in branch_name_revisions: branches[branch_name.encode("utf-8")] = { "target_type": "revision", "target": target, } # Deal with extra-branches for name, branch_target in extra_branches.items(): if name in branches: logger.error("Extra branch '%s' has been ignored", name) else: branches[name] = branch_target snapshot_data = {"branches": branches} logger.debug("snapshot: %s", snapshot_data) snapshot = Snapshot.from_dict(snapshot_data) logger.debug("snapshot: %s", snapshot) self.storage.snapshot_add([snapshot]) return snapshot diff --git a/swh/loader/package/tests/test_loader.py b/swh/loader/package/tests/test_loader.py index 9f7ebe9..8cf0de3 100644 --- a/swh/loader/package/tests/test_loader.py +++ b/swh/loader/package/tests/test_loader.py @@ -1,38 +1,38 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.loader.package.loader import PackageLoader class FakeStorage: - def origin_add_one(self, origin): + def origin_add(self, origins): raise ValueError("We refuse to add an origin") def origin_visit_get_latest(self, origin): return None class FakeStorage2(FakeStorage): - def origin_add_one(self, origin): - return origin + def origin_add(self, origins): + pass def origin_visit_add(self, visits): raise ValueError("We refuse to add an origin visit") def test_loader_origin_visit_failure(swh_config): """Failure to add origin or origin visit should failed immediately """ loader = PackageLoader("some-url") loader.storage = FakeStorage() actual_load_status = loader.load() assert actual_load_status == {"status": "failed"} loader.storage = FakeStorage2() actual_load_status2 = loader.load() assert actual_load_status2 == {"status": "failed"}