Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from abc import ABCMeta, abstractmethod | |||||
import datetime | import datetime | ||||
import hashlib | import hashlib | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Any, Dict, Iterable, Optional | from typing import Any, Dict, Iterable, Optional | ||||
from swh.core.config import load_from_envvar | from swh.core.config import load_from_envvar | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
Sha1Git, | Sha1Git, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
) | ) | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | |||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
DEFAULT_CONFIG: Dict[str, Any] = { | DEFAULT_CONFIG: Dict[str, Any] = { | ||||
"max_content_size": 100 * 1024 * 1024, | "max_content_size": 100 * 1024 * 1024, | ||||
"save_data": False, | |||||
"save_data_path": "", | |||||
"storage": {"cls": "memory"}, | |||||
} | } | ||||
class BaseLoader(metaclass=ABCMeta): | class Loader: | ||||
"""Mixin base class for loader. | """The base class for a Software Heritage Loader. | ||||
To use this class, you must: | A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... | ||||
package artifacts), ingests the contents/directories/revisions/releases/snapshot to | |||||
the storage backend. | |||||
For now, this just exposes 2 class methods (from_config, from_configfile) to | |||||
centralize and ease the loader instantiation. | |||||
Args: | |||||
storage: the instance of the Storage being used to register the | |||||
origin information | |||||
""" | |||||
def __init__( | |||||
self, storage: StorageInterface, max_content_size: Optional[int] = None, | |||||
): | |||||
self.storage = storage | |||||
self.max_content_size = int(max_content_size) if max_content_size else None | |||||
@classmethod | |||||
def from_config(cls, storage: Dict[str, Any], **config: Any): | |||||
"""Instantiate a loader from a configuration dict. | |||||
This is basically a backwards-compatibility shim for the CLI. | |||||
Args: | |||||
storage: instantiation config for the storage | |||||
config: the configuration dict for the loader, with the following keys: | |||||
- credentials (optional): credentials list for the scheduler | |||||
- any other kwargs passed to the loader. | |||||
Returns: | |||||
the instantiated loader | |||||
""" | |||||
# Drop the legacy config keys which aren't used for this generation of loader. | |||||
for legacy_key in ("storage", "celery"): | |||||
config.pop(legacy_key, None) | |||||
# Instantiate the storage | |||||
storage_instance = get_storage(**storage) | |||||
return cls(storage=storage_instance, **config) | |||||
@classmethod | |||||
def from_configfile(cls, **kwargs: Any): | |||||
"""Instantiate a loader from the configuration loaded from the | |||||
SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their | |||||
value is not None. | |||||
Args: | |||||
kwargs: kwargs passed to the loader instantiation | |||||
""" | |||||
config = dict(load_from_envvar(DEFAULT_CONFIG)) | |||||
config.update({k: v for k, v in kwargs.items() if v is not None}) | |||||
return cls.from_config(**config) | |||||
class BaseLoader(Loader): | |||||
"""Mixin base class for (D)VCS loaders (e.g svn, git, mercurial, ...). | |||||
To define such loaders, you must: | |||||
- inherit from this class | - inherit from this class | ||||
- and implement the @abstractmethod methods: | - and implement following methods: | ||||
- :func:`prepare`: First step executed by the loader to prepare some | - :func:`prepare`: First step executed by the loader to prepare some | ||||
state needed by the :func:`load` method. | state needed by the :func:`load` method. | ||||
- :func:`get_origin`: Retrieve the origin that is currently being loaded. | - :func:`get_origin`: Retrieve the origin that is currently being loaded. | ||||
- :func:`fetch_data`: Fetch the data; this is the method to implement | - :func:`fetch_data`: Fetch the data; this is the method to implement | ||||
to compute the data to inject in swh (through the store_data method) | to compute the data to inject in swh (through the store_data method) | ||||
Show All 13 Lines | class BaseLoader(Loader): | ||||
You can take a look at some example classes: | You can take a look at some example classes: | ||||
- :class:`SvnLoader` | - :class:`SvnLoader` | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | |||||
logging_class: Optional[str] = None, | logging_class: Optional[str] = None, | ||||
config: Optional[Dict[str, Any]] = None, | save_data_path: Optional[str] = None, | ||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
if config: | super().__init__(storage=storage, max_content_size=max_content_size) | ||||
self.config = config | |||||
else: | |||||
self.config = load_from_envvar(DEFAULT_CONFIG) | |||||
self.storage = get_storage(**self.config["storage"]) | |||||
if logging_class is None: | if logging_class is None: | ||||
logging_class = "%s.%s" % ( | logging_class = "%s.%s" % ( | ||||
self.__class__.__module__, | self.__class__.__module__, | ||||
self.__class__.__name__, | self.__class__.__name__, | ||||
) | ) | ||||
self.log = logging.getLogger(logging_class) | self.log = logging.getLogger(logging_class) | ||||
_log = logging.getLogger("requests.packages.urllib3.connectionpool") | _log = logging.getLogger("requests.packages.urllib3.connectionpool") | ||||
_log.setLevel(logging.WARN) | _log.setLevel(logging.WARN) | ||||
self.max_content_size = self.config["max_content_size"] | |||||
# possibly overridden in self.prepare method | # possibly overridden in self.prepare method | ||||
self.visit_date: Optional[datetime.datetime] = None | self.visit_date: Optional[datetime.datetime] = None | ||||
self.origin: Optional[Origin] = None | self.origin: Optional[Origin] = None | ||||
if not hasattr(self, "visit_type"): | if not hasattr(self, "visit_type"): | ||||
self.visit_type: Optional[str] = None | self.visit_type: Optional[str] = None | ||||
self.origin_metadata: Dict[str, Any] = {} | self.origin_metadata: Dict[str, Any] = {} | ||||
self.loaded_snapshot_id: Optional[Sha1Git] = None | self.loaded_snapshot_id: Optional[Sha1Git] = None | ||||
# Make sure the config is sane | if save_data_path: | ||||
save_data = self.config.get("save_data") | path = save_data_path | ||||
if save_data: | |||||
path = self.config["save_data_path"] | |||||
os.stat(path) | os.stat(path) | ||||
if not os.access(path, os.R_OK | os.W_OK): | if not os.access(path, os.R_OK | os.W_OK): | ||||
raise PermissionError("Permission denied: %r" % path) | raise PermissionError("Permission denied: %r" % path) | ||||
self.save_data_path = save_data_path | |||||
def save_data(self) -> None: | def save_data(self) -> None: | ||||
"""Save the data associated to the current load""" | """Save the data associated to the current load""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def get_save_data_path(self) -> str: | def get_save_data_path(self) -> str: | ||||
"""The path to which we archive the loader's raw data""" | """The path to which we archive the loader's raw data""" | ||||
if not hasattr(self, "__save_data_path"): | if not hasattr(self, "__save_data_path"): | ||||
year = str(self.visit_date.year) # type: ignore | year = str(self.visit_date.year) # type: ignore | ||||
assert self.origin | assert self.origin | ||||
url = self.origin.url.encode("utf-8") | url = self.origin.url.encode("utf-8") | ||||
origin_url_hash = hashlib.sha1(url).hexdigest() | origin_url_hash = hashlib.sha1(url).hexdigest() | ||||
path = "%s/sha1:%s/%s/%s" % ( | path = "%s/sha1:%s/%s/%s" % ( | ||||
self.config["save_data_path"], | self.save_data_path, | ||||
origin_url_hash[0:2], | origin_url_hash[0:2], | ||||
origin_url_hash, | origin_url_hash, | ||||
year, | year, | ||||
) | ) | ||||
os.makedirs(path, exist_ok=True) | os.makedirs(path, exist_ok=True) | ||||
self.__save_data_path = path | self.__save_data_path = path | ||||
return self.__save_data_path | return self.__save_data_path | ||||
def flush(self) -> None: | def flush(self) -> None: | ||||
"""Flush any potential buffered data not sent to swh-storage. | """Flush any potential buffered data not sent to swh-storage. | ||||
""" | """ | ||||
self.storage.flush() | self.storage.flush() | ||||
@abstractmethod | |||||
def cleanup(self) -> None: | def cleanup(self) -> None: | ||||
"""Last step executed by the loader. | """Last step executed by the loader. | ||||
""" | """ | ||||
pass | raise NotImplementedError | ||||
@abstractmethod | |||||
def prepare_origin_visit(self, *args, **kwargs) -> None: | def prepare_origin_visit(self, *args, **kwargs) -> None: | ||||
"""First step executed by the loader to prepare origin and visit | """First step executed by the loader to prepare origin and visit | ||||
references. Set/update self.origin, and | references. Set/update self.origin, and | ||||
optionally self.origin_url, self.visit_date. | optionally self.origin_url, self.visit_date. | ||||
""" | """ | ||||
pass | raise NotImplementedError | ||||
def _store_origin_visit(self) -> None: | def _store_origin_visit(self) -> None: | ||||
"""Store origin and visit references. Sets the self.visit references. | """Store origin and visit references. Sets the self.visit references. | ||||
""" | """ | ||||
assert self.origin | assert self.origin | ||||
self.storage.origin_add([self.origin]) | self.storage.origin_add([self.origin]) | ||||
if not self.visit_date: # now as default visit_date if not provided | if not self.visit_date: # now as default visit_date if not provided | ||||
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
assert isinstance(self.visit_date, datetime.datetime) | assert isinstance(self.visit_date, datetime.datetime) | ||||
assert isinstance(self.visit_type, str) | assert isinstance(self.visit_type, str) | ||||
self.visit = list( | self.visit = list( | ||||
self.storage.origin_visit_add( | self.storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=self.origin.url, | origin=self.origin.url, | ||||
date=self.visit_date, | date=self.visit_date, | ||||
type=self.visit_type, | type=self.visit_type, | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
)[0] | )[0] | ||||
@abstractmethod | |||||
def prepare(self, *args, **kwargs) -> None: | def prepare(self, *args, **kwargs) -> None: | ||||
"""Second step executed by the loader to prepare some state needed by | """Second step executed by the loader to prepare some state needed by | ||||
the loader. | the loader. | ||||
Raises: | Raises: | ||||
NotFound: if the origin to ingest is not found. | NotFound: if the origin to ingest is not found. | ||||
""" | """ | ||||
pass | raise NotImplementedError | ||||
def get_origin(self) -> Origin: | def get_origin(self) -> Origin: | ||||
"""Get the origin that is currently being loaded. | """Get the origin that is currently being loaded. | ||||
self.origin should be set in :func:`prepare_origin_visit` | self.origin should be set in :func:`prepare_origin_visit` | ||||
Returns: | Returns: | ||||
Origin: an origin ready to be sent to storage by | Origin: an origin ready to be sent to storage by | ||||
:func:`origin_add`. | :func:`origin_add`. | ||||
""" | """ | ||||
assert self.origin | assert self.origin | ||||
return self.origin | return self.origin | ||||
@abstractmethod | |||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||
"""Fetch the data from the source the loader is currently loading | """Fetch the data from the source the loader is currently loading | ||||
(ex: git/hg/svn/... repository). | (ex: git/hg/svn/... repository). | ||||
Returns: | Returns: | ||||
a value that is interpreted as a boolean. If True, fetch_data needs | a value that is interpreted as a boolean. If True, fetch_data needs | ||||
to be called again to complete loading. | to be called again to complete loading. | ||||
""" | """ | ||||
pass | raise NotImplementedError | ||||
@abstractmethod | |||||
def store_data(self): | def store_data(self): | ||||
"""Store fetched data in the database. | """Store fetched data in the database. | ||||
Should call the :func:`maybe_load_xyz` methods, which handle the | Should call the :func:`maybe_load_xyz` methods, which handle the | ||||
bundles sent to storage, rather than send directly. | bundles sent to storage, rather than send directly. | ||||
""" | """ | ||||
pass | raise NotImplementedError | ||||
def store_metadata(self) -> None: | def store_metadata(self) -> None: | ||||
"""Store fetched metadata in the database. | """Store fetched metadata in the database. | ||||
For more information, see implementation in :class:`DepositLoader`. | For more information, see implementation in :class:`DepositLoader`. | ||||
""" | """ | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 180 Lines • ▼ Show 20 Lines | def get_snapshot(self) -> Snapshot: | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def eventful(self) -> bool: | def eventful(self) -> bool: | ||||
"""Whether the load was eventful""" | """Whether the load was eventful""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def store_data(self) -> None: | def store_data(self) -> None: | ||||
assert self.origin | assert self.origin | ||||
if self.config.get("save_data"): | if self.save_data_path: | ||||
self.save_data() | self.save_data() | ||||
if self.has_contents(): | if self.has_contents(): | ||||
for obj in self.get_contents(): | for obj in self.get_contents(): | ||||
if isinstance(obj, Content): | if isinstance(obj, Content): | ||||
self.storage.content_add([obj]) | self.storage.content_add([obj]) | ||||
elif isinstance(obj, SkippedContent): | elif isinstance(obj, SkippedContent): | ||||
self.storage.skipped_content_add([obj]) | self.storage.skipped_content_add([obj]) | ||||
Show All 15 Lines |