diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ swh.core >= 0.0.75 swh.model >= 0.0.18 -swh.storage >= 0.0.153 swh.scheduler +swh.storage >= 0.0.160 diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -16,37 +16,9 @@ from retrying import retry from typing import Any, Dict, Optional, Tuple -from . import converters - from swh.core import config from swh.storage import get_storage, HashCollision -from .queue import QueuePerSizeAndNbUniqueElements -from .queue import QueuePerNbUniqueElements - - -def send_in_packets(objects, sender, packet_size, packet_size_bytes=None): - """Send `objects`, using the `sender`, in packets of `packet_size` objects - (and of max `packet_size_bytes`). - """ - formatted_objects = [] - count = 0 - if not packet_size_bytes: - packet_size_bytes = 0 - for obj in objects: - if not obj: - continue - formatted_objects.append(obj) - if packet_size_bytes: - count += obj['length'] - if len(formatted_objects) >= packet_size or count > packet_size_bytes: - sender(formatted_objects) - formatted_objects = [] - count = 0 - - if formatted_objects: - sender(formatted_objects) - def retry_loading(error): """Retry policy when the database raises an integrity error""" @@ -108,9 +80,6 @@ You can take a look at some example classes: - :class:`BaseSvnLoader` - - :class:`TarLoader` - - :class:`DirLoader` - - :class:`DebianLoader` """ CONFIG_BASE_FILENAME = None # type: Optional[str] @@ -123,23 +92,9 @@ } }), - 'send_contents': ('bool', True), - 'send_directories': ('bool', True), - 'send_revisions': ('bool', True), - 'send_releases': ('bool', True), - 'send_snapshot': ('bool', True), - 'save_data': ('bool', False), 'save_data_path': ('str', ''), - # Number of contents - 'content_packet_size': ('int', 10000), - # packet of 100Mib contents - 'content_packet_size_bytes': ('int', 100 * 1024 * 1024), - 
'directory_packet_size': ('int', 25000), - 'revision_packet_size': ('int', 100000), - 'release_packet_size': ('int', 100000), - 'occurrence_packet_size': ('int', 100000), } # type: Dict[str, Tuple[str, Any]] ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] @@ -158,33 +113,6 @@ self.__class__.__name__) self.log = logging.getLogger(logging_class) - self.contents = QueuePerSizeAndNbUniqueElements( - key='sha1', - max_nb_elements=self.config['content_packet_size'], - max_size=self.config['content_packet_size_bytes']) - - self.contents_seen = set() - - self.directories = QueuePerNbUniqueElements( - key='id', - max_nb_elements=self.config['directory_packet_size']) - - self.directories_seen = set() - - self.revisions = QueuePerNbUniqueElements( - key='id', - max_nb_elements=self.config['revision_packet_size']) - - self.revisions_seen = set() - - self.releases = QueuePerNbUniqueElements( - key='id', - max_nb_elements=self.config['release_packet_size']) - - self.releases_seen = set() - - self.snapshot = None - _log = logging.getLogger('requests.packages.urllib3.connectionpool') _log.setLevel(logging.WARN) @@ -487,186 +415,11 @@ @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_snapshot(self, snapshot): + self.flush() # to ensure the snapshot targets existing objects self.storage.snapshot_add([snapshot]) self.storage.origin_visit_update( self.origin['url'], self.visit, snapshot=snapshot['id']) - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def filter_missing_contents(self, contents): - """Return only the contents missing from swh""" - max_content_size = self.config['content_size_limit'] - contents_per_key = {} - content_key = 'blake2s256' - - for content in contents: - if content[content_key] in self.contents_seen: - continue - key = content[content_key] - contents_per_key[key] = content - self.contents_seen.add(key) - - for key in self.storage.content_missing( - list(contents_per_key.values()), - 
key_hash=content_key - ): - yield converters.content_for_storage( - contents_per_key[key], - max_content_size=max_content_size, - origin_url=self.origin['url'], - ) - - def bulk_send_contents(self, contents): - """Format contents as swh contents and send them to the database. - - """ - threshold_reached = self.contents.add( - self.filter_missing_contents(contents)) - if threshold_reached: - self.send_batch_contents(self.contents.pop()) - - def filter_missing_directories(self, directories): - """Return only directories missing from swh""" - - directories_per_id = {} - search_dirs = [] - - for directory in directories: - dir_id = directory['id'] - if dir_id in self.directories_seen: - continue - - search_dirs.append(dir_id) - directories_per_id[dir_id] = directory - self.directories_seen.add(dir_id) - - for dir_id in self.storage.directory_missing(search_dirs): - yield directories_per_id[dir_id] - - def bulk_send_directories(self, directories): - """Send missing directories to the database""" - threshold_reached = self.directories.add( - self.filter_missing_directories(directories)) - if threshold_reached: - self.send_batch_contents(self.contents.pop()) - self.send_batch_directories(self.directories.pop()) - - def filter_missing_revisions(self, revisions): - """Return only revisions missing from swh""" - revisions_per_id = {} - search_revs = [] - - for revision in revisions: - rev_id = revision['id'] - if rev_id in self.revisions_seen: - continue - - search_revs.append(rev_id) - revisions_per_id[rev_id] = revision - self.revisions_seen.add(rev_id) - - for rev_id in self.storage.revision_missing(search_revs): - yield revisions_per_id[rev_id] - - def bulk_send_revisions(self, revisions): - """Send missing revisions to the database""" - threshold_reached = self.revisions.add( - self.filter_missing_revisions(revisions)) - if threshold_reached: - self.send_batch_contents(self.contents.pop()) - self.send_batch_directories(self.directories.pop()) - 
self.send_batch_revisions(self.revisions.pop()) - - def filter_missing_releases(self, releases): - """Return only releases missing from swh""" - releases_per_id = {} - search_rels = [] - - for release in releases: - rel_id = release['id'] - if rel_id in self.releases_seen: - continue - - search_rels.append(rel_id) - releases_per_id[rel_id] = release - self.releases_seen.add(rel_id) - - for rel_id in self.storage.release_missing(search_rels): - yield releases_per_id[rel_id] - - def bulk_send_releases(self, releases): - """Send missing releases to the database""" - threshold_reached = self.releases.add( - self.filter_missing_releases(releases)) - if threshold_reached: - self.send_batch_contents(self.contents.pop()) - self.send_batch_directories(self.directories.pop()) - self.send_batch_revisions(self.revisions.pop()) - self.send_batch_releases(self.releases.pop()) - - def bulk_send_snapshot(self, snapshot): - """Send missing releases to the database""" - self.send_batch_contents(self.contents.pop()) - self.send_batch_directories(self.directories.pop()) - self.send_batch_revisions(self.revisions.pop()) - self.send_batch_releases(self.releases.pop()) - self.send_snapshot(snapshot) - - def maybe_load_contents(self, contents): - """Load contents in swh-storage if need be. - - """ - if self.config['send_contents']: - self.bulk_send_contents(contents) - - def maybe_load_directories(self, directories): - """Load directories in swh-storage if need be. - - """ - if self.config['send_directories']: - self.bulk_send_directories(directories) - - def maybe_load_revisions(self, revisions): - """Load revisions in swh-storage if need be. - - """ - if self.config['send_revisions']: - self.bulk_send_revisions(revisions) - - def maybe_load_releases(self, releases): - """Load releases in swh-storage if need be. 
- - """ - if self.config['send_releases']: - self.bulk_send_releases(releases) - - def maybe_load_snapshot(self, snapshot): - """Load the snapshot in swh-storage if need be.""" - if self.config['send_snapshot']: - self.bulk_send_snapshot(snapshot) - - def send_batch_contents(self, contents): - """Send contents batches to the storage""" - packet_size = self.config['content_packet_size'] - packet_size_bytes = self.config['content_packet_size_bytes'] - send_in_packets(contents, self.send_contents, packet_size, - packet_size_bytes=packet_size_bytes) - - def send_batch_directories(self, directories): - """Send directories batches to the storage""" - packet_size = self.config['directory_packet_size'] - send_in_packets(directories, self.send_directories, packet_size) - - def send_batch_revisions(self, revisions): - """Send revisions batches to the storage""" - packet_size = self.config['revision_packet_size'] - send_in_packets(revisions, self.send_revisions, packet_size) - - def send_batch_releases(self, releases): - """Send releases batches to the storage - """ - packet_size = self.config['release_packet_size'] - send_in_packets(releases, self.send_releases, packet_size) - def flush(self): """Flush any potential dangling data not sent to swh-storage. @@ -675,22 +428,6 @@ loading. 
""" - contents = self.contents.pop() - directories = self.directories.pop() - revisions = self.revisions.pop() - releases = self.releases.pop() - - # and send those to storage if asked - if self.config['send_contents']: - self.send_batch_contents(contents) - if self.config['send_contents']: - self.send_batch_directories(directories) - if self.config['send_revisions']: - self.send_batch_revisions(revisions) - if self.config['send_releases']: - self.send_batch_releases(releases) - if self.config['send_snapshot'] and self.snapshot: - self.send_snapshot(self.snapshot) if hasattr(self.storage, 'flush'): self.storage.flush() @@ -957,23 +694,16 @@ """Save the data associated to the current load""" raise NotImplementedError - def flush(self): - """Flush the storage if needed. - """ - if hasattr(self.storage, 'flush'): - self.storage.flush() - def store_data(self): if self.config['save_data']: self.save_data() - if self.config['send_contents'] and self.has_contents(): - self.send_batch_contents(self.get_contents()) - if self.config['send_directories'] and self.has_directories(): - self.send_batch_directories(self.get_directories()) - if self.config['send_revisions'] and self.has_revisions(): - self.send_batch_revisions(self.get_revisions()) - if self.config['send_releases'] and self.has_releases(): - self.send_batch_releases(self.get_releases()) - if self.config['send_snapshot']: - self.send_snapshot(self.get_snapshot()) + if self.has_contents(): + self.send_contents(self.get_contents()) + if self.has_directories(): + self.send_directories(self.get_directories()) + if self.has_revisions(): + self.send_revisions(self.get_revisions()) + if self.has_releases(): + self.send_releases(self.get_releases()) + self.send_snapshot(self.get_snapshot()) diff --git a/swh/loader/core/queue.py b/swh/loader/core/queue.py deleted file mode 100644 --- a/swh/loader/core/queue.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright (C) 2015-2016 The Software Heritage developers -# See the AUTHORS file 
at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -class QueuePerNbElements(): - """Basic queue which holds the nb of elements it contains. - - """ - def __init__(self, max_nb_elements): - self.reset() - self.max_nb_elements = max_nb_elements - - def add(self, elements): - if not isinstance(elements, list): - elements = list(elements) - self.elements.extend(elements) - self.count += len(elements) - return self.count >= self.max_nb_elements - - def pop(self): - elements = self.elements - self.reset() - return elements - - def reset(self): - self.elements = [] - self.count = 0 - - -class QueuePerSizeAndNbUniqueElements(): - """Queue which permits to add unknown elements and holds the current - size of the queue. - - """ - def __init__(self, max_nb_elements, max_size, key): - self.reset() - self.max_nb_elements = max_nb_elements - self.max_size = max_size - self.key = key - self.keys = set() - - def _add_element(self, e): - k = e[self.key] - if k not in self.keys: - self.keys.add(k) - self.elements.append(e) - self.size += e['length'] - self.count += 1 - - def add(self, elements): - for e in elements: - self._add_element(e) - return self.size >= self.max_size or \ - self.count >= self.max_nb_elements - - def pop(self): - elements = self.elements - self.reset() - return elements - - def reset(self): - self.elements = [] - self.keys = set() - self.size = 0 - self.count = 0 - - -class QueuePerNbUniqueElements(): - """Queue which permits to add unknown elements and knows the actual - count of elements it held. 
- - """ - def __init__(self, max_nb_elements, key): - self.reset() - self.max_nb_elements = max_nb_elements - self.key = key - self.keys = set() - - def _add_element(self, e): - k = e[self.key] - if k not in self.keys: - self.keys.add(k) - self.elements.append(e) - self.count += 1 - - def add(self, elements): - for e in elements: - self._add_element(e) - return self.count >= self.max_nb_elements - - def pop(self): - elements = self.elements - self.reset() - return elements - - def reset(self): - self.elements = [] - self.keys = set() - self.count = 0 diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -38,36 +38,55 @@ self.storage.origin_visit_add(self.origin_url, self.visit_date, self.visit_type) + +class DummyUnbufferedLoader(DummyLoader, UnbufferedLoader): + """Unbuffered loader will send directly to storage new data + + """ def parse_config_file(self, *args, **kwargs): return { 'storage': { - 'cls': 'memory', - 'args': { - } + 'cls': 'pipeline', + 'steps': [ + { + 'cls': 'filter' + }, + { + 'cls': 'memory' + }, + ] }, - - 'send_contents': True, - 'send_directories': True, - 'send_revisions': True, - 'send_releases': True, - 'send_snapshot': True, - - 'content_packet_size': 2, - 'content_packet_size_bytes': 8, - 'directory_packet_size': 2, - 'revision_packet_size': 2, - 'release_packet_size': 2, - - 'content_size_limit': 10000, } -class DummyUnbufferedLoader(DummyLoader, UnbufferedLoader): - pass - - class DummyBufferedLoader(DummyLoader, BufferedLoader): - pass + """Buffered loader will send new data when threshold is reached + + """ + def parse_config_file(self, *args, **kwargs): + return { + 'storage': { + 'cls': 'pipeline', + 'steps': [ + { + 'cls': 'filter' + }, + { + 'cls': 'buffer', + 'min_batch_size': { + 'content': 2, + 'content_bytes': 8, + 'directory': 2, + 'revision': 2, + 'release': 2, + }, + }, + { + 'cls': 'memory' + }, + ] 
+ }, + } class DummyBaseLoaderTest(BaseLoaderTest): @@ -230,20 +249,20 @@ def test_buffered_loader(self): self.loader.load() # initialize the loader - self.loader.maybe_load_contents(self.in_contents[0:1]) - self.loader.maybe_load_directories(self.in_directories[0:1]) - self.loader.maybe_load_revisions(self.in_revisions[0:1]) - self.loader.maybe_load_releases(self.in_releases[0:1]) + self.loader.send_contents(self.in_contents[0:1]) + self.loader.send_directories(self.in_directories[0:1]) + self.loader.send_revisions(self.in_revisions[0:1]) + self.loader.send_releases(self.in_releases[0:1]) self.assertCountContents(0) self.assertCountDirectories(0) self.assertCountRevisions(0) self.assertCountReleases(0) - self.loader.maybe_load_contents(self.in_contents[1:]) - self.loader.maybe_load_directories(self.in_directories[1:]) - self.loader.maybe_load_revisions(self.in_revisions[1:]) - self.loader.maybe_load_releases(self.in_releases[1:]) + self.loader.send_contents(self.in_contents[1:]) + self.loader.send_directories(self.in_directories[1:]) + self.loader.send_revisions(self.in_revisions[1:]) + self.loader.send_releases(self.in_releases[1:]) self.assertCountContents(len(self.in_contents)) self.assertCountDirectories(len(self.in_directories)) @@ -254,8 +273,8 @@ """Checks that sending a directory triggers sending contents""" self.loader.load() # initialize the loader - self.loader.maybe_load_contents(self.in_contents[0:1]) - self.loader.maybe_load_directories(self.in_directories) + self.loader.send_contents(self.in_contents[0:1]) + self.loader.send_directories(self.in_directories) self.assertCountContents(1) self.assertCountDirectories(len(self.in_directories)) @@ -266,9 +285,9 @@ self.loader.load() # initialize the loader - self.loader.maybe_load_contents(self.in_contents[0:1]) - self.loader.maybe_load_directories(self.in_directories[0:1]) - self.loader.maybe_load_revisions(self.in_revisions) + self.loader.send_contents(self.in_contents[0:1]) +
self.loader.send_directories(self.in_directories[0:1]) + self.loader.send_revisions(self.in_revisions) self.assertCountContents(1) self.assertCountDirectories(1) @@ -279,10 +298,10 @@ contents, and directories.""" self.loader.load() # initialize the loader - self.loader.maybe_load_contents(self.in_contents[0:1]) - self.loader.maybe_load_directories(self.in_directories[0:1]) - self.loader.maybe_load_revisions(self.in_revisions[0:1]) - self.loader.maybe_load_releases(self.in_releases) + self.loader.send_contents(self.in_contents[0:1]) + self.loader.send_directories(self.in_directories[0:1]) + self.loader.send_revisions(self.in_revisions[0:1]) + self.loader.send_releases(self.in_releases) self.assertCountContents(1) self.assertCountDirectories(1) @@ -294,11 +313,11 @@ revisions, contents, and directories.""" self.loader.load() # initialize the loader - self.loader.maybe_load_contents(self.in_contents[0:1]) - self.loader.maybe_load_directories(self.in_directories[0:1]) - self.loader.maybe_load_revisions(self.in_revisions[0:1]) - self.loader.maybe_load_releases(self.in_releases[0:1]) - self.loader.maybe_load_snapshot(self.in_snapshot) + self.loader.send_contents(self.in_contents[0:1]) + self.loader.send_directories(self.in_directories[0:1]) + self.loader.send_revisions(self.in_revisions[0:1]) + self.loader.send_releases(self.in_releases[0:1]) + self.loader.send_snapshot(self.in_snapshot) self.assertCountContents(1) self.assertCountDirectories(1) diff --git a/swh/loader/core/tests/test_queue.py b/swh/loader/core/tests/test_queue.py deleted file mode 100644 --- a/swh/loader/core/tests/test_queue.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright (C) 2015-2016 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest - -from swh.loader.core.queue import (QueuePerNbElements, - 
QueuePerNbUniqueElements, - QueuePerSizeAndNbUniqueElements) - - -class TestQueuePerNbElements(unittest.TestCase): - def test_simple_queue_behavior(self): - max_nb_elements = 10 - queue = QueuePerNbElements(max_nb_elements=max_nb_elements) - - elements = [1, 3, 4, 9, 20, 30, 40] - actual_threshold = queue.add(elements) - - self.assertFalse(actual_threshold, len(elements) > max_nb_elements) - - # pop returns the content and reset the queue - actual_elements = queue.pop() - self.assertEqual(actual_elements, elements) - self.assertEqual(queue.pop(), []) - - # duplicates can be integrated - new_elements = [1, 1, 3, 4, 9, 20, 30, 40, 12, 14, 2] - actual_threshold = queue.add(new_elements) - - self.assertTrue(actual_threshold) - self.assertEqual(queue.pop(), new_elements) - - # reset is destructive too - queue.add(new_elements) - queue.reset() - - self.assertEqual(queue.pop(), []) - - -def to_some_objects(elements, key): - for elt in elements: - yield {key: elt} - - -class TestQueuePerNbUniqueElements(unittest.TestCase): - def test_queue_with_unique_key_behavior(self): - max_nb_elements = 5 - queue = QueuePerNbUniqueElements(max_nb_elements=max_nb_elements, - key='id') - - # no duplicates - elements = list(to_some_objects([1, 1, 3, 4, 9], key='id')) - actual_threshold = queue.add(elements) - - self.assertFalse(actual_threshold, len(elements) > max_nb_elements) - - # pop returns the content and reset the queue - actual_elements = queue.pop() - self.assertEqual(actual_elements, - [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}]) - self.assertEqual(queue.pop(), []) - - new_elements = list(to_some_objects( - [1, 3, 4, 9, 20], - key='id')) - actual_threshold = queue.add(new_elements) - - self.assertTrue(actual_threshold) - - # reset is destructive too - queue.add(new_elements) - queue.reset() - - self.assertEqual(queue.pop(), []) - - -def to_some_complex_objects(elements, key): - for elt, size in elements: - yield {key: elt, 'length': size} - - -class 
TestQueuePerSizeAndNbUniqueElements(unittest.TestCase): - def test_queue_with_unique_key_and_size_behavior(self): - max_nb_elements = 5 - max_size = 100 - queue = QueuePerSizeAndNbUniqueElements( - max_nb_elements=max_nb_elements, - max_size=max_size, - key='k') - - # size total exceeded, nb elements not reached, still the - # threshold is deemed reached - elements = list(to_some_complex_objects([(1, 10), - (2, 20), - (3, 30), - (4, 100)], key='k')) - actual_threshold = queue.add(elements) - - self.assertTrue(actual_threshold) - - # pop returns the content and reset the queue - actual_elements = queue.pop() - self.assertEqual(actual_elements, - [{'k': 1, 'length': 10}, - {'k': 2, 'length': 20}, - {'k': 3, 'length': 30}, - {'k': 4, 'length': 100}]) - self.assertEqual(queue.pop(), []) - - # size threshold not reached, nb elements reached, the - # threshold is considered reached - new_elements = list(to_some_complex_objects( - [(1, 10), (3, 5), (4, 2), (9, 1), (20, 0)], - key='k')) - actual_threshold = queue.add(new_elements) - - queue.reset() - - self.assertTrue(actual_threshold) - - # nb elements threshold not reached, nor the top number of - # elements, the threshold is not reached - new_elements = list(to_some_complex_objects( - [(1, 10)], - key='k')) - actual_threshold = queue.add(new_elements) - - self.assertFalse(actual_threshold) - - # reset is destructive too - queue.add(new_elements) - queue.reset() - - self.assertEqual(queue.pop(), [])