D1345: swh.journal: Add backfiller implementation
D1345.id4538.diff
diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/backfill.py
@@ -0,0 +1,464 @@
+# Copyright (C) 2017-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""The backfiller's goal is to produce part or all of the swh objects
+from the storage to the journal.
+
+At the moment, a first implementation is the JournalBackfiller. It
+reads the objects from the storage and sends every inflated object to
+the journal.
+
+"""
+
+import logging
+
+from .direct_writer import DirectKafkaWriter
+
+from swh.core.db import BaseDb
+from swh.storage.converters import db_to_release, db_to_revision
+
+
+logger = logging.getLogger(__name__)
+
+PARTITION_KEY = {
+ 'content': 'sha1',
+ 'skipped_content': None, # unused
+ 'directory': 'id',
+ 'revision': 'revision.id',
+ 'release': 'release.id',
+ 'snapshot': 'id',
+ 'origin': 'id',
+ 'origin_visit': 'origin_visit.origin',
+}
+
+COLUMNS = {
+ 'content': [
+ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status',
+ 'ctime'
+ ],
+ 'skipped_content': [
+ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime',
+ 'status', 'reason',
+ ],
+ 'directory': ['id', 'dir_entries', 'file_entries', 'rev_entries'],
+ 'revision': [
+ ("revision.id", "id"),
+ "date",
+ "date_offset",
+ "committer_date",
+ "committer_date_offset",
+ "type",
+ "directory",
+ "message",
+ "synthetic",
+ "metadata",
+ "date_neg_utc_offset",
+ "committer_date_neg_utc_offset",
+ ("array(select parent_id::bytea from revision_history rh "
+ "where rh.id = revision.id order by rh.parent_rank asc)",
+ "parents"),
+ ("a.id", "author_id"),
+ ("a.name", "author_name"),
+ ("a.email", "author_email"),
+ ("a.fullname", "author_fullname"),
+ ("c.id", "committer_id"),
+ ("c.name", "committer_name"),
+ ("c.email", "committer_email"),
+ ("c.fullname", "committer_fullname"),
+ ],
+ 'release': [
+ ("release.id", "id"),
+ "date",
+ "date_offset",
+ "comment",
+ ("release.name", "name"),
+ "synthetic",
+ "date_neg_utc_offset",
+ "target",
+ "target_type",
+ ("a.id", "author_id"),
+ ("a.name", "author_name"),
+ ("a.email", "author_email"),
+ ("a.fullname", "author_fullname"),
+ ],
+ 'snapshot': ['id', 'object_id'],
+ 'origin': ['type', 'url'],
+ 'origin_visit': ['type', 'url', 'date', 'snapshot', 'status'],
+}
+
+
+JOINS = {
+ 'release': ['person a on release.author=a.id'],
+ 'revision': ['person a on revision.author=a.id',
+ 'person c on revision.committer=c.id'],
+ 'origin_visit': ['origin on origin_visit.origin=origin.id'],
+}
+
+
+def directory_converter(db, directory):
+ """Convert directory from the flat representation to swh model
+ compatible objects.
+
+ """
+ columns = ['target', 'name', 'perms']
+ query_template = '''
+ select %(columns)s
+ from directory_entry_%(type)s
+ where id in %%s
+ '''
+
+ types = ['file', 'dir', 'rev']
+
+ entries = []
+ with db.cursor() as cur:
+ for type in types:
+ ids = directory.pop('%s_entries' % type)
+ if not ids:
+ continue
+ query = query_template % {
+ 'columns': ','.join(columns),
+ 'type': type,
+ }
+ cur.execute(query, (tuple(ids), ))
+ for row in cur:
+ entry = dict(zip(columns, row))
+ entry['type'] = type
+ entries.append(entry)
+
+ directory['entries'] = entries
+ return directory
+
+
+def revision_converter(db, revision):
+ """Convert revision from the flat representation to swh model
+ compatible objects.
+
+ """
+ revision = db_to_revision(revision)
+ if 'author' in revision and revision['author']:
+ del revision['author']['id']
+ if 'committer' in revision and revision['committer']:
+ del revision['committer']['id']
+ return revision
+
+
+def release_converter(db, release):
+ """Convert release from the flat representation to swh model
+ compatible objects.
+
+ """
+ release = db_to_release(release)
+ if 'author' in release and release['author']:
+ del release['author']['id']
+ return release
+
+
+def snapshot_converter(db, snapshot):
+ """Convert snapshot from the flat representation to swh model
+ compatible objects.
+
+ """
+ columns = ['name', 'target', 'target_type']
+ query = '''
+ select %s
+ from snapshot_branches sbs
+ inner join snapshot_branch sb on sb.object_id=sbs.branch_id
+ where sbs.snapshot_id=%%s
+ ''' % ', '.join(columns)
+ with db.cursor() as cur:
+ cur.execute(query, (snapshot.pop('object_id'), ))
+ branches = {}
+ for name, *row in cur:
+ branch = dict(zip(columns[1:], row))
+ if not branch['target'] and not branch['target_type']:
+ branch = None
+ branches[name] = branch
+
+ snapshot['branches'] = branches
+ return snapshot
+
+
+def origin_visit_converter(db, origin_visit):
+ origin = {
+ 'type': origin_visit.pop('type'),
+ 'url': origin_visit.pop('url'),
+ }
+ origin_visit['origin'] = origin
+ return origin_visit
+
+
+CONVERTERS = {
+ 'directory': directory_converter,
+ 'revision': revision_converter,
+ 'release': release_converter,
+ 'snapshot': snapshot_converter,
+ 'origin_visit': origin_visit_converter,
+}
+
+
+def _compute_shift_bits(numbits):
+ """Compute length and shift bits from numbits
+
+ """
+ q, r = divmod(numbits, 8)
+ length = q + (r != 0)
+ shift_bits = 8 - r if r else 0
+ return length, shift_bits
+
+
+def object_to_offset(object_id, numbits):
+ """Compute the index of the range containing object id, when dividing
+ space into 2^numbits.
+
+ Args:
+ object_id (str): The hex representation of object_id
+ numbits (int): Number of bits in which we divide input space
+
+ Returns:
+ The index of the range containing object id
+
+ """
+ length, shift_bits = _compute_shift_bits(numbits)
+ truncated_id = object_id[:length * 2]
+ if len(truncated_id) < length * 2:
+ truncated_id += '0' * (length * 2 - len(truncated_id))
+
+ truncated_id_bytes = bytes.fromhex(truncated_id)
+ return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits
+
+
+def byte_ranges(numbits, start_object=None, end_object=None):
+ """Generate start/end pairs of bytes spanning numbits bits and
+ constrained by optional start_object and end_object.
+
+ Args:
+ numbits (int): Number of bits in which we divide input space
+ start_object (str): Hex object id contained in the first range
+ returned
+ end_object (str): Hex object id contained in the last range
+ returned
+
+ Yields:
+ 2^numbits pairs of bytes
+
+ """
+ length, shift_bits = _compute_shift_bits(numbits)
+
+ def to_bytes(i):
+ return int.to_bytes(i << shift_bits, length=length, byteorder='big')
+
+ start_offset = 0
+ end_offset = 1 << numbits
+
+ if start_object is not None:
+ start_offset = object_to_offset(start_object, numbits)
+ if end_object is not None:
+ end_offset = object_to_offset(end_object, numbits) + 1
+
+ for start in range(start_offset, end_offset):
+ end = start + 1
+
+ if start == 0:
+ yield None, to_bytes(end)
+ elif end == 1 << numbits:
+ yield to_bytes(start), None
+ else:
+ yield to_bytes(start), to_bytes(end)
+
+
+def integer_ranges(start, end, block_size=1000):
+ """Compute range of integers between [start, end].
+
+ """
+ for start in range(start, end, block_size):
+ if start == 0:
+ yield None, block_size
+ elif start + block_size > end:
+ yield start, end
+ else:
+ yield start, start + block_size
+
+
+RANGE_GENERATORS = {
+ 'content': lambda start, end: byte_ranges(24, start, end),
+ 'skipped_content': lambda start, end: [(None, None)],
+ 'directory': lambda start, end: byte_ranges(24, start, end),
+ 'revision': lambda start, end: byte_ranges(24, start, end),
+ 'release': lambda start, end: byte_ranges(16, start, end),
+ 'snapshot': lambda start, end: byte_ranges(16, start, end),
+ 'origin': integer_ranges,
+ 'origin_visit': integer_ranges,
+}
+
+
+def compute_query(obj_type, start, end):
+ """Compute the query for object_type between start and end range.
+
+ """
+ columns = COLUMNS.get(obj_type)
+ join_specs = JOINS.get(obj_type, [])
+ join_clause = '\n'.join('left join %s' % clause for clause in join_specs)
+
+ where = []
+ where_args = []
+ if start:
+ where.append('%(keys)s >= %%s')
+ where_args.append(start)
+ if end:
+ where.append('%(keys)s < %%s')
+ where_args.append(end)
+
+ where_clause = ''
+ if where:
+ where_clause = ('where ' + ' and '.join(where)) % {
+ 'keys': '(%s)' % PARTITION_KEY[obj_type]
+ }
+
+ column_specs = []
+ column_aliases = []
+ for column in columns:
+ if isinstance(column, str):
+ column_specs.append(column)
+ column_aliases.append(column)
+ else:
+ column_specs.append('%s as %s' % column)
+ column_aliases.append(column[1])
+
+ query = '''
+select %(columns)s
+from %(table)s
+%(join)s
+%(where)s
+ ''' % {
+ 'columns': ','.join(column_specs),
+ 'table': obj_type,
+ 'join': join_clause,
+ 'where': where_clause,
+ }
+
+ return query, where_args, column_aliases
+
+
+def fetch(db, obj_type, start, end):
+ """Fetch all obj_type's identifiers from db.
+
+ This opens one connection, stream objects and when done, close
+ the connection.
+
+ Args:
+ db (BaseDb): Db connection object
+ obj_type (str): Object type
+ start (Union[bytes|Tuple]): Range start identifier
+ end (Union[bytes|Tuple]): Range end identifier
+
+ Raises:
+ ValueError if obj_type is not supported
+
+ Yields:
+ Objects in the given range
+
+ """
+ query, where_args, column_aliases = compute_query(obj_type, start, end)
+ converter = CONVERTERS.get(obj_type)
+ with db.cursor() as cursor:
+ logger.debug('Fetching data for table %s', obj_type)
+ logger.debug('query: %s %s', query, where_args)
+ cursor.execute(query, where_args)
+ for row in cursor:
+ record = dict(zip(column_aliases, row))
+ if converter:
+ record = converter(db, record)
+
+ logger.debug('record: %s', record)
+ yield record
+
+
+MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id']
+
+
+class JournalBackfiller:
+ """Read the storage's objects and send to the journal's topics.
+
+ This is designed to be run periodically.
+
+ """
+ def __init__(self, config=None):
+ self.config = config
+ self.check_config(config)
+
+ def check_config(self, config):
+ missing_keys = []
+ for key in MANDATORY_KEYS:
+ if not config.get(key):
+ missing_keys.append(key)
+
+ if missing_keys:
+ raise ValueError(
+ 'Configuration error: The following keys must be'
+ ' provided: %s' % (','.join(missing_keys), ))
+
+ def parse_arguments(self, object_type, start_object, end_object):
+ """Parse arguments
+
+ Raises:
+ ValueError for unsupported object type
+ ValueError if object ids are not parseable
+
+ Returns:
+ Parsed start and end object ids
+
+ """
+ if object_type not in COLUMNS:
+ raise ValueError('Object type %s is not supported. '
+ 'The only possible values are %s' % (
+ object_type, ', '.join(COLUMNS.keys())))
+
+ if object_type in ['origin', 'origin_visit']:
+ if start_object:
+ start_object = int(start_object)
+ else:
+ start_object = 0
+ if end_object:
+ end_object = int(end_object)
+ else:
+ end_object = 100 * 1000 * 1000 # hard-coded limit
+
+ return start_object, end_object
+
+ def run(self, object_type, start_object, end_object, dry_run=False):
+ """Read storage's subscribed object types and send them to the
+ journal's reading topic.
+
+ """
+ start_object, end_object = self.parse_arguments(
+ object_type, start_object, end_object)
+
+ db = BaseDb.connect(self.config['storage_dbconn'])
+ writer = DirectKafkaWriter(
+ brokers=self.config['brokers'],
+ prefix=self.config['final_prefix'],
+ client_id=self.config['client_id']
+ )
+ for range_start, range_end in RANGE_GENERATORS[object_type](
+ start_object, end_object):
+ logger.info('Processing %s range %s to %s', object_type,
+ range_start, range_end)
+
+ data_sent = False
+ for obj in fetch(
+ db, object_type, start=range_start, end=range_end,
+ ):
+ if dry_run:
+ continue
+ data_sent = True
+ writer.write_addition(object_type=object_type,
+ object_=obj)
+
+ if data_sent:
+ writer.producer.flush()
+
+
+if __name__ == '__main__':
+ print('Please use the "swh-journal backfiller" command')
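For illustration, here is a small sketch of how the range generators above partition the backfilling work; it assumes the swh.journal.backfill module introduced by this diff is importable:

from swh.journal.backfill import byte_ranges, integer_ranges, object_to_offset

# numbits=8 splits the hash space into 256 one-byte ranges; the first and
# last bounds are None so that the whole space is covered.
ranges = list(byte_ranges(8))
assert len(ranges) == 256
assert ranges[0] == (None, b'\x01')
assert ranges[-1] == (b'\xff', None)

# An object id maps to the index of the range that contains it.
assert object_to_offset('abcdef', 24) == 0xabcdef

# origin and origin_visit are partitioned with integer blocks instead.
assert list(integer_ranges(0, 2500)) == [(None, 1000), (1000, 2000), (2000, 2500)]
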
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
deleted file mode 100644
--- a/swh/journal/checker.py
+++ /dev/null
@@ -1,137 +0,0 @@
-# Copyright (C) 2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-"""Module defining journal checker classes.
-
-Those checker goal is to send back all, or missing objects from the
-journal queues.
-
-At the moment, a first naive implementation is the
-SimpleCheckerProducer. It simply reads the objects from the
-storage and sends every object identifier back to the journal.
-
-"""
-
-import psycopg2
-
-from kafka import KafkaProducer
-
-from swh.core.config import SWHConfig
-from .serializers import key_to_kafka
-
-
-TYPE_TO_PRIMARY_KEY = {
- 'origin': ['id'],
- 'content': ['sha1', 'sha1_git', 'sha256'],
- 'directory': ['id'],
- 'revision': ['id'],
- 'release': ['id'],
- 'origin_visit': ['origin', 'visit'],
- 'skipped_content': ['sha1', 'sha1_git', 'sha256'],
-}
-
-
-def entry_to_bytes(entry):
- """Convert an entry coming from the database to bytes"""
- if isinstance(entry, memoryview):
- return entry.tobytes()
- if isinstance(entry, tuple):
- return [entry_to_bytes(value) for value in entry]
- return entry
-
-
-def fetch(db_conn, obj_type):
- """Fetch all obj_type's identifiers from db.
-
- This opens one connection, stream objects and when done, close
- the connection.
-
- Raises:
- ValueError if obj_type is not supported
-
- Yields:
- Identifiers for the specific object_type
-
- """
- primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type)
- if not primary_key:
- raise ValueError('The object type %s is not supported. '
- 'Only possible values are %s' % (
- obj_type, TYPE_TO_PRIMARY_KEY.keys()))
-
- primary_key_str = ','.join(primary_key)
- query = 'select %s from %s order by %s' % (
- primary_key_str, obj_type, primary_key_str)
- server_side_cursor_name = 'swh.journal.%s' % obj_type
-
- with psycopg2.connect(db_conn) as db:
- cursor = db.cursor(name=server_side_cursor_name)
- cursor.execute(query)
- for o in cursor:
- yield dict(zip(primary_key, entry_to_bytes(o)))
-
-
-class SimpleCheckerProducer(SWHConfig):
- """Class in charge of reading the storage's objects and sends those
- back to the publisher queue.
-
- This is designed to be run periodically.
-
- """
- DEFAULT_CONFIG = {
- 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
- 'temporary_prefix': ('str', 'swh.tmp_journal.new'),
- 'publisher_id': ('str', 'swh.journal.publisher.test'),
- 'object_types': ('list[str]', ['content', 'revision', 'release']),
- 'storage_dbconn': ('str', 'service=swh-dev'),
- }
-
- CONFIG_BASE_FILENAME = 'journal/checker'
-
- def __init__(self, extra_configuration=None):
- self.config = config = self.parse_config_file()
- if extra_configuration:
- config.update(extra_configuration)
-
- self.object_types = self.config['object_types']
- for obj_type in self.object_types:
- if obj_type not in TYPE_TO_PRIMARY_KEY:
- raise ValueError('The object type %s is not supported. '
- 'Possible values are %s' % (
- obj_type,
- ', '.join(TYPE_TO_PRIMARY_KEY)))
-
- self.storage_dbconn = self.config['storage_dbconn']
-
- self.producer = KafkaProducer(
- bootstrap_servers=config['brokers'],
- value_serializer=key_to_kafka,
- client_id=config['publisher_id'],
- )
-
- def _read_storage(self):
- """Read storage's objects and generates tuple as object_type, dict of
- object.
-
- Yields:
- tuple of object_type, object as dict
-
- """
- for obj_type in self.object_types:
- for obj in fetch(self.storage_dbconn, obj_type):
- yield obj_type, obj
-
- def run(self):
- """Reads storage's subscribed object types and send them to the
- publisher's reading queue.
-
- """
- for obj_type, obj in self._read_storage():
- topic = '%s.%s' % (self.config['temporary_prefix'], obj_type)
- self.producer.send(topic, value=obj)
-
-
-if __name__ == '__main__':
- SimpleCheckerProducer().run()
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -11,6 +11,7 @@
from swh.storage import get_storage
from swh.journal.replay import StorageReplayer
+from swh.journal.backfill import JournalBackfiller
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@@ -80,6 +81,27 @@
print('Done.')
+@cli.command()
+@click.argument('object_type')
+@click.option('--start-object', default=None)
+@click.option('--end-object', default=None)
+@click.option('--dry-run', is_flag=True, default=False)
+@click.pass_context
+def backfiller(ctx, object_type, start_object, end_object, dry_run):
+ """Manipulate backfiller
+
+ """
+ conf = ctx.obj['config']
+ backfiller = JournalBackfiller(conf)
+ try:
+ backfiller.run(
+ object_type=object_type,
+ start_object=start_object, end_object=end_object,
+ dry_run=dry_run)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+
+
def main():
return cli(auto_envvar_prefix='SWH_JOURNAL')
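For reference, the subcommand above boils down to instantiating JournalBackfiller with a configuration carrying the mandatory keys and calling run(). A minimal sketch with placeholder broker and database values (both services must be reachable even with dry_run=True, since the Kafka writer and the db connection are created before the ranges are walked):

from swh.journal.backfill import JournalBackfiller

config = {
    'brokers': ['localhost:9092'],          # placeholder Kafka broker
    'storage_dbconn': 'service=swh-dev',    # placeholder storage database
    'final_prefix': 'swh.journal.objects',
    'client_id': 'swh.journal.backfiller',  # placeholder client id
}

backfiller = JournalBackfiller(config)
# dry_run=True walks the ranges and reads from the storage without
# producing anything to Kafka.
backfiller.run('release', start_object=None, end_object=None, dry_run=True)

The command-line equivalent is the swh-journal backfiller subcommand added above, with the configuration taken from the CLI's configuration file.
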
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -7,6 +7,8 @@
from kafka import KafkaProducer
+from swh.model.hashutil import DEFAULT_ALGORITHMS
+
from .serializers import key_to_kafka, value_to_kafka
logger = logging.getLogger(__name__)
@@ -34,6 +36,11 @@
return object_['id']
elif object_type == 'content':
return object_['sha1'] # TODO: use a dict of hashes
+ elif object_type == 'skipped_content':
+ return {
+ hash: object_[hash]
+ for hash in DEFAULT_ALGORITHMS
+ }
elif object_type == 'origin':
return {'url': object_['url'], 'type': object_['type']}
elif object_type == 'origin_visit':
@@ -46,7 +53,6 @@
def _sanitize_object(self, object_type, object_):
if object_type == 'origin_visit':
- # Compatibility with the publisher's format
return {
**object_,
'date': str(object_['date']),
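For illustration, the key computed above for a skipped_content object is the dict of its hashes. A small sketch, assuming DEFAULT_ALGORITHMS is the usual set of sha1, sha1_git, sha256 and blake2s256:

from swh.model.hashutil import DEFAULT_ALGORITHMS

skipped_content = {
    'sha1': b'\x01' * 20,
    'sha1_git': b'\x02' * 20,
    'sha256': b'\x03' * 32,
    'blake2s256': b'\x04' * 32,
    'length': 42,
    'status': 'absent',
    'reason': 'too large',   # illustrative values only
}

# Mirrors the key computation added above: keep only the hash columns.
key = {hash_name: skipped_content[hash_name] for hash_name in DEFAULT_ALGORITHMS}
assert set(key) == set(DEFAULT_ALGORITHMS)
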
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -165,8 +165,7 @@
TEST_CONFIG = {
'temporary_prefix': 'swh.tmp_journal.new',
'final_prefix': 'swh.journal.objects',
- 'consumer_id': 'swh.journal.publisher',
- 'publisher_id': 'swh.journal.publisher',
+ 'consumer_id': 'swh.journal.consumer',
'object_types': OBJECT_TYPE_KEYS.keys(),
'max_messages': 1, # will read 1 message and stops
'storage': {'cls': 'memory', 'args': {}},
@@ -176,7 +175,7 @@
@pytest.fixture
def test_config(kafka_server: Tuple[Popen, int],
kafka_prefix: str):
- """Test configuration needed for publisher/producer/consumer
+ """Test configuration needed for producer/consumer
"""
_, port = kafka_server
diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/test_backfill.py
@@ -0,0 +1,123 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.journal.backfill import (
+ JournalBackfiller, compute_query, PARTITION_KEY
+)
+
+
+TEST_CONFIG = {
+ 'brokers': ['localhost'],
+ 'final_prefix': 'swh.tmp_journal.new',
+ 'client_id': 'swh.journal.client.test',
+ 'storage_dbconn': 'service=swh-dev',
+}
+
+
+def test_config_ko_missing_mandatory_key():
+ """Missing configuration key will make the initialization fail
+
+ """
+ for key in TEST_CONFIG.keys():
+ config = TEST_CONFIG.copy()
+ config.pop(key)
+
+ with pytest.raises(ValueError) as e:
+ JournalBackfiller(config)
+
+ error = ('Configuration error: The following keys must be'
+ ' provided: %s' % (','.join([key]), ))
+ assert e.value.args[0] == error
+
+
+def test_config_ko_unknown_object_type():
+ """Parse arguments will fail if the object type is unknown
+
+ """
+ backfiller = JournalBackfiller(TEST_CONFIG)
+ with pytest.raises(ValueError) as e:
+ backfiller.parse_arguments('unknown-object-type', 1, 2)
+
+ error = ('Object type unknown-object-type is not supported. '
+ 'The only possible values are %s' % (
+ ', '.join(PARTITION_KEY)))
+ assert e.value.args[0] == error
+
+
+def test_compute_query_content():
+ query, where_args, column_aliases = compute_query(
+ 'content', '\x000000', '\x000001')
+
+ assert where_args == ['\x000000', '\x000001']
+
+ assert column_aliases == [
+ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status',
+ 'ctime'
+ ]
+
+ assert query == '''
+select sha1,sha1_git,sha256,blake2s256,length,status,ctime
+from content
+
+where (sha1) >= %s and (sha1) < %s
+ '''
+
+
+def test_compute_query_skipped_content():
+ query, where_args, column_aliases = compute_query(
+ 'skipped_content', None, None)
+
+ assert where_args == []
+
+ assert column_aliases == [
+ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime',
+ 'status', 'reason',
+ ]
+
+ assert query == '''
+select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason
+from skipped_content
+
+
+ '''
+
+
+def test_compute_query_origin_visit():
+ query, where_args, column_aliases = compute_query(
+ 'origin_visit', 1, 10)
+
+ assert where_args == [1, 10]
+
+ assert column_aliases == [
+ 'type', 'url', 'date', 'snapshot', 'status'
+ ]
+
+ assert query == '''
+select type,url,date,snapshot,status
+from origin_visit
+left join origin on origin_visit.origin=origin.id
+where (origin_visit.origin) >= %s and (origin_visit.origin) < %s
+ '''
+
+
+def test_compute_query_release():
+ query, where_args, column_aliases = compute_query(
+ 'release', '\x000002', '\x000003')
+
+ assert where_args == ['\x000002', '\x000003']
+
+ assert column_aliases == [
+ 'id', 'date', 'date_offset', 'comment', 'name', 'synthetic',
+ 'date_neg_utc_offset', 'target', 'target_type', 'author_id',
+ 'author_name', 'author_email', 'author_fullname']
+
+ assert query == '''
+select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname
+from release
+left join person a on release.author=a.id
+where (release.id) >= %s and (release.id) < %s
+ ''' # noqa
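To complement the tests above, here is a sketch of compute_query for an object type that needs neither joins nor a converter, again assuming the backfill module is importable:

from swh.journal.backfill import compute_query

query, where_args, column_aliases = compute_query('origin', None, None)
assert where_args == []
assert column_aliases == ['type', 'url']
assert 'select type,url' in query
assert 'from origin' in query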