diff --git a/swh/storage/interface.py b/swh/storage/interface.py index 230085e0..2decd798 100644 --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -1,1421 +1,1406 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from enum import Enum -from typing import ( - TYPE_CHECKING, - Any, - Dict, - Iterable, - List, - Optional, - Sequence, - Tuple, - TypeVar, -) +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar import attr from typing_extensions import Protocol, TypedDict, runtime_checkable from swh.core.api import remote_api_endpoint from swh.core.api.classes import PagedResult as CorePagedResult from swh.model.model import ( Content, Directory, DirectoryEntry, ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Release, Revision, Sha1, Sha1Git, SkippedContent, Snapshot, SnapshotBranch, ) from swh.model.swhids import ExtendedSWHID, ObjectType -if TYPE_CHECKING: - from swh.storage.writer import JournalWriter - class ListOrder(Enum): """Specifies the order for paginated endpoints returning sorted results.""" ASC = "asc" DESC = "desc" class PartialBranches(TypedDict): """Type of the dictionary returned by snapshot_get_branches""" id: Sha1Git """Identifier of the snapshot""" branches: Dict[bytes, Optional[SnapshotBranch]] """A dict of branches contained in the snapshot whose keys are the branches' names""" next_branch: Optional[bytes] """The name of the first branch not returned or :const:`None` if the snapshot has less than the request number of branches.""" @attr.s class OriginVisitWithStatuses: visit = attr.ib(type=OriginVisit) statuses = attr.ib(type=List[OriginVisitStatus]) TResult = TypeVar("TResult") PagedResult = CorePagedResult[TResult, str] # TODO: Make it an enum (too much impact) VISIT_STATUSES = ["created", "ongoing", "full", "partial"] def deprecated(f): f.deprecated_endpoint = True return f @runtime_checkable class StorageInterface(Protocol): - journal_writer: Optional["JournalWriter"] - @remote_api_endpoint("check_config") def check_config(self, *, check_write: bool) -> bool: """Check that the storage is configured and ready to go.""" ... @remote_api_endpoint("content/add") def content_add(self, content: List[Content]) -> Dict[str, int]: """Add content blobs to the storage Args: contents (iterable): iterable of dictionaries representing individual pieces of content to add. Each dictionary has the following keys: - data (bytes): the actual content - length (int): content length - one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum - status (str): one of visible, hidden Raises: The following exceptions can occur: - HashCollision in case of collision - Any other exceptions raise by the db In case of errors, some of the content may have been stored in the DB and in the objstorage. Since additions to both idempotent, that should not be a problem. Returns: Summary dict with the following keys and associated values: content:add: New contents added content:add:bytes: Sum of the contents' length data """ ... @remote_api_endpoint("content/update") def content_update( self, contents: List[Dict[str, Any]], keys: List[str] = [] ) -> None: """Update content blobs to the storage. Does nothing for unknown contents or skipped ones. Args: content: iterable of dictionaries representing individual pieces of content to update. Each dictionary has the following keys: - data (bytes): the actual content - length (int): content length (default: -1) - one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum - status (str): one of visible, hidden, absent keys (list): List of keys (str) whose values needs an update, e.g., new hash column """ ... @remote_api_endpoint("content/add_metadata") def content_add_metadata(self, content: List[Content]) -> Dict[str, int]: """Add content metadata to the storage (like `content_add`, but without inserting to the objstorage). Args: content (iterable): iterable of dictionaries representing individual pieces of content to add. Each dictionary has the following keys: - length (int): content length (default: -1) - one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum - status (str): one of visible, hidden, absent - reason (str): if status = absent, the reason why - origin (int): if status = absent, the origin we saw the content in - ctime (datetime): time of insertion in the archive Returns: Summary dict with the following key and associated values: content:add: New contents added skipped_content:add: New skipped contents (no data) added """ ... @remote_api_endpoint("content/data") def content_get_data(self, content: Sha1) -> Optional[bytes]: """Given a content identifier, returns its associated data if any. Args: content: sha1 identifier Returns: raw content data (bytes) """ ... @remote_api_endpoint("content/partition") def content_get_partition( self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000, ) -> PagedResult[Content]: """Splits contents into nb_partitions, and returns one of these based on partition_id (which must be in [0, nb_partitions-1]) There is no guarantee on how the partitioning is done, or the result order. Args: partition_id: index of the partition to fetch nb_partitions: total number of partitions to split into page_token: opaque token used for pagination. limit: Limit result (default to 1000) Returns: PagedResult of Content model objects within the partition. If next_page_token is None, there is no longer data to retrieve. """ ... @remote_api_endpoint("content/metadata") def content_get( self, contents: List[bytes], algo: str = "sha1" ) -> List[Optional[Content]]: """Retrieve content metadata in bulk Args: content: List of content identifiers algo: one of the checksum algorithm in :data:`swh.model.hashutil.DEFAULT_ALGORITHMS` Returns: List of contents model objects when they exist, None otherwise. """ ... @remote_api_endpoint("content/missing") def content_missing( self, contents: List[Dict[str, Any]], key_hash: str = "sha1" ) -> Iterable[bytes]: """List content missing from storage Args: content: iterable of dictionaries whose keys are either 'length' or an item of :data:`swh.model.hashutil.ALGORITHMS`; mapped to the corresponding checksum (or length). key_hash: name of the column to use as hash id result (default: 'sha1') Raises: StorageArgumentException when key_hash is unknown. TODO: an exception when we get a hash collision. Returns: iterable of missing content ids (as per the `key_hash` column) """ ... @remote_api_endpoint("content/missing/sha1") def content_missing_per_sha1(self, contents: List[bytes]) -> Iterable[bytes]: """List content missing from storage based only on sha1. Args: contents: List of sha1 to check for absence. Raises: TODO: an exception when we get a hash collision. Returns: Iterable of missing content ids (sha1) """ ... @remote_api_endpoint("content/missing/sha1_git") def content_missing_per_sha1_git( self, contents: List[Sha1Git] ) -> Iterable[Sha1Git]: """List content missing from storage based only on sha1_git. Args: contents (List): An iterable of content id (sha1_git) Yields: missing contents sha1_git """ ... @remote_api_endpoint("content/present") def content_find(self, content: Dict[str, Any]) -> List[Content]: """Find a content hash in db. Args: content: a dictionary representing one content hash, mapping checksum algorithm names (see swh.model.hashutil.ALGORITHMS) to checksum values Raises: ValueError: in case the key of the dictionary is not sha1, sha1_git nor sha256. Returns: an iterable of Content objects matching the search criteria if the content exist. Empty iterable otherwise. """ ... @remote_api_endpoint("content/get_random") def content_get_random(self) -> Sha1Git: """Finds a random content id. Returns: a sha1_git """ ... @remote_api_endpoint("content/skipped/add") def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]: """Add contents to the skipped_content list, which contains (partial) information about content missing from the archive. Args: contents (iterable): iterable of dictionaries representing individual pieces of content to add. Each dictionary has the following keys: - length (Optional[int]): content length (default: -1) - one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum; each is optional - status (str): must be "absent" - reason (str): the reason why the content is absent - origin (int): if status = absent, the origin we saw the content in Raises: The following exceptions can occur: - HashCollision in case of collision - Any other exceptions raise by the backend In case of errors, some content may have been stored in the DB and in the objstorage. Since additions to both idempotent, that should not be a problem. Returns: Summary dict with the following key and associated values: skipped_content:add: New skipped contents (no data) added """ ... @remote_api_endpoint("content/skipped/missing") def skipped_content_missing( self, contents: List[Dict[str, Any]] ) -> Iterable[Dict[str, Any]]: """List skipped contents missing from storage. Args: contents: iterable of dictionaries containing the data for each checksum algorithm. Returns: Iterable of missing skipped contents as dict """ ... @remote_api_endpoint("directory/add") def directory_add(self, directories: List[Directory]) -> Dict[str, int]: """Add directories to the storage Args: directories (iterable): iterable of dictionaries representing the individual directories to add. Each dict has the following keys: - id (sha1_git): the id of the directory to add - entries (list): list of dicts for each entry in the directory. Each dict has the following keys: - name (bytes) - type (one of 'file', 'dir', 'rev'): type of the directory entry (file, directory, revision) - target (sha1_git): id of the object pointed at by the directory entry - perms (int): entry permissions Returns: Summary dict of keys with associated count as values: directory:add: Number of directories actually added """ ... @remote_api_endpoint("directory/missing") def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]: """List directories missing from storage. Args: directories: list of directory ids Yields: missing directory ids """ ... @remote_api_endpoint("directory/ls") def directory_ls( self, directory: Sha1Git, recursive: bool = False ) -> Iterable[Dict[str, Any]]: """List entries for one directory. If `recursive=True`, names in the path of a dir/file not at the root are concatenated with a slash (`/`). Args: directory: the directory to list entries from. recursive: if flag on, this list recursively from this directory. Yields: directory entries for such directory. """ ... @remote_api_endpoint("directory/path") def directory_entry_get_by_path( self, directory: Sha1Git, paths: List[bytes] ) -> Optional[Dict[str, Any]]: """Get the directory entry (either file or dir) from directory with path. Args: directory: directory id paths: path to lookup from the top level directory. From left (top) to right (bottom). Returns: The corresponding directory entry as dict if found, None otherwise. """ ... @remote_api_endpoint("directory/get_entries") def directory_get_entries( self, directory_id: Sha1Git, page_token: Optional[bytes] = None, limit: int = 1000, ) -> Optional[PagedResult[DirectoryEntry]]: """Get the content, possibly partial, of a directory with the given id The entries of the directory are not guaranteed to be returned in any particular order. The number of results is not guaranteed to be lower than the ``limit``. Args: directory_id: dentifier of the directory page_token: opaque string used to get the next results of a search limit: Number of entries to return Returns: None if the directory does not exist; a page of DirectoryEntry objects otherwise. """ ... @remote_api_endpoint("directory/get_raw_manifest") def directory_get_raw_manifest( self, directory_ids: List[Sha1Git] ) -> Dict[Sha1Git, Optional[bytes]]: """Returns the raw manifest of directories that do not fit the SWH data model, or None if they do. Directories missing from the archive are not returned at all. Args: directory_ids: List of directory ids to query """ ... @remote_api_endpoint("directory/get_random") def directory_get_random(self) -> Sha1Git: """Finds a random directory id. Returns: a sha1_git """ ... @remote_api_endpoint("revision/add") def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: """Add revisions to the storage Args: revisions (List[dict]): iterable of dictionaries representing the individual revisions to add. Each dict has the following keys: - **id** (:class:`sha1_git`): id of the revision to add - **date** (:class:`dict`): date the revision was written - **committer_date** (:class:`dict`): date the revision got added to the origin - **type** (one of 'git', 'tar'): type of the revision added - **directory** (:class:`sha1_git`): the directory the revision points at - **message** (:class:`bytes`): the message associated with the revision - **author** (:class:`Dict[str, bytes]`): dictionary with keys: name, fullname, email - **committer** (:class:`Dict[str, bytes]`): dictionary with keys: name, fullname, email - **metadata** (:class:`jsonb`): extra information as dictionary - **synthetic** (:class:`bool`): revision's nature (tarball, directory creates synthetic revision`) - **parents** (:class:`list[sha1_git]`): the parents of this revision date dictionaries have the form defined in :mod:`swh.model`. Returns: Summary dict of keys with associated count as values revision:add: New objects actually stored in db """ ... @remote_api_endpoint("revision/missing") def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]: """List revisions missing from storage Args: revisions: revision ids Yields: missing revision ids """ ... @remote_api_endpoint("revision") def revision_get( self, revision_ids: List[Sha1Git], ignore_displayname: bool = False ) -> List[Optional[Revision]]: """Get revisions from storage Args: revisions: revision ids ignore_displayname: return the original author/committer's full name even if it's masked by a displayname. Returns: list of revision object (if the revision exists or None otherwise) """ ... @remote_api_endpoint("extid/from_extid") def extid_get_from_extid( self, id_type: str, ids: List[bytes], version: Optional[int] = None ) -> List[ExtID]: """Get ExtID objects from external IDs Args: id_type: type of the given external identifiers (e.g. 'mercurial') ids: list of external IDs version: (Optional) version to use as filter Returns: list of ExtID objects """ ... @remote_api_endpoint("extid/from_target") def extid_get_from_target( self, target_type: ObjectType, ids: List[Sha1Git], extid_type: Optional[str] = None, extid_version: Optional[int] = None, ) -> List[ExtID]: """Get ExtID objects from target IDs and target_type Args: target_type: type the SWH object ids: list of target IDs extid_type: (Optional) extid_type to use as filter. This cannot be empty if extid_version is provided. extid_version: (Optional) version to use as filter. This cannot be empty if extid_type is provided. Raises: ValueError if extid_version is provided without extid_type and vice versa. Returns: list of ExtID objects """ ... @remote_api_endpoint("extid/add") def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: """Add a series of ExtID objects Args: ids: list of ExtID objects Returns: Summary dict of keys with associated count as values extid:add: New ExtID objects actually stored in db """ ... @remote_api_endpoint("revision/log") def revision_log( self, revisions: List[Sha1Git], ignore_displayname: bool = False, limit: Optional[int] = None, ) -> Iterable[Optional[Dict[str, Any]]]: """Fetch revision entry from the given root revisions. Args: revisions: array of root revisions to lookup ignore_displayname: return the original author/committer's full name even if it's masked by a displayname. limit: limitation on the output result. Default to None. Yields: revision entries log from the given root root revisions """ ... @remote_api_endpoint("revision/shortlog") def revision_shortlog( self, revisions: List[Sha1Git], limit: Optional[int] = None ) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]: """Fetch the shortlog for the given revisions Args: revisions: list of root revisions to lookup limit: depth limitation for the output Yields: a list of (id, parents) tuples """ ... @remote_api_endpoint("revision/get_random") def revision_get_random(self) -> Sha1Git: """Finds a random revision id. Returns: a sha1_git """ ... @remote_api_endpoint("release/add") def release_add(self, releases: List[Release]) -> Dict[str, int]: """Add releases to the storage Args: releases (List[dict]): iterable of dictionaries representing the individual releases to add. Each dict has the following keys: - **id** (:class:`sha1_git`): id of the release to add - **revision** (:class:`sha1_git`): id of the revision the release points to - **date** (:class:`dict`): the date the release was made - **name** (:class:`bytes`): the name of the release - **comment** (:class:`bytes`): the comment associated with the release - **author** (:class:`Dict[str, bytes]`): dictionary with keys: name, fullname, email the date dictionary has the form defined in :mod:`swh.model`. Returns: Summary dict of keys with associated count as values release:add: New objects contents actually stored in db """ ... @remote_api_endpoint("release/missing") def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]: """List missing release ids from storage Args: releases: release ids Yields: a list of missing release ids """ ... @remote_api_endpoint("release") def release_get( self, releases: List[Sha1Git], ignore_displayname: bool = False ) -> List[Optional[Release]]: """Given a list of sha1, return the releases's information Args: releases: list of sha1s ignore_displayname: return the original author's full name even if it's masked by a displayname. Returns: List of releases matching the identifiers or None if the release does not exist. """ ... @remote_api_endpoint("release/get_random") def release_get_random(self) -> Sha1Git: """Finds a random release id. Returns: a sha1_git """ ... @remote_api_endpoint("snapshot/add") def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]: """Add snapshots to the storage. Args: snapshot ([dict]): the snapshots to add, containing the following keys: - **id** (:class:`bytes`): id of the snapshot - **branches** (:class:`dict`): branches the snapshot contains, mapping the branch name (:class:`bytes`) to the branch target, itself a :class:`dict` (or ``None`` if the branch points to an unknown object) - **target_type** (:class:`str`): one of ``content``, ``directory``, ``revision``, ``release``, ``snapshot``, ``alias`` - **target** (:class:`bytes`): identifier of the target (currently a ``sha1_git`` for all object kinds, or the name of the target branch for aliases) Raises: ValueError: if the origin or visit id does not exist. Returns: Summary dict of keys with associated count as values snapshot:add: Count of object actually stored in db """ ... @remote_api_endpoint("snapshot/missing") def snapshot_missing(self, snapshots: List[Sha1Git]) -> Iterable[Sha1Git]: """List snapshots missing from storage Args: snapshots: snapshot ids Yields: missing snapshot ids """ ... @remote_api_endpoint("snapshot") def snapshot_get(self, snapshot_id: Sha1Git) -> Optional[Dict[str, Any]]: """Get the content, possibly partial, of a snapshot with the given id The branches of the snapshot are iterated in the lexicographical order of their names. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. In order to browse the whole set of branches, the method :meth:`snapshot_get_branches` should be used instead. Args: snapshot_id: snapshot identifier Returns: dict: a dict with three keys: * **id**: identifier of the snapshot * **branches**: a dict of branches contained in the snapshot whose keys are the branches' names. * **next_branch**: the name of the first branch not returned or :const:`None` if the snapshot has less than 1000 branches. """ ... @remote_api_endpoint("snapshot/count_branches") def snapshot_count_branches( self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Optional[Dict[Optional[str], int]]: """Count the number of branches in the snapshot with the given id Args: snapshot_id: snapshot identifier branch_name_exclude_prefix: if provided, do not count branches whose name starts with given prefix Returns: A dict whose keys are the target types of branches and values their corresponding amount """ ... @remote_api_endpoint("snapshot/get_branches") def snapshot_get_branches( self, snapshot_id: Sha1Git, branches_from: bytes = b"", branches_count: int = 1000, target_types: Optional[List[str]] = None, branch_name_include_substring: Optional[bytes] = None, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Optional[PartialBranches]: """Get the content, possibly partial, of a snapshot with the given id The branches of the snapshot are iterated in the lexicographical order of their names. Args: snapshot_id: identifier of the snapshot branches_from: optional parameter used to skip branches whose name is lesser than it before returning them branches_count: optional parameter used to restrain the amount of returned branches target_types: optional parameter used to filter the target types of branch to return (possible values that can be contained in that list are `'content', 'directory', 'revision', 'release', 'snapshot', 'alias'`) branch_name_include_substring: if provided, only return branches whose name contains given substring branch_name_exclude_prefix: if provided, do not return branches whose name contains given prefix Returns: dict: None if the snapshot does not exist; a dict with three keys otherwise: * **id**: identifier of the snapshot * **branches**: a dict of branches contained in the snapshot whose keys are the branches' names. * **next_branch**: the name of the first branch not returned or :const:`None` if the snapshot has less than `branches_count` branches after `branches_from` included. """ ... @remote_api_endpoint("snapshot/get_random") def snapshot_get_random(self) -> Sha1Git: """Finds a random snapshot id. Returns: a sha1_git """ ... @remote_api_endpoint("origin/visit/add") def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: """Add visits to storage. If the visits have no id, they will be created and assigned one. The resulted visits are visits with their visit id set. Args: visits: List of OriginVisit objects to add Raises: StorageArgumentException if some origin visit reference unknown origins Returns: List[OriginVisit] stored """ ... @remote_api_endpoint("origin/visit_status/add") def origin_visit_status_add( self, visit_statuses: List[OriginVisitStatus], ) -> Dict[str, int]: """Add origin visit statuses. If there is already a status for the same origin and visit id at the same date, the new one will be either dropped or will replace the existing one (it is unspecified which one of these two behaviors happens). Args: visit_statuses: origin visit statuses to add Raises: StorageArgumentException if the origin of the visit status is unknown """ ... @remote_api_endpoint("origin/visit/get") def origin_visit_get( self, origin: str, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10, ) -> PagedResult[OriginVisit]: """Retrieve page of OriginVisit information. Args: origin: The visited origin page_token: opaque string used to get the next results of a search order: Order on visit id fields to list origin visits (default to asc) limit: Number of visits to return Raises: StorageArgumentException if the order is wrong or the page_token type is mistyped. Returns: Page of OriginVisit data model objects. if next_page_token is None, there is no longer data to retrieve. """ ... @remote_api_endpoint("origin/visit/find_by_date") def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime ) -> Optional[OriginVisit]: """Retrieves the origin visit whose date is closest to the provided timestamp. In case of a tie, the visit with largest id is selected. Args: origin: origin (URL) visit_date: expected visit date Returns: A visit if found, None otherwise """ ... @remote_api_endpoint("origin/visit/getby") def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]: """Retrieve origin visit's information. Args: origin: origin (URL) visit: visit id Returns: The information on that particular OriginVisit or None if it does not exist """ ... @remote_api_endpoint("origin/visit/get_latest") def origin_visit_get_latest( self, origin: str, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ) -> Optional[OriginVisit]: """Get the latest origin visit for the given origin, optionally looking only for those with one of the given allowed_statuses or for those with a snapshot. Args: origin: origin URL type: Optional visit type to filter on (e.g git, tar, dsc, svn, hg, npm, pypi, ...) allowed_statuses: list of visit statuses considered to find the latest visit. For instance, ``allowed_statuses=['full']`` will only consider visits that have successfully run to completion. require_snapshot: If True, only a visit with a snapshot will be returned. Raises: StorageArgumentException if values for the allowed_statuses parameters are unknown Returns: OriginVisit matching the criteria if found, None otherwise. Note that as OriginVisit no longer held reference on the visit status or snapshot, you may want to use origin_visit_status_get_latest for those information. """ ... @remote_api_endpoint("origin/visit_status/get") def origin_visit_status_get( self, origin: str, visit: int, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10, ) -> PagedResult[OriginVisitStatus]: """Retrieve page of OriginVisitStatus information. Args: origin: The visited origin visit: The visit identifier page_token: opaque string used to get the next results of a search order: Order on visit status objects to list (default to asc) limit: Number of visit statuses to return Returns: Page of OriginVisitStatus data model objects. if next_page_token is None, there is no longer data to retrieve. """ ... @remote_api_endpoint("origin/visit_status/get_latest") def origin_visit_status_get_latest( self, origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ) -> Optional[OriginVisitStatus]: """Get the latest origin visit status for the given origin visit, optionally looking only for those with one of the given allowed_statuses or with a snapshot. Args: origin: origin URL allowed_statuses: list of visit statuses considered to find the latest visit. Possible values are {created, ongoing, partial, full}. For instance, ``allowed_statuses=['full']`` will only consider visits that have successfully run to completion. require_snapshot: If True, only a visit with a snapshot will be returned. Raises: StorageArgumentException if values for the allowed_statuses parameters are unknown Returns: The OriginVisitStatus matching the criteria """ ... @remote_api_endpoint("origin/visit/get_with_statuses") def origin_visit_get_with_statuses( self, origin: str, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10, ) -> PagedResult[OriginVisitWithStatuses]: """Retrieve page of origin visits and all their statuses. Origin visit statuses are always sorted in ascending order of their dates. Args: origin: The visited origin URL allowed_statuses: Only visit statuses matching that list will be returned. If empty, all visit statuses will be returned. Possible status values are ``created``, ``not_found``, ``ongoing``, ``failed``, ``partial`` and ``full``. require_snapshot: If :const:`True`, only visit statuses with a snapshot will be returned. page_token: opaque string used to get the next results order: Order on visit objects to list (default to asc) limit: Number of visits with their statuses to return Returns: Page of OriginVisitWithStatuses objects. if next_page_token is None, there is no longer data to retrieve. """ ... @remote_api_endpoint("origin/visit_status/get_random") def origin_visit_status_get_random(self, type: str) -> Optional[OriginVisitStatus]: """Randomly select one successful origin visit with made in the last 3 months. Returns: One random OriginVisitStatus matching the selection criteria """ ... @remote_api_endpoint("object/find_by_sha1_git") def object_find_by_sha1_git(self, ids: List[Sha1Git]) -> Dict[Sha1Git, List[Dict]]: """Return the objects found with the given ids. Args: ids: a generator of sha1_gits Returns: A dict from id to the list of objects found for that id. Each object found is itself a dict with keys: - sha1_git: the input id - type: the type of object found """ ... @remote_api_endpoint("origin/get") def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]: """Return origins. Args: origin: a list of urls to find Returns: the list of associated existing origin model objects. The unknown origins will be returned as None at the same index as the input. """ ... @remote_api_endpoint("origin/get_sha1") def origin_get_by_sha1(self, sha1s: List[bytes]) -> List[Optional[Dict[str, Any]]]: """Return origins, identified by the sha1 of their URLs. Args: sha1s: a list of sha1s Returns: List of origins dict whose sha1 of their url match, None otherwise. """ ... @remote_api_endpoint("origin/list") def origin_list( self, page_token: Optional[str] = None, limit: int = 100 ) -> PagedResult[Origin]: """Returns the list of origins Args: page_token: opaque token used for pagination. limit: the maximum number of results to return Returns: Page of Origin data model objects. if next_page_token is None, there is no longer data to retrieve. """ ... @remote_api_endpoint("origin/search") def origin_search( self, url_pattern: str, page_token: Optional[str] = None, limit: int = 50, regexp: bool = False, with_visit: bool = False, visit_types: Optional[List[str]] = None, ) -> PagedResult[Origin]: """Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. Args: url_pattern: the string pattern to search for in origin urls page_token: opaque token used for pagination limit: the maximum number of found origins to return regexp: if True, consider the provided pattern as a regular expression and return origins whose urls match it with_visit: if True, filter out origins with no visit visit_types: Only origins having any of the provided visit types (e.g. git, svn, pypi) will be returned Yields: PagedResult of Origin """ ... @deprecated @remote_api_endpoint("origin/count") def origin_count( self, url_pattern: str, regexp: bool = False, with_visit: bool = False ) -> int: """Count origins whose urls contain a provided string pattern or match a provided regular expression. The pattern search in origin urls is performed in a case insensitive way. Args: url_pattern (str): the string pattern to search for in origin urls regexp (bool): if True, consider the provided pattern as a regular expression and return origins whose urls match it with_visit (bool): if True, filter out origins with no visit Returns: int: The number of origins matching the search criterion. """ ... @remote_api_endpoint("origin/snapshot/get") def origin_snapshot_get_all(self, origin_url: str) -> List[Sha1Git]: """Return all unique snapshot identifiers resulting from origin visits. Args: origin_url: origin URL Returns: list of sha1s """ ... @remote_api_endpoint("origin/add_multi") def origin_add(self, origins: List[Origin]) -> Dict[str, int]: """Add origins to the storage Args: origins: list of dictionaries representing the individual origins, with the following keys: - type: the origin type ('git', 'svn', 'deb', ...) - url (bytes): the url the origin points to Returns: Summary dict of keys with associated count as values origin:add: Count of object actually stored in db """ ... def stat_counters(self): """compute statistics about the number of tuples in various tables Returns: dict: a dictionary mapping textual labels (e.g., content) to integer values (e.g., the number of tuples in table content) """ ... def refresh_stat_counters(self): """Recomputes the statistics for `stat_counters`.""" ... @remote_api_endpoint("raw_extrinsic_metadata/add") def raw_extrinsic_metadata_add( self, metadata: List[RawExtrinsicMetadata], ) -> Dict[str, int]: """Add extrinsic metadata on objects (contents, directories, ...). The authority and fetcher must be known to the storage before using this endpoint. If there is already metadata for the same object, authority, fetcher, and at the same date; the new one will be either dropped or will replace the existing one (it is unspecified which one of these two behaviors happens). Args: metadata: iterable of RawExtrinsicMetadata objects to be inserted. """ ... @remote_api_endpoint("raw_extrinsic_metadata/get") def raw_extrinsic_metadata_get( self, target: ExtendedSWHID, authority: MetadataAuthority, after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, ) -> PagedResult[RawExtrinsicMetadata]: """Retrieve list of all raw_extrinsic_metadata entries targeting the id Args: target: the SWHID of the objects to find metadata on authority: a dict containing keys `type` and `url`. after: minimum discovery_date for a result to be returned page_token: opaque token, used to get the next page of results limit: maximum number of results to be returned Returns: PagedResult of RawExtrinsicMetadata """ ... @remote_api_endpoint("raw_extrinsic_metadata/get_by_ids") def raw_extrinsic_metadata_get_by_ids( self, ids: List[Sha1Git] ) -> List[RawExtrinsicMetadata]: """Retrieve list of raw_extrinsic_metadata entries of the given id (unlike raw_extrinsic_metadata_get, which returns metadata entries **targeting** the id) Args: ids: list of hashes of RawExtrinsicMetadata objects """ ... @remote_api_endpoint("raw_extrinsic_metadata/get_authorities") def raw_extrinsic_metadata_get_authorities( self, target: ExtendedSWHID ) -> List[MetadataAuthority]: """Returns all authorities that provided metadata on the given object.""" ... @remote_api_endpoint("metadata_fetcher/add") def metadata_fetcher_add( self, fetchers: List[MetadataFetcher], ) -> Dict[str, int]: """Add new metadata fetchers to the storage. Their `name` and `version` together are unique identifiers of this fetcher; and `metadata` is an arbitrary dict of JSONable data with information about this fetcher, which must not be `None` (but may be empty). Args: fetchers: iterable of MetadataFetcher to be inserted """ ... @remote_api_endpoint("metadata_fetcher/get") def metadata_fetcher_get( self, name: str, version: str ) -> Optional[MetadataFetcher]: """Retrieve information about a fetcher Args: name: the name of the fetcher version: version of the fetcher Returns: a MetadataFetcher object (with a non-None metadata field) if it is known, else None. """ ... @remote_api_endpoint("metadata_authority/add") def metadata_authority_add( self, authorities: List[MetadataAuthority] ) -> Dict[str, int]: """Add new metadata authorities to the storage. Their `type` and `url` together are unique identifiers of this authority; and `metadata` is an arbitrary dict of JSONable data with information about this authority, which must not be `None` (but may be empty). Args: authorities: iterable of MetadataAuthority to be inserted """ ... @remote_api_endpoint("metadata_authority/get") def metadata_authority_get( self, type: MetadataAuthorityType, url: str ) -> Optional[MetadataAuthority]: """Retrieve information about an authority Args: type: one of "deposit_client", "forge", or "registry" url: unique URI identifying the authority Returns: a MetadataAuthority object (with a non-None metadata field) if it is known, else None. """ ... @remote_api_endpoint("clear/buffer") def clear_buffers(self, object_types: Sequence[str] = ()) -> None: """For backend storages (pg, storage, in-memory), this is a noop operation. For proxy storages (especially filter, buffer), this is an operation which cleans internal state. """ @remote_api_endpoint("flush") def flush(self, object_types: Sequence[str] = ()) -> Dict[str, int]: """For backend storages (pg, storage, in-memory), this is expected to be a noop operation. For proxy storages (especially buffer), this is expected to trigger actual writes to the backend. """ ... diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py index cf0c6534..426b8be6 100644 --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -1,369 +1,368 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging from unittest.mock import patch import attr import pytest from swh.journal.client import JournalClient from swh.model.model import Directory, DirectoryEntry from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.storage import get_storage from swh.storage.backfill import ( PARTITION_KEY, JournalBackfiller, byte_ranges, compute_query, fetch, raw_extrinsic_metadata_target_ranges, ) from swh.storage.in_memory import InMemoryStorage from swh.storage.replay import ModelObjectDeserializer, process_replay_objects from swh.storage.tests.test_replay import check_replayed TEST_CONFIG = { "journal_writer": { "brokers": ["localhost"], "prefix": "swh.tmp_journal.new", "client_id": "swh.journal.client.test", }, "storage": {"cls": "postgresql", "db": "service=swh-dev"}, } def test_config_ko_missing_mandatory_key(): """Missing configuration key will make the initialization fail""" for key in TEST_CONFIG.keys(): config = TEST_CONFIG.copy() config.pop(key) with pytest.raises(ValueError) as e: JournalBackfiller(config) error = "Configuration error: The following keys must be provided: %s" % ( ",".join([key]), ) assert e.value.args[0] == error def test_config_ko_unknown_object_type(): """Parse arguments will fail if the object type is unknown""" backfiller = JournalBackfiller(TEST_CONFIG) with pytest.raises(ValueError) as e: backfiller.parse_arguments("unknown-object-type", 1, 2) error = ( "Object type unknown-object-type is not supported. " "The only possible values are %s" % (", ".join(sorted(PARTITION_KEY))) ) assert e.value.args[0] == error def test_compute_query_content(): query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") assert where_args == ["\x000000", "\x000001"] assert column_aliases == [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", "ctime", ] assert ( query == """ select sha1,sha1_git,sha256,blake2s256,length,status,ctime from content where (sha1) >= %s and (sha1) < %s """ ) def test_compute_query_skipped_content(): query, where_args, column_aliases = compute_query("skipped_content", None, None) assert where_args == [] assert column_aliases == [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", "reason", ] assert ( query == """ select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason from skipped_content """ ) def test_compute_query_origin_visit(): query, where_args, column_aliases = compute_query("origin_visit", 1, 10) assert where_args == [1, 10] assert column_aliases == [ "visit", "type", "origin", "date", ] assert ( query == """ select visit,type,origin.url as origin,date from origin_visit left join origin on origin_visit.origin=origin.id where (origin_visit.origin) >= %s and (origin_visit.origin) < %s """ ) def test_compute_query_release(): query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") assert where_args == ["\x000002", "\x000003"] assert column_aliases == [ "id", "date", "date_offset_bytes", "comment", "name", "synthetic", "target", "target_type", "author_id", "author_name", "author_email", "author_fullname", "raw_manifest", ] assert ( query == """ select release.id as id,date,date_offset_bytes,comment,release.name as name,synthetic,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname,raw_manifest from release left join person a on release.author=a.id where (release.id) >= %s and (release.id) < %s """ # noqa ) @pytest.mark.parametrize("numbits", [2, 3, 8, 16]) def test_byte_ranges(numbits): ranges = list(byte_ranges(numbits)) assert len(ranges) == 2**numbits assert ranges[0][0] is None assert ranges[-1][1] is None bounds = [] for i, (left, right) in enumerate(zip(ranges[:-1], ranges[1:])): assert left[1] == right[0], f"Mismatched bounds in {i}th range" bounds.append(left[1]) assert bounds == sorted(bounds) def test_raw_extrinsic_metadata_target_ranges(): ranges = list(raw_extrinsic_metadata_target_ranges()) assert ranges[0][0] == "" assert ranges[-1][1] is None bounds = [] for i, (left, right) in enumerate(zip(ranges[:-1], ranges[1:])): assert left[1] == right[0], f"Mismatched bounds in {i}th range" bounds.append(left[1]) assert bounds == sorted(bounds) RANGE_GENERATORS = { "content": lambda start, end: [(None, None)], "skipped_content": lambda start, end: [(None, None)], "directory": lambda start, end: [(None, None)], "extid": lambda start, end: [(None, None)], "metadata_authority": lambda start, end: [(None, None)], "metadata_fetcher": lambda start, end: [(None, None)], "revision": lambda start, end: [(None, None)], "release": lambda start, end: [(None, None)], "snapshot": lambda start, end: [(None, None)], "origin": lambda start, end: [(None, 10000)], "origin_visit": lambda start, end: [(None, 10000)], "origin_visit_status": lambda start, end: [(None, 10000)], "raw_extrinsic_metadata": lambda start, end: [(None, None)], } @patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS) def test_backfiller( swh_storage_backend_config, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): prefix1 = f"{kafka_prefix}-1" prefix2 = f"{kafka_prefix}-2" journal1 = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer-1", "prefix": prefix1, "auto_flush": False, } swh_storage_backend_config["journal_writer"] = journal1 storage = get_storage(**swh_storage_backend_config) # fill the storage and the journal (under prefix1) for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") method(objects) - assert storage.journal_writer is not None - storage.journal_writer.journal.flush() + storage.journal_writer.journal.flush() # type: ignore[attr-defined] # now apply the backfiller on the storage to fill the journal under prefix2 backfiller_config = { "journal_writer": { "brokers": [kafka_server], "client_id": "kafka_writer-2", "prefix": prefix2, "auto_flush": False, }, "storage": swh_storage_backend_config, } # Backfilling backfiller = JournalBackfiller(backfiller_config) for object_type in TEST_OBJECTS: backfiller.run(object_type, None, None) backfiller.writer.journal.flush() # Trace log messages for unhandled object types in the replayer caplog.set_level(logging.DEBUG, "swh.storage.replay") # now check journal content are the same under both topics # use the replayer scaffolding to fill storages to make is a bit easier # Replaying #1 deserializer = ModelObjectDeserializer() sto1 = get_storage(cls="memory") replayer1 = JournalClient( brokers=kafka_server, group_id=f"{kafka_consumer_group}-1", prefix=prefix1, stop_on_eof=True, value_deserializer=deserializer.convert, ) worker_fn1 = functools.partial(process_replay_objects, storage=sto1) replayer1.process(worker_fn1) # Replaying #2 sto2 = get_storage(cls="memory") replayer2 = JournalClient( brokers=kafka_server, group_id=f"{kafka_consumer_group}-2", prefix=prefix2, stop_on_eof=True, value_deserializer=deserializer.convert, ) worker_fn2 = functools.partial(process_replay_objects, storage=sto2) replayer2.process(worker_fn2) # Compare storages assert isinstance(sto1, InMemoryStorage) # needed to help mypy assert isinstance(sto2, InMemoryStorage) check_replayed(sto1, sto2) for record in caplog.records: assert ( "this should not happen" not in record.message ), "Replayer ignored some message types, see captured logging" def test_backfiller__duplicate_directory_entries( swh_storage_backend_config, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): """Tests the backfiller doesn't crash when reading a legacy directory with duplicated entries, which is no longer allowed. Instead, it should slightly mangle entries and set a raw_manifest. """ storage = get_storage(**swh_storage_backend_config) db = storage.get_db() # type: ignore run_validators = attr.get_run_validators() attr.set_run_validators(False) try: invalid_directory = Directory( entries=( DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), ) ) finally: attr.set_run_validators(run_validators) storage.directory_add([invalid_directory]) # Make sure we successfully inserted a corrupt directory, otherwise this test # is pointless with db.conn.cursor() as cur: cur.execute("select id, dir_entries, file_entries, raw_manifest from directory") (row,) = cur (id_, (dir_entry,), (file_entry,), raw_manifest) = row assert id_ == invalid_directory.id assert raw_manifest is None cur.execute("select id, name, target from directory_entry_dir") assert list(cur) == [(dir_entry, b"foo", b"\x01" * 20)] cur.execute("select id, name, target from directory_entry_file") assert list(cur) == [(file_entry, b"foo", b"\x00" * 20)] # Run the backfiller on the directory (which would crash if calling # Directory() directly instead of Directory.from_possibly_duplicated_entries()) directories = list(fetch(db, "directory", start=None, end=None)) # Make sure the directory looks as expected deduplicated_directory = Directory( id=invalid_directory.id, entries=( DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), DirectoryEntry( name=b"foo_0000000000", type="file", target=b"\x00" * 20, perms=0 ), ), raw_manifest=( # fmt: off b"tree 52\x00" + b"0 foo\x00" + b"\x00" * 20 + b"1 foo\x00" + b"\x01" * 20 # fmt: on ), ) assert directories == [deduplicated_directory] diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py index 40f704a2..638256f7 100644 --- a/swh/storage/tests/test_kafka_writer.py +++ b/swh/storage/tests/test_kafka_writer.py @@ -1,172 +1,170 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict from attr import asdict, has from confluent_kafka import Consumer from hypothesis import given from hypothesis.strategies import lists from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages from swh.model.hypothesis_strategies import objects from swh.model.model import Person from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.storage import get_storage def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, "auto_flush": False, } storage_config: Dict[str, Any] = { "cls": "pipeline", "steps": [ {"cls": "memory", "journal_writer": writer_config}, ], } storage = get_storage(**storage_config) expected_messages = 0 for obj_type, objs in TEST_OBJECTS.items(): method = getattr(storage, obj_type + "_add") if obj_type in ( "content", "skipped_content", "directory", "extid", "metadata_authority", "metadata_fetcher", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", ): method(objs) expected_messages += len(objs) else: assert False, obj_type - assert storage.journal_writer is not None - storage.journal_writer.journal.flush() + storage.journal_writer.journal.flush() # type: ignore[attr-defined] existing_topics = set( topic for topic in consumer.list_topics(timeout=10).topics.keys() if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics ) assert existing_topics == { f"{kafka_prefix}.{obj_type}" for obj_type in ( "content", "directory", "extid", "metadata_authority", "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", ) } consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) def test_storage_direct_writer_anonymized( kafka_prefix: str, kafka_server, consumer: Consumer ): writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": True, "auto_flush": False, } storage_config: Dict[str, Any] = { "cls": "pipeline", "steps": [ {"cls": "memory", "journal_writer": writer_config}, ], } storage = get_storage(**storage_config) expected_messages = 0 for obj_type, objs in TEST_OBJECTS.items(): if obj_type == "origin_visit": # these have non-consistent API and are unrelated with what we # want to test here continue method = getattr(storage, obj_type + "_add") method(objs) expected_messages += len(objs) - assert storage.journal_writer is not None - storage.journal_writer.journal.flush() + storage.journal_writer.journal.flush() # type: ignore[attr-defined] existing_topics = set( topic for topic in consumer.list_topics(timeout=10).topics.keys() if topic.startswith(kafka_prefix) ) assert existing_topics == { f"{kafka_prefix}.{obj_type}" for obj_type in ( "content", "directory", "extid", "metadata_authority", "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", ) } | { f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ( "release", "revision", ) } def check_anonymized_obj(obj): if has(obj): if isinstance(obj, Person): assert obj.name is None assert obj.email is None assert len(obj.fullname) == 32 else: for key, value in asdict(obj, recurse=False).items(): check_anonymized_obj(value) @given(lists(objects(split_content=True))) def test_anonymizer(obj_type_and_objs): for obj_type, obj in obj_type_and_objs: check_anonymized_obj(obj.anonymize()) diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py index b5b41fc6..9fb20d99 100644 --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -1,582 +1,581 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dataclasses import datetime import functools import logging import re from typing import Any, Container, Dict, Optional, cast import attr import pytest from swh.journal.client import JournalClient from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes, hash_to_hex from swh.model.model import Revision, RevisionType from swh.model.tests.swh_model_data import ( COMMITTERS, DATES, DUPLICATE_CONTENTS, REVISIONS, ) from swh.model.tests.swh_model_data import TEST_OBJECTS as _TEST_OBJECTS from swh.storage import get_storage from swh.storage.cassandra.model import ContentRow, SkippedContentRow from swh.storage.exc import StorageArgumentException from swh.storage.in_memory import InMemoryStorage from swh.storage.replay import ModelObjectDeserializer, process_replay_objects UTC = datetime.timezone.utc TEST_OBJECTS = _TEST_OBJECTS.copy() # add a revision with metadata to check this later is dropped while being replayed TEST_OBJECTS["revision"] = list(_TEST_OBJECTS["revision"]) + [ Revision( id=hash_to_bytes("51d9d94ab08d3f75512e3a9fd15132e0a7ca7928"), message=b"hello again", date=DATES[1], committer=COMMITTERS[1], author=COMMITTERS[0], committer_date=DATES[0], type=RevisionType.GIT, directory=b"\x03" * 20, synthetic=False, metadata={"something": "interesting"}, parents=(REVISIONS[0].id,), ), ] WRONG_ID_REG = re.compile( "Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*" ) def nullify_ctime(obj): if isinstance(obj, (ContentRow, SkippedContentRow)): return dataclasses.replace(obj, ctime=None) else: return obj @pytest.fixture() def replayer_storage_and_client( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str ): journal_writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "auto_flush": False, } storage_config: Dict[str, Any] = { "cls": "memory", "journal_writer": journal_writer_config, } storage = get_storage(**storage_config) deserializer = ModelObjectDeserializer() replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, value_deserializer=deserializer.convert, ) yield storage, replayer def test_storage_replayer(replayer_storage_and_client, caplog): """Optimal replayer scenario. This: - writes objects to a source storage - replayer consumes objects from the topic and replays them - a destination storage is filled from this In the end, both storages should have the same content. """ src, replayer = replayer_storage_and_client # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) src.journal_writer.journal.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted assert isinstance(src, InMemoryStorage) # needed to help mypy assert isinstance(dst, InMemoryStorage) check_replayed(src, dst) collision = 0 for record in caplog.records: logtext = record.getMessage() if "Colliding contents:" in logtext: collision += 1 assert collision == 0, "No collision should be detected" def test_storage_replay_with_collision(replayer_storage_and_client, caplog): """Another replayer scenario with collisions. This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ src, replayer = replayer_storage_and_client # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) src.journal_writer.journal.flush() # Create collision in input data # These should not be written in the destination producer = src.journal_writer.journal.producer prefix = src.journal_writer.journal._prefix for content in DUPLICATE_CONTENTS: topic = f"{prefix}.content" key = content.sha1 now = datetime.datetime.now(tz=UTC) content = attr.evolve(content, ctime=now) producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content.to_dict()), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted # check the logs for the collision being properly detected nb_collisions = 0 actual_collision: Dict for record in caplog.records: logtext = record.getMessage() if "Collision detected:" in logtext: nb_collisions += 1 actual_collision = record.args["collision"] assert nb_collisions == 1, "1 collision should be detected" algo = "sha1" assert actual_collision["algo"] == algo expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0].get_hash(algo)) assert actual_collision["hash"] == expected_colliding_hash actual_colliding_hashes = actual_collision["objects"] assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) for content in DUPLICATE_CONTENTS: expected_content_hashes = { k: hash_to_hex(v) for k, v in content.hashes().items() } assert expected_content_hashes in actual_colliding_hashes # all objects from the src should exists in the dst storage assert isinstance(src, InMemoryStorage) # needed to help mypy assert isinstance(dst, InMemoryStorage) # needed to help mypy check_replayed(src, dst, exclude=["contents"]) # but the dst has one content more (one of the 2 colliding ones) assert ( len(list(src._cql_runner._contents.iter_all())) == len(list(dst._cql_runner._contents.iter_all())) - 1 ) def test_replay_skipped_content(replayer_storage_and_client): """Test the 'skipped_content' topic is properly replayed.""" src, replayer = replayer_storage_and_client _check_replay_skipped_content(src, replayer, "skipped_content") # utility functions def check_replayed( src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None, expected_anonymized=False, ): """Simple utility function to compare the content of 2 in_memory storages""" def fix_expected(attr, row): if expected_anonymized: if attr == "releases": row = dataclasses.replace( row, author=row.author and row.author.anonymize() ) elif attr == "revisions": row = dataclasses.replace( row, author=row.author.anonymize(), committer=row.committer.anonymize(), ) if attr == "revisions": # the replayer should now drop the metadata attribute; see # swh/storgae/replay.py:_insert_objects() row.metadata = "null" return row for attr_ in ( "contents", "skipped_contents", "directories", "extid", "revisions", "releases", "snapshots", "origins", "origin_visits", "origin_visit_statuses", "raw_extrinsic_metadata", ): if exclude and attr_ in exclude: continue expected_objects = [ (id, nullify_ctime(fix_expected(attr_, obj))) for id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all()) ] got_objects = [ (id, nullify_ctime(obj)) for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()) ] assert got_objects == expected_objects, f"Mismatch object list for {attr_}" def _check_replay_skipped_content(storage, replayer, topic): skipped_contents = _gen_skipped_contents(100) nb_sent = len(skipped_contents) producer = storage.journal_writer.journal.producer prefix = storage.journal_writer.journal._prefix for i, obj in enumerate(skipped_contents): producer.produce( topic=f"{prefix}.{topic}", key=key_to_kafka({"sha1": obj["sha1"]}), value=value_to_kafka(obj), ) producer.flush() dst_storage = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst_storage) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted for content in skipped_contents: assert not storage.content_find({"sha1": content["sha1"]}) # no skipped_content_find API endpoint, so use this instead assert not list(dst_storage.skipped_content_missing(skipped_contents)) def _updated(d1, d2): d1.update(d2) d1.pop("data", None) return d1 def _gen_skipped_contents(n=10): # we do not use the hypothesis strategy here because this does not play well with # pytest fixtures, and it makes test execution very slow algos = DEFAULT_ALGORITHMS | {"length"} now = datetime.datetime.now(tz=UTC) return [ _updated( MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(), { "status": "absent", "reason": "why not", "origin": f"https://somewhere/{i}", "ctime": now, }, ) for i in range(n) ] @pytest.mark.parametrize("privileged", [True, False]) def test_storage_replay_anonymized( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, privileged: bool, ): """Optimal replayer scenario. This: - writes objects to the topic - replayer consumes objects from the topic and replay them This tests the behavior with both a privileged and non-privileged replayer """ writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": True, "auto_flush": False, } src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} storage = get_storage(**src_config) # Fill the src storage nb_sent = 0 for obj_type, objs in TEST_OBJECTS.items(): if obj_type in ("origin_visit", "origin_visit_status"): # these are unrelated with what we want to test here continue method = getattr(storage, obj_type + "_add") method(objs) nb_sent += len(objs) - assert storage.journal_writer is not None - storage.journal_writer.journal.flush() + storage.journal_writer.journal.flush() # type: ignore[attr-defined] # Fill a destination storage from Kafka, potentially using privileged topics dst_storage = get_storage(cls="memory") deserializer = ModelObjectDeserializer( validate=False ) # we cannot validate an anonymized replay replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, privileged=privileged, value_deserializer=deserializer.convert, ) worker_fn = functools.partial(process_replay_objects, storage=dst_storage) nb_inserted = replayer.process(worker_fn) replayer.consumer.commit() assert nb_sent == nb_inserted # Check the contents of the destination storage, and whether the anonymization was # properly used assert isinstance(storage, InMemoryStorage) # needed to help mypy assert isinstance(dst_storage, InMemoryStorage) check_replayed(storage, dst_storage, expected_anonymized=not privileged) def test_storage_replayer_with_validation_ok( replayer_storage_and_client, caplog, redisdb ): """Optimal replayer scenario with validation activated and reporter set to a redis db. - writes objects to a source storage - replayer consumes objects from the topic and replays them - a destination storage is filled from this - nothing has been reported in the redis db - both storages should have the same content """ src, replayer = replayer_storage_and_client replayer.deserializer = ModelObjectDeserializer(validate=True, reporter=redisdb.set) # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) src.journal_writer.journal.flush() # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted # check we do not have invalid objects reported invalid = 0 for record in caplog.records: logtext = record.getMessage() if WRONG_ID_REG.match(logtext): invalid += 1 assert invalid == 0, "Invalid objects should not be detected" assert not redisdb.keys() # so the dst should be the same as src storage check_replayed(cast(InMemoryStorage, src), cast(InMemoryStorage, dst)) def test_storage_replayer_with_validation_nok( replayer_storage_and_client, caplog, redisdb ): """Replayer scenario with invalid objects with validation and reporter set to a redis db. - writes objects to a source storage - replayer consumes objects from the topic and replays them - the destination storage is filled with only valid objects - the redis db contains the invalid (raw kafka mesg) objects """ src, replayer = replayer_storage_and_client replayer.value_deserializer = ModelObjectDeserializer( validate=True, reporter=redisdb.set ).convert caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) # insert invalid objects for object_type in ("revision", "directory", "release", "snapshot"): method = getattr(src, object_type + "_add") method([attr.evolve(TEST_OBJECTS[object_type][0], id=b"\x00" * 20)]) nb_sent += 1 # also add an object that won't even be possible to instantiate; this needs # to be done at low kafka level (since we cannot instantiate the invalid model # object...) # we use directory[1] because it actually have some entries dict_repr = { # copy each dir entry twice "entries": TEST_OBJECTS["directory"][1].to_dict()["entries"] * 2, "id": b"\x01" * 20, } topic = f"{src.journal_writer.journal._prefix}.directory" src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr) nb_sent += 1 src.journal_writer.journal.flush() # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted # check we do have invalid objects reported invalid = 0 for record in caplog.records: logtext = record.getMessage() if WRONG_ID_REG.match(logtext): invalid += 1 assert invalid == 4, "Invalid objects should be detected" assert set(redisdb.keys()) == { f"swh:1:{typ}:{'0'*40}".encode() for typ in ("rel", "rev", "snp", "dir") } | {b"directory:" + b"01" * 20} for key in redisdb.keys(): # check the stored value looks right rawvalue = redisdb.get(key) value = kafka_to_value(rawvalue) assert isinstance(value, dict) assert "id" in value assert value["id"] in (b"\x00" * 20, b"\x01" * 20) # check that invalid objects did not reach the dst storage for attr_ in ( "directories", "revisions", "releases", "snapshots", ): for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()): assert id not in (b"\x00" * 20, b"\x01" * 20) # check that valid objects did reach the dst storage # revisions expected = [attr.evolve(rev, metadata=None) for rev in TEST_OBJECTS["revision"]] result = dst.revision_get([obj.id for obj in TEST_OBJECTS["revision"]]) assert result == expected # releases expected = TEST_OBJECTS["release"] result = dst.release_get([obj.id for obj in TEST_OBJECTS["release"]]) assert result == expected # snapshot # result from snapshot_get is paginated, so adapt the expected to be comparable expected = [ {"next_branch": None, **obj.to_dict()} for obj in TEST_OBJECTS["snapshot"] ] result = [dst.snapshot_get(obj.id) for obj in TEST_OBJECTS["snapshot"]] assert result == expected # directories for directory in TEST_OBJECTS["directory"]: assert set(dst.directory_get_entries(directory.id).results) == set( directory.entries ) def test_storage_replayer_with_validation_nok_raises( replayer_storage_and_client, caplog, redisdb ): """Replayer scenario with invalid objects with raise_on_error set to True This: - writes both valid & invalid objects to a source storage - a StorageArgumentException should be raised while replayer consumes objects from the topic and replays them """ src, replayer = replayer_storage_and_client replayer.value_deserializer = ModelObjectDeserializer( validate=True, reporter=redisdb.set, raise_on_error=True ).convert caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) # insert invalid objects for object_type in ("revision", "directory", "release", "snapshot"): method = getattr(src, object_type + "_add") method([attr.evolve(TEST_OBJECTS[object_type][0], id=b"\x00" * 20)]) nb_sent += 1 # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) with pytest.raises(StorageArgumentException): replayer.process(worker_fn) # check we do have invalid objects reported invalid = 0 for record in caplog.records: logtext = record.getMessage() if WRONG_ID_REG.match(logtext): invalid += 1 assert invalid == 1, "One invalid objects should be detected" assert len(redisdb.keys()) == 1