
diff --git a/mypy.ini b/mypy.ini
index 2949be58..0e12c6de 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,39 +1,42 @@
[mypy]
namespace_packages = True
# due to the conditional import logic on swh.journal, in some cases a specific
# type: ignore is needed, in other it isn't...
warn_unused_ignores = False
# support for sqlalchemy magic: see https://github.com/dropbox/sqlalchemy-stubs
plugins = sqlmypy
# 3rd party libraries without stubs (yet)
[mypy-cassandra.*]
ignore_missing_imports = True
# only shipped indirectly via hypothesis
[mypy-django.*]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-psycopg2.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-systemd.daemon.*]
+ignore_missing_imports = True
+
[mypy-tenacity.*]
ignore_missing_imports = True
# temporary work-around for landing typing support in spite of the current
# journal<->storage dependency loop
[mypy-swh.journal.*]
ignore_missing_imports = True
[mypy-pytest_postgresql.*]
ignore_missing_imports = True
diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
new file mode 100644
index 00000000..6993dd00
--- /dev/null
+++ b/swh/storage/backfill.py
@@ -0,0 +1,490 @@
+# Copyright (C) 2017-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Storage backfiller.
+
+The backfiller's goal is to produce back part or all of the objects
+from a storage to the journal topics.
+
+The current implementation consists of the JournalBackfiller class.
+
+It simply reads the objects from the storage and sends every object identifier back to
+the journal.
+
+"""
+
+import logging
+
+
+from swh.core.db import BaseDb
+from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.storage.converters import db_to_release, db_to_revision
+
+
+logger = logging.getLogger(__name__)
+
+PARTITION_KEY = {
+ "content": "sha1",
+ "skipped_content": "sha1",
+ "directory": "id",
+ "revision": "revision.id",
+ "release": "release.id",
+ "snapshot": "id",
+ "origin": "id",
+ "origin_visit": "origin_visit.origin",
+}
+
+COLUMNS = {
+ "content": [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "status",
+ "ctime",
+ ],
+ "skipped_content": [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "ctime",
+ "status",
+ "reason",
+ ],
+ "directory": ["id", "dir_entries", "file_entries", "rev_entries"],
+ "revision": [
+ ("revision.id", "id"),
+ "date",
+ "date_offset",
+ "committer_date",
+ "committer_date_offset",
+ "type",
+ "directory",
+ "message",
+ "synthetic",
+ "metadata",
+ "date_neg_utc_offset",
+ "committer_date_neg_utc_offset",
+ (
+ "array(select parent_id::bytea from revision_history rh "
+ "where rh.id = revision.id order by rh.parent_rank asc)",
+ "parents",
+ ),
+ ("a.id", "author_id"),
+ ("a.name", "author_name"),
+ ("a.email", "author_email"),
+ ("a.fullname", "author_fullname"),
+ ("c.id", "committer_id"),
+ ("c.name", "committer_name"),
+ ("c.email", "committer_email"),
+ ("c.fullname", "committer_fullname"),
+ ],
+ "release": [
+ ("release.id", "id"),
+ "date",
+ "date_offset",
+ "comment",
+ ("release.name", "name"),
+ "synthetic",
+ "date_neg_utc_offset",
+ "target",
+ "target_type",
+ ("a.id", "author_id"),
+ ("a.name", "author_name"),
+ ("a.email", "author_email"),
+ ("a.fullname", "author_fullname"),
+ ],
+ "snapshot": ["id", "object_id"],
+ "origin": ["type", "url"],
+ "origin_visit": [
+ "visit",
+ "origin.type",
+ "origin_visit.type",
+ "url",
+ "date",
+ "snapshot",
+ "status",
+ "metadata",
+ ],
+}
+
+
+JOINS = {
+ "release": ["person a on release.author=a.id"],
+ "revision": [
+ "person a on revision.author=a.id",
+ "person c on revision.committer=c.id",
+ ],
+ "origin_visit": ["origin on origin_visit.origin=origin.id"],
+}
+
+
+def directory_converter(db, directory):
+ """Convert directory from the flat representation to swh model
+ compatible objects.
+
+ """
+ columns = ["target", "name", "perms"]
+ query_template = """
+ select %(columns)s
+ from directory_entry_%(type)s
+ where id in %%s
+ """
+
+ types = ["file", "dir", "rev"]
+
+ entries = []
+ with db.cursor() as cur:
+ for type in types:
+ ids = directory.pop("%s_entries" % type)
+ if not ids:
+ continue
+ query = query_template % {
+ "columns": ",".join(columns),
+ "type": type,
+ }
+ cur.execute(query, (tuple(ids),))
+ for row in cur:
+ entry = dict(zip(columns, row))
+ entry["type"] = type
+ entries.append(entry)
+
+ directory["entries"] = entries
+ return directory
+
+
+def revision_converter(db, revision):
+ """Convert revision from the flat representation to swh model
+ compatible objects.
+
+ """
+ return db_to_revision(revision)
+
+
+def release_converter(db, release):
+ """Convert release from the flat representation to swh model
+ compatible objects.
+
+ """
+ release = db_to_release(release)
+ if "author" in release and release["author"]:
+ del release["author"]["id"]
+ return release
+
+
+def snapshot_converter(db, snapshot):
+ """Convert snapshot from the flat representation to swh model
+ compatible objects.
+
+ """
+ columns = ["name", "target", "target_type"]
+ query = """
+ select %s
+ from snapshot_branches sbs
+ inner join snapshot_branch sb on sb.object_id=sbs.branch_id
+ where sbs.snapshot_id=%%s
+ """ % ", ".join(
+ columns
+ )
+ with db.cursor() as cur:
+ cur.execute(query, (snapshot.pop("object_id"),))
+ branches = {}
+ for name, *row in cur:
+ branch = dict(zip(columns[1:], row))
+ if not branch["target"] and not branch["target_type"]:
+ branch = None
+ branches[name] = branch
+
+ snapshot["branches"] = branches
+ return snapshot
+
+
+def origin_visit_converter(db, origin_visit):
+ origin = {
+ "type": origin_visit.pop("origin.type"),
+ "url": origin_visit.pop("url"),
+ }
+ origin_visit["origin"] = origin
+ origin_visit["type"] = origin_visit.pop("origin_visit.type")
+ return origin_visit
+
+
+CONVERTERS = {
+ "directory": directory_converter,
+ "revision": revision_converter,
+ "release": release_converter,
+ "snapshot": snapshot_converter,
+ "origin_visit": origin_visit_converter,
+}
+
+
+def object_to_offset(object_id, numbits):
+ """Compute the index of the range containing object id, when dividing
+ space into 2^numbits.
+
+ Args:
+ object_id (str): The hex representation of object_id
+ numbits (int): Number of bits in which we divide input space
+
+ Returns:
+ The index of the range containing object id
+
+ """
+ q, r = divmod(numbits, 8)
+ length = q + (r != 0)
+ shift_bits = 8 - r if r else 0
+
+ truncated_id = object_id[: length * 2]
+ if len(truncated_id) < length * 2:
+ truncated_id += "0" * (length * 2 - len(truncated_id))
+
+ truncated_id_bytes = bytes.fromhex(truncated_id)
+ return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits
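# Editorial aside, not part of this patch: a quick illustration of the bucket
# computation above, assuming the module is importable once this diff lands.
# With numbits=24 the first three bytes of the hex object id select the bucket,
# and short ids are right-padded with zeroes.

from swh.storage.backfill import object_to_offset

assert object_to_offset("83b1ff", 24) == 0x83B1FF
assert object_to_offset("83", 24) == 0x830000  # short ids are zero-padded
assert object_to_offset("80", 1) == 1          # numbits=1 keeps only the top bit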
+
+
+def byte_ranges(numbits, start_object=None, end_object=None):
+ """Generate start/end pairs of bytes spanning numbits bits and
+ constrained by optional start_object and end_object.
+
+ Args:
+ numbits (int): Number of bits in which we divide input space
+ start_object (str): Hex object id contained in the first range
+ returned
+ end_object (str): Hex object id contained in the last range
+ returned
+
+ Yields:
+ 2^numbits pairs of bytes
+
+ """
+ q, r = divmod(numbits, 8)
+ length = q + (r != 0)
+ shift_bits = 8 - r if r else 0
+
+ def to_bytes(i):
+ return int.to_bytes(i << shift_bits, length=length, byteorder="big")
+
+ start_offset = 0
+ end_offset = 1 << numbits
+
+ if start_object is not None:
+ start_offset = object_to_offset(start_object, numbits)
+ if end_object is not None:
+ end_offset = object_to_offset(end_object, numbits) + 1
+
+ for start in range(start_offset, end_offset):
+ end = start + 1
+
+ if start == 0:
+ yield None, to_bytes(end)
+ elif end == 1 << numbits:
+ yield to_bytes(start), None
+ else:
+ yield to_bytes(start), to_bytes(end)
+
+
+def integer_ranges(start, end, block_size=1000):
+ for start in range(start, end, block_size):
+ if start == 0:
+ yield None, block_size
+ elif start + block_size > end:
+ yield start, end
+ else:
+ yield start, start + block_size
+
+
+RANGE_GENERATORS = {
+ "content": lambda start, end: byte_ranges(24, start, end),
+ "skipped_content": lambda start, end: [(None, None)],
+ "directory": lambda start, end: byte_ranges(24, start, end),
+ "revision": lambda start, end: byte_ranges(24, start, end),
+ "release": lambda start, end: byte_ranges(16, start, end),
+ "snapshot": lambda start, end: byte_ranges(16, start, end),
+ "origin": integer_ranges,
+ "origin_visit": integer_ranges,
+}
+
+
+def compute_query(obj_type, start, end):
+ columns = COLUMNS.get(obj_type)
+ join_specs = JOINS.get(obj_type, [])
+ join_clause = "\n".join("left join %s" % clause for clause in join_specs)
+
+ where = []
+ where_args = []
+ if start:
+ where.append("%(keys)s >= %%s")
+ where_args.append(start)
+ if end:
+ where.append("%(keys)s < %%s")
+ where_args.append(end)
+
+ where_clause = ""
+ if where:
+ where_clause = ("where " + " and ".join(where)) % {
+ "keys": "(%s)" % PARTITION_KEY[obj_type]
+ }
+
+ column_specs = []
+ column_aliases = []
+ for column in columns:
+ if isinstance(column, str):
+ column_specs.append(column)
+ column_aliases.append(column)
+ else:
+ column_specs.append("%s as %s" % column)
+ column_aliases.append(column[1])
+
+ query = """
+select %(columns)s
+from %(table)s
+%(join)s
+%(where)s
+ """ % {
+ "columns": ",".join(column_specs),
+ "table": obj_type,
+ "join": join_clause,
+ "where": where_clause,
+ }
+
+ return query, where_args, column_aliases
+
+
+def fetch(db, obj_type, start, end):
+ """Fetch all obj_type's identifiers from db.
+
+    This opens one connection, streams objects, and closes
+    the connection when done.
+
+ Args:
+ db (BaseDb): Db connection object
+ obj_type (str): Object type
+ start (Union[bytes|Tuple]): Range start identifier
+ end (Union[bytes|Tuple]): Range end identifier
+
+ Raises:
+ ValueError if obj_type is not supported
+
+ Yields:
+ Objects in the given range
+
+ """
+ query, where_args, column_aliases = compute_query(obj_type, start, end)
+ converter = CONVERTERS.get(obj_type)
+ with db.cursor() as cursor:
+ logger.debug("Fetching data for table %s", obj_type)
+ logger.debug("query: %s %s", query, where_args)
+ cursor.execute(query, where_args)
+ for row in cursor:
+ record = dict(zip(column_aliases, row))
+ if converter:
+ record = converter(db, record)
+
+            logger.debug("record: %s", record)
+ yield record
+
+
+def _format_range_bound(bound):
+ if isinstance(bound, bytes):
+ return bound.hex()
+ else:
+ return str(bound)
+
+
+MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"]
+
+
+class JournalBackfiller:
+ """Class in charge of reading the storage's objects and sends those
+ back to the journal's topics.
+
+ This is designed to be run periodically.
+
+ """
+
+ def __init__(self, config=None):
+ self.config = config
+ self.check_config(config)
+
+ def check_config(self, config):
+ missing_keys = []
+ for key in MANDATORY_KEYS:
+ if not config.get(key):
+ missing_keys.append(key)
+
+ if missing_keys:
+ raise ValueError(
+ "Configuration error: The following keys must be"
+ " provided: %s" % (",".join(missing_keys),)
+ )
+
+ def parse_arguments(self, object_type, start_object, end_object):
+ """Parse arguments
+
+ Raises:
+ ValueError for unsupported object type
+ ValueError if object ids are not parseable
+
+ Returns:
+ Parsed start and end object ids
+
+ """
+ if object_type not in COLUMNS:
+ raise ValueError(
+ "Object type %s is not supported. "
+ "The only possible values are %s"
+ % (object_type, ", ".join(COLUMNS.keys()))
+ )
+
+ if object_type in ["origin", "origin_visit"]:
+ if start_object:
+ start_object = int(start_object)
+ else:
+ start_object = 0
+ if end_object:
+ end_object = int(end_object)
+ else:
+ end_object = 100 * 1000 * 1000 # hard-coded limit
+
+ return start_object, end_object
+
+ def run(self, object_type, start_object, end_object, dry_run=False):
+ """Reads storage's subscribed object types and send them to the
+ journal's reading topic.
+
+ """
+ start_object, end_object = self.parse_arguments(
+ object_type, start_object, end_object
+ )
+
+ db = BaseDb.connect(self.config["storage_dbconn"])
+ writer = KafkaJournalWriter(
+ brokers=self.config["brokers"],
+ prefix=self.config["prefix"],
+ client_id=self.config["client_id"],
+ )
+ for range_start, range_end in RANGE_GENERATORS[object_type](
+ start_object, end_object
+ ):
+ logger.info(
+ "Processing %s range %s to %s",
+ object_type,
+ _format_range_bound(range_start),
+ _format_range_bound(range_end),
+ )
+
+ for obj in fetch(db, object_type, start=range_start, end=range_end,):
+ if dry_run:
+ continue
+ writer.write_addition(object_type=object_type, object_=obj)
+
+ writer.producer.flush()
+
+
+if __name__ == "__main__":
+    print('Please use the "swh storage backfiller" command')
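For context, the backfiller added above can also be driven directly from Python. A minimal dry-run sketch, assuming a reachable storage database and Kafka broker (all connection values below are placeholders, not part of this diff):

from swh.storage.backfill import JournalBackfiller

config = {
    "brokers": ["kafka1.example.org:9092"],      # placeholder broker address
    "storage_dbconn": "service=swh-storage",     # placeholder libpq connection string
    "prefix": "swh.journal.objects",             # topics become <prefix>.<object_type>
    "client_id": "swh.journal.backfiller.test",  # placeholder Kafka client id
}

backfiller = JournalBackfiller(config)
# dry_run=True walks the ranges and fetches rows but skips writing to Kafka
backfiller.run("release", start_object=None, end_object=None, dry_run=True)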
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
index 13b26128..8f10624c 100644
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -1,62 +1,112 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.storage.api.server import load_and_check_config, app
@click.group(name="storage", context_settings=CONTEXT_SETTINGS)
@click.pass_context
def storage(ctx):
"""Software Heritage Storage tools."""
pass
@storage.command(name="rpc-serve")
@click.argument("config-path", required=True)
@click.option(
"--host",
default="0.0.0.0",
metavar="IP",
show_default=True,
help="Host ip address to bind the server on",
)
@click.option(
"--port",
default=5002,
type=click.INT,
metavar="PORT",
show_default=True,
help="Binding port of the server",
)
@click.option(
"--debug/--no-debug",
default=True,
help="Indicates if the server should run in debug mode",
)
@click.pass_context
def serve(ctx, config_path, host, port, debug):
"""Software Heritage Storage RPC server.
Do NOT use this in a production environment.
"""
if "log_level" in ctx.obj:
logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"])
api_cfg = load_and_check_config(config_path, type="any")
app.config.update(api_cfg)
app.run(host, port=int(port), debug=bool(debug))
+@storage.command()
+@click.argument("object_type")
+@click.option("--start-object", default=None)
+@click.option("--end-object", default=None)
+@click.option("--dry-run", is_flag=True, default=False)
+@click.pass_context
+def backfiller(ctx, object_type, start_object, end_object, dry_run):
+ """Run the backfiller
+
+    The backfiller lists objects from a storage and produces journal entries
+    for them.
+
+    Typically used to rebuild a journal or compensate for missing objects in a
+    journal (e.g. due to a downtime of the latter).
+
+ The configuration file requires the following entries:
+ - brokers: a list of kafka endpoints (the journal) in which entries will be
+ added.
+ - storage_dbconn: URL to connect to the storage DB.
+ - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
+ - client_id: the kafka client ID.
+
+ """
+ # for "lazy" loading
+ from swh.storage.backfill import JournalBackfiller
+
+ try:
+ from systemd.daemon import notify
+ except ImportError:
+ notify = None
+
+ conf = ctx.obj["config"]
+ backfiller = JournalBackfiller(conf)
+
+ if notify:
+ notify("READY=1")
+
+ try:
+ backfiller.run(
+ object_type=object_type,
+ start_object=start_object,
+ end_object=end_object,
+ dry_run=dry_run,
+ )
+ except KeyboardInterrupt:
+ if notify:
+ notify("STOPPING=1")
+ ctx.exit(0)
+
+
def main():
logging.basicConfig()
return serve(auto_envvar_prefix="SWH_STORAGE")
if __name__ == "__main__":
main()
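A usage sketch for the new subcommand, exercised through click's test runner; the configuration dict mirrors the mandatory keys documented above and its values are placeholders (in a real deployment the top-level swh command loads them from a configuration file, and a reachable storage database and Kafka broker are assumed):

from click.testing import CliRunner

from swh.storage.cli import storage

config = {  # placeholder values, same shape as the mandatory keys above
    "brokers": ["kafka1.example.org:9092"],
    "storage_dbconn": "service=swh-storage",
    "prefix": "swh.journal.objects",
    "client_id": "swh.journal.backfiller",
}

runner = CliRunner()
# --dry-run walks the origin table without producing anything to Kafka
result = runner.invoke(
    storage, ["backfiller", "origin", "--dry-run"], obj={"config": config}
)
assert result.exit_code == 0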
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
new file mode 100644
index 00000000..85f5ec97
--- /dev/null
+++ b/swh/storage/tests/test_backfill.py
@@ -0,0 +1,160 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.storage.backfill import JournalBackfiller, compute_query, PARTITION_KEY
+
+
+TEST_CONFIG = {
+ "brokers": ["localhost"],
+ "prefix": "swh.tmp_journal.new",
+ "client_id": "swh.journal.client.test",
+ "storage_dbconn": "service=swh-dev",
+}
+
+
+def test_config_ko_missing_mandatory_key():
+ """Missing configuration key will make the initialization fail
+
+ """
+ for key in TEST_CONFIG.keys():
+ config = TEST_CONFIG.copy()
+ config.pop(key)
+
+ with pytest.raises(ValueError) as e:
+ JournalBackfiller(config)
+
+ error = "Configuration error: The following keys must be" " provided: %s" % (
+ ",".join([key]),
+ )
+ assert e.value.args[0] == error
+
+
+def test_config_ko_unknown_object_type():
+ """Parse arguments will fail if the object type is unknown
+
+ """
+ backfiller = JournalBackfiller(TEST_CONFIG)
+ with pytest.raises(ValueError) as e:
+ backfiller.parse_arguments("unknown-object-type", 1, 2)
+
+ error = (
+ "Object type unknown-object-type is not supported. "
+ "The only possible values are %s" % (", ".join(PARTITION_KEY))
+ )
+ assert e.value.args[0] == error
+
+
+def test_compute_query_content():
+ query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001")
+
+ assert where_args == ["\x000000", "\x000001"]
+
+ assert column_aliases == [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "status",
+ "ctime",
+ ]
+
+ assert (
+ query
+ == """
+select sha1,sha1_git,sha256,blake2s256,length,status,ctime
+from content
+
+where (sha1) >= %s and (sha1) < %s
+ """
+ )
+
+
+def test_compute_query_skipped_content():
+ query, where_args, column_aliases = compute_query("skipped_content", None, None)
+
+ assert where_args == []
+
+ assert column_aliases == [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "ctime",
+ "status",
+ "reason",
+ ]
+
+ assert (
+ query
+ == """
+select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason
+from skipped_content
+
+
+ """
+ )
+
+
+def test_compute_query_origin_visit():
+ query, where_args, column_aliases = compute_query("origin_visit", 1, 10)
+
+ assert where_args == [1, 10]
+
+ assert column_aliases == [
+ "visit",
+ "origin.type",
+ "origin_visit.type",
+ "url",
+ "date",
+ "snapshot",
+ "status",
+ "metadata",
+ ]
+
+ assert (
+ query
+ == """
+select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata
+from origin_visit
+left join origin on origin_visit.origin=origin.id
+where (origin_visit.origin) >= %s and (origin_visit.origin) < %s
+ """
+ )
+
+
+def test_compute_query_release():
+ query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003")
+
+ assert where_args == ["\x000002", "\x000003"]
+
+ assert column_aliases == [
+ "id",
+ "date",
+ "date_offset",
+ "comment",
+ "name",
+ "synthetic",
+ "date_neg_utc_offset",
+ "target",
+ "target_type",
+ "author_id",
+ "author_name",
+ "author_email",
+ "author_fullname",
+ ]
+
+ assert (
+ query
+ == """
+select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname
+from release
+left join person a on release.author=a.id
+where (release.id) >= %s and (release.id) < %s
+ """ # noqa
+ )
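As a possible follow-up (not part of this diff), the range helpers could get a direct test along these lines, assuming byte_ranges as defined in backfill.py above:

from swh.storage.backfill import byte_ranges


def test_byte_ranges_open_outer_bounds():
    # numbits=8 splits the id space on the first byte of the object id
    ranges = list(byte_ranges(8))
    assert len(ranges) == 256
    # the outermost ranges are open-ended so no object id falls outside them
    assert ranges[0] == (None, b"\x01")
    assert ranges[-1] == (b"\xff", None)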
