Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/backend.py
from datetime import datetime | from datetime import datetime | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple | from typing import Dict, Generator, Iterable, Optional, Set, Tuple | ||||
import psycopg2 # TODO: remove this dependency | |||||
from typing_extensions import Literal, TypedDict | from typing_extensions import Literal, TypedDict | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | ||||
from .provenance import ProvenanceResult | from .provenance import ProvenanceResult, ProvenanceStorageInterface, RelationType | ||||
class DatetimeCache(TypedDict): | class DatetimeCache(TypedDict): | ||||
data: Dict[Sha1Git, Optional[datetime]] | data: Dict[Sha1Git, Optional[datetime]] | ||||
added: Set[Sha1Git] | added: Set[Sha1Git] | ||||
class OriginCache(TypedDict): | class OriginCache(TypedDict): | ||||
Show All 32 Lines | return ProvenanceCache( | ||||
origin=OriginCache(data={}, added=set()), | origin=OriginCache(data={}, added=set()), | ||||
revision_origin=RevisionCache(data={}, added=set()), | revision_origin=RevisionCache(data={}, added=set()), | ||||
revision_before_revision={}, | revision_before_revision={}, | ||||
revision_in_origin=set(), | revision_in_origin=set(), | ||||
) | ) | ||||
class ProvenanceBackend: | class ProvenanceBackend: | ||||
raise_on_commit: bool = False | def __init__(self, storage: ProvenanceStorageInterface): | ||||
self.storage = storage | |||||
self.cache = new_cache() | |||||
def __init__(self, conn: psycopg2.extensions.connection): | def clear_caches(self) -> None: | ||||
from .postgresql.provenancedb_base import ProvenanceDBBase | self.cache = new_cache() | ||||
# TODO: this class should not know what the actual used DB is. | def flush(self) -> None: | ||||
self.storage: ProvenanceDBBase | # Revision-content layer insertions ############################################ | ||||
flavor = ProvenanceDBBase(conn).flavor | |||||
if flavor == "with-path": | |||||
from .postgresql.provenancedb_with_path import ProvenanceWithPathDB | |||||
self.storage = ProvenanceWithPathDB(conn) | # For this layer, relations need to be inserted first so that, in case of | ||||
else: | # failure, reprocessing the input does not generate an inconsistent database. | ||||
from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB | while not self.storage.relation_add( | ||||
RelationType.CNT_EARLY_IN_REV, self.cache["content_in_revision"] | |||||
): | |||||
logging.warning( | |||||
f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " | |||||
f"Data: {self.cache['content_in_revision']}. Retrying..." | |||||
) | |||||
print( | |||||
"content_in_revision", | |||||
self.storage.relation_get( | |||||
RelationType.CNT_EARLY_IN_REV, | |||||
(src for src, _, _ in self.cache["content_in_revision"]), | |||||
), | |||||
) | |||||
self.storage = ProvenanceWithoutPathDB(conn) | while not self.storage.relation_add( | ||||
self.cache: ProvenanceCache = new_cache() | RelationType.CNT_IN_DIR, self.cache["content_in_directory"] | ||||
): | |||||
logging.warning( | |||||
f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " | |||||
f"Data: {self.cache['content_in_directory']}. Retrying..." | |||||
) | |||||
print( | |||||
douardda: This could probably be factorized. Also are you sure you want to write the whole content of the… | |||||
Done Inline Actions Probably not, but this is part of an ongoing refactoring. It will be improved in the process. aeviso: Probably not, but this is part of an ongoing refactoring. It will be improved in the process. | |||||
"content_in_directory", | |||||
self.storage.relation_get( | |||||
RelationType.CNT_IN_DIR, | |||||
(src for src, _, _ in self.cache["content_in_directory"]), | |||||
), | |||||
) | |||||
def clear_caches(self) -> None: | while not self.storage.relation_add( | ||||
self.cache = new_cache() | RelationType.DIR_IN_REV, self.cache["directory_in_revision"] | ||||
): | |||||
logging.warning( | |||||
f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " | |||||
f"Data: {self.cache['directory_in_revision']}. Retrying..." | |||||
) | |||||
print( | |||||
"directory_in_revision", | |||||
self.storage.relation_get( | |||||
RelationType.DIR_IN_REV, | |||||
(src for src, _, _ in self.cache["directory_in_revision"]), | |||||
), | |||||
) | |||||
def flush(self) -> None: | # After relations, dates for the entities can be safely set, acknowledging that | ||||
# TODO: for now we just forward the cache. This should be improved! | # these entities won't need to be reprocessed in case of failure. | ||||
while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): | dates = { | ||||
sha1: date | |||||
for sha1, date in self.cache["content"]["data"].items() | |||||
if sha1 in self.cache["content"]["added"] and date is not None | |||||
} | |||||
while not self.storage.content_set_date(dates): | |||||
logging.warning( | logging.warning( | ||||
f"Unable to commit cached information {self.cache}. Retrying..." | f"Unable to write content dates to the storage. " | ||||
f"Data: {dates}. Retrying..." | |||||
) | ) | ||||
dates = { | |||||
sha1: date | |||||
for sha1, date in self.cache["directory"]["data"].items() | |||||
if sha1 in self.cache["directory"]["added"] and date is not None | |||||
} | |||||
while not self.storage.directory_set_date(dates): | |||||
logging.warning( | |||||
f"Unable to write directory dates to the storage. " | |||||
f"Data: {dates}. Retrying..." | |||||
) | |||||
dates = { | |||||
sha1: date | |||||
for sha1, date in self.cache["revision"]["data"].items() | |||||
if sha1 in self.cache["revision"]["added"] and date is not None | |||||
} | |||||
while not self.storage.revision_set_date(dates): | |||||
logging.warning( | |||||
f"Unable to write revision dates to the storage. " | |||||
f"Data: {dates}. Retrying..." | |||||
Done Inline Actions Same here, this could be factorized (looping on the 3 entity types). douardda: Same here, this could be factorized (looping on the 3 entity types). | |||||
) | |||||
# Origin-revision layer insertions ############################################# | |||||
# Origins urls should be inserted first so that internal ids' resolution works | |||||
# properly. | |||||
urls = { | |||||
sha1: date | |||||
for sha1, date in self.cache["origin"]["data"].items() | |||||
if sha1 in self.cache["origin"]["added"] | |||||
} | |||||
while not self.storage.origin_set_url(urls): | |||||
logging.warning( | |||||
f"Unable to write origins urls to the storage. " | |||||
f"Data: {urls}. Retrying..." | |||||
) | |||||
# Second, flat models for revisions' histories (ie. revision-before-revision). | |||||
rbr_data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = sum( | |||||
[ | |||||
[ | |||||
(prev, next, None) | |||||
for next in self.cache["revision_before_revision"][prev] | |||||
] | |||||
for prev in self.cache["revision_before_revision"] | |||||
], | |||||
[], | |||||
) | |||||
while not self.storage.relation_add(RelationType.REV_BEFORE_REV, rbr_data): | |||||
logging.warning( | |||||
f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " | |||||
f"Data: {rbr_data}. Retrying..." | |||||
) | |||||
print( | |||||
"revision_before_revision", | |||||
self.storage.relation_get( | |||||
RelationType.REV_BEFORE_REV, | |||||
self.cache["revision_before_revision"], | |||||
), | |||||
) | |||||
# Heads (ie. revision-in-origin entries) should be inserted once flat models for | |||||
# their histories were already added. This is to guarantee consistent results if | |||||
# something needs to be reprocessed due to a failure: already inserted heads | |||||
# won't get reprocessed in such a case. | |||||
rio_data = [(rev, org, None) for rev, org in self.cache["revision_in_origin"]] | |||||
while not self.storage.relation_add(RelationType.REV_IN_ORG, rio_data): | |||||
logging.warning( | |||||
f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " | |||||
f"Data: {rio_data}. Retrying..." | |||||
) | |||||
print( | |||||
"revision_in_origin", | |||||
self.storage.relation_get( | |||||
RelationType.REV_IN_ORG, | |||||
(src for src, _ in self.cache["revision_in_origin"]), | |||||
), | |||||
) | |||||
# Finally, preferred origins for the visited revisions are set (this step can be | |||||
# reordered if required). | |||||
origins = { | |||||
sha1: self.cache["revision_origin"]["data"][sha1] | |||||
for sha1 in self.cache["revision_origin"]["added"] | |||||
} | |||||
while not self.storage.revision_set_origin(origins): | |||||
logging.warning( | |||||
f"Unable to write preferred origins to the storage. " | |||||
f"Data: {origins}. Retrying..." | |||||
) | |||||
# clear local cache ############################################################ | |||||
self.clear_caches() | self.clear_caches() | ||||
def content_add_to_directory( | def content_add_to_directory( | ||||
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | ||||
) -> None: | ) -> None: | ||||
self.cache["content_in_directory"].add( | self.cache["content_in_directory"].add( | ||||
(blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) | (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) | ||||
) | ) | ||||
Done Inline Actions I'm not sure I get this chunk of code. What does it do exactly? What's the empty list for? Is it the start value of the sum() function? (If so, it's much better to use a named argument; it is much easier to understand.) But if I get this right, a more pythonic way (?) could be: cache = self.cache["revision_before_revision"] rbr_data = itertools.chain(*(( (prev, next, None) for next in cache[prev] ) for prev in cache )) douardda: I'm not sure I get this chunk of code. What does it do exactly? What's the empty list for? Is… | |||||
Done Inline Actions I think this comment is out of place. What empty list do you mean? aeviso: I think this comment is out of place. What empty list do you mean? | |||||
def content_add_to_revision( | def content_add_to_revision( | ||||
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes | self, revision: RevisionEntry, blob: FileEntry, prefix: bytes | ||||
) -> None: | ) -> None: | ||||
self.cache["content_in_revision"].add( | self.cache["content_in_revision"].add( | ||||
(blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) | (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) | ||||
) | ) | ||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
Show All 35 Lines | class ProvenanceBackend: | ||||
def directory_set_date_in_isochrone_frontier( | def directory_set_date_in_isochrone_frontier( | ||||
self, directory: DirectoryEntry, date: datetime | self, directory: DirectoryEntry, date: datetime | ||||
) -> None: | ) -> None: | ||||
self.cache["directory"]["data"][directory.id] = date | self.cache["directory"]["data"][directory.id] = date | ||||
self.cache["directory"]["added"].add(directory.id) | self.cache["directory"]["added"].add(directory.id) | ||||
def get_dates( | def get_dates( | ||||
self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] | self, | ||||
entity: Literal["content", "directory", "revision"], | |||||
ids: Iterable[Sha1Git], | |||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
cache = self.cache[entity] | cache = self.cache[entity] | ||||
missing_ids = set(id for id in ids if id not in cache) | missing_ids = set(id for id in ids if id not in cache) | ||||
if missing_ids: | if missing_ids: | ||||
cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) | if entity == "revision": | ||||
updated = { | |||||
id: date | |||||
for id, (date, _) in self.storage.revision_get(missing_ids).items() | |||||
if date is not None | |||||
} | |||||
else: | |||||
updated = getattr(self.storage, f"{entity}_get")(missing_ids) | |||||
cache["data"].update(updated) | |||||
return { | return { | ||||
sha1: date | sha1: date | ||||
for sha1, date in cache["data"].items() | for sha1, date in cache["data"].items() | ||||
if sha1 in ids and date is not None | if sha1 in ids and date is not None | ||||
} | } | ||||
def origin_add(self, origin: OriginEntry) -> None: | def origin_add(self, origin: OriginEntry) -> None: | ||||
self.cache["origin"]["data"][origin.id] = origin.url | self.cache["origin"]["data"][origin.id] = origin.url | ||||
Show All 16 Lines | ) -> None: | ||||
self.cache["revision_in_origin"].add((revision.id, origin.id)) | self.cache["revision_in_origin"].add((revision.id, origin.id)) | ||||
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: | def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: | ||||
return self.get_dates("revision", [revision.id]).get(revision.id, None) | return self.get_dates("revision", [revision.id]).get(revision.id, None) | ||||
def revision_get_preferred_origin( | def revision_get_preferred_origin( | ||||
self, revision: RevisionEntry | self, revision: RevisionEntry | ||||
) -> Optional[Sha1Git]: | ) -> Optional[Sha1Git]: | ||||
cache = self.cache["revision_origin"] | cache = self.cache["revision_origin"]["data"] | ||||
if revision.id not in cache: | if revision.id not in cache: | ||||
origin = self.storage.revision_get_preferred_origin(revision.id) | ret = self.storage.revision_get([revision.id]) | ||||
if revision.id in ret: | |||||
origin = ret[revision.id][1] # TODO: make this not a tuple | |||||
if origin is not None: | if origin is not None: | ||||
cache["data"][revision.id] = origin | cache[revision.id] = origin | ||||
return cache["data"].get(revision.id) | return cache.get(revision.id) | ||||
def revision_in_history(self, revision: RevisionEntry) -> bool: | def revision_in_history(self, revision: RevisionEntry) -> bool: | ||||
return revision.id in self.cache[ | return revision.id in self.cache["revision_before_revision"] or bool( | ||||
"revision_before_revision" | self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) | ||||
] or self.storage.revision_in_history(revision.id) | ) | ||||
Done Inline Actions The explicit bool cast is a bit weird here. Since the method returns a set(), I would find it more readable to compare the result with an empty set or use len(). douardda: The explicit `bool` cast is a bit weird here. Since the method returns a set(), I would find… | |||||
def revision_set_preferred_origin( | def revision_set_preferred_origin( | ||||
self, origin: OriginEntry, revision: RevisionEntry | self, origin: OriginEntry, revision: RevisionEntry | ||||
) -> None: | ) -> None: | ||||
self.cache["revision_origin"]["data"][revision.id] = origin.id | self.cache["revision_origin"]["data"][revision.id] = origin.id | ||||
self.cache["revision_origin"]["added"].add(revision.id) | self.cache["revision_origin"]["added"].add(revision.id) | ||||
def revision_visited(self, revision: RevisionEntry) -> bool: | def revision_visited(self, revision: RevisionEntry) -> bool: | ||||
return revision.id in dict( | return revision.id in dict(self.cache["revision_in_origin"]) or bool( | ||||
self.cache["revision_in_origin"] | self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) | ||||
) or self.storage.revision_visited(revision.id) | ) | ||||
def normalize(path: bytes) -> bytes: | def normalize(path: bytes) -> bytes: | ||||
return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path | return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path |
This could probably be factorized. Also, are you sure you want to write the whole content of the cache to the logs? That might generate a lot of logs...