Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
Show All 27 Lines | |||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
DEFAULT_CONFIG: Dict[str, Any] = { | DEFAULT_CONFIG: Dict[str, Any] = { | ||||
"max_content_size": 100 * 1024 * 1024, | "max_content_size": 100 * 1024 * 1024, | ||||
} | } | ||||
class Loader: | class BaseLoader: | ||||
"""The base class for a Software Heritage Loader. | """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g | ||||
PyPI, Npm, CRAN, ...) | |||||
A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | ||||
package artifacts), ingests the contents/directories/revisions/releases/snapshot to | package artifacts), ingests the contents/directories/revisions/releases/snapshot | ||||
the storage backend. | read from those artifacts and sends them to the archive through the storage backend. | ||||
For now, this just exposes 2 static methods (from_config, from_configfile) to | The main entry point for the loader is the :func:`load` function. | ||||
centralize and ease the loader instantiation. | |||||
Args: | 2 class methods (:func:`from_config`, :func:`from_configfile`) centralize and | ||||
storage: the instance of the Storage being used to register the | ease the loader instantiation from either a configuration dict or a configuration file. | ||||
origin information | |||||
Some class examples: | |||||
- :class:`SvnLoader` | |||||
- :class:`GitLoader` | |||||
- :class:`PyPILoader` | |||||
- :class:`NpmLoader` | |||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, storage: StorageInterface, max_content_size: Optional[int] = None, | self, | ||||
storage: StorageInterface, | |||||
logging_class: Optional[str] = None, | |||||
save_data_path: Optional[str] = None, | |||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
super().__init__() | |||||
self.storage = storage | self.storage = storage | ||||
self.max_content_size = int(max_content_size) if max_content_size else None | self.max_content_size = int(max_content_size) if max_content_size else None | ||||
if logging_class is None: | |||||
logging_class = "%s.%s" % ( | |||||
self.__class__.__module__, | |||||
self.__class__.__name__, | |||||
) | |||||
self.log = logging.getLogger(logging_class) | |||||
_log = logging.getLogger("requests.packages.urllib3.connectionpool") | |||||
_log.setLevel(logging.WARN) | |||||
# possibly overridden in self.prepare method | |||||
self.visit_date: Optional[datetime.datetime] = None | |||||
self.origin: Optional[Origin] = None | |||||
if not hasattr(self, "visit_type"): | |||||
self.visit_type: Optional[str] = None | |||||
self.origin_metadata: Dict[str, Any] = {} | |||||
self.loaded_snapshot_id: Optional[Sha1Git] = None | |||||
if save_data_path: | |||||
path = save_data_path | |||||
os.stat(path) | |||||
if not os.access(path, os.R_OK | os.W_OK): | |||||
raise PermissionError("Permission denied: %r" % path) | |||||
self.save_data_path = save_data_path | |||||
@classmethod | @classmethod | ||||
def from_config(cls, storage: Dict[str, Any], **config: Any): | def from_config(cls, storage: Dict[str, Any], **config: Any): | ||||
"""Instantiate a loader from a configuration dict. | """Instantiate a loader from a configuration dict. | ||||
This is basically a backwards-compatibility shim for the CLI. | This is basically a backwards-compatibility shim for the CLI. | ||||
Args: | Args: | ||||
storage: instantiation config for the storage | storage: instantiation config for the storage | ||||
Show All 21 Lines | def from_configfile(cls, **kwargs: Any): | ||||
Args: | Args: | ||||
kwargs: kwargs passed to the loader instantiation | kwargs: kwargs passed to the loader instantiation | ||||
""" | """ | ||||
config = dict(load_from_envvar(DEFAULT_CONFIG)) | config = dict(load_from_envvar(DEFAULT_CONFIG)) | ||||
config.update({k: v for k, v in kwargs.items() if v is not None}) | config.update({k: v for k, v in kwargs.items() if v is not None}) | ||||
return cls.from_config(**config) | return cls.from_config(**config) | ||||
class BaseLoader(Loader): | |||||
"""Mixin base class for (D)VCS loaders (e.g svn, git, mercurial, ...). | |||||
To define such loaders, you must: | |||||
- inherit from this class | |||||
- and implement following methods: | |||||
- :func:`prepare`: First step executed by the loader to prepare some | |||||
state needed by the :func:`load` method. | |||||
- :func:`get_origin`: Retrieve the origin that is currently being loaded. | |||||
- :func:`fetch_data`: Fetch the data; this is the method to implement | |||||
to compute the data to inject in swh (through the store_data method) | |||||
- :func:`store_data`: Store data fetched. | |||||
- :func:`visit_status`: Explicit status of the visit ('partial' or | |||||
'full') | |||||
- :func:`load_status`: Explicit status of the loading, for use by the | |||||
scheduler (eventful/uneventful/temporary failure/permanent failure). | |||||
- :func:`cleanup`: Last step executed by the loader. | |||||
The entry point for the resulting loader is :func:`load`. | |||||
You can take a look at some example classes: | |||||
- :class:`SvnLoader` | |||||
""" | |||||
def __init__( | |||||
self, | |||||
storage: StorageInterface, | |||||
logging_class: Optional[str] = None, | |||||
save_data_path: Optional[str] = None, | |||||
max_content_size: Optional[int] = None, | |||||
): | |||||
super().__init__(storage=storage, max_content_size=max_content_size) | |||||
if logging_class is None: | |||||
logging_class = "%s.%s" % ( | |||||
self.__class__.__module__, | |||||
self.__class__.__name__, | |||||
) | |||||
self.log = logging.getLogger(logging_class) | |||||
_log = logging.getLogger("requests.packages.urllib3.connectionpool") | |||||
_log.setLevel(logging.WARN) | |||||
# possibly overridden in self.prepare method | |||||
self.visit_date: Optional[datetime.datetime] = None | |||||
self.origin: Optional[Origin] = None | |||||
if not hasattr(self, "visit_type"): | |||||
self.visit_type: Optional[str] = None | |||||
self.origin_metadata: Dict[str, Any] = {} | |||||
self.loaded_snapshot_id: Optional[Sha1Git] = None | |||||
if save_data_path: | |||||
path = save_data_path | |||||
os.stat(path) | |||||
if not os.access(path, os.R_OK | os.W_OK): | |||||
raise PermissionError("Permission denied: %r" % path) | |||||
self.save_data_path = save_data_path | |||||
def save_data(self) -> None: | def save_data(self) -> None: | ||||
"""Save the data associated to the current load""" | """Save the data associated to the current load""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def get_save_data_path(self) -> str: | def get_save_data_path(self) -> str: | ||||
"""The path to which we archive the loader's raw data""" | """The path to which we archive the loader's raw data""" | ||||
if not hasattr(self, "__save_data_path"): | if not hasattr(self, "__save_data_path"): | ||||
year = str(self.visit_date.year) # type: ignore | year = str(self.visit_date.year) # type: ignore | ||||
Show All 21 Lines | def flush(self) -> None: | ||||
self.storage.flush() | self.storage.flush() | ||||
def cleanup(self) -> None: | def cleanup(self) -> None: | ||||
"""Last step executed by the loader. | """Last step executed by the loader. | ||||
""" | """ | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def prepare_origin_visit(self, *args, **kwargs) -> None: | def prepare_origin_visit(self) -> None: | ||||
"""First step executed by the loader to prepare origin and visit | """First step executed by the loader to prepare origin and visit | ||||
references. Set/update self.origin, and | references. Set/update self.origin, and | ||||
optionally self.origin_url, self.visit_date. | optionally self.origin_url, self.visit_date. | ||||
""" | """ | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def _store_origin_visit(self) -> None: | def _store_origin_visit(self) -> None: | ||||
Show All 14 Lines | def _store_origin_visit(self) -> None: | ||||
origin=self.origin.url, | origin=self.origin.url, | ||||
date=self.visit_date, | date=self.visit_date, | ||||
type=self.visit_type, | type=self.visit_type, | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
)[0] | )[0] | ||||
def prepare(self, *args, **kwargs) -> None: | def prepare(self) -> None: | ||||
"""Second step executed by the loader to prepare some state needed by | """Second step executed by the loader to prepare some state needed by | ||||
the loader. | the loader. | ||||
Raises | Raises | ||||
NotFound exception if the origin to ingest is not found. | NotFound exception if the origin to ingest is not found. | ||||
""" | """ | ||||
raise NotImplementedError | raise NotImplementedError | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | class BaseLoader: | ||||
def pre_cleanup(self) -> None: | def pre_cleanup(self) -> None: | ||||
"""As a first step, will try and check for dangling data to cleanup. | """As a first step, will try and check for dangling data to cleanup. | ||||
This should do its best to avoid raising issues. | This should do its best to avoid raising issues. | ||||
""" | """ | ||||
pass | pass | ||||
def load(self, *args, **kwargs) -> Dict[str, str]: | def load(self) -> Dict[str, str]: | ||||
r"""Loading logic for the loader to follow: | r"""Loading logic for the loader to follow: | ||||
- 1. Call :meth:`prepare_origin_visit` to prepare the | - 1. Call :meth:`prepare_origin_visit` to prepare the | ||||
origin and visit we will associate loading data to | origin and visit we will associate loading data to | ||||
- 2. Store the actual ``origin_visit`` to storage | - 2. Store the actual ``origin_visit`` to storage | ||||
- 3. Call :meth:`prepare` to prepare any eventual state | - 3. Call :meth:`prepare` to prepare any eventual state | ||||
- 4. Call :meth:`get_origin` to get the origin we work with and store | - 4. Call :meth:`get_origin` to get the origin we work with and store | ||||
- while True: | - while True: | ||||
- 5. Call :meth:`fetch_data` to fetch the data to store | - 5. Call :meth:`fetch_data` to fetch the data to store | ||||
- 6. Call :meth:`store_data` to store the data | - 6. Call :meth:`store_data` to store the data | ||||
- 7. Call :meth:`cleanup` to clean up any eventual state put in place | - 7. Call :meth:`cleanup` to clean up any eventual state put in place | ||||
in :meth:`prepare` method. | in :meth:`prepare` method. | ||||
""" | """ | ||||
try: | try: | ||||
self.pre_cleanup() | self.pre_cleanup() | ||||
except Exception: | except Exception: | ||||
msg = "Cleaning up dangling data failed! Continue loading." | msg = "Cleaning up dangling data failed! Continue loading." | ||||
self.log.warning(msg) | self.log.warning(msg) | ||||
self.prepare_origin_visit(*args, **kwargs) | self.prepare_origin_visit() | ||||
self._store_origin_visit() | self._store_origin_visit() | ||||
assert ( | assert ( | ||||
self.origin | self.origin | ||||
), "The method `prepare_origin_visit` call should set the origin (Origin)" | ), "The method `prepare_origin_visit` call should set the origin (Origin)" | ||||
assert ( | assert ( | ||||
self.visit.visit | self.visit.visit | ||||
), "The method `_store_origin_visit` should set the visit (OriginVisit)" | ), "The method `_store_origin_visit` should set the visit (OriginVisit)" | ||||
self.log.info( | self.log.info( | ||||
"Load origin '%s' with type '%s'", self.origin.url, self.visit.type | "Load origin '%s' with type '%s'", self.origin.url, self.visit.type | ||||
) | ) | ||||
try: | try: | ||||
self.prepare(*args, **kwargs) | self.prepare() | ||||
while True: | while True: | ||||
more_data_to_fetch = self.fetch_data() | more_data_to_fetch = self.fetch_data() | ||||
self.store_data() | self.store_data() | ||||
if not more_data_to_fetch: | if not more_data_to_fetch: | ||||
break | break | ||||
self.store_metadata() | self.store_metadata() | ||||
Show All 13 Lines | def load(self) -> Dict[str, str]: | ||||
task_status = "uneventful" | task_status = "uneventful" | ||||
else: | else: | ||||
status = "partial" if self.loaded_snapshot_id else "failed" | status = "partial" if self.loaded_snapshot_id else "failed" | ||||
task_status = "failed" | task_status = "failed" | ||||
self.log.exception( | self.log.exception( | ||||
"Loading failure, updating to `%s` status", | "Loading failure, updating to `%s` status", | ||||
status, | status, | ||||
extra={"swh_task_args": args, "swh_task_kwargs": kwargs,}, | extra={ | ||||
"swh_task_args": [], | |||||
"swh_task_kwargs": { | |||||
"origin": self.origin.url | |||||
}, | |||||
}, | |||||
) | ) | ||||
visit_status = OriginVisitStatus( | visit_status = OriginVisitStatus( | ||||
origin=self.origin.url, | origin=self.origin.url, | ||||
visit=self.visit.visit, | visit=self.visit.visit, | ||||
type=self.visit_type, | type=self.visit_type, | ||||
date=now(), | date=now(), | ||||
status=status, | status=status, | ||||
snapshot=self.loaded_snapshot_id, | snapshot=self.loaded_snapshot_id, | ||||
▲ Show 20 Lines • Show All 92 Lines • Show Last 20 Lines |