Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345793
writer.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
3 KB
Subscribers
None
writer.py
View Options
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
typing
import
Iterable
,
Union
from
attr
import
evolve
from
swh.model.model
import
(
Origin
,
OriginVisit
,
Snapshot
,
Directory
,
Revision
,
Release
,
Content
,
SkippedContent
,
)
try
:
from
swh.journal.writer
import
get_journal_writer
except
ImportError
:
get_journal_writer
=
None
# type: ignore
# mypy limitation, see https://github.com/python/mypy/issues/1153
class JournalWriter:
    """Journal writer storage collaborator.

    It's in charge of adding objects to the journal. When no journal writer
    configuration is provided, ``self.journal`` is ``None`` and every
    ``*_add`` / ``*_update`` / ``*_upsert`` method returns without doing
    anything.
    """

    def __init__(self, journal_writer):
        """Instantiate the underlying journal writer, if one is configured.

        Args:
            journal_writer: mapping of keyword arguments forwarded to
                ``get_journal_writer``; a falsy value disables the journal.

        Raises:
            EnvironmentError: a configuration was provided but
                ``swh.journal.writer`` could not be imported.
        """
        if journal_writer:
            if get_journal_writer is None:
                raise EnvironmentError(
                    'You need the swh.journal package to use the '
                    'journal_writer feature')
            self.journal = get_journal_writer(**journal_writer)
        else:
            self.journal = None

    def _write_additions(self, object_type, values) -> None:
        """Write several objects of one type; no-op without a journal."""
        if not self.journal:
            return
        self.journal.write_additions(object_type, values)

    def _write_addition(self, object_type, value) -> None:
        """Write a single object; no-op without a journal."""
        if not self.journal:
            return
        self.journal.write_addition(object_type, value)

    def content_add(self, contents: Iterable[Content]) -> None:
        """Add contents to the journal. Drop the data field if provided.

        """
        # Guard first so that the input iterable is not consumed when the
        # journal is disabled.
        if not self.journal:
            return
        stripped = [evolve(item, data=None) for item in contents]
        self.journal.write_additions('content', stripped)

    def content_update(self, contents: Iterable[Content]) -> None:
        """Reject content updates when a journal is configured.

        Raises:
            NotImplementedError: whenever a journal writer is active.
        """
        if not self.journal:
            return
        raise NotImplementedError(
            'content_update is not yet supported with a journal writer.')

    def content_add_metadata(self, contents: Iterable[Content]) -> None:
        """Add contents to the journal; delegates to :meth:`content_add`."""
        return self.content_add(contents)

    def skipped_content_add(
            self, contents: Iterable[SkippedContent]) -> None:
        """Add skipped contents to the journal."""
        # NOTE(review): skipped contents are written under the 'content'
        # object type, same as regular contents. Confirm this is intended
        # before changing it, since journal consumers depend on the name.
        self._write_additions('content', contents)

    def directory_add(self, directories: Iterable[Directory]) -> None:
        """Add directories to the journal."""
        self._write_additions('directory', directories)

    def revision_add(self, revisions: Iterable[Revision]) -> None:
        """Add revisions to the journal."""
        self._write_additions('revision', revisions)

    def release_add(self, releases: Iterable[Release]) -> None:
        """Add releases to the journal."""
        self._write_additions('release', releases)

    def snapshot_add(
            self, snapshots: Union[Iterable[Snapshot], Snapshot]) -> None:
        """Add one snapshot, or an iterable of snapshots, to the journal.

        A lone ``Snapshot`` is wrapped in a list; any other argument is
        treated as an iterable of snapshots and passed through as is.
        """
        # Test for the scalar case rather than for ``list``: otherwise a
        # tuple or generator of snapshots would be wrapped as if it were a
        # single snapshot.
        if isinstance(snapshots, Snapshot):
            snaps = [snapshots]
        else:
            snaps = snapshots
        self._write_additions('snapshot', snaps)

    def origin_visit_add(self, visit: OriginVisit) -> None:
        """Add a single origin visit to the journal."""
        self._write_addition('origin_visit', visit)

    def origin_visit_update(self, visit: OriginVisit) -> None:
        """Write an update of an origin visit to the journal."""
        if not self.journal:
            return
        self.journal.write_update('origin_visit', visit)

    def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
        """Add several origin visits to the journal."""
        self._write_additions('origin_visit', visits)

    def origin_add_one(self, origin: Origin) -> None:
        """Add a single origin to the journal."""
        self._write_addition('origin', origin)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Jul 4, 3:31 PM (1 w, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3347899
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment