diff --git a/mypy.ini b/mypy.ini
index 2949be58..0e12c6de 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,39 +1,42 @@
[mypy]
namespace_packages = True
# due to the conditional import logic on swh.journal, in some cases a specific
# type: ignore is needed, in others it isn't...
warn_unused_ignores = False
# support for sqlalchemy magic: see https://github.com/dropbox/sqlalchemy-stubs
plugins = sqlmypy
# 3rd party libraries without stubs (yet)
[mypy-cassandra.*]
ignore_missing_imports = True
# only shipped indirectly via hypothesis
[mypy-django.*]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-psycopg2.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
+[mypy-systemd.daemon.*]
+ignore_missing_imports = True
+
[mypy-tenacity.*]
ignore_missing_imports = True
# temporary work-around for landing typing support in spite of the current
# journal<->storage dependency loop
[mypy-swh.journal.*]
ignore_missing_imports = True
[mypy-pytest_postgresql.*]
ignore_missing_imports = True
diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
new file mode 100644
index 00000000..6993dd00
--- /dev/null
+++ b/swh/storage/backfill.py
@@ -0,0 +1,490 @@
+# Copyright (C) 2017-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Storage backfiller.
+
+The backfiller's goal is to replay part or all of the objects from a storage
+into the journal topics.
+
+The current implementation consists of the JournalBackfiller class.
+
+It simply reads the objects from the storage and sends every object back to
+the journal.
+
+"""
+
+import logging
+
+
+from swh.core.db import BaseDb
+from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.storage.converters import db_to_release, db_to_revision
+
+
+logger = logging.getLogger(__name__)
+
+PARTITION_KEY = {
+ "content": "sha1",
+ "skipped_content": "sha1",
+ "directory": "id",
+ "revision": "revision.id",
+ "release": "release.id",
+ "snapshot": "id",
+ "origin": "id",
+ "origin_visit": "origin_visit.origin",
+}
+
+COLUMNS = {
+ "content": [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "status",
+ "ctime",
+ ],
+ "skipped_content": [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "ctime",
+ "status",
+ "reason",
+ ],
+ "directory": ["id", "dir_entries", "file_entries", "rev_entries"],
+ "revision": [
+ ("revision.id", "id"),
+ "date",
+ "date_offset",
+ "committer_date",
+ "committer_date_offset",
+ "type",
+ "directory",
+ "message",
+ "synthetic",
+ "metadata",
+ "date_neg_utc_offset",
+ "committer_date_neg_utc_offset",
+ (
+ "array(select parent_id::bytea from revision_history rh "
+ "where rh.id = revision.id order by rh.parent_rank asc)",
+ "parents",
+ ),
+ ("a.id", "author_id"),
+ ("a.name", "author_name"),
+ ("a.email", "author_email"),
+ ("a.fullname", "author_fullname"),
+ ("c.id", "committer_id"),
+ ("c.name", "committer_name"),
+ ("c.email", "committer_email"),
+ ("c.fullname", "committer_fullname"),
+ ],
+ "release": [
+ ("release.id", "id"),
+ "date",
+ "date_offset",
+ "comment",
+ ("release.name", "name"),
+ "synthetic",
+ "date_neg_utc_offset",
+ "target",
+ "target_type",
+ ("a.id", "author_id"),
+ ("a.name", "author_name"),
+ ("a.email", "author_email"),
+ ("a.fullname", "author_fullname"),
+ ],
+ "snapshot": ["id", "object_id"],
+ "origin": ["type", "url"],
+ "origin_visit": [
+ "visit",
+ "origin.type",
+ "origin_visit.type",
+ "url",
+ "date",
+ "snapshot",
+ "status",
+ "metadata",
+ ],
+}
+
+
+JOINS = {
+ "release": ["person a on release.author=a.id"],
+ "revision": [
+ "person a on revision.author=a.id",
+ "person c on revision.committer=c.id",
+ ],
+ "origin_visit": ["origin on origin_visit.origin=origin.id"],
+}
+
+
+def directory_converter(db, directory):
+ """Convert directory from the flat representation to swh model
+ compatible objects.
+
+ """
+ columns = ["target", "name", "perms"]
+ query_template = """
+ select %(columns)s
+ from directory_entry_%(type)s
+ where id in %%s
+ """
+
+ types = ["file", "dir", "rev"]
+
+ entries = []
+ with db.cursor() as cur:
+ for type in types:
+ ids = directory.pop("%s_entries" % type)
+ if not ids:
+ continue
+ query = query_template % {
+ "columns": ",".join(columns),
+ "type": type,
+ }
+ cur.execute(query, (tuple(ids),))
+ for row in cur:
+ entry = dict(zip(columns, row))
+ entry["type"] = type
+ entries.append(entry)
+
+ directory["entries"] = entries
+ return directory
+
+
+def revision_converter(db, revision):
+ """Convert revision from the flat representation to swh model
+ compatible objects.
+
+ """
+ return db_to_revision(revision)
+
+
+def release_converter(db, release):
+ """Convert release from the flat representation to swh model
+ compatible objects.
+
+ """
+ release = db_to_release(release)
+ if "author" in release and release["author"]:
+ del release["author"]["id"]
+ return release
+
+
+def snapshot_converter(db, snapshot):
+ """Convert snapshot from the flat representation to swh model
+ compatible objects.
+
+ """
+ columns = ["name", "target", "target_type"]
+ query = """
+ select %s
+ from snapshot_branches sbs
+ inner join snapshot_branch sb on sb.object_id=sbs.branch_id
+ where sbs.snapshot_id=%%s
+ """ % ", ".join(
+ columns
+ )
+ with db.cursor() as cur:
+ cur.execute(query, (snapshot.pop("object_id"),))
+ branches = {}
+ for name, *row in cur:
+ branch = dict(zip(columns[1:], row))
+ if not branch["target"] and not branch["target_type"]:
+ branch = None
+ branches[name] = branch
+
+ snapshot["branches"] = branches
+ return snapshot
+
+
+def origin_visit_converter(db, origin_visit):
+ origin = {
+ "type": origin_visit.pop("origin.type"),
+ "url": origin_visit.pop("url"),
+ }
+ origin_visit["origin"] = origin
+ origin_visit["type"] = origin_visit.pop("origin_visit.type")
+ return origin_visit
+
+
+CONVERTERS = {
+ "directory": directory_converter,
+ "revision": revision_converter,
+ "release": release_converter,
+ "snapshot": snapshot_converter,
+ "origin_visit": origin_visit_converter,
+}
+
+
+def object_to_offset(object_id, numbits):
+ """Compute the index of the range containing object id, when dividing
+ space into 2^numbits.
+
+ Args:
+ object_id (str): The hex representation of object_id
+ numbits (int): Number of bits in which we divide input space
+
+ Returns:
+ The index of the range containing object id
+
+ """
+ q, r = divmod(numbits, 8)
+ length = q + (r != 0)
+ shift_bits = 8 - r if r else 0
+
+ truncated_id = object_id[: length * 2]
+ if len(truncated_id) < length * 2:
+ truncated_id += "0" * (length * 2 - len(truncated_id))
+
+ truncated_id_bytes = bytes.fromhex(truncated_id)
+ return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits
+
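+# A couple of worked examples (the hex ids are illustrative placeholders):
+#
+#   object_to_offset("c0ffee" + "00" * 17, 24)  # -> 0xc0ffee == 12648430
+#   object_to_offset("abcd" + "00" * 18, 12)    # -> 0xabcd >> 4 == 2748
+#
+# i.e. with numbits=24 the offset is the integer value of the first three
+# bytes of the id; with numbits=12 only the top 12 bits are kept.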
+
+def byte_ranges(numbits, start_object=None, end_object=None):
+ """Generate start/end pairs of bytes spanning numbits bits and
+ constrained by optional start_object and end_object.
+
+ Args:
+ numbits (int): Number of bits in which we divide input space
+ start_object (str): Hex object id contained in the first range
+ returned
+ end_object (str): Hex object id contained in the last range
+ returned
+
+ Yields:
+ 2^numbits pairs of bytes
+
+ """
+ q, r = divmod(numbits, 8)
+ length = q + (r != 0)
+ shift_bits = 8 - r if r else 0
+
+ def to_bytes(i):
+ return int.to_bytes(i << shift_bits, length=length, byteorder="big")
+
+ start_offset = 0
+ end_offset = 1 << numbits
+
+ if start_object is not None:
+ start_offset = object_to_offset(start_object, numbits)
+ if end_object is not None:
+ end_offset = object_to_offset(end_object, numbits) + 1
+
+ for start in range(start_offset, end_offset):
+ end = start + 1
+
+ if start == 0:
+ yield None, to_bytes(end)
+ elif end == 1 << numbits:
+ yield to_bytes(start), None
+ else:
+ yield to_bytes(start), to_bytes(end)
+
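+# For instance, byte_ranges(8) yields 256 pairs covering the whole space:
+#   (None, b"\x01"), (b"\x01", b"\x02"), ..., (b"\xff", None)
+# The open-ended None bounds let compute_query() drop the corresponding
+# comparison on the first and last range.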
+
+def integer_ranges(start, end, block_size=1000):
+ for start in range(start, end, block_size):
+ if start == 0:
+ yield None, block_size
+ elif start + block_size > end:
+ yield start, end
+ else:
+ yield start, start + block_size
+
+
+RANGE_GENERATORS = {
+ "content": lambda start, end: byte_ranges(24, start, end),
+ "skipped_content": lambda start, end: [(None, None)],
+ "directory": lambda start, end: byte_ranges(24, start, end),
+ "revision": lambda start, end: byte_ranges(24, start, end),
+ "release": lambda start, end: byte_ranges(16, start, end),
+ "snapshot": lambda start, end: byte_ranges(16, start, end),
+ "origin": integer_ranges,
+ "origin_visit": integer_ranges,
+}
+
+
+def compute_query(obj_type, start, end):
+ columns = COLUMNS.get(obj_type)
+ join_specs = JOINS.get(obj_type, [])
+ join_clause = "\n".join("left join %s" % clause for clause in join_specs)
+
+ where = []
+ where_args = []
+ if start:
+ where.append("%(keys)s >= %%s")
+ where_args.append(start)
+ if end:
+ where.append("%(keys)s < %%s")
+ where_args.append(end)
+
+ where_clause = ""
+ if where:
+ where_clause = ("where " + " and ".join(where)) % {
+ "keys": "(%s)" % PARTITION_KEY[obj_type]
+ }
+
+ column_specs = []
+ column_aliases = []
+ for column in columns:
+ if isinstance(column, str):
+ column_specs.append(column)
+ column_aliases.append(column)
+ else:
+ column_specs.append("%s as %s" % column)
+ column_aliases.append(column[1])
+
+ query = """
+select %(columns)s
+from %(table)s
+%(join)s
+%(where)s
+ """ % {
+ "columns": ",".join(column_specs),
+ "table": obj_type,
+ "join": join_clause,
+ "where": where_clause,
+ }
+
+ return query, where_args, column_aliases
+
+
+def fetch(db, obj_type, start, end):
+ """Fetch all obj_type's identifiers from db.
+
+ This opens one connection, streams objects and, when done, closes the
+ connection.
+
+ Args:
+ db (BaseDb): Db connection object
+ obj_type (str): Object type
+ start (Union[bytes|Tuple]): Range start identifier
+ end (Union[bytes|Tuple]): Range end identifier
+
+ Raises:
+ ValueError if obj_type is not supported
+
+ Yields:
+ Objects in the given range
+
+ """
+ query, where_args, column_aliases = compute_query(obj_type, start, end)
+ converter = CONVERTERS.get(obj_type)
+ with db.cursor() as cursor:
+ logger.debug("Fetching data for table %s", obj_type)
+ logger.debug("query: %s %s", query, where_args)
+ cursor.execute(query, where_args)
+ for row in cursor:
+ record = dict(zip(column_aliases, row))
+ if converter:
+ record = converter(db, record)
+
+ logger.debug("record: %s" % record)
+ yield record
+
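+# For example, fetch(db, "snapshot", start=b"\x00", end=b"\x01") streams every
+# snapshot whose id starts with byte 0x00, as dicts keyed by the column
+# aliases (with the converter adding the "branches" key for snapshots).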
+
+def _format_range_bound(bound):
+ if isinstance(bound, bytes):
+ return bound.hex()
+ else:
+ return str(bound)
+
+
+MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"]
+
+
+class JournalBackfiller:
+ """Class in charge of reading the storage's objects and sends those
+ back to the journal's topics.
+
+ This is designed to be run periodically.
+
+ """
+
+ def __init__(self, config=None):
+ self.config = config
+ self.check_config(config)
+
+ def check_config(self, config):
+ missing_keys = []
+ for key in MANDATORY_KEYS:
+ if not config.get(key):
+ missing_keys.append(key)
+
+ if missing_keys:
+ raise ValueError(
+ "Configuration error: The following keys must be"
+ " provided: %s" % (",".join(missing_keys),)
+ )
+
+ def parse_arguments(self, object_type, start_object, end_object):
+ """Parse arguments
+
+ Raises:
+ ValueError for unsupported object type
+ ValueError if object ids are not parseable
+
+ Returns:
+ Parsed start and end object ids
+
+ """
+ if object_type not in COLUMNS:
+ raise ValueError(
+ "Object type %s is not supported. "
+ "The only possible values are %s"
+ % (object_type, ", ".join(COLUMNS.keys()))
+ )
+
+ if object_type in ["origin", "origin_visit"]:
+ if start_object:
+ start_object = int(start_object)
+ else:
+ start_object = 0
+ if end_object:
+ end_object = int(end_object)
+ else:
+ end_object = 100 * 1000 * 1000 # hard-coded limit
+
+ return start_object, end_object
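+
+ # e.g. parse_arguments("origin", None, None) returns (0, 100000000), while
+ # hex bounds for the hash-partitioned types are passed through unchanged.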
+
+ def run(self, object_type, start_object, end_object, dry_run=False):
+ """Reads storage's subscribed object types and send them to the
+ journal's reading topic.
+
+ """
+ start_object, end_object = self.parse_arguments(
+ object_type, start_object, end_object
+ )
+
+ db = BaseDb.connect(self.config["storage_dbconn"])
+ writer = KafkaJournalWriter(
+ brokers=self.config["brokers"],
+ prefix=self.config["prefix"],
+ client_id=self.config["client_id"],
+ )
+ for range_start, range_end in RANGE_GENERATORS[object_type](
+ start_object, end_object
+ ):
+ logger.info(
+ "Processing %s range %s to %s",
+ object_type,
+ _format_range_bound(range_start),
+ _format_range_bound(range_end),
+ )
+
+ for obj in fetch(db, object_type, start=range_start, end=range_end,):
+ if dry_run:
+ continue
+ writer.write_addition(object_type=object_type, object_=obj)
+
+ writer.producer.flush()
+
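+# A minimal usage sketch; the broker, DSN and client id values below are
+# placeholders, not actual endpoints:
+#
+#   config = {
+#       "brokers": ["kafka1.example.org:9092"],
+#       "storage_dbconn": "service=swh-storage",
+#       "prefix": "swh.journal.objects",
+#       "client_id": "backfiller-example",
+#   }
+#   JournalBackfiller(config).run("release", start_object=None,
+#                                 end_object=None, dry_run=True)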
+
+if __name__ == "__main__":
+ print('Please use the "swh storage backfiller" command')
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
index 13b26128..8f10624c 100644
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -1,62 +1,112 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.storage.api.server import load_and_check_config, app
@click.group(name="storage", context_settings=CONTEXT_SETTINGS)
@click.pass_context
def storage(ctx):
"""Software Heritage Storage tools."""
pass
@storage.command(name="rpc-serve")
@click.argument("config-path", required=True)
@click.option(
"--host",
default="0.0.0.0",
metavar="IP",
show_default=True,
help="Host ip address to bind the server on",
)
@click.option(
"--port",
default=5002,
type=click.INT,
metavar="PORT",
show_default=True,
help="Binding port of the server",
)
@click.option(
"--debug/--no-debug",
default=True,
help="Indicates if the server should run in debug mode",
)
@click.pass_context
def serve(ctx, config_path, host, port, debug):
"""Software Heritage Storage RPC server.
Do NOT use this in a production environment.
"""
if "log_level" in ctx.obj:
logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"])
api_cfg = load_and_check_config(config_path, type="any")
app.config.update(api_cfg)
app.run(host, port=int(port), debug=bool(debug))
+@storage.command()
+@click.argument("object_type")
+@click.option("--start-object", default=None)
+@click.option("--end-object", default=None)
+@click.option("--dry-run", is_flag=True, default=False)
+@click.pass_context
+def backfiller(ctx, object_type, start_object, end_object, dry_run):
+ """Run the backfiller
+
+ The backfiller lists objects from a Storage and produces journal entries for
+ them.
+
+ Typically used to rebuild a journal or to compensate for missing objects in a
+ journal (e.g. due to a downtime of the latter).
+
+ The configuration file requires the following entries:
+ - brokers: a list of kafka endpoints (the journal) in which entries will be
+ added.
+ - storage_dbconn: URL to connect to the storage DB.
+ - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
+ - client_id: the kafka client ID.
+
+ """
+ # for "lazy" loading
+ from swh.storage.backfill import JournalBackfiller
+
+ try:
+ from systemd.daemon import notify
+ except ImportError:
+ notify = None
+
+ conf = ctx.obj["config"]
+ backfiller = JournalBackfiller(conf)
+
+ if notify:
+ notify("READY=1")
+
+ try:
+ backfiller.run(
+ object_type=object_type,
+ start_object=start_object,
+ end_object=end_object,
+ dry_run=dry_run,
+ )
+ except KeyboardInterrupt:
+ if notify:
+ notify("STOPPING=1")
+ ctx.exit(0)
+
+
def main():
logging.basicConfig()
return serve(auto_envvar_prefix="SWH_STORAGE")
if __name__ == "__main__":
main()
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
new file mode 100644
index 00000000..85f5ec97
--- /dev/null
+++ b/swh/storage/tests/test_backfill.py
@@ -0,0 +1,160 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.storage.backfill import JournalBackfiller, compute_query, PARTITION_KEY
+
+
+TEST_CONFIG = {
+ "brokers": ["localhost"],
+ "prefix": "swh.tmp_journal.new",
+ "client_id": "swh.journal.client.test",
+ "storage_dbconn": "service=swh-dev",
+}
+
+
+def test_config_ko_missing_mandatory_key():
+ """Missing configuration key will make the initialization fail
+
+ """
+ for key in TEST_CONFIG.keys():
+ config = TEST_CONFIG.copy()
+ config.pop(key)
+
+ with pytest.raises(ValueError) as e:
+ JournalBackfiller(config)
+
+ error = "Configuration error: The following keys must be" " provided: %s" % (
+ ",".join([key]),
+ )
+ assert e.value.args[0] == error
+
+
+def test_config_ko_unknown_object_type():
+ """Parse arguments will fail if the object type is unknown
+
+ """
+ backfiller = JournalBackfiller(TEST_CONFIG)
+ with pytest.raises(ValueError) as e:
+ backfiller.parse_arguments("unknown-object-type", 1, 2)
+
+ error = (
+ "Object type unknown-object-type is not supported. "
+ "The only possible values are %s" % (", ".join(PARTITION_KEY))
+ )
+ assert e.value.args[0] == error
+
+
+def test_compute_query_content():
+ query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001")
+
+ assert where_args == ["\x000000", "\x000001"]
+
+ assert column_aliases == [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "status",
+ "ctime",
+ ]
+
+ assert (
+ query
+ == """
+select sha1,sha1_git,sha256,blake2s256,length,status,ctime
+from content
+
+where (sha1) >= %s and (sha1) < %s
+ """
+ )
+
+
+def test_compute_query_skipped_content():
+ query, where_args, column_aliases = compute_query("skipped_content", None, None)
+
+ assert where_args == []
+
+ assert column_aliases == [
+ "sha1",
+ "sha1_git",
+ "sha256",
+ "blake2s256",
+ "length",
+ "ctime",
+ "status",
+ "reason",
+ ]
+
+ assert (
+ query
+ == """
+select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason
+from skipped_content
+
+
+ """
+ )
+
+
+def test_compute_query_origin_visit():
+ query, where_args, column_aliases = compute_query("origin_visit", 1, 10)
+
+ assert where_args == [1, 10]
+
+ assert column_aliases == [
+ "visit",
+ "origin.type",
+ "origin_visit.type",
+ "url",
+ "date",
+ "snapshot",
+ "status",
+ "metadata",
+ ]
+
+ assert (
+ query
+ == """
+select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata
+from origin_visit
+left join origin on origin_visit.origin=origin.id
+where (origin_visit.origin) >= %s and (origin_visit.origin) < %s
+ """
+ )
+
+
+def test_compute_query_release():
+ query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003")
+
+ assert where_args == ["\x000002", "\x000003"]
+
+ assert column_aliases == [
+ "id",
+ "date",
+ "date_offset",
+ "comment",
+ "name",
+ "synthetic",
+ "date_neg_utc_offset",
+ "target",
+ "target_type",
+ "author_id",
+ "author_name",
+ "author_email",
+ "author_fullname",
+ ]
+
+ assert (
+ query
+ == """
+select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname
+from release
+left join person a on release.author=a.id
+where (release.id) >= %s and (release.id) < %s
+ """ # noqa
+ )