diff --git a/docs/cli.rst b/docs/cli.rst index 74e78cc2..d7c349e0 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -1,8 +1,8 @@ .. _swh-storage-cli: Command-line interface ====================== .. click:: swh.storage.cli:storage - :prog: swh storage - :nested: full + :prog: swh storage + :nested: full diff --git a/docs/extrinsic-metadata-specification.rst b/docs/extrinsic-metadata-specification.rst index 351ab825..f1522ec7 100644 --- a/docs/extrinsic-metadata-specification.rst +++ b/docs/extrinsic-metadata-specification.rst @@ -1,345 +1,347 @@ :orphan: .. _extrinsic-metadata-specification: Extrinsic metadata specification ================================ :term:`Extrinsic metadata` is information about software that is not part of the source code itself but still closely related to the software. Typical sources for extrinsic metadata are: the hosting place of a repository, which can offer metadata via its web view or API; external registries like collaborative curation initiatives; and out-of-band information available at source code archival time. Since they are not part of the source code, a dedicated mechanism to fetch and store them is needed. This specification assumes the reader is familiar with Software Heritage's :ref:`architecture` and :ref:`data-model`. Metadata sources ---------------- Authorities ^^^^^^^^^^^ Metadata authorities are entities that provide metadata about an :term:`origin`. Metadata authorities include: code hosting places, :term:`deposit` submitters, and registries (eg. Wikidata). 
An authority is uniquely defined by these properties: - * its type, representing the kind of authority, which is one of these values: - * `deposit_client`, for metadata pushed to Software Heritage at the same time - as a software artifact - * `forge`, for metadata pulled from the same source as the one hosting - the software artifacts (which includes package managers) - * `registry`, for metadata pulled from a third-party - * its URL, which unambiguously identifies an instance of the authority type. +* its type, representing the kind of authority, which is one of these values: + + * ``deposit_client``, for metadata pushed to Software Heritage at the same time + as a software artifact + * ``forge``, for metadata pulled from the same source as the one hosting + the software artifacts (which includes package managers) + * ``registry``, for metadata pulled from a third-party + +* its URL, which unambiguously identifies an instance of the authority type. Examples: =============== ================================= type url =============== ================================= deposit_client https://hal.archives-ouvertes.fr/ deposit_client https://hal.inria.fr/ deposit_client https://software.intel.com/ forge https://gitlab.com/ forge https://gitlab.inria.fr/ forge https://0xacab.org/ forge https://github.com/ registry https://www.wikidata.org/ registry https://swmath.org/ registry https://ascl.net/ =============== ================================= Metadata fetchers ^^^^^^^^^^^^^^^^^ Metadata fetchers are software components used to fetch metadata from a metadata authority, and ingest them into the Software Heritage archive. A metadata fetcher is uniquely defined by these properties: * its type * its version Examples: * :term:`loaders `, which may either discover metadata as a side-effect of loading source code, or be dedicated to fetching metadata. * :term:`listers `, which may discover metadata as a side-effect of discovering origins. 
* :term:`deposit` submitters, which push metadata to SWH from a third-party; usually at the same time as a :term:`software artifact` * crawlers, which fetch metadata from an authority in a way that is none of the above (eg. by querying a specific API of the origin's forge). Storage API ----------- Authorities and metadata fetchers ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The :term:`storage` API offers these endpoints to manipulate metadata authorities and metadata fetchers: * ``metadata_authority_add(type, url, metadata)`` which adds a new metadata authority to the storage. * ``metadata_authority_get(type, url)`` which looks up a known authority (there is at most one) and if it is known, returns a dictionary with keys ``type``, ``url``, and ``metadata``. * ``metadata_fetcher_add(name, version, metadata)`` which adds a new metadata fetcher to the storage. * ``metadata_fetcher_get(name, version)`` which looks up a known fetcher (there is at most one) and if it is known, returns a dictionary with keys ``name``, ``version``, and ``metadata``. These `metadata` fields contain JSON-encodable dictionaries with information about the authority/fetcher, in a format specific to each authority/fetcher. With authority, the `metadata` field is reserved for information describing and qualifying the authority. With fetchers, the `metadata` field is reserved for configuration metadata and other technical usage. Origin metadata ^^^^^^^^^^^^^^^ Extrinsic metadata are stored in SWH's :term:`storage database`. The storage API offers three endpoints to manipulate origin metadata: * Adding metadata:: raw_extrinsic_metadata_add( "origin", origin_url, discovery_date, authority, fetcher, format, metadata ) which adds a new `metadata` byte string obtained from a given authority and associated to the origin. `discovery_date` is a Python datetime. `authority` must be a dict containing keys `type` and `url`, and `fetcher` a dict containing keys `name` and `version`. 
The authority and fetcher must be known to the storage before using this endpoint. `format` is a text field indicating the format of the content of the `metadata` byte string, see `extrinsic-metadata-formats`_. * Getting latest metadata:: raw_extrinsic_metadata_get_latest( "origin", origin_url, authority ) where `authority` must be a dict containing keys `type` and `url`, which returns a dictionary corresponding to the latest metadata entry added for this origin, in the format:: { 'origin_url': ..., 'authority': {'type': ..., 'url': ...}, 'fetcher': {'name': ..., 'version': ...}, 'discovery_date': ..., 'format': '...', 'metadata': b'...' } * Getting all metadata:: raw_extrinsic_metadata_get( "origin", origin_url, authority, page_token, limit ) where `authority` must be a dict containing keys `type` and `url`, which returns a dictionary with keys: * `next_page_token`, which is an opaque token to be used as `page_token` for retrieving the next page. If absent, there are no more pages to gather. * `results`: list of dictionaries, one for each metadata item deposited, corresponding to the given origin and obtained from the specified authority. Each of these dictionaries is in the following format:: { 'authority': {'type': ..., 'url': ...}, 'fetcher': {'name': ..., 'version': ...}, 'discovery_date': ..., 'format': '...', 'metadata': b'...' } The parameters ``page_token`` and ``limit`` are used for pagination based on an arbitrary order. An initial query to ``raw_extrinsic_metadata_get`` must set ``page_token`` to ``None``, and each further query must use the value from the previous query's ``next_page_token`` to get the next page of results. ``metadata`` is a byte string (possibly encoded using Base64). Its format is specific to each authority, and it is treated as an opaque value by the storage. Unifying these various formats into a common language is outside the scope of this specification.
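The pagination contract described above (start with ``page_token`` set to ``None``, then follow ``next_page_token`` until it is absent) can be sketched as a small generator. This is only an illustration under stated assumptions: ``iter_all_metadata``, ``fetch_page`` and ``make_fake_fetch_page`` are hypothetical names that are not part of swh-storage, and the fake below stands in for a real storage call by using list offsets as page tokens.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

# Hypothetical callable type: (page_token, limit) -> one page of results.
FetchPage = Callable[[Optional[str], int], Dict[str, Any]]


def iter_all_metadata(fetch_page: FetchPage, limit: int = 100) -> Iterator[Dict[str, Any]]:
    """Yield every result, following ``next_page_token`` until it is absent."""
    page_token: Optional[str] = None  # the initial query must use None
    while True:
        page = fetch_page(page_token, limit)
        yield from page["results"]
        page_token = page.get("next_page_token")
        if page_token is None:
            break  # no token in the page: nothing left to gather


def make_fake_fetch_page(items: List[Dict[str, Any]]) -> FetchPage:
    """Hypothetical stand-in for the storage endpoint; tokens are list offsets."""

    def fetch_page(page_token: Optional[str], limit: int) -> Dict[str, Any]:
        start = int(page_token) if page_token is not None else 0
        page: Dict[str, Any] = {"results": items[start : start + limit]}
        if start + limit < len(items):
            page["next_page_token"] = str(start + limit)
        return page

    return fetch_page


# Five fake metadata entries, fetched two at a time.
fake_items = [
    {"format": "pypi-project-json", "metadata": str(i).encode()} for i in range(5)
]
collected = list(iter_all_metadata(make_fake_fetch_page(fake_items), limit=2))
print(len(collected))  # prints 5
```

With a real storage object, ``fetch_page`` would presumably wrap a call to ``raw_extrinsic_metadata_get("origin", origin_url, authority, page_token, limit)``, assuming the endpoint returns the dictionary described in this section. Callers should treat the token as opaque and never parse it.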
Artifact metadata ^^^^^^^^^^^^^^^^^ In addition to origin metadata, the storage database stores metadata on all software artifacts supported by the data model. This works similarly to origin metadata, with one major difference: extrinsic metadata can be given on a specific artifact within a specified context (for example: a directory in a specific revision from a specific visit on a specific origin), which is stored alongside the metadata itself. For example, two origins may develop the same file independently; authorship, licensing, or even description information about the same artifact may vary from one context to another. This is why it is important to qualify the metadata with the complete context for which it is intended, if any. The same two endpoints as for origin can be used, but with a different value for the first argument: * Adding metadata:: raw_extrinsic_metadata_add( type, id, context, discovery_date, authority, fetcher, format, metadata ) * Getting all metadata:: raw_extrinsic_metadata_get( type, id, authority, after, page_token, limit ) These are defined similarly to the origin metadata endpoints above, but where ``id`` is a core SWHID (with type matching ``type``), and with an extra ``context`` (argument when adding metadata, and dictionary key when getting them) that is a dictionary with keys depending on the artifact ``type``: * for ``snapshot``: ``origin`` (a URL) and ``visit`` (an integer) * for ``release``: those above, plus ``snapshot`` (the core SWHID of a snapshot) * for ``revision``: all those above, plus ``release`` (the core SWHID of a release) * for ``directory``: all those above, plus ``revision`` (the core SWHID of a revision) and ``path`` (a byte string), representing the path to this directory from the root of the ``revision`` * for ``content``: all those above, plus ``directory`` (the core SWHID of a directory) All keys are optional, but should be provided whenever possible.
The dictionary may be empty if the metadata is fully independent from context. In all cases, ``visit`` should only be provided if ``origin`` is (as visit ids are only unique with respect to an origin). .. _extrinsic-metadata-formats: Extrinsic metadata format ------------------------- Here is a list of all the metadata formats stored: ``pypi-project-json`` The metadata is a release entry from a PyPI project's JSON file, extracted and re-serialized. ``replicate-npm-package-json`` ditto, but from a replicate.npmjs.com project ``nixguix-sources-json`` ditto, but from https://nix-community.github.io/nixpkgs-swh/ ``original-artifacts-json`` tarball data, see below ``sword-v2-atom-codemeta`` XML Atom document, with Codemeta metadata, as sent by a deposit client, see the :ref:`Deposit protocol reference `. ``sword-v2-atom-codemeta-v2`` Deprecated alias of ``sword-v2-atom-codemeta`` ``sword-v2-atom-codemeta-v2-in-json`` Deprecated, JSON serialization of a ``sword-v2-atom-codemeta`` document. ``xml-deposit-info`` Information about a deposit, to identify the provenance of a metadata object sent via swh-deposit, see below Details on some of these formats: original-artifacts-json ^^^^^^^^^^^^^^^^^^^^^^^ This is a loosely defined format, originally used as a ``metadata`` column on the ``revision`` table that changed over the years. It is a JSON array, and each entry is a JSON object representing an archive (tarball, zipball, ...) that was unpacked by the SWH loader before loading its content into Software Heritage. At the time of writing this specification, it had stabilized to this format:: [ { "length": , "filename": "", "checksums": { "sha1": "", "sha256": "", }, "url": "" }, ... ] Older ``original-artifacts-json`` were migrated to use this format, but may be missing some of the keys. xml-deposit-info ^^^^^^^^^^^^^^^^ Deposits with code objects are loaded as their own origin, so we can look them up in the deposit database from their metadata (which hold the origin as a context).
This is not true for metadata-only deposits, because we don't create an origin for them; so we need to store this information somewhere. The naive solution would be to insert them in the Atom entry provided by the client, but it means altering a document before we archive it, which potentially corrupts it or loses part of the data. Therefore, on each metadata-only deposit, the deposit creates an extra "metametadata" object, with the original metadata object as target, and using this format:: {{ deposit.id }} {{ deposit.client.provider_url }} {{ deposit.collection.name }} diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py index 37d1bebe..5518f91c 100644 --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -1,110 +1,111 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import importlib from typing import TYPE_CHECKING, Any, Dict, List import warnings if TYPE_CHECKING: from .interface import StorageInterface STORAGE_IMPLEMENTATIONS = { "local": ".postgresql.storage.Storage", "remote": ".api.client.RemoteStorage", "memory": ".in_memory.InMemoryStorage", "filter": ".filter.FilteringProxyStorage", "buffer": ".buffer.BufferingProxyStorage", "retry": ".retry.RetryingProxyStorage", "cassandra": ".cassandra.CassandraStorage", "validate": ".validate.ValidatingProxyStorage", } def get_storage(cls: str, **kwargs) -> "StorageInterface": """Get a storage object of class `storage_class` with arguments `storage_args`. Args: - cls (str): storage's class, can be: - - ``local`` to use a postgresql database - - ``cassandra`` to use a cassandra database - - ``remote`` to connect to a swh-storage server - - ``memory`` for an in-memory storage, useful for fast tests - - ``filter``, ``buffer``, ... 
to use specific storage "proxies", see their - respective documentations + cls (str): + storage's class, can be: + - ``local`` to use a postgresql database + - ``cassandra`` to use a cassandra database + - ``remote`` to connect to a swh-storage server + - ``memory`` for an in-memory storage, useful for fast tests + - ``filter``, ``buffer``, ... to use specific storage "proxies", see their + respective documentations args (dict): dictionary with keys Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. """ if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] if cls == "pipeline": return get_storage_pipeline(**kwargs) class_path = STORAGE_IMPLEMENTATIONS.get(cls) if class_path is None: raise ValueError( "Unknown storage class `%s`. Supported: %s" % (cls, ", ".join(STORAGE_IMPLEMENTATIONS)) ) (module_path, class_name) = class_path.rsplit(".", 1) module = importlib.import_module(module_path, package=__package__) Storage = getattr(module, class_name) check_config = kwargs.pop("check_config", {}) storage = Storage(**kwargs) if check_config: if not storage.check_config(**check_config): raise EnvironmentError("storage check config failed") return storage def get_storage_pipeline( steps: List[Dict[str, Any]], check_config=None ) -> "StorageInterface": """Recursively get a storage object that may use other storage objects as backends. Args: steps (List[dict]): List of dicts that may be used as kwargs for `get_storage`. Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. 
""" storage_config = None for step in reversed(steps): if "args" in step: warnings.warn( 'Explicit "args" key is deprecated, use keys directly ' "instead.", DeprecationWarning, ) step = { "cls": step["cls"], **step["args"], } if storage_config: step["storage"] = storage_config step["check_config"] = check_config storage_config = step if storage_config is None: raise ValueError("'pipeline' has no steps.") return get_storage(**storage_config) diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py index 0389754d..01f376f9 100644 --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -1,182 +1,183 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import partial from typing import Dict, Iterable, Mapping, Sequence, Tuple from typing_extensions import Literal from swh.core.utils import grouper from swh.model.model import BaseModel, Content, SkippedContent from swh.storage import get_storage from swh.storage.interface import StorageInterface LObjectType = Literal[ "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ] OBJECT_TYPES: Tuple[LObjectType, ...] = ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ) DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { "content": 10000, "content_bytes": 100 * 1024 * 1024, "skipped_content": 10000, "directory": 25000, "revision": 100000, "release": 100000, "snapshot": 25000, "extid": 10000, } class BufferingProxyStorage: """Storage implementation in charge of accumulating objects prior to discussing with the "main" storage. Deduplicates values based on a tuple of keys depending on the object type. Sample configuration use case for buffering storage: .. 
code-block:: yaml storage: cls: buffer args: storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ min_batch_size: content: 10000 content_bytes: 100000000 skipped_content: 10000 directory: 5000 revision: 1000 release: 10000 snapshot: 5000 """ def __init__(self, storage: Mapping, min_batch_size: Mapping = {}): self.storage: StorageInterface = get_storage(**storage) self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = { k: {} for k in OBJECT_TYPES } self._contents_size: int = 0 def __getattr__(self, key: str): if key.endswith("_add"): object_type = key.rsplit("_", 1)[0] if object_type in OBJECT_TYPES: return partial(self.object_add, object_type=object_type, keys=["id"],) if key == "storage": raise AttributeError(key) return getattr(self.storage, key) def content_add(self, contents: Sequence[Content]) -> Dict: """Push contents to write to the storage in the buffer. Following policies apply: - - if the buffer's threshold is hit, flush content to the storage. - - otherwise, if the total size of buffered contents's threshold is hit, - flush content to the storage. + + - if the buffer's threshold is hit, flush content to the storage. + - otherwise, if the total size of buffered contents's threshold is hit, + flush content to the storage. 
""" stats = self.object_add( contents, object_type="content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) if not stats: # We did not flush already self._contents_size += sum(c.length for c in contents) if self._contents_size >= self._buffer_thresholds["content_bytes"]: return self.flush(["content"]) return stats def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict: return self.object_add( contents, object_type="skipped_content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) def object_add( self, objects: Sequence[BaseModel], *, object_type: LObjectType, keys: Iterable[str], ) -> Dict[str, int]: """Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit. """ buffer_ = self._objects[object_type] for obj in objects: obj_key = tuple(getattr(obj, key) for key in keys) buffer_[obj_key] = obj if len(buffer_) >= self._buffer_thresholds[object_type]: return self.flush() return {} def flush( self, object_types: Sequence[LObjectType] = OBJECT_TYPES ) -> Dict[str, int]: summary: Dict[str, int] = {} def update_summary(stats): for k, v in stats.items(): summary[k] = v + summary.get(k, 0) for object_type in object_types: buffer_ = self._objects[object_type] batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) for batch in batches: add_fn = getattr(self.storage, "%s_add" % object_type) stats = add_fn(list(batch)) update_summary(stats) # Flush underlying storage stats = self.storage.flush(object_types) update_summary(stats) self.clear_buffers(object_types) return summary def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: """Clear objects from current buffer. WARNING: data that has not been flushed to storage will be lost when this method is called. This should only be called when `flush` fails and you want to continue your processing. 
""" for object_type in object_types: buffer_ = self._objects[object_type] buffer_.clear() if object_type == "content": self._contents_size = 0 self.storage.clear_buffers(object_types) diff --git a/swh/storage/cli.py b/swh/storage/cli.py index d21b9071..34fd17e0 100644 --- a/swh/storage/cli.py +++ b/swh/storage/cli.py @@ -1,226 +1,227 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import os from typing import Dict, Optional import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group try: from systemd.daemon import notify except ImportError: notify = None @swh_cli_group.group(name="storage", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--check-config", default=None, type=click.Choice(["no", "read", "write"]), help=( "Check the configuration of the storage at startup for read or write access; " "if set, override the value present in the configuration file if any. " "Defaults to 'read' for the 'backfill' command, and 'write' for 'rpc-server' " "and 'replay' commands." 
), ) @click.pass_context def storage(ctx, config_file, check_config): """Software Heritage Storage tools.""" from swh.core import config if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") ctx.ensure_object(dict) ctx.obj["config"] = conf ctx.obj["check_config"] = check_config @storage.command(name="rpc-serve") @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5002, type=click.INT, metavar="PORT", show_default=True, help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, host, port, debug): """Software Heritage Storage RPC server. Do NOT use this in a production environment. """ from swh.storage.api.server import app if "log_level" in ctx.obj: logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"]) ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") app.config.update(ctx.obj["config"]) app.run(host, port=int(port), debug=bool(debug)) @storage.command() @click.argument("object_type") @click.option("--start-object", default=None) @click.option("--end-object", default=None) @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfill(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller lists objects from a Storage and produces journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to downtime of the latter).
The configuration file requires the following entries: + - brokers: a list of kafka endpoints (the journal) in which entries will be - added. + added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "read") # for "lazy" loading from swh.storage.backfill import JournalBackfiller try: from systemd.daemon import notify except ImportError: notify = None conf = ctx.obj["config"] backfiller = JournalBackfiller(conf) if notify: notify("READY=1") try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run, ) except KeyboardInterrupt: if notify: notify("STOPPING=1") ctx.exit(0) @storage.command() @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.pass_context def replay(ctx, stop_after_objects): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. 
""" import functools from swh.journal.client import get_journal_client from swh.storage import get_storage from swh.storage.replay import process_replay_objects ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") conf = ctx.obj["config"] storage = get_storage(**conf.pop("storage")) client_cfg = conf.pop("journal_client") if stop_after_objects: client_cfg["stop_after_objects"] = stop_after_objects try: client = get_journal_client(**client_cfg) except ValueError as exc: ctx.fail(exc) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str): """Helper function to inject the setting of check_config option in the storage config dict according to the expected default value (default value depends on the command, eg. backfill can be read-only). """ if check_config is not None: if check_config == "no": storage_cfg.pop("check_config", None) else: storage_cfg["check_config"] = {"check_write": check_config == "write"} else: if "check_config" not in storage_cfg: storage_cfg["check_config"] = {"check_write": default == "write"} def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_STORAGE") if __name__ == "__main__": main()