diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index a22161b..b0fe6fb 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -1,306 +1,308 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import logging
 import tempfile
 import os

 from typing import Generator, Dict, Tuple, Sequence, List

 from swh.core.tarball import uncompress
 from swh.core.config import SWHConfig
 from swh.model.from_disk import Directory
 from swh.model.identifiers import (
     revision_identifier, snapshot_identifier, identifier_to_bytes
 )
 from swh.storage import get_storage
 from swh.loader.core.converters import content_for_storage
 from swh.loader.package.utils import download


 logger = logging.getLogger(__name__)


 # Not implemented yet:
 # - clean up disk routines from previous killed workers (when OOMkilled)
 #   -> separation of concerns would have this abstracted away from the
 #      loader code
 #   -> experience tells us it's complicated to do as such (T903, T964, T982,
 #      etc...)
 #
 # - (in-progress) splitting the too-large batches of objects sent to storage
 #   into groups, filtering out known contents, directories, etc... This could
 #   be a specialized collaborator or storage implementation or proxy which
 #   deals with this (cf. swh.loader.package.storage.ProxyStorage)
 #
 # - model: swh.model.merkle.from_disk should output swh.model.model.* objects
 #   to avoid this layer's conversion routine call
 #   -> Take this up within swh.model's current implementation
 #
 # - Does not trap exceptions yet within the PackageLoader.load method
 #
 # - Incremental loading through latest snapshot introspection


 class PackageLoader:
     # Origin visit type (str) set by the loader
     visit_type = ''

     def __init__(self, url):
         """Loader's constructor. This raises an exception if the minimal
            required configuration is missing (cf. the `_check_configuration`
            method).

         Args:
             url (str): Origin url to load data from

         """
         # This expects to use the environment variable SWH_CONFIG_FILENAME
         self.config = SWHConfig.parse_config_file()
         self._check_configuration()
         self.storage = get_storage(**self.config['storage'])
         self.url = url

     def _check_configuration(self):
         """Checks the minimal configuration required is set for the loader.

         If some required configuration is missing, an exception detailing
         the issue is raised.

         """
         if 'storage' not in self.config:
             raise ValueError(
                 'Misconfiguration, at least the storage key should be set')

     def get_versions(self) -> Sequence[str]:
         """Return the list of all published package versions.

         Returns:
             Sequence of published versions

         """
         return []

     def get_artifacts(self, version: str) -> Generator[
             Tuple[str, str, Dict], None, None]:
         """Given a release version of a package, retrieve the associated
            artifact information for that version.

         Args:
             version: Package version

         Returns:
             (artifact filename, artifact uri, raw artifact metadata)

         """
         yield from {}

     def fetch_artifact_archive(
             self, artifact_uri: str, dest: str) -> Tuple[str, Dict]:
         """Fetch the artifact archive to a temporary folder and return its
            path.

         Args:
             artifact_uri: Artifact uri to fetch
             dest: Directory to write the downloaded archive to

         Returns:
             the locally retrieved artifact path

         """
         return download(artifact_uri, dest=dest)

     def build_revision(
             self, a_metadata: Dict, a_uncompressed_path: str) -> Dict:
         """Build the revision dict

         Returns:
             SWH data dict

         """
         return {}

     def get_default_release(self) -> str:
         """Retrieve the latest release version

         Returns:
             Latest version

         """
         return ''

     def load(self) -> Dict:
         """Load the contents associated with a specific origin.

         for each package version of the origin

         1. Fetch the files for one package version

            By default, this can be implemented as a simple HTTP request.
            Loaders with more specific requirements can override this, e.g.:
            the PyPI loader checks the integrity of the downloaded files; the
            Debian loader has to download and check several files for one
            package version.

         2. Extract the downloaded files

            By default, this would be a universal archive/tarball extraction.
            Loaders for specific formats can override this method (for
            instance, the Debian loader uses dpkg-source -x).

         3. Convert the extracted directory to a set of Software Heritage
            objects, using swh.model.from_disk.

         4. Extract the metadata from the unpacked directories

            This would only be applicable for "smart" loaders like npm
            (parsing the package.json), PyPI (parsing the PKG-INFO file) or
            Debian (parsing debian/changelog and debian/control).

            On "minimal-metadata" sources such as the GNU archive, the lister
            should provide the minimal set of metadata needed to populate the
            revision/release objects (authors, dates) as an argument to the
            task.

         5. Generate the revision/release objects for the given version, from
            the data generated at steps 3 and 4.

         end for each

         6. Generate and load the snapshot for the visit

            Using the revisions/releases collected at step 5., and the default
            release retrieved before the loop, generate a snapshot and load it
            into the Software Heritage archive.

         """
         status_load = 'uneventful'  # either: eventful, uneventful, failed
         status_visit = 'full'       # either: partial, full
         tmp_revisions: Dict[str, List] = {}
         snapshot = None

         try:
             # Prepare origin and origin_visit
             origin = {'url': self.url}
             self.storage.origin_add([origin])
             visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
             visit_id = self.storage.origin_visit_add(
                 origin=self.url,
                 date=visit_date,
                 type=self.visit_type)['visit']

             # Retrieve the default release (the "latest" one)
             default_release = self.get_default_release()
             logger.debug('default release: %s', default_release)

             for version in self.get_versions():  # for each
                 logger.debug('version: %s', version)
                 tmp_revisions[version] = []
                 # `a_` stands for `artifact_`
                 for a_filename, a_uri, a_metadata in self.get_artifacts(
                         version):
                     with tempfile.TemporaryDirectory() as tmpdir:
                         try:
                             # a_c_: archive_computed_
                             a_path, a_c_metadata = self.fetch_artifact_archive(
                                 a_uri, dest=tmpdir)
                         except Exception as e:
                             logger.warning(
                                 'Unable to retrieve %s. Reason: %s',
                                 a_uri, e)
                             status_visit = 'partial'
                             continue

                         logger.debug('archive_path: %s', a_path)
                         logger.debug('archive_computed_metadata: %s',
                                      a_c_metadata)

                         uncompressed_path = os.path.join(tmpdir, 'src')
                         uncompress(a_path, dest=uncompressed_path)
                         logger.debug('uncompressed_path: %s',
                                      uncompressed_path)

                         directory = Directory.from_disk(
                             path=uncompressed_path.encode('utf-8'),
                             data=True)
                         # FIXME: Try not to load the full raw content in
                         # memory
                         objects = directory.collect()

                         contents = objects['content'].values()
                         logger.debug('Number of contents: %s',
                                      len(contents))
                         self.storage.content_add(
                             map(content_for_storage, contents))

                         status_load = 'eventful'
                         directories = objects['directory'].values()
                         logger.debug('Number of directories: %s',
                                      len(directories))
                         self.storage.directory_add(directories)

                         # FIXME: This should be release. cf. D409 discussion
                         revision = self.build_revision(
                             a_metadata, uncompressed_path)
                         revision.update({
                             'type': 'tar',
                             'synthetic': True,
                             'directory': directory.hash,
                         })
                         # FIXME: Standardize those metadata keys and use the
                         # correct ones
                         revision['metadata'].update({
                             'original_artifact': a_metadata,
                             'hashes_artifact': a_c_metadata
                         })
                         revision['id'] = identifier_to_bytes(
                             revision_identifier(revision))
                         logger.debug('Revision: %s', revision)
                         self.storage.revision_add([revision])

                         tmp_revisions[version].append({
                             'filename': a_filename,
                             'target': revision['id'],
                         })

             # Build and load the snapshot
             branches = {}
             for version, v_branches in tmp_revisions.items():
                 if len(v_branches) == 1:
                     branch_name = ('releases/%s' % version).encode('utf-8')
                     if version == default_release:
                         branches[b'HEAD'] = {
                             'target_type': 'alias',
                             'target': branch_name,
                         }

                     branches[branch_name] = {
                         'target_type': 'revision',
                         'target': v_branches[0]['target'],
                     }
                 else:
                     for x in v_branches:
                         branch_name = ('releases/%s/%s' % (
                             version, x['filename'])).encode('utf-8')
                         branches[branch_name] = {
                             'target_type': 'revision',
                             'target': x['target'],
                         }
             snapshot = {
                 'branches': branches
             }
             snapshot['id'] = identifier_to_bytes(
                 snapshot_identifier(snapshot))
             logger.debug('snapshot: %s', snapshot)

             self.storage.snapshot_add([snapshot])
+            if hasattr(self.storage, 'flush'):
+                self.storage.flush()
         except Exception as e:
             logger.warning('Failed to load %s. Reason: %s' % (self.url, e))
             status_visit = 'partial'
         finally:
             self.storage.origin_visit_update(
                 origin=self.url, visit_id=visit_id, status=status_visit,
                 snapshot=snapshot)
             return {'status': status_load}
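
The contract that load() orchestrates is easiest to see through a concrete subclass. The sketch below is illustrative only and not part of this patch: the origin URL, the artifact catalogue and the metadata values are invented, and build_revision() just returns the minimal fields that revision_identifier expects (load() itself fills in type, synthetic and directory).

    # Hypothetical loader, for illustration only -- not part of this diff.
    from swh.loader.package.loader import PackageLoader


    class ExampleLoader(PackageLoader):
        visit_type = 'tar'

        # made-up catalogue: version -> (filename, uri, raw metadata)
        ARTIFACTS = {
            '1.0': ('example-1.0.tar.gz',
                    'https://example.org/example-1.0.tar.gz',
                    {'date': '2019-01-01'}),
        }

        def get_versions(self):
            return list(self.ARTIFACTS)

        def get_default_release(self):
            return '1.0'

        def get_artifacts(self, version):
            yield self.ARTIFACTS[version]

        def build_revision(self, a_metadata, a_uncompressed_path):
            # dummy author/date values; enough for revision_identifier
            person = {'name': b'', 'email': b'', 'fullname': b''}
            date = {'timestamp': 1546300800, 'offset': 0}
            return {
                'message': b'synthetic revision for example 1.0',
                'author': person, 'committer': person,
                'date': date, 'committer_date': date,
                'parents': [],
                'metadata': {'intrinsic_metadata': a_metadata},
            }

    # Running it needs SWH_CONFIG_FILENAME to point at a storage config:
    # ExampleLoader('https://example.org/example').load()
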
diff --git a/swh/loader/package/storage.py b/swh/loader/package/storage.py
index bce1f59..f87efd4 100644
--- a/swh/loader/package/storage.py
+++ b/swh/loader/package/storage.py
@@ -1,104 +1,156 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+from typing import Optional, Sequence, Dict, Set
+from functools import partial
+from collections import deque
+
+from swh.core.utils import grouper
 from swh.storage import get_storage

-from typing import Sequence, Dict, Set
+
+class BufferingProxyStorage:
+    """Storage implementation in charge of accumulating objects prior to
+       sending them to the "main" storage.
+
+    """
+    def __init__(self, **storage):
+        # Pop the buffer thresholds first so that only the actual storage
+        # configuration is forwarded to get_storage
+        self._max_size = {
+            'content': storage.pop('content_packet_size', 10000),
+            'directory': storage.pop('directory_packet_size', 25000),
+            'revision': storage.pop('revision_packet_size', 100000),
+        }
+        self.storage = get_storage(**storage)
+        self.object_types = ['content', 'directory', 'revision']
+        self._objects = {k: deque() for k in self.object_types}
+
+    def flush(self, object_types: Optional[Sequence[str]] = None) -> Dict:
+        if object_types is None:
+            object_types = self.object_types
+        summary = {}
+        for object_type in object_types:
+            q = self._objects[object_type]
+            for objs in grouper(q, n=self._max_size[object_type]):
+                add_fn = getattr(self.storage, '%s_add' % object_type)
+                s = add_fn(objs)
+                summary = {k: v + summary.get(k, 0)
+                           for k, v in s.items()}
+            # the buffered objects have been sent, drop them
+            q.clear()
+
+        return summary
+
+    def object_add(
+            self, objects: Sequence[Dict], *, object_type: str) -> Dict:
+        q = self._objects[object_type]
+        max_size = self._max_size[object_type]
+        q.extend(objects)
+        if len(q) > max_size:
+            return self.flush()
+
+        return {}
+
+    def __getattr__(self, key):
+        if key.endswith('_add'):
+            object_type = key.split('_')[0]
+            if object_type in self.object_types:
+                return partial(
+                    self.object_add, object_type=object_type
+                )
+        return getattr(self.storage, key)
+
+
-class ProxyStorage:
+class FilteringProxyStorage:
     """Storage implementation in charge of filtering existing objects prior to
-       calling the storage api for ingestion
+       calling the storage api for ingestion.

     """
     def __init__(self, **storage):
         self.storage = get_storage(**storage)
         self.objects_seen = {
             'content': set(),  # set of content hashes (sha256) seen
             'directory': set(),
             'revision': set(),
         }

     def content_add(self, content: Sequence[Dict]) -> Dict:
         contents = list(content)
         contents_to_add = self._filter_missing_contents(contents)
         return self.storage.content_add(
             x for x in contents if x['sha256'] in contents_to_add
         )

     def directory_add(self, directories: Sequence[Dict]) -> Dict:
         directories = list(directories)
         missing_ids = self._filter_missing_ids(
             'directory',
             (d['id'] for d in directories)
         )
         return self.storage.directory_add(
             d for d in directories if d['id'] in missing_ids
         )

     def revision_add(self, revisions):
         revisions = list(revisions)
         missing_ids = self._filter_missing_ids(
             'revision',
             (d['id'] for d in revisions)
         )
         return self.storage.revision_add(
             r for r in revisions if r['id'] in missing_ids
         )

     def _filter_missing_contents(
             self, content_hashes: Sequence[Dict]) -> Set[bytes]:
         """Return only the content keys missing from swh

         Args:
             content_hashes: List of sha256 to check for existence in swh
                 storage

         """
         objects_seen = self.objects_seen['content']
         missing_hashes = []
         for hashes in content_hashes:
             if hashes['sha256'] in objects_seen:
                 continue
             objects_seen.add(hashes['sha256'])
             missing_hashes.append(hashes)

         return set(self.storage.content_missing(
             missing_hashes,
             key_hash='sha256',
         ))

     def _filter_missing_ids(
             self, object_type: str, ids: Sequence[bytes]) -> Set[bytes]:
         """Filter missing ids from the storage for a given object type.

         Args:
             object_type: object type to use {revision, directory}
             ids: List of object_type ids

         Returns:
             Missing ids from the storage for object_type

         """
         objects_seen = self.objects_seen[object_type]
         missing_ids = []
         for id in ids:
             if id in objects_seen:
                 continue
             objects_seen.add(id)
             missing_ids.append(id)

         fn_by_object_type = {
             'revision': self.storage.revision_missing,
             'directory': self.storage.directory_missing,
         }
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))

     def __getattr__(self, key):
         return getattr(self.storage, key)
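
To make the intent of the buffering proxy concrete, here is a usage sketch. It is not part of the patch; it assumes an in-memory backend and uses swh.model's hashutil to build one well-formed content dict (the exact fields a given backend expects may differ).

    # Illustrative sketch only -- not part of this diff.
    from swh.model import hashutil
    from swh.loader.package.storage import BufferingProxyStorage

    # buffering proxy directly on top of the in-memory backend
    storage = BufferingProxyStorage(cls='memory', args={})

    data = b'some file content'
    content = hashutil.MultiHash.from_data(data).digest()
    content.update({'data': data, 'length': len(data), 'status': 'visible'})

    # content_add resolves through __getattr__ to
    # object_add(..., object_type='content'): the object is queued and
    # nothing reaches the wrapped storage while the buffer stays below
    # content_packet_size
    assert storage.content_add([content]) == {}

    # flush() drains the queue towards the wrapped storage in bounded
    # groups; this is what the loader triggers at the end of a visit
    summary = storage.flush(['content'])
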
diff --git a/swh/loader/package/tests/conftest.py b/swh/loader/package/tests/conftest.py
index 9a2419e..8e781a9 100644
--- a/swh/loader/package/tests/conftest.py
+++ b/swh/loader/package/tests/conftest.py
@@ -1,137 +1,140 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import os
 import re

 import pytest

 from functools import partial
 from os import path
 from urllib.parse import urlparse

 from .common import DATADIR

 import swh.storage
 from swh.storage import get_storage as initial_get_storage

 logger = logging.getLogger(__name__)


 def get_storage(cls, args):
-    if cls == 'proxy':
-        from swh.loader.package.storage import ProxyStorage
-        return ProxyStorage(**args)
+    if cls == 'filter':
+        from swh.loader.package.storage import FilteringProxyStorage
+        return FilteringProxyStorage(**args)
+    if cls == 'buffer':
+        from swh.loader.package.storage import BufferingProxyStorage
+        return BufferingProxyStorage(**args)
     return initial_get_storage(cls, args)


 swh.storage.get_storage = get_storage

 # Check get_local_factory function
 # Maximum number of iteration checks to generate requests responses
 MAX_VISIT_FILES = 10


 @pytest.fixture
 def swh_config(monkeypatch):
     conffile = os.path.join(DATADIR, 'loader.yml')
     monkeypatch.setenv('SWH_CONFIG_FILENAME', conffile)
     return conffile


 def get_response_cb(request, context, ignore_urls=[], visits=None):
     """Mount point callback to fetch the content of a request from disk

     This is meant to be used as 'body' argument of the requests_mock.get()
     method.

     It will look for files on the local filesystem based on the requested
     URL, using the following rules:

     - files are searched in the DATADIR/<hostname> directory

     - the local file name is the path part of the URL with path hierarchy
       markers (aka '/') replaced by '_'

     Eg. if you use the requests_mock fixture in your test file as:

         requests_mock.get('https://nowhere.com', body=get_response_cb)
         # or even
         requests_mock.get(re.compile('https://'), body=get_response_cb)

     then a call requests.get like:

         requests.get('https://nowhere.com/path/to/resource')

     will look for the content of the response in:

         DATADIR/resources/nowhere.com/path_to_resource

     Args:
         request (requests.Request): The request object
         context (requests.Context): Object holding response metadata
             information (status_code, headers, etc...)
         ignore_urls (List): urls whose status response should be 404 even if
             the local file exists
         visits (Optional[Dict]): Map of url, number of visits. If None,
             disable multi visit support (default)

     Returns:
         Optional[FileDescriptor] on the on-disk file to read from the test
         context

     """
     logger.debug('get_response_cb(%s, %s)', request, context)
     logger.debug('url: %s', request.url)
     logger.debug('ignore_urls: %s', ignore_urls)
     if request.url in ignore_urls:
         context.status_code = 404
         return None
     url = urlparse(request.url)
     dirname = url.hostname  # pypi.org | files.pythonhosted.org
     # url.path: pypi/<project>/json -> local file: pypi_<project>_json
     filename = url.path[1:]
     if filename.endswith('/'):
         filename = filename[:-1]
     filename = filename.replace('/', '_')
     filepath = path.join(DATADIR, dirname, filename)
     if visits is not None:
         visit = visits.get(url, 0)
         visits[url] = visit + 1
         if visit:
             filepath = filepath + '_visit%s' % visit
     if not path.isfile(filepath):
         logger.debug('not found filepath: %s', filepath)
         context.status_code = 404
         return None
     fd = open(filepath, 'rb')
     context.headers['content-length'] = str(path.getsize(filepath))
     return fd


 def local_get_factory(ignore_urls=[], has_multi_visit=False):
     @pytest.fixture
     def local_get(requests_mock):
         if not has_multi_visit:
             cb = partial(get_response_cb, ignore_urls=ignore_urls)
             requests_mock.get(re.compile('https://'), body=cb)
         else:
             visits = {}
             requests_mock.get(re.compile('https://'), body=partial(
                 get_response_cb, ignore_urls=ignore_urls, visits=visits)
             )
         return requests_mock

     return local_get


 local_get = local_get_factory([])

 local_get_visits = local_get_factory(has_multi_visit=True)
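
As a reminder of how these fixtures are meant to be combined in a test module, here is a sketch that is not part of the patch; the URL and the matching file under DATADIR are hypothetical.

    # Hypothetical test sketch -- 'nowhere.com/api_info' would have to exist
    # under DATADIR for the 200 branch to be taken.
    import requests


    def test_example(swh_config, local_get):
        # swh_config points SWH_CONFIG_FILENAME at the loader.yml below, so
        # a PackageLoader built here uses the filter+buffer+memory stack;
        # local_get serves every https:// request from the DATADIR tree.
        response = requests.get('https://nowhere.com/api/info')
        assert response.status_code in (200, 404)
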
diff --git a/swh/loader/package/tests/resources/loader.yml b/swh/loader/package/tests/resources/loader.yml
index cba3f83..13a4fd3 100644
--- a/swh/loader/package/tests/resources/loader.yml
+++ b/swh/loader/package/tests/resources/loader.yml
@@ -1,5 +1,7 @@
 storage:
-  cls: proxy
+  cls: filter
   args:
-    cls: memory
-    args: {}
+    cls: buffer
+    args:
+      cls: memory
+      args: {}
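
For reference, with the monkeypatched get_storage from conftest.py this configuration resolves to a filtering proxy wrapping a buffering proxy wrapping the in-memory storage. The snippet below is illustrative and only valid once the monkeypatch is in place (and swh.loader.package.storage is imported after it); it shows that flush() is reachable through both proxies, which is what the hasattr check in PackageLoader.load relies on.

    # Requires the conftest.py monkeypatch, which teaches get_storage about
    # the 'filter' and 'buffer' classes.
    from swh.loader.package.storage import FilteringProxyStorage

    storage = FilteringProxyStorage(
        cls='buffer', args={'cls': 'memory', 'args': {}})

    # delegated through both __getattr__ implementations down to
    # BufferingProxyStorage.flush
    assert hasattr(storage, 'flush')
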