Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/test_provenance_db.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import pytest | import pytest | ||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | from swh.model.tests.swh_model_data import TEST_OBJECTS | ||||
from swh.provenance.model import RevisionEntry | |||||
from swh.provenance.origin import OriginEntry | from swh.provenance.origin import OriginEntry | ||||
from swh.provenance.provenance import origin_add, revision_add | from swh.provenance.provenance import origin_add, revision_add | ||||
from swh.provenance.revision import RevisionEntry | from swh.provenance.storage.archive import ArchiveStorage | ||||
from swh.provenance.tests.conftest import synthetic_result | from swh.provenance.tests.conftest import synthetic_result | ||||
def ts2dt(ts: dict) -> datetime.datetime:
    """Build a timezone-aware datetime from a timestamp dictionary.

    The function reads ``ts["timestamp"]["seconds"]`` as seconds since the
    epoch, ``ts["offset"]`` as the UTC offset in minutes, and
    ``ts["timestamp"]["microseconds"]`` as the sub-second part of the result.
    """
    epoch_seconds = ts["timestamp"]["seconds"]
    tz = datetime.timezone(datetime.timedelta(minutes=ts["offset"]))
    moment = datetime.datetime.fromtimestamp(epoch_seconds, tz)
    # the microseconds are stored separately from the integral seconds
    return moment.replace(microsecond=ts["timestamp"]["microseconds"])
def test_provenance_origin_add(provenance, swh_storage_with_objects):
    """Feed every origin of the test dataset to origin_add().

    NOTE(review): ``swh_storage_with_objects`` is requested but not referenced
    in the body -- presumably it is needed for the fixture's side effects;
    confirm against conftest.
    """
    for url in (origin.url for origin in TEST_OBJECTS["origin"]):
        origin_add(provenance, OriginEntry(url=url, revisions=[]))
    # TODO: check some facts here
def test_provenance_add_revision(provenance, storage_and_CMDBTS, archive_pg): | def test_provenance_add_revision(provenance, storage_and_CMDBTS, archive_pg): | ||||
storage, data = storage_and_CMDBTS | storage, data = storage_and_CMDBTS | ||||
for i in range(2): | for i in range(2): | ||||
# do it twice, there should be no change in results | # do it twice, there should be no change in results | ||||
for revision in data["revision"]: | for revision in data["revision"]: | ||||
entry = RevisionEntry( | entry = RevisionEntry( | ||||
archive_pg, | |||||
id=revision["id"], | id=revision["id"], | ||||
date=ts2dt(revision["date"]), | date=ts2dt(revision["date"]), | ||||
root=revision["directory"], | root=revision["directory"], | ||||
parents=revision["parents"], | |||||
) | ) | ||||
revision_add(provenance, archive_pg, entry) | revision_add(provenance, archive_pg, entry) | ||||
# there should be as many entries in 'revision' as revisions from the | # there should be as many entries in 'revision' as revisions from the | ||||
# test dataset | # test dataset | ||||
provenance.cursor.execute("SELECT count(*) FROM revision") | provenance.cursor.execute("SELECT count(*) FROM revision") | ||||
assert provenance.cursor.fetchone()[0] == len(data["revision"]) | assert provenance.cursor.fetchone()[0] == len(data["revision"]) | ||||
Show All 25 Lines | for i in range(2): | ||||
provenance.cursor.execute("SELECT count(*) FROM content_early_in_rev") | provenance.cursor.execute("SELECT count(*) FROM content_early_in_rev") | ||||
assert provenance.cursor.fetchone()[0] == 13 | assert provenance.cursor.fetchone()[0] == 13 | ||||
def test_provenance_content_find_first(provenance, storage_and_CMDBTS, archive_pg): | def test_provenance_content_find_first(provenance, storage_and_CMDBTS, archive_pg): | ||||
storage, data = storage_and_CMDBTS | storage, data = storage_and_CMDBTS | ||||
for revision in data["revision"]: | for revision in data["revision"]: | ||||
entry = RevisionEntry( | entry = RevisionEntry( | ||||
archive_pg, | id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], | ||||
id=revision["id"], | |||||
date=ts2dt(revision["date"]), | |||||
root=revision["directory"], | |||||
parents=revision["parents"], | |||||
) | ) | ||||
revision_add(provenance, archive_pg, entry) | revision_add(provenance, archive_pg, entry) | ||||
first_expected_content = [ | first_expected_content = [ | ||||
{ | { | ||||
"content": "43f3c871310a8e524004e91f033e7fb3b0bc8475", | "content": "43f3c871310a8e524004e91f033e7fb3b0bc8475", | ||||
"rev": "35ccb8dd1b53d2d8a5c1375eb513ef2beaa79ae5", | "rev": "35ccb8dd1b53d2d8a5c1375eb513ef2beaa79ae5", | ||||
"date": 1609757158, | "date": 1609757158, | ||||
▲ Show 20 Lines • Show All 91 Lines • ▼ Show 20 Lines | def test_provenance_db(provenance, storage_and_CMDBTS, archive_pg, syntheticfile, args): | ||||
def db_count(table): | def db_count(table): | ||||
provenance.cursor.execute(f"SELECT count(*) FROM {table}") | provenance.cursor.execute(f"SELECT count(*) FROM {table}") | ||||
return provenance.cursor.fetchone()[0] | return provenance.cursor.fetchone()[0] | ||||
for synth_rev in synthetic_result(syntheticfile): | for synth_rev in synthetic_result(syntheticfile): | ||||
revision = revisions[synth_rev["sha1"]] | revision = revisions[synth_rev["sha1"]] | ||||
entry = RevisionEntry( | entry = RevisionEntry( | ||||
archive_pg, | id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], | ||||
id=revision["id"], | |||||
date=ts2dt(revision["date"]), | |||||
root=revision["directory"], | |||||
parents=revision["parents"], | |||||
) | ) | ||||
revision_add(provenance, archive_pg, entry, **args) | revision_add(provenance, archive_pg, entry, **args) | ||||
# import pdb; pdb.set_trace() | |||||
# each "entry" in the synth file is one new revision | # each "entry" in the synth file is one new revision | ||||
rows["revision"].add(synth_rev["sha1"]) | rows["revision"].add(synth_rev["sha1"]) | ||||
assert len(rows["revision"]) == db_count("revision") | assert len(rows["revision"]) == db_count("revision") | ||||
# this revision might have added new content objects | # this revision might have added new content objects | ||||
rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) | rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) | ||||
rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) | rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) | ||||
assert len(rows["content"]) == db_count("content") | assert len(rows["content"]) == db_count("content") | ||||
Show All 28 Lines |