Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/test_provenance_db.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import pytest | |||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | from swh.model.tests.swh_model_data import TEST_OBJECTS | ||||
from swh.provenance.model import DirectoryEntry, RevisionEntry | from swh.provenance.model import DirectoryEntry, RevisionEntry | ||||
from swh.provenance.origin import OriginEntry | from swh.provenance.origin import OriginEntry | ||||
from swh.provenance.provenance import build_isochrone_graph, origin_add, revision_add | from swh.provenance.provenance import build_isochrone_graph, origin_add, revision_add | ||||
from swh.provenance.storage.archive import ArchiveStorage | from swh.provenance.storage.archive import ArchiveStorage | ||||
from swh.provenance.tests.conftest import synthetic_result | |||||
def ts2dt(ts: dict) -> datetime.datetime: | def ts2dt(ts: dict) -> datetime.datetime: | ||||
timestamp = datetime.datetime.fromtimestamp( | timestamp = datetime.datetime.fromtimestamp( | ||||
ts["timestamp"]["seconds"], | ts["timestamp"]["seconds"], | ||||
datetime.timezone(datetime.timedelta(minutes=ts["offset"])), | datetime.timezone(datetime.timedelta(minutes=ts["offset"])), | ||||
) | ) | ||||
return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) | return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) | ||||
▲ Show 20 Lines • Show All 299 Lines • ▼ Show 20 Lines | for content, results in expected_content.items(): | ||||
contentid = bytes.fromhex(content) | contentid = bytes.fromhex(content) | ||||
occurrences = [ | occurrences = [ | ||||
(blob.hex(), rev.hex(), date.timestamp(), path) | (blob.hex(), rev.hex(), date.timestamp(), path) | ||||
for blob, rev, date, path in provenance.content_find_all(contentid) | for blob, rev, date, path in provenance.content_find_all(contentid) | ||||
] | ] | ||||
expected = [(content, *result) for result in results] | expected = [(content, *result) for result in results] | ||||
assert len(occurrences) == len(expected) | assert len(occurrences) == len(expected) | ||||
assert set(occurrences) == set(expected) | assert set(occurrences) == set(expected) | ||||
def sha1s(cur, table): | |||||
"""return the 'sha1' column from the DB 'table' (as hex) | |||||
'cur' is a cursor to the provenance index DB. | |||||
""" | |||||
cur.execute(f"SELECT sha1 FROM {table}") | |||||
return set(sha1.hex() for (sha1,) in cur.fetchall()) | |||||
def locations(cur): | |||||
"""return the 'path' column from the DB location table | |||||
'cur' is a cursor to the provenance index DB. | |||||
""" | |||||
cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location") | |||||
return set(x for (x,) in cur.fetchall()) | |||||
def relations(cur, src, dst): | |||||
"""return the triplets ('sha1', 'sha1', 'path') from the DB | |||||
for the relation between 'src' table and 'dst' table | |||||
(i.e. for C-R, C-D and D-R relations). | |||||
'cur' is a cursor to the provenance index DB. | |||||
""" | |||||
relation = { | |||||
("content", "revision"): "content_early_in_rev", | |||||
("content", "directory"): "content_in_dir", | |||||
("directory", "revision"): "directory_in_rev", | |||||
}[(src, dst)] | |||||
srccol = {"content": "blob", "directory": "dir"}[src] | |||||
dstcol = {"directory": "dir", "revision": "rev"}[dst] | |||||
cur.execute( | |||||
f"SELECT encode(src.sha1::bytea, 'hex')," | |||||
f" encode(dst.sha1::bytea, 'hex')," | |||||
f" encode(location.path::bytea, 'escape') " | |||||
f"FROM {relation} as rel, " | |||||
f" {src} as src, {dst} as dst, location " | |||||
f"WHERE rel.{srccol}=src.id AND rel.{dstcol}=dst.id AND rel.loc=location.id" | |||||
) | |||||
return set(cur.fetchall()) | |||||
@pytest.mark.parametrize( | |||||
"syntheticfile, args", | |||||
( | |||||
("synthetic_lower_1.txt", {"lower": True, "mindepth": 1}), | |||||
("synthetic_upper_1.txt", {"lower": False, "mindepth": 1}), | |||||
("synthetic_lower_2.txt", {"lower": True, "mindepth": 2}), | |||||
("synthetic_upper_2.txt", {"lower": False, "mindepth": 2}), | |||||
), | |||||
) | |||||
def test_provenance_heuristics( | |||||
provenance, storage_and_CMDBTS, archive, syntheticfile, args | |||||
): | |||||
storage, data = storage_and_CMDBTS | |||||
revisions = {rev["id"]: rev for rev in data["revision"]} | |||||
rows = { | |||||
"content": set(), | |||||
"content_in_dir": set(), | |||||
"content_early_in_rev": set(), | |||||
"directory": set(), | |||||
"directory_in_rev": set(), | |||||
"location": set(), | |||||
"revision": set(), | |||||
} | |||||
for synth_rev in synthetic_result(syntheticfile): | |||||
revision = revisions[synth_rev["sha1"]] | |||||
entry = RevisionEntry( | |||||
id=revision["id"], | |||||
date=ts2dt(revision["date"]), | |||||
root=revision["directory"], | |||||
) | |||||
revision_add(provenance, archive, [entry], **args) | |||||
# each "entry" in the synth file is one new revision | |||||
rows["revision"].add(synth_rev["sha1"].hex()) | |||||
assert rows["revision"] == sha1s(provenance.cursor, "revision"), synth_rev[ | |||||
"msg" | |||||
] | |||||
# this revision might have added new content objects | |||||
rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) | |||||
rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) | |||||
assert rows["content"] == sha1s(provenance.cursor, "content"), synth_rev["msg"] | |||||
# check for R-C (direct) entries | |||||
rows["content_early_in_rev"] |= set( | |||||
(x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_C"] | |||||
) | |||||
assert rows["content_early_in_rev"] == relations( | |||||
provenance.cursor, "content", "revision" | |||||
), synth_rev["msg"] | |||||
# check directories | |||||
rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) | |||||
assert rows["directory"] == sha1s(provenance.cursor, "directory"), synth_rev[ | |||||
"msg" | |||||
] | |||||
# check for R-D entries | |||||
rows["directory_in_rev"] |= set( | |||||
(x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"] | |||||
) | |||||
assert rows["directory_in_rev"] == relations( | |||||
provenance.cursor, "directory", "revision" | |||||
), synth_rev["msg"] | |||||
# check for D-C entries | |||||
rows["content_in_dir"] |= set( | |||||
(x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"] | |||||
) | |||||
assert rows["content_in_dir"] == relations( | |||||
provenance.cursor, "content", "directory" | |||||
), synth_rev["msg"] | |||||
# check for location entries | |||||
rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) | |||||
rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) | |||||
rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) | |||||
assert rows["location"] == locations(provenance.cursor), synth_rev["msg"] |