Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/test_provenance_heuristics.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Any, Dict, List, Optional, Set, Tuple | from typing import Any, Dict, List, Optional, Set, Tuple | ||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.provenance.archive import ArchiveInterface | from swh.provenance.archive import ArchiveInterface | ||||
from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType | from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType | ||||
from swh.provenance.model import RevisionEntry | from swh.provenance.model import RevisionEntry | ||||
from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase | |||||
from swh.provenance.revision import revision_add | from swh.provenance.revision import revision_add | ||||
from swh.provenance.tests.conftest import ( | from swh.provenance.tests.conftest import ( | ||||
fill_storage, | fill_storage, | ||||
get_datafile, | get_datafile, | ||||
load_repo_data, | load_repo_data, | ||||
synthetic_result, | synthetic_result, | ||||
) | ) | ||||
from swh.provenance.tests.test_provenance_db import ts2dt | from swh.provenance.tests.test_provenance_db import ts2dt | ||||
Show All 33 Lines | rows: Dict[str, Set[Any]] = { | ||||
"content_in_revision": set(), | "content_in_revision": set(), | ||||
"directory": set(), | "directory": set(), | ||||
"directory_in_revision": set(), | "directory_in_revision": set(), | ||||
"location": set(), | "location": set(), | ||||
"revision": set(), | "revision": set(), | ||||
} | } | ||||
def maybe_path(path: str) -> Optional[bytes]:
    """Return the form in which ``path`` is expected to be stored.

    Args:
        path: location path as a text string.

    Returns:
        ``path`` encoded as UTF-8 bytes when ``provenance.storage.with_path()``
        is truthy, ``None`` otherwise.
    """
    # ``with_path`` is called here, as in the other path checks of this
    # module. Testing the bare attribute would always be truthy if
    # ``with_path`` is a method, and no storage-class assertion is needed.
    if provenance.storage.with_path():
        return path.encode("utf-8")
    return None
for synth_rev in synthetic_result(syntheticfile): | for synth_rev in synthetic_result(syntheticfile): | ||||
revision = revisions[synth_rev["sha1"]] | revision = revisions[synth_rev["sha1"]] | ||||
entry = RevisionEntry( | entry = RevisionEntry( | ||||
id=revision["id"], | id=revision["id"], | ||||
date=ts2dt(revision["date"]), | date=ts2dt(revision["date"]), | ||||
▲ Show 20 Lines • Show All 76 Lines • ▼ Show 20 Lines | for synth_rev in synthetic_result(syntheticfile): | ||||
}, synth_rev["msg"] | }, synth_rev["msg"] | ||||
# check timestamps | # check timestamps | ||||
for dc in synth_rev["D_C"]: | for dc in synth_rev["D_C"]: | ||||
assert ( | assert ( | ||||
rev_ts + dc["rel_ts"] | rev_ts + dc["rel_ts"] | ||||
== provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() | == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() | ||||
), synth_rev["msg"] | ), synth_rev["msg"] | ||||
assert isinstance(provenance.storage, ProvenanceDBBase) | if provenance.storage.with_path(): | ||||
if provenance.storage.with_path: | |||||
# check for location entries | # check for location entries | ||||
rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) | rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) | ||||
rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) | rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) | ||||
rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) | rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) | ||||
assert rows["location"] == provenance.storage.location_get(), synth_rev[ | assert rows["location"] == provenance.storage.location_get(), synth_rev[ | ||||
"msg" | "msg" | ||||
] | ] | ||||
Show All 26 Lines | revisions = [ | ||||
id=revision["id"], | id=revision["id"], | ||||
date=ts2dt(revision["date"]), | date=ts2dt(revision["date"]), | ||||
root=revision["directory"], | root=revision["directory"], | ||||
) | ) | ||||
for revision in data["revision"] | for revision in data["revision"] | ||||
] | ] | ||||
def maybe_path(path: str) -> str:
    """Return the path expected in a reported occurrence.

    Args:
        path: location path as a text string.

    Returns:
        ``path`` unchanged when ``provenance.storage.with_path()`` is truthy,
        the empty string otherwise.
    """
    # ``with_path`` is called here, as in the other path checks of this
    # module. Testing the bare attribute would always be truthy if
    # ``with_path`` is a method, and no storage-class assertion is needed.
    if provenance.storage.with_path():
        return path
    return ""
if batch: | if batch: | ||||
revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) | revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) | ||||
else: | else: | ||||
for revision in revisions: | for revision in revisions: | ||||
revision_add( | revision_add( | ||||
Show All 13 Lines | for synth_rev in synthetic_result(syntheticfile): | ||||
(rev_id, rev_ts, None, maybe_path(rc["path"])) | (rev_id, rev_ts, None, maybe_path(rc["path"])) | ||||
) | ) | ||||
for dc in synth_rev["D_C"]: | for dc in synth_rev["D_C"]: | ||||
assert dc["prefix"] is not None # to please mypy | assert dc["prefix"] is not None # to please mypy | ||||
expected_occurrences.setdefault(dc["dst"].hex(), []).append( | expected_occurrences.setdefault(dc["dst"].hex(), []).append( | ||||
(rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) | (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) | ||||
) | ) | ||||
assert isinstance(provenance.storage, ProvenanceDBBase) | |||||
for content_id, results in expected_occurrences.items(): | for content_id, results in expected_occurrences.items(): | ||||
expected = [(content_id, *result) for result in results] | expected = [(content_id, *result) for result in results] | ||||
db_occurrences = [ | db_occurrences = [ | ||||
( | ( | ||||
occur.content.hex(), | occur.content.hex(), | ||||
occur.revision.hex(), | occur.revision.hex(), | ||||
occur.date.timestamp(), | occur.date.timestamp(), | ||||
occur.origin, | occur.origin, | ||||
occur.path.decode(), | occur.path.decode(), | ||||
) | ) | ||||
for occur in provenance.content_find_all(hash_to_bytes(content_id)) | for occur in provenance.content_find_all(hash_to_bytes(content_id)) | ||||
] | ] | ||||
if provenance.storage.with_path: | if provenance.storage.with_path(): | ||||
# this is not true if the db stores no path, because a same content | # this is not true if the db stores no path, because a same content | ||||
# that appears several times in a given revision may be reported | # that appears several times in a given revision may be reported | ||||
# only once by content_find_all() | # only once by content_find_all() | ||||
assert len(db_occurrences) == len(expected) | assert len(db_occurrences) == len(expected) | ||||
assert set(db_occurrences) == set(expected) | assert set(db_occurrences) == set(expected) | ||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | for synth_rev in synthetic_result(syntheticfile): | ||||
elif rev_ts < expected_first[sha1][1]: | elif rev_ts < expected_first[sha1][1]: | ||||
expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) | expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) | ||||
for dc in synth_rev["D_C"]: | for dc in synth_rev["D_C"]: | ||||
sha1 = rc["dst"].hex() | sha1 = rc["dst"].hex() | ||||
assert sha1 in expected_first | assert sha1 in expected_first | ||||
# nothing to do there, this content cannot be a "first seen file" | # nothing to do there, this content cannot be a "first seen file" | ||||
assert isinstance(provenance.storage, ProvenanceDBBase) | |||||
for content_id, (rev_id, ts, paths) in expected_first.items(): | for content_id, (rev_id, ts, paths) in expected_first.items(): | ||||
occur = provenance.content_find_first(hash_to_bytes(content_id)) | occur = provenance.content_find_first(hash_to_bytes(content_id)) | ||||
assert occur is not None | assert occur is not None | ||||
assert occur.content.hex() == content_id | assert occur.content.hex() == content_id | ||||
assert occur.revision.hex() == rev_id | assert occur.revision.hex() == rev_id | ||||
assert occur.date.timestamp() == ts | assert occur.date.timestamp() == ts | ||||
assert occur.origin is None | assert occur.origin is None | ||||
if provenance.storage.with_path: | if provenance.storage.with_path(): | ||||
assert occur.path.decode() in paths | assert occur.path.decode() in paths |