Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/writer.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterable | from typing import Any, Iterable | ||||
from attr import evolve | from attr import asdict, evolve, has | ||||
from hashlib import sha256 | |||||
from swh.model.model import ( | from swh.model import model | ||||
Origin, | |||||
OriginVisit, | |||||
Snapshot, | |||||
Directory, | |||||
Revision, | |||||
Release, | |||||
Content, | |||||
SkippedContent, | |||||
) | |||||
try: | try: | ||||
from swh.journal.writer import get_journal_writer | from swh.journal.writer import get_journal_writer | ||||
except ImportError: | except ImportError: | ||||
get_journal_writer = None # type: ignore | get_journal_writer = None # type: ignore | ||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | # mypy limitation, see https://github.com/python/mypy/issues/1153 | ||||
def anonymize(obj: Any) -> Any: | |||||
"""Anonymize the model obj | |||||
rebuild an obj entity with all instances of Person (recursively) present | |||||
in ``obj`` replaced with an anonymized version of the Person object. | |||||
An anonymized ``Person`` is built by hashing the original Person's values (fullname | |||||
+ name + email) and use this hash value as fullname. | |||||
""" | |||||
if not has(obj): | |||||
return obj | |||||
if isinstance(obj, model.Person): | |||||
tohash = obj.fullname + (obj.name or b"") + (obj.email or b"") | |||||
return model.Person(fullname=sha256(tohash).digest(), name=None, email=None) | |||||
return evolve( | |||||
obj, **{k: anonymize(v) for (k, v) in asdict(obj, recurse=False).items()} | |||||
) | |||||
class JournalWriter: | class JournalWriter: | ||||
"""Journal writer storage collaborator. It's in charge of adding objects to | """Journal writer storage collaborator. It's in charge of adding objects to | ||||
the journal. | the journal. | ||||
""" | """ | ||||
anonymizable = ["release", "revision"] | |||||
def __init__(self, journal_writer): | def __init__(self, journal_writer): | ||||
if journal_writer: | if journal_writer: | ||||
self.anonymize = journal_writer.pop("anonymize", False) | |||||
if get_journal_writer is None: | if get_journal_writer is None: | ||||
raise EnvironmentError( | raise EnvironmentError( | ||||
"You need the swh.journal package to use the " | "You need the swh.journal package to use the " | ||||
"journal_writer feature" | "journal_writer feature" | ||||
) | ) | ||||
self.journal = get_journal_writer(**journal_writer) | self.journal = get_journal_writer(**journal_writer) | ||||
else: | else: | ||||
self.journal = None | self.journal = None | ||||
self.anonymize = False | |||||
def write_additions(self, obj_type, values) -> None: | def write_additions(self, obj_type, values) -> None: | ||||
if self.journal: | if self.journal: | ||||
if obj_type not in self.anonymizable or not self.anonymize: | |||||
self.journal.write_additions(obj_type, values) | self.journal.write_additions(obj_type, values) | ||||
else: | |||||
self.journal.write_additions( | |||||
obj_type, [anonymize(value) for value in values], privileged=False | |||||
) | |||||
self.journal.write_additions(obj_type, values, privileged=True) | |||||
def content_add(self, contents: Iterable[Content]) -> None: | def content_add(self, contents: Iterable[model.Content]) -> None: | ||||
"""Add contents to the journal. Drop the data field if provided. | """Add contents to the journal. Drop the data field if provided. | ||||
""" | """ | ||||
contents = [evolve(item, data=None) for item in contents] | contents = [evolve(item, data=None) for item in contents] | ||||
self.write_additions("content", contents) | self.write_additions("content", contents) | ||||
def content_update(self, contents: Iterable[Content]) -> None: | def content_update(self, contents: Iterable[model.Content]) -> None: | ||||
if self.journal: | if self.journal: | ||||
raise NotImplementedError("content_update is not supported by the journal.") | raise NotImplementedError("content_update is not supported by the journal.") | ||||
def content_add_metadata(self, contents: Iterable[Content]) -> None: | def content_add_metadata(self, contents: Iterable[model.Content]) -> None: | ||||
self.content_add(contents) | self.content_add(contents) | ||||
def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None: | def skipped_content_add(self, contents: Iterable[model.SkippedContent]) -> None: | ||||
self.write_additions("skipped_content", contents) | self.write_additions("skipped_content", contents) | ||||
def directory_add(self, directories: Iterable[Directory]) -> None: | def directory_add(self, directories: Iterable[model.Directory]) -> None: | ||||
self.write_additions("directory", directories) | self.write_additions("directory", directories) | ||||
def revision_add(self, revisions: Iterable[Revision]) -> None: | def revision_add(self, revisions: Iterable[model.Revision]) -> None: | ||||
self.write_additions("revision", revisions) | self.write_additions("revision", revisions) | ||||
def release_add(self, releases: Iterable[Release]) -> None: | def release_add(self, releases: Iterable[model.Release]) -> None: | ||||
self.write_additions("release", releases) | self.write_additions("release", releases) | ||||
def snapshot_add(self, snapshots: Iterable[Snapshot]) -> None: | def snapshot_add(self, snapshots: Iterable[model.Snapshot]) -> None: | ||||
self.write_additions("snapshot", snapshots) | self.write_additions("snapshot", snapshots) | ||||
def origin_visit_add(self, visits: Iterable[OriginVisit]) -> None: | def origin_visit_add(self, visits: Iterable[model.OriginVisit]) -> None: | ||||
self.write_additions("origin_visit", visits) | self.write_additions("origin_visit", visits) | ||||
def origin_visit_update(self, visits: Iterable[OriginVisit]) -> None: | def origin_visit_update(self, visits: Iterable[model.OriginVisit]) -> None: | ||||
self.write_additions("origin_visit", visits) | self.write_additions("origin_visit", visits) | ||||
def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: | def origin_visit_upsert(self, visits: Iterable[model.OriginVisit]) -> None: | ||||
self.write_additions("origin_visit", visits) | self.write_additions("origin_visit", visits) | ||||
def origin_add(self, origins: Iterable[Origin]) -> None: | def origin_add(self, origins: Iterable[model.Origin]) -> None: | ||||
self.write_additions("origin", origins) | self.write_additions("origin", origins) |