swh/journal/backfill.py
- This file was added.
# Copyright (C) 2017-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""The backfiller's goal is to produce part or all of the swh objects | |||||
from the storage to the journal. | |||||
anlambert: I do not really understand what the journal backfiller is supposed to do by reading that first sentence.
At the moment, a first implementation is the JournalBackfiller. It
reads the objects from the storage and sends every inflated object to
the journal.
"""
import logging

from .direct_writer import DirectKafkaWriter

from swh.core.db import BaseDb
from swh.storage.converters import db_to_release, db_to_revision


logger = logging.getLogger(__name__)
PARTITION_KEY = {
    'content': 'sha1',
    'skipped_content': None,  # unused
    'directory': 'id',
    'revision': 'revision.id',
    'release': 'release.id',
    'snapshot': 'id',
    'origin': 'id',
    'origin_visit': 'origin_visit.origin',
}
COLUMNS = {
    'content': [
        'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status',
        'ctime'
    ],
    'skipped_content': [
        'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime',
        'status', 'reason',
    ],
    'directory': ['id', 'dir_entries', 'file_entries', 'rev_entries'],
    'revision': [
        ("revision.id", "id"),
        "date",
        "date_offset",
anlambert: I think it should simply be `"id"` here. The defined alias is the same as the column name.
ardumont: We used those to disambiguate the sql query we generate (when a join is implied). For example, release, revision joins on person which also has columns name and id. So we need those.
anlambert: Oh I see, that was not straightforward to understand.
ardumont: Indeed!
"committer_date", | |||||
"committer_date_offset", | |||||
"type", | |||||
"directory", | |||||
"message", | |||||
"synthetic", | |||||
"metadata", | |||||
"date_neg_utc_offset", | |||||
"committer_date_neg_utc_offset", | |||||
("array(select parent_id::bytea from revision_history rh " | |||||
"where rh.id = revision.id order by rh.parent_rank asc)", | |||||
"parents"), | |||||
("a.id", "author_id"), | |||||
("a.name", "author_name"), | |||||
("a.email", "author_email"), | |||||
("a.fullname", "author_fullname"), | |||||
("c.id", "committer_id"), | |||||
("c.name", "committer_name"), | |||||
("c.email", "committer_email"), | |||||
("c.fullname", "committer_fullname"), | |||||
], | |||||
'release': [ | |||||
("release.id", "id"), | |||||
"date", | |||||
"date_offset", | |||||
anlambert: same here
"comment", | |||||
("release.name", "name"), | |||||
"synthetic", | |||||
"date_neg_utc_offset", | |||||
anlambert: same here
"target", | |||||
"target_type", | |||||
("a.id", "author_id"), | |||||
("a.name", "author_name"), | |||||
("a.email", "author_email"), | |||||
("a.fullname", "author_fullname"), | |||||
], | |||||
'snapshot': ['id', 'object_id'], | |||||
'origin': ['type', 'url'], | |||||
'origin_visit': ['type', 'url', 'date', 'snapshot', 'status'], | |||||
} | |||||
JOINS = {
    'release': ['person a on release.author=a.id'],
    'revision': ['person a on revision.author=a.id',
                 'person c on revision.committer=c.id'],
    'origin_visit': ['origin on origin_visit.origin=origin.id'],
}
def directory_converter(db, directory):
    """Convert directory from the flat representation to swh model
    compatible objects.
    """
    columns = ['target', 'name', 'perms']
    query_template = '''
    select %(columns)s
    from directory_entry_%(type)s
    where id in %%s
    '''
    types = ['file', 'dir', 'rev']
    entries = []
    with db.cursor() as cur:
        for type in types:
            ids = directory.pop('%s_entries' % type)
            if not ids:
                continue
            query = query_template % {
                'columns': ','.join(columns),
                'type': type,
            }
            cur.execute(query, (tuple(ids), ))
            for row in cur:
                entry = dict(zip(columns, row))
                entry['type'] = type
                entries.append(entry)

    directory['entries'] = entries
    return directory
def revision_converter(db, revision):
    """Convert revision from the flat representation to swh model
    compatible objects.
    """
    revision = db_to_revision(revision)
    if 'author' in revision and revision['author']:
        del revision['author']['id']
    if 'committer' in revision and revision['committer']:
        del revision['committer']['id']
    return revision


def release_converter(db, release):
    """Convert release from the flat representation to swh model
    compatible objects.
    """
    release = db_to_release(release)
    if 'author' in release and release['author']:
        del release['author']['id']
    return release
def snapshot_converter(db, snapshot):
    """Convert snapshot from the flat representation to swh model
    compatible objects.
    """
    columns = ['name', 'target', 'target_type']
    query = '''
    select %s
    from snapshot_branches sbs
    inner join snapshot_branch sb on sb.object_id=sbs.branch_id
    where sbs.snapshot_id=%%s
    ''' % ', '.join(columns)
    with db.cursor() as cur:
        cur.execute(query, (snapshot.pop('object_id'), ))
        branches = {}
        for name, *row in cur:
            branch = dict(zip(columns[1:], row))
            if not branch['target'] and not branch['target_type']:
                branch = None
            branches[name] = branch

    snapshot['branches'] = branches
    return snapshot
def origin_visit_converter(db, origin_visit):
    """Convert origin_visit from the flat representation to swh model
    compatible objects.
    """
    origin = {
        'type': origin_visit.pop('type'),
        'url': origin_visit.pop('url'),
    }
    origin_visit['origin'] = origin
    return origin_visit
CONVERTERS = {
    'directory': directory_converter,
    'revision': revision_converter,
    'release': release_converter,
    'snapshot': snapshot_converter,
    'origin_visit': origin_visit_converter,
}
def _compute_shift_bits(numbits):
    """Compute length and shift bits from numbits
    """
    q, r = divmod(numbits, 8)
    length = q + (r != 0)
    shift_bits = 8 - r if r else 0
    return length, shift_bits
def object_to_offset(object_id, numbits):
    """Compute the index of the range containing object id, when dividing
    space into 2^numbits.

    Args:
        object_id (str): The hex representation of object_id
        numbits (int): Number of bits in which we divide input space
anlambert: This could be moved in a dedicated `_compute_shift_bits` function as it is also used in the byte_ranges method below.
ardumont: Indeed! I will adapt.
    Returns:
        The index of the range containing object id

    """
    length, shift_bits = _compute_shift_bits(numbits)
    truncated_id = object_id[:length * 2]
    if len(truncated_id) < length * 2:
        truncated_id += '0' * (length * 2 - len(truncated_id))

    truncated_id_bytes = bytes.fromhex(truncated_id)
    return int.from_bytes(truncated_id_bytes, byteorder='big') >> shift_bits
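For intuition, a small worked example of the arithmetic above (the object id is made up):

    # numbits=16 -> _compute_shift_bits returns (2, 0): keep the first 2 bytes.
    object_to_offset('83a9' + '0' * 36, 16)   # == 0x83a9 == 33705
    # numbits=12 -> (2, 4): keep 2 bytes, then drop the low 4 bits.
    object_to_offset('83a9' + '0' * 36, 12)   # == 0x83a9 >> 4 == 0x83a == 2106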
def byte_ranges(numbits, start_object=None, end_object=None):
    """Generate start/end pairs of bytes spanning numbits bits and
    constrained by optional start_object and end_object.

    Args:
        numbits (int): Number of bits in which we divide input space
        start_object (str): Hex object id contained in the first range
                            returned
        end_object (str): Hex object id contained in the last range
                          returned

    Yields:
        2^numbits pairs of bytes

    """
    length, shift_bits = _compute_shift_bits(numbits)

    def to_bytes(i):
        return int.to_bytes(i << shift_bits, length=length, byteorder='big')

    start_offset = 0
    end_offset = 1 << numbits
    if start_object is not None:
        start_offset = object_to_offset(start_object, numbits)
    if end_object is not None:
        end_offset = object_to_offset(end_object, numbits) + 1

    for start in range(start_offset, end_offset):
        end = start + 1
        if start == 0:
            yield None, to_bytes(end)
        elif end == 1 << numbits:
            yield to_bytes(start), None
        else:
            yield to_bytes(start), to_bytes(end)
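A quick illustration of the generator's output, purely for reading convenience:

    # byte_ranges(8) yields 256 consecutive one-byte ranges; the first and
    # last bounds are None so the generated where clause stays open-ended there.
    list(byte_ranges(8))[:2]    # [(None, b'\x01'), (b'\x01', b'\x02')]
    list(byte_ranges(8))[-1]    # (b'\xff', None)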
def integer_ranges(start, end, block_size=1000):
    """Compute range of integers between [start, end].
    """
    for start in range(start, end, block_size):
        if start == 0:
            yield None, block_size
        elif start + block_size > end:
            yield start, end
        else:
            yield start, start + block_size
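For example, following the code above:

    list(integer_ranges(0, 2500))
    # [(None, 1000), (1000, 2000), (2000, 2500)]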
RANGE_GENERATORS = {
    'content': lambda start, end: byte_ranges(24, start, end),
    'skipped_content': lambda start, end: [(None, None)],
    'directory': lambda start, end: byte_ranges(24, start, end),
    'revision': lambda start, end: byte_ranges(24, start, end),
    'release': lambda start, end: byte_ranges(16, start, end),
    'snapshot': lambda start, end: byte_ranges(16, start, end),
    'origin': integer_ranges,
    'origin_visit': integer_ranges,
}
def compute_query(obj_type, start, end):
    """Compute the query for object_type between start and end range.
    """
    columns = COLUMNS.get(obj_type)
    join_specs = JOINS.get(obj_type, [])
    join_clause = '\n'.join('left join %s' % clause for clause in join_specs)

    where = []
    where_args = []
    if start:
        where.append('%(keys)s >= %%s')
        where_args.append(start)
    if end:
        where.append('%(keys)s < %%s')
        where_args.append(end)

    where_clause = ''
    if where:
        where_clause = ('where ' + ' and '.join(where)) % {
            'keys': '(%s)' % PARTITION_KEY[obj_type]
        }

    column_specs = []
    column_aliases = []
    for column in columns:
        if isinstance(column, str):
            column_specs.append(column)
            column_aliases.append(column)
        else:
            column_specs.append('%s as %s' % column)
            column_aliases.append(column[1])

    query = '''
    select %(columns)s
    from %(table)s
    %(join)s
    %(where)s
    ''' % {
        'columns': ','.join(column_specs),
        'table': obj_type,
        'join': join_clause,
        'where': where_clause,
    }

    return query, where_args, column_aliases
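Tying this back to the alias discussion in COLUMNS above, here is roughly what the function produces for 'release' (formatting approximate, bounds made up):

    query, where_args, column_aliases = compute_query(
        'release', b'\x00\x01', b'\x00\x02')
    # query is roughly:
    #   select release.id as id,date,date_offset,comment,release.name as name,
    #          synthetic,date_neg_utc_offset,target,target_type,
    #          a.id as author_id,a.name as author_name,a.email as author_email,
    #          a.fullname as author_fullname
    #   from release
    #   left join person a on release.author=a.id
    #   where (release.id) >= %s and (release.id) < %s
    # where_args == [b'\x00\x01', b'\x00\x02']
    # column_aliases == ['id', 'date', 'date_offset', 'comment', 'name', ...]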
ardumont: @faux here :)
def fetch(db, obj_type, start, end):
    """Fetch all obj_type's identifiers from db.

    This opens one connection, streams objects and, when done, closes
    the connection.
    Args:
        db (BaseDb): Db connection object
        obj_type (str): Object type
        start (Union[bytes|Tuple]): Range start identifier
        end (Union[bytes|Tuple]): Range end identifier

    Raises:
        ValueError if obj_type is not supported

    Yields:
        Objects in the given range

    """
    query, where_args, column_aliases = compute_query(obj_type, start, end)
    converter = CONVERTERS.get(obj_type)
    with db.cursor() as cursor:
        logger.debug('Fetching data for table %s', obj_type)
        logger.debug('query: %s %s', query, where_args)
        cursor.execute(query, where_args)
        for row in cursor:
            record = dict(zip(column_aliases, row))
            if converter:
                record = converter(db, record)

            logger.debug('record: %s' % record)
            yield record
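An illustrative call, assuming db is a BaseDb connection to a storage database (the dsn and the range bound are made up):

    db = BaseDb.connect('service=swh-storage')  # hypothetical dsn
    # Stream all directory rows whose id sorts below 0x000001, already
    # converted to swh model-compatible dicts by directory_converter.
    for directory in fetch(db, 'directory', start=None, end=b'\x00\x00\x01'):
        print(directory['id'], len(directory['entries']))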
MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id']


class JournalBackfiller:
    """Read the storage's objects and send to the journal's topics.

    This is designed to be run periodically.
    """
    def __init__(self, config=None):
        self.config = config
        self.check_config(config)

    def check_config(self, config):
        missing_keys = []
        for key in MANDATORY_KEYS:
            if not config.get(key):
                missing_keys.append(key)

        if missing_keys:
            raise ValueError(
                'Configuration error: The following keys must be'
                ' provided: %s' % (','.join(missing_keys), ))
    def parse_arguments(self, object_type, start_object, end_object):
        """Parse arguments

        Raises:
            ValueError for unsupported object type
            ValueError if object ids are not parseable

        Returns:
            Parsed start and end object ids

        """
        if object_type not in COLUMNS:
            raise ValueError('Object type %s is not supported. '
                             'The only possible values are %s' % (
                                 object_type, ', '.join(COLUMNS.keys())))

        if object_type in ['origin', 'origin_visit']:
            if start_object:
                start_object = int(start_object)
            else:
                start_object = 0
            if end_object:
                end_object = int(end_object)

anlambert: s/Reads/Read as imperative form is used elsewhere

            else:
                end_object = 100 * 1000 * 1000  # hard-coded limit

        return start_object, end_object
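Illustrative behaviour of the defaults, reusing the backfiller instance from the sketch near the top of this file:

    backfiller.parse_arguments('origin', None, None)      # -> (0, 100000000)
    backfiller.parse_arguments('origin', '10', '500000')  # -> (10, 500000)
    backfiller.parse_arguments('foo', None, None)         # raises ValueError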
    def run(self, object_type, start_object, end_object, dry_run=False):
        """Read storage's subscribed object types and send them to the
        journal's reading topic.
        """
        start_object, end_object = self.parse_arguments(
            object_type, start_object, end_object)

        db = BaseDb.connect(self.config['storage_dbconn'])
        writer = DirectKafkaWriter(
            brokers=self.config['brokers'],
            prefix=self.config['final_prefix'],
            client_id=self.config['client_id']
        )

        for range_start, range_end in RANGE_GENERATORS[object_type](
                start_object, end_object):
            logger.info('Processing %s range %s to %s', object_type,
                        range_start, range_end)

            data_sent = False
            for obj in fetch(
                    db, object_type, start=range_start, end=range_end,
            ):
                if dry_run:
                    continue
                data_sent = True
                writer.write_addition(object_type=object_type,
                                      object_=obj)

            if data_sent:
                writer.producer.flush()
if __name__ == '__main__':
    print('Please use the "swh-journal backfiller run" command')