
diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
index 30e30f6..c787d2e 100644
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -1,636 +1,642 @@
# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import hashlib
import logging
import os
import time
from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union
import sentry_sdk
from swh.core.config import load_from_envvar
from swh.core.statsd import Statsd
from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister
from swh.loader.exception import NotFound
from swh.model.model import (
BaseContent,
Content,
Directory,
Origin,
OriginVisit,
OriginVisitStatus,
RawExtrinsicMetadata,
Release,
Revision,
Sha1Git,
SkippedContent,
Snapshot,
)
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
from swh.storage.utils import now
DEFAULT_CONFIG: Dict[str, Any] = {
"max_content_size": 100 * 1024 * 1024,
}
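+# Sentry tag names under which every event captured during a load is
+# annotated with the origin URL and visit type (set in BaseLoader.__init__).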
+SENTRY_ORIGIN_URL_TAG_NAME = "swh.loader.origin_url"
+SENTRY_VISIT_TYPE_TAG_NAME = "swh.loader.visit_type"
+
class BaseLoader:
"""Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g
PyPI, Npm, CRAN, ...)
A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/...
package artifacts), ingests the contents/directories/revisions/releases/snapshot
read from those artifacts and send them to the archive through the storage backend.
The main entry point for the loader is the :func:`load` function.
2 static methods (:func:`from_config`, :func:`from_configfile`) centralizes and
eases the loader instantiation from either configuration dict or configuration file.
Some class examples:
- :class:`SvnLoader`
- :class:`GitLoader`
- :class:`PyPILoader`
- :class:`NpmLoader`
Args:
lister_name: Name of the lister which triggered this load.
If provided, the loader will try to use the forge's API to retrieve extrinsic
metadata
lister_instance_name: Name of the lister instance which triggered this load.
Must be None iff lister_name is, but it may be the empty string for listers
with a single instance.
"""
visit_type: str
origin: Origin
loaded_snapshot_id: Optional[Sha1Git]
parent_origins: Optional[List[Origin]]
"""If the given origin is a "forge fork" (ie. created with the "Fork" button
of GitHub-like forges), :meth:`build_extrinsic_origin_metadata` sets this to
a list of origins it was forked from; closest parent first."""
def __init__(
self,
storage: StorageInterface,
origin_url: str,
logging_class: Optional[str] = None,
save_data_path: Optional[str] = None,
max_content_size: Optional[int] = None,
lister_name: Optional[str] = None,
lister_instance_name: Optional[str] = None,
metadata_fetcher_credentials: CredentialsType = None,
):
if lister_name == "":
raise ValueError("lister_name must not be the empty string")
if lister_name is None and lister_instance_name is not None:
raise ValueError(
f"lister_name is None but lister_instance_name is {lister_instance_name!r}"
)
if lister_name is not None and lister_instance_name is None:
raise ValueError(
f"lister_instance_name is None but lister_name is {lister_name!r}"
)
self.storage = storage
self.origin = Origin(url=origin_url)
self.max_content_size = int(max_content_size) if max_content_size else None
self.lister_name = lister_name
self.lister_instance_name = lister_instance_name
self.metadata_fetcher_credentials = metadata_fetcher_credentials or {}
if logging_class is None:
logging_class = "%s.%s" % (
self.__class__.__module__,
self.__class__.__name__,
)
self.log = logging.getLogger(logging_class)
_log = logging.getLogger("requests.packages.urllib3.connectionpool")
_log.setLevel(logging.WARN)
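+ # Tag the current Sentry scope at init time so any exception captured
+ # later in the load is annotated with the origin URL and visit type.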
+ sentry_sdk.set_tag(SENTRY_ORIGIN_URL_TAG_NAME, self.origin.url)
+ sentry_sdk.set_tag(SENTRY_VISIT_TYPE_TAG_NAME, self.visit_type)
+
# possibly overridden in self.prepare method
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
self.loaded_snapshot_id = None
if save_data_path:
path = save_data_path
os.stat(path)
if not os.access(path, os.R_OK | os.W_OK):
raise PermissionError("Permission denied: %r" % path)
self.save_data_path = save_data_path
self.parent_origins = None
self.statsd = Statsd(
namespace="swh_loader", constant_tags={"visit_type": self.visit_type}
)
@classmethod
def from_config(cls, storage: Dict[str, Any], **config: Any):
"""Instantiate a loader from a configuration dict.
This is basically a backwards-compatibility shim for the CLI.
Args:
storage: instantiation config for the storage
config: the configuration dict for the loader, with the following keys:
- credentials (optional): credentials list for the scheduler
- any other kwargs passed to the loader.
Returns:
the instantiated loader
"""
# Drop the legacy config keys which aren't used for this generation of loader.
for legacy_key in ("storage", "celery"):
config.pop(legacy_key, None)
# Instantiate the storage
storage_instance = get_storage(**storage)
return cls(storage=storage_instance, **config)
@classmethod
def from_configfile(cls, **kwargs: Any):
"""Instantiate a loader from the configuration loaded from the
SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their
value is not None.
Args:
kwargs: kwargs passed to the loader instantiation
"""
config = dict(load_from_envvar(DEFAULT_CONFIG))
config.update({k: v for k, v in kwargs.items() if v is not None})
return cls.from_config(**config)
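# e.g., with SWH_CONFIG_FILENAME pointing to a YAML file such as
#   storage: {cls: memory}
# SomeLoader.from_configfile(origin_url="https://example.org/repo")
# instantiates both the storage and the loader (illustrative names only).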
def save_data(self) -> None:
"""Save the data associated to the current load"""
raise NotImplementedError
def get_save_data_path(self) -> str:
"""The path to which we archive the loader's raw data"""
if not hasattr(self, "__save_data_path"):
year = str(self.visit_date.year)
assert self.origin
url = self.origin.url.encode("utf-8")
origin_url_hash = hashlib.sha1(url).hexdigest()
path = "%s/sha1:%s/%s/%s" % (
self.save_data_path,
origin_url_hash[0:2],
origin_url_hash,
year,
)
os.makedirs(path, exist_ok=True)
self.__save_data_path = path
return self.__save_data_path
def flush(self) -> Dict[str, int]:
"""Flush any potential buffered data not sent to swh-storage.
Returns the same value as :meth:`swh.storage.interface.StorageInterface.flush`.
"""
return self.storage.flush()
def cleanup(self) -> None:
"""Last step executed by the loader."""
raise NotImplementedError
def _store_origin_visit(self) -> None:
"""Store origin and visit references. Sets the self.visit references."""
assert self.origin
self.storage.origin_add([self.origin])
assert isinstance(self.visit_type, str)
self.visit = list(
self.storage.origin_visit_add(
[
OriginVisit(
origin=self.origin.url,
date=self.visit_date,
type=self.visit_type,
)
]
)
)[0]
def prepare(self) -> None:
"""Second step executed by the loader to prepare some state needed by
the loader.
Raises:
NotFound: if the origin to ingest is not found.
"""
raise NotImplementedError
def get_origin(self) -> Origin:
"""Get the origin that is currently being loaded.
self.origin should be set in :func:`prepare_origin`
Returns:
Origin: an origin ready to be sent to storage by
:func:`origin_add`.
"""
assert self.origin
return self.origin
def fetch_data(self) -> bool:
"""Fetch the data from the source the loader is currently loading
(ex: git/hg/svn/... repository).
Returns:
a value that is interpreted as a boolean. If True, fetch_data needs
to be called again to complete loading.
"""
raise NotImplementedError
def process_data(self) -> bool:
"""Run any additional processing between fetching and storing the data
Returns:
a value that is interpreted as a boolean. If True, fetch_data needs
to be called again to complete loading.
Ignored if ``fetch_data`` already returned :const:`False`.
"""
return True
def store_data(self):
"""Store fetched data in the database.
Should call the :func:`maybe_load_xyz` methods, which handle the
bundles sent to storage, rather than sending directly.
"""
raise NotImplementedError
def load_status(self) -> Dict[str, str]:
"""Detailed loading status.
Defaults to reporting an eventful load.
Returns: a dictionary that is eventually passed back as the task's
result to the scheduler, allowing tuning of the task recurrence
mechanism.
"""
return {
"status": "eventful",
}
def post_load(self, success: bool = True) -> None:
"""Permit the loader to do some additional actions according to status
after the loading is done. The flag success indicates the
loading's status.
Defaults to doing nothing.
This is up to the implementer of this method to make sure this
does not break.
Args:
success (bool): the success status of the loading
"""
pass
def visit_status(self) -> str:
"""Detailed visit status.
Defaults to reporting a full visit.
"""
return "full"
def pre_cleanup(self) -> None:
"""As a first step, will try and check for dangling data to cleanup.
This should do its best to avoid raising issues.
"""
pass
def load(self) -> Dict[str, str]:
r"""Loading logic for the loader to follow:
- Store the actual ``origin_visit`` to storage
- Call :meth:`prepare` to prepare any necessary state
- Call :meth:`get_origin` to get the origin we work with and store it
- while True:
- Call :meth:`fetch_data` to fetch the data to store
- Call :meth:`process_data` to optionally run processing between
:meth:`fetch_data` and :meth:`store_data`
- Call :meth:`store_data` to store the data
- Call :meth:`cleanup` to clean up any state put in place
by the :meth:`prepare` method.
"""
try:
with self.statsd_timed("pre_cleanup"):
self.pre_cleanup()
except Exception:
msg = "Cleaning up dangling data failed! Continue loading."
self.log.warning(msg)
sentry_sdk.capture_exception()
self._store_origin_visit()
assert (
self.visit.visit
), "The method `_store_origin_visit` should set the visit (OriginVisit)"
self.log.info(
"Load origin '%s' with type '%s'", self.origin.url, self.visit.type
)
try:
with self.statsd_timed("build_extrinsic_origin_metadata"):
metadata = self.build_extrinsic_origin_metadata()
self.load_metadata_objects(metadata)
except Exception as e:
sentry_sdk.capture_exception(e)
# Do not fail the whole task if this is the only failure
self.log.exception(
"Failure while loading extrinsic origin metadata.",
extra={
"swh_task_args": [],
"swh_task_kwargs": {
"origin": self.origin.url,
"lister_name": self.lister_name,
"lister_instance_name": self.lister_instance_name,
},
},
)
total_time_fetch_data = 0.0
total_time_process_data = 0.0
total_time_store_data = 0.0
# Initially not a success; set to True upon actual success
status = "failed"
success = False
try:
with self.statsd_timed("prepare"):
self.prepare()
while True:
t1 = time.monotonic()
more_data_to_fetch = self.fetch_data()
t2 = time.monotonic()
total_time_fetch_data += t2 - t1
more_data_to_fetch = self.process_data() and more_data_to_fetch
t3 = time.monotonic()
total_time_process_data += t3 - t2
self.store_data()
t4 = time.monotonic()
total_time_store_data += t4 - t3
if not more_data_to_fetch:
break
self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0)
self.statsd_timing("process_data", total_time_process_data * 1000.0)
self.statsd_timing("store_data", total_time_store_data * 1000.0)
status = self.visit_status()
visit_status = OriginVisitStatus(
origin=self.origin.url,
visit=self.visit.visit,
type=self.visit_type,
date=now(),
status=status,
snapshot=self.loaded_snapshot_id,
)
self.storage.origin_visit_status_add([visit_status])
success = True
with self.statsd_timed(
"post_load", tags={"success": success, "status": status}
):
self.post_load()
except BaseException as e:
success = False
if isinstance(e, NotFound):
status = "not_found"
task_status = "uneventful"
else:
status = "partial" if self.loaded_snapshot_id else "failed"
task_status = "failed"
self.log.exception(
"Loading failure, updating to `%s` status",
status,
extra={
"swh_task_args": [],
"swh_task_kwargs": {
"origin": self.origin.url,
"lister_name": self.lister_name,
"lister_instance_name": self.lister_instance_name,
},
},
)
if not isinstance(e, (SystemExit, KeyboardInterrupt)):
sentry_sdk.capture_exception()
visit_status = OriginVisitStatus(
origin=self.origin.url,
visit=self.visit.visit,
type=self.visit_type,
date=now(),
status=status,
snapshot=self.loaded_snapshot_id,
)
self.storage.origin_visit_status_add([visit_status])
with self.statsd_timed(
"post_load", tags={"success": success, "status": status}
):
self.post_load(success=success)
if not isinstance(e, Exception):
# e derives from BaseException but not Exception; this is most likely
# SystemExit or KeyboardInterrupt, so we should re-raise it.
raise
return {"status": task_status}
finally:
with self.statsd_timed(
"flush", tags={"success": success, "status": status}
):
self.flush()
with self.statsd_timed(
"cleanup", tags={"success": success, "status": status}
):
self.cleanup()
return self.load_status()
def load_metadata_objects(
self, metadata_objects: List[RawExtrinsicMetadata]
) -> None:
if not metadata_objects:
return
authorities = {mo.authority for mo in metadata_objects}
self.storage.metadata_authority_add(list(authorities))
fetchers = {mo.fetcher for mo in metadata_objects}
self.storage.metadata_fetcher_add(list(fetchers))
self.storage.raw_extrinsic_metadata_add(metadata_objects)
def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]:
"""Builds a list of full RawExtrinsicMetadata objects, using
a metadata fetcher returned by :func:`get_fetcher_classes`."""
if self.lister_name is None:
self.log.debug("lister_not provided, skipping extrinsic origin metadata")
return []
assert (
self.lister_instance_name is not None
), "lister_instance_name is None, but lister_name is not"
metadata = []
fetcher_classes = get_fetchers_for_lister(self.lister_name)
self.statsd_average("metadata_fetchers", len(fetcher_classes))
for cls in fetcher_classes:
metadata_fetcher = cls(
origin=self.origin,
lister_name=self.lister_name,
lister_instance_name=self.lister_instance_name,
credentials=self.metadata_fetcher_credentials,
)
with self.statsd_timed(
"fetch_one_metadata", tags={"fetcher": cls.FETCHER_NAME}
):
metadata.extend(metadata_fetcher.get_origin_metadata())
if self.parent_origins is None:
self.parent_origins = metadata_fetcher.get_parent_origins()
self.statsd_average(
"metadata_parent_origins",
len(self.parent_origins),
tags={"fetcher": cls.FETCHER_NAME},
)
self.statsd_average("metadata_objects", len(metadata))
return metadata
def statsd_timed(self, name: str, tags: Dict[str, Any] = {}) -> ContextManager:
"""
Wrapper for :meth:`swh.core.statsd.Statsd.timed`, which uses the standard
metric name and tags for loaders.
"""
return self.statsd.timed(
"operation_duration_seconds", tags={"operation": name, **tags}
)
def statsd_timing(self, name: str, value: float, tags: Dict[str, Any] = {}) -> None:
"""
Wrapper for :meth:`swh.core.statsd.Statsd.timing`, which uses the standard
metric name and tags for loaders.
"""
self.statsd.timing(
"operation_duration_seconds", value, tags={"operation": name, **tags}
)
def statsd_average(
self, name: str, value: Union[int, float], tags: Dict[str, Any] = {}
) -> None:
"""Increments both ``{name}_sum`` (by the ``value``) and ``{name}_count``
(by ``1``), allowing to prometheus to compute the average ``value`` over
time."""
self.statsd.increment(f"{name}_sum", value, tags=tags)
self.statsd.increment(f"{name}_count", tags=tags)
class DVCSLoader(BaseLoader):
"""This base class is a pattern for dvcs loaders (e.g. git, mercurial).
Those loaders are able to load all the data in one go. For example, the
loader defined in swh-loader-git :class:`BulkUpdater`.
For other loaders (stateful one, (e.g :class:`SWHSvnLoader`),
inherit directly from :class:`BaseLoader`.
"""
def cleanup(self) -> None:
"""Clean up an eventual state installed for computations."""
pass
def has_contents(self) -> bool:
"""Checks whether we need to load contents"""
return True
def get_contents(self) -> Iterable[BaseContent]:
"""Get the contents that need to be loaded"""
raise NotImplementedError
def has_directories(self) -> bool:
"""Checks whether we need to load directories"""
return True
def get_directories(self) -> Iterable[Directory]:
"""Get the directories that need to be loaded"""
raise NotImplementedError
def has_revisions(self) -> bool:
"""Checks whether we need to load revisions"""
return True
def get_revisions(self) -> Iterable[Revision]:
"""Get the revisions that need to be loaded"""
raise NotImplementedError
def has_releases(self) -> bool:
"""Checks whether we need to load releases"""
return True
def get_releases(self) -> Iterable[Release]:
"""Get the releases that need to be loaded"""
raise NotImplementedError
def get_snapshot(self) -> Snapshot:
"""Get the snapshot that needs to be loaded"""
raise NotImplementedError
def eventful(self) -> bool:
"""Whether the load was eventful"""
raise NotImplementedError
def store_data(self) -> None:
assert self.origin
if self.save_data_path:
self.save_data()
if self.has_contents():
for obj in self.get_contents():
if isinstance(obj, Content):
self.storage.content_add([obj])
elif isinstance(obj, SkippedContent):
self.storage.skipped_content_add([obj])
else:
raise TypeError(f"Unexpected content type: {obj}")
if self.has_directories():
for directory in self.get_directories():
self.storage.directory_add([directory])
if self.has_revisions():
for revision in self.get_revisions():
self.storage.revision_add([revision])
if self.has_releases():
for release in self.get_releases():
self.storage.release_add([release])
snapshot = self.get_snapshot()
self.storage.snapshot_add([snapshot])
self.flush()
self.loaded_snapshot_id = snapshot.id
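
For orientation, a minimal hypothetical BaseLoader subclass (illustrative only,
not part of this diff) showing the hooks that :meth:`load` drives:

    from swh.loader.core.loader import BaseLoader
    from swh.model.model import Snapshot

    class MinimalLoader(BaseLoader):
        visit_type = "minimal"  # also becomes the Sentry visit-type tag

        def prepare(self) -> None:
            # may raise NotFound, which yields a "not_found" visit status
            self.snapshot = Snapshot(branches={})

        def fetch_data(self) -> bool:
            return False  # False: nothing more to fetch

        def store_data(self) -> None:
            self.storage.snapshot_add([self.snapshot])
            self.loaded_snapshot_id = self.snapshot.id

        def cleanup(self) -> None:
            pass

    # MinimalLoader(storage, "https://example.org/repo").load()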
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
index dacec8b..6633460 100644
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -1,481 +1,505 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import hashlib
import logging
import time
from unittest.mock import MagicMock, call
import pytest
-from swh.loader.core.loader import BaseLoader, DVCSLoader
+from swh.loader.core.loader import (
+ SENTRY_ORIGIN_URL_TAG_NAME,
+ SENTRY_VISIT_TYPE_TAG_NAME,
+ BaseLoader,
+ DVCSLoader,
+)
from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol
from swh.loader.exception import NotFound
from swh.loader.tests import assert_last_visit_matches
from swh.model.hashutil import hash_to_bytes
from swh.model.model import (
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
Origin,
RawExtrinsicMetadata,
Snapshot,
)
import swh.storage.exc
ORIGIN = Origin(url="some-url")
PARENT_ORIGIN = Origin(url="base-origin-url")
METADATA_AUTHORITY = MetadataAuthority(
type=MetadataAuthorityType.FORGE, url="http://example.org/"
)
REMD = RawExtrinsicMetadata(
target=ORIGIN.swhid(),
discovery_date=datetime.datetime.now(tz=datetime.timezone.utc),
authority=METADATA_AUTHORITY,
fetcher=MetadataFetcher(
name="test fetcher",
version="0.0.1",
),
format="test-format",
metadata=b'{"foo": "bar"}',
)
class DummyLoader:
"""Base Loader to overload and simplify the base class (technical: to avoid repetition
in other *Loader classes)"""
visit_type = "git"
def __init__(self, storage, *args, **kwargs):
super().__init__(storage, ORIGIN.url, *args, **kwargs)
def cleanup(self):
pass
def prepare(self, *args, **kwargs):
pass
def fetch_data(self):
pass
def get_snapshot_id(self):
return None
class DummyDVCSLoader(DummyLoader, DVCSLoader):
"""DVCS Loader that does nothing in regards to DAG objects."""
def get_contents(self):
return []
def get_directories(self):
return []
def get_revisions(self):
return []
def get_releases(self):
return []
def get_snapshot(self):
return Snapshot(branches={})
def eventful(self):
return False
class DummyBaseLoader(DummyLoader, BaseLoader):
"""Buffered loader will send new data when threshold is reached"""
def store_data(self):
pass
class DummyMetadataFetcher:
SUPPORTED_LISTERS = {"fake-forge"}
FETCHER_NAME = "fake-forge"
def __init__(self, origin, credentials, lister_name, lister_instance_name):
pass
def get_origin_metadata(self):
return [REMD]
def get_parent_origins(self):
return []
class DummyMetadataFetcherWithFork:
SUPPORTED_LISTERS = {"fake-forge"}
FETCHER_NAME = "fake-forge"
def __init__(self, origin, credentials, lister_name, lister_instance_name):
pass
def get_origin_metadata(self):
return [REMD]
def get_parent_origins(self):
return [PARENT_ORIGIN]
def test_types():
assert isinstance(
DummyMetadataFetcher(None, None, None, None), MetadataFetcherProtocol
)
assert isinstance(
DummyMetadataFetcherWithFork(None, None, None, None), MetadataFetcherProtocol
)
def test_base_loader(swh_storage):
loader = DummyBaseLoader(swh_storage)
result = loader.load()
assert result == {"status": "eventful"}
def test_base_loader_with_config(swh_storage):
loader = DummyBaseLoader(swh_storage, "logger-name")
result = loader.load()
assert result == {"status": "eventful"}
def test_base_loader_with_known_lister_name(swh_storage, mocker):
fetcher_cls = MagicMock(wraps=DummyMetadataFetcher)
fetcher_cls.SUPPORTED_LISTERS = DummyMetadataFetcher.SUPPORTED_LISTERS
fetcher_cls.FETCHER_NAME = "fake-forge"
mocker.patch(
"swh.loader.core.metadata_fetchers._fetchers", return_value=[fetcher_cls]
)
loader = DummyBaseLoader(
swh_storage, lister_name="fake-forge", lister_instance_name=""
)
statsd_report = mocker.patch.object(loader.statsd, "_report")
result = loader.load()
assert result == {"status": "eventful"}
fetcher_cls.assert_called_once()
fetcher_cls.assert_called_once_with(
origin=ORIGIN,
credentials={},
lister_name="fake-forge",
lister_instance_name="",
)
assert swh_storage.raw_extrinsic_metadata_get(
ORIGIN.swhid(), METADATA_AUTHORITY
).results == [REMD]
assert loader.parent_origins == []
assert [
call("metadata_fetchers_sum", "c", 1, {}, 1),
call("metadata_fetchers_count", "c", 1, {}, 1),
call("metadata_parent_origins_sum", "c", 0, {"fetcher": "fake-forge"}, 1),
call("metadata_parent_origins_count", "c", 1, {"fetcher": "fake-forge"}, 1),
call("metadata_objects_sum", "c", 1, {}, 1),
call("metadata_objects_count", "c", 1, {}, 1),
] == [c for c in statsd_report.mock_calls if "metadata_" in c[1][0]]
assert loader.statsd.namespace == "swh_loader"
assert loader.statsd.constant_tags == {"visit_type": "git"}
def test_base_loader_with_unknown_lister_name(swh_storage, mocker):
fetcher_cls = MagicMock(wraps=DummyMetadataFetcher)
fetcher_cls.SUPPORTED_LISTERS = DummyMetadataFetcher.SUPPORTED_LISTERS
mocker.patch(
"swh.loader.core.metadata_fetchers._fetchers", return_value=[fetcher_cls]
)
loader = DummyBaseLoader(
swh_storage, lister_name="other-lister", lister_instance_name=""
)
result = loader.load()
assert result == {"status": "eventful"}
fetcher_cls.assert_not_called()
with pytest.raises(swh.storage.exc.StorageArgumentException):
swh_storage.raw_extrinsic_metadata_get(ORIGIN.swhid(), METADATA_AUTHORITY)
def test_base_loader_forked_origin(swh_storage, mocker):
fetcher_cls = MagicMock(wraps=DummyMetadataFetcherWithFork)
fetcher_cls.SUPPORTED_LISTERS = DummyMetadataFetcherWithFork.SUPPORTED_LISTERS
fetcher_cls.FETCHER_NAME = "fake-forge"
mocker.patch(
"swh.loader.core.metadata_fetchers._fetchers", return_value=[fetcher_cls]
)
loader = DummyBaseLoader(
swh_storage, lister_name="fake-forge", lister_instance_name=""
)
statsd_report = mocker.patch.object(loader.statsd, "_report")
result = loader.load()
assert result == {"status": "eventful"}
fetcher_cls.assert_called_once()
fetcher_cls.assert_called_once_with(
origin=ORIGIN,
credentials={},
lister_name="fake-forge",
lister_instance_name="",
)
assert swh_storage.raw_extrinsic_metadata_get(
ORIGIN.swhid(), METADATA_AUTHORITY
).results == [REMD]
assert loader.parent_origins == [PARENT_ORIGIN]
assert [
call("metadata_fetchers_sum", "c", 1, {}, 1),
call("metadata_fetchers_count", "c", 1, {}, 1),
call("metadata_parent_origins_sum", "c", 1, {"fetcher": "fake-forge"}, 1),
call("metadata_parent_origins_count", "c", 1, {"fetcher": "fake-forge"}, 1),
call("metadata_objects_sum", "c", 1, {}, 1),
call("metadata_objects_count", "c", 1, {}, 1),
] == [c for c in statsd_report.mock_calls if "metadata_" in c[1][0]]
assert loader.statsd.namespace == "swh_loader"
assert loader.statsd.constant_tags == {"visit_type": "git"}
def test_base_loader_post_load_raise(swh_storage, mocker):
loader = DummyBaseLoader(swh_storage)
post_load = mocker.patch.object(loader, "post_load")
# raise exception in post_load when success is True
def post_load_method(*args, success=True):
if success:
raise Exception("Error in post_load")
post_load.side_effect = post_load_method
result = loader.load()
assert result == {"status": "failed"}
# ensure post_load has been called twice: once with success set to True and
# once with success set to False, as the first post_load call raised an exception
assert post_load.call_args_list == [mocker.call(), mocker.call(success=False)]
def test_dvcs_loader(swh_storage):
loader = DummyDVCSLoader(swh_storage)
result = loader.load()
assert result == {"status": "eventful"}
def test_dvcs_loader_with_config(swh_storage):
loader = DummyDVCSLoader(swh_storage, "another-logger")
result = loader.load()
assert result == {"status": "eventful"}
def test_loader_logger_default_name(swh_storage):
loader = DummyBaseLoader(swh_storage)
assert isinstance(loader.log, logging.Logger)
assert loader.log.name == "swh.loader.core.tests.test_loader.DummyBaseLoader"
loader = DummyDVCSLoader(swh_storage)
assert isinstance(loader.log, logging.Logger)
assert loader.log.name == "swh.loader.core.tests.test_loader.DummyDVCSLoader"
def test_loader_logger_with_name(swh_storage):
loader = DummyBaseLoader(swh_storage, "some.logger.name")
assert isinstance(loader.log, logging.Logger)
assert loader.log.name == "some.logger.name"
def test_loader_save_data_path(swh_storage, tmp_path):
loader = DummyBaseLoader(swh_storage, "some.logger.name.1", save_data_path=tmp_path)
url = "http://bitbucket.org/something"
loader.origin = Origin(url=url)
loader.visit_date = datetime.datetime(year=2019, month=10, day=1)
hash_url = hashlib.sha1(url.encode("utf-8")).hexdigest()
expected_save_path = "%s/sha1:%s/%s/2019" % (str(tmp_path), hash_url[0:2], hash_url)
save_path = loader.get_save_data_path()
assert save_path == expected_save_path
def _check_load_failure(caplog, loader, exc_class, exc_text, status="partial"):
"""Check whether a failed load properly logged its exception, and that the
snapshot didn't get referenced in storage"""
assert isinstance(loader, DVCSLoader) # was implicit so far
for record in caplog.records:
if record.levelname != "ERROR":
continue
assert "Loading failure" in record.message
assert record.exc_info
exc = record.exc_info[1]
assert isinstance(exc, exc_class)
assert exc_text in exc.args[0]
# Check that the get_snapshot operation would have succeeded
assert loader.get_snapshot() is not None
# And confirm that the visit doesn't reference a snapshot
visit = assert_last_visit_matches(loader.storage, ORIGIN.url, status)
if status != "partial":
assert visit.snapshot is None
# But that the snapshot didn't get loaded
assert loader.loaded_snapshot_id is None
@pytest.mark.parametrize("success", [True, False])
def test_loader_timings(swh_storage, mocker, success):
current_time = time.time()
mocker.patch("time.monotonic", side_effect=lambda: current_time)
mocker.patch("swh.core.statsd.monotonic", side_effect=lambda: current_time)
runtimes = {
"pre_cleanup": 2.0,
"build_extrinsic_origin_metadata": 3.0,
"prepare": 5.0,
"fetch_data": 7.0,
"process_data": 11.0,
"store_data": 13.0,
"post_load": 17.0,
"flush": 23.0,
"cleanup": 27.0,
}
class TimedLoader(BaseLoader):
visit_type = "my-visit-type"
def __getattribute__(self, method_name):
if method_name == "visit_status" and not success:
def crashy():
raise Exception("oh no")
return crashy
if method_name not in runtimes:
return super().__getattribute__(method_name)
def meth(*args, **kwargs):
nonlocal current_time
current_time += runtimes[method_name]
return meth
loader = TimedLoader(swh_storage, origin_url="http://example.org/hello.git")
statsd_report = mocker.patch.object(loader.statsd, "_report")
loader.load()
if success:
expected_tags = {
"post_load": {"success": True, "status": "full"},
"flush": {"success": True, "status": "full"},
"cleanup": {"success": True, "status": "full"},
}
else:
expected_tags = {
"post_load": {"success": False, "status": "failed"},
"flush": {"success": False, "status": "failed"},
"cleanup": {"success": False, "status": "failed"},
}
# note that this is a list equality, so order of entries in 'runtimes' matters.
# This is not perfect, but call() objects are not hashable so comparing lists
# is simpler, even if order-sensitive.
assert statsd_report.mock_calls == [
call(
"operation_duration_seconds",
"ms",
value * 1000,
{"operation": key, **expected_tags.get(key, {})},
1,
)
for (key, value) in runtimes.items()
]
assert loader.statsd.namespace == "swh_loader"
assert loader.statsd.constant_tags == {"visit_type": "my-visit-type"}
class DummyDVCSLoaderExc(DummyDVCSLoader):
"""A loader which raises an exception when loading some contents"""
def get_contents(self):
raise RuntimeError("Failed to get contents!")
def test_dvcs_loader_exc_partial_visit(swh_storage, caplog):
logger_name = "dvcsloaderexc"
caplog.set_level(logging.ERROR, logger=logger_name)
loader = DummyDVCSLoaderExc(swh_storage, logging_class=logger_name)
# fake the loading ending up in a snapshot
loader.loaded_snapshot_id = hash_to_bytes(
"9e4dd2b40d1b46b70917c0949aa2195c823a648e"
)
result = loader.load()
# loading failed
assert result == {"status": "failed"}
# still resulted in a partial visit with a snapshot (somehow)
_check_load_failure(
caplog,
loader,
RuntimeError,
"Failed to get contents!",
)
class BrokenStorageProxy:
def __init__(self, storage):
self.storage = storage
def __getattr__(self, attr):
return getattr(self.storage, attr)
def snapshot_add(self, snapshots):
raise RuntimeError("Failed to add snapshot!")
class DummyDVCSLoaderStorageExc(DummyDVCSLoader):
"""A loader which raises an exception when loading some contents"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.storage = BrokenStorageProxy(self.storage)
def test_dvcs_loader_storage_exc_failed_visit(swh_storage, caplog):
logger_name = "dvcsloaderexc"
caplog.set_level(logging.ERROR, logger=logger_name)
loader = DummyDVCSLoaderStorageExc(swh_storage, logging_class=logger_name)
result = loader.load()
assert result == {"status": "failed"}
_check_load_failure(
caplog, loader, RuntimeError, "Failed to add snapshot!", status="failed"
)
class DummyDVCSLoaderNotFound(DummyDVCSLoader, BaseLoader):
"""A loader which raises a not_found exception during the prepare method call"""
def prepare(*args, **kwargs):
raise NotFound("Unknown origin!")
def load_status(self):
return {
"status": "uneventful",
}
def test_loader_not_found(swh_storage, caplog):
loader = DummyDVCSLoaderNotFound(swh_storage)
result = loader.load()
assert result == {"status": "uneventful"}
_check_load_failure(caplog, loader, NotFound, "Unknown origin!", status="not_found")
+
+
+class DummyLoaderWithError(DummyBaseLoader):
+ def prepare(self, *args, **kwargs):
+ raise Exception("error")
+
+
+class DummyDVCSLoaderWithError(DummyDVCSLoader, BaseLoader):
+ def prepare(self, *args, **kwargs):
+ raise Exception("error")
+
+
+@pytest.mark.parametrize("loader_cls", [DummyLoaderWithError, DummyDVCSLoaderWithError])
+def test_loader_sentry_tags_on_error(swh_storage, sentry_events, loader_cls):
+ loader = loader_cls(swh_storage)
+ loader.load()
+ sentry_tags = sentry_events[0]["tags"]
+ assert sentry_tags.get(SENTRY_ORIGIN_URL_TAG_NAME) == ORIGIN.url
+ assert sentry_tags.get(SENTRY_VISIT_TYPE_TAG_NAME) == DummyLoader.visit_type
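
These tests assume a ``sentry_events`` pytest fixture that records captured
events; a minimal sketch of what such a fixture could look like (an assumption,
the real one may come from a shared conftest or a pytest plugin):

    import pytest
    import sentry_sdk

    @pytest.fixture
    def sentry_events():
        events = []
        # sentry_sdk accepts a callable transport; each captured event
        # (a dict carrying a "tags" mapping) is appended to the list.
        sentry_sdk.init(transport=events.append)
        return events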
diff --git a/swh/loader/package/tests/test_loader.py b/swh/loader/package/tests/test_loader.py
index ccf47c3..1ea43dc 100644
--- a/swh/loader/package/tests/test_loader.py
+++ b/swh/loader/package/tests/test_loader.py
@@ -1,520 +1,541 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import hashlib
import logging
import string
from unittest.mock import Mock, call, patch
import attr
import pytest
+from swh.loader.core.loader import (
+ SENTRY_ORIGIN_URL_TAG_NAME,
+ SENTRY_VISIT_TYPE_TAG_NAME,
+)
from swh.loader.package.loader import BasePackageInfo, PackageLoader
from swh.model.model import (
Origin,
OriginVisit,
OriginVisitStatus,
Person,
Release,
Revision,
RevisionType,
Snapshot,
SnapshotBranch,
TargetType,
TimestampWithTimezone,
)
from swh.model.model import ExtID
from swh.model.model import ObjectType as ModelObjectType
from swh.model.swhids import CoreSWHID, ObjectType
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_latest
class FakeStorage:
def origin_add(self, origins):
raise ValueError("We refuse to add an origin")
def origin_visit_get_latest(self, origin):
return None
class FakeStorage2(FakeStorage):
def origin_add(self, origins):
pass
def origin_visit_add(self, visits):
raise ValueError("We refuse to add an origin visit")
class StubPackageInfo(BasePackageInfo):
pass
class StubPackageLoader(PackageLoader[StubPackageInfo]):
visit_type = "stub"
def get_versions(self):
return ["v1.0", "v2.0", "v3.0", "v4.0"]
def get_package_info(self, version):
p_info = StubPackageInfo(
"http://example.org", f"example-{version}.tar", version=version
)
extid_type = "extid-type1" if version in ("v1.0", "v2.0") else "extid-type2"
# Versions 1.0 and 2.0 have an extid of one type; v3.0 and v4.0 have
# extids of a different type
patch.object(
p_info,
"extid",
return_value=(extid_type, 0, f"extid-of-{version}".encode()),
autospec=True,
).start()
yield (f"branch-{version}", p_info)
def _load_release(self, p_info, origin):
return None
def test_loader_origin_visit_failure(swh_storage):
"""Failure to add origin or origin visit should failed immediately"""
loader = StubPackageLoader(swh_storage, "some-url")
loader.storage = FakeStorage()
actual_load_status = loader.load()
assert actual_load_status == {"status": "failed"}
loader.storage = FakeStorage2()
actual_load_status2 = loader.load()
assert actual_load_status2 == {"status": "failed"}
def test_resolve_object_from_extids() -> None:
storage = get_storage("memory")
target = b"\x01" * 20
rel1 = Release(
name=b"aaaa",
message=b"aaaa",
target=target,
target_type=ModelObjectType.DIRECTORY,
synthetic=False,
)
rel2 = Release(
name=b"bbbb",
message=b"bbbb",
target=target,
target_type=ModelObjectType.DIRECTORY,
synthetic=False,
)
storage.release_add([rel1, rel2])
loader = StubPackageLoader(storage, "http://example.org/")
p_info = Mock(wraps=BasePackageInfo(None, None, None)) # type: ignore
# The PackageInfo does not support extids
p_info.extid.return_value = None
known_extids = {("extid-type", 0, b"extid-of-aaaa"): [rel1.swhid()]}
whitelist = {b"unused"}
assert loader.resolve_object_from_extids(known_extids, p_info, whitelist) is None
# Some known extid, and the PackageInfo is not one of them (ie. cache miss)
p_info.extid.return_value = ("extid-type", 0, b"extid-of-cccc")
assert loader.resolve_object_from_extids(known_extids, p_info, whitelist) is None
# Some known extid, and the PackageInfo is one of them (ie. cache hit),
# but the target release was not in the previous snapshot
p_info.extid.return_value = ("extid-type", 0, b"extid-of-aaaa")
assert loader.resolve_object_from_extids(known_extids, p_info, whitelist) is None
# Some known extid, and the PackageInfo is one of them (ie. cache hit),
# and the target release was in the previous snapshot
whitelist = {rel1.id}
assert (
loader.resolve_object_from_extids(known_extids, p_info, whitelist)
== rel1.swhid()
)
# Same as before, but there is more than one extid, and only one is an allowed
# release
whitelist = {rel1.id}
known_extids = {("extid-type", 0, b"extid-of-aaaa"): [rel2.swhid(), rel1.swhid()]}
assert (
loader.resolve_object_from_extids(known_extids, p_info, whitelist)
== rel1.swhid()
)
def test_resolve_object_from_extids_missing_target() -> None:
storage = get_storage("memory")
target = b"\x01" * 20
rel = Release(
name=b"aaaa",
message=b"aaaa",
target=target,
target_type=ModelObjectType.DIRECTORY,
synthetic=False,
)
loader = StubPackageLoader(storage, "http://example.org/")
p_info = Mock(wraps=BasePackageInfo(None, None, None)) # type: ignore
known_extids = {("extid-type", 0, b"extid-of-aaaa"): [rel.swhid()]}
p_info.extid.return_value = ("extid-type", 0, b"extid-of-aaaa")
whitelist = {rel.id}
# Targeted release is missing from the storage
assert loader.resolve_object_from_extids(known_extids, p_info, whitelist) is None
storage.release_add([rel])
# Targeted release now exists
assert (
loader.resolve_object_from_extids(known_extids, p_info, whitelist)
== rel.swhid()
)
def test_load_get_known_extids() -> None:
"""Checks PackageLoader.load() fetches known extids efficiently"""
storage = Mock(wraps=get_storage("memory"))
loader = StubPackageLoader(storage, "http://example.org")
loader.load()
# Calls should be grouped by extid type
storage.extid_get_from_extid.assert_has_calls(
[
call("extid-type1", [b"extid-of-v1.0", b"extid-of-v2.0"], version=0),
call("extid-type2", [b"extid-of-v3.0", b"extid-of-v4.0"], version=0),
],
any_order=True,
)
def test_load_extids() -> None:
"""Checks PackageLoader.load() skips iff it should, and writes (only)
the new ExtIDs"""
storage = get_storage("memory")
dir_swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=b"e" * 20)
rels = [
Release(
name=f"v{i}.0".encode(),
message=b"blah\n",
target=dir_swhid.object_id,
target_type=ModelObjectType.DIRECTORY,
synthetic=True,
)
for i in (1, 2, 3, 4)
]
storage.release_add(rels[0:3])
origin = "http://example.org"
rel1_swhid = rels[0].swhid()
rel2_swhid = rels[1].swhid()
rel3_swhid = rels[2].swhid()
rel4_swhid = rels[3].swhid()
# Results of a previous load
storage.extid_add(
[
ExtID("extid-type1", b"extid-of-v1.0", rel1_swhid),
ExtID("extid-type2", b"extid-of-v2.0", rel2_swhid),
]
)
last_snapshot = Snapshot(
branches={
b"v1.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel1_swhid.object_id
),
b"v2.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel2_swhid.object_id
),
b"v3.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel3_swhid.object_id
),
}
)
storage.snapshot_add([last_snapshot])
date = datetime.datetime.now(tz=datetime.timezone.utc)
storage.origin_add([Origin(url=origin)])
storage.origin_visit_add(
[OriginVisit(origin="http://example.org", visit=1, date=date, type="tar")]
)
storage.origin_visit_status_add(
[
OriginVisitStatus(
origin=origin,
visit=1,
status="full",
date=date,
snapshot=last_snapshot.id,
)
]
)
loader = StubPackageLoader(storage, "http://example.org")
patch.object(
loader,
"_load_release",
return_value=(rel4_swhid.object_id, dir_swhid.object_id),
autospec=True,
).start()
loader.load()
assert loader._load_release.mock_calls == [ # type: ignore
# v1.0: not loaded because there is already its (extid_type, extid, rel)
# in the storage.
# v2.0: loaded, because there is already a similar extid, but different type
call(
StubPackageInfo(origin, "example-v2.0.tar", "v2.0"),
Origin(url=origin),
),
# v3.0: loaded despite having an (extid_type, extid) in storage, because
# the target of the extid is not in the previous snapshot
call(
StubPackageInfo(origin, "example-v3.0.tar", "v3.0"),
Origin(url=origin),
),
# v4.0: loaded, because there isn't its extid
call(
StubPackageInfo(origin, "example-v4.0.tar", "v4.0"),
Origin(url=origin),
),
]
# then check the snapshot has all the branches.
# versions 2.0 to 4.0 all point to rel4_swhid (instead of the value of the last
# snapshot), because they had to be loaded (mismatched extid), and the mocked
# _load_release always returns rel4_swhid.
snapshot = Snapshot(
branches={
b"branch-v1.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel1_swhid.object_id
),
b"branch-v2.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel4_swhid.object_id
),
b"branch-v3.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel4_swhid.object_id
),
b"branch-v4.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel4_swhid.object_id
),
}
)
assert snapshot_get_latest(storage, origin) == snapshot
extids = storage.extid_get_from_target(
ObjectType.RELEASE,
[
rel1_swhid.object_id,
rel2_swhid.object_id,
rel3_swhid.object_id,
rel4_swhid.object_id,
],
)
assert set(extids) == {
# What we inserted at the beginning of the test:
ExtID("extid-type1", b"extid-of-v1.0", rel1_swhid),
ExtID("extid-type2", b"extid-of-v2.0", rel2_swhid),
# Added by the loader:
ExtID("extid-type1", b"extid-of-v2.0", rel4_swhid),
ExtID("extid-type2", b"extid-of-v3.0", rel4_swhid),
ExtID("extid-type2", b"extid-of-v4.0", rel4_swhid),
}
def test_load_upgrade_from_revision_extids(caplog):
"""Tests that, when loading incrementally based on a snapshot made by an old
version of the loader, the loader will convert revisions to releases
and add them to the storage.
Also checks that, if an extid exists pointing to a non-existent revision
(which should never happen, but you never know...), the release is loaded from
scratch."""
storage = get_storage("memory")
origin = "http://example.org"
dir1_swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=b"d" * 20)
dir2_swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=b"e" * 20)
date = TimestampWithTimezone.from_datetime(
datetime.datetime.now(tz=datetime.timezone.utc)
)
person = Person.from_fullname(b"Jane Doe <jdoe@example.org>")
rev1 = Revision(
message=b"blah",
author=person,
date=date,
committer=person,
committer_date=date,
directory=dir1_swhid.object_id,
type=RevisionType.TAR,
synthetic=True,
)
rel1 = Release(
name=b"v1.0",
message=b"blah\n",
author=person,
date=date,
target=dir1_swhid.object_id,
target_type=ModelObjectType.DIRECTORY,
synthetic=True,
)
rev1_swhid = rev1.swhid()
rel1_swhid = rel1.swhid()
rev2_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"b" * 20)
rel2_swhid = CoreSWHID(object_type=ObjectType.RELEASE, object_id=b"c" * 20)
# Results of a previous load
storage.extid_add(
[
ExtID("extid-type1", b"extid-of-v1.0", rev1_swhid, 0),
ExtID("extid-type1", b"extid-of-v2.0", rev2_swhid, 0),
]
)
storage.revision_add([rev1])
last_snapshot = Snapshot(
branches={
b"v1.0": SnapshotBranch(
target_type=TargetType.REVISION, target=rev1_swhid.object_id
),
b"v2.0": SnapshotBranch(
target_type=TargetType.REVISION, target=rev2_swhid.object_id
),
}
)
storage.snapshot_add([last_snapshot])
date = datetime.datetime.now(tz=datetime.timezone.utc)
storage.origin_add([Origin(url=origin)])
storage.origin_visit_add(
[OriginVisit(origin="http://example.org", visit=1, date=date, type="tar")]
)
storage.origin_visit_status_add(
[
OriginVisitStatus(
origin=origin,
visit=1,
status="full",
date=date,
snapshot=last_snapshot.id,
)
]
)
loader = StubPackageLoader(storage, "http://example.org")
patch.object(
loader,
"_load_release",
return_value=(rel2_swhid.object_id, dir2_swhid.object_id),
autospec=True,
).start()
patch.object(
loader,
"get_versions",
return_value=["v1.0", "v2.0", "v3.0"],
autospec=True,
).start()
caplog.set_level(logging.ERROR)
loader.load()
assert len(caplog.records) == 1
(record,) = caplog.records
assert record.levelname == "ERROR"
assert "Failed to upgrade branch branch-v2.0" in record.message
assert loader._load_release.mock_calls == [
# v1.0: not loaded because there is already a revision matching it
# v2.0: loaded, as the revision is missing from the storage even though there
# is an extid
call(StubPackageInfo(origin, "example-v2.0.tar", "v2.0"), Origin(url=origin)),
# v3.0: loaded (did not exist yet)
call(StubPackageInfo(origin, "example-v3.0.tar", "v3.0"), Origin(url=origin)),
]
snapshot = Snapshot(
branches={
b"branch-v1.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel1_swhid.object_id
),
b"branch-v2.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel2_swhid.object_id
),
b"branch-v3.0": SnapshotBranch(
target_type=TargetType.RELEASE, target=rel2_swhid.object_id
),
}
)
assert snapshot_get_latest(storage, origin) == snapshot
extids = storage.extid_get_from_target(
ObjectType.RELEASE,
[
rel1_swhid.object_id,
rel2_swhid.object_id,
],
)
assert set(extids) == {
ExtID("extid-type1", b"extid-of-v1.0", rel1_swhid),
ExtID("extid-type1", b"extid-of-v2.0", rel2_swhid),
ExtID("extid-type2", b"extid-of-v3.0", rel2_swhid),
}
def test_manifest_extid():
"""Compute primary key should return the right identity"""
@attr.s
class TestPackageInfo(BasePackageInfo):
a = attr.ib()
b = attr.ib()
length = attr.ib()
filename = attr.ib()
MANIFEST_FORMAT = string.Template("$a $b")
p_info = TestPackageInfo(
url="http://example.org/",
a=1,
b=2,
length=221837,
filename="8sync-0.1.0.tar.gz",
version="0.1.0",
)
actual_id = p_info.extid()
assert actual_id == ("package-manifest-sha256", 0, hashlib.sha256(b"1 2").digest())
def test_no_env_swh_config_filename_raise(monkeypatch):
"""No SWH_CONFIG_FILENAME environment variable makes package loader init raise"""
class DummyPackageLoader(PackageLoader):
"""A dummy package loader for test purpose"""
pass
monkeypatch.delenv("SWH_CONFIG_FILENAME", raising=False)
with pytest.raises(
AssertionError, match="SWH_CONFIG_FILENAME environment variable is undefined"
):
DummyPackageLoader.from_configfile(url="some-url")
+
+
+class StubPackageLoaderWithError(StubPackageLoader):
+ def get_versions(self, *args, **kwargs):
+ raise Exception("error")
+
+
+def test_loader_sentry_tags_on_error(swh_storage, sentry_events):
+ origin_url = "http://example.org/package/name"
+ loader = StubPackageLoaderWithError(swh_storage, origin_url)
+ loader.load()
+ sentry_tags = sentry_events[0]["tags"]
+ assert sentry_tags.get(SENTRY_ORIGIN_URL_TAG_NAME) == origin_url
+ assert (
+ sentry_tags.get(SENTRY_VISIT_TYPE_TAG_NAME)
+ == StubPackageLoaderWithError.visit_type
+ )
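
Note that PackageLoader subclasses get these tags without any extra code:
BaseLoader.__init__, which PackageLoader ultimately calls, sets them on the
Sentry scope before any fetching starts, so even a failure in get_versions()
is tagged.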
