Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/writer.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterable, Union | from typing import Iterable | ||||
from attr import evolve | from attr import evolve | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
Snapshot, | Snapshot, | ||||
Directory, | Directory, | ||||
Show All 22 Lines | def __init__(self, journal_writer): | ||||
raise EnvironmentError( | raise EnvironmentError( | ||||
"You need the swh.journal package to use the " | "You need the swh.journal package to use the " | ||||
"journal_writer feature" | "journal_writer feature" | ||||
) | ) | ||||
self.journal = get_journal_writer(**journal_writer) | self.journal = get_journal_writer(**journal_writer) | ||||
else: | else: | ||||
self.journal = None | self.journal = None | ||||
def write_additions(self, obj_type, values) -> None: | |||||
if self.journal: | |||||
self.journal.write_additions(obj_type, values) | |||||
def content_add(self, contents: Iterable[Content]) -> None: | def content_add(self, contents: Iterable[Content]) -> None: | ||||
"""Add contents to the journal. Drop the data field if provided. | """Add contents to the journal. Drop the data field if provided. | ||||
""" | """ | ||||
if not self.journal: | |||||
return | |||||
contents = [evolve(item, data=None) for item in contents] | contents = [evolve(item, data=None) for item in contents] | ||||
self.journal.write_additions("content", contents) | self.write_additions("content", contents) | ||||
def content_update(self, contents: Iterable[Content]) -> None: | def content_update(self, contents: Iterable[Content]) -> None: | ||||
if not self.journal: | if self.journal: | ||||
return | raise NotImplementedError("content_update is not supported by the journal.") | ||||
raise NotImplementedError( | |||||
"content_update is not yet supported with a journal writer." | |||||
) | |||||
def content_add_metadata(self, contents: Iterable[Content]) -> None: | def content_add_metadata(self, contents: Iterable[Content]) -> None: | ||||
return self.content_add(contents) | self.content_add(contents) | ||||
def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None: | def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None: | ||||
if not self.journal: | self.write_additions("content", contents) | ||||
return | |||||
self.journal.write_additions("content", contents) | |||||
def directory_add(self, directories: Iterable[Directory]) -> None: | def directory_add(self, directories: Iterable[Directory]) -> None: | ||||
if not self.journal: | self.write_additions("directory", directories) | ||||
return | |||||
self.journal.write_additions("directory", directories) | |||||
def revision_add(self, revisions: Iterable[Revision]) -> None: | def revision_add(self, revisions: Iterable[Revision]) -> None: | ||||
if not self.journal: | self.write_additions("revision", revisions) | ||||
return | |||||
self.journal.write_additions("revision", revisions) | |||||
def release_add(self, releases: Iterable[Release]) -> None: | def release_add(self, releases: Iterable[Release]) -> None: | ||||
if not self.journal: | self.write_additions("release", releases) | ||||
return | |||||
self.journal.write_additions("release", releases) | def snapshot_add(self, snapshots: Iterable[Snapshot]) -> None: | ||||
self.write_additions("snapshot", snapshots) | |||||
def snapshot_add(self, snapshots: Union[Iterable[Snapshot], Snapshot]) -> None: | |||||
if not self.journal: | def origin_visit_add(self, visits: Iterable[OriginVisit]) -> None: | ||||
return | self.write_additions("origin_visit", visits) | ||||
snaps = snapshots if isinstance(snapshots, list) else [snapshots] | |||||
self.journal.write_additions("snapshot", snaps) | def origin_visit_update(self, visits: Iterable[OriginVisit]) -> None: | ||||
self.write_additions("origin_visit", visits) | |||||
def origin_visit_add(self, visit: OriginVisit): | |||||
if not self.journal: | def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: | ||||
return | self.write_additions("origin_visit", visits) | ||||
self.journal.write_addition("origin_visit", visit) | |||||
def origin_add(self, origins: Iterable[Origin]) -> None: | |||||
def origin_visit_update(self, visit: OriginVisit): | self.write_additions("origin", origins) | ||||
if not self.journal: | |||||
return | |||||
self.journal.write_update("origin_visit", visit) | |||||
def origin_visit_upsert(self, visits: Iterable[OriginVisit]): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_additions("origin_visit", visits) | |||||
def origin_add_one(self, origin: Origin): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_addition("origin", origin) |