Page MenuHomeSoftware Heritage

writer.py
No OneTemporary

writer.py

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Iterable, Union
from attr import evolve
from swh.model.model import (
Origin, OriginVisit, Snapshot, Directory, Revision, Release,
Content, SkippedContent,
)
try:
from swh.journal.writer import get_journal_writer
except ImportError:
get_journal_writer = None # type: ignore
# mypy limitation, see https://github.com/python/mypy/issues/1153
class JournalWriter:
"""Journal writer storage collaborator. It's in charge of adding objects to
the journal.
"""
def __init__(self, journal_writer):
if journal_writer:
if get_journal_writer is None:
raise EnvironmentError(
'You need the swh.journal package to use the '
'journal_writer feature')
self.journal = get_journal_writer(**journal_writer)
else:
self.journal = None
def content_add(self, contents: Iterable[Content]) -> None:
"""Add contents to the journal. Drop the data field if provided.
"""
if not self.journal:
return
contents = [evolve(item, data=None) for item in contents]
self.journal.write_additions('content', contents)
def content_update(self, contents: Iterable[Content]) -> None:
if not self.journal:
return
raise NotImplementedError(
'content_update is not yet supported with a journal writer.')
def content_add_metadata(
self, contents: Iterable[Content]) -> None:
return self.content_add(contents)
def skipped_content_add(
self, contents: Iterable[SkippedContent]) -> None:
if not self.journal:
return
self.journal.write_additions('content', contents)
def directory_add(self, directories: Iterable[Directory]) -> None:
if not self.journal:
return
self.journal.write_additions('directory', directories)
def revision_add(self, revisions: Iterable[Revision]) -> None:
if not self.journal:
return
self.journal.write_additions('revision', revisions)
def release_add(self, releases: Iterable[Release]) -> None:
if not self.journal:
return
self.journal.write_additions('release', releases)
def snapshot_add(
self, snapshots: Union[Iterable[Snapshot], Snapshot]) -> None:
if not self.journal:
return
snaps = snapshots if isinstance(snapshots, list) else [snapshots]
self.journal.write_additions('snapshot', snaps)
def origin_visit_add(self, visit: OriginVisit):
if not self.journal:
return
self.journal.write_addition('origin_visit', visit)
def origin_visit_update(self, visit: OriginVisit):
if not self.journal:
return
self.journal.write_update('origin_visit', visit)
def origin_visit_upsert(self, visits: Iterable[OriginVisit]):
if not self.journal:
return
self.journal.write_additions('origin_visit', visits)
def origin_add_one(self, origin: Origin):
if not self.journal:
return
self.journal.write_addition('origin', origin)

File Metadata

Mime Type
text/x-python
Expires
Fri, Jul 4, 3:31 PM (1 w, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3347899

Event Timeline