diff --git a/mypy.ini b/mypy.ini
index e84dd5aa..99c0bcc6 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,57 +1,60 @@
[mypy]
namespace_packages = True
# due to the conditional import logic on swh.journal, in some cases a specific
# type: ignore is needed, in others it isn't...
warn_unused_ignores = False
# support for sqlalchemy magic: see https://github.com/dropbox/sqlalchemy-stubs
plugins = sqlmypy
# 3rd party libraries without stubs (yet)
[mypy-cassandra.*]
ignore_missing_imports = True
[mypy-confluent_kafka.*]
ignore_missing_imports = True
[mypy-deprecated.*]
ignore_missing_imports = True
# only shipped indirectly via hypothesis
[mypy-django.*]
ignore_missing_imports = True
+[mypy-msgpack.*]
+ignore_missing_imports = True
+
[mypy-multiprocessing.util]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-psycopg2.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
[mypy-pytest_cov.*]
ignore_missing_imports = True
[mypy-pytest_kafka.*]
ignore_missing_imports = True
[mypy-systemd.daemon.*]
ignore_missing_imports = True
[mypy-tenacity.*]
ignore_missing_imports = True
# temporary work-around for landing typing support in spite of the current
# journal<->storage dependency loop
[mypy-swh.journal.*]
ignore_missing_imports = True
[mypy-pytest_postgresql.*]
ignore_missing_imports = True
diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
index 1ab13e77..816f455a 100644
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.3.2
+swh.journal >= 0.4
diff --git a/requirements-swh.txt b/requirements-swh.txt
index 8b1d72cd..65e45186 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
-swh.core[db,http] >= 0.0.94
-swh.model >= 0.3.4
+swh.core[db,http] >= 0.1.0
+swh.model >= 0.4.0
swh.objstorage >= 0.0.40
diff --git a/sql/upgrades/158.sql b/sql/upgrades/158.sql
new file mode 100644
index 00000000..0c8e0849
--- /dev/null
+++ b/sql/upgrades/158.sql
@@ -0,0 +1,76 @@
+-- SWH DB schema upgrade
+-- from_version: 157
+-- to_version: 158
+-- description: Add the extra_headers column in the revision table
+
+-- latest schema version
+insert into dbversion(version, release, description)
+ values(158, now(), 'Work Still In Progress');
+
+-- Adapt the revision table for the new extra_headers column
+alter table revision add column extra_headers bytea[][];
+
+-- Adapt the revision_entry type for the new extra_headers attribute
+alter type revision_entry add attribute extra_headers bytea[][];
+
+-- Create entries in revision from tmp_revision
+create or replace function swh_revision_add()
+ returns void
+ language plpgsql
+as $$
+begin
+ perform swh_person_add_from_revision();
+
+ insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic, extra_headers)
+ select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic, t.extra_headers
+ from tmp_revision t
+ left join person a on a.fullname = t.author_fullname
+ left join person c on c.fullname = t.committer_fullname;
+ return;
+end
+$$;
+
+-- "git style" revision log. Similar to swh_revision_list(), but returning all
+-- information associated to each revision, and expanding authors/committers
+create or replace function swh_revision_log(root_revisions bytea[], num_revs bigint default NULL)
+ returns setof revision_entry
+ language sql
+ stable
+as $$
+ select t.id, r.date, r.date_offset, r.date_neg_utc_offset,
+ r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
+ r.type, r.directory, r.message,
+ a.id, a.fullname, a.name, a.email,
+ c.id, c.fullname, c.name, c.email,
+ r.metadata, r.synthetic, r.extra_headers, t.parents, r.object_id
+ from swh_revision_list(root_revisions, num_revs) as t
+ left join revision r on t.id = r.id
+ left join person a on a.id = r.author
+ left join person c on c.id = r.committer;
+$$;
+
+create or replace function swh_revision_list_by_object_id(
+ min_excl bigint,
+ max_incl bigint
+)
+ returns setof revision_entry
+ language sql
+ stable
+as $$
+ with revs as (
+ select * from revision
+ where object_id > min_excl and object_id <= max_incl
+ )
+ select r.id, r.date, r.date_offset, r.date_neg_utc_offset,
+ r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
+ r.type, r.directory, r.message,
+ a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic, r.extra_headers,
+ array(select rh.parent_id::bytea from revision_history rh where rh.id = r.id order by rh.parent_rank)
+ as parents, r.object_id
+ from revs r
+ left join person a on a.id = r.author
+ left join person c on c.id = r.committer
+ order by r.object_id;
+$$;
+
+-- TODO: add the migration magic query...
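+
+-- A possible shape for that backfill, sketched here as a comment only
+-- (assumes revision.metadata is jsonb and that legacy extra_headers were
+-- stored as an array of [key, value] text pairs; convert_to() stands in
+-- for a real unescaping step):
+--
+--   update revision
+--      set extra_headers = (
+--            select array_agg(array[convert_to(h->>0, 'UTF8'),
+--                                   convert_to(h->>1, 'UTF8')])
+--              from jsonb_array_elements(metadata->'extra_headers') as h
+--          ),
+--          metadata = metadata - 'extra_headers'
+--    where metadata ? 'extra_headers';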
diff --git a/swh/storage/cassandra/converters.py b/swh/storage/cassandra/converters.py
index b018f05d..a1943011 100644
--- a/swh/storage/cassandra/converters.py
+++ b/swh/storage/cassandra/converters.py
@@ -1,92 +1,94 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import json
import attr
from copy import deepcopy
from typing import Any, Dict, Tuple
from cassandra.cluster import ResultSet
from swh.model.model import (
ObjectType,
OriginVisitStatus,
Revision,
RevisionType,
Release,
Sha1Git,
)
from swh.model.hashutil import DEFAULT_ALGORITHMS
-from ..converters import git_headers_to_db, db_to_git_headers
from .common import Row
def revision_to_db(revision: Revision) -> Dict[str, Any]:
# we use a deepcopy of the dict because we do not want to recurse the
# Model->dict conversion (to keep Timestamp & al. entities), BUT we do not
# want to modify original metadata (embedded in the Model entity), so we
# non-recursively convert it as a dict but make a deep copy.
db_revision = deepcopy(attr.asdict(revision, recurse=False))
metadata = revision.metadata
- if metadata and "extra_headers" in metadata:
- db_revision["metadata"]["extra_headers"] = git_headers_to_db(
- metadata["extra_headers"]
- )
+ extra_headers = revision.extra_headers
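+ # Transitional: revisions serialized before extra_headers became a
+ # first-class attribute may still carry them inside metadata; hoist them
+ # out so they are written to the dedicated column instead.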
+ if not extra_headers and metadata and "extra_headers" in metadata:
+ extra_headers = db_revision["metadata"].pop("extra_headers")
db_revision["metadata"] = json.dumps(db_revision["metadata"])
+ db_revision["extra_headers"] = extra_headers
db_revision["type"] = db_revision["type"].value
return db_revision
def revision_from_db(db_revision: Row, parents: Tuple[Sha1Git]) -> Revision:
revision = db_revision._asdict() # type: ignore
metadata = json.loads(revision.pop("metadata", None))
- if metadata and "extra_headers" in metadata:
- extra_headers = db_to_git_headers(metadata["extra_headers"])
- metadata["extra_headers"] = extra_headers
+ extra_headers = revision.pop("extra_headers", ())
+ if not extra_headers and metadata and "extra_headers" in metadata:
+ extra_headers = metadata.pop("extra_headers")
+ if extra_headers is None:
+ extra_headers = ()
return Revision(
parents=parents,
type=RevisionType(revision.pop("type")),
metadata=metadata,
+ extra_headers=extra_headers,
**revision,
)
def release_to_db(release: Release) -> Dict[str, Any]:
db_release = attr.asdict(release, recurse=False)
db_release["target_type"] = db_release["target_type"].value
return db_release
def release_from_db(db_release: Row) -> Release:
release = db_release._asdict() # type: ignore
return Release(target_type=ObjectType(release.pop("target_type")), **release,)
def row_to_content_hashes(row: Row) -> Dict[str, bytes]:
"""Convert cassandra row to a content hashes
"""
hashes = {}
for algo in DEFAULT_ALGORITHMS:
hashes[algo] = getattr(row, algo)
return hashes
def row_to_visit_status(row: ResultSet) -> OriginVisitStatus:
"""Format a row representing a visit_status to an actual dict representing an
OriginVisitStatus.
"""
return OriginVisitStatus.from_dict(
{
**row._asdict(),
"origin": row.origin,
"date": row.date.replace(tzinfo=datetime.timezone.utc),
"metadata": (json.loads(row.metadata) if row.metadata else None),
}
)
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
index db65629e..6a2997f8 100644
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -1,999 +1,1000 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
import json
import logging
import random
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
)
from cassandra import CoordinationFailure
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import PreparedStatement, BoundStatement
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
retry_if_exception_type,
)
from swh.model.model import (
Sha1Git,
TimestampWithTimezone,
Timestamp,
Person,
Content,
SkippedContent,
OriginVisit,
OriginVisitStatus,
Origin,
)
from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url
from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS
from .. import extrinsic_metadata
logger = logging.getLogger(__name__)
_execution_profiles = {
EXEC_PROFILE_DEFAULT: ExecutionProfile(
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy())
),
}
# Configuration for cassandra-driver's access to servers:
# * hit the right server directly when sending a query (TokenAwarePolicy),
# * if there's more than one, then pick one at random that's in the same
# datacenter as the client (DCAwareRoundRobinPolicy)
def create_keyspace(
hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True
):
cluster = Cluster(hosts, port=port, execution_profiles=_execution_profiles)
session = cluster.connect()
extra_params = ""
if not durable_writes:
extra_params = "AND durable_writes = false"
session.execute(
"""CREATE KEYSPACE IF NOT EXISTS "%s"
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
} %s;
"""
% (keyspace, extra_params)
)
session.execute('USE "%s"' % keyspace)
for query in CREATE_TABLES_QUERIES:
session.execute(query)
T = TypeVar("T")
def _prepared_statement(query: str) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""Returns a decorator usable on methods of CqlRunner, to
inject them with a 'statement' argument, that is a prepared
statement corresponding to the query.
This only works on methods of CqlRunner, as preparing a
statement requires a connection to a Cassandra server."""
def decorator(f):
@functools.wraps(f)
def newf(self, *args, **kwargs) -> T:
if f.__name__ not in self._prepared_statements:
statement: PreparedStatement = self._session.prepare(query)
self._prepared_statements[f.__name__] = statement
return f(
self, *args, **kwargs, statement=self._prepared_statements[f.__name__]
)
return newf
return decorator
def _prepared_insert_statement(table_name: str, columns: List[str]):
"""Shorthand for using `_prepared_statement` for `INSERT INTO`
statements."""
return _prepared_statement(
"INSERT INTO %s (%s) VALUES (%s)"
% (table_name, ", ".join(columns), ", ".join("?" for _ in columns),)
)
def _prepared_exists_statement(table_name: str):
"""Shorthand for using `_prepared_statement` for queries that only
check which ids in a list exist in the table."""
return _prepared_statement(f"SELECT id FROM {table_name} WHERE id IN ?")
class CqlRunner:
"""Class managing prepared statements and building queries to be sent
to Cassandra."""
def __init__(self, hosts: List[str], keyspace: str, port: int):
self._cluster = Cluster(
hosts, port=port, execution_profiles=_execution_profiles
)
self._session = self._cluster.connect(keyspace)
self._cluster.register_user_type(
keyspace, "microtimestamp_with_timezone", TimestampWithTimezone
)
self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp)
self._cluster.register_user_type(keyspace, "person", Person)
self._prepared_statements: Dict[str, PreparedStatement] = {}
##########################
# Common utility functions
##########################
MAX_RETRIES = 3
@retry(
wait=wait_random_exponential(multiplier=1, max=10),
stop=stop_after_attempt(MAX_RETRIES),
retry=retry_if_exception_type(CoordinationFailure),
)
def _execute_with_retries(self, statement, args) -> ResultSet:
return self._session.execute(statement, args, timeout=1000.0)
@_prepared_statement(
"UPDATE object_count SET count = count + ? "
"WHERE partition_key = 0 AND object_type = ?"
)
def _increment_counter(
self, object_type: str, nb: int, *, statement: PreparedStatement
) -> None:
self._execute_with_retries(statement, [nb, object_type])
def _add_one(self, statement, object_type: str, obj, keys: List[str]) -> None:
self._increment_counter(object_type, 1)
self._execute_with_retries(statement, [getattr(obj, key) for key in keys])
def _get_random_row(self, statement) -> Optional[Row]:
"""Takes a prepared statement of the form
"SELECT * FROM
WHERE token() > ? LIMIT 1"
and uses it to return a random row"""
token = random.randint(TOKEN_BEGIN, TOKEN_END)
rows = self._execute_with_retries(statement, [token])
if not rows:
# There are no rows with a greater token; wrap around to get
# the row with the smallest token
rows = self._execute_with_retries(statement, [TOKEN_BEGIN])
if rows:
return rows.one()
else:
return None
def _missing(self, statement, ids):
res = self._execute_with_retries(statement, [ids])
found_ids = {id_ for (id_,) in res}
return [id_ for id_ in ids if id_ not in found_ids]
##########################
# 'content' table
##########################
_content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"]
_content_keys = [
"sha1",
"sha1_git",
"sha256",
"blake2s256",
"length",
"ctime",
"status",
]
def _content_add_finalize(self, statement: BoundStatement) -> None:
"""Returned currified by content_add_prepare, to be called when the
content row should be added to the primary table."""
self._execute_with_retries(statement, None)
self._increment_counter("content", 1)
@_prepared_insert_statement("content", _content_keys)
def content_add_prepare(
self, content, *, statement
) -> Tuple[int, Callable[[], None]]:
"""Prepares insertion of a Content to the main 'content' table.
Returns a token (to be used in secondary tables), and a function to be
called to perform the insertion in the main table."""
statement = statement.bind(
[getattr(content, key) for key in self._content_keys]
)
# Type used for hashing keys (usually, it will be
# cassandra.metadata.Murmur3Token)
token_class = self._cluster.metadata.token_map.token_class
# Token of the row when it will be inserted. This is equivalent to
# "SELECT token({', '.join(self._content_pk)}) FROM content WHERE ..."
# after the row is inserted; but we need the token to insert in the
# index tables *before* inserting to the main 'content' table
token = token_class.from_key(statement.routing_key).value
assert TOKEN_BEGIN <= token <= TOKEN_END
# Function to be called after the indexes contain their respective
# row
finalizer = functools.partial(self._content_add_finalize, statement)
return (token, finalizer)
@_prepared_statement(
"SELECT * FROM content WHERE "
+ " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS))
)
def content_get_from_pk(
self, content_hashes: Dict[str, bytes], *, statement
) -> Optional[Row]:
rows = list(
self._execute_with_retries(
statement, [content_hashes[algo] for algo in HASH_ALGORITHMS]
)
)
assert len(rows) <= 1
if rows:
return rows[0]
else:
return None
@_prepared_statement(
"SELECT * FROM content WHERE token(" + ", ".join(_content_pk) + ") = ?"
)
def content_get_from_token(self, token, *, statement) -> Iterable[Row]:
return self._execute_with_retries(statement, [token])
@_prepared_statement(
"SELECT * FROM content WHERE token(%s) > ? LIMIT 1" % ", ".join(_content_pk)
)
def content_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
@_prepared_statement(
(
"SELECT token({0}) AS tok, {1} FROM content "
"WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?"
).format(", ".join(_content_pk), ", ".join(_content_keys))
)
def content_get_token_range(
self, start: int, end: int, limit: int, *, statement
) -> Iterable[Row]:
return self._execute_with_retries(statement, [start, end, limit])
##########################
# 'content_by_*' tables
##########################
@_prepared_statement("SELECT sha1_git FROM content_by_sha1_git WHERE sha1_git IN ?")
def content_missing_by_sha1_git(
self, ids: List[bytes], *, statement
) -> List[bytes]:
return self._missing(statement, ids)
def content_index_add_one(self, algo: str, content: Content, token: int) -> None:
"""Adds a row mapping content[algo] to the token of the Content in
the main 'content' table."""
query = (
f"INSERT INTO content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)"
)
self._execute_with_retries(query, [content.get_hash(algo), token])
def content_get_tokens_from_single_hash(
self, algo: str, hash_: bytes
) -> Iterable[int]:
assert algo in HASH_ALGORITHMS
query = f"SELECT target_token FROM content_by_{algo} WHERE {algo} = %s"
return (tok for (tok,) in self._execute_with_retries(query, [hash_]))
##########################
# 'skipped_content' table
##########################
_skipped_content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"]
_skipped_content_keys = [
"sha1",
"sha1_git",
"sha256",
"blake2s256",
"length",
"ctime",
"status",
"reason",
"origin",
]
_magic_null_pk = b""
"""
NULLs (or all-empty blobs) are not allowed in primary keys; instead use a
special value that can't possibly be a valid hash.
"""
def _skipped_content_add_finalize(self, statement: BoundStatement) -> None:
"""Returned currified by skipped_content_add_prepare, to be called
when the content row should be added to the primary table."""
self._execute_with_retries(statement, None)
self._increment_counter("skipped_content", 1)
@_prepared_insert_statement("skipped_content", _skipped_content_keys)
def skipped_content_add_prepare(
self, content, *, statement
) -> Tuple[int, Callable[[], None]]:
"""Prepares insertion of a Content to the main 'skipped_content' table.
Returns a token (to be used in secondary tables), and a function to be
called to perform the insertion in the main table."""
# Replace NULLs (which are not allowed in the partition key) with
# an empty byte string
content = content.to_dict()
for key in self._skipped_content_pk:
if content[key] is None:
content[key] = self._magic_null_pk
statement = statement.bind(
[content.get(key) for key in self._skipped_content_keys]
)
# Type used for hashing keys (usually, it will be
# cassandra.metadata.Murmur3Token)
token_class = self._cluster.metadata.token_map.token_class
# Token of the row when it will be inserted. This is equivalent to
# "SELECT token({', '.join(self._content_pk)})
# FROM skipped_content WHERE ..."
# after the row is inserted; but we need the token to insert in the
# index tables *before* inserting to the main 'skipped_content' table
token = token_class.from_key(statement.routing_key).value
assert TOKEN_BEGIN <= token <= TOKEN_END
# Function to be called after the indexes contain their respective
# row
finalizer = functools.partial(self._skipped_content_add_finalize, statement)
return (token, finalizer)
@_prepared_statement(
"SELECT * FROM skipped_content WHERE "
+ " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS))
)
def skipped_content_get_from_pk(
self, content_hashes: Dict[str, bytes], *, statement
) -> Optional[Row]:
rows = list(
self._execute_with_retries(
statement,
[
content_hashes[algo] or self._magic_null_pk
for algo in HASH_ALGORITHMS
],
)
)
assert len(rows) <= 1
if rows:
# TODO: convert _magic_null_pk back to None?
return rows[0]
else:
return None
##########################
# 'skipped_content_by_*' tables
##########################
def skipped_content_index_add_one(
self, algo: str, content: SkippedContent, token: int
) -> None:
"""Adds a row mapping content[algo] to the token of the SkippedContent
in the main 'skipped_content' table."""
query = (
f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) "
f"VALUES (%s, %s)"
)
self._execute_with_retries(
query, [content.get_hash(algo) or self._magic_null_pk, token]
)
##########################
# 'revision' table
##########################
_revision_keys = [
"id",
"date",
"committer_date",
"type",
"directory",
"message",
"author",
"committer",
"synthetic",
"metadata",
+ "extra_headers",
]
@_prepared_exists_statement("revision")
def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement("revision", _revision_keys)
def revision_add_one(self, revision: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [revision[key] for key in self._revision_keys]
)
self._increment_counter("revision", 1)
@_prepared_statement("SELECT id FROM revision WHERE id IN ?")
def revision_get_ids(self, revision_ids, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [revision_ids])
@_prepared_statement("SELECT * FROM revision WHERE id IN ?")
def revision_get(self, revision_ids, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [revision_ids])
@_prepared_statement("SELECT * FROM revision WHERE token(id) > ? LIMIT 1")
def revision_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'revision_parent' table
##########################
_revision_parent_keys = ["id", "parent_rank", "parent_id"]
@_prepared_insert_statement("revision_parent", _revision_parent_keys)
def revision_parent_add_one(
self, id_: Sha1Git, parent_rank: int, parent_id: Sha1Git, *, statement
) -> None:
self._execute_with_retries(statement, [id_, parent_rank, parent_id])
@_prepared_statement("SELECT parent_id FROM revision_parent WHERE id = ?")
def revision_parent_get(self, revision_id: Sha1Git, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [revision_id])
##########################
# 'release' table
##########################
_release_keys = [
"id",
"target",
"target_type",
"date",
"name",
"message",
"author",
"synthetic",
]
@_prepared_exists_statement("release")
def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement("release", _release_keys)
def release_add_one(self, release: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [release[key] for key in self._release_keys]
)
self._increment_counter("release", 1)
@_prepared_statement("SELECT * FROM release WHERE id in ?")
def release_get(self, release_ids: List[str], *, statement) -> None:
return self._execute_with_retries(statement, [release_ids])
@_prepared_statement("SELECT * FROM release WHERE token(id) > ? LIMIT 1")
def release_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'directory' table
##########################
_directory_keys = ["id"]
@_prepared_exists_statement("directory")
def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement("directory", _directory_keys)
def directory_add_one(self, directory_id: Sha1Git, *, statement) -> None:
"""Called after all calls to directory_entry_add_one, to
commit/finalize the directory."""
self._execute_with_retries(statement, [directory_id])
self._increment_counter("directory", 1)
@_prepared_statement("SELECT * FROM directory WHERE token(id) > ? LIMIT 1")
def directory_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'directory_entry' table
##########################
_directory_entry_keys = ["directory_id", "name", "type", "target", "perms"]
@_prepared_insert_statement("directory_entry", _directory_entry_keys)
def directory_entry_add_one(self, entry: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [entry[key] for key in self._directory_entry_keys]
)
@_prepared_statement("SELECT * FROM directory_entry WHERE directory_id IN ?")
def directory_entry_get(self, directory_ids, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [directory_ids])
##########################
# 'snapshot' table
##########################
_snapshot_keys = ["id"]
@_prepared_exists_statement("snapshot")
def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]:
return self._missing(statement, ids)
@_prepared_insert_statement("snapshot", _snapshot_keys)
def snapshot_add_one(self, snapshot_id: Sha1Git, *, statement) -> None:
self._execute_with_retries(statement, [snapshot_id])
self._increment_counter("snapshot", 1)
@_prepared_statement("SELECT * FROM snapshot WHERE id = ?")
def snapshot_get(self, snapshot_id: Sha1Git, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [snapshot_id])
@_prepared_statement("SELECT * FROM snapshot WHERE token(id) > ? LIMIT 1")
def snapshot_get_random(self, *, statement) -> Optional[Row]:
return self._get_random_row(statement)
##########################
# 'snapshot_branch' table
##########################
_snapshot_branch_keys = ["snapshot_id", "name", "target_type", "target"]
@_prepared_insert_statement("snapshot_branch", _snapshot_branch_keys)
def snapshot_branch_add_one(self, branch: Dict[str, Any], *, statement) -> None:
self._execute_with_retries(
statement, [branch[key] for key in self._snapshot_branch_keys]
)
@_prepared_statement(
"SELECT ascii_bins_count(target_type) AS counts "
"FROM snapshot_branch "
"WHERE snapshot_id = ? "
)
def snapshot_count_branches(self, snapshot_id: Sha1Git, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [snapshot_id])
@_prepared_statement(
"SELECT * FROM snapshot_branch WHERE snapshot_id = ? AND name >= ? LIMIT ?"
)
def snapshot_branch_get(
self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement
) -> None:
return self._execute_with_retries(statement, [snapshot_id, from_, limit])
##########################
# 'origin' table
##########################
origin_keys = ["sha1", "url", "type", "next_visit_id"]
@_prepared_statement(
"INSERT INTO origin (sha1, url, next_visit_id) "
"VALUES (?, ?, 1) IF NOT EXISTS"
)
def origin_add_one(self, origin: Origin, *, statement) -> None:
self._execute_with_retries(statement, [hash_url(origin.url), origin.url])
self._increment_counter("origin", 1)
@_prepared_statement("SELECT * FROM origin WHERE sha1 = ?")
def origin_get_by_sha1(self, sha1: bytes, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [sha1])
def origin_get_by_url(self, url: str) -> ResultSet:
return self.origin_get_by_sha1(hash_url(url))
@_prepared_statement(
f'SELECT token(sha1) AS tok, {", ".join(origin_keys)} '
f"FROM origin WHERE token(sha1) >= ? LIMIT ?"
)
def origin_list(self, start_token: int, limit: int, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [start_token, limit])
@_prepared_statement("SELECT * FROM origin")
def origin_iter_all(self, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [])
@_prepared_statement("SELECT next_visit_id FROM origin WHERE sha1 = ?")
def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int:
rows = list(self._execute_with_retries(statement, [origin_sha1]))
assert len(rows) == 1 # TODO: error handling
return rows[0].next_visit_id
@_prepared_statement(
"UPDATE origin SET next_visit_id=? WHERE sha1 = ? IF next_visit_id=?"
)
def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int:
origin_sha1 = hash_url(origin_url)
next_id = self._origin_get_next_visit_id(origin_sha1)
while True:
res = list(
self._execute_with_retries(
statement, [next_id + 1, origin_sha1, next_id]
)
)
assert len(res) == 1
if res[0].applied:
# No data race
return next_id
else:
# Someone else updated it before we did, let's try again
next_id = res[0].next_visit_id
# TODO: abort after too many attempts
return next_id
##########################
# 'origin_visit' table
##########################
_origin_visit_keys = [
"origin",
"visit",
"type",
"date",
]
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? AND visit > ? "
"ORDER BY visit ASC"
)
def _origin_visit_get_pagination_asc_no_limit(
self, origin_url: str, last_visit: int, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url, last_visit])
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? AND visit > ? "
"ORDER BY visit ASC "
"LIMIT ?"
)
def _origin_visit_get_pagination_asc_limit(
self, origin_url: str, last_visit: int, limit: int, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url, last_visit, limit])
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? AND visit < ? "
"ORDER BY visit DESC"
)
def _origin_visit_get_pagination_desc_no_limit(
self, origin_url: str, last_visit: int, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url, last_visit])
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? AND visit < ? "
"ORDER BY visit DESC "
"LIMIT ?"
)
def _origin_visit_get_pagination_desc_limit(
self, origin_url: str, last_visit: int, limit: int, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url, last_visit, limit])
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit ASC LIMIT ?"
)
def _origin_visit_get_no_pagination_asc_limit(
self, origin_url: str, limit: int, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url, limit])
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit ASC "
)
def _origin_visit_get_no_pagination_asc_no_limit(
self, origin_url: str, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url])
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit DESC"
)
def _origin_visit_get_no_pagination_desc_no_limit(
self, origin_url: str, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url])
@_prepared_statement(
"SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit DESC LIMIT ?"
)
def _origin_visit_get_no_pagination_desc_limit(
self, origin_url: str, limit: int, *, statement
) -> ResultSet:
return self._execute_with_retries(statement, [origin_url, limit])
def origin_visit_get(
self,
origin_url: str,
last_visit: Optional[int],
limit: Optional[int],
order: str = "asc",
) -> ResultSet:
order = order.lower()
assert order in ["asc", "desc"]
args: List[Any] = [origin_url]
if last_visit is not None:
page_name = "pagination"
args.append(last_visit)
else:
page_name = "no_pagination"
if limit is not None:
limit_name = "limit"
args.append(limit)
else:
limit_name = "no_limit"
method_name = f"_origin_visit_get_{page_name}_{order}_{limit_name}"
origin_visit_get_method = getattr(self, method_name)
return origin_visit_get_method(*args)
@_prepared_insert_statement("origin_visit", _origin_visit_keys)
def origin_visit_add_one(self, visit: OriginVisit, *, statement) -> None:
self._add_one(statement, "origin_visit", visit, self._origin_visit_keys)
_origin_visit_status_keys = [
"origin",
"visit",
"date",
"status",
"snapshot",
"metadata",
]
@_prepared_insert_statement("origin_visit_status", _origin_visit_status_keys)
def origin_visit_status_add_one(
self, visit_update: OriginVisitStatus, *, statement
) -> None:
assert self._origin_visit_status_keys[-1] == "metadata"
keys = self._origin_visit_status_keys
metadata = json.dumps(visit_update.metadata)
self._execute_with_retries(
statement, [getattr(visit_update, key) for key in keys[:-1]] + [metadata]
)
def origin_visit_status_get_latest(self, origin: str, visit: int,) -> Optional[Row]:
"""Given an origin visit id, return its latest origin_visit_status
"""
rows = self.origin_visit_status_get(origin, visit)
return rows[0] if rows else None
@_prepared_statement(
"SELECT * FROM origin_visit_status "
"WHERE origin = ? AND visit = ? "
"ORDER BY date DESC"
)
def origin_visit_status_get(
self,
origin: str,
visit: int,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
*,
statement,
) -> List[Row]:
"""Return all origin visit statuses for a given visit
"""
return list(self._execute_with_retries(statement, [origin, visit]))
@_prepared_statement("SELECT * FROM origin_visit WHERE origin = ? AND visit = ?")
def origin_visit_get_one(
self, origin_url: str, visit_id: int, *, statement
) -> Optional[Row]:
# TODO: error handling
rows = list(self._execute_with_retries(statement, [origin_url, visit_id]))
if rows:
return rows[0]
else:
return None
@_prepared_statement("SELECT * FROM origin_visit WHERE origin = ?")
def origin_visit_get_all(self, origin_url: str, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [origin_url])
@_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) >= ?")
def _origin_visit_iter_from(self, min_token: int, *, statement) -> Iterator[Row]:
yield from self._execute_with_retries(statement, [min_token])
@_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) < ?")
def _origin_visit_iter_to(self, max_token: int, *, statement) -> Iterator[Row]:
yield from self._execute_with_retries(statement, [max_token])
def origin_visit_iter(self, start_token: int) -> Iterator[Row]:
"""Returns all origin visits in order from this token,
and wraps around the token space."""
yield from self._origin_visit_iter_from(start_token)
yield from self._origin_visit_iter_to(start_token)
##########################
# 'metadata_authority' table
##########################
_metadata_authority_keys = ["url", "type", "metadata"]
@_prepared_insert_statement("metadata_authority", _metadata_authority_keys)
def metadata_authority_add(self, url, type, metadata, *, statement):
return self._execute_with_retries(statement, [url, type, metadata])
@_prepared_statement("SELECT * from metadata_authority WHERE type = ? AND url = ?")
def metadata_authority_get(self, type, url, *, statement) -> Optional[Row]:
return next(iter(self._execute_with_retries(statement, [type, url])), None)
##########################
# 'metadata_fetcher' table
##########################
_metadata_fetcher_keys = ["name", "version", "metadata"]
@_prepared_insert_statement("metadata_fetcher", _metadata_fetcher_keys)
def metadata_fetcher_add(self, name, version, metadata, *, statement):
return self._execute_with_retries(statement, [name, version, metadata])
@_prepared_statement(
"SELECT * from metadata_fetcher WHERE name = ? AND version = ?"
)
def metadata_fetcher_get(self, name, version, *, statement) -> Optional[Row]:
return next(iter(self._execute_with_retries(statement, [name, version])), None)
#########################
# 'object_metadata' table
#########################
_object_metadata_keys = [
"type",
"id",
"authority_type",
"authority_url",
"discovery_date",
"fetcher_name",
"fetcher_version",
"format",
"metadata",
"origin",
"visit",
"snapshot",
"release",
"revision",
"path",
"directory",
]
@_prepared_statement(
f"INSERT INTO object_metadata ({', '.join(_object_metadata_keys)}) "
f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})"
)
def object_metadata_add(
self,
object_type: str,
id: str,
authority_type,
authority_url,
discovery_date,
fetcher_name,
fetcher_version,
format,
metadata,
context: Dict[str, Union[str, bytes, int]],
*,
statement,
):
params = [
object_type,
id,
authority_type,
authority_url,
discovery_date,
fetcher_name,
fetcher_version,
format,
metadata,
]
params.extend(
context.get(key) for key in extrinsic_metadata.CONTEXT_KEYS[object_type]
)
return self._execute_with_retries(statement, params,)
@_prepared_statement(
"SELECT * from object_metadata "
"WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?"
)
def object_metadata_get_after_date(
self,
id: str,
authority_type: str,
authority_url: str,
after: datetime.datetime,
*,
statement,
):
return self._execute_with_retries(
statement, [id, authority_url, after, authority_type]
)
@_prepared_statement(
"SELECT * from object_metadata "
"WHERE id=? AND authority_type=? AND authority_url=? "
"AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
)
def object_metadata_get_after_date_and_fetcher(
self,
id: str,
authority_type: str,
authority_url: str,
after_date: datetime.datetime,
after_fetcher_name: str,
after_fetcher_version: str,
*,
statement,
):
return self._execute_with_retries(
statement,
[
id,
authority_type,
authority_url,
after_date,
after_fetcher_name,
after_fetcher_version,
],
)
@_prepared_statement(
"SELECT * from object_metadata "
"WHERE id=? AND authority_url=? AND authority_type=?"
)
def object_metadata_get(
self, id: str, authority_type: str, authority_url: str, *, statement
) -> Iterable[Row]:
return self._execute_with_retries(
statement, [id, authority_url, authority_type]
)
##########################
# Miscellaneous
##########################
@_prepared_statement("SELECT uuid() FROM revision LIMIT 1;")
def check_read(self, *, statement):
self._execute_with_retries(statement, [])
@_prepared_statement(
"SELECT object_type, count FROM object_count WHERE partition_key=0"
)
def stat_counters(self, *, statement) -> ResultSet:
return self._execute_with_retries(statement, [])
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
index 01bdecec..945842e3 100644
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -1,282 +1,283 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
CREATE_TABLES_QUERIES = """
CREATE OR REPLACE FUNCTION ascii_bins_count_sfunc (
state tuple<int, map<ascii, int>>, -- (nb_none, map<target_type, nb>)
bin_name ascii
)
CALLED ON NULL INPUT
RETURNS tuple<int, map<ascii, int>>
LANGUAGE java AS
$$
if (bin_name == null) {
state.setInt(0, state.getInt(0) + 1);
}
else {
Map<String, Integer> counters = state.getMap(
1, String.class, Integer.class);
Integer nb = counters.get(bin_name);
if (nb == null) {
nb = 0;
}
counters.put(bin_name, nb + 1);
state.setMap(1, counters, String.class, Integer.class);
}
return state;
$$
;
CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii )
SFUNC ascii_bins_count_sfunc
STYPE tuple<int, map<ascii, int>>
INITCOND (0, {})
;
CREATE TYPE IF NOT EXISTS microtimestamp (
seconds bigint,
microseconds int
);
CREATE TYPE IF NOT EXISTS microtimestamp_with_timezone (
timestamp frozen<microtimestamp>,
offset smallint,
negative_utc boolean
);
CREATE TYPE IF NOT EXISTS person (
fullname blob,
name blob,
email blob
);
CREATE TABLE IF NOT EXISTS content (
sha1 blob,
sha1_git blob,
sha256 blob,
blake2s256 blob,
length bigint,
ctime timestamp,
-- creation time, i.e. time of (first) injection into the storage
status ascii,
PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
);
CREATE TABLE IF NOT EXISTS skipped_content (
sha1 blob,
sha1_git blob,
sha256 blob,
blake2s256 blob,
length bigint,
ctime timestamp,
-- creation time, i.e. time of (first) injection into the storage
status ascii,
reason text,
origin text,
PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
);
CREATE TABLE IF NOT EXISTS revision (
id blob PRIMARY KEY,
date microtimestamp_with_timezone,
committer_date microtimestamp_with_timezone,
type ascii,
directory blob, -- source code "root" directory
message blob,
author person,
committer person,
synthetic boolean,
-- true iff revision has been created by Software Heritage
- metadata text
- -- extra metadata as JSON(tarball checksums,
- -- extra commit information, etc...)
+ metadata text,
+ -- extra metadata as JSON (tarball checksums, etc...)
+ extra_headers frozen<list <tuple<blob, blob>> >
+ -- extra commit information as (tuple(key, value), ...)
);
CREATE TABLE IF NOT EXISTS revision_parent (
id blob,
parent_rank int,
-- parent position in merge commits, 0-based
parent_id blob,
PRIMARY KEY ((id), parent_rank)
);
CREATE TABLE IF NOT EXISTS release
(
id blob PRIMARY KEY,
target_type ascii,
target blob,
date microtimestamp_with_timezone,
name blob,
message blob,
author person,
synthetic boolean,
-- true iff release has been created by Software Heritage
);
CREATE TABLE IF NOT EXISTS directory (
id blob PRIMARY KEY,
);
CREATE TABLE IF NOT EXISTS directory_entry (
directory_id blob,
name blob, -- path name, relative to containing dir
target blob,
perms int, -- unix-like permissions
type ascii, -- target type
PRIMARY KEY ((directory_id), name)
);
CREATE TABLE IF NOT EXISTS snapshot (
id blob PRIMARY KEY,
);
-- For a given snapshot_id, branches are sorted by their name,
-- allowing easy pagination.
CREATE TABLE IF NOT EXISTS snapshot_branch (
snapshot_id blob,
name blob,
target_type ascii,
target blob,
PRIMARY KEY ((snapshot_id), name)
);
CREATE TABLE IF NOT EXISTS origin_visit (
origin text,
visit bigint,
date timestamp,
type text,
PRIMARY KEY ((origin), visit)
);
CREATE TABLE IF NOT EXISTS origin_visit_status (
origin text,
visit bigint,
date timestamp,
status ascii,
metadata text,
snapshot blob,
PRIMARY KEY ((origin), visit, date)
);
CREATE TABLE IF NOT EXISTS origin (
sha1 blob PRIMARY KEY,
url text,
type text,
next_visit_id int,
-- We need integer visit ids for compatibility with the pgsql
-- storage, so we're using lightweight transactions with this trick:
-- https://stackoverflow.com/a/29391877/539465
);
CREATE TABLE IF NOT EXISTS metadata_authority (
url text,
type ascii,
metadata text,
PRIMARY KEY ((url), type)
);
CREATE TABLE IF NOT EXISTS metadata_fetcher (
name ascii,
version ascii,
metadata text,
PRIMARY KEY ((name), version)
);
CREATE TABLE IF NOT EXISTS object_metadata (
type text,
id text,
-- metadata source
authority_type text,
authority_url text,
discovery_date timestamp,
fetcher_name ascii,
fetcher_version ascii,
-- metadata itself
format ascii,
metadata blob,
-- context
origin text,
visit bigint,
snapshot text,
release text,
revision text,
path blob,
directory text,
PRIMARY KEY ((id), authority_type, authority_url, discovery_date,
fetcher_name, fetcher_version)
);
CREATE TABLE IF NOT EXISTS object_count (
partition_key smallint, -- Constant, must always be 0
object_type ascii,
count counter,
PRIMARY KEY ((partition_key), object_type)
);
""".split(
"\n\n\n"
)
CONTENT_INDEX_TEMPLATE = """
-- Secondary table, used for looking up "content" from a single hash
CREATE TABLE IF NOT EXISTS content_by_{main_algo} (
{main_algo} blob,
target_token bigint, -- value of token(pk) on the "primary" table
PRIMARY KEY (({main_algo}), target_token)
);
CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} (
{main_algo} blob,
target_token bigint, -- value of token(pk) on the "primary" table
PRIMARY KEY (({main_algo}), target_token)
);
"""
TABLES = (
"skipped_content content revision revision_parent release "
"directory directory_entry snapshot snapshot_branch "
"origin_visit origin object_metadata object_count "
"origin_visit_status metadata_authority "
"metadata_fetcher"
).split()
HASH_ALGORITHMS = ["sha1", "sha1_git", "sha256", "blake2s256"]
for main_algo in HASH_ALGORITHMS:
CREATE_TABLES_QUERIES.extend(
CONTENT_INDEX_TEMPLATE.format(
main_algo=main_algo,
other_algos=", ".join(
[algo for algo in HASH_ALGORITHMS if algo != main_algo]
),
).split("\n\n")
)
TABLES.append("content_by_%s" % main_algo)
TABLES.append("skipped_content_by_%s" % main_algo)
diff --git a/swh/storage/converters.py b/swh/storage/converters.py
index 541d5aa2..6c4e6286 100644
--- a/swh/storage/converters.py
+++ b/swh/storage/converters.py
@@ -1,320 +1,293 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from typing import Optional, Dict
-from swh.core.utils import decode_with_escape, encode_with_unescape
+from swh.core.utils import encode_with_unescape
from swh.model import identifiers
from swh.model.hashutil import MultiHash
DEFAULT_AUTHOR = {
"fullname": None,
"name": None,
"email": None,
}
DEFAULT_DATE = {
"timestamp": None,
"offset": 0,
"neg_utc_offset": None,
}
def author_to_db(author):
"""Convert a swh-model author to its DB representation.
Args:
author: a :mod:`swh.model` compatible author
Returns:
dict: a dictionary with three keys: author, fullname and email
"""
if author is None:
return DEFAULT_AUTHOR
return author
def db_to_author(
fullname: Optional[bytes], name: Optional[bytes], email: Optional[bytes]
) -> Optional[Dict[str, Optional[bytes]]]:
"""Convert the DB representation of an author to a swh-model author.
Args:
fullname (bytes): the author's fullname
name (bytes): the author's name
email (bytes): the author's email
Returns:
a dictionary with three keys (fullname, name and email), or
None if all the arguments are None.
"""
if (fullname, name, email) == (None, None, None):
return None
return {
"fullname": fullname,
"name": name,
"email": email,
}
-def git_headers_to_db(git_headers):
- """Convert git headers to their database representation.
-
- We convert the bytes to unicode by decoding them into utf-8 and replacing
- invalid utf-8 sequences with backslash escapes.
-
- """
- ret = []
- for key, values in git_headers:
- if isinstance(values, list):
- ret.append([key, [decode_with_escape(value) for value in values]])
- else:
- ret.append([key, decode_with_escape(values)])
-
- return ret
-
-
def db_to_git_headers(db_git_headers):
ret = []
- for key, values in db_git_headers:
- if isinstance(values, list):
- ret.append([key, [encode_with_unescape(value) for value in values]])
- else:
- ret.append([key, encode_with_unescape(values)])
-
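+ # Header values were stored backslash-escaped in the database; decode
+ # them back to bytes, and re-encode the (textual) keys as utf-8.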
+ for key, value in db_git_headers:
+ ret.append([key.encode("utf-8"), encode_with_unescape(value)])
return ret
def db_to_date(date, offset, neg_utc_offset):
"""Convert the DB representation of a date to a swh-model compatible date.
Args:
date (datetime.datetime): a date pulled out of the database
offset (int): an integer number of minutes representing an UTC offset
neg_utc_offset (boolean): whether an utc offset is negative
Returns:
dict: a dict with three keys:
- timestamp: a timestamp from UTC
- offset: the number of minutes since UTC
- negative_utc: whether a null UTC offset is negative
"""
if date is None:
return None
return {
"timestamp": {
"seconds": int(date.timestamp()),
"microseconds": date.microsecond,
},
"offset": offset,
"negative_utc": neg_utc_offset,
}
def date_to_db(date_offset):
"""Convert a swh-model date_offset to its DB representation.
Args:
date_offset: a :mod:`swh.model` compatible date_offset
Returns:
dict: a dictionary with three keys:
- timestamp: a date in ISO format
- offset: the UTC offset in minutes
- neg_utc_offset: a boolean indicating whether a null offset is
negative or positive.
"""
if date_offset is None:
return DEFAULT_DATE
normalized = identifiers.normalize_timestamp(date_offset)
ts = normalized["timestamp"]
seconds = ts.get("seconds", 0)
microseconds = ts.get("microseconds", 0)
timestamp = datetime.datetime.fromtimestamp(seconds, datetime.timezone.utc)
timestamp = timestamp.replace(microsecond=microseconds)
return {
# PostgreSQL supports isoformatted timestamps
"timestamp": timestamp.isoformat(),
"offset": normalized["offset"],
"neg_utc_offset": normalized["negative_utc"],
}
def revision_to_db(rev):
"""Convert a swh-model revision to its database representation.
"""
revision = rev.to_dict()
author = author_to_db(revision["author"])
date = date_to_db(revision["date"])
committer = author_to_db(revision["committer"])
committer_date = date_to_db(revision["committer_date"])
- metadata = revision["metadata"]
-
- if metadata and "extra_headers" in metadata:
- metadata = metadata.copy()
- extra_headers = git_headers_to_db(metadata["extra_headers"])
- metadata["extra_headers"] = extra_headers
-
return {
"id": revision["id"],
"author_fullname": author["fullname"],
"author_name": author["name"],
"author_email": author["email"],
"date": date["timestamp"],
"date_offset": date["offset"],
"date_neg_utc_offset": date["neg_utc_offset"],
"committer_fullname": committer["fullname"],
"committer_name": committer["name"],
"committer_email": committer["email"],
"committer_date": committer_date["timestamp"],
"committer_date_offset": committer_date["offset"],
"committer_date_neg_utc_offset": committer_date["neg_utc_offset"],
"type": revision["type"],
"directory": revision["directory"],
"message": revision["message"],
- "metadata": metadata,
+ "metadata": revision["metadata"],
"synthetic": revision["synthetic"],
+ "extra_headers": revision["extra_headers"],
"parents": [
{"id": revision["id"], "parent_id": parent, "parent_rank": i,}
for i, parent in enumerate(revision["parents"])
],
}
def db_to_revision(db_revision):
"""Convert a database representation of a revision to its swh-model
representation."""
author = db_to_author(
db_revision["author_fullname"],
db_revision["author_name"],
db_revision["author_email"],
)
date = db_to_date(
db_revision["date"],
db_revision["date_offset"],
db_revision["date_neg_utc_offset"],
)
committer = db_to_author(
db_revision["committer_fullname"],
db_revision["committer_name"],
db_revision["committer_email"],
)
committer_date = db_to_date(
db_revision["committer_date"],
db_revision["committer_date_offset"],
db_revision["committer_date_neg_utc_offset"],
)
- metadata = db_revision["metadata"]
-
- if metadata and "extra_headers" in metadata:
- extra_headers = db_to_git_headers(metadata["extra_headers"])
- metadata["extra_headers"] = extra_headers
-
parents = []
if "parents" in db_revision:
for parent in db_revision["parents"]:
if parent:
parents.append(parent)
+ metadata = db_revision["metadata"]
+ extra_headers = db_revision.get("extra_headers", ())
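+ # Legacy rows predate the extra_headers column and still keep the
+ # headers (backslash-escaped) inside the metadata JSON.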
+ if not extra_headers and metadata and "extra_headers" in metadata:
+ extra_headers = db_to_git_headers(metadata.pop("extra_headers"))
+
ret = {
"id": db_revision["id"],
"author": author,
"date": date,
"committer": committer,
"committer_date": committer_date,
"type": db_revision["type"],
"directory": db_revision["directory"],
"message": db_revision["message"],
"metadata": metadata,
"synthetic": db_revision["synthetic"],
+ "extra_headers": extra_headers,
"parents": parents,
}
if "object_id" in db_revision:
ret["object_id"] = db_revision["object_id"]
return ret
def release_to_db(rel):
"""Convert a swh-model release to its database representation.
"""
release = rel.to_dict()
author = author_to_db(release["author"])
date = date_to_db(release["date"])
return {
"id": release["id"],
"author_fullname": author["fullname"],
"author_name": author["name"],
"author_email": author["email"],
"date": date["timestamp"],
"date_offset": date["offset"],
"date_neg_utc_offset": date["neg_utc_offset"],
"name": release["name"],
"target": release["target"],
"target_type": release["target_type"],
"comment": release["message"],
"synthetic": release["synthetic"],
}
def db_to_release(db_release):
"""Convert a database representation of a release to its swh-model
representation.
"""
author = db_to_author(
db_release["author_fullname"],
db_release["author_name"],
db_release["author_email"],
)
date = db_to_date(
db_release["date"], db_release["date_offset"], db_release["date_neg_utc_offset"]
)
ret = {
"author": author,
"date": date,
"id": db_release["id"],
"name": db_release["name"],
"message": db_release["comment"],
"synthetic": db_release["synthetic"],
"target": db_release["target"],
"target_type": db_release["target_type"],
}
if "object_id" in db_release:
ret["object_id"] = db_release["object_id"]
return ret
def origin_url_to_sha1(origin_url):
"""Convert an origin URL to a sha1. Encodes URL to utf-8."""
return MultiHash.from_data(origin_url.encode("utf-8"), {"sha1"}).digest()["sha1"]
diff --git a/swh/storage/db.py b/swh/storage/db.py
index 91935b46..41e83863 100644
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1,1287 +1,1288 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import random
import select
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from swh.core.db import BaseDb
from swh.core.db.db_utils import stored_procedure, jsonize
from swh.core.db.db_utils import execute_values_generator
from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE
class Db(BaseDb):
"""Proxy to the SWH DB, with wrappers around stored procedures
"""
def mktemp_dir_entry(self, entry_type, cur=None):
self._cursor(cur).execute(
"SELECT swh_mktemp_dir_entry(%s)", (("directory_entry_%s" % entry_type),)
)
@stored_procedure("swh_mktemp_revision")
def mktemp_revision(self, cur=None):
pass
@stored_procedure("swh_mktemp_release")
def mktemp_release(self, cur=None):
pass
@stored_procedure("swh_mktemp_snapshot_branch")
def mktemp_snapshot_branch(self, cur=None):
pass
def register_listener(self, notify_queue, cur=None):
"""Register a listener for NOTIFY queue `notify_queue`"""
self._cursor(cur).execute("LISTEN %s" % notify_queue)
def listen_notifies(self, timeout):
"""Listen to notifications for `timeout` seconds"""
if select.select([self.conn], [], [], timeout) == ([], [], []):
return
else:
self.conn.poll()
while self.conn.notifies:
yield self.conn.notifies.pop(0)
@stored_procedure("swh_content_add")
def content_add_from_temp(self, cur=None):
pass
@stored_procedure("swh_directory_add")
def directory_add_from_temp(self, cur=None):
pass
@stored_procedure("swh_skipped_content_add")
def skipped_content_add_from_temp(self, cur=None):
pass
@stored_procedure("swh_revision_add")
def revision_add_from_temp(self, cur=None):
pass
@stored_procedure("swh_release_add")
def release_add_from_temp(self, cur=None):
pass
def content_update_from_temp(self, keys_to_update, cur=None):
cur = self._cursor(cur)
cur.execute(
"""select swh_content_update(ARRAY[%s] :: text[])""" % keys_to_update
)
content_get_metadata_keys = [
"sha1",
"sha1_git",
"sha256",
"blake2s256",
"length",
"status",
]
content_add_keys = content_get_metadata_keys + ["ctime"]
skipped_content_keys = [
"sha1",
"sha1_git",
"sha256",
"blake2s256",
"length",
"reason",
"status",
"origin",
]
def content_get_metadata_from_sha1s(self, sha1s, cur=None):
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
select t.sha1, %s from (values %%s) as t (sha1)
inner join content using (sha1)
"""
% ", ".join(self.content_get_metadata_keys[1:]),
((sha1,) for sha1 in sha1s),
)
def content_get_range(self, start, end, limit=None, cur=None):
"""Retrieve contents within range [start, end].
"""
cur = self._cursor(cur)
query = """select %s from content
where %%s <= sha1 and sha1 <= %%s
order by sha1
limit %%s""" % ", ".join(
self.content_get_metadata_keys
)
cur.execute(query, (start, end, limit))
yield from cur
content_hash_keys = ["sha1", "sha1_git", "sha256", "blake2s256"]
def content_missing_from_list(self, contents, cur=None):
cur = self._cursor(cur)
keys = ", ".join(self.content_hash_keys)
equality = " AND ".join(
("t.%s = c.%s" % (key, key)) for key in self.content_hash_keys
)
yield from execute_values_generator(
cur,
"""
SELECT %s
FROM (VALUES %%s) as t(%s)
WHERE NOT EXISTS (
SELECT 1 FROM content c
WHERE %s
)
"""
% (keys, keys, equality),
(tuple(c[key] for key in self.content_hash_keys) for c in contents),
)
def content_missing_per_sha1(self, sha1s, cur=None):
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
SELECT t.sha1 FROM (VALUES %s) AS t(sha1)
WHERE NOT EXISTS (
SELECT 1 FROM content c WHERE c.sha1 = t.sha1
)""",
((sha1,) for sha1 in sha1s),
)
def content_missing_per_sha1_git(self, contents, cur=None):
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
SELECT t.sha1_git FROM (VALUES %s) AS t(sha1_git)
WHERE NOT EXISTS (
SELECT 1 FROM content c WHERE c.sha1_git = t.sha1_git
)""",
((sha1,) for sha1 in contents),
)
def skipped_content_missing(self, contents, cur=None):
if not contents:
return []
cur = self._cursor(cur)
query = """SELECT * FROM (VALUES %s) AS t (%s)
WHERE not exists
(SELECT 1 FROM skipped_content s WHERE
s.sha1 is not distinct from t.sha1::sha1 and
s.sha1_git is not distinct from t.sha1_git::sha1 and
s.sha256 is not distinct from t.sha256::bytea);""" % (
(", ".join("%s" for _ in contents)),
", ".join(self.content_hash_keys),
)
cur.execute(
query,
[tuple(cont[key] for key in self.content_hash_keys) for cont in contents],
)
yield from cur
def snapshot_exists(self, snapshot_id, cur=None):
"""Check whether a snapshot with the given id exists"""
cur = self._cursor(cur)
cur.execute("""SELECT 1 FROM snapshot where id=%s""", (snapshot_id,))
return bool(cur.fetchone())
def snapshot_missing_from_list(self, snapshots, cur=None):
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
SELECT id FROM (VALUES %s) as t(id)
WHERE NOT EXISTS (
SELECT 1 FROM snapshot d WHERE d.id = t.id
)
""",
((id,) for id in snapshots),
)
def snapshot_add(self, snapshot_id, cur=None):
"""Add a snapshot from the temporary table"""
cur = self._cursor(cur)
cur.execute("""SELECT swh_snapshot_add(%s)""", (snapshot_id,))
snapshot_count_cols = ["target_type", "count"]
def snapshot_count_branches(self, snapshot_id, cur=None):
cur = self._cursor(cur)
query = """\
SELECT %s FROM swh_snapshot_count_branches(%%s)
""" % ", ".join(
self.snapshot_count_cols
)
cur.execute(query, (snapshot_id,))
yield from cur
snapshot_get_cols = ["snapshot_id", "name", "target", "target_type"]
def snapshot_get_by_id(
self,
snapshot_id,
branches_from=b"",
branches_count=None,
target_types=None,
cur=None,
):
cur = self._cursor(cur)
query = """\
SELECT %s
FROM swh_snapshot_get_by_id(%%s, %%s, %%s, %%s :: snapshot_target[])
""" % ", ".join(
self.snapshot_get_cols
)
cur.execute(query, (snapshot_id, branches_from, branches_count, target_types))
yield from cur
def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None):
cur = self._cursor(cur)
query = """\
SELECT ovs.snapshot
FROM origin_visit ov
INNER JOIN origin o ON o.id = ov.origin
INNER JOIN origin_visit_status ovs
ON ov.origin = ovs.origin AND ov.visit = ovs.visit
WHERE o.url=%s AND ov.visit=%s
ORDER BY ovs.date DESC LIMIT 1
"""
cur.execute(query, (origin_url, visit_id))
ret = cur.fetchone()
if ret:
return ret[0]
def snapshot_get_random(self, cur=None):
return self._get_random_row_from_table("snapshot", ["id"], "id", cur)
content_find_cols = [
"sha1",
"sha1_git",
"sha256",
"blake2s256",
"length",
"ctime",
"status",
]
def content_find(
self, sha1=None, sha1_git=None, sha256=None, blake2s256=None, cur=None
):
"""Find the content optionally on a combination of the following
checksums sha1, sha1_git, sha256 or blake2s256.
Args:
sha1: sha1 content
git_sha1: the sha1 computed `a la git` sha1 of the content
sha256: sha256 content
blake2s256: blake2s256 content
Returns:
The tuple (sha1, sha1_git, sha256, blake2s256) if found or None.
"""
cur = self._cursor(cur)
checksum_dict = {
"sha1": sha1,
"sha1_git": sha1_git,
"sha256": sha256,
"blake2s256": blake2s256,
}
where_parts = []
args = []
# Adds only those keys which have value other than None
for algorithm in checksum_dict:
if checksum_dict[algorithm] is not None:
args.append(checksum_dict[algorithm])
where_parts.append(algorithm + " = %s")
query = " AND ".join(where_parts)
cur.execute(
"""SELECT %s
FROM content WHERE %s
"""
% (",".join(self.content_find_cols), query),
args,
)
content = cur.fetchall()
return content
def content_get_random(self, cur=None):
return self._get_random_row_from_table("content", ["sha1_git"], "sha1_git", cur)
def directory_missing_from_list(self, directories, cur=None):
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
SELECT id FROM (VALUES %s) as t(id)
WHERE NOT EXISTS (
SELECT 1 FROM directory d WHERE d.id = t.id
)
""",
((id,) for id in directories),
)
directory_ls_cols = [
"dir_id",
"type",
"target",
"name",
"perms",
"status",
"sha1",
"sha1_git",
"sha256",
"length",
]
def directory_walk_one(self, directory, cur=None):
cur = self._cursor(cur)
cols = ", ".join(self.directory_ls_cols)
query = "SELECT %s FROM swh_directory_walk_one(%%s)" % cols
cur.execute(query, (directory,))
yield from cur
def directory_walk(self, directory, cur=None):
cur = self._cursor(cur)
cols = ", ".join(self.directory_ls_cols)
query = "SELECT %s FROM swh_directory_walk(%%s)" % cols
cur.execute(query, (directory,))
yield from cur
def directory_entry_get_by_path(self, directory, paths, cur=None):
"""Retrieve a directory entry by path.
"""
cur = self._cursor(cur)
cols = ", ".join(self.directory_ls_cols)
query = "SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)" % cols
cur.execute(query, (directory, paths))
data = cur.fetchone()
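# the function always returns a single record; a row whose columns are all
# None means no entry was found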
if set(data) == {None}:
return None
return data
def directory_get_random(self, cur=None):
return self._get_random_row_from_table("directory", ["id"], "id", cur)
def revision_missing_from_list(self, revisions, cur=None):
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
SELECT id FROM (VALUES %s) as t(id)
WHERE NOT EXISTS (
SELECT 1 FROM revision r WHERE r.id = t.id
)
""",
((id,) for id in revisions),
)
revision_add_cols = [
"id",
"date",
"date_offset",
"date_neg_utc_offset",
"committer_date",
"committer_date_offset",
"committer_date_neg_utc_offset",
"type",
"directory",
"message",
"author_fullname",
"author_name",
"author_email",
"committer_fullname",
"committer_name",
"committer_email",
"metadata",
"synthetic",
+ "extra_headers",
]
revision_get_cols = revision_add_cols + ["parents"]
def origin_visit_add(self, origin, ts, type, cur=None):
"""Add a new origin_visit for origin origin at timestamp ts.
Args:
origin: origin concerned by the visit
ts: the date of the visit
type: type of loader for the visit
Returns:
The sequential visit number of the new visit for that origin
"""
cur = self._cursor(cur)
cur.execute(
"SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type)
)
return cur.fetchone()[0]
origin_visit_status_cols = [
"origin",
"visit",
"date",
"status",
"snapshot",
"metadata",
]
def origin_visit_status_add(
self, visit_status: OriginVisitStatus, cur=None
) -> None:
"""Add new origin visit status
"""
assert self.origin_visit_status_cols[0] == "origin"
assert self.origin_visit_status_cols[-1] == "metadata"
cols = self.origin_visit_status_cols[1:-1]
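# "origin" (first col) is resolved to an id via the origin_id CTE below and
# "metadata" (last col) goes through jsonize(), hence the slicing above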
cur = self._cursor(cur)
cur.execute(
f"WITH origin_id as (select id from origin where url=%s) "
f"INSERT INTO origin_visit_status "
f"(origin, {', '.join(cols)}, metadata) "
f"VALUES ((select id from origin_id), "
f"{', '.join(['%s']*len(cols))}, %s) "
f"ON CONFLICT (origin, visit, date) do nothing",
[visit_status.origin]
+ [getattr(visit_status, key) for key in cols]
+ [jsonize(visit_status.metadata)],
)
def origin_visit_add_with_id(self, origin_visit: OriginVisit, cur=None) -> None:
"""Insert origin visit when id are already set
"""
ov = origin_visit
assert ov.visit is not None
cur = self._cursor(cur)
origin_visit_cols = ["origin", "visit", "date", "type"]
query = """INSERT INTO origin_visit ({cols})
VALUES ((select id from origin where url=%s), {values})
ON CONFLICT (origin, visit) DO NOTHING""".format(
cols=", ".join(origin_visit_cols),
values=", ".join("%s" for col in origin_visit_cols[1:]),
)
cur.execute(query, (ov.origin, ov.visit, ov.date, ov.type))
origin_visit_get_cols = [
"origin",
"visit",
"date",
"type",
"status",
"metadata",
"snapshot",
]
origin_visit_select_cols = [
"o.url AS origin",
"ov.visit",
"ov.date",
"ov.type AS type",
"ovs.status",
"ovs.metadata",
"ovs.snapshot",
]
origin_visit_status_select_cols = [
"o.url AS origin",
"ovs.visit",
"ovs.date",
"ovs.status",
"ovs.snapshot",
"ovs.metadata",
]
def _make_origin_visit_status(
self, row: Optional[Tuple[Any, ...]]
) -> Optional[Dict[str, Any]]:
"""Make an origin_visit_status dict out of a row
"""
if not row:
return None
return dict(zip(self.origin_visit_status_cols, row))
def origin_visit_status_get_latest(
self,
origin_url: str,
visit: int,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
cur=None,
) -> Optional[Dict[str, Any]]:
"""Given an origin visit id, return its latest origin_visit_status
"""
cur = self._cursor(cur)
query_parts = [
"SELECT %s" % ", ".join(self.origin_visit_status_select_cols),
"FROM origin_visit_status ovs ",
"INNER JOIN origin o ON o.id = ovs.origin",
]
query_parts.append("WHERE o.url = %s")
query_params: List[Any] = [origin_url]
query_parts.append("AND ovs.visit = %s")
query_params.append(visit)
if require_snapshot:
query_parts.append("AND ovs.snapshot is not null")
if allowed_statuses:
query_parts.append("AND ovs.status IN %s")
query_params.append(tuple(allowed_statuses))
query_parts.append("ORDER BY ovs.date DESC LIMIT 1")
query = "\n".join(query_parts)
cur.execute(query, tuple(query_params))
row = cur.fetchone()
return self._make_origin_visit_status(row)
def origin_visit_get_all(
self, origin_id, last_visit=None, order="asc", limit=None, cur=None
):
"""Retrieve all visits for origin with id origin_id.
Args:
origin_id: The occurrence's origin
Yields:
The visits for that origin
"""
cur = self._cursor(cur)
assert order.lower() in ["asc", "desc"]
query_parts = [
"SELECT DISTINCT ON (ov.visit) %s "
% ", ".join(self.origin_visit_select_cols),
"FROM origin_visit ov",
"INNER JOIN origin o ON o.id = ov.origin",
"INNER JOIN origin_visit_status ovs",
"ON ov.origin = ovs.origin AND ov.visit = ovs.visit",
]
query_parts.append("WHERE o.url = %s")
query_params: List[Any] = [origin_id]
if last_visit is not None:
op_comparison = ">" if order == "asc" else "<"
query_parts.append(f"and ov.visit {op_comparison} %s")
query_params.append(last_visit)
if order == "asc":
query_parts.append("ORDER BY ov.visit ASC, ovs.date DESC")
elif order == "desc":
query_parts.append("ORDER BY ov.visit DESC, ovs.date DESC")
else:
assert False
if limit is not None:
query_parts.append("LIMIT %s")
query_params.append(limit)
query = "\n".join(query_parts)
cur.execute(query, tuple(query_params))
yield from cur
def origin_visit_get(self, origin_id, visit_id, cur=None):
"""Retrieve information on visit visit_id of origin origin_id.
Args:
origin_id: the url of the origin concerned
visit_id: The visit step for that origin
Returns:
The origin_visit information
"""
cur = self._cursor(cur)
query = """\
SELECT %s
FROM origin_visit ov
INNER JOIN origin o ON o.id = ov.origin
INNER JOIN origin_visit_status ovs
ON ov.origin = ovs.origin AND ov.visit = ovs.visit
WHERE o.url = %%s AND ov.visit = %%s
ORDER BY ovs.date DESC
LIMIT 1
""" % (
", ".join(self.origin_visit_select_cols)
)
cur.execute(query, (origin_id, visit_id))
r = cur.fetchall()
if not r:
return None
return r[0]
def origin_visit_find_by_date(self, origin, visit_date, cur=None):
cur = self._cursor(cur)
cur.execute(
"SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date)
)
rows = cur.fetchall()
if rows:
visit = dict(zip(self.origin_visit_get_cols, rows[0]))
visit["origin"] = origin
return visit
def origin_visit_exists(self, origin_id, visit_id, cur=None):
"""Check whether an origin visit with the given ids exists"""
cur = self._cursor(cur)
query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s"
cur.execute(query, (origin_id, visit_id))
return bool(cur.fetchone())
def origin_visit_get_latest(
self,
origin_id: str,
type: Optional[str],
allowed_statuses: Optional[Iterable[str]],
require_snapshot: bool,
cur=None,
):
"""Retrieve the most recent origin_visit of the given origin,
with optional filters.
Args:
origin_id: the url of the origin concerned
type: Optional visit type to filter on
allowed_statuses: the visit statuses allowed for the returned visit
require_snapshot (bool): If True, only a visit with a known
snapshot will be returned.
Returns:
The origin_visit information, or None if no visit matches.
"""
cur = self._cursor(cur)
query_parts = [
"SELECT %s" % ", ".join(self.origin_visit_select_cols),
"FROM origin_visit ov ",
"INNER JOIN origin o ON o.id = ov.origin",
"INNER JOIN origin_visit_status ovs ",
"ON o.id = ovs.origin AND ov.visit = ovs.visit ",
]
query_parts.append("WHERE o.url = %s")
query_params: List[Any] = [origin_id]
if type is not None:
query_parts.append("AND ov.type = %s")
query_params.append(type)
if require_snapshot:
query_parts.append("AND ovs.snapshot is not null")
if allowed_statuses:
query_parts.append("AND ovs.status IN %s")
query_params.append(tuple(allowed_statuses))
query_parts.append(
"ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1"
)
query = "\n".join(query_parts)
cur.execute(query, tuple(query_params))
r = cur.fetchone()
if not r:
return None
return r
def origin_visit_get_random(self, type, cur=None):
"""Randomly select one origin visit that was full and in the last 3
months
"""
cur = self._cursor(cur)
columns = ",".join(self.origin_visit_select_cols)
query = f"""select {columns}
from origin_visit ov
inner join origin o on ov.origin=o.id
inner join origin_visit_status ovs
on ov.origin = ovs.origin and ov.visit = ovs.visit
where ovs.status='full'
and ov.type=%s
and ov.date > now() - '3 months'::interval
and random() < 0.1
limit 1
"""
cur.execute(query, (type,))
return cur.fetchone()
@staticmethod
def mangle_query_key(key, main_table):
if key == "id":
return "t.id"
if key == "parents":
return """
ARRAY(
SELECT rh.parent_id::bytea
FROM revision_history rh
WHERE rh.id = t.id
ORDER BY rh.parent_rank
)"""
if "_" not in key:
return "%s.%s" % (main_table, key)
head, tail = key.split("_", 1)
if head in ("author", "committer") and tail in (
"name",
"email",
"id",
"fullname",
):
return "%s.%s" % (head, tail)
return "%s.%s" % (main_table, key)
def revision_get_from_list(self, revisions, cur=None):
cur = self._cursor(cur)
query_keys = ", ".join(
self.mangle_query_key(k, "revision") for k in self.revision_get_cols
)
yield from execute_values_generator(
cur,
"""
SELECT %s FROM (VALUES %%s) as t(sortkey, id)
LEFT JOIN revision ON t.id = revision.id
LEFT JOIN person author ON revision.author = author.id
LEFT JOIN person committer ON revision.committer = committer.id
ORDER BY sortkey
"""
% query_keys,
((sortkey, id) for sortkey, id in enumerate(revisions)),
)
def revision_log(self, root_revisions, limit=None, cur=None):
cur = self._cursor(cur)
query = """SELECT %s
FROM swh_revision_log(%%s, %%s)
""" % ", ".join(
self.revision_get_cols
)
cur.execute(query, (root_revisions, limit))
yield from cur
revision_shortlog_cols = ["id", "parents"]
def revision_shortlog(self, root_revisions, limit=None, cur=None):
cur = self._cursor(cur)
query = """SELECT %s
FROM swh_revision_list(%%s, %%s)
""" % ", ".join(
self.revision_shortlog_cols
)
cur.execute(query, (root_revisions, limit))
yield from cur
def revision_get_random(self, cur=None):
return self._get_random_row_from_table("revision", ["id"], "id", cur)
def release_missing_from_list(self, releases, cur=None):
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
SELECT id FROM (VALUES %s) as t(id)
WHERE NOT EXISTS (
SELECT 1 FROM release r WHERE r.id = t.id
)
""",
((id,) for id in releases),
)
object_find_by_sha1_git_cols = ["sha1_git", "type"]
def object_find_by_sha1_git(self, ids, cur=None):
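"""For each given id, yield a (sha1_git, type) pair where type is the
object type ('content', 'directory', 'revision' or 'release') if the id
is known, and None otherwise."""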
cur = self._cursor(cur)
yield from execute_values_generator(
cur,
"""
WITH t (sha1_git) AS (VALUES %s),
known_objects as ((
select
id as sha1_git,
'release'::object_type as type,
object_id
from release r
where exists (select 1 from t where t.sha1_git = r.id)
) union all (
select
id as sha1_git,
'revision'::object_type as type,
object_id
from revision r
where exists (select 1 from t where t.sha1_git = r.id)
) union all (
select
id as sha1_git,
'directory'::object_type as type,
object_id
from directory d
where exists (select 1 from t where t.sha1_git = d.id)
) union all (
select
sha1_git as sha1_git,
'content'::object_type as type,
object_id
from content c
where exists (select 1 from t where t.sha1_git = c.sha1_git)
))
select t.sha1_git as sha1_git, k.type
from t
left join known_objects k on t.sha1_git = k.sha1_git
""",
((id,) for id in ids),
)
def stat_counters(self, cur=None):
cur = self._cursor(cur)
cur.execute("SELECT * FROM swh_stat_counters()")
yield from cur
def origin_add(self, url, cur=None):
"""Insert a new origin and return the new identifier."""
insert = """INSERT INTO origin (url) values (%s)
RETURNING url"""
cur.execute(insert, (url,))
return cur.fetchone()[0]
origin_cols = ["url"]
def origin_get_by_url(self, origins, cur=None):
"""Retrieve origin `(type, url)` from urls if found."""
cur = self._cursor(cur)
query = """SELECT %s FROM (VALUES %%s) as t(url)
LEFT JOIN origin ON t.url = origin.url
""" % ",".join(
"origin." + col for col in self.origin_cols
)
yield from execute_values_generator(cur, query, ((url,) for url in origins))
def origin_get_by_sha1(self, sha1s, cur=None):
"""Retrieve origin urls from sha1s if found."""
cur = self._cursor(cur)
query = """SELECT %s FROM (VALUES %%s) as t(sha1)
LEFT JOIN origin ON t.sha1 = digest(origin.url, 'sha1')
""" % ",".join(
"origin." + col for col in self.origin_cols
)
yield from execute_values_generator(cur, query, ((sha1,) for sha1 in sha1s))
def origin_id_get_by_url(self, origins, cur=None):
"""Retrieve origin `(type, url)` from urls if found."""
cur = self._cursor(cur)
query = """SELECT id FROM (VALUES %s) as t(url)
LEFT JOIN origin ON t.url = origin.url
"""
for row in execute_values_generator(cur, query, ((url,) for url in origins)):
yield row[0]
origin_get_range_cols = ["id", "url"]
def origin_get_range(self, origin_from=1, origin_count=100, cur=None):
"""Retrieve ``origin_count`` origins whose ids are greater
or equal than ``origin_from``.
Origins are sorted by id before retrieving them.
Args:
origin_from (int): the minimum id of origins to retrieve
origin_count (int): the maximum number of origins to retrieve
"""
cur = self._cursor(cur)
query = """SELECT %s
FROM origin WHERE id >= %%s
ORDER BY id LIMIT %%s
""" % ",".join(
self.origin_get_range_cols
)
cur.execute(query, (origin_from, origin_count))
yield from cur
def _origin_query(
self,
url_pattern,
count=False,
offset=0,
limit=50,
regexp=False,
with_visit=False,
cur=None,
):
"""
Method factorizing query creation for searching and counting origins.
"""
cur = self._cursor(cur)
if count:
origin_cols = "COUNT(*)"
else:
origin_cols = ",".join(self.origin_cols)
query = """SELECT %s
FROM origin o
WHERE """
if with_visit:
query += """
EXISTS (
SELECT 1
FROM origin_visit ov
INNER JOIN origin_visit_status ovs
ON ov.origin = ovs.origin AND ov.visit = ovs.visit
INNER JOIN snapshot ON ovs.snapshot=snapshot.id
WHERE ov.origin=o.id
)
AND """
query += "url %s %%s "
if not count:
query += "ORDER BY id OFFSET %%s LIMIT %%s"
if not regexp:
query = query % (origin_cols, "ILIKE")
query_params = ("%" + url_pattern + "%", offset, limit)
else:
query = query % (origin_cols, "~*")
query_params = (url_pattern, offset, limit)
if count:
query_params = (query_params[0],)
cur.execute(query, query_params)
def origin_search(
self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, cur=None
):
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
The search is performed in a case insensitive way.
Args:
url_pattern (str): the string pattern to search for in origin urls
offset (int): number of found origins to skip before returning
results
limit (int): the maximum number of found origins to return
regexp (bool): if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
"""
cur = self._cursor(cur)
self._origin_query(
url_pattern,
offset=offset,
limit=limit,
regexp=regexp,
with_visit=with_visit,
cur=cur,
)
yield from cur
def origin_count(self, url_pattern, regexp=False, with_visit=False, cur=None):
"""Count origins whose urls contain a provided string pattern
or match a provided regular expression.
The pattern search in origin urls is performed in a case insensitive
way.
Args:
url_pattern (str): the string pattern to search for in origin urls
regexp (bool): if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
"""
cur = self._cursor(cur)
self._origin_query(
url_pattern, count=True, regexp=regexp, with_visit=with_visit, cur=cur
)
return cur.fetchone()[0]
release_add_cols = [
"id",
"target",
"target_type",
"date",
"date_offset",
"date_neg_utc_offset",
"name",
"comment",
"synthetic",
"author_fullname",
"author_name",
"author_email",
]
release_get_cols = release_add_cols
def release_get_from_list(self, releases, cur=None):
cur = self._cursor(cur)
query_keys = ", ".join(
self.mangle_query_key(k, "release") for k in self.release_get_cols
)
yield from execute_values_generator(
cur,
"""
SELECT %s FROM (VALUES %%s) as t(sortkey, id)
LEFT JOIN release ON t.id = release.id
LEFT JOIN person author ON release.author = author.id
ORDER BY sortkey
"""
% query_keys,
((sortkey, id) for sortkey, id in enumerate(releases)),
)
def release_get_random(self, cur=None):
return self._get_random_row_from_table("release", ["id"], "id", cur)
_object_metadata_context_cols = [
"origin",
"visit",
"snapshot",
"release",
"revision",
"path",
"directory",
]
"""The list of context columns for all artifact types."""
_object_metadata_insert_cols = [
"type",
"id",
"authority_id",
"fetcher_id",
"discovery_date",
"format",
"metadata",
*_object_metadata_context_cols,
]
"""List of columns of the object_metadata table, used when writing
metadata."""
_object_metadata_insert_query = f"""
INSERT INTO object_metadata
({', '.join(_object_metadata_insert_cols)})
VALUES ({', '.join('%s' for _ in _object_metadata_insert_cols)})
ON CONFLICT (id, authority_id, discovery_date, fetcher_id)
DO NOTHING
"""
object_metadata_get_cols = [
"id",
"discovery_date",
"metadata_authority.type",
"metadata_authority.url",
"metadata_fetcher.id",
"metadata_fetcher.name",
"metadata_fetcher.version",
*_object_metadata_context_cols,
"format",
"metadata",
]
"""List of columns of the object_metadata, metadata_authority,
and metadata_fetcher tables, used when reading object metadata."""
_object_metadata_select_query = f"""
SELECT
object_metadata.id AS id,
{', '.join(object_metadata_get_cols[1:-1])},
object_metadata.metadata AS metadata
FROM object_metadata
INNER JOIN metadata_authority
ON (metadata_authority.id=authority_id)
INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id)
WHERE object_metadata.id=%s AND authority_id=%s
"""
def object_metadata_add(
self,
object_type: str,
id: str,
context: Dict[str, Union[str, bytes, int]],
discovery_date: datetime.datetime,
authority_id: int,
fetcher_id: int,
format: str,
metadata: bytes,
cur,
):
query = self._object_metadata_insert_query
args: Dict[str, Any] = dict(
type=object_type,
id=id,
authority_id=authority_id,
fetcher_id=fetcher_id,
discovery_date=discovery_date,
format=format,
metadata=metadata,
)
for col in self._object_metadata_context_cols:
args[col] = context.get(col)
params = [args[col] for col in self._object_metadata_insert_cols]
cur.execute(query, params)
def object_metadata_get(
self,
object_type: str,
id: str,
authority_id: int,
after_time: Optional[datetime.datetime],
after_fetcher: Optional[int],
limit: int,
cur,
):
query_parts = [self._object_metadata_select_query]
args = [id, authority_id]
if after_fetcher is not None:
assert after_time
query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)")
args.extend([after_time, after_fetcher])
elif after_time is not None:
query_parts.append("AND discovery_date > %s")
args.append(after_time)
query_parts.append("ORDER BY discovery_date, fetcher_id")
if limit:
query_parts.append("LIMIT %s")
args.append(limit)
cur.execute(" ".join(query_parts), args)
yield from cur
metadata_fetcher_cols = ["name", "version", "metadata"]
def metadata_fetcher_add(
self, name: str, version: str, metadata: bytes, cur=None
) -> None:
cur = self._cursor(cur)
cur.execute(
"INSERT INTO metadata_fetcher (name, version, metadata) "
"VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
(name, version, jsonize(metadata)),
)
def metadata_fetcher_get(self, name: str, version: str, cur=None):
cur = self._cursor(cur)
cur.execute(
f"SELECT {', '.join(self.metadata_fetcher_cols)} "
f"FROM metadata_fetcher "
f"WHERE name=%s AND version=%s",
(name, version),
)
return cur.fetchone()
def metadata_fetcher_get_id(
self, name: str, version: str, cur=None
) -> Optional[int]:
cur = self._cursor(cur)
cur.execute(
"SELECT id FROM metadata_fetcher WHERE name=%s AND version=%s",
(name, version),
)
row = cur.fetchone()
if row:
return row[0]
else:
return None
metadata_authority_cols = ["type", "url", "metadata"]
def metadata_authority_add(
self, type: str, url: str, metadata: bytes, cur=None
) -> None:
cur = self._cursor(cur)
cur.execute(
"INSERT INTO metadata_authority (type, url, metadata) "
"VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
(type, url, jsonize(metadata)),
)
def metadata_authority_get(self, type: str, url: str, cur=None):
cur = self._cursor(cur)
cur.execute(
f"SELECT {', '.join(self.metadata_authority_cols)} "
f"FROM metadata_authority "
f"WHERE type=%s AND url=%s",
(type, url),
)
return cur.fetchone()
def metadata_authority_get_id(self, type: str, url: str, cur=None) -> Optional[int]:
cur = self._cursor(cur)
cur.execute(
"SELECT id FROM metadata_authority WHERE type=%s AND url=%s", (type, url)
)
row = cur.fetchone()
if row:
return row[0]
else:
return None
def _get_random_row_from_table(self, table_name, cols, id_col, cur=None):
random_sha1 = bytes(random.randint(0, 255) for _ in range(SHA1_SIZE))
cur = self._cursor(cur)
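# probe a uniformly random point of the key space: pick a row at or after
# the random key, or the closest row before it, so a row is found even
# when the random key falls past the largest value in the table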
query = """
(SELECT {cols} FROM {table} WHERE {id_col} >= %s
ORDER BY {id_col} LIMIT 1)
UNION
(SELECT {cols} FROM {table} WHERE {id_col} < %s
ORDER BY {id_col} DESC LIMIT 1)
LIMIT 1
""".format(
cols=", ".join(cols), table=table_name, id_col=id_col
)
cur.execute(query, (random_sha1, random_sha1))
row = cur.fetchone()
if row:
return row[0]
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
index 02b3b1ab..d267d380 100644
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -1,497 +1,499 @@
---
--- SQL implementation of the Software Heritage data model
---
-- schema versions
create table dbversion
(
version int primary key,
release timestamptz,
description text
);
comment on table dbversion is 'Details of current db version';
comment on column dbversion.version is 'SQL schema version';
comment on column dbversion.release is 'Version deployment timestamp';
comment on column dbversion.description is 'Release description';
-- latest schema version
insert into dbversion(version, release, description)
- values(157, now(), 'Work In Progress');
+ values(158, now(), 'Work In Progress');
-- a SHA1 checksum
create domain sha1 as bytea check (length(value) = 20);
-- a Git object ID, i.e., a Git-style salted SHA1 checksum
create domain sha1_git as bytea check (length(value) = 20);
-- a SHA256 checksum
create domain sha256 as bytea check (length(value) = 32);
-- a blake2 checksum
create domain blake2s256 as bytea check (length(value) = 32);
-- UNIX path (absolute, relative, individual path component, etc.)
create domain unix_path as bytea;
-- a set of UNIX-like access permissions, as manipulated by, e.g., chmod
create domain file_perms as int;
-- an SWHID
create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
-- Checksums about actual file content. Note that the content itself is not
-- stored in the DB, but on external (key-value) storage. A single checksum is
-- used as key there, but the others can be used to verify that we do not
-- unknowingly inject content collisions.
create table content
(
sha1 sha1 not null,
sha1_git sha1_git not null,
sha256 sha256 not null,
blake2s256 blake2s256 not null,
length bigint not null,
ctime timestamptz not null default now(),
-- creation time, i.e. time of (first) injection into the storage
status content_status not null default 'visible',
object_id bigserial
);
comment on table content is 'Checksums of file content which is actually stored externally';
comment on column content.sha1 is 'Content sha1 hash';
comment on column content.sha1_git is 'Git object sha1 hash';
comment on column content.sha256 is 'Content Sha256 hash';
comment on column content.blake2s256 is 'Content blake2s hash';
comment on column content.length is 'Content length';
comment on column content.ctime is 'First seen time';
comment on column content.status is 'Content status (absent, visible, hidden)';
comment on column content.object_id is 'Content identifier';
-- An origin is a place, identified by a URL, where software source code
-- artifacts can be found. We support different kinds of origins, e.g., git and
-- other VCS repositories, web pages that list tarballs URLs (e.g.,
-- http://www.kernel.org), indirect tarball URLs (e.g.,
-- http://www.example.org/latest.tar.gz), etc. The key feature of an origin is
-- that it can be *fetched* from (wget, git clone, svn checkout, etc.) to
-- retrieve all the contained software.
create table origin
(
id bigserial not null,
url text not null
);
comment on column origin.id is 'Artifact origin id';
comment on column origin.url is 'URL of origin';
-- Content blobs observed somewhere, but not ingested into the archive for
-- whatever reason. This table is separate from the content table as we might
-- not have the sha1 checksum of skipped contents (for instance when we inject
-- git repositories, objects that are too big will be skipped here, and we will
-- only know their sha1_git). 'reason' contains the reason the content was
-- skipped. origin is a nullable column that records which origin
-- contains that skipped content.
create table skipped_content
(
sha1 sha1,
sha1_git sha1_git,
sha256 sha256,
blake2s256 blake2s256,
length bigint not null,
ctime timestamptz not null default now(),
status content_status not null default 'absent',
reason text not null,
origin bigint,
object_id bigserial
);
comment on table skipped_content is 'Content blobs observed, but not ingested in the archive';
comment on column skipped_content.sha1 is 'Skipped content sha1 hash';
comment on column skipped_content.sha1_git is 'Git object sha1 hash';
comment on column skipped_content.sha256 is 'Skipped content sha256 hash';
comment on column skipped_content.blake2s256 is 'Skipped content blake2s hash';
comment on column skipped_content.length is 'Skipped content length';
comment on column skipped_content.ctime is 'First seen time';
comment on column skipped_content.status is 'Skipped content status (absent, visible, hidden)';
comment on column skipped_content.reason is 'Reason for skipping';
comment on column skipped_content.origin is 'Origin table identifier';
comment on column skipped_content.object_id is 'Skipped content identifier';
-- A file-system directory. A directory is a list of directory entries (see
-- tables: directory_entry_{dir,file}).
--
-- To list the contents of a directory:
-- 1. list the contained directory_entry_dir using array dir_entries
-- 2. list the contained directory_entry_file using array file_entries
-- 3. list the contained directory_entry_rev using array rev_entries
-- 4. UNION
--
-- Synonyms/mappings:
-- * git: tree
create table directory
(
id sha1_git not null,
dir_entries bigint[], -- sub-directories, reference directory_entry_dir
file_entries bigint[], -- contained files, reference directory_entry_file
rev_entries bigint[], -- mounted revisions, reference directory_entry_rev
object_id bigserial -- short object identifier
);
comment on table directory is 'Contents of a directory, synonymous to tree (git)';
comment on column directory.id is 'Git object sha1 hash';
comment on column directory.dir_entries is 'Sub-directories, reference directory_entry_dir';
comment on column directory.file_entries is 'Contained files, reference directory_entry_file';
comment on column directory.rev_entries is 'Mounted revisions, reference directory_entry_rev';
comment on column directory.object_id is 'Short object identifier';
-- A directory entry pointing to a (sub-)directory.
create table directory_entry_dir
(
id bigserial,
target sha1_git not null, -- id of target directory
name unix_path not null, -- path name, relative to containing dir
perms file_perms not null -- unix-like permissions
);
comment on table directory_entry_dir is 'Directory entry for directory';
comment on column directory_entry_dir.id is 'Directory identifier';
comment on column directory_entry_dir.target is 'Target directory identifier';
comment on column directory_entry_dir.name is 'Path name, relative to containing directory';
comment on column directory_entry_dir.perms is 'Unix-like permissions';
-- A directory entry pointing to a file content.
create table directory_entry_file
(
id bigserial,
target sha1_git not null, -- id of target file
name unix_path not null, -- path name, relative to containing dir
perms file_perms not null -- unix-like permissions
);
comment on table directory_entry_file is 'Directory entry for file';
comment on column directory_entry_file.id is 'File identifier';
comment on column directory_entry_file.target is 'Target file identifier';
comment on column directory_entry_file.name is 'Path name, relative to containing directory';
comment on column directory_entry_file.perms is 'Unix-like permissions';
-- A directory entry pointing to a revision.
create table directory_entry_rev
(
id bigserial,
target sha1_git not null, -- id of target revision
name unix_path not null, -- path name, relative to containing dir
perms file_perms not null -- unix-like permissions
);
comment on table directory_entry_rev is 'Directory entry for revision';
comment on column directory_entry_rev.id is 'Revision identifier';
comment on column directory_entry_rev.target is 'Target revision identifier';
comment on column directory_entry_rev.name is 'Path name, relative to containing directory';
comment on column directory_entry_rev.perms is 'Unix-like permissions';
-- A person referenced by some source code artifacts, e.g., a VCS revision or
-- release metadata.
create table person
(
id bigserial,
name bytea, -- advisory: not null if we managed to parse a name
email bytea, -- advisory: not null if we managed to parse an email
fullname bytea not null -- freeform specification; what is actually used in the checksums
-- will usually be of the form 'name <email>'
);
comment on table person is 'Person referenced in code artifact release metadata';
comment on column person.id is 'Person identifier';
comment on column person.name is 'Name';
comment on column person.email is 'Email';
comment on column person.fullname is 'Full name (raw name)';
-- The state of a source code tree at a specific point in time.
--
-- Synonyms/mappings:
-- * git / subversion / etc: commit
-- * tarball: a specific tarball
--
-- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in
-- case of merges) parent revisions. Each revision points to a directory, i.e.,
-- a file-system tree containing files and directories.
create table revision
(
id sha1_git not null,
date timestamptz,
date_offset smallint,
committer_date timestamptz,
committer_date_offset smallint,
type revision_type not null,
directory sha1_git, -- source code 'root' directory
message bytea,
author bigint,
committer bigint,
synthetic boolean not null default false, -- true iff revision has been created by Software Heritage
metadata jsonb, -- extra metadata (tarball checksums, extra commit information, etc...)
object_id bigserial,
date_neg_utc_offset boolean,
- committer_date_neg_utc_offset boolean
+ committer_date_neg_utc_offset boolean,
+ extra_headers bytea[][] -- extra headers (used in hash computation)
);
comment on table revision is 'A revision represents the state of a source code tree at a specific point in time';
comment on column revision.id is 'Git-style SHA1 commit identifier';
comment on column revision.date is 'Author timestamp as UNIX epoch';
comment on column revision.date_offset is 'Author timestamp timezone, as minute offsets from UTC';
comment on column revision.date_neg_utc_offset is 'True indicates a -0 UTC offset on author timestamp';
comment on column revision.committer_date is 'Committer timestamp as UNIX epoch';
comment on column revision.committer_date_offset is 'Committer timestamp timezone, as minute offsets from UTC';
comment on column revision.committer_date_neg_utc_offset is 'True indicates a -0 UTC offset on committer timestamp';
comment on column revision.type is 'Type of revision';
comment on column revision.directory is 'Directory identifier';
comment on column revision.message is 'Commit message';
comment on column revision.author is 'Author identity';
comment on column revision.committer is 'Committer identity';
comment on column revision.synthetic is 'True iff revision has been synthesized by Software Heritage';
comment on column revision.metadata is 'Extra revision metadata';
comment on column revision.object_id is 'Non-intrinsic, sequential object identifier';
+comment on column revision.extra_headers is 'Extra revision headers; used in revision hash computation';
-- either this table or the sha1_git[] column on the revision table
create table revision_history
(
id sha1_git not null,
parent_id sha1_git not null,
parent_rank int not null default 0
-- parent position in merge commits, 0-based
);
comment on table revision_history is 'Sequence of revision history with parent and position in history';
comment on column revision_history.id is 'Revision history git object sha1 checksum';
comment on column revision_history.parent_id is 'Parent revision git object identifier';
comment on column revision_history.parent_rank is 'Parent position in merge commits, 0-based';
-- Crawling history of software origins visited by Software Heritage. Each
-- visit is a 3-way mapping between a software origin, a timestamp, and a
-- snapshot object capturing the full-state of the origin at visit time.
create table origin_visit
(
origin bigint not null,
visit bigint not null,
date timestamptz not null,
type text not null
);
comment on column origin_visit.origin is 'Visited origin';
comment on column origin_visit.visit is 'Sequential visit number for the origin';
comment on column origin_visit.date is 'Visit timestamp';
comment on column origin_visit.type is 'Type of loader that did the visit (hg, git, ...)';
-- Crawling history of software origin visits by Software Heritage. Each
-- visit sees its history change through new origin visit status updates
create table origin_visit_status
(
origin bigint not null,
visit bigint not null,
date timestamptz not null,
status origin_visit_state not null,
metadata jsonb,
snapshot sha1_git
);
comment on column origin_visit_status.origin is 'Origin concerned by the visit update';
comment on column origin_visit_status.visit is 'Visit concerned by the visit update';
comment on column origin_visit_status.date is 'Visit update timestamp';
comment on column origin_visit_status.status is 'Visit status (ongoing, failed, full)';
comment on column origin_visit_status.metadata is 'Optional origin visit metadata';
comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit';
-- A snapshot represents the entire state of a software origin as crawled by
-- Software Heritage. This table is a simple mapping between (public) intrinsic
-- snapshot identifiers and (private) numeric sequential identifiers.
create table snapshot
(
object_id bigserial not null, -- PK internal object identifier
id sha1_git not null -- snapshot intrinsic identifier
);
comment on table snapshot is 'State of a software origin as crawled by Software Heritage';
comment on column snapshot.object_id is 'Internal object identifier';
comment on column snapshot.id is 'Intrinsic snapshot identifier';
-- Each snapshot associates "branch" names to other objects in the Software
-- Heritage Merkle DAG. This table describes branches as mappings between names
-- and target typed objects.
create table snapshot_branch
(
object_id bigserial not null, -- PK internal object identifier
name bytea not null, -- branch name, e.g., "master" or "feature/drag-n-drop"
target bytea, -- target object identifier, e.g., a revision identifier
target_type snapshot_target -- target object type, e.g., "revision"
);
comment on table snapshot_branch is 'Associates branches with objects in Heritage Merkle DAG';
comment on column snapshot_branch.object_id is 'Internal object identifier';
comment on column snapshot_branch.name is 'Branch name';
comment on column snapshot_branch.target is 'Target object identifier';
comment on column snapshot_branch.target_type is 'Target object type';
-- Mapping between snapshots and their branches.
create table snapshot_branches
(
snapshot_id bigint not null, -- snapshot identifier, ref. snapshot.object_id
branch_id bigint not null -- branch identifier, ref. snapshot_branch.object_id
);
comment on table snapshot_branches is 'Mapping between snapshots and their branches';
comment on column snapshot_branches.snapshot_id is 'Snapshot identifier';
comment on column snapshot_branches.branch_id is 'Branch identifier';
-- A "memorable" point in time in the development history of a software
-- project.
--
-- Synonyms/mappings:
-- * git: tag (of the annotated kind, otherwise they are just references)
-- * tarball: the release version number
create table release
(
id sha1_git not null,
target sha1_git,
date timestamptz,
date_offset smallint,
name bytea,
comment bytea,
author bigint,
synthetic boolean not null default false, -- true iff release has been created by Software Heritage
object_id bigserial,
target_type object_type not null,
date_neg_utc_offset boolean
);
comment on table release is 'Details of a software release, synonymous with
a tag (git) or version number (tarball)';
comment on column release.id is 'Release git identifier';
comment on column release.target is 'Target git identifier';
comment on column release.date is 'Release timestamp';
comment on column release.date_offset is 'Timestamp offset from UTC';
comment on column release.name is 'Name';
comment on column release.comment is 'Comment';
comment on column release.author is 'Author';
comment on column release.synthetic is 'Indicates if created by Software Heritage';
comment on column release.object_id is 'Object identifier';
comment on column release.target_type is 'Object type (''content'', ''directory'', ''revision'',
''release'', ''snapshot'')';
comment on column release.date_neg_utc_offset is 'True indicates -0 UTC offset for release timestamp';
-- Tools
create table metadata_fetcher
(
id serial not null,
name text not null,
version text not null,
metadata jsonb not null
);
comment on table metadata_fetcher is 'Tools used to retrieve metadata';
comment on column metadata_fetcher.id is 'Internal identifier of the fetcher';
comment on column metadata_fetcher.name is 'Fetcher name';
comment on column metadata_fetcher.version is 'Fetcher version';
comment on column metadata_fetcher.metadata is 'Extra information about the fetcher';
create table metadata_authority
(
id serial not null,
type text not null,
url text not null,
metadata jsonb not null
);
comment on table metadata_authority is 'Metadata authority information';
comment on column metadata_authority.id is 'Internal identifier of the authority';
comment on column metadata_authority.type is 'Type of authority (deposit/forge/registry)';
comment on column metadata_authority.url is 'Authority''s URI';
comment on column metadata_authority.metadata is 'Other metadata about authority';
-- Extrinsic metadata on DAG objects and origins.
create table object_metadata
(
type text not null,
id text not null,
-- metadata source
authority_id bigint not null,
fetcher_id bigint not null,
discovery_date timestamptz not null,
-- metadata itself
format text not null,
metadata bytea not null,
-- context
origin text,
visit bigint,
snapshot swhid,
release swhid,
revision swhid,
path bytea,
directory swhid
);
comment on table object_metadata is 'keeps all metadata found concerning an object';
comment on column object_metadata.type is 'the type of object (content/directory/revision/release/snapshot/origin) the metadata is on';
comment on column object_metadata.id is 'the SWHID or origin URL for which the metadata was found';
comment on column object_metadata.discovery_date is 'the date of retrieval';
comment on column object_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.';
comment on column object_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.';
comment on column object_metadata.format is 'name of the format of metadata, used by readers to interpret it.';
comment on column object_metadata.metadata is 'original metadata in opaque format';
-- Keep a cache of object counts
create table object_counts
(
object_type text, -- table for which we're counting objects (PK)
value bigint, -- count of objects in the table
last_update timestamptz, -- last update for the object count in this table
single_update boolean -- whether we update this table standalone (true) or through bucketed counts (false)
);
comment on table object_counts is 'Cache of object counts';
comment on column object_counts.object_type is 'Object type (''content'', ''directory'', ''revision'',
''release'', ''snapshot'')';
comment on column object_counts.value is 'Count of objects in the table';
comment on column object_counts.last_update is 'Last update for object count';
comment on column object_counts.single_update is 'standalone (true) or bucketed counts (false)';
create table object_counts_bucketed
(
line serial not null, -- PK
object_type text not null, -- table for which we're counting objects
identifier text not null, -- identifier across which we're bucketing objects
bucket_start bytea, -- lower bound (inclusive) for the bucket
bucket_end bytea, -- upper bound (exclusive) for the bucket
value bigint, -- count of objects in the bucket
last_update timestamptz -- last update for the object count in this bucket
);
comment on table object_counts_bucketed is 'Bucketed count for objects ordered by type';
comment on column object_counts_bucketed.line is 'Auto-incremented identifier value';
comment on column object_counts_bucketed.object_type is 'Object type (''content'', ''directory'', ''revision'',
''release'', ''snapshot'')';
comment on column object_counts_bucketed.identifier is 'Common identifier for bucketed objects';
comment on column object_counts_bucketed.bucket_start is 'Lower bound (inclusive) for the bucket';
comment on column object_counts_bucketed.bucket_end is 'Upper bound (exclusive) for the bucket';
comment on column object_counts_bucketed.value is 'Count of objects in the bucket';
comment on column object_counts_bucketed.last_update is 'Last update for the object count in this bucket';
diff --git a/swh/storage/sql/40-swh-func.sql b/swh/storage/sql/40-swh-func.sql
index 0a8a0a7f..e244ebf1 100644
--- a/swh/storage/sql/40-swh-func.sql
+++ b/swh/storage/sql/40-swh-func.sql
@@ -1,949 +1,950 @@
create or replace function hash_sha1(text)
returns text
as $$
select encode(digest($1, 'sha1'), 'hex')
$$ language sql strict immutable;
comment on function hash_sha1(text) is 'Compute SHA1 hash as text';
-- create a temporary table called tmp_TBLNAME, mimicking existing table
-- TBLNAME
--
-- Args:
-- tblname: name of the table to mimic
create or replace function swh_mktemp(tblname regclass)
returns void
language plpgsql
as $$
begin
execute format('
create temporary table if not exists tmp_%1$I
(like %1$I including defaults)
on commit delete rows;
alter table tmp_%1$I drop column if exists object_id;
', tblname);
return;
end
$$;
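-- Illustrative example: mirror the content table (minus object_id) into a
-- transaction-local tmp_content:
--   select swh_mktemp('content');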
-- create a temporary table for directory entries called tmp_TBLNAME,
-- mimicking existing table TBLNAME with an extra dir_id (sha1_git)
-- column, and dropping the id column.
--
-- This is used to create the tmp_directory_entry_* tables.
--
-- Args:
-- tblname: name of the table to mimic
create or replace function swh_mktemp_dir_entry(tblname regclass)
returns void
language plpgsql
as $$
begin
execute format('
create temporary table if not exists tmp_%1$I
(like %1$I including defaults, dir_id sha1_git)
on commit delete rows;
alter table tmp_%1$I drop column if exists id;
', tblname);
return;
end
$$;
-- create a temporary table for revisions called tmp_revision,
-- mimicking existing table revision, replacing the foreign keys to
-- people with an email and name field
--
create or replace function swh_mktemp_revision()
returns void
language sql
as $$
create temporary table if not exists tmp_revision (
like revision including defaults,
author_fullname bytea,
author_name bytea,
author_email bytea,
committer_fullname bytea,
committer_name bytea,
committer_email bytea
) on commit delete rows;
alter table tmp_revision drop column if exists author;
alter table tmp_revision drop column if exists committer;
alter table tmp_revision drop column if exists object_id;
$$;
-- create a temporary table for releases called tmp_release,
-- mimicking existing table release, replacing the foreign keys to
-- people with an email and name field
--
create or replace function swh_mktemp_release()
returns void
language sql
as $$
create temporary table if not exists tmp_release (
like release including defaults,
author_fullname bytea,
author_name bytea,
author_email bytea
) on commit delete rows;
alter table tmp_release drop column if exists author;
alter table tmp_release drop column if exists object_id;
$$;
-- create a temporary table for the branches of a snapshot
create or replace function swh_mktemp_snapshot_branch()
returns void
language sql
as $$
create temporary table if not exists tmp_snapshot_branch (
name bytea not null,
target bytea,
target_type snapshot_target
) on commit delete rows;
$$;
-- a content signature is a set of cryptographic checksums that we use to
-- uniquely identify content, for the purpose of verifying if we already have
-- some content or not during content injection
create type content_signature as (
sha1 sha1,
sha1_git sha1_git,
sha256 sha256,
blake2s256 blake2s256
);
-- check which entries of tmp_skipped_content are missing from skipped_content
--
-- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content,
-- 2. call this function
create or replace function swh_skipped_content_missing()
returns setof content_signature
language plpgsql
as $$
begin
return query
select sha1, sha1_git, sha256, blake2s256 from tmp_skipped_content t
where not exists
(select 1 from skipped_content s where
s.sha1 is not distinct from t.sha1 and
s.sha1_git is not distinct from t.sha1_git and
s.sha256 is not distinct from t.sha256);
return;
end
$$;
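-- Illustrative sketch of the bulk flow described above:
--   select swh_mktemp('skipped_content');
--   copy tmp_skipped_content from stdin;  -- or any other bulk load
--   select * from swh_skipped_content_missing();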
-- add tmp_content entries to content, skipping duplicates
--
-- operates in bulk: 0. swh_mktemp(content), 1. COPY to tmp_content,
-- 2. call this function
create or replace function swh_content_add()
returns void
language plpgsql
as $$
begin
insert into content (sha1, sha1_git, sha256, blake2s256, length, status, ctime)
select distinct sha1, sha1_git, sha256, blake2s256, length, status, ctime from tmp_content;
return;
end
$$;
-- add tmp_skipped_content entries to skipped_content, skipping duplicates
--
-- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content,
-- 2. call this function
create or replace function swh_skipped_content_add()
returns void
language plpgsql
as $$
begin
insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, status, reason, origin)
select distinct sha1, sha1_git, sha256, blake2s256, length, status, reason, origin
from tmp_skipped_content
where (coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')) in (
select coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')
from swh_skipped_content_missing()
);
-- TODO XXX use postgres 9.5 "UPSERT" support here, when available.
-- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid
-- the extra swh_skipped_content_missing() query here.
return;
end
$$;
-- Update content entries from temporary table.
-- (columns_update lists potential new columns added to the schema; it cannot be empty)
--
create or replace function swh_content_update(columns_update text[])
returns void
language plpgsql
as $$
declare
query text;
tmp_array text[];
begin
if array_length(columns_update, 1) = 0 then
raise exception 'Please, provide the list of column names to update.';
end if;
tmp_array := array(select format('%1$s=t.%1$s', unnest) from unnest(columns_update));
query = format('update content set %s
from tmp_content t where t.sha1 = content.sha1',
array_to_string(tmp_array, ', '));
execute query;
return;
end
$$;
comment on function swh_content_update(text[]) IS 'Update existing content''s columns';
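-- Illustrative example: refresh only the status column of existing content
-- rows from tmp_content:
--   select swh_content_update(ARRAY['status'] :: text[]);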
create type directory_entry_type as enum('file', 'dir', 'rev');
-- Add tmp_directory_entry_* entries to directory_entry_* and directory,
-- skipping duplicates in directory_entry_*. This is a generic function that
-- works on all kind of directory entries.
--
-- operates in bulk: 0. swh_mktemp_dir_entry('directory_entry_*'), 1. COPY to
-- tmp_directory_entry_*, 2. call this function
--
-- Assumption: this function is used in the same transaction that inserts the
-- context directory in table "directory".
create or replace function swh_directory_entry_add(typ directory_entry_type)
returns void
language plpgsql
as $$
begin
execute format('
insert into directory_entry_%1$s (target, name, perms)
select distinct t.target, t.name, t.perms
from tmp_directory_entry_%1$s t
where not exists (
select 1
from directory_entry_%1$s i
where t.target = i.target and t.name = i.name and t.perms = i.perms)
', typ);
execute format('
with new_entries as (
select t.dir_id, array_agg(i.id) as entries
from tmp_directory_entry_%1$s t
inner join directory_entry_%1$s i
using (target, name, perms)
group by t.dir_id
)
update tmp_directory as d
set %1$s_entries = new_entries.entries
from new_entries
where d.id = new_entries.dir_id
', typ);
return;
end
$$;
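-- Illustrative sketch of the bulk flow described above, for file entries:
--   select swh_mktemp_dir_entry('directory_entry_file');
--   copy tmp_directory_entry_file from stdin;  -- or any other bulk load
--   select swh_directory_entry_add('file');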
-- Insert the data from tmp_directory, tmp_directory_entry_file,
-- tmp_directory_entry_dir, tmp_directory_entry_rev into their final
-- tables.
--
-- Prerequisites:
-- directory ids in tmp_directory
-- entries in tmp_directory_entry_{file,dir,rev}
--
create or replace function swh_directory_add()
returns void
language plpgsql
as $$
begin
perform swh_directory_entry_add('file');
perform swh_directory_entry_add('dir');
perform swh_directory_entry_add('rev');
insert into directory
select * from tmp_directory t
where not exists (
select 1 from directory d
where d.id = t.id);
return;
end
$$;
-- a directory listing entry with all the metadata
--
-- can be used to list a directory, and retrieve all the data in one go.
create type directory_entry as
(
dir_id sha1_git, -- id of the parent directory
type directory_entry_type, -- type of entry
target sha1_git, -- id of target
name unix_path, -- path name, relative to containing dir
perms file_perms, -- unix-like permissions
status content_status, -- visible or absent
sha1 sha1, -- content's sha1 if type is not dir
sha1_git sha1_git, -- content's sha1 git if type is not dir
sha256 sha256, -- content's sha256 if type is not dir
length bigint -- content length if type is not dir
);
-- List a single level of directory walked_dir_id
-- FIXME: order by name is not correct. For git, we need to order by
-- lexicographic order but as if a trailing / is present in directory
-- name
create or replace function swh_directory_walk_one(walked_dir_id sha1_git)
returns setof directory_entry
language sql
stable
as $$
with dir as (
select id as dir_id, dir_entries, file_entries, rev_entries
from directory
where id = walked_dir_id),
ls_d as (select dir_id, unnest(dir_entries) as entry_id from dir),
ls_f as (select dir_id, unnest(file_entries) as entry_id from dir),
ls_r as (select dir_id, unnest(rev_entries) as entry_id from dir)
(select dir_id, 'dir'::directory_entry_type as type,
e.target, e.name, e.perms, NULL::content_status,
NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint
from ls_d
left join directory_entry_dir e on ls_d.entry_id = e.id)
union
(select dir_id, 'file'::directory_entry_type as type,
e.target, e.name, e.perms, c.status,
c.sha1, c.sha1_git, c.sha256, c.length
from ls_f
left join directory_entry_file e on ls_f.entry_id = e.id
left join content c on e.target = c.sha1_git)
union
(select dir_id, 'rev'::directory_entry_type as type,
e.target, e.name, e.perms, NULL::content_status,
NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint
from ls_r
left join directory_entry_rev e on ls_r.entry_id = e.id)
order by name;
$$;
-- List recursively the revision directory arborescence
create or replace function swh_directory_walk(walked_dir_id sha1_git)
returns setof directory_entry
language sql
stable
as $$
with recursive entries as (
select dir_id, type, target, name, perms, status, sha1, sha1_git,
sha256, length
from swh_directory_walk_one(walked_dir_id)
union all
select dir_id, type, target, (dirname || '/' || name)::unix_path as name,
perms, status, sha1, sha1_git, sha256, length
from (select (swh_directory_walk_one(dirs.target)).*, dirs.name as dirname
from (select target, name from entries where type = 'dir') as dirs) as with_parent
)
select dir_id, type, target, name, perms, status, sha1, sha1_git, sha256, length
from entries
$$;
-- Find a directory entry by its path
create or replace function swh_find_directory_entry_by_path(
walked_dir_id sha1_git,
dir_or_content_path bytea[])
returns directory_entry
language plpgsql
as $$
declare
end_index integer;
paths bytea default '';
path bytea;
res bytea[];
r record;
begin
end_index := array_upper(dir_or_content_path, 1);
res[1] := walked_dir_id;
for i in 1..end_index
loop
path := dir_or_content_path[i];
-- accumulate the full path, used to patch the name of the result record
if i = 1 then
paths := path;
else
paths := paths || '/' || path;
end if;
if i <> end_index then
select *
from swh_directory_walk_one(res[i] :: sha1_git)
where name=path
and type = 'dir'
limit 1 into r;
else
select *
from swh_directory_walk_one(res[i] :: sha1_git)
where name=path
limit 1 into r;
end if;
-- if no entry matched at this level, the path does not exist
if r is null then
return null;
else
-- store the next dir to lookup the next local path from
res[i+1] := r.target;
end if;
end loop;
-- at this point, r is the result; patch its 'name' with the full path before returning it
r.name := paths;
return r;
end
$$;
-- List all revision IDs starting from a given revision, going back in time
--
-- TODO ordering: currently breadth-first (is that what we want?)
-- TODO ordering: ORDER BY parent_rank somewhere?
create or replace function swh_revision_list(root_revisions bytea[], num_revs bigint default NULL)
returns table (id sha1_git, parents bytea[])
language sql
stable
as $$
with recursive full_rev_list(id) as (
(select id from revision where id = ANY(root_revisions))
union
(select h.parent_id
from revision_history as h
join full_rev_list on h.id = full_rev_list.id)
),
rev_list as (select id from full_rev_list limit num_revs)
select rev_list.id as id,
array(select rh.parent_id::bytea
from revision_history rh
where rh.id = rev_list.id
order by rh.parent_rank
) as parents
from rev_list;
$$;
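-- example call (id shown is the test revision from storage_data, limited
-- to 10 revisions):
--   select * from swh_revision_list(
--     array['\x066b1b62dbfa033362092af468bf6cfabec230e7'::bytea], 10);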
-- Detailed entry for a revision
create type revision_entry as
(
id sha1_git,
date timestamptz,
date_offset smallint,
date_neg_utc_offset boolean,
committer_date timestamptz,
committer_date_offset smallint,
committer_date_neg_utc_offset boolean,
type revision_type,
directory sha1_git,
message bytea,
author_id bigint,
author_fullname bytea,
author_name bytea,
author_email bytea,
committer_id bigint,
committer_fullname bytea,
committer_name bytea,
committer_email bytea,
metadata jsonb,
synthetic boolean,
+ extra_headers bytea[][],
parents bytea[],
object_id bigint
);
-- "git style" revision log. Similar to swh_revision_list(), but returning all
-- information associated to each revision, and expanding authors/committers
create or replace function swh_revision_log(root_revisions bytea[], num_revs bigint default NULL)
returns setof revision_entry
language sql
stable
as $$
select t.id, r.date, r.date_offset, r.date_neg_utc_offset,
r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
r.type, r.directory, r.message,
a.id, a.fullname, a.name, a.email,
c.id, c.fullname, c.name, c.email,
- r.metadata, r.synthetic, t.parents, r.object_id
+ r.metadata, r.synthetic, r.extra_headers, t.parents, r.object_id
from swh_revision_list(root_revisions, num_revs) as t
left join revision r on t.id = r.id
left join person a on a.id = r.author
left join person c on c.id = r.committer;
$$;
-- Detailed entry for a release
create type release_entry as
(
id sha1_git,
target sha1_git,
target_type object_type,
date timestamptz,
date_offset smallint,
date_neg_utc_offset boolean,
name bytea,
comment bytea,
synthetic boolean,
author_id bigint,
author_fullname bytea,
author_name bytea,
author_email bytea,
object_id bigint
);
-- Create entries in person from tmp_revision
create or replace function swh_person_add_from_revision()
returns void
language plpgsql
as $$
begin
with t as (
select author_fullname as fullname, author_name as name, author_email as email from tmp_revision
union
select committer_fullname as fullname, committer_name as name, committer_email as email from tmp_revision
) insert into person (fullname, name, email)
select distinct on (fullname) fullname, name, email from t
where not exists (
select 1
from person p
where t.fullname = p.fullname
);
return;
end
$$;
-- Create entries in revision from tmp_revision
create or replace function swh_revision_add()
returns void
language plpgsql
as $$
begin
perform swh_person_add_from_revision();
- insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic)
- select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic
+ insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic, extra_headers)
+ select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic, t.extra_headers
from tmp_revision t
left join person a on a.fullname = t.author_fullname
left join person c on c.fullname = t.committer_fullname;
return;
end
$$;
-- Create entries in person from tmp_release
create or replace function swh_person_add_from_release()
returns void
language plpgsql
as $$
begin
with t as (
select distinct author_fullname as fullname, author_name as name, author_email as email from tmp_release
where author_fullname is not null
) insert into person (fullname, name, email)
select distinct on (fullname) fullname, name, email from t
where not exists (
select 1
from person p
where t.fullname = p.fullname
);
return;
end
$$;
-- Create entries in release from tmp_release
create or replace function swh_release_add()
returns void
language plpgsql
as $$
begin
perform swh_person_add_from_release();
insert into release (id, target, target_type, date, date_offset, date_neg_utc_offset, name, comment, author, synthetic)
select distinct t.id, t.target, t.target_type, t.date, t.date_offset, t.date_neg_utc_offset, t.name, t.comment, a.id, t.synthetic
from tmp_release t
left join person a on a.fullname = t.author_fullname
where not exists (select 1 from release where t.id = release.id);
return;
end
$$;
-- add a new origin_visit for the origin at origin_url, at the given date.
--
-- Returns the new visit id.
create or replace function swh_origin_visit_add(origin_url text, date timestamptz, type text)
returns bigint
language sql
as $$
with origin_id as (
select id
from origin
where url = origin_url
), last_known_visit as (
select coalesce(max(visit), 0) as visit
from origin_visit
where origin = (select id from origin_id)
)
insert into origin_visit (origin, date, type, visit)
values ((select id from origin_id), date, type,
(select visit from last_known_visit) + 1)
returning visit;
$$;
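-- example (illustrative values):
--   select swh_origin_visit_add('file:///dev/null', now(), 'git');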
create or replace function swh_snapshot_add(snapshot_id sha1_git)
returns void
language plpgsql
as $$
declare
snapshot_object_id snapshot.object_id%type;
begin
select object_id from snapshot where id = snapshot_id into snapshot_object_id;
if snapshot_object_id is null then
insert into snapshot (id) values (snapshot_id) returning object_id into snapshot_object_id;
insert into snapshot_branch (name, target_type, target)
select name, target_type, target from tmp_snapshot_branch tmp
where not exists (
select 1
from snapshot_branch sb
where sb.name = tmp.name
and sb.target = tmp.target
and sb.target_type = tmp.target_type
)
on conflict do nothing;
insert into snapshot_branches (snapshot_id, branch_id)
select snapshot_object_id, sb.object_id as branch_id
from tmp_snapshot_branch tmp
join snapshot_branch sb
using (name, target, target_type)
where tmp.target is not null and tmp.target_type is not null
union
select snapshot_object_id, sb.object_id as branch_id
from tmp_snapshot_branch tmp
join snapshot_branch sb
using (name)
where tmp.target is null and tmp.target_type is null
and sb.target is null and sb.target_type is null;
end if;
truncate table tmp_snapshot_branch;
end;
$$;
create type snapshot_result as (
snapshot_id sha1_git,
name bytea,
target bytea,
target_type snapshot_target
);
create or replace function swh_snapshot_get_by_id(id sha1_git,
branches_from bytea default '', branches_count bigint default null,
target_types snapshot_target[] default NULL)
returns setof snapshot_result
language sql
stable
as $$
-- with small limits, the "naive" version of this query can degenerate into
-- using the deduplication index on snapshot_branch (name, target,
-- target_type); the planner then happily scans several hundred million rows.
-- Do the query in two steps: first pull the relevant branches for the given
-- snapshot (filtering them by type), then do the limiting. This two-step
-- process guides the planner into using the proper index.
with filtered_snapshot_branches as (
select swh_snapshot_get_by_id.id as snapshot_id, name, target, target_type
from snapshot_branches
inner join snapshot_branch on snapshot_branches.branch_id = snapshot_branch.object_id
where snapshot_id = (select object_id from snapshot where snapshot.id = swh_snapshot_get_by_id.id)
and (target_types is null or target_type = any(target_types))
order by name
)
select snapshot_id, name, target, target_type
from filtered_snapshot_branches
where name >= branches_from
order by name limit branches_count;
$$;
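-- hedged paging sketch: branches come back in name order and branches_from
-- is inclusive, so a client can page through a large snapshot by resuming
-- from the name that follows the last one received (ids illustrative):
--   select * from swh_snapshot_get_by_id(
--     '\x1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'::sha1_git,
--     branches_from := '', branches_count := 1000);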
create type snapshot_size as (
target_type snapshot_target,
count bigint
);
create or replace function swh_snapshot_count_branches(id sha1_git)
returns setof snapshot_size
language sql
stable
as $$
select target_type, count(name)
from swh_snapshot_get_by_id(swh_snapshot_count_branches.id)
group by target_type;
$$;
-- Absolute path: directory reference + complete path relative to it
create type content_dir as (
directory sha1_git,
path unix_path
);
-- Find the containing directory of a given content, specified by sha1
-- (note: *not* sha1_git).
--
-- Return a pair (dir_id, path) where path is a UNIX path that, from the
-- directory root, reaches down to a file with the desired content. Return
-- NULL if no match is found.
--
-- In case of multiple paths (i.e., pretty much always), an arbitrary one is
-- chosen.
create or replace function swh_content_find_directory(content_id sha1)
returns content_dir
language sql
stable
as $$
with recursive path as (
-- Recursively build a path from the requested content to a root
-- directory. Each iteration returns a pair (dir_id, filename) where
-- filename is relative to dir_id. Stops when no parent directory can
-- be found.
(select dir.id as dir_id, dir_entry_f.name as name, 0 as depth
from directory_entry_file as dir_entry_f
join content on content.sha1_git = dir_entry_f.target
join directory as dir on dir.file_entries @> array[dir_entry_f.id]
where content.sha1 = content_id
limit 1)
union all
(select dir.id as dir_id,
(dir_entry_d.name || '/' || path.name)::unix_path as name,
path.depth + 1
from path
join directory_entry_dir as dir_entry_d on dir_entry_d.target = path.dir_id
join directory as dir on dir.dir_entries @> array[dir_entry_d.id]
limit 1)
)
select dir_id, name from path order by depth desc limit 1;
$$;
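-- example call (sha1 shown is the test content from storage_data):
--   select * from swh_content_find_directory(
--     '\x34973274ccef6ab4dfaaf86599792fa9c3fe4689');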
-- Find the visit of origin closest to date visit_date
-- Breaks ties by selecting the largest visit id
create or replace function swh_visit_find_by_date(origin_url text, visit_date timestamptz default NOW())
returns setof origin_visit
language plpgsql
stable
as $$
declare
origin_id bigint;
begin
select id into origin_id from origin where url=origin_url;
return query
with closest_two_visits as ((
select ov, (date - visit_date) as interval
from origin_visit ov
where ov.origin = origin_id
and ov.date >= visit_date
order by ov.date asc, ov.visit desc
limit 1
) union (
select ov, (visit_date - date) as interval
from origin_visit ov
where ov.origin = origin_id
and ov.date < visit_date
order by ov.date desc, ov.visit desc
limit 1
)) select (ov).* from closest_two_visits order by interval, visit desc limit 1;
end
$$;
-- Object listing by object_id
create or replace function swh_content_list_by_object_id(
min_excl bigint,
max_incl bigint
)
returns setof content
language sql
stable
as $$
select * from content
where object_id > min_excl and object_id <= max_incl
order by object_id;
$$;
create or replace function swh_revision_list_by_object_id(
min_excl bigint,
max_incl bigint
)
returns setof revision_entry
language sql
stable
as $$
with revs as (
select * from revision
where object_id > min_excl and object_id <= max_incl
)
select r.id, r.date, r.date_offset, r.date_neg_utc_offset,
r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
r.type, r.directory, r.message,
- a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic,
+ a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic, r.extra_headers,
array(select rh.parent_id::bytea from revision_history rh where rh.id = r.id order by rh.parent_rank)
as parents, r.object_id
from revs r
left join person a on a.id = r.author
left join person c on c.id = r.committer
order by r.object_id;
$$;
create or replace function swh_release_list_by_object_id(
min_excl bigint,
max_incl bigint
)
returns setof release_entry
language sql
stable
as $$
with rels as (
select * from release
where object_id > min_excl and object_id <= max_incl
)
select r.id, r.target, r.target_type, r.date, r.date_offset, r.date_neg_utc_offset, r.name, r.comment,
r.synthetic, p.id as author_id, p.fullname as author_fullname, p.name as author_name, p.email as author_email, r.object_id
from rels r
left join person p on p.id = r.author
order by r.object_id;
$$;
-- simple counter mapping a textual label to an integer value
create type counter as (
label text,
value bigint
);
-- return statistics about the number of tuples in various SWH tables
--
-- Note: the returned values come from the object_counts table, which is
-- refreshed asynchronously (see swh_update_counter and friends), so they
-- can lag behind the actual table sizes
create or replace function swh_stat_counters()
returns setof counter
language sql
stable
as $$
select object_type as label, value as value
from object_counts
where object_type in (
'content',
'directory',
'directory_entry_dir',
'directory_entry_file',
'directory_entry_rev',
'origin',
'origin_visit',
'person',
'release',
'revision',
'revision_history',
'skipped_content',
'snapshot'
);
$$;
create or replace function swh_update_counter(object_type text)
returns void
language plpgsql
as $$
begin
execute format('
insert into object_counts
(value, last_update, object_type)
values
((select count(*) from %1$I), NOW(), %1$L)
on conflict (object_type) do update set
value = excluded.value,
last_update = excluded.last_update',
object_type);
return;
end;
$$;
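-- for object_type = 'content', the statement built by format() expands to:
--   insert into object_counts (value, last_update, object_type)
--   values ((select count(*) from content), NOW(), 'content')
--   on conflict (object_type) do update set
--     value = excluded.value, last_update = excluded.last_update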
create or replace function swh_update_counter_bucketed()
returns void
language plpgsql
as $$
declare
query text;
line_to_update int;
new_value bigint;
begin
select
object_counts_bucketed.line,
format(
'select count(%I) from %I where %s',
coalesce(identifier, '*'),
object_type,
coalesce(
concat_ws(
' and ',
case when bucket_start is not null then
format('%I >= %L', identifier, bucket_start) -- lower bound condition, inclusive
end,
case when bucket_end is not null then
format('%I < %L', identifier, bucket_end) -- upper bound condition, exclusive
end
),
'true'
)
)
from object_counts_bucketed
order by coalesce(last_update, now() - '1 month'::interval) asc
limit 1
into line_to_update, query;
execute query into new_value;
update object_counts_bucketed
set value = new_value,
last_update = now()
where object_counts_bucketed.line = line_to_update;
end
$$;
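-- sketch of one generated count query, for a hypothetical bucketed line
-- (object_type = 'content', identifier = 'sha1', bucket_start = '\x00',
-- bucket_end = '\x01'):
--   select count(sha1) from content where sha1 >= '\x00' and sha1 < '\x01'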
create or replace function swh_update_counters_from_buckets()
returns trigger
language plpgsql
as $$
begin
with to_update as (
select object_type, sum(value) as value, max(last_update) as last_update
from object_counts_bucketed ob1
where not exists (
select 1 from object_counts_bucketed ob2
where ob1.object_type = ob2.object_type
and value is null
)
group by object_type
) update object_counts
set
value = to_update.value,
last_update = to_update.last_update
from to_update
where
object_counts.object_type = to_update.object_type
and object_counts.value != to_update.value;
return null;
end
$$;
create trigger update_counts_from_bucketed
after insert or update
on object_counts_bucketed
for each row
when (NEW.line % 256 = 0)
execute procedure swh_update_counters_from_buckets();
diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py
index 11e94c86..842e7af7 100644
--- a/swh/storage/tests/storage_data.py
+++ b/swh/storage/tests/storage_data.py
@@ -1,598 +1,601 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model import from_disk
class StorageData:
def __getattr__(self, key):
try:
v = globals()[key]
except KeyError as e:
raise AttributeError(e.args[0])
if hasattr(v, "copy"):
return v.copy()
return v
data = StorageData()
cont = {
"data": b"42\n",
"length": 3,
"sha1": hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
"sha1_git": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
"sha256": hash_to_bytes(
"673650f936cb3b0a2f93ce09d81be10748b1b203c19e8176b4eefc1964a0cf3a"
),
"blake2s256": hash_to_bytes(
"d5fe1939576527e42cfd76a9455a2432fe7f56669564577dd93c4280e76d661d"
),
"status": "visible",
}
cont2 = {
"data": b"4242\n",
"length": 5,
"sha1": hash_to_bytes("61c2b3a30496d329e21af70dd2d7e097046d07b7"),
"sha1_git": hash_to_bytes("36fade77193cb6d2bd826161a0979d64c28ab4fa"),
"sha256": hash_to_bytes(
"859f0b154fdb2d630f45e1ecae4a862915435e663248bb8461d914696fc047cd"
),
"blake2s256": hash_to_bytes(
"849c20fad132b7c2d62c15de310adfe87be94a379941bed295e8141c6219810d"
),
"status": "visible",
}
cont3 = {
"data": b"424242\n",
"length": 7,
"sha1": hash_to_bytes("3e21cc4942a4234c9e5edd8a9cacd1670fe59f13"),
"sha1_git": hash_to_bytes("c932c7649c6dfa4b82327d121215116909eb3bea"),
"sha256": hash_to_bytes(
"92fb72daf8c6818288a35137b72155f507e5de8d892712ab96277aaed8cf8a36"
),
"blake2s256": hash_to_bytes(
"76d0346f44e5a27f6bafdd9c2befd304aff83780f93121d801ab6a1d4769db11"
),
"status": "visible",
"ctime": "2019-12-01 00:00:00Z",
}
contents = (cont, cont2, cont3)
missing_cont = {
"data": b"missing\n",
"length": 8,
"sha1": hash_to_bytes("f9c24e2abb82063a3ba2c44efd2d3c797f28ac90"),
"sha1_git": hash_to_bytes("33e45d56f88993aae6a0198013efa80716fd8919"),
"sha256": hash_to_bytes(
"6bbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a"
),
"blake2s256": hash_to_bytes(
"306856b8fd879edb7b6f1aeaaf8db9bbecc993cd7f776c333ac3a782fa5c6eba"
),
"status": "absent",
}
skipped_cont = {
"length": 1024 * 1024 * 200,
"sha1_git": hash_to_bytes("33e45d56f88993aae6a0198013efa80716fd8920"),
"sha1": hash_to_bytes("43e45d56f88993aae6a0198013efa80716fd8920"),
"sha256": hash_to_bytes(
"7bbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a"
),
"blake2s256": hash_to_bytes(
"ade18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b"
),
"reason": "Content too long",
"status": "absent",
"origin": "file:///dev/zero",
}
skipped_cont2 = {
"length": 1024 * 1024 * 300,
"sha1_git": hash_to_bytes("44e45d56f88993aae6a0198013efa80716fd8921"),
"sha1": hash_to_bytes("54e45d56f88993aae6a0198013efa80716fd8920"),
"sha256": hash_to_bytes(
"8cbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a"
),
"blake2s256": hash_to_bytes(
"9ce18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b"
),
"reason": "Content too long",
"status": "absent",
}
skipped_contents = (skipped_cont, skipped_cont2)
dir = {
"id": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"),
"entries": (
{
"name": b"foo",
"type": "file",
"target": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), # cont
"perms": from_disk.DentryPerms.content,
},
{
"name": b"bar\xc3",
"type": "dir",
"target": b"12345678901234567890",
"perms": from_disk.DentryPerms.directory,
},
),
}
dir2 = {
"id": hash_to_bytes("8505808532953da7d2581741f01b29c04b1cb9ab"),
"entries": (
{
"name": b"oof",
"type": "file",
"target": hash_to_bytes( # cont2
"36fade77193cb6d2bd826161a0979d64c28ab4fa"
),
"perms": from_disk.DentryPerms.content,
},
),
}
dir3 = {
"id": hash_to_bytes("4ea8c6b2f54445e5dd1a9d5bb2afd875d66f3150"),
"entries": (
{
"name": b"foo",
"type": "file",
"target": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), # cont
"perms": from_disk.DentryPerms.content,
},
{
"name": b"subdir",
"type": "dir",
"target": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), # dir
"perms": from_disk.DentryPerms.directory,
},
{
"name": b"hello",
"type": "file",
"target": b"12345678901234567890",
"perms": from_disk.DentryPerms.content,
},
),
}
dir4 = {
"id": hash_to_bytes("377aa5fcd944fbabf502dbfda55cd14d33c8c3c6"),
"entries": (
{
"name": b"subdir1",
"type": "dir",
"target": hash_to_bytes("4ea8c6b2f54445e5dd1a9d5bb2afd875d66f3150"), # dir3
"perms": from_disk.DentryPerms.directory,
},
),
}
directories = (dir, dir2, dir3, dir4)
minus_offset = datetime.timezone(datetime.timedelta(minutes=-120))
plus_offset = datetime.timezone(datetime.timedelta(minutes=120))
revision = {
"id": hash_to_bytes("066b1b62dbfa033362092af468bf6cfabec230e7"),
"message": b"hello",
"author": {
"name": b"Nicolas Dandrimont",
"email": b"nicolas@example.com",
"fullname": b"Nicolas Dandrimont ",
},
"date": {
"timestamp": {"seconds": 1234567890, "microseconds": 0},
"offset": 120,
"negative_utc": False,
},
"committer": {
"name": b"St\xc3fano Zacchiroli",
"email": b"stefano@example.com",
"fullname": b"St\xc3fano Zacchiroli ",
},
"committer_date": {
"timestamp": {"seconds": 1123456789, "microseconds": 0},
"offset": 0,
"negative_utc": True,
},
"parents": (b"01234567890123456789", b"23434512345123456789"),
"type": "git",
"directory": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), # dir
"metadata": {
"checksums": {"sha1": "tarball-sha1", "sha256": "tarball-sha256",},
"signed-off-by": "some-dude",
- "extra_headers": [
- ["gpgsig", b"test123"],
- ["mergetag", b"foo\\bar"],
- ["mergetag", b"\x22\xaf\x89\x80\x01\x00"],
- ],
},
+ "extra_headers": (
+ (b"gpgsig", b"test123"),
+ (b"mergetag", b"foo\\bar"),
+ (b"mergetag", b"\x22\xaf\x89\x80\x01\x00"),
+ ),
"synthetic": True,
}
revision2 = {
"id": hash_to_bytes("df7a6f6a99671fb7f7343641aff983a314ef6161"),
"message": b"hello again",
"author": {
"name": b"Roberto Dicosmo",
"email": b"roberto@example.com",
"fullname": b"Roberto Dicosmo ",
},
"date": {
"timestamp": {"seconds": 1234567843, "microseconds": 220000,},
"offset": -720,
"negative_utc": False,
},
"committer": {
"name": b"tony",
"email": b"ar@dumont.fr",
"fullname": b"tony ",
},
"committer_date": {
"timestamp": {"seconds": 1123456789, "microseconds": 0},
"offset": 0,
"negative_utc": False,
},
"parents": (b"01234567890123456789",),
"type": "git",
"directory": hash_to_bytes("8505808532953da7d2581741f01b29c04b1cb9ab"), # dir2
"metadata": None,
+ "extra_headers": (),
"synthetic": False,
}
revision3 = {
"id": hash_to_bytes("2cbd7bb22c653bbb23a29657852a50a01b591d46"),
"message": b"a simple revision with no parents this time",
"author": {
"name": b"Roberto Dicosmo",
"email": b"roberto@example.com",
"fullname": b"Roberto Dicosmo ",
},
"date": {
"timestamp": {"seconds": 1234567843, "microseconds": 220000,},
"offset": -720,
"negative_utc": False,
},
"committer": {
"name": b"tony",
"email": b"ar@dumont.fr",
"fullname": b"tony ",
},
"committer_date": {
"timestamp": {"seconds": 1127351742, "microseconds": 0},
"offset": 0,
"negative_utc": False,
},
"parents": (),
"type": "git",
"directory": hash_to_bytes("8505808532953da7d2581741f01b29c04b1cb9ab"), # dir2
"metadata": None,
+ "extra_headers": (),
"synthetic": True,
}
revision4 = {
"id": hash_to_bytes("88cd5126fc958ed70089d5340441a1c2477bcc20"),
"message": b"parent of self.revision2",
"author": {
"name": b"me",
"email": b"me@soft.heri",
"fullname": b"me ",
},
"date": {
"timestamp": {"seconds": 1244567843, "microseconds": 220000,},
"offset": -720,
"negative_utc": False,
},
"committer": {
"name": b"committer-dude",
"email": b"committer@dude.com",
"fullname": b"committer-dude ",
},
"committer_date": {
"timestamp": {"seconds": 1244567843, "microseconds": 220000,},
"offset": -720,
"negative_utc": False,
},
"parents": (
hash_to_bytes("2cbd7bb22c653bbb23a29657852a50a01b591d46"),
), # revision3
"type": "git",
"directory": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), # dir
"metadata": None,
+ "extra_headers": (),
"synthetic": False,
}
revisions = (revision, revision2, revision3, revision4)
origin = {
"url": "file:///dev/null",
}
origin2 = {
"url": "file:///dev/zero",
}
origins = (origin, origin2)
metadata_authority = {
"type": "deposit",
"url": "http://hal.inria.example.com/",
"metadata": {"location": "France"},
}
metadata_authority2 = {
"type": "registry",
"url": "http://wikidata.example.com/",
"metadata": {},
}
metadata_fetcher = {
"name": "swh-deposit",
"version": "0.0.1",
"metadata": {"sword_version": "2"},
}
metadata_fetcher2 = {
"name": "swh-example",
"version": "0.0.1",
"metadata": {},
}
date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc)
type_visit1 = "git"
date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc)
type_visit2 = "hg"
date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc)
type_visit3 = "deb"
release = {
"id": hash_to_bytes("a673e617fcc6234e29b2cad06b8245f96c415c61"),
"name": b"v0.0.1",
"author": {
"name": b"olasd",
"email": b"nic@olasd.fr",
"fullname": b"olasd ",
},
"date": {
"timestamp": {"seconds": 1234567890, "microseconds": 0},
"offset": 42,
"negative_utc": False,
},
"target": b"43210987654321098765",
"target_type": "revision",
"message": b"synthetic release",
"synthetic": True,
}
release2 = {
"id": hash_to_bytes("6902bd4c82b7d19a421d224aedab2b74197e420d"),
"name": b"v0.0.2",
"author": {
"name": b"tony",
"email": b"ar@dumont.fr",
"fullname": b"tony ",
},
"date": {
"timestamp": {"seconds": 1634366813, "microseconds": 0},
"offset": -120,
"negative_utc": False,
},
"target": b"432109\xa9765432\xc309\x00765",
"target_type": "revision",
"message": b"v0.0.2\nMisc performance improvements + bug fixes",
"synthetic": False,
}
release3 = {
"id": hash_to_bytes("3e9050196aa288264f2a9d279d6abab8b158448b"),
"name": b"v0.0.2",
"author": {
"name": b"tony",
"email": b"tony@ardumont.fr",
"fullname": b"tony ",
},
"date": {
"timestamp": {"seconds": 1634336813, "microseconds": 0},
"offset": 0,
"negative_utc": False,
},
"target": hash_to_bytes("df7a6f6a99671fb7f7343641aff983a314ef6161"),
"target_type": "revision",
"message": b"yet another synthetic release",
"synthetic": True,
}
releases = (release, release2, release3)
snapshot = {
"id": hash_to_bytes("409ee1ff3f10d166714bc90581debfd0446dda57"),
"branches": {
b"master": {
"target": hash_to_bytes("066b1b62dbfa033362092af468bf6cfabec230e7"),
"target_type": "revision",
},
},
}
empty_snapshot = {
"id": hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"),
"branches": {},
}
complete_snapshot = {
"id": hash_to_bytes("a56ce2d81c190023bb99a3a36279307522cb85f6"),
"branches": {
b"directory": {
"target": hash_to_bytes("1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8"),
"target_type": "directory",
},
b"directory2": {
"target": hash_to_bytes("1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8"),
"target_type": "directory",
},
b"content": {
"target": hash_to_bytes("fe95a46679d128ff167b7c55df5d02356c5a1ae1"),
"target_type": "content",
},
b"alias": {"target": b"revision", "target_type": "alias",},
b"revision": {
"target": hash_to_bytes("aafb16d69fd30ff58afdd69036a26047f3aebdc6"),
"target_type": "revision",
},
b"release": {
"target": hash_to_bytes("7045404f3d1c54e6473c71bbb716529fbad4be24"),
"target_type": "release",
},
b"snapshot": {
"target": hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"),
"target_type": "snapshot",
},
b"dangling": None,
},
}
snapshots = (snapshot, empty_snapshot, complete_snapshot)
content_metadata = {
"id": f"swh:1:cnt:{cont['sha1_git']}",
"context": {"origin": origin["url"]},
"discovery_date": datetime.datetime(
2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc
),
"authority": {
"type": metadata_authority["type"],
"url": metadata_authority["url"],
},
"fetcher": {
"name": metadata_fetcher["name"],
"version": metadata_fetcher["version"],
},
"format": "json",
"metadata": b'{"foo": "bar"}',
}
content_metadata2 = {
"id": f"swh:1:cnt:{cont['sha1_git']}",
"context": {"origin": origin2["url"]},
"discovery_date": datetime.datetime(
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
),
"authority": {
"type": metadata_authority["type"],
"url": metadata_authority["url"],
},
"fetcher": {
"name": metadata_fetcher["name"],
"version": metadata_fetcher["version"],
},
"format": "yaml",
"metadata": b"foo: bar",
}
content_metadata3 = {
"id": f"swh:1:cnt:{cont['sha1_git']}",
"context": {
"origin": origin["url"],
"visit": 42,
"snapshot": f"swh:1:snp:{hash_to_hex(snapshot['id'])}",
"release": f"swh:1:rel:{hash_to_hex(release['id'])}",
"revision": f"swh:1:rev:{hash_to_hex(revision['id'])}",
"directory": f"swh:1:dir:{hash_to_hex(dir['id'])}",
"path": b"/foo/bar",
},
"discovery_date": datetime.datetime(
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
),
"authority": {
"type": metadata_authority2["type"],
"url": metadata_authority2["url"],
},
"fetcher": {
"name": metadata_fetcher2["name"],
"version": metadata_fetcher2["version"],
},
"format": "yaml",
"metadata": b"foo: bar",
}
origin_metadata = {
"origin_url": origin["url"],
"discovery_date": datetime.datetime(
2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc
),
"authority": {
"type": metadata_authority["type"],
"url": metadata_authority["url"],
},
"fetcher": {
"name": metadata_fetcher["name"],
"version": metadata_fetcher["version"],
},
"format": "json",
"metadata": b'{"foo": "bar"}',
}
origin_metadata2 = {
"origin_url": origin["url"],
"discovery_date": datetime.datetime(
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
),
"authority": {
"type": metadata_authority["type"],
"url": metadata_authority["url"],
},
"fetcher": {
"name": metadata_fetcher["name"],
"version": metadata_fetcher["version"],
},
"format": "yaml",
"metadata": b"foo: bar",
}
origin_metadata3 = {
"origin_url": origin["url"],
"discovery_date": datetime.datetime(
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
),
"authority": {
"type": metadata_authority2["type"],
"url": metadata_authority2["url"],
},
"fetcher": {
"name": metadata_fetcher2["name"],
"version": metadata_fetcher2["version"],
},
"format": "yaml",
"metadata": b"foo: bar",
}
person = {
"name": b"John Doe",
"email": b"john.doe@institute.org",
"fullname": b"John Doe ",
}
objects = {
"content": contents,
"skipped_content": skipped_contents,
"directory": directories,
"revision": revisions,
"origin": origins,
"release": releases,
"snapshot": snapshots,
}
diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py
index 83f63e11..bd60590d 100644
--- a/swh/storage/tests/test_converters.py
+++ b/swh/storage/tests/test_converters.py
@@ -1,160 +1,151 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.storage import converters
def test_date_to_db():
date_to_db = converters.date_to_db
assert date_to_db(None) == {"timestamp": None, "offset": 0, "neg_utc_offset": None}
assert date_to_db(
{"timestamp": 1234567890, "offset": 120, "negative_utc": False,}
) == {
"timestamp": "2009-02-13T23:31:30+00:00",
"offset": 120,
"neg_utc_offset": False,
}
assert date_to_db(
{"timestamp": 1123456789, "offset": 0, "negative_utc": True,}
) == {
"timestamp": "2005-08-07T23:19:49+00:00",
"offset": 0,
"neg_utc_offset": True,
}
assert date_to_db(
{"timestamp": 1234567890, "offset": 42, "negative_utc": False,}
) == {
"timestamp": "2009-02-13T23:31:30+00:00",
"offset": 42,
"neg_utc_offset": False,
}
assert date_to_db(
{"timestamp": 1634366813, "offset": -120, "negative_utc": False,}
) == {
"timestamp": "2021-10-16T06:46:53+00:00",
"offset": -120,
"neg_utc_offset": False,
}
def test_db_to_author():
# when
actual_author = converters.db_to_author(b"fullname", b"name", b"email")
# then
assert actual_author == {
"fullname": b"fullname",
"name": b"name",
"email": b"email",
}
def test_db_to_author_none():
# when
actual_author = converters.db_to_author(None, None, None)
# then
assert actual_author is None
def test_db_to_revision():
# when
actual_revision = converters.db_to_revision(
{
"id": "revision-id",
"date": None,
"date_offset": None,
"date_neg_utc_offset": None,
"committer_date": None,
"committer_date_offset": None,
"committer_date_neg_utc_offset": None,
"type": "rev",
"directory": b"dir-sha1",
"message": b"commit message",
"author_fullname": b"auth-fullname",
"author_name": b"auth-name",
"author_email": b"auth-email",
"committer_fullname": b"comm-fullname",
"committer_name": b"comm-name",
"committer_email": b"comm-email",
"metadata": {},
"synthetic": False,
+ "extra_headers": (),
"parents": [123, 456],
}
)
# then
assert actual_revision == {
"id": "revision-id",
"author": {
"fullname": b"auth-fullname",
"name": b"auth-name",
"email": b"auth-email",
},
"date": None,
"committer": {
"fullname": b"comm-fullname",
"name": b"comm-name",
"email": b"comm-email",
},
"committer_date": None,
"type": "rev",
"directory": b"dir-sha1",
"message": b"commit message",
"metadata": {},
"synthetic": False,
+ "extra_headers": (),
"parents": [123, 456],
}
def test_db_to_release():
# when
actual_release = converters.db_to_release(
{
"id": b"release-id",
"target": b"revision-id",
"target_type": "revision",
"date": None,
"date_offset": None,
"date_neg_utc_offset": None,
"name": b"release-name",
"comment": b"release comment",
"synthetic": True,
"author_fullname": b"auth-fullname",
"author_name": b"auth-name",
"author_email": b"auth-email",
}
)
# then
assert actual_release == {
"author": {
"fullname": b"auth-fullname",
"name": b"auth-name",
"email": b"auth-email",
},
"date": None,
"id": b"release-id",
"name": b"release-name",
"message": b"release comment",
"synthetic": True,
"target": b"revision-id",
"target_type": "revision",
}
-
-
-def test_db_to_git_headers():
- raw_data = [
- ["gpgsig", b"garbage\x89a\x43b\x14"],
- ["extra", [b"foo\\\\\\o", b"bar\\", b"inval\\\\\x99id"]],
- ]
-
- db_data = converters.git_headers_to_db(raw_data)
- loop = converters.db_to_git_headers(db_data)
- assert raw_data == loop
diff --git a/swh/storage/tests/test_revision_bw_compat.py b/swh/storage/tests/test_revision_bw_compat.py
new file mode 100644
index 00000000..1acfe638
--- /dev/null
+++ b/swh/storage/tests/test_revision_bw_compat.py
@@ -0,0 +1,48 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import attr
+
+from swh.core.utils import decode_with_escape
+from swh.model.model import Revision
+from swh.storage import get_storage
+from swh.storage.tests.storage_data import data
+from swh.storage.tests.test_storage import db_transaction
+
+
+def headers_to_db(git_headers):
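+ # decode_with_escape renders each header value as a str, escaping
+ # non-utf8 bytes, to mimic the legacy jsonb encoding of extra headers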
+ return [[key, decode_with_escape(value)] for key, value in git_headers]
+
+
+def test_revision_extra_header_in_metadata(swh_storage_backend_config):
+ storage = get_storage(**swh_storage_backend_config)
+ rev = Revision.from_dict(data.revision)
+
+ md_w_extra = dict(
+ rev.metadata.items(),
+ extra_headers=headers_to_db(
+ [
+ ["gpgsig", b"test123"],
+ ["mergetag", b"foo\\bar"],
+ ["mergetag", b"\x22\xaf\x89\x80\x01\x00"],
+ ]
+ ),
+ )
+
+ bw_rev = attr.evolve(rev, extra_headers=())
+ object.__setattr__(bw_rev, "metadata", md_w_extra)
+ assert bw_rev.extra_headers == ()
+
+ assert storage.revision_add([bw_rev]) == {"revision:add": 1}
+
+ # check that the data in the db are in the old format
+ with db_transaction(storage) as (_, cur):
+ cur.execute("SELECT metadata, extra_headers FROM revision")
+ metadata, extra_headers = cur.fetchone()
+ assert extra_headers == []
+ assert metadata == bw_rev.metadata
+
+ # check that the Revision built from revision_get is the original, "new style", Revision
+ assert [Revision.from_dict(x) for x in storage.revision_get([rev.id])] == [rev]