F9340655
diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
index 4242c1d..c36b6da 100644
--- a/swh/journal/backfill.py
+++ b/swh/journal/backfill.py
@@ -1,403 +1,406 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module defining journal backfiller classes.
The goal of these backfillers is to push back part or all of the objects
from the storage to the journal topics.
At the moment, a first naive implementation is the
JournalBackfiller. It simply reads the objects from the
storage and sends every object identifier back to the journal.
"""
import logging
import psycopg2
from .direct_writer import DirectKafkaWriter
from swh.core.db import typecast_bytea
from swh.storage.converters import db_to_release, db_to_revision
logger = logging.getLogger(__name__)
PARTITION_KEY = {
'content': ['sha1'],
'skipped_content': None, # unused
# 'directory': ['id'],
'revision': ['revision.id'],
'release': ['release.id'],
# 'snapshot': ['id'],
'origin': ['id'],
'origin_visit': ['origin_visit.origin'],
}
COLUMNS = {
'content': [
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status',
'ctime'
],
'skipped_content': [
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime',
'status', 'reason',
],
# 'directory': ['id'],
'revision': [
("revision.id", "id"),
"date",
"date_offset",
"committer_date",
"committer_date_offset",
"type",
"directory",
"message",
"synthetic",
"metadata",
"date_neg_utc_offset",
"committer_date_neg_utc_offset",
("array(select parent_id::bytea from revision_history rh "
"where rh.id = revision.id order by rh.parent_rank asc)",
"parents"),
("a.id", "author_id"),
("a.name", "author_name"),
("a.email", "author_email"),
("a.fullname", "author_fullname"),
("c.id", "committer_id"),
("c.name", "committer_name"),
("c.email", "committer_email"),
("c.fullname", "committer_fullname"),
],
'release': [
("release.id", "id"),
"date",
"date_offset",
"comment",
("release.name", "name"),
"synthetic",
"date_neg_utc_offset",
"target",
"target_type",
("a.id", "author_id"),
("a.name", "author_name"),
("a.email", "author_email"),
("a.fullname", "author_fullname"),
],
# 'snapshot': ['id'],
'origin': ['type', 'url'],
'origin_visit': ['type', 'url', 'date', 'snapshot', 'status'],
}
JOINS = {
'release': ['person a on release.author=a.id'],
'revision': ['person a on revision.author=a.id',
'person c on revision.committer=c.id'],
'origin_visit': ['origin on origin_visit.origin=origin.id'],
}
def release_converter(release):
"""Convert release from the flat representation to swh model
compatible objects.
"""
release = db_to_release(release)
if 'author' in release and release['author']:
del release['author']['id']
return release
def revision_converter(revision):
"""Convert revision from the flat representation to swh model
compatible objects.
"""
revision = db_to_revision(revision)
if 'author' in revision and revision['author']:
del revision['author']['id']
if 'committer' in revision and revision['committer']:
del revision['committer']['id']
return revision
CONVERTERS = {
'release': release_converter,
'revision': revision_converter,
}
def object_to_offset(object_id, numbits):
"""Compute the index of the range containing object id, when dividing
space into 2^numbits.
Args:
object_id (str): The hex representation of object_id
numbits (int): Number of bits in which we divide input space
Returns:
The index of the range containing object id
"""
q, r = divmod(numbits, 8)
length = q + (r != 0)
shift_bits = 8 - r if r else 0
truncated_id = object_id[:length * 2]
if len(truncated_id) < length * 2:
truncated_id += '0' * (length * 2 - len(truncated_id))
truncated_id_bytes = bytes.fromhex(truncated_id)
return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits
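# A quick illustration of the arithmetic above (assumed values, not part of
# the patch): with numbits=4, only the first byte of the hex id is kept and
# shifted right by 8 - 4 = 4 bits; with numbits=16 the first two bytes are
# kept unshifted.
#
#   >>> object_to_offset('83a1', 4)       # 0x83 >> 4
#   8
#   >>> object_to_offset('0123beef', 16)  # 0x0123
#   291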
def byte_ranges(numbits, start_object=None, end_object=None):
"""Generate start/end pairs of bytes spanning numbits bits and
constrained by optional start_object and end_object.
Args:
numbits (int): Number of bits in which we divide input space
start_object (str): Hex object id contained in the first range
returned
end_object (str): Hex object id contained in the last range
returned
Yields:
2^numbits pairs of bytes
"""
q, r = divmod(numbits, 8)
length = q + (r != 0)
shift_bits = 8 - r if r else 0
def to_bytes(i):
return int.to_bytes(i << shift_bits, length=length, byteorder='big')
start_offset = 0
end_offset = 1 << numbits
if start_object is not None:
start_offset = object_to_offset(start_object, numbits)
if end_object is not None:
end_offset = object_to_offset(end_object, numbits) + 1
for start in range(start_offset, end_offset):
end = start + 1
if start == 0:
yield None, to_bytes(end)
elif end == 1 << numbits:
yield to_bytes(start), None
else:
yield to_bytes(start), to_bytes(end)
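# For instance (illustration only): splitting on 2 bits yields four ranges
# with open-ended bounds at both extremities, so no identifier is missed:
# byte_ranges(2) generates (None, b'\x40'), (b'\x40', b'\x80'),
# (b'\x80', b'\xc0') and (b'\xc0', None).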
def integer_ranges(start, end, block_size=1000):
for start in range(start, end, block_size):
if start == 0:
yield None, block_size
elif start + block_size > end:
yield start, end
else:
yield start, start + block_size
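# Likewise for integer keys (illustration only): integer_ranges(0, 2500)
# yields (None, 1000), (1000, 2000) and (2000, 2500); the first bound is
# left open so the very first identifiers are included.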
RANGE_GENERATORS = {
'content': lambda start, end: byte_ranges(24, start, end),
'skipped_content': lambda start, end: [(None, None)],
'directory': lambda start, end: byte_ranges(24, start, end),
'revision': lambda start, end: byte_ranges(24, start, end),
'release': lambda start, end: byte_ranges(16, start, end),
'snapshot': lambda start, end: byte_ranges(16, start, end),
'origin': integer_ranges,
'origin_visit': integer_ranges,
}
def cursor_setup(conn, server_side_cursor_name):
"""Setup cursor to return dict of data"""
# cur = conn.cursor(name=server_side_cursor_name)
cur = conn.cursor()
cur.execute("SELECT null::bytea, null::bytea[]")
bytea_oid = cur.description[0][1]
bytea_array_oid = cur.description[1][1]
t_bytes = psycopg2.extensions.new_type(
(bytea_oid,), "bytea", typecast_bytea)
psycopg2.extensions.register_type(t_bytes, conn)
t_bytes_array = psycopg2.extensions.new_array_type(
(bytea_array_oid,), "bytea[]", t_bytes)
psycopg2.extensions.register_type(t_bytes_array, conn)
return cur
-def fetch(db_conn, obj_type, start, end):
- """Fetch all obj_type's identifiers from db.
-
- This opens one connection, streams objects and, when done, closes
- the connection.
-
- Args:
- db_conn: Db connection string (DSN)
- obj_type (str): Object type
- start (Union[bytes|Tuple]): Range start identifier
- end (Union[bytes|Tuple]): Range end identifier
-
- Raises:
- ValueError if obj_type is not supported
-
- Yields:
- Objects in the given range
-
- """
+def compute_query(obj_type, start, end):
columns = COLUMNS.get(obj_type)
join_specs = JOINS.get(obj_type, [])
join_clause = '\n'.join('left join %s' % clause for clause in join_specs)
where = []
where_args = []
if start:
where.append('%(keys)s >= %%s')
where_args.append(start)
if end:
where.append('%(keys)s < %%s')
where_args.append(end)
where_clause = ''
if where:
where_clause = ('where ' + ' and '.join(where)) % {
'keys': '(%s)' % ','.join(PARTITION_KEY[obj_type])
}
column_specs = []
column_aliases = []
for column in columns:
if isinstance(column, str):
column_specs.append(column)
column_aliases.append(column)
else:
column_specs.append('%s as %s' % column)
column_aliases.append(column[1])
query = '''
- select %(columns)s
- from %(table)s
- %(join)s
- %(where)s
+select %(columns)s
+from %(table)s
+%(join)s
+%(where)s
''' % {
'columns': ','.join(column_specs),
'table': obj_type,
'join': join_clause,
'where': where_clause,
}
+ return query, where_args, column_aliases
+
+
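# As an illustration (assumed values, not from the patch): for an integer
# partition key, compute_query('origin', 1000, 2000) builds roughly
#   select type,url from origin where (id) >= %s and (id) < %s
# with where_args == [1000, 2000] and column_aliases == ['type', 'url'].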
+def fetch(db_conn, obj_type, start, end):
+ """Fetch all obj_type's identifiers from db.
+
+ This opens one connection, streams objects and, when done, closes
+ the connection.
+
+ Args:
+ db_conn: Db connection string (DSN)
+ obj_type (str): Object type
+ start (Union[bytes|Tuple]): Range start identifier
+ end (Union[bytes|Tuple]): Range end identifier
+
+ Raises:
+ ValueError if obj_type is not supported
+
+ Yields:
+ Objects in the given range
+
+ """
+ query, where_args, column_aliases = compute_query(obj_type, start, end)
+
converter = CONVERTERS.get(obj_type)
server_side_cursor_name = 'swh.journal.%s' % obj_type
with psycopg2.connect(db_conn) as conn:
cursor = cursor_setup(conn, server_side_cursor_name)
logger.debug('Fetching data for table %s', obj_type)
logger.debug('query: %s %s', query, where_args)
cursor.execute(query, where_args)
for row in cursor:
record = dict(zip(column_aliases, row))
if converter:
record = converter(record)
logger.debug('record: %s' % record)
yield record
-MANDATORY_KEYS = [
- 'brokers', 'object_types', 'storage_dbconn',
- 'final_prefix', 'client_id',
-]
+MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id']
class JournalBackfiller:
"""Class in charge of reading the storage's objects and sends those
back to the publisher queue.
This is designed to be run periodically.
"""
def __init__(self, config=None):
self.config = config
self.check_config(config)
self.storage_dbconn = self.config['storage_dbconn']
self.writer = DirectKafkaWriter(
brokers=config['brokers'],
prefix=config['final_prefix'],
client_id=config['client_id']
)
def check_config(self, config):
missing_keys = []
for key in MANDATORY_KEYS:
if not config.get(key):
missing_keys.append(key)
if missing_keys:
raise ValueError(
'Configuration error: The following keys must be'
' provided: %s' % (','.join(missing_keys), ))
def parse_arguments(self, object_type, start_object, end_object):
"""Parse arguments
Raises:
ValueError for unsupported object type
ValueError if object ids are not parseable
Returns:
Parsed start and end object ids
"""
if object_type not in COLUMNS:
raise ValueError('Object type %s is not supported. '
'The only possible values are %s' % (
object_type, ', '.join(COLUMNS.keys())))
if object_type in ['origin', 'origin_visit']:
if start_object:
start_object = int(start_object)
else:
start_object = 0
if end_object:
end_object = int(end_object)
else:
end_object = 100 * 1000 * 1000 # hard-coded limit
return start_object, end_object
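# For example (illustration only): parse_arguments('origin', '10', None)
# returns (10, 100000000) since origin identifiers are integers, while
# parse_arguments('release', None, None) leaves both bounds as None.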
def run(self, object_type, start_object, end_object, dry_run=False):
"""Reads storage's subscribed object types and send them to the
publisher's reading queue.
"""
start_object, end_object = self.parse_arguments(
object_type, start_object, end_object)
for range_start, range_end in RANGE_GENERATORS[object_type](
start_object, end_object):
logger.info('Processing %s range %s to %s', object_type,
range_start, range_end)
for obj in fetch(
self.storage_dbconn, object_type,
start=range_start, end=range_end,
):
if dry_run:
continue
self.writer.write_addition(object_type=object_type,
object_=obj)
if __name__ == '__main__':
print('Please use the "swh-journal backfiller run" command')
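# A minimal usage sketch (illustrative only; it reuses the same placeholder
# broker, prefix, client id and DSN values as TEST_CONFIG in the test below):
#
#   from swh.journal.backfill import JournalBackfiller
#
#   config = {
#       'brokers': ['localhost'],
#       'final_prefix': 'swh.tmp_journal.new',
#       'client_id': 'swh.journal.publisher.test',
#       'storage_dbconn': 'service=swh-dev',
#   }
#   backfiller = JournalBackfiller(config)
#   # dry_run=True walks the ranges and fetches rows without writing to Kafka
#   backfiller.run('release', start_object=None, end_object=None, dry_run=True)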
diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py
index 7b8ddee..460c4a6 100644
--- a/swh/journal/tests/test_backfill.py
+++ b/swh/journal/tests/test_backfill.py
@@ -1,42 +1,67 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
-from swh.journal.backfill import JournalBackfiller, TYPE_TO_PRIMARY_KEY
+from swh.journal.backfill import (
+ JournalBackfiller, compute_query
+)
TEST_CONFIG = {
'brokers': ['localhost'],
'final_prefix': 'swh.tmp_journal.new',
'client_id': 'swh.journal.publisher.test',
- 'object_types': ['content', 'revision', 'release'],
'storage_dbconn': 'service=swh-dev',
}
def test_config_ko_missing_mandatory_key():
+ """Missing configuration key will make the initialization fail
+
+ """
for key in TEST_CONFIG.keys():
config = TEST_CONFIG.copy()
config.pop(key)
with pytest.raises(ValueError) as e:
JournalBackfiller(config)
error = ('Configuration error: The following keys must be'
' provided: %s' % (','.join([key]), ))
assert e.value.args[0] == error
-def test_config_ko_unknown_object_type():
- wrong_config = TEST_CONFIG.copy()
- wrong_config['object_types'] = ['something-wrong']
- with pytest.raises(ValueError) as e:
- JournalBackfiller(wrong_config)
+# def test_config_ko_unknown_object_type():
+# """Parse arguments will fail if the object type is unknown
+
+# """
+# backfiller = JournalBackfiller(TEST_CONFIG)
+# with pytest.raises(ValueError) as e:
+# backfiller.parse_arguments('unknown-object-type', 1, 2)
+
+# error = ('Object type unknown-object-type is not supported. '
+# 'The only possible values are %s' % (
+# ', '.join(PARTITION_KEY)))
+# assert e.value.args[0] == error
+
+
+def test_compute_query_content():
+ query, where_args, column_aliases = compute_query(
+ 'content', '\x000000', '\x000001')
+
+ assert where_args == ['\x000000', '\x000001']
+
+ assert column_aliases == [
+ 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status',
+ 'ctime'
+ ]
+
+ assert query == '''
+select sha1,sha1_git,sha256,blake2s256,length,status,ctime
+from content
- error = ('The object type something-wrong is not supported. '
- 'Possible values are %s' % (
- ', '.join(TYPE_TO_PRIMARY_KEY)))
- assert e.value.args[0] == error
+where (sha1) >= %s and (sha1) < %s
+ '''