Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/writer.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from typing import Dict, Generator, Iterable, Union | |||||
from swh.model.model import Content | |||||
try: | |||||
from swh.journal.writer import get_journal_writer | |||||
except ImportError: | |||||
get_journal_writer = None # type: ignore | |||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | |||||
def to_one_dict(obj): | |||||
"""Convert dag object element to a dict | |||||
""" | |||||
return obj if not hasattr(obj, 'to_dict') else obj.to_dict() | |||||
def to_dict(list_or_gen: Iterable) -> Generator: | |||||
"""Convert the dag object list (or generator) to dict | |||||
""" | |||||
for obj in list_or_gen: | |||||
yield to_one_dict(obj) | |||||
vlorentz: You don't need this, swh-journal already handles swh-model objects. | |||||
ardumontAuthorUnsubmitted Done Inline ActionsNeat. ardumont: Neat.
Thanks.
Adapted. | |||||
class JournalWriter: | |||||
"""Journal writer storage collaborator. It's in charge of adding objects to | |||||
the journal. | |||||
""" | |||||
def __init__(self, journal_writer): | |||||
if journal_writer: | |||||
if get_journal_writer is None: | |||||
raise EnvironmentError( | |||||
'You need the swh.journal package to use the ' | |||||
'journal_writer feature') | |||||
self.journal = get_journal_writer(**journal_writer) | |||||
else: | |||||
self.journal = None | |||||
def content_add(self, contents: Iterable[Union[Dict, Content]]) -> None: | |||||
"""Add contents to the journal. Drop the data field if provided. | |||||
""" | |||||
if not self.journal: | |||||
return | |||||
for item in to_dict(contents): | |||||
if 'data' in item: | |||||
item = item.copy() | |||||
del item['data'] | |||||
self.journal.write_addition('content', item) | |||||
def content_update(self, contents: Iterable[Union[Dict, Content]]) -> None: | |||||
if not self.journal: | |||||
return | |||||
raise NotImplementedError( | |||||
'content_update is not yet supported with a journal writer.') | |||||
def content_add_metadata( | |||||
self, contents: Iterable[Union[Dict, Content]]) -> None: | |||||
return self.content_add(contents) | |||||
def skipped_content_add( | |||||
self, contents: Iterable[Union[Dict, Content]]) -> None: | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_additions('content', to_dict(contents)) | |||||
def directory_add(self, directories): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_additions('directory', to_dict(directories)) | |||||
def revision_add(self, revisions): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_additions('revision', to_dict(revisions)) | |||||
def release_add(self, releases): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_additions('release', to_dict(releases)) | |||||
def snapshot_add(self, snapshots): | |||||
if not self.journal: | |||||
return | |||||
snaps = snapshots if isinstance(snapshots, list) else [snapshots] | |||||
self.journal.write_additions('snapshot', to_dict(snaps)) | |||||
def origin_visit_add(self, visit): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_addition('origin_visit', to_one_dict(visit)) | |||||
def origin_visit_update(self, visit): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_update('origin_visit', to_one_dict(visit)) | |||||
def origin_visit_upsert(self, visits): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_additions('origin_visit', to_dict(visits)) | |||||
def origin_add_one(self, origin): | |||||
if not self.journal: | |||||
return | |||||
self.journal.write_addition('origin', to_one_dict(origin)) |
You don't need this, swh-journal already handles swh-model objects.