
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
index 87727f7c..ed5c8952 100644
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1,1227 +1,1228 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import datetime
import itertools
import json
import random
import re
from typing import Any, Dict, List, Iterable, Optional, Tuple, Union
import attr
from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.identifiers import parse_swhid, SWHID
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.model import (
Revision,
Release,
Directory,
DirectoryEntry,
Content,
SkippedContent,
OriginVisit,
OriginVisitStatus,
Snapshot,
Origin,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MetadataTargetType,
RawExtrinsicMetadata,
+ Sha1Git,
)
from swh.storage.interface import ListOrder, PagedResult, VISIT_STATUSES
from swh.storage.objstorage import ObjStorage
from swh.storage.writer import JournalWriter
from swh.storage.utils import map_optional, now
from ..exc import StorageArgumentException, HashCollision
from .common import TOKEN_BEGIN, TOKEN_END
from . import converters
from .cql import CqlRunner
from .schema import HASH_ALGORITHMS
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
class CassandraStorage:
def __init__(self, hosts, keyspace, objstorage, port=9042, journal_writer=None):
self._cql_runner = CqlRunner(hosts, keyspace, port)
self.journal_writer = JournalWriter(journal_writer)
self.objstorage = ObjStorage(objstorage)
def check_config(self, *, check_write):
self._cql_runner.check_read()
return True
def _content_get_from_hash(self, algo, hash_) -> Iterable:
"""From the name of a hash algorithm and a value of that hash,
looks up the "hash -> token" secondary table (content_by_{algo})
to get tokens.
Then, looks up the main table (content) to get all contents with
that token, and filters out contents whose hash doesn't match."""
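# Note: a "token" here is Cassandra's hash of the partition key (Murmur3 with
# the default partitioner), so distinct hash values can map to the same token;
# each candidate row is therefore re-checked against the requested hash below.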
found_tokens = self._cql_runner.content_get_tokens_from_single_hash(algo, hash_)
for token in found_tokens:
# Query the main table ('content').
res = self._cql_runner.content_get_from_token(token)
for row in res:
# re-check the hash (in case of a murmur3 collision)
if getattr(row, algo) == hash_:
yield row
def _content_add(self, contents: List[Content], with_data: bool) -> Dict:
# Filter-out content already in the database.
contents = [
c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict())
]
self.journal_writer.content_add(contents)
if with_data:
# First insert to the objstorage, if the endpoint is
# `content_add` (as opposed to `content_add_metadata`).
# TODO: this should probably be done concurrently with the inserts
# to the index tables (but still before the main table, so an entry
# is only added to the main table after everything else was
# successfully inserted).
summary = self.objstorage.content_add(
c for c in contents if c.status != "absent"
)
content_add_bytes = summary["content:add:bytes"]
content_add = 0
for content in contents:
content_add += 1
# Check for sha1 or sha1_git collisions. This test is not atomic
# with the insertion, so it won't detect a collision if both
# contents are inserted at the same time, but it's good enough.
#
# The proper way to do it would probably be a BATCH, but this
# would be inefficient because of the number of partitions we
# need to affect (len(HASH_ALGORITHMS)+1, which is currently 5)
for algo in {"sha1", "sha1_git"}:
collisions = []
# Get tokens of 'content' rows with the same value for
# sha1/sha1_git
rows = self._content_get_from_hash(algo, content.get_hash(algo))
for row in rows:
if getattr(row, algo) != content.get_hash(algo):
# collision of token(partition key), ignore this
# row
continue
for other_algo in HASH_ALGORITHMS:
if getattr(row, other_algo) != content.get_hash(other_algo):
# This hash didn't match; discard the row.
collisions.append(
{algo: getattr(row, algo) for algo in HASH_ALGORITHMS}
)
if collisions:
collisions.append(content.hashes())
raise HashCollision(algo, content.get_hash(algo), collisions)
(token, insertion_finalizer) = self._cql_runner.content_add_prepare(content)
# Then add to index tables
for algo in HASH_ALGORITHMS:
self._cql_runner.content_index_add_one(algo, content, token)
# Then to the main table
insertion_finalizer()
summary = {
"content:add": content_add,
}
if with_data:
summary["content:add:bytes"] = content_add_bytes
return summary
def content_add(self, content: List[Content]) -> Dict:
contents = [attr.evolve(c, ctime=now()) for c in content]
return self._content_add(list(contents), with_data=True)
def content_update(self, content, keys=[]):
raise NotImplementedError(
"content_update is not supported by the Cassandra backend"
)
def content_add_metadata(self, content: List[Content]) -> Dict:
return self._content_add(content, with_data=False)
def content_get(self, content):
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX
)
yield from self.objstorage.content_get(content)
def content_get_partition(
self,
partition_id: int,
nb_partitions: int,
limit: int = 1000,
page_token: str = None,
):
if limit is None:
raise StorageArgumentException("limit should not be None")
# Compute start and end of the range of tokens covered by the
# requested partition
partition_size = (TOKEN_END - TOKEN_BEGIN) // nb_partitions
range_start = TOKEN_BEGIN + partition_id * partition_size
range_end = TOKEN_BEGIN + (partition_id + 1) * partition_size
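# Illustrative example: with nb_partitions=4 and partition_id=1, the
# partition covers tokens in
# [TOKEN_BEGIN + partition_size, TOKEN_BEGIN + 2 * partition_size).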
# offset the range start according to the `page_token`.
if page_token is not None:
if not (range_start <= int(page_token) <= range_end):
raise StorageArgumentException("Invalid page_token.")
range_start = int(page_token)
# Get the first rows of the range
rows = self._cql_runner.content_get_token_range(range_start, range_end, limit)
rows = list(rows)
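# A full page (exactly `limit` rows) means there may be more data: resume
# from the token just after the last returned row. A short page means the
# partition is exhausted.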
if len(rows) == limit:
next_page_token: Optional[str] = str(rows[-1].tok + 1)
else:
next_page_token = None
return {
"contents": [row._asdict() for row in rows if row.status != "absent"],
"next_page_token": next_page_token,
}
def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents}
for sha1 in contents:
# Get all (sha1, sha1_git, sha256, blake2s256) whose sha1
# matches the argument, from the index table ('content_by_sha1')
for row in self._content_get_from_hash("sha1", sha1):
content_metadata = row._asdict()
content_metadata.pop("ctime")
result[content_metadata["sha1"]].append(content_metadata)
return result
def content_find(self, content):
# Find an algorithm that is common to all the requested contents.
# It will be used to do an initial filtering efficiently.
filter_algos = list(set(content).intersection(HASH_ALGORITHMS))
if not filter_algos:
raise StorageArgumentException(
"content keys must contain at least one of: "
"%s" % ", ".join(sorted(HASH_ALGORITHMS))
)
common_algo = filter_algos[0]
results = []
rows = self._content_get_from_hash(common_algo, content[common_algo])
for row in rows:
# Re-check all the hashes, in case of collisions (either of the
# hash of the partition key, or the hashes in it)
for algo in HASH_ALGORITHMS:
if content.get(algo) and getattr(row, algo) != content[algo]:
# This hash didn't match; discard the row.
break
else:
# All hashes match, keep this row.
results.append(
{
**row._asdict(),
"ctime": row.ctime.replace(tzinfo=datetime.timezone.utc),
}
)
return results
def content_missing(self, content, key_hash="sha1"):
for cont in content:
res = self.content_find(cont)
if not res:
yield cont[key_hash]
if any(c["status"] == "missing" for c in res):
yield cont[key_hash]
def content_missing_per_sha1(self, contents):
return self.content_missing([{"sha1": c} for c in contents])
def content_missing_per_sha1_git(self, contents):
return self.content_missing(
[{"sha1_git": c for c in contents}], key_hash="sha1_git"
)
- def content_get_random(self):
+ def content_get_random(self) -> Sha1Git:
return self._cql_runner.content_get_random().sha1_git
def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable:
"""From the name of a hash algorithm and a value of that hash,
looks up the "hash -> token" secondary table
(skipped_content_by_{algo}) to get tokens.
Then, looks up the main table (skipped_content) to get all contents with
that token, and filters out contents whose hash doesn't match."""
found_tokens = self._cql_runner.skipped_content_get_tokens_from_single_hash(
algo, hash_
)
for token in found_tokens:
# Query the main table ('skipped_content').
res = self._cql_runner.skipped_content_get_from_token(token)
for row in res:
# re-check the hash (in case of a murmur3 collision)
if getattr(row, algo) == hash_:
yield row
def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict:
# Filter-out content already in the database.
contents = [
c
for c in contents
if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())
]
self.journal_writer.skipped_content_add(contents)
for content in contents:
# Compute token of the row in the main table
(token, insertion_finalizer) = self._cql_runner.skipped_content_add_prepare(
content
)
# Then add to index tables
for algo in HASH_ALGORITHMS:
self._cql_runner.skipped_content_index_add_one(algo, content, token)
# Then to the main table
insertion_finalizer()
return {"skipped_content:add": len(contents)}
def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
contents = [attr.evolve(c, ctime=now()) for c in content]
return self._skipped_content_add(contents)
def skipped_content_missing(self, contents):
for content in contents:
if not self._cql_runner.skipped_content_get_from_pk(content):
yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
def directory_add(self, directories: List[Directory]) -> Dict:
# Filter out directories that are already inserted.
missing = self.directory_missing([dir_.id for dir_ in directories])
directories = [dir_ for dir_ in directories if dir_.id in missing]
self.journal_writer.directory_add(directories)
for directory in directories:
# Add directory entries to the 'directory_entry' table
for entry in directory.entries:
self._cql_runner.directory_entry_add_one(
{**entry.to_dict(), "directory_id": directory.id}
)
# Add the directory *after* adding all the entries, so someone
# calling directory_ls in the meantime won't end up
# with half the entries.
self._cql_runner.directory_add_one(directory.id)
return {"directory:add": len(missing)}
def directory_missing(self, directories):
return self._cql_runner.directory_missing(directories)
def _join_dentry_to_content(self, dentry):
keys = (
"status",
"sha1",
"sha1_git",
"sha256",
"length",
)
ret = dict.fromkeys(keys)
ret.update(dentry.to_dict())
if ret["type"] == "file":
content = self.content_find({"sha1_git": ret["target"]})
if content:
content = content[0]
for key in keys:
ret[key] = content[key]
return ret
def _directory_ls(self, directory_id, recursive, prefix=b""):
if self.directory_missing([directory_id]):
return
rows = list(self._cql_runner.directory_entry_get([directory_id]))
for row in rows:
# Build and yield the directory entry dict
entry = row._asdict()
del entry["directory_id"]
entry = DirectoryEntry.from_dict(entry)
ret = self._join_dentry_to_content(entry)
ret["name"] = prefix + ret["name"]
ret["dir_id"] = directory_id
yield ret
if recursive and ret["type"] == "dir":
yield from self._directory_ls(
ret["target"], True, prefix + ret["name"] + b"/"
)
def directory_entry_get_by_path(self, directory, paths):
return self._directory_entry_get_by_path(directory, paths, b"")
def _directory_entry_get_by_path(self, directory, paths, prefix):
if not paths:
return
contents = list(self.directory_ls(directory))
if not contents:
return
def _get_entry(entries, name):
"""Finds the entry with the requested name, prepends the
prefix (to get its full path), and returns it.
If no entry has that name, returns None."""
for entry in entries:
if entry["name"] == name:
entry = entry.copy()
entry["name"] = prefix + entry["name"]
return entry
first_item = _get_entry(contents, paths[0])
if len(paths) == 1:
return first_item
if not first_item or first_item["type"] != "dir":
return
return self._directory_entry_get_by_path(
first_item["target"], paths[1:], prefix + paths[0] + b"/"
)
def directory_ls(self, directory, recursive=False):
yield from self._directory_ls(directory, recursive)
- def directory_get_random(self):
+ def directory_get_random(self) -> Sha1Git:
return self._cql_runner.directory_get_random().id
def revision_add(self, revisions: List[Revision]) -> Dict:
# Filter-out revisions already in the database
missing = self.revision_missing([rev.id for rev in revisions])
revisions = [rev for rev in revisions if rev.id in missing]
self.journal_writer.revision_add(revisions)
for revision in revisions:
revobject = converters.revision_to_db(revision)
if revobject:
# Add parents first
for (rank, parent) in enumerate(revobject["parents"]):
self._cql_runner.revision_parent_add_one(
revobject["id"], rank, parent
)
# Then write the main revision row.
# Writing this after all parents were written ensures that
# read endpoints don't return a partial view while writing
# the parents
self._cql_runner.revision_add_one(revobject)
return {"revision:add": len(revisions)}
def revision_missing(self, revisions):
return self._cql_runner.revision_missing(revisions)
def revision_get(self, revisions):
rows = self._cql_runner.revision_get(revisions)
revs = {}
for row in rows:
# TODO: use a single query to get all parents?
# (it might have lower latency, but requires more code and more
# bandwidth, because revision id would be part of each returned
# row)
parent_rows = self._cql_runner.revision_parent_get(row.id)
# parent_rank is the clustering key, so results are already
# sorted by rank.
parents = tuple(row.parent_id for row in parent_rows)
rev = converters.revision_from_db(row, parents=parents)
revs[rev.id] = rev.to_dict()
for rev_id in revisions:
yield revs.get(rev_id)
def _get_parent_revs(self, rev_ids, seen, limit, short):
if limit and len(seen) >= limit:
return
rev_ids = [id_ for id_ in rev_ids if id_ not in seen]
if not rev_ids:
return
seen |= set(rev_ids)
# We need this query, even if short=True, to return consistent
# results (i.e. not return only a subset of a revision's parents
# if it is being written)
if short:
rows = self._cql_runner.revision_get_ids(rev_ids)
else:
rows = self._cql_runner.revision_get(rev_ids)
for row in rows:
# TODO: use a single query to get all parents?
# (it might have lower latency, but requires more code and more
# bandwidth, because the revision id would be part of each returned
# row)
parent_rows = self._cql_runner.revision_parent_get(row.id)
# parent_rank is the clustering key, so results are already
# sorted by rank.
parents = tuple(row.parent_id for row in parent_rows)
if short:
yield (row.id, parents)
else:
rev = converters.revision_from_db(row, parents=parents)
yield rev.to_dict()
yield from self._get_parent_revs(parents, seen, limit, short)
def revision_log(self, revisions, limit=None):
seen = set()
yield from self._get_parent_revs(revisions, seen, limit, False)
def revision_shortlog(self, revisions, limit=None):
seen = set()
yield from self._get_parent_revs(revisions, seen, limit, True)
- def revision_get_random(self):
+ def revision_get_random(self) -> Sha1Git:
return self._cql_runner.revision_get_random().id
def release_add(self, releases: List[Release]) -> Dict:
to_add = []
for rel in releases:
if rel not in to_add:
to_add.append(rel)
missing = set(self.release_missing([rel.id for rel in to_add]))
to_add = [rel for rel in to_add if rel.id in missing]
self.journal_writer.release_add(to_add)
for release in to_add:
if release:
self._cql_runner.release_add_one(converters.release_to_db(release))
return {"release:add": len(to_add)}
def release_missing(self, releases):
return self._cql_runner.release_missing(releases)
def release_get(self, releases):
rows = self._cql_runner.release_get(releases)
rels = {}
for row in rows:
release = converters.release_from_db(row)
rels[row.id] = release.to_dict()
for rel_id in releases:
yield rels.get(rel_id)
- def release_get_random(self):
+ def release_get_random(self) -> Sha1Git:
return self._cql_runner.release_get_random().id
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
missing = self._cql_runner.snapshot_missing([snp.id for snp in snapshots])
snapshots = [snp for snp in snapshots if snp.id in missing]
for snapshot in snapshots:
self.journal_writer.snapshot_add([snapshot])
# Add branches
for (branch_name, branch) in snapshot.branches.items():
if branch is None:
target_type = None
target = None
else:
target_type = branch.target_type.value
target = branch.target
self._cql_runner.snapshot_branch_add_one(
{
"snapshot_id": snapshot.id,
"name": branch_name,
"target_type": target_type,
"target": target,
}
)
# Add the snapshot *after* adding all the branches, so someone
# calling snapshot_get_branch in the meantime won't end up
# with half the branches.
self._cql_runner.snapshot_add_one(snapshot.id)
return {"snapshot:add": len(snapshots)}
def snapshot_missing(self, snapshots):
return self._cql_runner.snapshot_missing(snapshots)
def snapshot_get(self, snapshot_id):
return self.snapshot_get_branches(snapshot_id)
def snapshot_get_by_origin_visit(self, origin, visit):
visit_status = self.origin_visit_status_get_latest(
origin, visit, require_snapshot=True
)
if not visit_status:
return None
return self.snapshot_get(visit_status.snapshot)
def snapshot_count_branches(self, snapshot_id):
if self._cql_runner.snapshot_missing([snapshot_id]):
# Makes sure we don't fetch branches for a snapshot that is
# being added.
return None
rows = list(self._cql_runner.snapshot_count_branches(snapshot_id))
assert len(rows) == 1
(nb_none, counts) = rows[0].counts
counts = dict(counts)
if nb_none:
counts[None] = nb_none
return counts
def snapshot_get_branches(
self, snapshot_id, branches_from=b"", branches_count=1000, target_types=None
):
if self._cql_runner.snapshot_missing([snapshot_id]):
# Makes sure we don't fetch branches for a snapshot that is
# being added.
return None
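# Fetch branches in chunks of branches_count + 1: the extra row, if present,
# signals that a next page exists, and the loop below keeps fetching because
# target_type filtering may drop rows from a chunk.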
branches = []
while len(branches) < branches_count + 1:
new_branches = list(
self._cql_runner.snapshot_branch_get(
snapshot_id, branches_from, branches_count + 1
)
)
if not new_branches:
break
branches_from = new_branches[-1].name
new_branches_filtered = new_branches
# Filter by target_type
if target_types:
new_branches_filtered = [
branch
for branch in new_branches_filtered
if branch.target is not None and branch.target_type in target_types
]
branches.extend(new_branches_filtered)
if len(new_branches) < branches_count + 1:
break
if len(branches) > branches_count:
last_branch = branches.pop(-1).name
else:
last_branch = None
branches = {
branch.name: {"target": branch.target, "target_type": branch.target_type,}
if branch.target
else None
for branch in branches
}
return {
"id": snapshot_id,
"branches": branches,
"next_branch": last_branch,
}
- def snapshot_get_random(self):
+ def snapshot_get_random(self) -> Sha1Git:
return self._cql_runner.snapshot_get_random().id
def object_find_by_sha1_git(self, ids):
results = {id_: [] for id_ in ids}
missing_ids = set(ids)
# Mind the order, revision is the most likely one for a given ID,
# so we check revisions first.
queries = [
("revision", self._cql_runner.revision_missing),
("release", self._cql_runner.release_missing),
("content", self._cql_runner.content_missing_by_sha1_git),
("directory", self._cql_runner.directory_missing),
]
for (object_type, query_fn) in queries:
found_ids = missing_ids - set(query_fn(missing_ids))
for sha1_git in found_ids:
results[sha1_git].append(
{"sha1_git": sha1_git, "type": object_type,}
)
missing_ids.remove(sha1_git)
if not missing_ids:
# We found everything, skipping the next queries.
break
return results
def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]:
return [self.origin_get_one(origin) for origin in origins]
def origin_get_one(self, origin_url: str) -> Optional[Origin]:
"""Given an origin url, return the origin if it exists, None otherwise
"""
rows = list(self._cql_runner.origin_get_by_url(origin_url))
if rows:
assert len(rows) == 1
return Origin(url=rows[0].url)
else:
return None
def origin_get_by_sha1(self, sha1s):
results = []
for sha1 in sha1s:
rows = self._cql_runner.origin_get_by_sha1(sha1)
if rows:
results.append({"url": rows.one().url})
else:
results.append(None)
return results
def origin_list(
self, page_token: Optional[str] = None, limit: int = 100
) -> PagedResult[Origin]:
# Compute what token to begin the listing from
start_token = TOKEN_BEGIN
if page_token:
start_token = int(page_token)
if not (TOKEN_BEGIN <= start_token <= TOKEN_END):
raise StorageArgumentException("Invalid page_token.")
next_page_token = None
origins = []
# Take one more origin so we can reuse it as the next page token if any
for row in self._cql_runner.origin_list(start_token, limit + 1):
origins.append(Origin(url=row.url))
# keep reference of the last id for pagination purposes
last_id = row.tok
if len(origins) > limit:
# last origin id is the next page token
next_page_token = str(last_id)
# excluding that origin from the result to respect the limit size
origins = origins[:limit]
assert len(origins) <= limit
return PagedResult(results=origins, next_page_token=next_page_token)
def origin_search(
self,
url_pattern: str,
page_token: Optional[str] = None,
limit: int = 50,
regexp: bool = False,
with_visit: bool = False,
) -> PagedResult[Origin]:
# TODO: remove this endpoint, swh-search should be used instead.
next_page_token = None
offset = int(page_token) if page_token else 0
origins = self._cql_runner.origin_iter_all()
if regexp:
pat = re.compile(url_pattern)
origins = [Origin(orig.url) for orig in origins if pat.search(orig.url)]
else:
origins = [Origin(orig.url) for orig in origins if url_pattern in orig.url]
if with_visit:
origins = [Origin(orig.url) for orig in origins if orig.next_visit_id > 1]
origins = origins[offset : offset + limit + 1]
if len(origins) > limit:
# next offset
next_page_token = str(offset + limit)
# excluding that origin from the result to respect the limit size
origins = origins[:limit]
assert len(origins) <= limit
return PagedResult(results=origins, next_page_token=next_page_token)
def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
to_add = [ori for ori in origins if self.origin_get_one(ori.url) is None]
self.journal_writer.origin_add(to_add)
for origin in to_add:
self._cql_runner.origin_add_one(origin)
return {"origin:add": len(to_add)}
def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
for visit in visits:
origin = self.origin_get_one(visit.origin)
if not origin: # Cannot add a visit without an origin
raise StorageArgumentException("Unknown origin %s", visit.origin)
all_visits = []
nb_visits = 0
for visit in visits:
nb_visits += 1
if not visit.visit:
visit_id = self._cql_runner.origin_generate_unique_visit_id(
visit.origin
)
visit = attr.evolve(visit, visit=visit_id)
self.journal_writer.origin_visit_add([visit])
self._cql_runner.origin_visit_add_one(visit)
assert visit.visit is not None
all_visits.append(visit)
self._origin_visit_status_add(
OriginVisitStatus(
origin=visit.origin,
visit=visit.visit,
date=visit.date,
status="created",
snapshot=None,
)
)
return all_visits
def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None:
"""Add an origin visit status"""
self.journal_writer.origin_visit_status_add([visit_status])
self._cql_runner.origin_visit_status_add_one(visit_status)
def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus]) -> None:
# First round to check existence (fail early if any is ko)
for visit_status in visit_statuses:
origin_url = self.origin_get_one(visit_status.origin)
if not origin_url:
raise StorageArgumentException(f"Unknown origin {visit_status.origin}")
for visit_status in visit_statuses:
self._origin_visit_status_add(visit_status)
def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]:
"""Retrieve the latest visit status information for the origin visit.
Then merge it with the visit and return it.
"""
row = self._cql_runner.origin_visit_status_get_latest(
visit["origin"], visit["visit"]
)
assert row is not None
visit_status = converters.row_to_visit_status(row)
return {
# default to the values in visit
**visit,
# override with the last update
**visit_status.to_dict(),
# visit['origin'] is the URL (via a join), while
# visit_status['origin'] is only an id.
"origin": visit["origin"],
# but keep the date of the creation of the origin visit
"date": visit["date"],
}
def _origin_visit_get_latest_status(self, visit: OriginVisit) -> OriginVisitStatus:
"""Retrieve the latest visit status information for the origin visit object.
"""
row = self._cql_runner.origin_visit_status_get_latest(visit.origin, visit.visit)
assert row is not None
visit_status = converters.row_to_visit_status(row)
return attr.evolve(visit_status, origin=visit.origin)
@staticmethod
def _format_origin_visit_row(visit):
return {
**visit._asdict(),
"origin": visit.origin,
"date": visit.date.replace(tzinfo=datetime.timezone.utc),
}
def origin_visit_get(
self,
origin: str,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
) -> PagedResult[OriginVisit]:
if not isinstance(order, ListOrder):
raise StorageArgumentException("order must be a ListOrder value")
if page_token and not isinstance(page_token, str):
raise StorageArgumentException("page_token must be a string.")
next_page_token = None
visit_from = page_token and int(page_token)
visits: List[OriginVisit] = []
extra_limit = limit + 1
rows = self._cql_runner.origin_visit_get(origin, visit_from, extra_limit, order)
for row in rows:
visits.append(converters.row_to_visit(row))
assert len(visits) <= extra_limit
if len(visits) == extra_limit:
visits = visits[:limit]
next_page_token = str(visits[-1].visit)
return PagedResult(results=visits, next_page_token=next_page_token)
def origin_visit_status_get(
self,
origin: str,
visit: int,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
) -> PagedResult[OriginVisitStatus]:
next_page_token = None
date_from = None
if page_token is not None:
date_from = datetime.datetime.fromisoformat(page_token)
# Take one more visit status so we can reuse it as the next page token if any
rows = self._cql_runner.origin_visit_status_get_range(
origin, visit, date_from, limit + 1, order
)
visit_statuses = [converters.row_to_visit_status(row) for row in rows]
if len(visit_statuses) > limit:
# last visit status date is the next page token
next_page_token = str(visit_statuses[-1].date)
# excluding that visit status from the result to respect the limit size
visit_statuses = visit_statuses[:limit]
return PagedResult(results=visit_statuses, next_page_token=next_page_token)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime
) -> Optional[OriginVisit]:
# Iterator over all the visits of the origin
# This should be ok for now, as there aren't too many visits
# per origin.
rows = list(self._cql_runner.origin_visit_get_all(origin))
def key(visit):
dt = visit.date.replace(tzinfo=datetime.timezone.utc) - visit_date
return (abs(dt), -visit.visit)
if rows:
return converters.row_to_visit(min(rows, key=key))
return None
def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]:
row = self._cql_runner.origin_visit_get_one(origin, visit)
if row:
return converters.row_to_visit(row)
return None
def origin_visit_get_latest(
self,
origin: str,
type: Optional[str] = None,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
) -> Optional[OriginVisit]:
if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES):
raise StorageArgumentException(
f"Unknown allowed statuses {','.join(allowed_statuses)}, only "
f"{','.join(VISIT_STATUSES)} authorized"
)
# TODO: Do not fetch all visits
rows = self._cql_runner.origin_visit_get_all(origin)
latest_visit = None
for row in rows:
visit = self._format_origin_visit_row(row)
updated_visit = self._origin_visit_apply_last_status(visit)
if type is not None and updated_visit["type"] != type:
continue
if allowed_statuses and updated_visit["status"] not in allowed_statuses:
continue
if require_snapshot and updated_visit["snapshot"] is None:
continue
# updated_visit is a candidate
if latest_visit is not None:
if updated_visit["date"] < latest_visit["date"]:
continue
if updated_visit["visit"] < latest_visit["visit"]:
continue
latest_visit = updated_visit
if latest_visit is None:
return None
return OriginVisit(
origin=latest_visit["origin"],
visit=latest_visit["visit"],
date=latest_visit["date"],
type=latest_visit["type"],
)
def origin_visit_status_get_latest(
self,
origin_url: str,
visit: int,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
) -> Optional[OriginVisitStatus]:
if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES):
raise StorageArgumentException(
f"Unknown allowed statuses {','.join(allowed_statuses)}, only "
f"{','.join(VISIT_STATUSES)} authorized"
)
rows = self._cql_runner.origin_visit_status_get(
origin_url, visit, allowed_statuses, require_snapshot
)
# filtering is done python side as we cannot do it server side
if allowed_statuses:
rows = [row for row in rows if row.status in allowed_statuses]
if require_snapshot:
rows = [row for row in rows if row.snapshot is not None]
if not rows:
return None
return converters.row_to_visit_status(rows[0])
def origin_visit_status_get_random(
self, type: str
) -> Optional[Tuple[OriginVisit, OriginVisitStatus]]:
back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back
# Random position to start iteration at
start_token = random.randint(TOKEN_BEGIN, TOKEN_END)
# Iterator over all visits, ordered by token(origins) then visit_id
rows = self._cql_runner.origin_visit_iter(start_token)
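# Scan visits from the random starting token and return the first one whose
# latest status is "full" and whose date is within the last ~3 months;
# give up (None) if the iterator is exhausted first.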
for row in rows:
visit = converters.row_to_visit(row)
visit_status = self._origin_visit_get_latest_status(visit)
if visit.date > back_in_the_day and visit_status.status == "full":
return visit, visit_status
return None
def stat_counters(self):
rows = self._cql_runner.stat_counters()
keys = (
"content",
"directory",
"origin",
"origin_visit",
"release",
"revision",
"skipped_content",
"snapshot",
)
stats = {key: 0 for key in keys}
stats.update({row.object_type: row.count for row in rows})
return stats
def refresh_stat_counters(self):
pass
def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata]) -> None:
self.journal_writer.raw_extrinsic_metadata_add(metadata)
for metadata_entry in metadata:
if not self._cql_runner.metadata_authority_get(
metadata_entry.authority.type.value, metadata_entry.authority.url
):
raise StorageArgumentException(
f"Unknown authority {metadata_entry.authority}"
)
if not self._cql_runner.metadata_fetcher_get(
metadata_entry.fetcher.name, metadata_entry.fetcher.version
):
raise StorageArgumentException(
f"Unknown fetcher {metadata_entry.fetcher}"
)
try:
self._cql_runner.raw_extrinsic_metadata_add(
type=metadata_entry.type.value,
id=str(metadata_entry.id),
authority_type=metadata_entry.authority.type.value,
authority_url=metadata_entry.authority.url,
discovery_date=metadata_entry.discovery_date,
fetcher_name=metadata_entry.fetcher.name,
fetcher_version=metadata_entry.fetcher.version,
format=metadata_entry.format,
metadata=metadata_entry.metadata,
origin=metadata_entry.origin,
visit=metadata_entry.visit,
snapshot=map_optional(str, metadata_entry.snapshot),
release=map_optional(str, metadata_entry.release),
revision=map_optional(str, metadata_entry.revision),
path=metadata_entry.path,
directory=map_optional(str, metadata_entry.directory),
)
except TypeError as e:
raise StorageArgumentException(*e.args)
def raw_extrinsic_metadata_get(
self,
type: MetadataTargetType,
id: Union[str, SWHID],
authority: MetadataAuthority,
after: Optional[datetime.datetime] = None,
page_token: Optional[bytes] = None,
limit: int = 1000,
) -> PagedResult[RawExtrinsicMetadata]:
if type == MetadataTargetType.ORIGIN:
if isinstance(id, SWHID):
raise StorageArgumentException(
f"raw_extrinsic_metadata_get called with type='origin', "
f"but provided id is an SWHID: {id!r}"
)
else:
if not isinstance(id, SWHID):
raise StorageArgumentException(
f"raw_extrinsic_metadata_get called with type!='origin', "
f"but provided id is not an SWHID: {id!r}"
)
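# A page token, when present, is the base64-encoded msgpack serialization of
# the (discovery date, fetcher name, fetcher version) triple of the last
# result of the previous page; see how next_page_token is built below.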
if page_token is not None:
(after_date, after_fetcher_name, after_fetcher_version) = msgpack_loads(
base64.b64decode(page_token)
)
if after and after_date < after:
raise StorageArgumentException(
"page_token is inconsistent with the value of 'after'."
)
entries = self._cql_runner.raw_extrinsic_metadata_get_after_date_and_fetcher( # noqa
str(id),
authority.type.value,
authority.url,
after_date,
after_fetcher_name,
after_fetcher_version,
)
elif after is not None:
entries = self._cql_runner.raw_extrinsic_metadata_get_after_date(
str(id), authority.type.value, authority.url, after
)
else:
entries = self._cql_runner.raw_extrinsic_metadata_get(
str(id), authority.type.value, authority.url
)
if limit:
entries = itertools.islice(entries, 0, limit + 1)
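# Take one entry more than the limit so we can tell below whether a next
# page token is needed.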
results = []
for entry in entries:
discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc)
assert str(id) == entry.id
result = RawExtrinsicMetadata(
type=MetadataTargetType(entry.type),
id=id,
authority=MetadataAuthority(
type=MetadataAuthorityType(entry.authority_type),
url=entry.authority_url,
),
fetcher=MetadataFetcher(
name=entry.fetcher_name, version=entry.fetcher_version,
),
discovery_date=discovery_date,
format=entry.format,
metadata=entry.metadata,
origin=entry.origin,
visit=entry.visit,
snapshot=map_optional(parse_swhid, entry.snapshot),
release=map_optional(parse_swhid, entry.release),
revision=map_optional(parse_swhid, entry.revision),
path=entry.path,
directory=map_optional(parse_swhid, entry.directory),
)
results.append(result)
if len(results) > limit:
results.pop()
assert len(results) == limit
last_result = results[-1]
next_page_token: Optional[str] = base64.b64encode(
msgpack_dumps(
(
last_result.discovery_date,
last_result.fetcher.name,
last_result.fetcher.version,
)
)
).decode()
else:
next_page_token = None
return PagedResult(next_page_token=next_page_token, results=results,)
def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None:
self.journal_writer.metadata_fetcher_add(fetchers)
for fetcher in fetchers:
self._cql_runner.metadata_fetcher_add(
fetcher.name,
fetcher.version,
json.dumps(map_optional(dict, fetcher.metadata)),
)
def metadata_fetcher_get(
self, name: str, version: str
) -> Optional[MetadataFetcher]:
fetcher = self._cql_runner.metadata_fetcher_get(name, version)
if fetcher:
return MetadataFetcher(
name=fetcher.name,
version=fetcher.version,
metadata=json.loads(fetcher.metadata),
)
else:
return None
def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
self.journal_writer.metadata_authority_add(authorities)
for authority in authorities:
self._cql_runner.metadata_authority_add(
authority.url,
authority.type.value,
json.dumps(map_optional(dict, authority.metadata)),
)
def metadata_authority_get(
self, type: MetadataAuthorityType, url: str
) -> Optional[MetadataAuthority]:
authority = self._cql_runner.metadata_authority_get(type.value, url)
if authority:
return MetadataAuthority(
type=MetadataAuthorityType(authority.type),
url=authority.url,
metadata=json.loads(authority.metadata),
)
else:
return None
def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
"""Do nothing
"""
return None
def flush(self, object_types: Optional[List[str]] = None) -> Dict:
return {}
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
index 9f6559b2..4e41b8b3 100644
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1,1268 +1,1267 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import bisect
import collections
import copy
import datetime
import itertools
import random
import re
from collections import defaultdict
from datetime import timedelta
from typing import (
Any,
Callable,
Dict,
Generic,
Hashable,
Iterable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
)
import attr
from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.identifiers import SWHID
from swh.model.model import (
BaseContent,
Content,
SkippedContent,
Directory,
Revision,
Release,
Snapshot,
OriginVisit,
OriginVisitStatus,
Origin,
SHA1_SIZE,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MetadataTargetType,
RawExtrinsicMetadata,
+ Sha1Git,
)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
from swh.storage.interface import ListOrder, PagedResult, VISIT_STATUSES
from swh.storage.objstorage import ObjStorage
from swh.storage.utils import now
from .converters import origin_url_to_sha1
from .exc import StorageArgumentException, HashCollision
from .utils import get_partition_bounds_bytes
from .writer import JournalWriter
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
SortedListItem = TypeVar("SortedListItem")
SortedListKey = TypeVar("SortedListKey")
FetcherKey = Tuple[str, str]
class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]):
data: List[Tuple[SortedListKey, SortedListItem]]
# https://github.com/python/mypy/issues/708
# key: Callable[[SortedListItem], SortedListKey]
def __init__(
self,
data: List[SortedListItem] = None,
key: Optional[Callable[[SortedListItem], SortedListKey]] = None,
):
if key is None:
def key(item):
return item
assert key is not None # for mypy
super().__init__(sorted((key(x), x) for x in data or []))
self.key: Callable[[SortedListItem], SortedListKey] = key
def add(self, item: SortedListItem):
k = self.key(item)
bisect.insort(self.data, (k, item))
def __iter__(self) -> Iterator[SortedListItem]:
for (k, item) in self.data:
yield item
def iter_from(self, start_key: Any) -> Iterator[SortedListItem]:
"""Returns an iterator over all the elements whose key is greater
or equal to `start_key`.
(This is an efficient equivalent to:
`(x for x in L if key(x) >= start_key)`)
"""
from_index = bisect.bisect_left(self.data, (start_key,))
for (k, item) in itertools.islice(self.data, from_index, None):
yield item
def iter_after(self, start_key: Any) -> Iterator[SortedListItem]:
"""Same as iter_from, but using a strict inequality."""
it = self.iter_from(start_key)
for item in it:
if self.key(item) > start_key: # type: ignore
yield item
break
yield from it
class InMemoryStorage:
def __init__(self, journal_writer=None):
self.reset()
self.journal_writer = JournalWriter(journal_writer)
def reset(self):
self._contents = {}
self._content_indexes = defaultdict(lambda: defaultdict(set))
self._skipped_contents = {}
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set))
self._directories = {}
self._revisions = {}
self._releases = {}
self._snapshots = {}
self._origins = {}
self._origins_by_sha1 = {}
self._origin_visits = {}
self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {}
self._persons = {}
# {object_type: {id: {authority: [metadata]}}}
self._raw_extrinsic_metadata: Dict[
MetadataTargetType,
Dict[
Union[str, SWHID],
Dict[
Hashable,
SortedList[
Tuple[datetime.datetime, FetcherKey], RawExtrinsicMetadata
],
],
],
] = defaultdict(
lambda: defaultdict(
lambda: defaultdict(
lambda: SortedList(
key=lambda x: (
x.discovery_date,
self._metadata_fetcher_key(x.fetcher),
)
)
)
)
) # noqa
self._metadata_fetchers: Dict[FetcherKey, MetadataFetcher] = {}
self._metadata_authorities: Dict[Hashable, MetadataAuthority] = {}
self._objects = defaultdict(list)
self._sorted_sha1s = SortedList[bytes, bytes]()
self.objstorage = ObjStorage({"cls": "memory", "args": {}})
def check_config(self, *, check_write):
return True
def _content_add(self, contents: List[Content], with_data: bool) -> Dict:
self.journal_writer.content_add(contents)
content_add = 0
if with_data:
summary = self.objstorage.content_add(
c for c in contents if c.status != "absent"
)
content_add_bytes = summary["content:add:bytes"]
for content in contents:
key = self._content_key(content)
if key in self._contents:
continue
for algorithm in DEFAULT_ALGORITHMS:
hash_ = content.get_hash(algorithm)
if hash_ in self._content_indexes[algorithm] and (
algorithm not in {"blake2s256", "sha256"}
):
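# Only sha1 and sha1_git matches are reported as collisions, mirroring the
# check done by the Cassandra backend above.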
colliding_content_hashes = []
# Add the already stored contents
for content_hashes_set in self._content_indexes[algorithm][hash_]:
hashes = dict(content_hashes_set)
colliding_content_hashes.append(hashes)
# Add the new colliding content
colliding_content_hashes.append(content.hashes())
raise HashCollision(algorithm, hash_, colliding_content_hashes)
for algorithm in DEFAULT_ALGORITHMS:
hash_ = content.get_hash(algorithm)
self._content_indexes[algorithm][hash_].add(key)
self._objects[content.sha1_git].append(("content", content.sha1))
self._contents[key] = content
self._sorted_sha1s.add(content.sha1)
self._contents[key] = attr.evolve(self._contents[key], data=None)
content_add += 1
summary = {
"content:add": content_add,
}
if with_data:
summary["content:add:bytes"] = content_add_bytes
return summary
def content_add(self, content: List[Content]) -> Dict:
content = [attr.evolve(c, ctime=now()) for c in content]
return self._content_add(content, with_data=True)
def content_update(self, content, keys=[]):
self.journal_writer.content_update(content)
for cont_update in content:
cont_update = cont_update.copy()
sha1 = cont_update.pop("sha1")
for old_key in self._content_indexes["sha1"][sha1]:
old_cont = self._contents.pop(old_key)
for algorithm in DEFAULT_ALGORITHMS:
hash_ = old_cont.get_hash(algorithm)
self._content_indexes[algorithm][hash_].remove(old_key)
new_cont = attr.evolve(old_cont, **cont_update)
new_key = self._content_key(new_cont)
self._contents[new_key] = new_cont
for algorithm in DEFAULT_ALGORITHMS:
hash_ = new_cont.get_hash(algorithm)
self._content_indexes[algorithm][hash_].add(new_key)
def content_add_metadata(self, content: List[Content]) -> Dict:
return self._content_add(content, with_data=False)
def content_get(self, content):
# FIXME: Make this method support slicing the `data`.
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX
)
yield from self.objstorage.content_get(content)
def content_get_range(self, start, end, limit=1000):
if limit is None:
raise StorageArgumentException("limit should not be None")
sha1s = (
(sha1, content_key)
for sha1 in self._sorted_sha1s.iter_from(start)
for content_key in self._content_indexes["sha1"][sha1]
)
matched = []
next_content = None
for sha1, key in sha1s:
if sha1 > end:
break
if len(matched) >= limit:
next_content = sha1
break
matched.append(self._contents[key].to_dict())
return {
"contents": matched,
"next": next_content,
}
def content_get_partition(
self,
partition_id: int,
nb_partitions: int,
limit: int = 1000,
page_token: str = None,
):
if limit is None:
raise StorageArgumentException("limit should not be None")
(start, end) = get_partition_bounds_bytes(
partition_id, nb_partitions, SHA1_SIZE
)
if page_token:
start = hash_to_bytes(page_token)
if end is None:
end = b"\xff" * SHA1_SIZE
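# A None upper bound means the last partition: substitute the maximum
# possible SHA1 so content_get_range (which treats `end` as inclusive)
# covers the rest of the hash space.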
result = self.content_get_range(start, end, limit)
result2 = {
"contents": result["contents"],
"next_page_token": None,
}
if result["next"]:
result2["next_page_token"] = hash_to_hex(result["next"])
return result2
def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
result: Dict = {sha1: [] for sha1 in contents}
for sha1 in contents:
if sha1 in self._content_indexes["sha1"]:
objs = self._content_indexes["sha1"][sha1]
# only 1 element as content_add_metadata would have raised a
# hash collision otherwise
for key in objs:
d = self._contents[key].to_dict()
del d["ctime"]
if "data" in d:
del d["data"]
result[sha1].append(d)
return result
def content_find(self, content):
if not set(content).intersection(DEFAULT_ALGORITHMS):
raise StorageArgumentException(
"content keys must contain at least one of: %s"
% ", ".join(sorted(DEFAULT_ALGORITHMS))
)
found = []
for algo in DEFAULT_ALGORITHMS:
hash = content.get(algo)
if hash and hash in self._content_indexes[algo]:
found.append(self._content_indexes[algo][hash])
if not found:
return []
keys = list(set.intersection(*found))
return [self._contents[key].to_dict() for key in keys]
def content_missing(self, content, key_hash="sha1"):
for cont in content:
for (algo, hash_) in cont.items():
if algo not in DEFAULT_ALGORITHMS:
continue
if hash_ not in self._content_indexes.get(algo, []):
yield cont[key_hash]
break
else:
for result in self.content_find(cont):
if result["status"] == "missing":
yield cont[key_hash]
def content_missing_per_sha1(self, contents):
for content in contents:
if content not in self._content_indexes["sha1"]:
yield content
def content_missing_per_sha1_git(self, contents):
for content in contents:
if content not in self._content_indexes["sha1_git"]:
yield content
- def content_get_random(self):
+ def content_get_random(self) -> Sha1Git:
return random.choice(list(self._content_indexes["sha1_git"]))
def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict:
self.journal_writer.skipped_content_add(contents)
summary = {"skipped_content:add": 0}
missing_contents = self.skipped_content_missing([c.hashes() for c in contents])
missing = {self._content_key(c) for c in missing_contents}
contents = [c for c in contents if self._content_key(c) in missing]
for content in contents:
key = self._content_key(content)
for algo in DEFAULT_ALGORITHMS:
if content.get_hash(algo):
self._skipped_content_indexes[algo][content.get_hash(algo)].add(key)
self._skipped_contents[key] = content
summary["skipped_content:add"] += 1
return summary
def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
content = [attr.evolve(c, ctime=now()) for c in content]
return self._skipped_content_add(content)
def skipped_content_missing(self, contents):
for content in contents:
matches = list(self._skipped_contents.values())
for (algorithm, key) in self._content_key(content):
if algorithm == "blake2s256":
continue
# Filter out skipped contents with the same hash
matches = [
match for match in matches if match.get_hash(algorithm) == key
]
# if none of the contents match
if not matches:
yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS}
def directory_add(self, directories: List[Directory]) -> Dict:
directories = [dir_ for dir_ in directories if dir_.id not in self._directories]
self.journal_writer.directory_add(directories)
count = 0
for directory in directories:
count += 1
self._directories[directory.id] = directory
self._objects[directory.id].append(("directory", directory.id))
return {"directory:add": count}
def directory_missing(self, directories):
for id in directories:
if id not in self._directories:
yield id
def _join_dentry_to_content(self, dentry):
keys = (
"status",
"sha1",
"sha1_git",
"sha256",
"length",
)
ret = dict.fromkeys(keys)
ret.update(dentry)
if ret["type"] == "file":
# TODO: Make it able to handle more than one content
content = self.content_find({"sha1_git": ret["target"]})
if content:
content = content[0]
for key in keys:
ret[key] = content[key]
return ret
def _directory_ls(self, directory_id, recursive, prefix=b""):
if directory_id in self._directories:
for entry in self._directories[directory_id].entries:
ret = self._join_dentry_to_content(entry.to_dict())
ret["name"] = prefix + ret["name"]
ret["dir_id"] = directory_id
yield ret
if recursive and ret["type"] == "dir":
yield from self._directory_ls(
ret["target"], True, prefix + ret["name"] + b"/"
)
def directory_ls(self, directory, recursive=False):
yield from self._directory_ls(directory, recursive)
def directory_entry_get_by_path(self, directory, paths):
return self._directory_entry_get_by_path(directory, paths, b"")
- def directory_get_random(self):
- if not self._directories:
- return None
+ def directory_get_random(self) -> Sha1Git:
return random.choice(list(self._directories))
def _directory_entry_get_by_path(self, directory, paths, prefix):
if not paths:
return
contents = list(self.directory_ls(directory))
if not contents:
return
def _get_entry(entries, name):
for entry in entries:
if entry["name"] == name:
entry = entry.copy()
entry["name"] = prefix + entry["name"]
return entry
first_item = _get_entry(contents, paths[0])
if len(paths) == 1:
return first_item
if not first_item or first_item["type"] != "dir":
return
return self._directory_entry_get_by_path(
first_item["target"], paths[1:], prefix + paths[0] + b"/"
)
def revision_add(self, revisions: List[Revision]) -> Dict:
revisions = [rev for rev in revisions if rev.id not in self._revisions]
self.journal_writer.revision_add(revisions)
count = 0
for revision in revisions:
revision = attr.evolve(
revision,
committer=self._person_add(revision.committer),
author=self._person_add(revision.author),
)
self._revisions[revision.id] = revision
self._objects[revision.id].append(("revision", revision.id))
count += 1
return {"revision:add": count}
def revision_missing(self, revisions):
for id in revisions:
if id not in self._revisions:
yield id
def revision_get(self, revisions):
for id in revisions:
if id in self._revisions:
yield self._revisions.get(id).to_dict()
else:
yield None
def _get_parent_revs(self, rev_id, seen, limit):
if limit and len(seen) >= limit:
return
if rev_id in seen or rev_id not in self._revisions:
return
seen.add(rev_id)
yield self._revisions[rev_id].to_dict()
for parent in self._revisions[rev_id].parents:
yield from self._get_parent_revs(parent, seen, limit)
def revision_log(self, revisions, limit=None):
seen = set()
for rev_id in revisions:
yield from self._get_parent_revs(rev_id, seen, limit)
def revision_shortlog(self, revisions, limit=None):
yield from (
(rev["id"], rev["parents"]) for rev in self.revision_log(revisions, limit)
)
- def revision_get_random(self):
+ def revision_get_random(self) -> Sha1Git:
return random.choice(list(self._revisions))
def release_add(self, releases: List[Release]) -> Dict:
to_add = []
for rel in releases:
if rel.id not in self._releases and rel not in to_add:
to_add.append(rel)
self.journal_writer.release_add(to_add)
for rel in to_add:
if rel.author:
self._person_add(rel.author)
self._objects[rel.id].append(("release", rel.id))
self._releases[rel.id] = rel
return {"release:add": len(to_add)}
def release_missing(self, releases):
yield from (rel for rel in releases if rel not in self._releases)
def release_get(self, releases):
for rel_id in releases:
if rel_id in self._releases:
yield self._releases[rel_id].to_dict()
else:
yield None
- def release_get_random(self):
+ def release_get_random(self) -> Sha1Git:
return random.choice(list(self._releases))
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
count = 0
snapshots = [snap for snap in snapshots if snap.id not in self._snapshots]
for snapshot in snapshots:
self.journal_writer.snapshot_add([snapshot])
self._snapshots[snapshot.id] = snapshot
self._objects[snapshot.id].append(("snapshot", snapshot.id))
count += 1
return {"snapshot:add": count}
def snapshot_missing(self, snapshots):
for id in snapshots:
if id not in self._snapshots:
yield id
def snapshot_get(self, snapshot_id):
return self.snapshot_get_branches(snapshot_id)
def snapshot_get_by_origin_visit(self, origin, visit):
origin_url = self._get_origin_url(origin)
if not origin_url:
return
if origin_url not in self._origins or visit > len(
self._origin_visits[origin_url]
):
return None
visit = self._origin_visit_get_updated(origin_url, visit)
snapshot_id = visit["snapshot"]
if snapshot_id:
return self.snapshot_get(snapshot_id)
else:
return None
def snapshot_count_branches(self, snapshot_id):
snapshot = self._snapshots[snapshot_id]
return collections.Counter(
branch.target_type.value if branch else None
for branch in snapshot.branches.values()
)
def snapshot_get_branches(
self, snapshot_id, branches_from=b"", branches_count=1000, target_types=None
):
snapshot = self._snapshots.get(snapshot_id)
if snapshot is None:
return None
sorted_branches = sorted(snapshot.branches.items())
sorted_branch_names = [k for (k, v) in sorted_branches]
from_index = bisect.bisect_left(sorted_branch_names, branches_from)
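# from_index is the position of the first branch name >= branches_from,
# i.e. where the requested page starts in the sorted branch list.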
if target_types:
next_branch = None
branches = {}
for (branch_name, branch) in sorted_branches:
if branch_name in sorted_branch_names[from_index:]:
if branch and branch.target_type.value in target_types:
if len(branches) < branches_count:
branches[branch_name] = branch
else:
next_branch = branch_name
break
else:
# As there is no 'target_types', we can do that much faster
to_index = from_index + branches_count
returned_branch_names = frozenset(sorted_branch_names[from_index:to_index])
branches = dict(
(branch_name, branch)
for (branch_name, branch) in snapshot.branches.items()
if branch_name in returned_branch_names
)
if to_index >= len(sorted_branch_names):
next_branch = None
else:
next_branch = sorted_branch_names[to_index]
branches = {
name: branch.to_dict() if branch else None
for (name, branch) in branches.items()
}
return {
"id": snapshot_id,
"branches": branches,
"next_branch": next_branch,
}
- def snapshot_get_random(self):
+ def snapshot_get_random(self) -> Sha1Git:
return random.choice(list(self._snapshots))
def object_find_by_sha1_git(self, ids):
ret = {}
for id_ in ids:
objs = self._objects.get(id_, [])
ret[id_] = [{"sha1_git": id_, "type": obj[0],} for obj in objs]
return ret
def _convert_origin(self, t):
if t is None:
return None
return t.to_dict()
def origin_get_one(self, origin_url: str) -> Optional[Origin]:
return self._origins.get(origin_url)
def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]:
return [self.origin_get_one(origin_url) for origin_url in origins]
def origin_get_by_sha1(self, sha1s):
return [self._convert_origin(self._origins_by_sha1.get(sha1)) for sha1 in sha1s]
def origin_list(
self, page_token: Optional[str] = None, limit: int = 100
) -> PagedResult[Origin]:
origin_urls = sorted(self._origins)
from_ = bisect.bisect_left(origin_urls, page_token) if page_token else 0
next_page_token = None
# Take one more origin so we can reuse it as the next page token if any
origins = [Origin(url=url) for url in origin_urls[from_ : from_ + limit + 1]]
if len(origins) > limit:
# last origin id is the next page token
next_page_token = str(origins[-1].url)
# excluding that origin from the result to respect the limit size
origins = origins[:limit]
assert len(origins) <= limit
return PagedResult(results=origins, next_page_token=next_page_token)
def origin_search(
self,
url_pattern: str,
page_token: Optional[str] = None,
limit: int = 50,
regexp: bool = False,
with_visit: bool = False,
) -> PagedResult[Origin]:
next_page_token = None
offset = int(page_token) if page_token else 0
origins = self._origins.values()
if regexp:
pat = re.compile(url_pattern)
origins = [orig for orig in origins if pat.search(orig.url)]
else:
origins = [orig for orig in origins if url_pattern in orig.url]
if with_visit:
filtered_origins = []
for orig in origins:
visits = (
self._origin_visit_get_updated(ov.origin, ov.visit)
for ov in self._origin_visits[orig.url]
)
for ov in visits:
snapshot = ov["snapshot"]
if snapshot and snapshot in self._snapshots:
filtered_origins.append(orig)
break
else:
filtered_origins = origins
# Take one more origin so we can reuse it as the next page token if any
origins = filtered_origins[offset : offset + limit + 1]
if len(origins) > limit:
# next offset
next_page_token = str(offset + limit)
# excluding that origin from the result to respect the limit size
origins = origins[:limit]
assert len(origins) <= limit
return PagedResult(results=origins, next_page_token=next_page_token)
def origin_count(
self, url_pattern: str, regexp: bool = False, with_visit: bool = False
) -> int:
actual_page = self.origin_search(
url_pattern, regexp=regexp, with_visit=with_visit, limit=len(self._origins),
)
assert actual_page.next_page_token is None
return len(actual_page.results)
def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
added = 0
for origin in origins:
if origin.url not in self._origins:
self.origin_add_one(origin)
added += 1
return {"origin:add": added}
def origin_add_one(self, origin: Origin) -> str:
if origin.url not in self._origins:
self.journal_writer.origin_add([origin])
self._origins[origin.url] = origin
self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin
self._origin_visits[origin.url] = []
self._objects[origin.url].append(("origin", origin.url))
return origin.url
def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
for visit in visits:
origin = self.origin_get_one(visit.origin)
if not origin: # Cannot add a visit without an origin
raise StorageArgumentException("Unknown origin %s", visit.origin)
all_visits = []
for visit in visits:
origin_url = visit.origin
if origin_url in self._origins:
origin = self._origins[origin_url]
if visit.visit:
self.journal_writer.origin_visit_add([visit])
while len(self._origin_visits[origin_url]) < visit.visit:
self._origin_visits[origin_url].append(None)
self._origin_visits[origin_url][visit.visit - 1] = visit
else:
# visit ids are in the range [1, +inf[
visit_id = len(self._origin_visits[origin_url]) + 1
visit = attr.evolve(visit, visit=visit_id)
self.journal_writer.origin_visit_add([visit])
self._origin_visits[origin_url].append(visit)
visit_key = (origin_url, visit.visit)
self._objects[visit_key].append(("origin_visit", None))
assert visit.visit is not None
self._origin_visit_status_add_one(
OriginVisitStatus(
origin=visit.origin,
visit=visit.visit,
date=visit.date,
status="created",
snapshot=None,
)
)
all_visits.append(visit)
return all_visits
def _origin_visit_status_add_one(self, visit_status: OriginVisitStatus) -> None:
"""Add an origin visit status without checks. If already present, do nothing.
"""
self.journal_writer.origin_visit_status_add([visit_status])
visit_key = (visit_status.origin, visit_status.visit)
self._origin_visit_statuses.setdefault(visit_key, [])
visit_statuses = self._origin_visit_statuses[visit_key]
if visit_status not in visit_statuses:
visit_statuses.append(visit_status)
def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus],) -> None:
# First round to check existence (fail early if any is ko)
for visit_status in visit_statuses:
origin_url = self.origin_get_one(visit_status.origin)
if not origin_url:
raise StorageArgumentException(f"Unknown origin {visit_status.origin}")
for visit_status in visit_statuses:
self._origin_visit_status_add_one(visit_status)
def _origin_visit_status_get_latest(
self, origin: str, visit_id: int
) -> Tuple[OriginVisit, OriginVisitStatus]:
"""Return a tuple of OriginVisit, latest associated OriginVisitStatus.
"""
assert visit_id >= 1
visit = self._origin_visits[origin][visit_id - 1]
assert visit is not None
visit_key = (origin, visit_id)
visit_update = max(self._origin_visit_statuses[visit_key], key=lambda v: v.date)
return visit, visit_update
def _origin_visit_get_updated(self, origin: str, visit_id: int) -> Dict[str, Any]:
"""Merge origin visit and latest origin visit status
"""
visit, visit_update = self._origin_visit_status_get_latest(origin, visit_id)
assert visit is not None and visit_update is not None
return {
# default to the values in visit
**visit.to_dict(),
# override with the last update
**visit_update.to_dict(),
# but keep the date of the creation of the origin visit
"date": visit.date,
}
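To make the merge precedence above explicit, here is a self-contained toy illustration (field values are invented): later keys in the dict literal win, and the visit's creation date is then restored on top.

visit = {"origin": "https://example.org/repo", "visit": 1,
         "date": "2020-01-01", "status": "created", "snapshot": None}
latest_status = {"origin": "https://example.org/repo", "visit": 1,
                 "date": "2020-01-02", "status": "full", "snapshot": b"\x00" * 20}
merged = {**visit, **latest_status, "date": visit["date"]}
# merged["status"] == "full" and merged["snapshot"] come from the latest status,
# but merged["date"] == "2020-01-01", the date the visit was created.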
def origin_visit_get(
self,
origin: str,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
) -> PagedResult[OriginVisit]:
next_page_token = None
page_token = page_token or "0"
if not isinstance(order, ListOrder):
raise StorageArgumentException("order must be a ListOrder value")
if not isinstance(page_token, str):
raise StorageArgumentException("page_token must be a string.")
visit_from = int(page_token)
origin_url = self._get_origin_url(origin)
extra_limit = limit + 1
visits = sorted(
self._origin_visits.get(origin_url, []),
key=lambda v: v.visit,
reverse=(order == ListOrder.DESC),
)
if visit_from > 0 and order == ListOrder.ASC:
visits = [v for v in visits if v.visit > visit_from]
elif visit_from > 0 and order == ListOrder.DESC:
visits = [v for v in visits if v.visit < visit_from]
visits = visits[:extra_limit]
assert len(visits) <= extra_limit
if len(visits) == extra_limit:
visits = visits[:limit]
next_page_token = str(visits[-1].visit)
return PagedResult(results=visits, next_page_token=next_page_token)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime
) -> Optional[OriginVisit]:
origin_url = self._get_origin_url(origin)
if origin_url in self._origin_visits:
visits = self._origin_visits[origin_url]
visit = min(visits, key=lambda v: (abs(v.date - visit_date), -v.visit))
return visit
return None
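The min() key above selects the visit whose date is closest to the requested one and, on a tie, the one with the largest visit id (the id is negated so the smaller tuple wins). A toy illustration with invented visits:

import datetime
from collections import namedtuple

FakeVisit = namedtuple("FakeVisit", ["visit", "date"])
target = datetime.datetime(2020, 6, 1)
visits = [
    FakeVisit(1, datetime.datetime(2020, 5, 29)),  # 3 days away
    FakeVisit(2, datetime.datetime(2020, 6, 2)),   # 1 day away
    FakeVisit(3, datetime.datetime(2020, 6, 2)),   # 1 day away, larger id
]
best = min(visits, key=lambda v: (abs(v.date - target), -v.visit))
assert best.visit == 3  # visits 2 and 3 tie on distance; the larger id wins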
def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]:
origin_url = self._get_origin_url(origin)
if origin_url in self._origin_visits and visit <= len(
self._origin_visits[origin_url]
):
found_visit, _ = self._origin_visit_status_get_latest(origin, visit)
return found_visit
return None
def origin_visit_get_latest(
self,
origin: str,
type: Optional[str] = None,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
) -> Optional[OriginVisit]:
if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES):
raise StorageArgumentException(
f"Unknown allowed statuses {','.join(allowed_statuses)}, only "
f"{','.join(VISIT_STATUSES)} authorized"
)
ori = self._origins.get(origin)
if not ori:
return None
visits = sorted(
self._origin_visits[ori.url], key=lambda v: (v.date, v.visit), reverse=True,
)
for visit in visits:
if type is not None and visit.type != type:
continue
visit_statuses = self._origin_visit_statuses[origin, visit.visit]
if allowed_statuses is not None:
visit_statuses = [
vs for vs in visit_statuses if vs.status in allowed_statuses
]
if require_snapshot:
visit_statuses = [vs for vs in visit_statuses if vs.snapshot]
if visit_statuses: # we found visit statuses matching criteria
visit_status = max(visit_statuses, key=lambda vs: (vs.date, vs.visit))
assert visit.origin == visit_status.origin
assert visit.visit == visit_status.visit
return visit
return None
def origin_visit_status_get(
self,
origin: str,
visit: int,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
) -> PagedResult[OriginVisitStatus]:
next_page_token = None
date_from = None
if page_token is not None:
date_from = datetime.datetime.fromisoformat(page_token)
visit_statuses = sorted(
self._origin_visit_statuses.get((origin, visit), []),
key=lambda v: v.date,
reverse=(order == ListOrder.DESC),
)
if date_from is not None:
if order == ListOrder.ASC:
visit_statuses = [v for v in visit_statuses if v.date >= date_from]
elif order == ListOrder.DESC:
visit_statuses = [v for v in visit_statuses if v.date <= date_from]
# Take one more visit status so we can reuse it as the next page token if any
visit_statuses = visit_statuses[: limit + 1]
if len(visit_statuses) > limit:
# last visit status date is the next page token
next_page_token = str(visit_statuses[-1].date)
# excluding that visit status from the result to respect the limit size
visit_statuses = visit_statuses[:limit]
return PagedResult(results=visit_statuses, next_page_token=next_page_token)
def origin_visit_status_get_latest(
self,
origin_url: str,
visit: int,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
) -> Optional[OriginVisitStatus]:
if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES):
raise StorageArgumentException(
f"Unknown allowed statuses {','.join(allowed_statuses)}, only "
f"{','.join(VISIT_STATUSES)} authorized"
)
ori = self._origins.get(origin_url)
if not ori:
return None
visit_key = (origin_url, visit)
visits = self._origin_visit_statuses.get(visit_key)
if not visits:
return None
if allowed_statuses is not None:
visits = [visit for visit in visits if visit.status in allowed_statuses]
if require_snapshot:
visits = [visit for visit in visits if visit.snapshot]
visit_status = max(visits, key=lambda v: (v.date, v.visit), default=None)
return visit_status
def _select_random_origin_visit_by_type(self, type: str) -> str:
while True:
url = random.choice(list(self._origin_visits.keys()))
random_origin_visits = self._origin_visits[url]
if random_origin_visits[0].type == type:
return url
def origin_visit_status_get_random(
self, type: str
) -> Optional[Tuple[OriginVisit, OriginVisitStatus]]:
url = self._select_random_origin_visit_by_type(type)
random_origin_visits = copy.deepcopy(self._origin_visits[url])
random_origin_visits.reverse()
back_in_the_day = now() - timedelta(weeks=12) # 3 months back
# This should be enough for tests
for visit in random_origin_visits:
origin_visit, latest_visit_status = self._origin_visit_status_get_latest(
url, visit.visit
)
assert latest_visit_status is not None
if (
origin_visit.date > back_in_the_day
and latest_visit_status.status == "full"
):
return origin_visit, latest_visit_status
else:
return None
def stat_counters(self):
keys = (
"content",
"directory",
"origin",
"origin_visit",
"person",
"release",
"revision",
"skipped_content",
"snapshot",
)
stats = {key: 0 for key in keys}
stats.update(
collections.Counter(
obj_type
for (obj_type, obj_id) in itertools.chain(*self._objects.values())
)
)
return stats
def refresh_stat_counters(self):
pass
def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None:
self.journal_writer.raw_extrinsic_metadata_add(metadata)
for metadata_entry in metadata:
authority_key = self._metadata_authority_key(metadata_entry.authority)
if authority_key not in self._metadata_authorities:
raise StorageArgumentException(
f"Unknown authority {metadata_entry.authority}"
)
fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher)
if fetcher_key not in self._metadata_fetchers:
raise StorageArgumentException(
f"Unknown fetcher {metadata_entry.fetcher}"
)
raw_extrinsic_metadata_list = self._raw_extrinsic_metadata[
metadata_entry.type
][metadata_entry.id][authority_key]
for existing_raw_extrinsic_metadata in raw_extrinsic_metadata_list:
if (
self._metadata_fetcher_key(existing_raw_extrinsic_metadata.fetcher)
== fetcher_key
and existing_raw_extrinsic_metadata.discovery_date
== metadata_entry.discovery_date
):
# Duplicate of an existing one; ignore it.
break
else:
raw_extrinsic_metadata_list.add(metadata_entry)
def raw_extrinsic_metadata_get(
self,
type: MetadataTargetType,
id: Union[str, SWHID],
authority: MetadataAuthority,
after: Optional[datetime.datetime] = None,
page_token: Optional[bytes] = None,
limit: int = 1000,
) -> PagedResult[RawExtrinsicMetadata]:
authority_key = self._metadata_authority_key(authority)
if type == MetadataTargetType.ORIGIN:
if isinstance(id, SWHID):
raise StorageArgumentException(
f"raw_extrinsic_metadata_get called with type='origin', "
f"but provided id is an SWHID: {id!r}"
)
else:
if not isinstance(id, SWHID):
raise StorageArgumentException(
f"raw_extrinsic_metadata_get called with type!='origin', "
f"but provided id is not an SWHID: {id!r}"
)
if page_token is not None:
(after_time, after_fetcher) = msgpack_loads(base64.b64decode(page_token))
after_fetcher = tuple(after_fetcher)
if after is not None and after > after_time:
raise StorageArgumentException(
"page_token is inconsistent with the value of 'after'."
)
entries = self._raw_extrinsic_metadata[type][id][authority_key].iter_after(
(after_time, after_fetcher)
)
elif after is not None:
entries = self._raw_extrinsic_metadata[type][id][authority_key].iter_from(
(after,)
)
entries = (entry for entry in entries if entry.discovery_date > after)
else:
entries = iter(self._raw_extrinsic_metadata[type][id][authority_key])
if limit:
entries = itertools.islice(entries, 0, limit + 1)
results = []
for entry in entries:
entry_authority = self._metadata_authorities[
self._metadata_authority_key(entry.authority)
]
entry_fetcher = self._metadata_fetchers[
self._metadata_fetcher_key(entry.fetcher)
]
if after:
assert entry.discovery_date > after
results.append(
attr.evolve(
entry,
authority=attr.evolve(entry_authority, metadata=None),
fetcher=attr.evolve(entry_fetcher, metadata=None),
)
)
if len(results) > limit:
results.pop()
assert len(results) == limit
last_result = results[-1]
next_page_token: Optional[str] = base64.b64encode(
msgpack_dumps(
(
last_result.discovery_date,
self._metadata_fetcher_key(last_result.fetcher),
)
)
).decode()
else:
next_page_token = None
return PagedResult(next_page_token=next_page_token, results=results,)
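The opaque page token built above is just base64 over a msgpack-encoded pair (discovery_date of the last returned entry, its fetcher key). A small round-trip sketch using the serializers this module already imports; the fetcher name and version are invented:

import base64
import datetime
from swh.core.api.serializers import msgpack_dumps, msgpack_loads

last_date = datetime.datetime(2020, 7, 1, tzinfo=datetime.timezone.utc)
fetcher_key = ("swh-deposit", "1.0.0")  # hypothetical (name, version)
token = base64.b64encode(msgpack_dumps((last_date, fetcher_key))).decode()

after_time, after_fetcher = msgpack_loads(base64.b64decode(token))
after_fetcher = tuple(after_fetcher)  # msgpack round-trips tuples as lists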
def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None:
self.journal_writer.metadata_fetcher_add(fetchers)
for fetcher in fetchers:
if fetcher.metadata is None:
raise StorageArgumentException(
"MetadataFetcher.metadata may not be None in metadata_fetcher_add."
)
key = self._metadata_fetcher_key(fetcher)
if key not in self._metadata_fetchers:
self._metadata_fetchers[key] = fetcher
def metadata_fetcher_get(
self, name: str, version: str
) -> Optional[MetadataFetcher]:
return self._metadata_fetchers.get(
self._metadata_fetcher_key(MetadataFetcher(name=name, version=version))
)
def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
self.journal_writer.metadata_authority_add(authorities)
for authority in authorities:
if authority.metadata is None:
raise StorageArgumentException(
"MetadataAuthority.metadata may not be None in "
"metadata_authority_add."
)
key = self._metadata_authority_key(authority)
self._metadata_authorities[key] = authority
def metadata_authority_get(
self, type: MetadataAuthorityType, url: str
) -> Optional[MetadataAuthority]:
return self._metadata_authorities.get(
self._metadata_authority_key(MetadataAuthority(type=type, url=url))
)
def _get_origin_url(self, origin):
if isinstance(origin, str):
return origin
else:
raise TypeError("origin must be a string.")
def _person_add(self, person):
key = ("person", person.fullname)
if key not in self._objects:
self._persons[person.fullname] = person
self._objects[key].append(key)
return self._persons[person.fullname]
@staticmethod
def _content_key(content):
""" A stable key and the algorithm for a content"""
if isinstance(content, BaseContent):
content = content.to_dict()
return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS))
@staticmethod
def _metadata_fetcher_key(fetcher: MetadataFetcher) -> FetcherKey:
return (fetcher.name, fetcher.version)
@staticmethod
def _metadata_authority_key(authority: MetadataAuthority) -> Hashable:
return (authority.type, authority.url)
def diff_directories(self, from_dir, to_dir, track_renaming=False):
raise NotImplementedError("InMemoryStorage.diff_directories")
def diff_revisions(self, from_rev, to_rev, track_renaming=False):
raise NotImplementedError("InMemoryStorage.diff_revisions")
def diff_revision(self, revision, track_renaming=False):
raise NotImplementedError("InMemoryStorage.diff_revision")
def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
"""Do nothing
"""
return None
def flush(self, object_types: Optional[List[str]] = None) -> Dict:
return {}
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
index 9971e966..6e117841 100644
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1,1290 +1,1291 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, TypeVar, Union
from swh.core.api import remote_api_endpoint
from swh.core.api.classes import PagedResult as CorePagedResult
from swh.model.identifiers import SWHID
from swh.model.model import (
Content,
Directory,
Origin,
OriginVisit,
OriginVisitStatus,
Revision,
Release,
Snapshot,
SkippedContent,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MetadataTargetType,
RawExtrinsicMetadata,
+ Sha1Git,
)
class ListOrder(Enum):
"""Specifies the order for paginated endpoints returning sorted results."""
ASC = "asc"
DESC = "desc"
TResult = TypeVar("TResult")
PagedResult = CorePagedResult[TResult, str]
# TODO: Make it an enum (too much impact)
VISIT_STATUSES = ["created", "ongoing", "full", "partial"]
def deprecated(f):
f.deprecated_endpoint = True
return f
class StorageInterface:
@remote_api_endpoint("check_config")
def check_config(self, *, check_write):
"""Check that the storage is configured and ready to go."""
...
@remote_api_endpoint("content/add")
def content_add(self, content: List[Content]) -> Dict:
"""Add content blobs to the storage
Args:
contents (iterable): iterable of dictionaries representing
individual pieces of content to add. Each dictionary has the
following keys:
- data (bytes): the actual content
- length (int): content length
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum
- status (str): one of visible, hidden
Raises:
The following exceptions can occur:
- HashCollision in case of collision
- Any other exceptions raised by the db
In case of errors, some of the content may have been stored in
the DB and in the objstorage.
Since additions to both are idempotent, that should not be a problem.
Returns:
Summary dict with the following keys and associated values:
content:add: New contents added
content:add:bytes: Sum of the contents' length data
"""
...
@remote_api_endpoint("content/update")
def content_update(self, content, keys=[]):
"""Update content blobs to the storage. Does nothing for unknown
contents or skipped ones.
Args:
content (iterable): iterable of dictionaries representing
individual pieces of content to update. Each dictionary has the
following keys:
- data (bytes): the actual content
- length (int): content length (default: -1)
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum
- status (str): one of visible, hidden, absent
keys (list): List of keys (str) whose values needs an update, e.g.,
new hash column
"""
...
@remote_api_endpoint("content/add_metadata")
def content_add_metadata(self, content: List[Content]) -> Dict:
"""Add content metadata to the storage (like `content_add`, but
without inserting to the objstorage).
Args:
content (iterable): iterable of dictionaries representing
individual pieces of content to add. Each dictionary has the
following keys:
- length (int): content length (default: -1)
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum
- status (str): one of visible, hidden, absent
- reason (str): if status = absent, the reason why
- origin (int): if status = absent, the origin we saw the
content in
- ctime (datetime): time of insertion in the archive
Returns:
Summary dict with the following key and associated values:
content:add: New contents added
skipped_content:add: New skipped contents (no data) added
"""
...
@remote_api_endpoint("content/data")
def content_get(self, content):
"""Retrieve in bulk contents and their data.
This generator yields exactly as many items as there are provided sha1
identifiers, but callers should not assume this will always be true.
It may also yield `None` values in case an object was not found.
Args:
content: iterables of sha1
Yields:
Dict[str, bytes]: Generates streams of contents as dict with their
raw data:
- sha1 (bytes): content id
- data (bytes): content's raw data
Raises:
ValueError in case too many contents are required.
cf. BULK_BLOCK_CONTENT_LEN_MAX
"""
...
@deprecated
@remote_api_endpoint("content/range")
def content_get_range(self, start, end, limit=1000):
"""Retrieve contents within range [start, end] bound by limit.
Note that this function may return more than one blob per hash. The
limit is enforced with multiplicity (ie. two blobs with the same hash
will count twice toward the limit).
Args:
**start** (bytes): Starting identifier range (expected smaller
than end)
**end** (bytes): Ending identifier range (expected larger
than start)
**limit** (int): Limit result (default to 1000)
Returns:
a dict with keys:
- contents [dict]: iterable of contents in between the range.
- next (bytes): if not None, there remains content in the range,
starting from this next sha1
"""
...
@remote_api_endpoint("content/partition")
def content_get_partition(
self,
partition_id: int,
nb_partitions: int,
limit: int = 1000,
page_token: str = None,
):
"""Splits contents into nb_partitions, and returns one of these based on
partition_id (which must be in [0, nb_partitions-1])
There is no guarantee on how the partitioning is done, or the
result order.
Args:
partition_id (int): index of the partition to fetch
nb_partitions (int): total number of partitions to split into
limit (int): Limit result (default to 1000)
page_token (Optional[str]): opaque token used for pagination.
Returns:
a dict with keys:
- contents (List[dict]): iterable of contents in the partition.
- **next_page_token** (Optional[str]): opaque token to be used as
`page_token` for retrieving the next page. If absent, there are
no more pages to gather.
"""
...
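As a hedged sketch, a client could sweep the whole content table by iterating over every partition and following next_page_token inside each one; `storage` is assumed to implement StorageInterface and the partition count is arbitrary:

def iter_all_content(storage, nb_partitions=16):
    for partition_id in range(nb_partitions):
        page_token = None
        while True:
            result = storage.content_get_partition(
                partition_id, nb_partitions, limit=1000, page_token=page_token
            )
            yield from result["contents"]
            page_token = result["next_page_token"]
            if page_token is None:
                break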
@remote_api_endpoint("content/metadata")
def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]:
"""Retrieve content metadata in bulk
Args:
content: iterable of content identifiers (sha1)
Returns:
a dict with keys the content's sha1 and the associated value
either the existing content's metadata or None if the content does
not exist.
"""
...
@remote_api_endpoint("content/missing")
def content_missing(self, content, key_hash="sha1"):
"""List content missing from storage
Args:
content ([dict]): iterable of dictionaries whose keys are
either 'length' or an item of
:data:`swh.model.hashutil.ALGORITHMS`;
mapped to the corresponding checksum
(or length).
key_hash (str): name of the column to use as hash id
result (default: 'sha1')
Returns:
iterable ([bytes]): missing content ids (as per the
key_hash column)
Raises:
TODO: an exception when we get a hash collision.
"""
...
@remote_api_endpoint("content/missing/sha1")
def content_missing_per_sha1(self, contents):
"""List content missing from storage based only on sha1.
Args:
contents: List of sha1 to check for absence.
Returns:
iterable: missing ids
Raises:
TODO: an exception when we get a hash collision.
"""
...
@remote_api_endpoint("content/missing/sha1_git")
def content_missing_per_sha1_git(self, contents):
"""List content missing from storage based only on sha1_git.
Args:
contents (List): An iterable of content id (sha1_git)
Yields:
missing contents sha1_git
"""
...
@remote_api_endpoint("content/present")
def content_find(self, content):
"""Find a content hash in db.
Args:
content: a dictionary representing one content hash, mapping
checksum algorithm names (see swh.model.hashutil.ALGORITHMS) to
checksum values
Returns:
a triplet (sha1, sha1_git, sha256) if the content exists,
or None otherwise.
Raises:
ValueError: in case the key of the dictionary is neither sha1, sha1_git,
nor sha256.
"""
...
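A hedged usage sketch (any subset of the supported hashes may be provided; `storage` is assumed to implement StorageInterface and the helper name is invented):

def is_archived(storage, sha1_git_hex):
    # True if at least one archived content matches the given sha1_git.
    found = storage.content_find({"sha1_git": bytes.fromhex(sha1_git_hex)})
    return bool(found)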
@remote_api_endpoint("content/get_random")
- def content_get_random(self):
+ def content_get_random(self) -> Sha1Git:
"""Finds a random content id.
Returns:
a sha1_git
"""
...
@remote_api_endpoint("content/skipped/add")
def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
"""Add contents to the skipped_content list, which contains
(partial) information about content missing from the archive.
Args:
contents (iterable): iterable of dictionaries representing
individual pieces of content to add. Each dictionary has the
following keys:
- length (Optional[int]): content length (default: -1)
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum; each is optional
- status (str): must be "absent"
- reason (str): the reason why the content is absent
- origin (int): if status = absent, the origin we saw the
content in
Raises:
The following exceptions can occur:
- HashCollision in case of collision
- Any other exceptions raised by the backend
In case of errors, some content may have been stored in
the DB and in the objstorage.
Since additions to both are idempotent, that should not be a problem.
Returns:
Summary dict with the following key and associated values:
skipped_content:add: New skipped contents (no data) added
"""
...
@remote_api_endpoint("content/skipped/missing")
def skipped_content_missing(self, contents):
"""List skipped_content missing from storage
Args:
content: iterable of dictionaries containing the data for each
checksum algorithm.
Returns:
iterable: missing signatures
"""
...
@remote_api_endpoint("directory/add")
def directory_add(self, directories: List[Directory]) -> Dict:
"""Add directories to the storage
Args:
directories (iterable): iterable of dictionaries representing the
individual directories to add. Each dict has the following
keys:
- id (sha1_git): the id of the directory to add
- entries (list): list of dicts for each entry in the
directory. Each dict has the following keys:
- name (bytes)
- type (one of 'file', 'dir', 'rev'): type of the
directory entry (file, directory, revision)
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
Returns:
Summary dict of keys with associated count as values:
directory:add: Number of directories actually added
"""
...
@remote_api_endpoint("directory/missing")
def directory_missing(self, directories):
"""List directories missing from storage
Args:
directories (iterable): an iterable of directory ids
Yields:
missing directory ids
"""
...
@remote_api_endpoint("directory/ls")
def directory_ls(self, directory, recursive=False):
"""Get entries for one directory.
Args:
- directory: the directory to list entries from.
- recursive: if set, list entries recursively from this directory.
Returns:
List of entries for such directory.
If `recursive=True`, names in the path of a dir/file not at the
root are concatenated with a slash (`/`).
"""
...
@remote_api_endpoint("directory/path")
def directory_entry_get_by_path(self, directory, paths):
"""Get the directory entry (either file or dir) from directory with path.
Args:
- directory: sha1 of the top level directory
- paths: path to lookup from the top level directory. From left
(top) to right (bottom).
Returns:
The corresponding directory entry if found, None otherwise.
"""
...
@remote_api_endpoint("directory/get_random")
- def directory_get_random(self):
+ def directory_get_random(self) -> Sha1Git:
"""Finds a random directory id.
Returns:
a sha1_git
"""
...
@remote_api_endpoint("revision/add")
def revision_add(self, revisions: List[Revision]) -> Dict:
"""Add revisions to the storage
Args:
revisions (List[dict]): iterable of dictionaries representing
the individual revisions to add. Each dict has the following
keys:
- **id** (:class:`sha1_git`): id of the revision to add
- **date** (:class:`dict`): date the revision was written
- **committer_date** (:class:`dict`): date the revision got
added to the origin
- **type** (one of 'git', 'tar'): type of the
revision added
- **directory** (:class:`sha1_git`): the directory the
revision points at
- **message** (:class:`bytes`): the message associated with
the revision
- **author** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
- **committer** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
- **metadata** (:class:`jsonb`): extra information as
dictionary
- **synthetic** (:class:`bool`): revision's nature (a revision
created from a tarball or directory import is synthetic)
- **parents** (:class:`list[sha1_git]`): the parents of
this revision
date dictionaries have the form defined in :mod:`swh.model`.
Returns:
Summary dict of keys with associated count as values
revision:add: New objects actually stored in db
"""
...
@remote_api_endpoint("revision/missing")
def revision_missing(self, revisions):
"""List revisions missing from storage
Args:
revisions (iterable): revision ids
Yields:
missing revision ids
"""
...
@remote_api_endpoint("revision")
def revision_get(self, revisions):
"""Get all revisions from storage
Args:
revisions: an iterable of revision ids
Returns:
iterable: an iterable of revisions as dictionaries (or None if the
revision doesn't exist)
"""
...
@remote_api_endpoint("revision/log")
def revision_log(self, revisions, limit=None):
"""Fetch revision entry from the given root revisions.
Args:
revisions: array of root revision to lookup
limit: limitation on the output result. Default to None.
Yields:
List of revision log from such revisions root.
"""
...
@remote_api_endpoint("revision/shortlog")
def revision_shortlog(self, revisions, limit=None):
"""Fetch the shortlog for the given revisions
Args:
revisions: list of root revisions to lookup
limit: depth limitation for the output
Yields:
a list of (id, parents) tuples.
"""
...
@remote_api_endpoint("revision/get_random")
- def revision_get_random(self):
+ def revision_get_random(self) -> Sha1Git:
"""Finds a random revision id.
Returns:
a sha1_git
"""
...
@remote_api_endpoint("release/add")
def release_add(self, releases: List[Release]) -> Dict:
"""Add releases to the storage
Args:
releases (List[dict]): iterable of dictionaries representing
the individual releases to add. Each dict has the following
keys:
- **id** (:class:`sha1_git`): id of the release to add
- **revision** (:class:`sha1_git`): id of the revision the
release points to
- **date** (:class:`dict`): the date the release was made
- **name** (:class:`bytes`): the name of the release
- **comment** (:class:`bytes`): the comment associated with
the release
- **author** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
Returns:
Summary dict of keys with associated count as values
release:add: New release objects actually stored in db
"""
...
@remote_api_endpoint("release/missing")
def release_missing(self, releases):
"""List releases missing from storage
Args:
releases: an iterable of release ids
Returns:
a list of missing release ids
"""
...
@remote_api_endpoint("release")
def release_get(self, releases):
"""Given a list of sha1, return the releases's information
Args:
releases: list of sha1s
Yields:
dicts with the same keys as those given to `release_add`
(or ``None`` if a release does not exist)
"""
...
@remote_api_endpoint("release/get_random")
- def release_get_random(self):
+ def release_get_random(self) -> Sha1Git:
"""Finds a random release id.
Returns:
a sha1_git
"""
...
@remote_api_endpoint("snapshot/add")
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
"""Add snapshots to the storage.
Args:
snapshots ([dict]): the snapshots to add, containing the
following keys:
- **id** (:class:`bytes`): id of the snapshot
- **branches** (:class:`dict`): branches the snapshot contains,
mapping the branch name (:class:`bytes`) to the branch target,
itself a :class:`dict` (or ``None`` if the branch points to an
unknown object)
- **target_type** (:class:`str`): one of ``content``,
``directory``, ``revision``, ``release``,
``snapshot``, ``alias``
- **target** (:class:`bytes`): identifier of the target
(currently a ``sha1_git`` for all object kinds, or the name
of the target branch for aliases)
Raises:
ValueError: if the origin or visit id does not exist.
Returns:
Summary dict of keys with associated count as values
snapshot:add: Count of objects actually stored in db
"""
...
@remote_api_endpoint("snapshot/missing")
def snapshot_missing(self, snapshots):
"""List snapshots missing from storage
Args:
snapshots (iterable): an iterable of snapshot ids
Yields:
missing snapshot ids
"""
...
@remote_api_endpoint("snapshot")
def snapshot_get(self, snapshot_id):
"""Get the content, possibly partial, of a snapshot with the given id
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
snapshot_id (bytes): identifier of the snapshot
Returns:
dict: a dict with three keys:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has fewer than 1000
branches.
"""
...
@remote_api_endpoint("snapshot/by_origin_visit")
def snapshot_get_by_origin_visit(self, origin, visit):
"""Get the content, possibly partial, of a snapshot for the given origin visit
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
origin (int): the origin identifier
visit (int): the visit identifier
Returns:
dict: None if the snapshot does not exist;
a dict with three keys otherwise:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has fewer than 1000
branches.
"""
...
@remote_api_endpoint("snapshot/count_branches")
def snapshot_count_branches(self, snapshot_id):
"""Count the number of branches in the snapshot with the given id
Args:
snapshot_id (bytes): identifier of the snapshot
Returns:
dict: A dict whose keys are the target types of branches and
values their corresponding amount
"""
...
@remote_api_endpoint("snapshot/get_branches")
def snapshot_get_branches(
self, snapshot_id, branches_from=b"", branches_count=1000, target_types=None
):
"""Get the content, possibly partial, of a snapshot with the given id
The branches of the snapshot are iterated in the lexicographical
order of their names.
Args:
snapshot_id (bytes): identifier of the snapshot
branches_from (bytes): optional parameter used to skip branches
whose name is lexicographically smaller than this value
branches_count (int): optional parameter used to limit
the number of returned branches
target_types (list): optional parameter used to filter the
target types of branch to return (possible values that can be
contained in that list are `'content', 'directory',
'revision', 'release', 'snapshot', 'alias'`)
Returns:
dict: None if the snapshot does not exist;
a dict with three keys otherwise:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has fewer than
`branches_count` branches starting from `branches_from` (inclusive).
"""
...
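Since next_branch names the first branch not returned, it can be fed straight back as branches_from to page through very large snapshots without duplicates. A sketch under the assumption that `storage` implements StorageInterface:

def iter_snapshot_branches(storage, snapshot_id, batch=1000):
    branches_from = b""
    while True:
        snapshot = storage.snapshot_get_branches(
            snapshot_id, branches_from=branches_from, branches_count=batch
        )
        if snapshot is None:  # unknown snapshot
            return
        yield from snapshot["branches"].items()
        if snapshot["next_branch"] is None:
            return
        branches_from = snapshot["next_branch"]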
@remote_api_endpoint("snapshot/get_random")
- def snapshot_get_random(self):
+ def snapshot_get_random(self) -> Sha1Git:
"""Finds a random snapshot id.
Returns:
a sha1_git
"""
...
@remote_api_endpoint("origin/visit/add")
def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
"""Add visits to storage. If the visits have no id, they will be created and assigned
one. The resulted visits are visits with their visit id set.
Args:
visits: List of OriginVisit objects to add
Raises:
StorageArgumentException if some origin visits reference unknown origins
Returns:
List[OriginVisit] stored
"""
...
@remote_api_endpoint("origin/visit_status/add")
def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus],) -> None:
"""Add origin visit statuses.
If there is already a status for the same origin and visit id at the same
date, the new one will be either dropped or will replace the existing one
(it is unspecified which one of these two behaviors happens).
Args:
visit_statuses: origin visit statuses to add
Raises: StorageArgumentException if the origin of the visit status is unknown
"""
...
@remote_api_endpoint("origin/visit/get")
def origin_visit_get(
self,
origin: str,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
) -> PagedResult[OriginVisit]:
"""Retrieve page of OriginVisit information.
Args:
origin: The visited origin
page_token: opaque string used to get the next results of a search
order: Order on visit id fields to list origin visits (default to asc)
limit: Number of visits to return
Raises:
StorageArgumentException if the order is wrong or the page_token type is
mistyped.
Returns: Page of OriginVisit data model objects. If next_page_token is None,
there is no more data to retrieve.
"""
...
@remote_api_endpoint("origin/visit/find_by_date")
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime
) -> Optional[OriginVisit]:
"""Retrieves the origin visit whose date is closest to the provided
timestamp.
In case of a tie, the visit with largest id is selected.
Args:
origin: origin (URL)
visit_date: expected visit date
Returns:
A visit if found, None otherwise
"""
...
@remote_api_endpoint("origin/visit/getby")
def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]:
"""Retrieve origin visit's information.
Args:
origin: origin (URL)
visit: visit id
Returns:
The information on that particular OriginVisit or None if
it does not exist
"""
...
@remote_api_endpoint("origin/visit/get_latest")
def origin_visit_get_latest(
self,
origin: str,
type: Optional[str] = None,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
) -> Optional[OriginVisit]:
"""Get the latest origin visit for the given origin, optionally
looking only for those with one of the given allowed_statuses
or for those with a snapshot.
Args:
origin: origin URL
type: Optional visit type to filter on (e.g git, tar, dsc, svn,
hg, npm, pypi, ...)
allowed_statuses: list of visit statuses considered
to find the latest visit. For instance,
``allowed_statuses=['full']`` will only consider visits that
have successfully run to completion.
require_snapshot: If True, only a visit with a snapshot
will be returned.
Raises:
StorageArgumentException if values for the allowed_statuses parameter
are unknown
Returns:
OriginVisit matching the criteria if found, None otherwise. Note that since
OriginVisit no longer holds a reference to the visit status or snapshot, you
may want to use origin_visit_status_get_latest for that information.
"""
...
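Since OriginVisit no longer carries the status or snapshot, callers typically pair this endpoint with origin_visit_status_get_latest. A hedged sketch, assuming `storage` implements StorageInterface and the helper name is invented:

def latest_full_snapshot_id(storage, origin_url):
    # Snapshot id of the latest successful visit of the origin, or None.
    visit = storage.origin_visit_get_latest(
        origin_url, allowed_statuses=["full"], require_snapshot=True
    )
    if visit is None:
        return None
    status = storage.origin_visit_status_get_latest(
        origin_url, visit.visit, allowed_statuses=["full"], require_snapshot=True
    )
    return status.snapshot if status else None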
@remote_api_endpoint("origin/visit_status/get")
def origin_visit_status_get(
self,
origin: str,
visit: int,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
) -> PagedResult[OriginVisitStatus]:
"""Retrieve page of OriginVisitStatus information.
Args:
origin: The visited origin
visit: The visit identifier
page_token: opaque string used to get the next results of a search
order: Order on visit status objects to list (default to asc)
limit: Number of visit statuses to return
Returns: Page of OriginVisitStatus data model objects. If next_page_token is
None, there is no more data to retrieve.
"""
...
@remote_api_endpoint("origin/visit_status/get_latest")
def origin_visit_status_get_latest(
self,
origin_url: str,
visit: int,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
) -> Optional[OriginVisitStatus]:
"""Get the latest origin visit status for the given origin visit, optionally
looking only for those with one of the given allowed_statuses or with a
snapshot.
Args:
origin: origin URL
allowed_statuses: list of visit statuses considered to find the latest
visit. Possible values are {created, ongoing, partial, full}. For
instance, ``allowed_statuses=['full']`` will only consider visits that
have successfully run to completion.
require_snapshot: If True, only a visit with a snapshot
will be returned.
Raises:
StorageArgumentException if values for the allowed_statuses parameter
are unknown
Returns:
The OriginVisitStatus matching the criteria
"""
...
@remote_api_endpoint("origin/visit_status/get_random")
def origin_visit_status_get_random(
self, type: str
) -> Optional[Tuple[OriginVisit, OriginVisitStatus]]:
"""Randomly select one successful origin visit with <type>
made in the last 3 months.
Returns:
One random tuple of (OriginVisit, OriginVisitStatus) matching the
selection criteria
"""
...
@remote_api_endpoint("object/find_by_sha1_git")
def object_find_by_sha1_git(self, ids):
"""Return the objects found with the given ids.
Args:
ids: a generator of sha1_gits
Returns:
dict: a mapping from id to the list of objects found. Each object
found is itself a dict with keys:
- sha1_git: the input id
- type: the type of object found
"""
...
@remote_api_endpoint("origin/get")
def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]:
"""Return origins.
Args:
origins: a list of urls to find
Returns:
the list of associated existing origin model objects. The unknown origins
will be returned as None at the same index as the input.
"""
...
@remote_api_endpoint("origin/get_sha1")
def origin_get_by_sha1(self, sha1s):
"""Return origins, identified by the sha1 of their URLs.
Args:
sha1s (list[bytes]): a list of sha1s
Yields:
dicts containing origin information as returned
by :meth:`swh.storage.storage.Storage.origin_get`, or None if an
origin matching the sha1 is not found.
"""
...
@remote_api_endpoint("origin/list")
def origin_list(
self, page_token: Optional[str] = None, limit: int = 100
) -> PagedResult[Origin]:
"""Returns the list of origins
Args:
page_token: opaque token used for pagination.
limit: the maximum number of results to return
Returns:
Page of Origin data model objects. If next_page_token is None, there is
no more data to retrieve.
"""
...
@remote_api_endpoint("origin/search")
def origin_search(
self,
url_pattern: str,
page_token: Optional[str] = None,
limit: int = 50,
regexp: bool = False,
with_visit: bool = False,
) -> PagedResult[Origin]:
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
The search is performed in a case insensitive way.
Args:
url_pattern: the string pattern to search for in origin urls
page_token: opaque token used for pagination
limit: the maximum number of found origins to return
regexp: if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit: if True, filter out origins with no visit
Returns:
PagedResult of Origin
"""
...
@deprecated
@remote_api_endpoint("origin/count")
def origin_count(
self, url_pattern: str, regexp: bool = False, with_visit: bool = False
) -> int:
"""Count origins whose urls contain a provided string pattern
or match a provided regular expression.
The pattern search in origin urls is performed in a case insensitive
way.
Args:
url_pattern (str): the string pattern to search for in origin urls
regexp (bool): if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
Returns:
int: The number of origins matching the search criterion.
"""
...
@remote_api_endpoint("origin/add_multi")
def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
"""Add origins to the storage
Args:
origins: list of dictionaries representing the individual origins,
with the following keys:
- type: the origin type ('git', 'svn', 'deb', ...)
- url (bytes): the url the origin points to
Returns:
Summary dict of keys with associated count as values
origin:add: Count of objects actually stored in db
"""
...
def stat_counters(self):
"""compute statistics about the number of tuples in various tables
Returns:
dict: a dictionary mapping textual labels (e.g., content) to
integer values (e.g., the number of tuples in table content)
"""
...
def refresh_stat_counters(self):
"""Recomputes the statistics for `stat_counters`."""
...
@remote_api_endpoint("raw_extrinsic_metadata/add")
def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None:
"""Add extrinsic metadata on objects (contents, directories, ...).
The authority and fetcher must be known to the storage before
using this endpoint.
If there is already metadata for the same object, authority,
fetcher, and at the same date; the new one will be either dropped or
will replace the existing one
(it is unspecified which one of these two behaviors happens).
Args:
metadata: iterable of RawExtrinsicMetadata objects to be inserted.
"""
...
@remote_api_endpoint("raw_extrinsic_metadata/get")
def raw_extrinsic_metadata_get(
self,
type: MetadataTargetType,
id: Union[str, SWHID],
authority: MetadataAuthority,
after: Optional[datetime.datetime] = None,
page_token: Optional[bytes] = None,
limit: int = 1000,
) -> PagedResult[RawExtrinsicMetadata]:
"""Retrieve list of all raw_extrinsic_metadata entries for the id
Args:
type: one of the values of swh.model.model.MetadataTargetType
id: a URL if type is 'origin', else a core SWHID
authority: the metadata authority to look up (identified by its `type` and `url`)
after: minimum discovery_date for a result to be returned
page_token: opaque token, used to get the next page of results
limit: maximum number of results to be returned
Returns:
PagedResult of RawExtrinsicMetadata
"""
...
@remote_api_endpoint("metadata_fetcher/add")
def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> None:
"""Add new metadata fetchers to the storage.
Their `name` and `version` together are unique identifiers of this
fetcher; and `metadata` is an arbitrary dict of JSONable data
with information about this fetcher, which must not be `None`
(but may be empty).
Args:
fetchers: iterable of MetadataFetcher to be inserted
"""
...
@remote_api_endpoint("metadata_fetcher/get")
def metadata_fetcher_get(
self, name: str, version: str
) -> Optional[MetadataFetcher]:
"""Retrieve information about a fetcher
Args:
name: the name of the fetcher
version: version of the fetcher
Returns:
a MetadataFetcher object (with a non-None metadata field) if it is known,
else None.
"""
...
@remote_api_endpoint("metadata_authority/add")
def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
"""Add new metadata authorities to the storage.
Their `type` and `url` together are unique identifiers of this
authority; and `metadata` is an arbitrary dict of JSONable data
with information about this authority, which must not be `None`
(but may be empty).
Args:
authorities: iterable of MetadataAuthority to be inserted
"""
...
@remote_api_endpoint("metadata_authority/get")
def metadata_authority_get(
self, type: MetadataAuthorityType, url: str
) -> Optional[MetadataAuthority]:
"""Retrieve information about an authority
Args:
type: one of "deposit_client", "forge", or "registry"
url: unique URI identifying the authority
Returns:
a MetadataAuthority object (with a non-None metadata field) if it is known,
else None.
"""
...
@deprecated
@remote_api_endpoint("algos/diff_directories")
def diff_directories(self, from_dir, to_dir, track_renaming=False):
"""Compute the list of file changes introduced between two arbitrary
directories (insertion / deletion / modification / renaming of files).
Args:
from_dir (bytes): identifier of the directory to compare from
to_dir (bytes): identifier of the directory to compare to
track_renaming (bool): whether or not to track files renaming
Returns:
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`
for more details).
"""
...
@deprecated
@remote_api_endpoint("algos/diff_revisions")
def diff_revisions(self, from_rev, to_rev, track_renaming=False):
"""Compute the list of file changes introduced between two arbitrary
revisions (insertion / deletion / modification / renaming of files).
Args:
from_rev (bytes): identifier of the revision to compare from
to_rev (bytes): identifier of the revision to compare to
track_renaming (bool): whether or not to track files renaming
Returns:
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`
for more details).
"""
...
@deprecated
@remote_api_endpoint("algos/diff_revision")
def diff_revision(self, revision, track_renaming=False):
"""Compute the list of file changes introduced by a specific revision
(insertion / deletion / modification / renaming of files) by comparing
it against its first parent.
Args:
revision (bytes): identifier of the revision from which to
compute the list of files changes
track_renaming (bool): whether or not to track files renaming
Returns:
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`
for more details).
"""
...
@remote_api_endpoint("clear/buffer")
def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
"""For backend storages (pg, storage, in-memory), this is a noop operation. For proxy
storages (especially filter, buffer), this is an operation which cleans internal
state.
"""
@remote_api_endpoint("flush")
def flush(self, object_types: Optional[List[str]] = None) -> Dict:
"""For backend storages (pg, storage, in-memory), this is expected to be a noop
operation. For proxy storages (especially buffer), this is expected to trigger
actual writes to the backend.
"""
...
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
index 15a5f2f1..d87e52fb 100644
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1,1403 +1,1404 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import contextlib
import datetime
import itertools
from collections import defaultdict
from contextlib import contextmanager
from typing import (
Counter,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)
import attr
import psycopg2
import psycopg2.pool
import psycopg2.errors
from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.identifiers import SWHID
from swh.model.model import (
Content,
Directory,
Origin,
OriginVisit,
OriginVisitStatus,
Revision,
Release,
SkippedContent,
+ Sha1Git,
Snapshot,
SHA1_SIZE,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MetadataTargetType,
RawExtrinsicMetadata,
)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
from swh.storage.interface import ListOrder, PagedResult, VISIT_STATUSES
from swh.storage.objstorage import ObjStorage
from swh.storage.utils import now
from . import converters
from .common import db_transaction_generator, db_transaction
from .db import Db
from .exc import StorageArgumentException, StorageDBError, HashCollision
from .algos import diff
from .metrics import timed, send_metric, process_metrics
from .utils import get_partition_bounds_bytes, extract_collision_hash, map_optional
from .writer import JournalWriter
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
EMPTY_SNAPSHOT_ID = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e")
"""Identifier for the empty snapshot"""
VALIDATION_EXCEPTIONS = (
KeyError,
TypeError,
ValueError,
psycopg2.errors.CheckViolation,
psycopg2.errors.IntegrityError,
psycopg2.errors.InvalidTextRepresentation,
psycopg2.errors.NotNullViolation,
psycopg2.errors.NumericValueOutOfRange,
psycopg2.errors.UndefinedFunction, # (raised on wrong argument types)
)
"""Exceptions raised by postgresql when validation of the arguments
failed."""
@contextlib.contextmanager
def convert_validation_exceptions():
"""Catches postgresql errors related to invalid arguments, and
re-raises a StorageArgumentException."""
try:
yield
except tuple(VALIDATION_EXCEPTIONS) as e:
raise StorageArgumentException(str(e))
class Storage:
"""SWH storage proxy, encompassing DB and object storage
"""
def __init__(
self, db, objstorage, min_pool_conns=1, max_pool_conns=10, journal_writer=None
):
"""
Args:
db_conn: either a libpq connection string, or a psycopg2 connection
obj_root: path to the root of the object storage
"""
try:
if isinstance(db, psycopg2.extensions.connection):
self._pool = None
self._db = Db(db)
else:
self._pool = psycopg2.pool.ThreadedConnectionPool(
min_pool_conns, max_pool_conns, db
)
self._db = None
except psycopg2.OperationalError as e:
raise StorageDBError(e)
self.journal_writer = JournalWriter(journal_writer)
self.objstorage = ObjStorage(objstorage)
def get_db(self):
if self._db:
return self._db
else:
return Db.from_pool(self._pool)
def put_db(self, db):
if db is not self._db:
db.put_conn()
@contextmanager
def db(self):
db = None
try:
db = self.get_db()
yield db
finally:
if db:
self.put_db(db)
@timed
@db_transaction()
def check_config(self, *, check_write, db=None, cur=None):
if not self.objstorage.check_config(check_write=check_write):
return False
# Check permissions on one of the tables
if check_write:
check = "INSERT"
else:
check = "SELECT"
cur.execute("select has_table_privilege(current_user, 'content', %s)", (check,))
return cur.fetchone()[0]
def _content_unique_key(self, hash, db):
"""Given a hash (tuple or dict), return a unique key from the
aggregation of keys.
"""
keys = db.content_hash_keys
if isinstance(hash, tuple):
return hash
return tuple([hash[k] for k in keys])
def _content_add_metadata(self, db, cur, content):
"""Add content to the postgresql database but not the object storage.
"""
# create temporary table for metadata injection
db.mktemp("content", cur)
db.copy_to(
(c.to_dict() for c in content), "tmp_content", db.content_add_keys, cur
)
# move metadata in place
try:
db.content_add_from_temp(cur)
except psycopg2.IntegrityError as e:
if e.diag.sqlstate == "23505" and e.diag.table_name == "content":
message_detail = e.diag.message_detail
if message_detail:
hash_name, hash_id = extract_collision_hash(message_detail)
collision_contents_hashes = [
c.hashes() for c in content if c.get_hash(hash_name) == hash_id
]
else:
constraint_to_hash_name = {
"content_pkey": "sha1",
"content_sha1_git_idx": "sha1_git",
"content_sha256_idx": "sha256",
}
hash_name = constraint_to_hash_name.get(e.diag.constraint_name)
hash_id = None
collision_contents_hashes = None
raise HashCollision(
hash_name, hash_id, collision_contents_hashes
) from None
else:
raise
@timed
@process_metrics
def content_add(self, content: List[Content]) -> Dict:
ctime = now()
contents = [attr.evolve(c, ctime=ctime) for c in content]
objstorage_summary = self.objstorage.content_add(contents)
with self.db() as db:
with db.transaction() as cur:
missing = list(
self.content_missing(
map(Content.to_dict, contents),
key_hash="sha1_git",
db=db,
cur=cur,
)
)
contents = [c for c in contents if c.sha1_git in missing]
self.journal_writer.content_add(contents)
self._content_add_metadata(db, cur, contents)
return {
"content:add": len(contents),
"content:add:bytes": objstorage_summary["content:add:bytes"],
}
@timed
@db_transaction()
def content_update(self, content, keys=[], db=None, cur=None):
# TODO: Add a check on input keys. How to properly implement
# this? We don't know yet the new columns.
self.journal_writer.content_update(content)
db.mktemp("content", cur)
select_keys = list(set(db.content_get_metadata_keys).union(set(keys)))
with convert_validation_exceptions():
db.copy_to(content, "tmp_content", select_keys, cur)
db.content_update_from_temp(keys_to_update=keys, cur=cur)
@timed
@process_metrics
@db_transaction()
def content_add_metadata(self, content: List[Content], db=None, cur=None) -> Dict:
missing = self.content_missing(
(c.to_dict() for c in content), key_hash="sha1_git", db=db, cur=cur,
)
contents = [c for c in content if c.sha1_git in missing]
self.journal_writer.content_add_metadata(contents)
self._content_add_metadata(db, cur, contents)
return {
"content:add": len(contents),
}
@timed
def content_get(self, content):
# FIXME: Make this method support slicing the `data`.
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise StorageArgumentException(
"Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX
)
yield from self.objstorage.content_get(content)
@timed
@db_transaction()
def content_get_range(self, start, end, limit=1000, db=None, cur=None):
if limit is None:
raise StorageArgumentException("limit should not be None")
contents = []
next_content = None
for counter, content_row in enumerate(
db.content_get_range(start, end, limit + 1, cur)
):
content = dict(zip(db.content_get_metadata_keys, content_row))
if counter >= limit:
# take the last content for the next page starting from this
next_content = content["sha1"]
break
contents.append(content)
return {
"contents": contents,
"next": next_content,
}
@timed
def content_get_partition(
self,
partition_id: int,
nb_partitions: int,
limit: int = 1000,
page_token: str = None,
):
if limit is None:
raise StorageArgumentException("limit should not be None")
(start, end) = get_partition_bounds_bytes(
partition_id, nb_partitions, SHA1_SIZE
)
if page_token:
start = hash_to_bytes(page_token)
if end is None:
end = b"\xff" * SHA1_SIZE
result = self.content_get_range(start, end, limit)
result2 = {
"contents": result["contents"],
"next_page_token": None,
}
if result["next"]:
result2["next_page_token"] = hash_to_hex(result["next"])
return result2
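As an illustration of the partition/page-token contract above, a hedged sketch of a client walking every content row; `process` is a hypothetical callback and `storage` is assumed to be an instance of this class:

def walk_all_contents(storage, process, nb_partitions: int = 16):
    for partition_id in range(nb_partitions):
        page_token = None
        while True:
            result = storage.content_get_partition(
                partition_id, nb_partitions, limit=1000, page_token=page_token
            )
            for content in result["contents"]:
                process(content)
            # next_page_token is a hex sha1, or None when the partition is done
            page_token = result["next_page_token"]
            if page_token is None:
                break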
@timed
@db_transaction(statement_timeout=500)
def content_get_metadata(
self, contents: List[bytes], db=None, cur=None
) -> Dict[bytes, List[Dict]]:
result: Dict[bytes, List[Dict]] = {sha1: [] for sha1 in contents}
for row in db.content_get_metadata_from_sha1s(contents, cur):
content_meta = dict(zip(db.content_get_metadata_keys, row))
result[content_meta["sha1"]].append(content_meta)
return result
@timed
@db_transaction_generator()
def content_missing(self, content, key_hash="sha1", db=None, cur=None):
keys = db.content_hash_keys
if key_hash not in keys:
raise StorageArgumentException("key_hash should be one of %s" % keys)
key_hash_idx = keys.index(key_hash)
if not content:
return
for obj in db.content_missing_from_list(content, cur):
yield obj[key_hash_idx]
@timed
@db_transaction_generator()
def content_missing_per_sha1(self, contents, db=None, cur=None):
for obj in db.content_missing_per_sha1(contents, cur):
yield obj[0]
@timed
@db_transaction_generator()
def content_missing_per_sha1_git(self, contents, db=None, cur=None):
for obj in db.content_missing_per_sha1_git(contents, cur):
yield obj[0]
@timed
@db_transaction()
def content_find(self, content, db=None, cur=None):
if not set(content).intersection(DEFAULT_ALGORITHMS):
raise StorageArgumentException(
"content keys must contain at least one of: "
"sha1, sha1_git, sha256, blake2s256"
)
contents = db.content_find(
sha1=content.get("sha1"),
sha1_git=content.get("sha1_git"),
sha256=content.get("sha256"),
blake2s256=content.get("blake2s256"),
cur=cur,
)
return [dict(zip(db.content_find_cols, content)) for content in contents]
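A short usage sketch for content_find: any non-empty subset of the supported hashes may be passed (here only sha1, computed with the standard library); `storage` is assumed to be an instance of this class:

import hashlib

def find_by_raw_bytes(storage, data: bytes):
    sha1 = hashlib.sha1(data).digest()
    # Returns a (possibly empty) list of dicts keyed by db.content_find_cols.
    return storage.content_find({"sha1": sha1})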
@timed
@db_transaction()
- def content_get_random(self, db=None, cur=None):
+ def content_get_random(self, db=None, cur=None) -> Sha1Git:
return db.content_get_random(cur)
@staticmethod
def _skipped_content_normalize(d):
d = d.copy()
if d.get("status") is None:
d["status"] = "absent"
if d.get("length") is None:
d["length"] = -1
return d
def _skipped_content_add_metadata(self, db, cur, content: List[SkippedContent]):
origin_ids = db.origin_id_get_by_url([cont.origin for cont in content], cur=cur)
content = [
attr.evolve(c, origin=origin_id)
for (c, origin_id) in zip(content, origin_ids)
]
db.mktemp("skipped_content", cur)
db.copy_to(
[c.to_dict() for c in content],
"tmp_skipped_content",
db.skipped_content_keys,
cur,
)
# move metadata in place
db.skipped_content_add_from_temp(cur)
@timed
@process_metrics
@db_transaction()
def skipped_content_add(
self, content: List[SkippedContent], db=None, cur=None
) -> Dict:
ctime = now()
content = [attr.evolve(c, ctime=ctime) for c in content]
missing_contents = self.skipped_content_missing(
(c.to_dict() for c in content), db=db, cur=cur,
)
content = [
c
for c in content
if any(
all(
c.get_hash(algo) == missing_content.get(algo)
for algo in DEFAULT_ALGORITHMS
)
for missing_content in missing_contents
)
]
self.journal_writer.skipped_content_add(content)
self._skipped_content_add_metadata(db, cur, content)
return {
"skipped_content:add": len(content),
}
@timed
@db_transaction_generator()
def skipped_content_missing(self, contents, db=None, cur=None):
contents = list(contents)
for content in db.skipped_content_missing(contents, cur):
yield dict(zip(db.content_hash_keys, content))
@timed
@process_metrics
@db_transaction()
def directory_add(self, directories: List[Directory], db=None, cur=None) -> Dict:
summary = {"directory:add": 0}
dirs = set()
dir_entries: Dict[str, defaultdict] = {
"file": defaultdict(list),
"dir": defaultdict(list),
"rev": defaultdict(list),
}
for cur_dir in directories:
dir_id = cur_dir.id
dirs.add(dir_id)
for src_entry in cur_dir.entries:
entry = src_entry.to_dict()
entry["dir_id"] = dir_id
dir_entries[entry["type"]][dir_id].append(entry)
dirs_missing = set(self.directory_missing(dirs, db=db, cur=cur))
if not dirs_missing:
return summary
self.journal_writer.directory_add(
dir_ for dir_ in directories if dir_.id in dirs_missing
)
# Copy directory ids
dirs_missing_dict = ({"id": dir} for dir in dirs_missing)
db.mktemp("directory", cur)
db.copy_to(dirs_missing_dict, "tmp_directory", ["id"], cur)
# Copy entries
for entry_type, entry_list in dir_entries.items():
entries = itertools.chain.from_iterable(
entries_for_dir
for dir_id, entries_for_dir in entry_list.items()
if dir_id in dirs_missing
)
db.mktemp_dir_entry(entry_type)
db.copy_to(
entries,
"tmp_directory_entry_%s" % entry_type,
["target", "name", "perms", "dir_id"],
cur,
)
# Do the final copy
db.directory_add_from_temp(cur)
summary["directory:add"] = len(dirs_missing)
return summary
@timed
@db_transaction_generator()
def directory_missing(self, directories, db=None, cur=None):
for obj in db.directory_missing_from_list(directories, cur):
yield obj[0]
@timed
@db_transaction_generator(statement_timeout=20000)
def directory_ls(self, directory, recursive=False, db=None, cur=None):
if recursive:
res_gen = db.directory_walk(directory, cur=cur)
else:
res_gen = db.directory_walk_one(directory, cur=cur)
for line in res_gen:
yield dict(zip(db.directory_ls_cols, line))
@timed
@db_transaction(statement_timeout=2000)
def directory_entry_get_by_path(self, directory, paths, db=None, cur=None):
res = db.directory_entry_get_by_path(directory, paths, cur)
if res:
return dict(zip(db.directory_ls_cols, res))
@timed
@db_transaction()
- def directory_get_random(self, db=None, cur=None):
+ def directory_get_random(self, db=None, cur=None) -> Sha1Git:
return db.directory_get_random(cur)
@timed
@process_metrics
@db_transaction()
def revision_add(self, revisions: List[Revision], db=None, cur=None) -> Dict:
summary = {"revision:add": 0}
revisions_missing = set(
self.revision_missing(
set(revision.id for revision in revisions), db=db, cur=cur
)
)
if not revisions_missing:
return summary
db.mktemp_revision(cur)
revisions_filtered = [
revision for revision in revisions if revision.id in revisions_missing
]
self.journal_writer.revision_add(revisions_filtered)
revisions_filtered = list(map(converters.revision_to_db, revisions_filtered))
parents_filtered: List[bytes] = []
with convert_validation_exceptions():
db.copy_to(
revisions_filtered,
"tmp_revision",
db.revision_add_cols,
cur,
lambda rev: parents_filtered.extend(rev["parents"]),
)
db.revision_add_from_temp(cur)
db.copy_to(
parents_filtered,
"revision_history",
["id", "parent_id", "parent_rank"],
cur,
)
return {"revision:add": len(revisions_missing)}
@timed
@db_transaction_generator()
def revision_missing(self, revisions, db=None, cur=None):
if not revisions:
return
for obj in db.revision_missing_from_list(revisions, cur):
yield obj[0]
@timed
@db_transaction_generator(statement_timeout=1000)
def revision_get(self, revisions, db=None, cur=None):
for line in db.revision_get_from_list(revisions, cur):
data = converters.db_to_revision(dict(zip(db.revision_get_cols, line)))
if not data["type"]:
yield None
continue
yield data
@timed
@db_transaction_generator(statement_timeout=2000)
def revision_log(self, revisions, limit=None, db=None, cur=None):
for line in db.revision_log(revisions, limit, cur):
data = converters.db_to_revision(dict(zip(db.revision_get_cols, line)))
if not data["type"]:
yield None
continue
yield data
@timed
@db_transaction_generator(statement_timeout=2000)
def revision_shortlog(self, revisions, limit=None, db=None, cur=None):
yield from db.revision_shortlog(revisions, limit, cur)
@timed
@db_transaction()
- def revision_get_random(self, db=None, cur=None):
+ def revision_get_random(self, db=None, cur=None) -> Sha1Git:
return db.revision_get_random(cur)
@timed
@process_metrics
@db_transaction()
def release_add(self, releases: List[Release], db=None, cur=None) -> Dict:
summary = {"release:add": 0}
release_ids = set(release.id for release in releases)
releases_missing = set(self.release_missing(release_ids, db=db, cur=cur))
if not releases_missing:
return summary
db.mktemp_release(cur)
releases_filtered = [
release for release in releases if release.id in releases_missing
]
self.journal_writer.release_add(releases_filtered)
releases_filtered = list(map(converters.release_to_db, releases_filtered))
with convert_validation_exceptions():
db.copy_to(releases_filtered, "tmp_release", db.release_add_cols, cur)
db.release_add_from_temp(cur)
return {"release:add": len(releases_missing)}
@timed
@db_transaction_generator()
def release_missing(self, releases, db=None, cur=None):
if not releases:
return
for obj in db.release_missing_from_list(releases, cur):
yield obj[0]
@timed
@db_transaction_generator(statement_timeout=500)
def release_get(self, releases, db=None, cur=None):
for release in db.release_get_from_list(releases, cur):
data = converters.db_to_release(dict(zip(db.release_get_cols, release)))
yield data if data["target_type"] else None
@timed
@db_transaction()
- def release_get_random(self, db=None, cur=None):
+ def release_get_random(self, db=None, cur=None) -> Sha1Git:
return db.release_get_random(cur)
@timed
@process_metrics
@db_transaction()
def snapshot_add(self, snapshots: List[Snapshot], db=None, cur=None) -> Dict:
created_temp_table = False
count = 0
for snapshot in snapshots:
if not db.snapshot_exists(snapshot.id, cur):
if not created_temp_table:
db.mktemp_snapshot_branch(cur)
created_temp_table = True
with convert_validation_exceptions():
db.copy_to(
(
{
"name": name,
"target": info.target if info else None,
"target_type": (
info.target_type.value if info else None
),
}
for name, info in snapshot.branches.items()
),
"tmp_snapshot_branch",
["name", "target", "target_type"],
cur,
)
self.journal_writer.snapshot_add([snapshot])
db.snapshot_add(snapshot.id, cur)
count += 1
return {"snapshot:add": count}
@timed
@db_transaction_generator()
def snapshot_missing(self, snapshots, db=None, cur=None):
for obj in db.snapshot_missing_from_list(snapshots, cur):
yield obj[0]
@timed
@db_transaction(statement_timeout=2000)
def snapshot_get(self, snapshot_id, db=None, cur=None):
return self.snapshot_get_branches(snapshot_id, db=db, cur=cur)
@timed
@db_transaction(statement_timeout=2000)
def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None):
snapshot_id = db.snapshot_get_by_origin_visit(origin, visit, cur)
if snapshot_id:
return self.snapshot_get(snapshot_id, db=db, cur=cur)
return None
@timed
@db_transaction(statement_timeout=2000)
def snapshot_count_branches(self, snapshot_id, db=None, cur=None):
return dict([bc for bc in db.snapshot_count_branches(snapshot_id, cur)])
@timed
@db_transaction(statement_timeout=2000)
def snapshot_get_branches(
self,
snapshot_id,
branches_from=b"",
branches_count=1000,
target_types=None,
db=None,
cur=None,
):
if snapshot_id == EMPTY_SNAPSHOT_ID:
return {
"id": snapshot_id,
"branches": {},
"next_branch": None,
}
branches = {}
next_branch = None
fetched_branches = list(
db.snapshot_get_by_id(
snapshot_id,
branches_from=branches_from,
branches_count=branches_count + 1,
target_types=target_types,
cur=cur,
)
)
for branch in fetched_branches[:branches_count]:
branch = dict(zip(db.snapshot_get_cols, branch))
del branch["snapshot_id"]
name = branch.pop("name")
if branch == {"target": None, "target_type": None}:
branch = None
branches[name] = branch
if len(fetched_branches) > branches_count:
branch = dict(zip(db.snapshot_get_cols, fetched_branches[-1]))
next_branch = branch["name"]
if branches:
return {
"id": snapshot_id,
"branches": branches,
"next_branch": next_branch,
}
return None
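The next_branch cursor above can be fed back as branches_from to page through large snapshots; a hedged iteration sketch (assuming a `storage` instance exposing the method above):

def iter_snapshot_branches(storage, snapshot_id: bytes, batch: int = 1000):
    branches_from = b""
    while True:
        snapshot = storage.snapshot_get_branches(
            snapshot_id, branches_from=branches_from, branches_count=batch
        )
        if snapshot is None:  # unknown snapshot
            return
        yield from snapshot["branches"].items()
        if snapshot["next_branch"] is None:
            return
        branches_from = snapshot["next_branch"]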
@timed
@db_transaction()
- def snapshot_get_random(self, db=None, cur=None):
+ def snapshot_get_random(self, db=None, cur=None) -> Sha1Git:
return db.snapshot_get_random(cur)
@timed
@db_transaction()
def origin_visit_add(
self, visits: List[OriginVisit], db=None, cur=None
) -> Iterable[OriginVisit]:
for visit in visits:
origin = self.origin_get([visit.origin], db=db, cur=cur)[0]
if not origin: # Cannot add a visit without an origin
raise StorageArgumentException("Unknown origin %s", visit.origin)
all_visits = []
nb_visits = 0
for visit in visits:
nb_visits += 1
if not visit.visit:
with convert_validation_exceptions():
visit_id = db.origin_visit_add(
visit.origin, visit.date, visit.type, cur=cur
)
visit = attr.evolve(visit, visit=visit_id)
else:
db.origin_visit_add_with_id(visit, cur=cur)
assert visit.visit is not None
all_visits.append(visit)
# Write to the journal only now, to cover the case where the visit had no id
self.journal_writer.origin_visit_add([visit])
visit_status = OriginVisitStatus(
origin=visit.origin,
visit=visit.visit,
date=visit.date,
status="created",
snapshot=None,
)
self._origin_visit_status_add(visit_status, db=db, cur=cur)
send_metric("origin_visit:add", count=nb_visits, method_name="origin_visit")
return all_visits
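A minimal caller-side sketch of the visit-creation flow above, assuming OriginVisit can be built without a visit id (the storage then allocates one and records the initial "created" status):

import datetime

from swh.model.model import Origin, OriginVisit

def record_visit(storage, url: str, visit_type: str = "git") -> OriginVisit:
    storage.origin_add([Origin(url=url)])
    (visit,) = storage.origin_visit_add(
        [
            OriginVisit(
                origin=url,
                date=datetime.datetime.now(tz=datetime.timezone.utc),
                type=visit_type,
            )
        ]
    )
    return visit  # visit.visit is now set by the storage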
def _origin_visit_status_add(
self, visit_status: OriginVisitStatus, db, cur
) -> None:
"""Add an origin visit status"""
self.journal_writer.origin_visit_status_add([visit_status])
db.origin_visit_status_add(visit_status, cur=cur)
send_metric(
"origin_visit_status:add", count=1, method_name="origin_visit_status"
)
@timed
@db_transaction()
def origin_visit_status_add(
self, visit_statuses: List[OriginVisitStatus], db=None, cur=None,
) -> None:
# First round to check existence (fail early if any origin is unknown)
for visit_status in visit_statuses:
origin_url = self.origin_get([visit_status.origin], db=db, cur=cur)[0]
if not origin_url:
raise StorageArgumentException(f"Unknown origin {visit_status.origin}")
for visit_status in visit_statuses:
self._origin_visit_status_add(visit_status, db, cur)
@timed
@db_transaction()
def origin_visit_status_get_latest(
self,
origin_url: str,
visit: int,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
db=None,
cur=None,
) -> Optional[OriginVisitStatus]:
if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES):
raise StorageArgumentException(
f"Unknown allowed statuses {','.join(allowed_statuses)}, only "
f"{','.join(VISIT_STATUSES)} authorized"
)
row = db.origin_visit_status_get_latest(
origin_url, visit, allowed_statuses, require_snapshot, cur=cur
)
if not row:
return None
return OriginVisitStatus.from_dict(row)
@timed
@db_transaction(statement_timeout=500)
def origin_visit_get(
self,
origin: str,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
db=None,
cur=None,
) -> PagedResult[OriginVisit]:
page_token = page_token or "0"
if not isinstance(order, ListOrder):
raise StorageArgumentException("order must be a ListOrder value")
if not isinstance(page_token, str):
raise StorageArgumentException("page_token must be a string.")
next_page_token = None
visit_from = int(page_token)
visits: List[OriginVisit] = []
extra_limit = limit + 1
for row in db.origin_visit_get_range(
origin, visit_from=visit_from, order=order, limit=extra_limit, cur=cur
):
row_d = dict(zip(db.origin_visit_cols, row))
visits.append(
OriginVisit(
origin=row_d["origin"],
visit=row_d["visit"],
date=row_d["date"],
type=row_d["type"],
)
)
assert len(visits) <= extra_limit
if len(visits) == extra_limit:
visits = visits[:limit]
next_page_token = str(visits[-1].visit)
return PagedResult(results=visits, next_page_token=next_page_token)
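Pagination with PagedResult follows the usual token-following loop; a hedged sketch iterating over all visits of an origin via the endpoint above:

def iter_origin_visits(storage, origin_url: str, page_size: int = 10):
    page_token = None
    while True:
        page = storage.origin_visit_get(
            origin_url, page_token=page_token, limit=page_size
        )
        yield from page.results
        if page.next_page_token is None:
            return
        page_token = page.next_page_token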
@timed
@db_transaction(statement_timeout=500)
def origin_visit_find_by_date(
self, origin: str, visit_date: datetime.datetime, db=None, cur=None
) -> Optional[OriginVisit]:
row_d = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
if not row_d:
return None
return OriginVisit(
origin=row_d["origin"],
visit=row_d["visit"],
date=row_d["date"],
type=row_d["type"],
)
@timed
@db_transaction(statement_timeout=500)
def origin_visit_get_by(
self, origin: str, visit: int, db=None, cur=None
) -> Optional[OriginVisit]:
row = db.origin_visit_get(origin, visit, cur)
if row:
row_d = dict(zip(db.origin_visit_get_cols, row))
return OriginVisit(
origin=row_d["origin"],
visit=row_d["visit"],
date=row_d["date"],
type=row_d["type"],
)
return None
@timed
@db_transaction(statement_timeout=4000)
def origin_visit_get_latest(
self,
origin: str,
type: Optional[str] = None,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
db=None,
cur=None,
) -> Optional[OriginVisit]:
if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES):
raise StorageArgumentException(
f"Unknown allowed statuses {','.join(allowed_statuses)}, only "
f"{','.join(VISIT_STATUSES)} authorized"
)
row = db.origin_visit_get_latest(
origin,
type=type,
allowed_statuses=allowed_statuses,
require_snapshot=require_snapshot,
cur=cur,
)
if row:
row_d = dict(zip(db.origin_visit_get_cols, row))
visit = OriginVisit(
origin=row_d["origin"],
visit=row_d["visit"],
date=row_d["date"],
type=row_d["type"],
)
return visit
return None
@timed
@db_transaction(statement_timeout=500)
def origin_visit_status_get(
self,
origin: str,
visit: int,
page_token: Optional[str] = None,
order: ListOrder = ListOrder.ASC,
limit: int = 10,
db=None,
cur=None,
) -> PagedResult[OriginVisitStatus]:
next_page_token = None
date_from = None
if page_token is not None:
date_from = datetime.datetime.fromisoformat(page_token)
visit_statuses: List[OriginVisitStatus] = []
# Take one more visit status so we can reuse it as the next page token if any
for row in db.origin_visit_status_get_range(
origin, visit, date_from=date_from, order=order, limit=limit + 1, cur=cur,
):
row_d = dict(zip(db.origin_visit_status_cols, row))
visit_statuses.append(
OriginVisitStatus(
origin=row_d["origin"],
visit=row_d["visit"],
date=row_d["date"],
status=row_d["status"],
snapshot=row_d["snapshot"],
metadata=row_d["metadata"],
)
)
if len(visit_statuses) > limit:
# last visit status date is the next page token
next_page_token = str(visit_statuses[-1].date)
# excluding that visit status from the result to respect the limit size
visit_statuses = visit_statuses[:limit]
return PagedResult(results=visit_statuses, next_page_token=next_page_token)
@timed
@db_transaction()
def origin_visit_status_get_random(
self, type: str, db=None, cur=None
) -> Optional[Tuple[OriginVisit, OriginVisitStatus]]:
row = db.origin_visit_get_random(type, cur)
if row is not None:
row_d = dict(zip(db.origin_visit_get_cols, row))
visit = OriginVisit(
origin=row_d["origin"],
visit=row_d["visit"],
date=row_d["date"],
type=row_d["type"],
)
visit_status = OriginVisitStatus(
origin=row_d["origin"],
visit=row_d["visit"],
date=row_d["date"],
status=row_d["status"],
metadata=row_d["metadata"],
snapshot=row_d["snapshot"],
)
return visit, visit_status
return None
@timed
@db_transaction(statement_timeout=2000)
def object_find_by_sha1_git(self, ids, db=None, cur=None):
ret = {id: [] for id in ids}
for retval in db.object_find_by_sha1_git(ids, cur=cur):
if retval[1]:
ret[retval[0]].append(
dict(zip(db.object_find_by_sha1_git_cols, retval))
)
return ret
@timed
@db_transaction(statement_timeout=500)
def origin_get(
self, origins: List[str], db=None, cur=None
) -> Iterable[Optional[Origin]]:
rows = db.origin_get_by_url(origins, cur)
result: List[Optional[Origin]] = []
for row in rows:
origin_d = dict(zip(db.origin_cols, row))
url = origin_d["url"]
result.append(None if url is None else Origin(url=url))
return result
@timed
@db_transaction_generator(statement_timeout=500)
def origin_get_by_sha1(self, sha1s, db=None, cur=None):
for line in db.origin_get_by_sha1(sha1s, cur):
if line[0] is not None:
yield dict(zip(db.origin_cols, line))
else:
yield None
@timed
@db_transaction_generator()
def origin_get_range(self, origin_from=1, origin_count=100, db=None, cur=None):
for origin in db.origin_get_range(origin_from, origin_count, cur):
yield dict(zip(db.origin_get_range_cols, origin))
@timed
@db_transaction()
def origin_list(
self, page_token: Optional[str] = None, limit: int = 100, *, db=None, cur=None
) -> PagedResult[Origin]:
page_token = page_token or "0"
if not isinstance(page_token, str):
raise StorageArgumentException("page_token must be a string.")
origin_from = int(page_token)
next_page_token = None
origins: List[Origin] = []
# Take one more origin so we can reuse it as the next page token if any
for row_d in self.origin_get_range(origin_from, limit + 1, db=db, cur=cur):
origins.append(Origin(url=row_d["url"]))
# keep the last_id for the pagination if needed
last_id = row_d["id"]
if len(origins) > limit: # data left for subsequent call
# last origin id is the next page token
next_page_token = str(last_id)
# excluding that origin from the result to respect the limit size
origins = origins[:limit]
assert len(origins) <= limit
return PagedResult(results=origins, next_page_token=next_page_token)
@timed
@db_transaction()
def origin_search(
self,
url_pattern: str,
page_token: Optional[str] = None,
limit: int = 50,
regexp: bool = False,
with_visit: bool = False,
db=None,
cur=None,
) -> PagedResult[Origin]:
next_page_token = None
offset = int(page_token) if page_token else 0
origins = []
# Take one more origin so we can reuse it as the next page token if any
for origin in db.origin_search(
url_pattern, offset, limit + 1, regexp, with_visit, cur
):
row_d = dict(zip(db.origin_cols, origin))
origins.append(Origin(url=row_d["url"]))
if len(origins) > limit:
# next offset
next_page_token = str(offset + limit)
# excluding that origin from the result to respect the limit size
origins = origins[:limit]
assert len(origins) <= limit
return PagedResult(results=origins, next_page_token=next_page_token)
@timed
@db_transaction()
def origin_count(
self,
url_pattern: str,
regexp: bool = False,
with_visit: bool = False,
db=None,
cur=None,
) -> int:
return db.origin_count(url_pattern, regexp, with_visit, cur)
@timed
@process_metrics
@db_transaction()
def origin_add(self, origins: List[Origin], db=None, cur=None) -> Dict[str, int]:
urls = [o.url for o in origins]
known_origins = set(url for (url,) in db.origin_get_by_url(urls, cur))
# use lists here to keep origins sorted; some tests depend on this
to_add = [url for url in urls if url not in known_origins]
self.journal_writer.origin_add([Origin(url=url) for url in to_add])
added = 0
for url in to_add:
if db.origin_add(url, cur):
added += 1
return {"origin:add": added}
@db_transaction(statement_timeout=500)
def stat_counters(self, db=None, cur=None):
return {k: v for (k, v) in db.stat_counters()}
@db_transaction()
def refresh_stat_counters(self, db=None, cur=None):
keys = [
"content",
"directory",
"directory_entry_dir",
"directory_entry_file",
"directory_entry_rev",
"origin",
"origin_visit",
"person",
"release",
"revision",
"revision_history",
"skipped_content",
"snapshot",
]
for key in keys:
cur.execute("select * from swh_update_counter(%s)", (key,))
@db_transaction()
def raw_extrinsic_metadata_add(
self, metadata: List[RawExtrinsicMetadata], db, cur,
) -> None:
metadata = list(metadata)
self.journal_writer.raw_extrinsic_metadata_add(metadata)
counter = Counter[MetadataTargetType]()
for metadata_entry in metadata:
authority_id = self._get_authority_id(metadata_entry.authority, db, cur)
fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur)
db.raw_extrinsic_metadata_add(
type=metadata_entry.type.value,
id=str(metadata_entry.id),
discovery_date=metadata_entry.discovery_date,
authority_id=authority_id,
fetcher_id=fetcher_id,
format=metadata_entry.format,
metadata=metadata_entry.metadata,
origin=metadata_entry.origin,
visit=metadata_entry.visit,
snapshot=map_optional(str, metadata_entry.snapshot),
release=map_optional(str, metadata_entry.release),
revision=map_optional(str, metadata_entry.revision),
path=metadata_entry.path,
directory=map_optional(str, metadata_entry.directory),
cur=cur,
)
counter[metadata_entry.type] += 1
for (type, count) in counter.items():
send_metric(
f"{type.value}_metadata:add",
count=count,
method_name=f"{type.value}_metadata_add",
)
@db_transaction()
def raw_extrinsic_metadata_get(
self,
type: MetadataTargetType,
id: Union[str, SWHID],
authority: MetadataAuthority,
after: Optional[datetime.datetime] = None,
page_token: Optional[bytes] = None,
limit: int = 1000,
db=None,
cur=None,
) -> PagedResult[RawExtrinsicMetadata]:
if type == MetadataTargetType.ORIGIN:
if isinstance(id, SWHID):
raise StorageArgumentException(
f"raw_extrinsic_metadata_get called with type='origin', "
f"but provided id is an SWHID: {id!r}"
)
else:
if not isinstance(id, SWHID):
raise StorageArgumentException(
f"raw_extrinsic_metadata_get called with type!='origin', "
f"but provided id is not an SWHID: {id!r}"
)
if page_token:
(after_time, after_fetcher) = msgpack_loads(base64.b64decode(page_token))
if after and after_time < after:
raise StorageArgumentException(
"page_token is inconsistent with the value of 'after'."
)
else:
after_time = after
after_fetcher = None
authority_id = self._get_authority_id(authority, db, cur)
if not authority_id:
return PagedResult(next_page_token=None, results=[],)
rows = db.raw_extrinsic_metadata_get(
type, str(id), authority_id, after_time, after_fetcher, limit + 1, cur,
)
rows = [dict(zip(db.raw_extrinsic_metadata_get_cols, row)) for row in rows]
results = []
for row in rows:
assert str(id) == row["raw_extrinsic_metadata.id"]
results.append(converters.db_to_raw_extrinsic_metadata(row))
if len(results) > limit:
results.pop()
assert len(results) == limit
last_returned_row = rows[-2] # rows[-1] corresponds to the popped result
next_page_token: Optional[str] = base64.b64encode(
msgpack_dumps(
(
last_returned_row["discovery_date"],
last_returned_row["metadata_fetcher.id"],
)
)
).decode()
else:
next_page_token = None
return PagedResult(next_page_token=next_page_token, results=results,)
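The opaque page token above is simply a base64-encoded msgpack pair (last discovery_date, last metadata_fetcher id); a sketch of the round-trip, mirroring the code rather than defining a public format:

import base64

from swh.core.api.serializers import msgpack_dumps, msgpack_loads

def encode_metadata_page_token(discovery_date, fetcher_id) -> str:
    return base64.b64encode(msgpack_dumps((discovery_date, fetcher_id))).decode()

def decode_metadata_page_token(page_token: str):
    after_time, after_fetcher = msgpack_loads(base64.b64decode(page_token))
    return after_time, after_fetcher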
@timed
@db_transaction()
def metadata_fetcher_add(
self, fetchers: List[MetadataFetcher], db=None, cur=None
) -> None:
fetchers = list(fetchers)
self.journal_writer.metadata_fetcher_add(fetchers)
count = 0
for fetcher in fetchers:
if fetcher.metadata is None:
raise StorageArgumentException(
"MetadataFetcher.metadata may not be None in metadata_fetcher_add."
)
db.metadata_fetcher_add(
fetcher.name, fetcher.version, dict(fetcher.metadata), cur=cur
)
count += 1
send_metric("metadata_fetcher:add", count=count, method_name="metadata_fetcher")
@timed
@db_transaction(statement_timeout=500)
def metadata_fetcher_get(
self, name: str, version: str, db=None, cur=None
) -> Optional[MetadataFetcher]:
row = db.metadata_fetcher_get(name, version, cur=cur)
if not row:
return None
return MetadataFetcher.from_dict(dict(zip(db.metadata_fetcher_cols, row)))
@timed
@db_transaction()
def metadata_authority_add(
self, authorities: List[MetadataAuthority], db=None, cur=None
) -> None:
authorities = list(authorities)
self.journal_writer.metadata_authority_add(authorities)
count = 0
for authority in authorities:
if authority.metadata is None:
raise StorageArgumentException(
"MetadataAuthority.metadata may not be None in "
"metadata_authority_add."
)
db.metadata_authority_add(
authority.type.value, authority.url, dict(authority.metadata), cur=cur
)
count += 1
send_metric(
"metadata_authority:add", count=count, method_name="metadata_authority"
)
@timed
@db_transaction()
def metadata_authority_get(
self, type: MetadataAuthorityType, url: str, db=None, cur=None
) -> Optional[MetadataAuthority]:
row = db.metadata_authority_get(type.value, url, cur=cur)
if not row:
return None
return MetadataAuthority.from_dict(dict(zip(db.metadata_authority_cols, row)))
@timed
def diff_directories(self, from_dir, to_dir, track_renaming=False):
return diff.diff_directories(self, from_dir, to_dir, track_renaming)
@timed
def diff_revisions(self, from_rev, to_rev, track_renaming=False):
return diff.diff_revisions(self, from_rev, to_rev, track_renaming)
@timed
def diff_revision(self, revision, track_renaming=False):
return diff.diff_revision(self, revision, track_renaming)
def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
"""Do nothing
"""
return None
def flush(self, object_types: Optional[List[str]] = None) -> Dict:
return {}
def _get_authority_id(self, authority: MetadataAuthority, db, cur):
authority_id = db.metadata_authority_get_id(
authority.type.value, authority.url, cur
)
if not authority_id:
raise StorageArgumentException(f"Unknown authority {authority}")
return authority_id
def _get_fetcher_id(self, fetcher: MetadataFetcher, db, cur):
fetcher_id = db.metadata_fetcher_get_id(fetcher.name, fetcher.version, cur)
if not fetcher_id:
raise StorageArgumentException(f"Unknown fetcher {fetcher}")
return fetcher_id
diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py
index 99e5a64c..37de5f97 100644
--- a/swh/storage/tests/test_retry.py
+++ b/swh/storage/tests/test_retry.py
@@ -1,833 +1,824 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import attr
from unittest.mock import call
import psycopg2
import pytest
from swh.model.model import MetadataTargetType
from swh.storage.exc import HashCollision, StorageArgumentException
@pytest.fixture
def monkeypatch_sleep(monkeypatch, swh_storage):
"""In test context, we don't want to wait, make test faster
"""
from swh.storage.retry import RetryingProxyStorage
for method_name, method in RetryingProxyStorage.__dict__.items():
if "_add" in method_name or "_update" in method_name:
monkeypatch.setattr(method.retry, "sleep", lambda x: None)
return monkeypatch
@pytest.fixture
def fake_hash_collision(sample_data):
return HashCollision("sha1", "38762cf7f55934b34d179ae6a4c80cadccbb7f0a", [])
@pytest.fixture
def swh_storage_backend_config():
yield {
"cls": "pipeline",
"steps": [{"cls": "retry"}, {"cls": "memory"},],
}
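For reference, the same pipeline can be instantiated outside the test fixtures; a hedged sketch using swh.storage.get_storage with the configuration above:

from swh.storage import get_storage

# retry proxy in front of an in-memory backend, as in the fixture above
storage = get_storage(
    cls="pipeline", steps=[{"cls": "retry"}, {"cls": "memory"}],
)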
def test_retrying_proxy_storage_content_add(swh_storage, sample_data):
"""Standard content_add works as before
"""
sample_content = sample_data.content
content = next(swh_storage.content_get([sample_content.sha1]))
assert not content
s = swh_storage.content_add([sample_content])
assert s == {
"content:add": 1,
"content:add:bytes": sample_content.length,
}
content = next(swh_storage.content_get([sample_content.sha1]))
assert content["sha1"] == sample_content.sha1
def test_retrying_proxy_storage_content_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add")
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("content already inserted"),
# ok then!
{"content:add": 1},
]
sample_content = sample_data.content
content = next(swh_storage.content_get([sample_content.sha1]))
assert not content
s = swh_storage.content_add([sample_content])
assert s == {"content:add": 1}
mock_memory.assert_has_calls(
[call([sample_content]), call([sample_content]), call([sample_content]),]
)
def test_retrying_proxy_swh_storage_content_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add")
mock_memory.side_effect = StorageArgumentException("Refuse to add content always!")
sample_content = sample_data.content
content = next(swh_storage.content_get([sample_content.sha1]))
assert not content
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.content_add([sample_content])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_content_add_metadata(swh_storage, sample_data):
"""Standard content_add_metadata works as before
"""
sample_content = sample_data.content
content = attr.evolve(sample_content, data=None)
pk = content.sha1
content_metadata = swh_storage.content_get_metadata([pk])
assert not content_metadata[pk]
s = swh_storage.content_add_metadata([content])
assert s == {
"content:add": 1,
}
content_metadata = swh_storage.content_get_metadata([pk])
assert len(content_metadata[pk]) == 1
assert content_metadata[pk][0]["sha1"] == pk
def test_retrying_proxy_storage_content_add_metadata_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.content_add_metadata"
)
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("content_metadata already inserted"),
# ok then!
{"content:add": 1},
]
sample_content = sample_data.content
content = attr.evolve(sample_content, data=None)
s = swh_storage.content_add_metadata([content])
assert s == {"content:add": 1}
mock_memory.assert_has_calls(
[call([content]), call([content]), call([content]),]
)
def test_retrying_proxy_swh_storage_content_add_metadata_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.content_add_metadata"
)
mock_memory.side_effect = StorageArgumentException(
"Refuse to add content_metadata!"
)
sample_content = sample_data.content
content = attr.evolve(sample_content, data=None)
pk = content.sha1
content_metadata = swh_storage.content_get_metadata([pk])
assert not content_metadata[pk]
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.content_add_metadata([content])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_skipped_content_add(swh_storage, sample_data):
"""Standard skipped_content_add works as before
"""
sample_content = sample_data.skipped_content
sample_content_dict = sample_content.to_dict()
skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
assert len(skipped_contents) == 1
s = swh_storage.skipped_content_add([sample_content])
assert s == {
"skipped_content:add": 1,
}
skipped_content = list(swh_storage.skipped_content_missing([sample_content_dict]))
assert len(skipped_content) == 0
def test_retrying_proxy_storage_skipped_content_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.skipped_content_add"
)
mock_memory.side_effect = [
# 1st & 2nd try goes ko
fake_hash_collision,
psycopg2.IntegrityError("skipped_content already inserted"),
# ok then!
{"skipped_content:add": 1},
]
sample_content = sample_data.skipped_content
s = swh_storage.skipped_content_add([sample_content])
assert s == {"skipped_content:add": 1}
mock_memory.assert_has_calls(
[call([sample_content]), call([sample_content]), call([sample_content]),]
)
def test_retrying_proxy_swh_storage_skipped_content_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.skipped_content_add"
)
mock_memory.side_effect = StorageArgumentException(
"Refuse to add content_metadata!"
)
sample_content = sample_data.skipped_content
sample_content_dict = sample_content.to_dict()
skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
assert len(skipped_contents) == 1
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.skipped_content_add([sample_content])
skipped_contents = list(swh_storage.skipped_content_missing([sample_content_dict]))
assert len(skipped_contents) == 1
assert mock_memory.call_count == 1
def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data):
"""Standard origin_visit_add works as before
"""
origin = sample_data.origin
visit = sample_data.origin_visit
assert visit.origin == origin.url
swh_storage.origin_add([origin])
origins = swh_storage.origin_visit_get(origin.url).results
assert not origins
origin_visit = swh_storage.origin_visit_add([visit])[0]
assert origin_visit.origin == origin.url
assert isinstance(origin_visit.visit, int)
actual_visit = swh_storage.origin_visit_get(origin.url).results[0]
assert actual_visit == visit
def test_retrying_proxy_swh_storage_origin_visit_add_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
origin = sample_data.origin
visit = sample_data.origin_visit
assert visit.origin == origin.url
swh_storage.origin_add([origin])
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("origin already inserted"),
# ok then!
[visit],
]
origins = swh_storage.origin_visit_get(origin.url).results
assert not origins
r = swh_storage.origin_visit_add([visit])
assert r == [visit]
mock_memory.assert_has_calls(
[call([visit]), call([visit]), call([visit]),]
)
def test_retrying_proxy_swh_storage_origin_visit_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
mock_memory.side_effect = StorageArgumentException("Refuse to add origin always!")
origin = sample_data.origin
visit = sample_data.origin_visit
assert visit.origin == origin.url
origins = swh_storage.origin_visit_get(origin.url).results
assert not origins
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.origin_visit_add([visit])
mock_memory.assert_has_calls(
[call([visit]),]
)
def test_retrying_proxy_storage_metadata_fetcher_add(swh_storage, sample_data):
"""Standard metadata_fetcher_add works as before
"""
fetcher = sample_data.metadata_fetcher
metadata_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
assert not metadata_fetcher
swh_storage.metadata_fetcher_add([fetcher])
actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
assert actual_fetcher == fetcher
def test_retrying_proxy_storage_metadata_fetcher_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
fetcher = sample_data.metadata_fetcher
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add"
)
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("metadata_fetcher already inserted"),
# ok then!
[fetcher],
]
actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
assert not actual_fetcher
swh_storage.metadata_fetcher_add([fetcher])
mock_memory.assert_has_calls(
[call([fetcher]), call([fetcher]), call([fetcher]),]
)
def test_retrying_proxy_swh_storage_metadata_fetcher_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.metadata_fetcher_add"
)
mock_memory.side_effect = StorageArgumentException(
"Refuse to add metadata_fetcher always!"
)
fetcher = sample_data.metadata_fetcher
actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version)
assert not actual_fetcher
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.metadata_fetcher_add([fetcher])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_metadata_authority_add(swh_storage, sample_data):
"""Standard metadata_authority_add works as before
"""
authority = sample_data.metadata_authority
assert not swh_storage.metadata_authority_get(authority.type, authority.url)
swh_storage.metadata_authority_add([authority])
actual_authority = swh_storage.metadata_authority_get(authority.type, authority.url)
assert actual_authority == authority
def test_retrying_proxy_storage_metadata_authority_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
authority = sample_data.metadata_authority
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.metadata_authority_add"
)
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("foo bar"),
# ok then!
None,
]
assert not swh_storage.metadata_authority_get(authority.type, authority.url)
swh_storage.metadata_authority_add([authority])
mock_memory.assert_has_calls(
[call([authority]), call([authority]), call([authority])]
)
def test_retrying_proxy_swh_storage_metadata_authority_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.metadata_authority_add"
)
mock_memory.side_effect = StorageArgumentException(
"Refuse to add authority_id always!"
)
authority = sample_data.metadata_authority
assert not swh_storage.metadata_authority_get(authority.type, authority.url)
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.metadata_authority_add([authority])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_raw_extrinsic_metadata_add(swh_storage, sample_data):
"""Standard raw_extrinsic_metadata_add works as before
"""
origin = sample_data.origin
ori_meta = sample_data.origin_metadata1
assert origin.url == ori_meta.id
swh_storage.origin_add([origin])
swh_storage.metadata_authority_add([sample_data.metadata_authority])
swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher])
origin_metadata = swh_storage.raw_extrinsic_metadata_get(
MetadataTargetType.ORIGIN, ori_meta.id, ori_meta.authority
)
assert origin_metadata.next_page_token is None
assert not origin_metadata.results
swh_storage.raw_extrinsic_metadata_add([ori_meta])
origin_metadata = swh_storage.raw_extrinsic_metadata_get(
MetadataTargetType.ORIGIN, ori_meta.id, ori_meta.authority
)
assert origin_metadata
def test_retrying_proxy_storage_raw_extrinsic_metadata_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision,
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
origin = sample_data.origin
ori_meta = sample_data.origin_metadata1
assert origin.url == ori_meta.id
swh_storage.origin_add([origin])
swh_storage.metadata_authority_add([sample_data.metadata_authority])
swh_storage.metadata_fetcher_add([sample_data.metadata_fetcher])
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add"
)
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("foo bar"),
# ok then!
None,
]
# No exception raised as insertion finally came through
swh_storage.raw_extrinsic_metadata_add([ori_meta])
mock_memory.assert_has_calls(
[  # 3 calls: the first two attempts raised retryable errors
call([ori_meta]),
call([ori_meta]),
call([ori_meta]),
]
)
def test_retrying_proxy_swh_storage_raw_extrinsic_metadata_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch(
"swh.storage.in_memory.InMemoryStorage.raw_extrinsic_metadata_add"
)
mock_memory.side_effect = StorageArgumentException("Refuse to add always!")
origin = sample_data.origin
ori_meta = sample_data.origin_metadata1
assert origin.url == ori_meta.id
swh_storage.origin_add([origin])
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.raw_extrinsic_metadata_add([ori_meta])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_directory_add(swh_storage, sample_data):
"""Standard directory_add works as before
"""
sample_dir = sample_data.directory
- directory = swh_storage.directory_get_random() # no directory
- assert not directory
-
s = swh_storage.directory_add([sample_dir])
assert s == {
"directory:add": 1,
}
directory_id = swh_storage.directory_get_random() # only 1
assert directory_id == sample_dir.id
def test_retrying_proxy_storage_directory_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("directory already inserted"),
# ok then!
{"directory:add": 1},
]
sample_dir = sample_data.directories[1]
- directory_id = swh_storage.directory_get_random() # no directory
- assert not directory_id
-
s = swh_storage.directory_add([sample_dir])
assert s == {
"directory:add": 1,
}
mock_memory.assert_has_calls(
[call([sample_dir]), call([sample_dir]), call([sample_dir]),]
)
def test_retrying_proxy_swh_storage_directory_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
mock_memory.side_effect = StorageArgumentException(
"Refuse to add directory always!"
)
sample_dir = sample_data.directory
- directory_id = swh_storage.directory_get_random() # no directory
- assert not directory_id
-
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.directory_add([sample_dir])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_revision_add(swh_storage, sample_data):
"""Standard revision_add works as before
"""
sample_rev = sample_data.revision
revision = next(swh_storage.revision_get([sample_rev.id]))
assert not revision
s = swh_storage.revision_add([sample_rev])
assert s == {
"revision:add": 1,
}
revision = next(swh_storage.revision_get([sample_rev.id]))
assert revision["id"] == sample_rev.id
def test_retrying_proxy_storage_revision_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("revision already inserted"),
# ok then!
{"revision:add": 1},
]
sample_rev = sample_data.revision
revision = next(swh_storage.revision_get([sample_rev.id]))
assert not revision
s = swh_storage.revision_add([sample_rev])
assert s == {
"revision:add": 1,
}
mock_memory.assert_has_calls(
[call([sample_rev]), call([sample_rev]), call([sample_rev]),]
)
def test_retrying_proxy_swh_storage_revision_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
mock_memory.side_effect = StorageArgumentException("Refuse to add revision always!")
sample_rev = sample_data.revision
revision = next(swh_storage.revision_get([sample_rev.id]))
assert not revision
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.revision_add([sample_rev])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_release_add(swh_storage, sample_data):
"""Standard release_add works as before
"""
sample_rel = sample_data.release
release = next(swh_storage.release_get([sample_rel.id]))
assert not release
s = swh_storage.release_add([sample_rel])
assert s == {
"release:add": 1,
}
release = next(swh_storage.release_get([sample_rel.id]))
assert release["id"] == sample_rel.id
def test_retrying_proxy_storage_release_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("release already inserted"),
# ok then!
{"release:add": 1},
]
sample_rel = sample_data.release
release = next(swh_storage.release_get([sample_rel.id]))
assert not release
s = swh_storage.release_add([sample_rel])
assert s == {
"release:add": 1,
}
mock_memory.assert_has_calls(
[call([sample_rel]), call([sample_rel]), call([sample_rel]),]
)
def test_retrying_proxy_swh_storage_release_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
mock_memory.side_effect = StorageArgumentException("Refuse to add release always!")
sample_rel = sample_data.release
release = next(swh_storage.release_get([sample_rel.id]))
assert not release
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.release_add([sample_rel])
assert mock_memory.call_count == 1
def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data):
"""Standard snapshot_add works as before
"""
sample_snap = sample_data.snapshot
snapshot = swh_storage.snapshot_get(sample_snap.id)
assert not snapshot
s = swh_storage.snapshot_add([sample_snap])
assert s == {
"snapshot:add": 1,
}
snapshot = swh_storage.snapshot_get(sample_snap.id)
assert snapshot["id"] == sample_snap.id
def test_retrying_proxy_storage_snapshot_add_with_retry(
monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
):
"""Multiple retries for hash collision and psycopg2 error but finally ok
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
mock_memory.side_effect = [
# first try goes ko
fake_hash_collision,
# second try goes ko
psycopg2.IntegrityError("snapshot already inserted"),
# ok then!
{"snapshot:add": 1},
]
sample_snap = sample_data.snapshot
snapshot = swh_storage.snapshot_get(sample_snap.id)
assert not snapshot
s = swh_storage.snapshot_add([sample_snap])
assert s == {
"snapshot:add": 1,
}
mock_memory.assert_has_calls(
[call([sample_snap]), call([sample_snap]), call([sample_snap]),]
)
def test_retrying_proxy_swh_storage_snapshot_add_failure(
swh_storage, sample_data, mocker
):
"""Unfiltered errors are raising without retry
"""
mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
mock_memory.side_effect = StorageArgumentException("Refuse to add snapshot always!")
sample_snap = sample_data.snapshot
snapshot = swh_storage.snapshot_get(sample_snap.id)
assert not snapshot
with pytest.raises(StorageArgumentException, match="Refuse to add"):
swh_storage.snapshot_add([sample_snap])
assert mock_memory.call_count == 1
