diff --git a/PKG-INFO b/PKG-INFO index 4c7cc2c..1a1178a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.28 +Version: 0.0.29 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index e172a16..ba52a6d 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.60 -swh.model >= 0.0.60 +swh.model >= 0.0.61 swh.storage >= 0.0.181 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index 4c7cc2c..1a1178a 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.28 +Version: 0.0.29 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 4e69043..4a6da92 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,13 +1,13 @@ confluent-kafka msgpack tenacity vcversioner swh.core[db,http]>=0.0.60 -swh.model>=0.0.60 +swh.model>=0.0.61 swh.storage>=0.0.181 [testing] pytest swh.model>=0.0.34 pytest-kafka hypothesis diff --git a/swh/journal/fixer.py b/swh/journal/fixer.py index f3f5797..a3ff5b1 100644 --- a/swh/journal/fixer.py +++ b/swh/journal/fixer.py @@ -1,268 +1,290 @@ import copy import logging from typing import Any, Dict, List, Optional from swh.model.identifiers import normalize_timestamp logger = logging.getLogger(__name__) def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: """Filters-out invalid 'perms' key that leaked from swh.model.from_disk to the journal. >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) {'sha1_git': b'foo'} >>> _fix_content({'sha1_git': b'bar'}) {'sha1_git': b'bar'} """ content = content.copy() content.pop('perms', None) return content def _fix_revision_pypi_empty_string(rev): """PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772 """ rev = { **rev, 'author': rev['author'].copy(), 'committer': rev['committer'].copy(), } if rev['author'].get('email') == '': rev['author']['email'] = b'' if rev['author'].get('name') == '': rev['author']['name'] = b'' if rev['committer'].get('email') == '': rev['committer']['email'] = b'' if rev['committer'].get('name') == '': rev['committer']['name'] = b'' return rev def _fix_revision_transplant_source(rev): if rev.get('metadata') and rev['metadata'].get('extra_headers'): rev = copy.deepcopy(rev) rev['metadata']['extra_headers'] = [ [key, value.encode('ascii')] if key == 'transplant_source' and isinstance(value, str) else [key, value] for (key, value) in rev['metadata']['extra_headers']] return rev def _check_date(date): """Returns whether the date can be represented in backends with sane limits on timestamps and timezones (resp. signed 64-bits and signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). """ if date is None: return True date = normalize_timestamp(date) return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ and (0 <= date['timestamp']['microseconds'] < 10**6) \ and (-2**15 <= date['offset'] < 2**15) def _check_revision_date(rev): """Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339""" return _check_date(rev['date']) and _check_date(rev['committer_date']) def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: """Fix various legacy revision issues. Fix author/committer person: >>> from pprint import pprint >>> date = { ... 'timestamp': { ... 'seconds': 1565096932, ... 'microseconds': 0, ... }, ... 'offset': 0, ... } >>> rev0 = _fix_revision({ ... 'id': b'rev-id', ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': date, ... 'type': 'git', ... 'message': '', ... 'directory': b'dir-id', ... 'synthetic': False, ... }) >>> rev0['author'] {'fullname': b'', 'name': b'', 'email': b''} >>> rev0['committer'] {'fullname': b'', 'name': b'', 'email': b''} Fix type of 'transplant_source' extra headers: >>> rev1 = _fix_revision({ ... 'id': b'rev-id', ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': date, ... 'metadata': { ... 'extra_headers': [ ... ['time_offset_seconds', b'-3600'], ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] ... ]}, ... 'type': 'git', ... 'message': '', ... 'directory': b'dir-id', ... 'synthetic': False, ... }) >>> pprint(rev1['metadata']['extra_headers']) [['time_offset_seconds', b'-3600'], ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] Revision with invalid date are filtered: >>> from copy import deepcopy >>> invalid_date1 = deepcopy(date) >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': invalid_date1, ... 'committer_date': date, ... }) >>> rev is None True >>> invalid_date2 = deepcopy(date) >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': invalid_date2, ... 'committer_date': date, ... }) >>> rev is None True >>> invalid_date3 = deepcopy(date) >>> invalid_date3['offset'] = 2**20 # > 10^15 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': invalid_date3, ... }) >>> rev is None True """ # noqa rev = _fix_revision_pypi_empty_string(revision) rev = _fix_revision_transplant_source(rev) if not _check_revision_date(rev): logger.warning('Invalid revision date detected: %(revision)s', { 'revision': rev }) return None return rev +def _fix_origin(origin: Dict) -> Dict: + """Fix legacy origin with type which is no longer part of the model. + + >>> from pprint import pprint + >>> pprint(_fix_origin({ + ... 'url': 'http://foo', + ... })) + {'url': 'http://foo'} + >>> pprint(_fix_origin({ + ... 'url': 'http://bar', + ... 'type': 'foo', + ... })) + {'url': 'http://bar'} + + """ + o = origin.copy() + o.pop('type', None) + return o + + def _fix_origin_visit(visit: Dict) -> Dict: """Fix various legacy origin visit issues. `visit['origin']` is a dict instead of an URL: >>> from datetime import datetime, timezone >>> from pprint import pprint >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) >>> pprint(_fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, ... 'date': date, ... 'type': 'git', ... 'status': 'ongoing', ... 'snapshot': None, ... })) {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), 'metadata': None, 'origin': 'http://foo', 'snapshot': None, 'status': 'ongoing', 'type': 'git'} `visit['type']` is missing , but `origin['visit']['type']` exists: >>> pprint(_fix_origin_visit( ... {'origin': {'type': 'hg', 'url': 'http://foo'}, ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None, ... })) {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), 'metadata': None, 'origin': 'http://foo', 'snapshot': None, 'status': 'ongoing', 'type': 'hg'} Old visit format (origin_visit with no type) raises: >>> _fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None ... }) Traceback (most recent call last): ... ValueError: Old origin visit format detected... >>> _fix_origin_visit({ ... 'origin': 'http://foo', ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None ... }) Traceback (most recent call last): ... ValueError: Old origin visit format detected... """ # noqa visit = visit.copy() if 'type' not in visit: if isinstance(visit['origin'], dict) and 'type' in visit['origin']: # Very old version of the schema: visits did not have a type, # but their 'origin' field was a dict with a 'type' key. visit['type'] = visit['origin']['type'] else: # Very old schema version: 'type' is missing, stop early # We expect the journal's origin_visit topic to no longer reference # such visits. If it does, the replayer must crash so we can fix # the journal's topic. raise ValueError(f'Old origin visit format detected: {visit}') if isinstance(visit['origin'], dict): # Old version of the schema: visit['origin'] was a dict. visit['origin'] = visit['origin']['url'] if 'metadata' not in visit: visit['metadata'] = None return visit def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: """ Fix legacy objects from the journal to bring them up to date with the latest storage schema. """ if object_type == 'content': return [_fix_content(v) for v in objects] elif object_type == 'revision': revisions = [_fix_revision(v) for v in objects] return [rev for rev in revisions if rev is not None] + elif object_type == 'origin': + return [_fix_origin(v) for v in objects] elif object_type == 'origin_visit': return [_fix_origin_visit(v) for v in objects] else: return objects diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py index 0b8880e..48d1d92 100644 --- a/swh/journal/tests/utils.py +++ b/swh/journal/tests/utils.py @@ -1,76 +1,79 @@ from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES from swh.journal.writer.kafka import KafkaJournalWriter from swh.journal.serializers import (kafka_to_value, key_to_kafka, value_to_kafka) class FakeKafkaMessage: def __init__(self, topic, key, value): self._topic = topic self._key = key_to_kafka(key) self._value = value_to_kafka(value) def topic(self): return self._topic def value(self): return self._value def key(self): return self._key def error(self): return None class MockedKafkaWriter(KafkaJournalWriter): def __init__(self, queue): self._prefix = 'prefix' self.queue = queue def send(self, topic, key, value): msg = FakeKafkaMessage(topic=topic, key=key, value=value) self.queue.append(msg) def flush(self): pass class MockedKafkaConsumer: """Mimic the confluent_kafka.Consumer API, producing the messages stored in `queue`. You're only allowed to subscribe to topics in which the queue has messages. """ def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) + def close(self): + pass + class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) self.process_timeout = None self.stop_after_objects = None self.value_deserializer = kafka_to_value self.stop_on_eof = False self.batch_size = 200 diff --git a/version.txt b/version.txt index 1bd474e..f17c0f7 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.28-0-g7ab4c2c \ No newline at end of file +v0.0.29-0-g0ddcb92 \ No newline at end of file