diff --git a/playbooks/files/replay.py b/playbooks/files/replay.py
index b4f264c..c429843 100644
--- a/playbooks/files/replay.py
+++ b/playbooks/files/replay.py
@@ -1,100 +1,190 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from time import time
 import logging
 
 from concurrent.futures import ThreadPoolExecutor
 
 from swh.storage import HashCollision
+from swh.model.identifiers import normalize_timestamp
 from swh.model.hashutil import hash_to_hex
 from swh.objstorage.objstorage import ID_HASH_ALGO
 from swh.core.statsd import statsd
 
 logger = logging.getLogger(__name__)
 
 
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         _insert_objects(object_type, objects, storage)
 
 
+def _fix_revision_pypi_empty_string(rev):
+    """PyPI loader failed to encode empty strings as bytes, see:
+    swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9
+    or https://forge.softwareheritage.org/D1772
+    """
+    if rev['author'].get('email') == '':
+        rev['author']['email'] = b''
+    if rev['author'].get('name') == '':
+        rev['author']['name'] = b''
+    if rev['committer'].get('email') == '':
+        rev['committer']['email'] = b''
+    if rev['committer'].get('name') == '':
+        rev['committer']['name'] = b''
+
+
+def _check_date(date):
+    date = normalize_timestamp(date)
+    return (-2**63 <= date['timestamp']['seconds'] < 2**63) \
+        and (0 <= date['timestamp']['microseconds'] < 10**6) \
+        and (-2**15 <= date['offset'] < 2**15)
+
+
+def _check_revision_date(rev):
+    """Exclude revisions with invalid dates.
+    See https://forge.softwareheritage.org/T1339"""
+    return _check_date(rev['date']) and _check_date(rev['committer_date'])
+
+
+def _fix_revisions(revisions):
+    good_revisions = []
+    for rev in revisions:
+        _fix_revision_pypi_empty_string(rev)
+        if not _check_revision_date(rev):
+            logging.warning('Excluding revision: %r', rev)
+            continue
+        good_revisions.append(rev)
+    revisions[:] = good_revisions
+
+
+def _fix_origin_visits(visits):
+    for visit in visits:
+        if isinstance(visit['origin'], str):
+            # note that it will crash with the pg and
+            # in-mem storages if the origin is not already known,
+            # but there is no other choice because we can't add an
+            # origin without knowing its type. Non-pg storages
+            # don't use a numeric FK internally.
+            visit['origin'] = {'url': visit['origin']}
+        else:
+            if 'type' not in visit:
+                visit['type'] = visit['origin']['type']
+
+
+def fix_objects(object_type, objects):
+    """Converts a possibly old object from the journal to its current
+    expected format.
+
+    List of conversions:
+
+    Empty author name/email in PyPI releases:
+
+    >>> from pprint import pprint
+    >>> pprint(fix_objects('revision', [{
+    ...     'author': {'email': '', 'fullname': b'', 'name': ''},
+    ...     'committer': {'email': '', 'fullname': b'', 'name': ''},
+    ... }]))
+    [{'author': {'email': b'', 'fullname': b'', 'name': b''},
+      'committer': {'email': b'', 'fullname': b'', 'name': b''}}]
+
+    `visit['origin']` is a URL instead of a dict:
+
+    >>> fix_objects('origin_visit', [{'origin': 'http://foo'}])
+    [{'origin': {'url': 'http://foo'}}]
+
+    `visit['type']` is missing, but `visit['origin']['type']` exists:
+
+    >>> pprint(fix_objects(
+    ...     'origin_visit',
+    ...     [{'origin': {'type': 'hg', 'url': 'http://foo'}}]))
+    [{'origin': {'type': 'hg', 'url': 'http://foo'}, 'type': 'hg'}]
+    """
+
+    if object_type == 'revision':
+        _fix_revisions(objects)
+    elif object_type == 'origin_visit':
+        _fix_origin_visits(objects)
+    return objects
+
+
 def _insert_objects(object_type, objects, storage):
+    fix_objects(object_type, objects)
+
     if object_type == 'content':
         # TODO: insert 'content' in batches
         for object_ in objects:
             try:
                 storage.content_add_metadata([object_])
             except HashCollision as e:
                 logger.error('Hash collision: %s', e.args)
     elif object_type in ('directory', 'revision', 'release',
                          'snapshot', 'origin'):
         # TODO: split batches that are too large for the storage
         # to handle?
         method = getattr(storage, object_type + '_add')
-        method(objects)
+        try:
+            method(objects)
+        except Exception:
+            # retry one object at a time to pinpoint and log the
+            # object the storage rejects, then re-raise
+            for obj in objects:
+                try:
+                    method([obj])
+                except Exception:
+                    logger.error('Failed to insert %s: %r',
+                                 object_type, obj)
+                    raise
+            raise
     elif object_type == 'origin_visit':
         for visit in objects:
-            if isinstance(visit['origin'], str):
-                # old format
-                visit['origin'] = {'url': visit['origin']}
-            else:
+            if 'type' in visit['origin']:
                 storage.origin_add_one(visit['origin'])
-                if 'type' not in visit:
-                    # old format
-                    visit['type'] = visit['origin']['type']
-
-        # filter out buggy objects produced by the backfiller
-        # see https://forge.softwareheritage.org/D1742
-        objects = [visit for visit in objects
-                   if 'visit' in visit]
-
         storage.origin_visit_upsert(objects)
     else:
         logger.warning('Received a series of %s, this should not happen',
                        object_type)
 
 
 def copy_object(obj_id, src, dst):
     statsd_name = 'swh_journal_content_replayer_%s_duration_seconds'
     try:
         with statsd.timed(statsd_name % 'get'):
             obj = src.get(obj_id)
         with statsd.timed(statsd_name % 'put'):
             dst.add(obj, obj_id=obj_id, check_presence=False)
             logger.debug('copied %s', hash_to_hex(obj_id))
             statsd.increment(
                 'swh_journal_content_replayer_bytes_total', len(obj))
     except Exception:
         obj = ''
         logger.exception('Failed to copy %s', hash_to_hex(obj_id))
     return len(obj)
 
 
 def process_replay_objects_content(all_objects, *, src, dst, concurrency=8):
     vol = []
     t0 = time()
     with ThreadPoolExecutor(max_workers=concurrency) as executor:
         for (object_type, objects) in all_objects.items():
             if object_type != 'content':
                 logger.warning(
                     'Received a series of %s, this should not happen',
                     object_type)
                 continue
             for obj in objects:
                 obj_id = obj[ID_HASH_ALGO]
                 if obj['status'] == 'visible':
                     fut = executor.submit(copy_object, obj_id, src, dst)
                     fut.add_done_callback(lambda fn: vol.append(fn.result()))
                 else:
                     logger.debug('skipped %s (%s)', hash_to_hex(obj_id),
                                  obj['status'])
     dt = time() - t0
     logger.info(
         'processed %s content objects in %.1fsec '
         '(%.1f obj/sec, %.1fMB/sec) - %s failures',
         len(vol), dt,
         len(vol)/dt,
         sum(vol)/1024/1024/dt,
         len([x for x in vol if not x]))
diff --git a/playbooks/install-replayer.yml b/playbooks/install-replayer.yml
index 97738f2..8522bc2 100644
--- a/playbooks/install-replayer.yml
+++ b/playbooks/install-replayer.yml
@@ -1,60 +1,62 @@
 - hosts: cassandra-clients
   tasks:
   - name: "set journal source location"
     set_fact:
       journal_source: "/home/{{ansible_user}}/swh-journal"
   - name: "get swh-journal's source"
     git:
       dest: "{{journal_source}}"
       repo: "https://forge.softwareheritage.org/source/swh-journal.git"
       force: yes
-  - name: "fix 'replay' command for recent versions of click"
-    shell: "sed -i \"s/'--group-id', '--consumer-id'/'--group-id'/\" {{journal_source}}/swh/journal/cli.py"
   - name: "update replayer"
     copy:
       src: replay.py
dest: "{{journal_source}}/swh/journal/replay.py" + - name: "update replayer cli" + copy: + src: cli.py + dest: "{{journal_source}}/swh/journal/cli.py" - name: "write dummy version file" # to please vcversioner copy: content: "0.0.0-foo-bar" dest: "{{journal_source}}/version.txt" - name: "install swh-journal" shell: "python3 setup.py develop --user" args: chdir: "{{journal_source}}" - name: "copy replayer config file" template: src: "replayer.yml" dest: "/home/{{ansible_user}}/replayer_{{item}}.yml" vars: rpc_port: "{{item}}" with_items: "{{swh_storage_ports}}" - name: "create systemd unit dir" file: path: "/home/{{ansible_user}}/.config/systemd/user/" state: directory - name: "install replayer systemd unit" template: src: "replayer@.service" dest: "/home/{{ansible_user}}/.config/systemd/user/replayer@.service" - name: "enable systemd unit" systemd: name: "replayer@{{item}}.service" enabled: yes user: yes daemon_reload: yes # https://github.com/ansible/ansible/issues/36585 with_items: "{{swh_storage_ports}}" - name: "restart replayer" systemd: name: "replayer@{{item}}.service" state: restarted user: yes with_items: "{{swh_storage_ports}}"