swh/loader/core/loader.py
@@ [10 lines not shown] @@
 import requests
 import traceback
 import uuid

 from abc import ABCMeta, abstractmethod
 from retrying import retry
 from typing import Any, Dict, Optional, Tuple

-from . import converters
 from swh.core import config
 from swh.storage import get_storage, HashCollision

-from .queue import QueuePerSizeAndNbUniqueElements
-from .queue import QueuePerNbUniqueElements
-
-
-def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
-    """Send `objects`, using the `sender`, in packets of `packet_size` objects
-    (and of max `packet_size_bytes`).
-    """
-    formatted_objects = []
-    count = 0
-    if not packet_size_bytes:
-        packet_size_bytes = 0
-
-    for obj in objects:
-        if not obj:
-            continue
-        formatted_objects.append(obj)
-        if packet_size_bytes:
-            count += obj['length']
-        if len(formatted_objects) >= packet_size or count > packet_size_bytes:
-            sender(formatted_objects)
-            formatted_objects = []
-            count = 0
-
-    if formatted_objects:
-        sender(formatted_objects)
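For orientation, a minimal usage sketch of the batching helper removed above; the stand-in sender and the fake objects are illustrative assumptions, not part of this changeset::

    def log_batch(batch):
        # stand-in sender; a real caller would push the batch to storage
        print('sending %d objects' % len(batch))

    fake_objects = ({'id': i, 'length': 4096} for i in range(25000))
    send_in_packets(fake_objects, sender=log_batch, packet_size=10000,
                    packet_size_bytes=100 * 1024 * 1024)
    # emits two full packets of 10000 objects, then a final packet of 5000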
 def retry_loading(error):
     """Retry policy when the database raises an integrity error"""
     exception_classes = [
         # raised when two parallel insertions insert the same data.
         psycopg2.IntegrityError,
         HashCollision,
         # raised when uWSGI restarts and hungs up on the worker.
@@ [45 lines not shown] @@
     - and implement the @abstractmethod methods:

       - :func:`cleanup`: Last step executed by the loader.

     The entry point for the resulting loader is :func:`load`.

     You can take a look at some example classes:

     - :class:`BaseSvnLoader`
-    - :class:`TarLoader`
-    - :class:`DirLoader`
-    - :class:`DebianLoader`

     """
     CONFIG_BASE_FILENAME = None  # type: Optional[str]

     DEFAULT_CONFIG = {
         'storage': ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5002/',
             }
         }),

-        'send_contents': ('bool', True),
-        'send_directories': ('bool', True),
-        'send_revisions': ('bool', True),
-        'send_releases': ('bool', True),
-        'send_snapshot': ('bool', True),

         'save_data': ('bool', False),
         'save_data_path': ('str', ''),

-        # Number of contents
-        'content_packet_size': ('int', 10000),
-        # packet of 100Mib contents
-        'content_packet_size_bytes': ('int', 100 * 1024 * 1024),
-        'directory_packet_size': ('int', 25000),
-        'revision_packet_size': ('int', 100000),
-        'release_packet_size': ('int', 100000),
-        'occurrence_packet_size': ('int', 100000),
     }  # type: Dict[str, Tuple[str, Any]]

     ADDITIONAL_CONFIG = {}  # type: Dict[str, Tuple[str, Any]]
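For reference, a hedged sketch of how a concrete loader might extend and consume this configuration; the extra key and the values are assumptions (only 'content_size_limit' is referenced elsewhere in this file), and the storage settings mirror the defaults above::

    # Hypothetical ADDITIONAL_CONFIG: entries follow the same
    # ('type', default) convention as DEFAULT_CONFIG and are merged in by
    # parse_config_file() when no explicit config dict is given.
    ADDITIONAL_CONFIG = {
        'content_size_limit': ('int', 100 * 1024 * 1024),
    }

    # Shape of an already-parsed config dict, as accepted by
    # __init__(config=...): plain values with the (type, default) wrappers
    # resolved, and 'storage' ready to be passed to get_storage().
    example_config = {
        'storage': {
            'cls': 'remote',
            'args': {'url': 'http://localhost:5002/'},
        },
        'save_data': False,
        'save_data_path': '',
        'content_size_limit': 100 * 1024 * 1024,
    }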
     def __init__(self, logging_class=None, config=None):
         if config:
             self.config = config
         else:
             self.config = self.parse_config_file(
                 additional_configs=[self.ADDITIONAL_CONFIG])

         self.storage = get_storage(**self.config['storage'])

         if logging_class is None:
             logging_class = '%s.%s' % (self.__class__.__module__,
                                        self.__class__.__name__)
         self.log = logging.getLogger(logging_class)

-        self.contents = QueuePerSizeAndNbUniqueElements(
-            key='sha1',
-            max_nb_elements=self.config['content_packet_size'],
-            max_size=self.config['content_packet_size_bytes'])
-        self.contents_seen = set()
-
-        self.directories = QueuePerNbUniqueElements(
-            key='id',
-            max_nb_elements=self.config['directory_packet_size'])
-        self.directories_seen = set()
-
-        self.revisions = QueuePerNbUniqueElements(
-            key='id',
-            max_nb_elements=self.config['revision_packet_size'])
-        self.revisions_seen = set()
-
-        self.releases = QueuePerNbUniqueElements(
-            key='id',
-            max_nb_elements=self.config['release_packet_size'])
-        self.releases_seen = set()
-
-        self.snapshot = None

         _log = logging.getLogger('requests.packages.urllib3.connectionpool')
         _log.setLevel(logging.WARN)

         self.counters = {
             'contents': 0,
             'directories': 0,
             'revisions': 0,
             'releases': 0,
@@ [286 lines not shown] @@ def send_releases(self, release_list):
             'swh_type': 'storage_send_end',
             'swh_content_type': 'release',
             'swh_num': num_releases,
             'swh_id': log_id,
         })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_snapshot(self, snapshot):
-        self.flush()  # to ensure the snapshot targets existing objects
         self.storage.snapshot_add([snapshot])
         self.storage.origin_visit_update(
             self.origin['url'], self.visit, snapshot=snapshot['id'])
-    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
-    def filter_missing_contents(self, contents):
-        """Return only the contents missing from swh"""
-        max_content_size = self.config['content_size_limit']
-        contents_per_key = {}
-        content_key = 'blake2s256'
-
-        for content in contents:
-            if content[content_key] in self.contents_seen:
-                continue
-            key = content[content_key]
-            contents_per_key[key] = content
-            self.contents_seen.add(key)
-
-        for key in self.storage.content_missing(
-                list(contents_per_key.values()),
-                key_hash=content_key
-        ):
-            yield converters.content_for_storage(
-                contents_per_key[key],
-                max_content_size=max_content_size,
-                origin_url=self.origin['url'],
-            )
-
-    def bulk_send_contents(self, contents):
-        """Format contents as swh contents and send them to the database.
-        """
-        threshold_reached = self.contents.add(
-            self.filter_missing_contents(contents))
-        if threshold_reached:
-            self.send_batch_contents(self.contents.pop())
-
-    def filter_missing_directories(self, directories):
-        """Return only directories missing from swh"""
-        directories_per_id = {}
-        search_dirs = []
-
-        for directory in directories:
-            dir_id = directory['id']
-            if dir_id in self.directories_seen:
-                continue
-            search_dirs.append(dir_id)
-            directories_per_id[dir_id] = directory
-            self.directories_seen.add(dir_id)
-
-        for dir_id in self.storage.directory_missing(search_dirs):
-            yield directories_per_id[dir_id]
-
-    def bulk_send_directories(self, directories):
-        """Send missing directories to the database"""
-        threshold_reached = self.directories.add(
-            self.filter_missing_directories(directories))
-        if threshold_reached:
-            self.send_batch_contents(self.contents.pop())
-            self.send_batch_directories(self.directories.pop())
-
-    def filter_missing_revisions(self, revisions):
-        """Return only revisions missing from swh"""
-        revisions_per_id = {}
-        search_revs = []
-
-        for revision in revisions:
-            rev_id = revision['id']
-            if rev_id in self.revisions_seen:
-                continue
-            search_revs.append(rev_id)
-            revisions_per_id[rev_id] = revision
-            self.revisions_seen.add(rev_id)
-
-        for rev_id in self.storage.revision_missing(search_revs):
-            yield revisions_per_id[rev_id]
-
-    def bulk_send_revisions(self, revisions):
-        """Send missing revisions to the database"""
-        threshold_reached = self.revisions.add(
-            self.filter_missing_revisions(revisions))
-        if threshold_reached:
-            self.send_batch_contents(self.contents.pop())
-            self.send_batch_directories(self.directories.pop())
-            self.send_batch_revisions(self.revisions.pop())
-
-    def filter_missing_releases(self, releases):
-        """Return only releases missing from swh"""
-        releases_per_id = {}
-        search_rels = []
-
-        for release in releases:
-            rel_id = release['id']
-            if rel_id in self.releases_seen:
-                continue
-            search_rels.append(rel_id)
-            releases_per_id[rel_id] = release
-            self.releases_seen.add(rel_id)
-
-        for rel_id in self.storage.release_missing(search_rels):
-            yield releases_per_id[rel_id]
-
-    def bulk_send_releases(self, releases):
-        """Send missing releases to the database"""
-        threshold_reached = self.releases.add(
-            self.filter_missing_releases(releases))
-        if threshold_reached:
-            self.send_batch_contents(self.contents.pop())
-            self.send_batch_directories(self.directories.pop())
-            self.send_batch_revisions(self.revisions.pop())
-            self.send_batch_releases(self.releases.pop())
-
-    def bulk_send_snapshot(self, snapshot):
-        """Send missing releases to the database"""
-        self.send_batch_contents(self.contents.pop())
-        self.send_batch_directories(self.directories.pop())
-        self.send_batch_revisions(self.revisions.pop())
-        self.send_batch_releases(self.releases.pop())
-        self.send_snapshot(snapshot)
-
-    def maybe_load_contents(self, contents):
-        """Load contents in swh-storage if need be.
-        """
-        if self.config['send_contents']:
-            self.bulk_send_contents(contents)
-
-    def maybe_load_directories(self, directories):
-        """Load directories in swh-storage if need be.
-        """
-        if self.config['send_directories']:
-            self.bulk_send_directories(directories)
-
-    def maybe_load_revisions(self, revisions):
-        """Load revisions in swh-storage if need be.
-        """
-        if self.config['send_revisions']:
-            self.bulk_send_revisions(revisions)
-
-    def maybe_load_releases(self, releases):
-        """Load releases in swh-storage if need be.
-        """
-        if self.config['send_releases']:
-            self.bulk_send_releases(releases)
-
-    def maybe_load_snapshot(self, snapshot):
-        """Load the snapshot in swh-storage if need be."""
-        if self.config['send_snapshot']:
-            self.bulk_send_snapshot(snapshot)
-
-    def send_batch_contents(self, contents):
-        """Send contents batches to the storage"""
-        packet_size = self.config['content_packet_size']
-        packet_size_bytes = self.config['content_packet_size_bytes']
-        send_in_packets(contents, self.send_contents, packet_size,
-                        packet_size_bytes=packet_size_bytes)
-
-    def send_batch_directories(self, directories):
-        """Send directories batches to the storage"""
-        packet_size = self.config['directory_packet_size']
-        send_in_packets(directories, self.send_directories, packet_size)
-
-    def send_batch_revisions(self, revisions):
-        """Send revisions batches to the storage"""
-        packet_size = self.config['revision_packet_size']
-        send_in_packets(revisions, self.send_revisions, packet_size)
-
-    def send_batch_releases(self, releases):
-        """Send releases batches to the storage
-        """
-        packet_size = self.config['release_packet_size']
-        send_in_packets(releases, self.send_releases, packet_size)
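To summarize the buffered pipeline being removed, a rough sketch of the call sequence a concrete loader used to drive; the loader instance and the object collections are placeholders::

    # Objects are deduplicated, filtered against what storage already
    # holds, and queued; a packet is sent once a count/size threshold
    # is reached.
    loader.maybe_load_contents(contents)
    loader.maybe_load_directories(directories)
    loader.maybe_load_revisions(revisions)
    loader.maybe_load_releases(releases)
    # flush() drains whatever is still queued once loading is done.
    loader.flush()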
     def flush(self):
         """Flush any potential dangling data not sent to swh-storage.

         Bypass the maybe_load_* methods which awaits threshold reached
         signal. We actually want to store those as we are done
         loading.

         """
-        contents = self.contents.pop()
-        directories = self.directories.pop()
-        revisions = self.revisions.pop()
-        releases = self.releases.pop()
-
-        # and send those to storage if asked
-        if self.config['send_contents']:
-            self.send_batch_contents(contents)
-        if self.config['send_contents']:
-            self.send_batch_directories(directories)
-        if self.config['send_revisions']:
-            self.send_batch_revisions(revisions)
-        if self.config['send_releases']:
-            self.send_batch_releases(releases)
-        if self.config['send_snapshot'] and self.snapshot:
-            self.send_snapshot(self.snapshot)
-
         if hasattr(self.storage, 'flush'):
             self.storage.flush()

     def prepare_metadata(self):
         """First step for origin_metadata insertion, resolving the
         provider_id and the tool_id by fetching data from the storage
         or creating tool and provider on the fly if the data isn't available
@@ [250 lines not shown] @@ class UnbufferedLoader(BufferedLoader):
     def eventful(self):
         """Whether the load was eventful"""
         raise NotImplementedError

     def save_data(self):
         """Save the data associated to the current load"""
         raise NotImplementedError

-    def flush(self):
-        """Flush the storage if needed.
-        """
-        if hasattr(self.storage, 'flush'):
-            self.storage.flush()
-
     def store_data(self):
         if self.config['save_data']:
             self.save_data()

-        if self.config['send_contents'] and self.has_contents():
-            self.send_batch_contents(self.get_contents())
-        if self.config['send_directories'] and self.has_directories():
-            self.send_batch_directories(self.get_directories())
-        if self.config['send_revisions'] and self.has_revisions():
-            self.send_batch_revisions(self.get_revisions())
-        if self.config['send_releases'] and self.has_releases():
-            self.send_batch_releases(self.get_releases())
-        if self.config['send_snapshot']:
-            self.send_snapshot(self.get_snapshot())
+        if self.has_contents():
+            self.send_contents(self.get_contents())
+        if self.has_directories():
+            self.send_directories(self.get_directories())
+        if self.has_revisions():
+            self.send_revisions(self.get_revisions())
+        if self.has_releases():
+            self.send_releases(self.get_releases())
+        self.send_snapshot(self.get_snapshot())
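To make the revised, unbuffered flow concrete, here is a hypothetical minimal subclass wired to the hooks that store_data() now calls directly; every name not visible in this changeset is an assumption, and real loaders implement more of the lifecycle (prepare, fetch, cleanup steps) than shown here::

    class ExampleLoader(UnbufferedLoader):
        """Illustrative sketch only: fetched objects are held in plain
        lists and handed to store_data() through the hooks above."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._contents = []
            self._directories = []
            self._revisions = []
            self._releases = []
            self._snapshot = {'id': b'', 'branches': {}}  # placeholder shape

        def cleanup(self):
            pass  # last step executed by the loader (see the base docstring)

        def eventful(self):
            return bool(self._contents or self._revisions)

        def has_contents(self):
            return bool(self._contents)

        def get_contents(self):
            return self._contents

        def has_directories(self):
            return bool(self._directories)

        def get_directories(self):
            return self._directories

        def has_revisions(self):
            return bool(self._revisions)

        def get_revisions(self):
            return self._revisions

        def has_releases(self):
            return bool(self._releases)

        def get_releases(self):
            return self._releases

        def get_snapshot(self):
            return self._snapshot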