diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 8dc31f2..ad0d042 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,255 +1,292 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from datetime import datetime, timezone
import os
from typing import Any, Dict, Generator, Optional, Tuple
import click
import iso8601
import yaml
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import Sha1Git
# All generic config code should reside in swh.core.config
CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
DEFAULT_CONFIG: Dict[str, Any] = {
"provenance": {
"archive": {
# Storage API based Archive object
# "cls": "api",
# "storage": {
# "cls": "remote",
# "url": "http://uffizi.internal.softwareheritage.org:5002",
# }
# Direct access Archive object
"cls": "direct",
"db": {
"host": "belvedere.internal.softwareheritage.org",
"port": 5432,
"dbname": "softwareheritage",
"user": "guest",
},
},
"storage": {
# Local PostgreSQL Storage
# "cls": "postgresql",
# "db": {
# "host": "localhost",
# "user": "postgres",
# "password": "postgres",
# "dbname": "provenance",
# },
# Local MongoDB Storage
# "cls": "mongodb",
# "db": {
# "dbname": "provenance",
# },
# Remote RabbitMQ/PostgreSQL Storage
"cls": "rabbitmq",
"url": "amqp://localhost:5672/%2f",
"storage_config": {
"cls": "postgresql",
"db": {
"host": "localhost",
"user": "postgres",
"password": "postgres",
"dbname": "provenance",
},
},
"batch_size": 100,
"prefetch_count": 100,
},
}
}
CONFIG_FILE_HELP = f"""
\b Configuration can be loaded from a yaml file given either as --config-file
option or the {CONFIG_ENVVAR} environment variable. If no configuration file
is specified, use the following default configuration::
\b
{yaml.dump(DEFAULT_CONFIG)}"""
PROVENANCE_HELP = f"""Software Heritage provenance index database tools
{CONFIG_FILE_HELP}
"""
@swh_cli_group.group(
name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP
)
@click.option(
"-C",
"--config-file",
default=None,
type=click.Path(exists=True, dir_okay=False, path_type=str),
help="""YAML configuration file.""",
)
@click.option(
"-P",
"--profile",
default=None,
type=click.Path(exists=False, dir_okay=False, path_type=str),
help="""Enable profiling to specified file.""",
)
@click.pass_context
def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None:
if (
config_file is None
and DEFAULT_PATH is not None
and config.config_exists(DEFAULT_PATH)
):
config_file = DEFAULT_PATH
if config_file is None:
conf = DEFAULT_CONFIG
else:
# read_raw_config does not fail on ENOENT
if not os.path.exists(config_file):
raise FileNotFoundError(config_file)
conf = yaml.safe_load(open(config_file, "rb"))
ctx.ensure_object(dict)
ctx.obj["config"] = conf
if profile:
import atexit
import cProfile
print("Profiling...")
pr = cProfile.Profile()
pr.enable()
def exit() -> None:
pr.disable()
pr.dump_stats(profile)
atexit.register(exit)
+@cli.command(name="iter-frontiers")
+@click.argument("filename")
+@click.option("-l", "--limit", type=int)
+@click.option("-s", "--min-size", default=0, type=int)
+@click.pass_context
+def iter_frontiers(
+ ctx: click.core.Context,
+ filename: str,
+ limit: Optional[int],
+ min_size: int,
+) -> None:
+ """Process a provided list of directories in the isochrone frontier."""
+ from . import get_archive, get_provenance
+ from .directory import CSVDirectoryIterator, directory_add
+
+ archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+ directories_provider = generate_directory_ids(filename)
+ directories = CSVDirectoryIterator(directories_provider, limit=limit)
+
+ with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
+ for directory in directories:
+ directory_add(
+ provenance,
+ archive,
+ [directory],
+ minsize=min_size,
+ )
+
+
+def generate_directory_ids(
+ filename: str,
+) -> Generator[Sha1Git, None, None]:
+ for line in open(filename, "r"):
+ if line.strip():
+ yield hash_to_bytes(line.strip())
+
+
@cli.command(name="iter-revisions")
@click.argument("filename")
@click.option("-a", "--track-all", default=True, type=bool)
@click.option("-l", "--limit", type=int)
@click.option("-m", "--min-depth", default=1, type=int)
@click.option("-r", "--reuse", default=True, type=bool)
@click.option("-s", "--min-size", default=0, type=int)
@click.pass_context
def iter_revisions(
ctx: click.core.Context,
filename: str,
track_all: bool,
limit: Optional[int],
min_depth: int,
reuse: bool,
min_size: int,
) -> None:
"""Process a provided list of revisions."""
from . import get_archive, get_provenance
from .revision import CSVRevisionIterator, revision_add
archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
revisions_provider = generate_revision_tuples(filename)
revisions = CSVRevisionIterator(revisions_provider, limit=limit)
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
for revision in revisions:
revision_add(
provenance,
archive,
[revision],
trackall=track_all,
lower=reuse,
mindepth=min_depth,
minsize=min_size,
)
def generate_revision_tuples(
filename: str,
) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]:
for line in open(filename, "r"):
if line.strip():
revision, date, root = line.strip().split(",")
yield (
hash_to_bytes(revision),
iso8601.parse_date(date, default_timezone=timezone.utc),
hash_to_bytes(root),
)
@cli.command(name="iter-origins")
@click.argument("filename")
@click.option("-l", "--limit", type=int)
@click.pass_context
def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None:
"""Process a provided list of origins."""
from . import get_archive, get_provenance
from .origin import CSVOriginIterator, origin_add
archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
origins_provider = generate_origin_tuples(filename)
origins = CSVOriginIterator(origins_provider, limit=limit)
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
for origin in origins:
origin_add(provenance, archive, [origin])
def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]:
for line in open(filename, "r"):
if line.strip():
url, snapshot = line.strip().split(",")
yield (url, hash_to_bytes(snapshot))
@cli.command(name="find-first")
@click.argument("swhid")
@click.pass_context
def find_first(ctx: click.core.Context, swhid: str) -> None:
"""Find first occurrence of the requested blob."""
from . import get_provenance
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
occur = provenance.content_find_first(hash_to_bytes(swhid))
if occur is not None:
print(
f"swh:1:cnt:{hash_to_hex(occur.content)}, "
f"swh:1:rev:{hash_to_hex(occur.revision)}, "
f"{occur.date}, "
f"{occur.origin}, "
f"{os.fsdecode(occur.path)}"
)
else:
print(f"Cannot find a content with the id {swhid}")
@cli.command(name="find-all")
@click.argument("swhid")
@click.option("-l", "--limit", type=int)
@click.pass_context
def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None:
"""Find all occurrences of the requested blob."""
from . import get_provenance
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
print(
f"swh:1:cnt:{hash_to_hex(occur.content)}, "
f"swh:1:rev:{hash_to_hex(occur.revision)}, "
f"{occur.date}, "
f"{occur.origin}, "
f"{os.fsdecode(occur.path)}"
)
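The new iter-frontiers command mirrors iter-revisions: it reads one directory sha1 per line, wraps the ids in CSVDirectoryIterator, and hands them to directory_add. A minimal programmatic sketch of the same flow (the config file name and CSV path below are assumptions, not part of this change):

# hypothetical driver equivalent to `swh provenance iter-frontiers directories.csv`
import yaml

from swh.provenance import get_archive, get_provenance
from swh.provenance.cli import generate_directory_ids
from swh.provenance.directory import CSVDirectoryIterator, directory_add

cfg = yaml.safe_load(open("provenance.yml"))["provenance"]  # assumed config file
archive = get_archive(**cfg["archive"])
directories = CSVDirectoryIterator(generate_directory_ids("directories.csv"), limit=None)
with get_provenance(**cfg["storage"]) as provenance:
    for directory in directories:
        directory_add(provenance, archive, [directory], minsize=0)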
diff --git a/swh/provenance/directory.py b/swh/provenance/directory.py
new file mode 100644
index 0000000..5430454
--- /dev/null
+++ b/swh/provenance/directory.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+from typing import Generator, Iterable, Iterator, List, Optional
+
+from swh.core.statsd import statsd
+from swh.model.model import Sha1Git
+
+from .archive import ArchiveInterface
+from .interface import ProvenanceInterface
+from .model import DirectoryEntry
+
+REVISION_DURATION_METRIC = "swh_provenance_directory_duration_seconds"
+
+
+class CSVDirectoryIterator:
+ """Iterator over directories typically present in the given CSV file.
+
+ The input is an iterator that produces ids (sha1_git) of directories
+ """
+
+ def __init__(
+ self,
+ directories: Iterable[Sha1Git],
+ limit: Optional[int] = None,
+ ) -> None:
+ self.directories: Iterator[Sha1Git]
+ if limit is not None:
+ from itertools import islice
+
+ self.directories = islice(directories, limit)
+ else:
+ self.directories = iter(directories)
+
+ def __iter__(self) -> Generator[DirectoryEntry, None, None]:
+ for id in self.directories:
+ yield DirectoryEntry(id)
+
+
+@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"})
+def directory_add(
+ provenance: ProvenanceInterface,
+ archive: ArchiveInterface,
+ directories: List[DirectoryEntry],
+ minsize: int = 0,
+ commit: bool = True,
+) -> None:
+ for directory in directories:
+ # Only flatten directories that are present in the provenance model, but not
+ # flattened yet.
+ flattenned = provenance.directory_already_flattenned(directory)
+ if flattenned is not None and not flattenned:
+ directory_flatten(
+ provenance,
+ archive,
+ directory,
+ minsize=minsize,
+ )
+ if commit:
+ provenance.flush()
+
+
+@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten"})
+def directory_flatten(
+ provenance: ProvenanceInterface,
+ archive: ArchiveInterface,
+ directory: DirectoryEntry,
+ minsize: int = 0,
+) -> None:
+ """Recursively retrieve all the files of 'directory' and insert them in the
+ 'provenance' database in the 'content_in_directory' table.
+ """
+ stack = [(directory, b"")]
+ while stack:
+ current, prefix = stack.pop()
+ current.retrieve_children(archive, minsize=minsize)
+ for f_child in current.files:
+ # Add content to the directory with the computed prefix.
+ provenance.content_add_to_directory(directory, f_child, prefix)
+ for d_child in current.dirs:
+ # Recursively walk the child directory.
+ stack.append((d_child, os.path.join(prefix, d_child.name)))
+ provenance.directory_flag_as_flattenned(directory)
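directory_flatten walks the tree iteratively, joining each child's name onto the byte prefix accumulated so far. A self-contained toy of that prefix handling, with plain tuples standing in for DirectoryEntry objects (the tree below is made up for illustration and only loosely mirrors the paths checked in test_directory_flatten):

import os

# toy tree: (name, files, subdirectories)
tree = (b"", [b"a"], [(b"C", [b"a", b"b"], [])])

stack = [(tree, b"")]
paths = []
while stack:
    (name, files, dirs), prefix = stack.pop()
    for f in files:
        paths.append(os.path.join(prefix, f))  # path of each blob relative to the root
    for child in dirs:
        stack.append((child, os.path.join(prefix, child[0])))

print(sorted(paths))  # [b'C/a', b'C/b', b'a']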
diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py
index 7b5ba3f..78a7279 100644
--- a/swh/provenance/interface.py
+++ b/swh/provenance/interface.py
@@ -1,384 +1,396 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
import enum
from types import TracebackType
from typing import Dict, Generator, Iterable, Optional, Set, Type, Union
from typing_extensions import Protocol, runtime_checkable
from swh.core.api import remote_api_endpoint
from swh.model.model import Sha1Git
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
class EntityType(enum.Enum):
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
ORIGIN = "origin"
class RelationType(enum.Enum):
CNT_EARLY_IN_REV = "content_in_revision"
CNT_IN_DIR = "content_in_directory"
DIR_IN_REV = "directory_in_revision"
REV_IN_ORG = "revision_in_origin"
REV_BEFORE_REV = "revision_before_revision"
@dataclass(eq=True, frozen=True)
class ProvenanceResult:
content: Sha1Git
revision: Sha1Git
date: datetime
origin: Optional[str]
path: bytes
@dataclass(eq=True, frozen=True)
class DirectoryData:
"""Object representing the data associated to a directory in the provenance model,
where `date` is the date of the directory in the isochrone frontier, and `flat` is a
flag acknowledging that a flat model for the elements outside the frontier has
already been created.
"""
date: datetime
flat: bool
@dataclass(eq=True, frozen=True)
class RevisionData:
"""Object representing the data associated to a revision in the provenance model,
where `date` is the optional date of the revision (specifying it acknowledges that
the revision was already processed by the revision-content algorithm); and `origin`
identifies the preferred origin for the revision, if any.
"""
date: Optional[datetime]
origin: Optional[Sha1Git]
@dataclass(eq=True, frozen=True)
class RelationData:
"""Object representing a relation entry in the provenance model, where `src` and
`dst` are the sha1 ids of the entities being related, and `path` is optional
depending on the relation being represented.
"""
dst: Sha1Git
path: Optional[bytes]
@runtime_checkable
class ProvenanceStorageInterface(Protocol):
def __enter__(self) -> ProvenanceStorageInterface:
...
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
...
@remote_api_endpoint("close")
def close(self) -> None:
"""Close connection to the storage and release resources."""
...
@remote_api_endpoint("content_add")
def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool:
"""Add blobs identified by sha1 ids, with an associated date (as paired in
`cnts`) to the provenance storage. Return a boolean stating whether the
information was successfully stored.
"""
...
@remote_api_endpoint("content_find_first")
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
"""Retrieve the first occurrence of the blob identified by `id`."""
...
@remote_api_endpoint("content_find_all")
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
"""Retrieve all the occurrences of the blob identified by `id`."""
...
@remote_api_endpoint("content_get")
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
"""Retrieve the associated date for each blob sha1 in `ids`."""
...
@remote_api_endpoint("directory_add")
def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
"""Add directories identified by sha1 ids, with associated date and (optional)
flatten flag (as paired in `dirs`) to the provenance storage. If the flatten
flag is set to None, the previous value present in the storage is preserved.
Return a boolean stating if the information was successfully stored.
"""
...
@remote_api_endpoint("directory_get")
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]:
"""Retrieve the associated date and (optional) flatten flag for each directory
sha1 in `ids`. If some directory has no associated date, it is not present in
the resulting dictionary.
"""
...
@remote_api_endpoint("entity_get_all")
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
"""Retrieve all sha1 ids for entities of type `entity` present in the provenance
model. This method is used only in tests.
"""
...
@remote_api_endpoint("location_add")
def location_add(self, paths: Iterable[bytes]) -> bool:
"""Register the given `paths` in the storage."""
...
@remote_api_endpoint("location_get_all")
def location_get_all(self) -> Set[bytes]:
"""Retrieve all paths present in the provenance model.
This method is used only in tests."""
...
@remote_api_endpoint("open")
def open(self) -> None:
"""Open connection to the storage and allocate necessary resources."""
...
@remote_api_endpoint("origin_add")
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
"""Add origins identified by sha1 ids, with their corresponding url (as paired
in `orgs`) to the provenance storage. Return a boolean stating if the
information was successfully stored.
"""
...
@remote_api_endpoint("origin_get")
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
"""Retrieve the associated url for each origin sha1 in `ids`."""
...
@remote_api_endpoint("revision_add")
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
"""Add revisions identified by sha1 ids, with optional associated date or origin
(as paired in `revs`) to the provenance storage. Return a boolean stating if the
information was successfully stored.
"""
...
@remote_api_endpoint("revision_get")
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
"""Retrieve the associated date and origin for each revision sha1 in `ids`. If
some revision has no associated date nor origin, it is not present in the
resulting dictionary.
"""
...
@remote_api_endpoint("relation_add")
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
"""Add entries in the selected `relation`. This method assumes all entities
being related are already registered in the storage. See `content_add`,
`directory_add`, `origin_add`, and `revision_add`.
"""
...
@remote_api_endpoint("relation_get")
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Dict[Sha1Git, Set[RelationData]]:
"""Retrieve all entries in the selected `relation` whose source entities are
identified by some sha1 id in `ids`. If `reverse` is set, destination entities
are matched instead.
"""
...
@remote_api_endpoint("relation_get_all")
def relation_get_all(
self, relation: RelationType
) -> Dict[Sha1Git, Set[RelationData]]:
"""Retrieve all entries in the selected `relation` that are present in the
provenance model. This method is used only in tests.
"""
...
@remote_api_endpoint("with_path")
def with_path(self) -> bool:
...
@runtime_checkable
class ProvenanceInterface(Protocol):
storage: ProvenanceStorageInterface
def __enter__(self) -> ProvenanceInterface:
...
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
...
def close(self) -> None:
"""Close connection to the underlying `storage` and release resources."""
...
def flush(self) -> None:
"""Flush internal cache to the underlying `storage`."""
...
def content_add_to_directory(
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
) -> None:
"""Associate `blob` with `directory` in the provenance model. `prefix` is the
relative path from `directory` to `blob` (excluding `blob`'s name).
"""
...
def content_add_to_revision(
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
) -> None:
"""Associate `blob` with `revision` in the provenance model. `prefix` is the
absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
name).
"""
...
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
"""Retrieve the first occurrence of the blob identified by `id`."""
...
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
"""Retrieve all the occurrences of the blob identified by `id`."""
...
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
"""Retrieve the earliest known date of `blob`."""
...
def content_get_early_dates(
self, blobs: Iterable[FileEntry]
) -> Dict[Sha1Git, datetime]:
"""Retrieve the earliest known date for each blob in `blobs`. If some blob has
no associated date, it is not present in the resulting dictionary.
"""
...
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
"""Associate `date` to `blob` as it's earliest known date."""
...
def directory_add_to_revision(
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
) -> None:
"""Associate `directory` with `revision` in the provenance model. `path` is the
absolute path from `revision`'s root directory to `directory` (including
`directory`'s name).
"""
...
+ def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]:
+ """Check if the directory is already flattenned in the provenance model. If the
+ directory is unknown for the model, the methods returns None.
+ """
+ ...
+
+ def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None:
+ """Mark the directory as flattenned in the provenance model. If the
+ directory is unknown for the model, this method has no effect.
+ """
+ ...
+
def directory_get_date_in_isochrone_frontier(
self, directory: DirectoryEntry
) -> Optional[datetime]:
"""Retrieve the earliest known date of `directory` as an isochrone frontier in
the provenance model.
"""
...
def directory_get_dates_in_isochrone_frontier(
self, dirs: Iterable[DirectoryEntry]
) -> Dict[Sha1Git, datetime]:
"""Retrieve the earliest known date for each directory in `dirs` as isochrone
frontiers in the provenance model. If some directory has no associated date, it is not
present in the resulting dictionary.
"""
...
def directory_set_date_in_isochrone_frontier(
self, directory: DirectoryEntry, date: datetime
) -> None:
"""Associate `date` to `directory` as it's earliest known date as an isochrone
frontier in the provenance model.
"""
...
def open(self) -> None:
"""Open connection to the underlying `storage` and allocate necessary
resources.
"""
...
def origin_add(self, origin: OriginEntry) -> None:
"""Add `origin` to the provenance model."""
...
def revision_add(self, revision: RevisionEntry) -> None:
"""Add `revision` to the provenance model. This implies storing `revision`'s
date in the model, thus `revision.date` must be a valid date.
"""
...
def revision_add_before_revision(
self, head: RevisionEntry, revision: RevisionEntry
) -> None:
"""Associate `revision` to `head` as an ancestor of the latter."""
...
def revision_add_to_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
"""Associate `revision` to `origin` as a head revision of the latter (ie. the
target of a snapshot for `origin` in the archive)."""
...
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
"""Retrieve the date associated to `revision`."""
...
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
"""Retrieve the preferred origin associated to `revision`."""
...
def revision_in_history(self, revision: RevisionEntry) -> bool:
"""Check if `revision` is known to be an ancestor of some head revision in the
provenance model.
"""
...
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
"""Associate `origin` as the preferred origin for `revision`."""
...
def revision_visited(self, revision: RevisionEntry) -> bool:
"""Check if `revision` is known to be a head revision for some origin in the
provenance model.
"""
...
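Both interfaces are runtime_checkable Protocols, so isinstance only verifies that the method names exist, not their signatures. A reduced sketch covering just the two methods added here (the Flattenable and Dummy names are hypothetical):

from typing import Optional

from typing_extensions import Protocol, runtime_checkable


@runtime_checkable
class Flattenable(Protocol):
    # stand-in for the two methods added to ProvenanceInterface
    def directory_already_flattenned(self, directory) -> Optional[bool]:
        ...

    def directory_flag_as_flattenned(self, directory) -> None:
        ...


class Dummy:
    def directory_already_flattenned(self, directory):
        return None

    def directory_flag_as_flattenned(self, directory):
        return None


assert isinstance(Dummy(), Flattenable)  # structural check: method names only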
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 5ca39b2..b0f43c6 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,489 +1,504 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import logging
import os
from types import TracebackType
from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type
from typing_extensions import Literal, TypedDict
from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from .interface import (
DirectoryData,
ProvenanceInterface,
ProvenanceResult,
ProvenanceStorageInterface,
RelationData,
RelationType,
RevisionData,
)
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
from .util import path_normalize
LOGGER = logging.getLogger(__name__)
BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds"
BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total"
class DatetimeCache(TypedDict):
data: Dict[Sha1Git, Optional[datetime]] # None means unknown
added: Set[Sha1Git]
class OriginCache(TypedDict):
data: Dict[Sha1Git, str]
added: Set[Sha1Git]
class RevisionCache(TypedDict):
data: Dict[Sha1Git, Sha1Git]
added: Set[Sha1Git]
class ProvenanceCache(TypedDict):
content: DatetimeCache
directory: DatetimeCache
directory_flatten: Dict[Sha1Git, Optional[bool]] # None means unknown
revision: DatetimeCache
# below are insertion caches only
content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]]
directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
# these two are for the origin layer
origin: OriginCache
revision_origin: RevisionCache
revision_before_revision: Dict[Sha1Git, Set[Sha1Git]]
revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]
def new_cache() -> ProvenanceCache:
return ProvenanceCache(
content=DatetimeCache(data={}, added=set()),
directory=DatetimeCache(data={}, added=set()),
directory_flatten={},
revision=DatetimeCache(data={}, added=set()),
content_in_revision=set(),
content_in_directory=set(),
directory_in_revision=set(),
origin=OriginCache(data={}, added=set()),
revision_origin=RevisionCache(data={}, added=set()),
revision_before_revision={},
revision_in_origin=set(),
)
class Provenance:
def __init__(self, storage: ProvenanceStorageInterface) -> None:
self.storage = storage
self.cache = new_cache()
def __enter__(self) -> ProvenanceInterface:
self.open()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()
def clear_caches(self) -> None:
self.cache = new_cache()
def close(self) -> None:
self.storage.close()
@statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"})
def flush(self) -> None:
self.flush_revision_content_layer()
self.flush_origin_revision_layer()
self.clear_caches()
@statsd.timed(
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"}
)
def flush_origin_revision_layer(self) -> None:
# Origins and revisions should be inserted first so that internal ids'
# resolution works properly.
urls = {
sha1: url
for sha1, url in self.cache["origin"]["data"].items()
if sha1 in self.cache["origin"]["added"]
}
if urls:
while not self.storage.origin_add(urls):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_origin_revision_retry_origin"},
)
LOGGER.warning(
"Unable to write origins urls to the storage. Retrying..."
)
rev_orgs = {
# Destinations in this relation should match origins in the next one
**{
src: RevisionData(date=None, origin=None)
for src in self.cache["revision_before_revision"]
},
**{
# This relation comes second so that non-None origins take precedence
src: RevisionData(date=None, origin=org)
for src, org in self.cache["revision_in_origin"]
},
}
if rev_orgs:
while not self.storage.revision_add(rev_orgs):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_origin_revision_retry_revision"},
)
LOGGER.warning(
"Unable to write revision entities to the storage. Retrying..."
)
# Second, flat models for revisions' histories (ie. revision-before-revision).
if self.cache["revision_before_revision"]:
rev_before_rev = {
src: {RelationData(dst=dst, path=None) for dst in dsts}
for src, dsts in self.cache["revision_before_revision"].items()
}
while not self.storage.relation_add(
RelationType.REV_BEFORE_REV, rev_before_rev
):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={
"method": "flush_origin_revision_retry_revision_before_revision"
},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.REV_BEFORE_REV,
)
# Heads (ie. revision-in-origin entries) should be inserted once flat models for
their histories have already been added. This is to guarantee consistent results if
# something needs to be reprocessed due to a failure: already inserted heads
# won't get reprocessed in such a case.
if self.cache["revision_in_origin"]:
rev_in_org: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst in self.cache["revision_in_origin"]:
rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None))
while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_origin_revision_retry_revision_in_origin"},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.REV_IN_ORG,
)
@statsd.timed(
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"}
)
def flush_revision_content_layer(self) -> None:
# Register in the storage all entities, to ensure the coming relations can
# properly resolve any internal reference if needed. Content and directory
# entries may safely be registered with their associated dates. In contrast,
# revision entries should be registered without date, as it is used to
# acknowledge that the flushing was successful. Also, directories are
# registered with their flatten flag not set.
cnt_dates = {
sha1: date
for sha1, date in self.cache["content"]["data"].items()
if sha1 in self.cache["content"]["added"] and date is not None
}
if cnt_dates:
while not self.storage.content_add(cnt_dates):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_content_date"},
)
LOGGER.warning(
"Unable to write content dates to the storage. Retrying..."
)
dir_dates = {
sha1: DirectoryData(date=date, flat=False)
for sha1, date in self.cache["directory"]["data"].items()
if sha1 in self.cache["directory"]["added"] and date is not None
}
if dir_dates:
while not self.storage.directory_add(dir_dates):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_directory_date"},
)
LOGGER.warning(
"Unable to write directory dates to the storage. Retrying..."
)
revs = {
sha1
for sha1, date in self.cache["revision"]["data"].items()
if sha1 in self.cache["revision"]["added"] and date is not None
}
if revs:
while not self.storage.revision_add(revs):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_revision_none"},
)
LOGGER.warning(
"Unable to write revision entities to the storage. Retrying..."
)
paths = {
path
for _, _, path in self.cache["content_in_revision"]
| self.cache["content_in_directory"]
| self.cache["directory_in_revision"]
}
if paths:
while not self.storage.location_add(paths):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_location"},
)
LOGGER.warning(
"Unable to write locations entities to the storage. Retrying..."
)
# For this layer, relations need to be inserted first so that, in case of
# failure, reprocessing the input does not generate an inconsistent database.
if self.cache["content_in_revision"]:
cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst, path in self.cache["content_in_revision"]:
cnt_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path))
while not self.storage.relation_add(
RelationType.CNT_EARLY_IN_REV, cnt_in_rev
):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_content_in_revision"},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.CNT_EARLY_IN_REV,
)
if self.cache["content_in_directory"]:
cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst, path in self.cache["content_in_directory"]:
cnt_in_dir.setdefault(src, set()).add(RelationData(dst=dst, path=path))
while not self.storage.relation_add(RelationType.CNT_IN_DIR, cnt_in_dir):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={
"method": "flush_revision_content_retry_content_in_directory"
},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.CNT_IN_DIR,
)
if self.cache["directory_in_revision"]:
dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst, path in self.cache["directory_in_revision"]:
dir_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path))
while not self.storage.relation_add(RelationType.DIR_IN_REV, dir_in_rev):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={
"method": "flush_revision_content_retry_directory_in_revision"
},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.DIR_IN_REV,
)
# After relations, flatten flags for directories can be safely set (if
# applicable) acknowledging those directories that have already been flattened.
# Similarly, dates for the revisions are set to acknowledge that these revisions
# won't need to be reprocessed in case of failure.
dir_acks = {
sha1: DirectoryData(
date=date, flat=self.cache["directory_flatten"].get(sha1) or False
)
for sha1, date in self.cache["directory"]["data"].items()
- if sha1 in self.cache["directory"]["added"] and date is not None
+ if self.cache["directory_flatten"].get(sha1) and date is not None
}
if dir_acks:
while not self.storage.directory_add(dir_acks):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_directory_ack"},
)
LOGGER.warning(
"Unable to write directory dates to the storage. Retrying..."
)
rev_dates = {
sha1: RevisionData(date=date, origin=None)
for sha1, date in self.cache["revision"]["data"].items()
if sha1 in self.cache["revision"]["added"] and date is not None
}
if rev_dates:
while not self.storage.revision_add(rev_dates):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_revision_date"},
)
LOGGER.warning(
"Unable to write revision dates to the storage. Retrying..."
)
def content_add_to_directory(
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
) -> None:
self.cache["content_in_directory"].add(
(blob.id, directory.id, path_normalize(os.path.join(prefix, blob.name)))
)
def content_add_to_revision(
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
) -> None:
self.cache["content_in_revision"].add(
(blob.id, revision.id, path_normalize(os.path.join(prefix, blob.name)))
)
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
return self.storage.content_find_first(id)
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
yield from self.storage.content_find_all(id, limit=limit)
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
return self.get_dates("content", [blob.id]).get(blob.id)
def content_get_early_dates(
self, blobs: Iterable[FileEntry]
) -> Dict[Sha1Git, datetime]:
return self.get_dates("content", [blob.id for blob in blobs])
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
self.cache["content"]["data"][blob.id] = date
self.cache["content"]["added"].add(blob.id)
def directory_add_to_revision(
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
) -> None:
self.cache["directory_in_revision"].add(
(directory.id, revision.id, path_normalize(path))
)
+ def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]:
+ cache = self.cache["directory_flatten"]
+ if directory.id not in cache:
+ cache.setdefault(directory.id, None)
+ ret = self.storage.directory_get([directory.id])
+ if directory.id in ret:
+ dir = ret[directory.id]
+ cache[directory.id] = dir.flat
+ # date is kept to ensure we have it available when flushing
+ self.cache["directory"]["data"][directory.id] = dir.date
+ return cache.get(directory.id)
+
+ def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None:
+ self.cache["directory_flatten"][directory.id] = True
+
def directory_get_date_in_isochrone_frontier(
self, directory: DirectoryEntry
) -> Optional[datetime]:
return self.get_dates("directory", [directory.id]).get(directory.id)
def directory_get_dates_in_isochrone_frontier(
self, dirs: Iterable[DirectoryEntry]
) -> Dict[Sha1Git, datetime]:
return self.get_dates("directory", [directory.id for directory in dirs])
def directory_set_date_in_isochrone_frontier(
self, directory: DirectoryEntry, date: datetime
) -> None:
self.cache["directory"]["data"][directory.id] = date
self.cache["directory"]["added"].add(directory.id)
def get_dates(
self,
entity: Literal["content", "directory", "revision"],
ids: Iterable[Sha1Git],
) -> Dict[Sha1Git, datetime]:
cache = self.cache[entity]
missing_ids = set(id for id in ids if id not in cache)
if missing_ids:
if entity == "content":
cache["data"].update(self.storage.content_get(missing_ids))
elif entity == "directory":
cache["data"].update(
{
id: dir.date
for id, dir in self.storage.directory_get(missing_ids).items()
}
)
elif entity == "revision":
cache["data"].update(
{
id: rev.date
for id, rev in self.storage.revision_get(missing_ids).items()
}
)
dates: Dict[Sha1Git, datetime] = {}
for sha1 in ids:
date = cache["data"].setdefault(sha1, None)
if date is not None:
dates[sha1] = date
return dates
def open(self) -> None:
self.storage.open()
def origin_add(self, origin: OriginEntry) -> None:
self.cache["origin"]["data"][origin.id] = origin.url
self.cache["origin"]["added"].add(origin.id)
def revision_add(self, revision: RevisionEntry) -> None:
self.cache["revision"]["data"][revision.id] = revision.date
self.cache["revision"]["added"].add(revision.id)
def revision_add_before_revision(
self, head: RevisionEntry, revision: RevisionEntry
) -> None:
self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
head.id
)
def revision_add_to_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
self.cache["revision_in_origin"].add((revision.id, origin.id))
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
return self.get_dates("revision", [revision.id]).get(revision.id)
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
cache = self.cache["revision_origin"]["data"]
if revision.id not in cache:
ret = self.storage.revision_get([revision.id])
if revision.id in ret:
origin = ret[revision.id].origin
if origin is not None:
cache[revision.id] = origin
return cache.get(revision.id)
def revision_in_history(self, revision: RevisionEntry) -> bool:
return revision.id in self.cache["revision_before_revision"] or bool(
self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id])
)
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
self.cache["revision_origin"]["data"][revision.id] = origin.id
self.cache["revision_origin"]["added"].add(revision.id)
def revision_visited(self, revision: RevisionEntry) -> bool:
return revision.id in dict(self.cache["revision_in_origin"]) or bool(
self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])
)
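The change to dir_acks keys the acknowledgement on the directory_flatten cache instead of the added set, so only directories actually flattened in this run (and with a known date) are rewritten with flat=True. A toy sketch of that selection (hashes and dates are made up):

from datetime import datetime, timezone

from swh.provenance.interface import DirectoryData

date = datetime(2021, 1, 1, tzinfo=timezone.utc)
directory_data = {b"d1": date, b"d2": date, b"d3": None}  # cache["directory"]["data"]
directory_flatten = {b"d1": True, b"d3": True}            # cache["directory_flatten"]

dir_acks = {
    sha1: DirectoryData(date=d, flat=directory_flatten.get(sha1) or False)
    for sha1, d in directory_data.items()
    if directory_flatten.get(sha1) and d is not None
}
assert set(dir_acks) == {b"d1"}  # d2 was never flattened here, d3 has no date yet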
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index a6c81ec..2b578e1 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,246 +1,224 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
-import os
from typing import Generator, Iterable, Iterator, List, Optional, Tuple
from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from .archive import ArchiveInterface
+from .directory import directory_flatten
from .graph import IsochroneNode, build_isochrone_graph
from .interface import ProvenanceInterface
from .model import DirectoryEntry, RevisionEntry
REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds"
class CSVRevisionIterator:
"""Iterator over revisions typically present in the given CSV file.
The input is an iterator that produces 3 elements per row:
(id, date, root)
where:
- id: is the id (sha1_git) of the revision
- date: is the author date
- root: sha1 of the directory
"""
def __init__(
self,
revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]],
limit: Optional[int] = None,
) -> None:
self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]]
if limit is not None:
from itertools import islice
self.revisions = islice(revisions, limit)
else:
self.revisions = iter(revisions)
def __iter__(self) -> Generator[RevisionEntry, None, None]:
for id, date, root in self.revisions:
if date.tzinfo is None:
date = date.replace(tzinfo=timezone.utc)
yield RevisionEntry(id, date=date, root=root)
@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"})
def revision_add(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
revisions: List[RevisionEntry],
trackall: bool = True,
lower: bool = True,
mindepth: int = 1,
minsize: int = 0,
commit: bool = True,
) -> None:
for revision in revisions:
assert revision.date is not None
assert revision.root is not None
# Processed content starting from the revision's root directory.
date = provenance.revision_get_date(revision)
if date is None or revision.date < date:
graph = build_isochrone_graph(
provenance,
archive,
revision,
DirectoryEntry(revision.root),
minsize=minsize,
)
revision_process_content(
provenance,
archive,
revision,
graph,
trackall=trackall,
lower=lower,
mindepth=mindepth,
minsize=minsize,
)
if commit:
provenance.flush()
@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"})
def revision_process_content(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
revision: RevisionEntry,
graph: IsochroneNode,
trackall: bool = True,
lower: bool = True,
mindepth: int = 1,
minsize: int = 0,
) -> None:
assert revision.date is not None
provenance.revision_add(revision)
stack = [graph]
while stack:
current = stack.pop()
if current.dbdate is not None:
assert current.dbdate <= revision.date
if trackall:
# Current directory is an outer isochrone frontier for a previously
# processed revision. It should be reused as is.
provenance.directory_add_to_revision(
revision, current.entry, current.path
)
else:
assert current.maxdate is not None
# Current directory is not an outer isochrone frontier for any previous
# revision. It might be eligible for this one.
if is_new_frontier(
current,
revision=revision,
trackall=trackall,
lower=lower,
mindepth=mindepth,
):
# Outer frontier should be moved to current position in the isochrone
# graph. This is the first time this directory is found in the isochrone
# frontier.
provenance.directory_set_date_in_isochrone_frontier(
current.entry, current.maxdate
)
if trackall:
provenance.directory_add_to_revision(
revision, current.entry, current.path
)
- flatten_directory(
+ directory_flatten(
provenance, archive, current.entry, minsize=minsize
)
else:
# If current node is an invalidated frontier, update its date for future
# revisions to get the proper value.
if current.invalid:
provenance.directory_set_date_in_isochrone_frontier(
current.entry, current.maxdate
)
# No point moving the frontier here. Either there are no files or they
# are being seen for the first time here. Add all blobs to current
# revision updating date if necessary, and recursively analyse
# subdirectories as candidates to the outer frontier.
for blob in current.entry.files:
date = provenance.content_get_early_date(blob)
if date is None or revision.date < date:
provenance.content_set_early_date(blob, revision.date)
provenance.content_add_to_revision(revision, blob, current.path)
for child in current.children:
stack.append(child)
-@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten_directory"})
-def flatten_directory(
- provenance: ProvenanceInterface,
- archive: ArchiveInterface,
- directory: DirectoryEntry,
- minsize: int = 0,
-) -> None:
- """Recursively retrieve all the files of 'directory' and insert them in the
- 'provenance' database in the 'content_to_directory' table.
- """
- stack = [(directory, b"")]
- while stack:
- current, prefix = stack.pop()
- current.retrieve_children(archive, minsize=minsize)
- for f_child in current.files:
- # Add content to the directory with the computed prefix.
- provenance.content_add_to_directory(directory, f_child, prefix)
- for d_child in current.dirs:
- # Recursively walk the child directory.
- stack.append((d_child, os.path.join(prefix, d_child.name)))
-
-
def is_new_frontier(
node: IsochroneNode,
revision: RevisionEntry,
trackall: bool = True,
lower: bool = True,
mindepth: int = 1,
) -> bool:
assert node.maxdate is not None # for mypy
assert revision.date is not None # idem
if trackall:
# The only real condition for a directory to be a frontier is that its content
# is already known and its maxdate is less (or equal) than current revision's
# date. Checking mindepth is meant to skip root directories (or any arbitrary
# depth) to improve the result. The option lower tries to maximize the reuse
# rate of previously defined frontiers by keeping them low in the directory
# tree.
return (
node.known
and node.maxdate <= revision.date # all content is earlier than revision
and node.depth
>= mindepth # current node is deeper than the min allowed depth
and (has_blobs(node) if lower else True) # there is at least one blob in it
)
else:
# If we are only tracking first occurrences, we want to ensure that all first
# occurrences end up in the content_early_in_rev relation. Thus, we force for
# every blob outside a frontier to have a strictly earlier date.
return (
node.maxdate < revision.date # all content is earlier than revision
and node.depth >= mindepth # deeper than the min allowed depth
and (has_blobs(node) if lower else True) # there is at least one blob
)
def has_blobs(node: IsochroneNode) -> bool:
# We may want to look for files in different ways to decide whether to define a
# frontier or not:
# 1. Only files in current node:
return any(node.entry.files)
# 2. Files anywhere in the isochrone graph
# stack = [node]
# while stack:
# current = stack.pop()
# if any(
# map(lambda child: isinstance(child.entry, FileEntry), current.children)):
# return True
# else:
# # All children are directory entries.
# stack.extend(current.children)
# return False
# 3. Files in the intermediate directories between current node and any previously
# defined frontier:
# TODO: complete this case!
# return any(
# map(lambda child: isinstance(child.entry, FileEntry), node.children)
# ) or all(
# map(
# lambda child: (
# not (isinstance(child.entry, DirectoryEntry) and child.date is None)
# )
# or has_blobs(child),
# node.children,
# )
# )
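For reference, the two branches of is_new_frontier differ mainly in whether the node's maxdate may equal the revision date. A condensed restatement assuming lower=True, with plain values standing in for IsochroneNode attributes (all values below are made up):

from datetime import datetime, timezone

rev_date = datetime(2021, 6, 1, tzinfo=timezone.utc)
node = {"known": True, "maxdate": datetime(2021, 5, 1, tzinfo=timezone.utc),
        "depth": 2, "has_blobs": True}
mindepth = 1

# trackall=True: content already known and no later than the revision date
frontier_trackall = (
    node["known"] and node["maxdate"] <= rev_date
    and node["depth"] >= mindepth and node["has_blobs"]
)
# trackall=False: strictly earlier, so first occurrences still reach content_in_revision
frontier_first_only = (
    node["maxdate"] < rev_date and node["depth"] >= mindepth and node["has_blobs"]
)
assert frontier_trackall and frontier_first_only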
diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py
index cceb30a..6ed98a2 100644
--- a/swh/provenance/tests/test_cli.py
+++ b/swh/provenance/tests/test_cli.py
@@ -1,104 +1,105 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Set
from _pytest.monkeypatch import MonkeyPatch
from click.testing import CliRunner
import psycopg2.extensions
import pytest
from swh.core.cli import swh as swhmain
import swh.core.cli.db # noqa ; ensure cli is loaded
from swh.core.db import BaseDb
import swh.provenance.cli # noqa ; ensure cli is loaded
def test_cli_swh_db_help() -> None:
# swhmain.add_command(provenance_cli)
result = CliRunner().invoke(swhmain, ["provenance", "-h"])
assert result.exit_code == 0
assert "Commands:" in result.output
commands = result.output.split("Commands:")[1]
for command in (
"find-all",
"find-first",
+ "iter-frontiers",
"iter-origins",
"iter-revisions",
):
assert f" {command} " in commands
TABLES = {
"dbflavor",
"dbversion",
"content",
"content_in_revision",
"content_in_directory",
"directory",
"directory_in_revision",
"location",
"origin",
"revision",
"revision_before_revision",
"revision_in_origin",
}
@pytest.mark.parametrize(
"flavor, dbtables", (("with-path", TABLES | {"location"}), ("without-path", TABLES))
)
def test_cli_db_create_and_init_db_with_flavor(
monkeypatch: MonkeyPatch,
postgresql: psycopg2.extensions.connection,
flavor: str,
dbtables: Set[str],
) -> None:
"""Test that 'swh db init provenance' works with flavors
for both with-path and without-path flavors"""
dbname = f"{flavor}-db"
# DB creation using 'swh db create'
db_params = postgresql.get_dsn_parameters()
monkeypatch.setenv("PGHOST", db_params["host"])
monkeypatch.setenv("PGUSER", db_params["user"])
monkeypatch.setenv("PGPORT", db_params["port"])
result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"])
assert result.exit_code == 0, result.output
# DB init using 'swh db init'
result = CliRunner().invoke(
swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"]
)
assert result.exit_code == 0, result.output
assert f"(flavor {flavor})" in result.output
db_params["dbname"] = dbname
cnx = BaseDb.connect(**db_params).conn
# check the DB looks OK (check for db_flavor and expected tables)
with cnx.cursor() as cur:
cur.execute("select swh_get_dbflavor()")
assert cur.fetchone() == (flavor,)
cur.execute(
"select table_name from information_schema.tables "
"where table_schema = 'public' "
f"and table_catalog = '{dbname}'"
)
tables = set(x for (x,) in cur.fetchall())
assert tables == dbtables
def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None:
"Test that 'swh db init provenance' defaults to a with-path flavored DB"
dbname = postgresql.dsn
result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"])
assert result.exit_code == 0, result.output
with postgresql.cursor() as cur:
cur.execute("select swh_get_dbflavor()")
assert cur.fetchone() == ("with-path",)
diff --git a/swh/provenance/tests/test_directory_flatten.py b/swh/provenance/tests/test_directory_flatten.py
new file mode 100644
index 0000000..82f7257
--- /dev/null
+++ b/swh/provenance/tests/test_directory_flatten.py
@@ -0,0 +1,72 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from datetime import datetime, timezone
+
+from swh.model.hashutil import hash_to_bytes
+from swh.provenance.archive import ArchiveInterface
+from swh.provenance.directory import directory_add
+from swh.provenance.interface import (
+ DirectoryData,
+ ProvenanceInterface,
+ RelationData,
+ RelationType,
+)
+from swh.provenance.model import DirectoryEntry, FileEntry
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+
+
+def test_directory_add(
+ provenance: ProvenanceInterface,
+ archive: ArchiveInterface,
+) -> None:
+ # read data/README.md for more details on how these datasets are generated
+ data = load_repo_data("cmdbts2")
+ fill_storage(archive.storage, data)
+
+ # just take a directory that is known to exist in cmdbts2
+ directory = DirectoryEntry(
+ id=hash_to_bytes("48007c961cc734d1f63886d0413a6dc605e3e2ea")
+ )
+ content1 = FileEntry(
+ id=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), name=b"a"
+ )
+ content2 = FileEntry(
+ id=hash_to_bytes("50e9cdb03f9719261dd39d7f2920b906db3711a3"), name=b"b"
+ )
+ date = datetime.fromtimestamp(1000000010, timezone.utc)
+
+ # directory_add and the internal directory_flatten require the directory and its
+ # content to be known by the provenance object. Otherwise, they do nothing
+ provenance.directory_set_date_in_isochrone_frontier(directory, date)
+ provenance.content_set_early_date(content1, date)
+ provenance.content_set_early_date(content2, date)
+ provenance.flush()
+ assert provenance.storage.directory_get([directory.id]) == {
+ directory.id: DirectoryData(date=date, flat=False)
+ }
+ assert provenance.storage.content_get([content1.id, content2.id]) == {
+ content1.id: date,
+ content2.id: date,
+ }
+
+ # this query forces the directory date to be retrieved from the storage and cached
+ # (otherwise, the flush below won't update the directory flatten flag)
+ flattenned = provenance.directory_already_flattenned(directory)
+ assert flattenned is not None and not flattenned
+
+ # flatten the directory and check the expected result
+ directory_add(provenance, archive, [directory])
+ assert provenance.storage.directory_get([directory.id]) == {
+ directory.id: DirectoryData(date=date, flat=True)
+ }
+ assert provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) == {
+ content1.id: {
+ RelationData(dst=directory.id, path=b"a"),
+ RelationData(dst=directory.id, path=b"C/a"),
+ },
+ content2.id: {RelationData(dst=directory.id, path=b"C/b")},
+ }
diff --git a/swh/provenance/tests/test_directory_iterator.py b/swh/provenance/tests/test_directory_iterator.py
new file mode 100644
index 0000000..f86c8c4
--- /dev/null
+++ b/swh/provenance/tests/test_directory_iterator.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.provenance.directory import CSVDirectoryIterator
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+
+@pytest.mark.parametrize(
+ "repo",
+ (
+ "cmdbts2",
+ "out-of-order",
+ ),
+)
+def test_directory_iterator(swh_storage: StorageInterface, repo: str) -> None:
+ """Test CSVDirectoryIterator"""
+ data = load_repo_data(repo)
+ fill_storage(swh_storage, data)
+
+ directories_ids = [dir["id"] for dir in data["directory"]]
+ directories = list(CSVDirectoryIterator(directories_ids))
+
+ assert directories
+ assert len(directories) == len(data["directory"])
diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py
index b7bc070..6b81a37 100644
--- a/swh/provenance/tests/test_revision_content_layer.py
+++ b/swh/provenance/tests/test_revision_content_layer.py
@@ -1,456 +1,453 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple
import pytest
from typing_extensions import TypedDict
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.provenance.archive import ArchiveInterface
from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType
from swh.provenance.model import RevisionEntry
from swh.provenance.revision import revision_add
from swh.provenance.tests.conftest import (
fill_storage,
get_datafile,
load_repo_data,
ts2dt,
)
class SynthRelation(TypedDict):
prefix: Optional[str]
path: str
src: Sha1Git
dst: Sha1Git
rel_ts: float
class SynthRevision(TypedDict):
sha1: Sha1Git
date: float
msg: str
R_C: List[SynthRelation]
R_D: List[SynthRelation]
D_C: List[SynthRelation]
def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]:
"""Generates dict representations of synthetic revisions found in the synthetic
file (from the data/ directory) given as argument of the generator.
Generated SynthRevision (typed dict) with the following elements:
"sha1": (Sha1Git) sha1 of the revision,
"date": (float) timestamp of the revision,
"msg": (str) commit message of the revision,
"R_C": (list) new R---C relations added by this revision
"R_D": (list) new R-D relations added by this revision
"D_C": (list) new D-C relations added by this revision
Each relation above is a SynthRelation typed dict with:
"path": (str) location
"src": (Sha1Git) sha1 of the source of the relation
"dst": (Sha1Git) sha1 of the destination of the relation
"rel_ts": (float) timestamp of the target of the relation
(related to the timestamp of the revision)
"""
with open(get_datafile(filename), "r") as fobj:
yield from _parse_synthetic_revision_content_file(fobj)
def _parse_synthetic_revision_content_file(
fobj: Iterable[str],
) -> Iterator[SynthRevision]:
"""Read a 'synthetic' file and generate a dict representation of the synthetic
revision for each revision listed in the synthetic file.
"""
regs = [
"(?P<revname>R[0-9]{2,4})?",
"(?P<reltype>[^| ]*)",
"([+] )?(?P<path>[^| +]*?)[/]?",
"(?P<type>[RDC]) (?P<sha1>[0-9a-f]{40})",
"(?P<ts>-?[0-9]+(.[0-9]+)?)",
]
regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$")
current_rev: List[dict] = []
for m in (regex.match(line) for line in fobj):
if m:
d = m.groupdict()
if d["revname"]:
if current_rev:
yield _mk_synth_rev(current_rev)
current_rev.clear()
current_rev.append(d)
if current_rev:
yield _mk_synth_rev(current_rev)
def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision:
assert synth_rev[0]["type"] == "R"
rev = SynthRevision(
sha1=hash_to_bytes(synth_rev[0]["sha1"]),
date=float(synth_rev[0]["ts"]),
msg=synth_rev[0]["revname"],
R_C=[],
R_D=[],
D_C=[],
)
current_path = None
# path of the last R-D relation we parsed, used as a prefix for the next D-C
# relations
for row in synth_rev[1:]:
if row["reltype"] == "R---C":
assert row["type"] == "C"
rev["R_C"].append(
SynthRelation(
prefix=None,
path=row["path"],
src=rev["sha1"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
current_path = None
elif row["reltype"] == "R-D":
assert row["type"] == "D"
rev["R_D"].append(
SynthRelation(
prefix=None,
path=row["path"],
src=rev["sha1"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
current_path = row["path"]
elif row["reltype"] == "D-C":
assert row["type"] == "C"
rev["D_C"].append(
SynthRelation(
prefix=current_path,
path=row["path"],
src=rev["R_D"][-1]["dst"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
return rev
@pytest.mark.parametrize(
"repo, lower, mindepth",
(
("cmdbts2", True, 1),
("cmdbts2", False, 1),
("cmdbts2", True, 2),
("cmdbts2", False, 2),
("out-of-order", True, 1),
),
)
def test_revision_content_result(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
repo: str,
lower: bool,
mindepth: int,
) -> None:
# read data/README.md for more details on how these datasets are generated
data = load_repo_data(repo)
fill_storage(archive.storage, data)
syntheticfile = get_datafile(
f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
)
revisions = {rev["id"]: rev for rev in data["revision"]}
rows: Dict[str, Set[Any]] = {
"content": set(),
"content_in_directory": set(),
"content_in_revision": set(),
"directory": set(),
"directory_in_revision": set(),
"location": set(),
"revision": set(),
}
def maybe_path(path: str) -> Optional[bytes]:
if provenance.storage.with_path():
return path.encode("utf-8")
return None
for synth_rev in synthetic_revision_content_result(syntheticfile):
revision = revisions[synth_rev["sha1"]]
entry = RevisionEntry(
id=revision["id"],
date=ts2dt(revision["date"]),
root=revision["directory"],
)
revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth)
# each "entry" in the synth file is one new revision
rows["revision"].add(synth_rev["sha1"])
assert rows["revision"] == provenance.storage.entity_get_all(
EntityType.REVISION
), synth_rev["msg"]
# check the timestamp of the revision
rev_ts = synth_rev["date"]
rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[
synth_rev["sha1"]
]
assert (
rev_data.date is not None and rev_ts == rev_data.date.timestamp()
), synth_rev["msg"]
# this revision might have added new content objects
rows["content"] |= set(x["dst"] for x in synth_rev["R_C"])
rows["content"] |= set(x["dst"] for x in synth_rev["D_C"])
assert rows["content"] == provenance.storage.entity_get_all(
EntityType.CONTENT
), synth_rev["msg"]
# check for R-C (direct) entries
# these are added directly in the content_early_in_rev table
rows["content_in_revision"] |= set(
(x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"]
)
assert rows["content_in_revision"] == {
(src, rel.dst, rel.path)
for src, rels in provenance.storage.relation_get_all(
RelationType.CNT_EARLY_IN_REV
).items()
for rel in rels
}, synth_rev["msg"]
# check timestamps
for rc in synth_rev["R_C"]:
assert (
rev_ts + rc["rel_ts"]
== provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp()
), synth_rev["msg"]
# check directories
# each directory stored in the provenance index is an entry
# in the "directory" table...
rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"])
assert rows["directory"] == provenance.storage.entity_get_all(
EntityType.DIRECTORY
), synth_rev["msg"]
# ... + a number of rows in the "directory_in_rev" table...
# check for R-D entries
rows["directory_in_revision"] |= set(
(x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"]
)
assert rows["directory_in_revision"] == {
(src, rel.dst, rel.path)
for src, rels in provenance.storage.relation_get_all(
RelationType.DIR_IN_REV
).items()
for rel in rels
}, synth_rev["msg"]
# check timestamps
for rd in synth_rev["R_D"]:
- assert (
- rev_ts + rd["rel_ts"]
- == provenance.storage.directory_get([rd["dst"]])[
- rd["dst"]
- ].date.timestamp()
- ), synth_rev["msg"]
+ dir_data = provenance.storage.directory_get([rd["dst"]])[rd["dst"]]
+ assert rev_ts + rd["rel_ts"] == dir_data.date.timestamp(), synth_rev["msg"]
+ assert dir_data.flat, synth_rev["msg"]
# ... + a number of rows in the "content_in_dir" table
# for content of the directory.
# check for D-C entries
rows["content_in_directory"] |= set(
(x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"]
)
assert rows["content_in_directory"] == {
(src, rel.dst, rel.path)
for src, rels in provenance.storage.relation_get_all(
RelationType.CNT_IN_DIR
).items()
for rel in rels
}, synth_rev["msg"]
# check timestamps
for dc in synth_rev["D_C"]:
assert (
rev_ts + dc["rel_ts"]
== provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp()
), synth_rev["msg"]
if provenance.storage.with_path():
# check for location entries
rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"])
rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"])
rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"])
assert rows["location"] == provenance.storage.location_get_all(), synth_rev[
"msg"
]
@pytest.mark.parametrize(
"repo, lower, mindepth",
(
("cmdbts2", True, 1),
("cmdbts2", False, 1),
("cmdbts2", True, 2),
("cmdbts2", False, 2),
("out-of-order", True, 1),
),
)
@pytest.mark.parametrize("batch", (True, False))
def test_provenance_heuristics_content_find_all(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
repo: str,
lower: bool,
mindepth: int,
batch: bool,
) -> None:
# read data/README.md for more details on how these datasets are generated
data = load_repo_data(repo)
fill_storage(archive.storage, data)
revisions = [
RevisionEntry(
id=revision["id"],
date=ts2dt(revision["date"]),
root=revision["directory"],
)
for revision in data["revision"]
]
def maybe_path(path: str) -> str:
if provenance.storage.with_path():
return path
return ""
if batch:
revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
else:
for revision in revisions:
revision_add(
provenance, archive, [revision], lower=lower, mindepth=mindepth
)
syntheticfile = get_datafile(
f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
)
expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {}
for synth_rev in synthetic_revision_content_result(syntheticfile):
rev_id = synth_rev["sha1"].hex()
rev_ts = synth_rev["date"]
for rc in synth_rev["R_C"]:
expected_occurrences.setdefault(rc["dst"].hex(), []).append(
(rev_id, rev_ts, None, maybe_path(rc["path"]))
)
for dc in synth_rev["D_C"]:
assert dc["prefix"] is not None # to please mypy
expected_occurrences.setdefault(dc["dst"].hex(), []).append(
(rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"]))
)
for content_id, results in expected_occurrences.items():
expected = [(content_id, *result) for result in results]
db_occurrences = [
(
occur.content.hex(),
occur.revision.hex(),
occur.date.timestamp(),
occur.origin,
occur.path.decode(),
)
for occur in provenance.content_find_all(hash_to_bytes(content_id))
]
if provenance.storage.with_path():
# this is not true if the db stores no path, because the same content
# appearing several times in a given revision may be reported
# only once by content_find_all()
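# (illustrative: if the same blob appears at, say, "a/f" and "b/f" within
# one revision, a path-less storage may report it only once, hence the
# length check is restricted to the with_path() case)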
assert len(db_occurrences) == len(expected)
assert set(db_occurrences) == set(expected)
@pytest.mark.parametrize(
"repo, lower, mindepth",
(
("cmdbts2", True, 1),
("cmdbts2", False, 1),
("cmdbts2", True, 2),
("cmdbts2", False, 2),
("out-of-order", True, 1),
),
)
@pytest.mark.parametrize("batch", (True, False))
def test_provenance_heuristics_content_find_first(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
repo: str,
lower: bool,
mindepth: int,
batch: bool,
) -> None:
# read data/README.md for more details on how these datasets are generated
data = load_repo_data(repo)
fill_storage(archive.storage, data)
revisions = [
RevisionEntry(
id=revision["id"],
date=ts2dt(revision["date"]),
root=revision["directory"],
)
for revision in data["revision"]
]
if batch:
revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
else:
for revision in revisions:
revision_add(
provenance, archive, [revision], lower=lower, mindepth=mindepth
)
syntheticfile = get_datafile(
f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
)
expected_first: Dict[str, Tuple[str, float, List[str]]] = {}
# maps each blob_id to a tuple (rev_id, rev_ts, [path, ...]); the third
# element is a list because a content object can be added at several places
# in a single revision, in which case content_find_first() returns one of
# those paths, but we have no guarantee which one.
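# e.g. (hypothetical values):
#   expected_first["<blob sha1 hex>"] == ("<rev sha1 hex>", 1000000000.0, ["a/f", "b/f"])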
for synth_rev in synthetic_revision_content_result(syntheticfile):
rev_id = synth_rev["sha1"].hex()
rev_ts = synth_rev["date"]
for rc in synth_rev["R_C"]:
sha1 = rc["dst"].hex()
if sha1 not in expected_first:
assert rc["rel_ts"] == 0
expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
else:
if rev_ts == expected_first[sha1][1]:
expected_first[sha1][2].append(rc["path"])
elif rev_ts < expected_first[sha1][1]:
expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
for dc in synth_rev["D_C"]:
sha1 = rc["dst"].hex()
assert sha1 in expected_first
# nothing to do there, this content cannot be a "first seen file"
for content_id, (rev_id, ts, paths) in expected_first.items():
occur = provenance.content_find_first(hash_to_bytes(content_id))
assert occur is not None
assert occur.content.hex() == content_id
assert occur.revision.hex() == rev_id
assert occur.date.timestamp() == ts
assert occur.origin is None
if provenance.storage.with_path():
assert occur.path.decode() in paths
