Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7450753
converters.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Subscribers
None
converters.py
View Options
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
datetime
from
swh.core.utils
import
decode_with_escape
,
encode_with_unescape
from
swh.model
import
identifiers
# Placeholder author whose three fields are all unset; author_to_db falls
# back to it when it is given None.
DEFAULT_AUTHOR = dict(
    fullname=None,
    name=None,
    email=None,
)
# Placeholder date with no timestamp, a zero offset and an unset
# negative-UTC flag; date_to_db falls back to it when it is given None.
DEFAULT_DATE = dict(
    timestamp=None,
    offset=0,
    neg_utc_offset=None,
)
def author_to_db(author):
    """Convert a swh-model author to its DB representation.

    Args:
        author: a :mod:`swh.model` compatible author, or None

    Returns:
        dict: a dictionary with three keys: fullname, name and email.
        When ``author`` is None, a fresh copy of ``DEFAULT_AUTHOR`` is
        returned; otherwise ``author`` itself is returned unchanged.

    """
    if author is None:
        # Hand out a copy: returning the module-level dict itself would let
        # any caller that mutates the result corrupt the default seen by
        # every later call.
        return dict(DEFAULT_AUTHOR)

    return author
def db_to_author(id, fullname, name, email):
    """Build a swh-model author from the columns stored in the database.

    Args:
        id (long): the author's identifier
        fullname (bytes): the author's fullname
        name (bytes): the author's name
        email (bytes): the author's email

    Returns:
        dict: a dictionary with four keys: id, fullname, name and email, or
        None if the id is None

    """
    if id is not None:
        return dict(id=id, fullname=fullname, name=name, email=email)

    # No identifier means there is no author to report.
    return None
def git_headers_to_db(git_headers):
    """Convert git headers to their database representation.

    Every header value is passed through ``decode_with_escape``: the bytes
    are decoded to unicode as utf-8, and invalid utf-8 sequences are
    replaced with backslash escapes.

    Args:
        git_headers: an iterable of (key, values) pairs, where values is
            either a single value or a list of values

    Returns:
        list: a list of ``[key, converted]`` pairs in the input order;
        ``converted`` is a list when the input values were a list

    """
    def _to_db(payload):
        # A header carrying several values keeps its list shape.
        if isinstance(payload, list):
            return [decode_with_escape(item) for item in payload]
        return decode_with_escape(payload)

    return [[header, _to_db(payload)] for header, payload in git_headers]
def db_to_git_headers(db_git_headers):
    """Convert git headers from their database representation.

    Every header value is passed through ``encode_with_unescape``; keys are
    kept as they are.

    Args:
        db_git_headers: an iterable of (key, values) pairs, where values is
            either a single value or a list of values

    Returns:
        list: a list of ``[key, converted]`` pairs in the input order;
        ``converted`` is a list when the input values were a list

    """
    converted = []
    for header, payload in db_git_headers:
        if not isinstance(payload, list):
            restored = encode_with_unescape(payload)
        else:
            # A header carrying several values keeps its list shape.
            restored = [encode_with_unescape(item) for item in payload]
        converted.append([header, restored])
    return converted
def db_to_date(date, offset, neg_utc_offset):
    """Convert the DB representation of a date to a swh-model compatible date.

    Args:
        date (datetime.datetime): a date pulled out of the database
        offset (int): an integer number of minutes representing an UTC offset
        neg_utc_offset (boolean): whether an utc offset is negative

    Returns:
        dict: None if ``date`` is None, otherwise a dict with three keys:

        - timestamp: a dict holding the whole ``seconds`` of the timestamp
          and its remaining ``microseconds``
        - offset: the number of minutes since UTC
        - negative_utc: whether a null UTC offset is negative

    """
    if date is None:
        return None

    # Strip the sub-second part *before* converting. int() truncates toward
    # zero, so int(date.timestamp()) is one second too high for pre-epoch
    # dates that carry microseconds (-0.5 became 0 instead of -1), which no
    # longer matches the always-positive microseconds field. It also keeps
    # the float an exact integer, avoiding rounding on large timestamps.
    whole_seconds = int(date.replace(microsecond=0).timestamp())

    return {
        'timestamp': {
            'seconds': whole_seconds,
            'microseconds': date.microsecond,
        },
        'offset': offset,
        'negative_utc': neg_utc_offset,
    }
def date_to_db(date_offset):
    """Convert a swh-model date_offset to its DB representation.

    Args:
        date_offset: a :mod:`swh.model` compatible date_offset, or None

    Returns:
        dict: a dictionary with three keys:

        - timestamp: a date in ISO format
        - offset: the UTC offset in minutes
        - neg_utc_offset: a boolean indicating whether a null offset is
          negative or positive.

        When ``date_offset`` is None, a fresh copy of ``DEFAULT_DATE`` is
        returned instead.

    """
    if date_offset is None:
        # Hand out a copy: returning the module-level dict itself would let
        # any caller that mutates the result corrupt the default seen by
        # every later call.
        return dict(DEFAULT_DATE)

    normalized = identifiers.normalize_timestamp(date_offset)

    ts = normalized['timestamp']
    seconds = ts.get('seconds', 0)
    microseconds = ts.get('microseconds', 0)

    # Build the timezone-aware datetime from the whole seconds, then set
    # the sub-second part explicitly.
    timestamp = datetime.datetime.fromtimestamp(
        seconds, datetime.timezone.utc)
    timestamp = timestamp.replace(microsecond=microseconds)

    return {
        # PostgreSQL supports isoformatted timestamps
        'timestamp': timestamp.isoformat(),
        'offset': normalized['offset'],
        'neg_utc_offset': normalized['negative_utc'],
    }
def revision_to_db(revision):
    """Convert a swh-model revision to its database representation.

    The author, committer and both dates are flattened into prefixed
    columns, any ``extra_headers`` found in the metadata are converted with
    ``git_headers_to_db``, and each parent becomes a row holding the
    revision id, the parent id and the parent's rank.

    Args:
        revision (dict): a :mod:`swh.model` compatible revision

    Returns:
        dict: the flattened revision

    """
    db_author = author_to_db(revision['author'])
    db_date = date_to_db(revision['date'])
    db_committer = author_to_db(revision['committer'])
    db_committer_date = date_to_db(revision['committer_date'])

    db_metadata = revision['metadata']
    if db_metadata and 'extra_headers' in db_metadata:
        # Convert the headers on a shallow copy so that the metadata of the
        # revision passed in is left as it was.
        db_metadata = db_metadata.copy()
        db_metadata['extra_headers'] = git_headers_to_db(
            db_metadata['extra_headers'])

    row = {
        'id': revision['id'],
        'author_fullname': db_author['fullname'],
        'author_name': db_author['name'],
        'author_email': db_author['email'],
        'date': db_date['timestamp'],
        'date_offset': db_date['offset'],
        'date_neg_utc_offset': db_date['neg_utc_offset'],
        'committer_fullname': db_committer['fullname'],
        'committer_name': db_committer['name'],
        'committer_email': db_committer['email'],
        'committer_date': db_committer_date['timestamp'],
        'committer_date_offset': db_committer_date['offset'],
        'committer_date_neg_utc_offset': db_committer_date['neg_utc_offset'],
        'type': revision['type'],
        'directory': revision['directory'],
        'message': revision['message'],
        'metadata': db_metadata,
        'synthetic': revision['synthetic'],
    }

    # One row per parent; the rank is the parent's position in the list.
    parent_rows = []
    for rank, parent_id in enumerate(revision['parents']):
        parent_rows.append({
            'id': revision['id'],
            'parent_id': parent_id,
            'parent_rank': rank,
        })
    row['parents'] = parent_rows

    return row
def db_to_revision(db_revision):
    """Convert a database representation of a revision to its swh-model
    representation.

    Args:
        db_revision (dict): a revision row; the ``parents`` and
            ``object_id`` keys are optional

    Returns:
        dict: the swh-model revision. ``parents`` only lists the truthy
        entries of the row, and ``object_id`` is present only when the row
        carries it. ``db_revision`` itself is left unmodified.

    """
    author = db_to_author(
        db_revision['author_id'],
        db_revision['author_fullname'],
        db_revision['author_name'],
        db_revision['author_email'],
    )
    date = db_to_date(
        db_revision['date'],
        db_revision['date_offset'],
        db_revision['date_neg_utc_offset'],
    )

    committer = db_to_author(
        db_revision['committer_id'],
        db_revision['committer_fullname'],
        db_revision['committer_name'],
        db_revision['committer_email'],
    )
    committer_date = db_to_date(
        db_revision['committer_date'],
        db_revision['committer_date_offset'],
        db_revision['committer_date_neg_utc_offset'],
    )

    metadata = db_revision['metadata']
    if metadata and 'extra_headers' in metadata:
        # Convert the headers on a shallow copy, as revision_to_db does:
        # writing into the row's own metadata would silently modify the
        # caller's data and convert the headers twice if the same row were
        # passed in again.
        metadata = metadata.copy()
        metadata['extra_headers'] = db_to_git_headers(
            metadata['extra_headers'])

    parents = []
    if 'parents' in db_revision:
        # Falsy entries (e.g. None) are skipped.
        parents = [parent for parent in db_revision['parents'] if parent]

    ret = {
        'id': db_revision['id'],
        'author': author,
        'date': date,
        'committer': committer,
        'committer_date': committer_date,
        'type': db_revision['type'],
        'directory': db_revision['directory'],
        'message': db_revision['message'],
        'metadata': metadata,
        'synthetic': db_revision['synthetic'],
        'parents': parents,
    }

    if 'object_id' in db_revision:
        ret['object_id'] = db_revision['object_id']

    return ret
def release_to_db(release):
    """Convert a swh-model release to its database representation.

    The author and date are flattened into prefixed columns, and the
    release ``message`` is stored under the ``comment`` key.

    Args:
        release (dict): a :mod:`swh.model` compatible release

    Returns:
        dict: the flattened release

    """
    db_author = author_to_db(release['author'])
    db_date = date_to_db(release['date'])

    row = {'id': release['id']}
    row.update({
        'author_fullname': db_author['fullname'],
        'author_name': db_author['name'],
        'author_email': db_author['email'],
    })
    row.update({
        'date': db_date['timestamp'],
        'date_offset': db_date['offset'],
        'date_neg_utc_offset': db_date['neg_utc_offset'],
    })
    row.update({
        'name': release['name'],
        'target': release['target'],
        'target_type': release['target_type'],
        'comment': release['message'],
        'synthetic': release['synthetic'],
    })
    return row
def db_to_release(db_release):
    """Convert a database representation of a release to its swh-model
    representation.

    The ``comment`` column is exposed as the release ``message``.

    Args:
        db_release (dict): a release row; the ``object_id`` key is optional

    Returns:
        dict: the swh-model release, including ``object_id`` only when the
        row carries it

    """
    release_author = db_to_author(
        db_release['author_id'],
        db_release['author_fullname'],
        db_release['author_name'],
        db_release['author_email'],
    )
    release_date = db_to_date(
        db_release['date'],
        db_release['date_offset'],
        db_release['date_neg_utc_offset'],
    )

    release = {
        'author': release_author,
        'date': release_date,
        'id': db_release['id'],
        'name': db_release['name'],
        'message': db_release['comment'],
        'synthetic': db_release['synthetic'],
        'target': db_release['target'],
        'target_type': db_release['target_type'],
    }

    # object_id is copied over only when the row provides it.
    if 'object_id' not in db_release:
        return release

    release['object_id'] = db_release['object_id']
    return release
def ctags_to_db(ctags):
    """Flatten a ctags entry into one database-ready dict per symbol.

    Args:
        ctags (dict): ctags entry with the following keys:

            - id (bytes): content's identifier
            - indexer_configuration_id (int): tool id used to compute ctags
            - ctags ([dict]): List of dictionary with the following keys:

                - name (str): symbol's name
                - kind (str): symbol's kind
                - line (int): symbol's line in the content
                - lang (str): language

    Yields:
        dict: one entry per symbol, with the following keys:

            - id (bytes): content's identifier
            - name (str): symbol's name
            - kind (str): symbol's kind
            - line (int): symbol's line in the content
            - lang (str): language for that content
            - indexer_configuration_id (int): tool id used to compute ctags

    """
    content_id = ctags['id']
    configuration_id = ctags['indexer_configuration_id']

    for symbol in ctags['ctags']:
        flattened = {'id': content_id}
        flattened['name'] = symbol['name']
        flattened['kind'] = symbol['kind']
        flattened['line'] = symbol['line']
        flattened['lang'] = symbol['lang']
        flattened['indexer_configuration_id'] = configuration_id
        yield flattened
def db_to_ctags(ctag):
    """Convert a flat ctags row into a ctags entry with a nested tool.

    Args:
        ctag (dict): ctags row with the following keys: id, name, kind,
            line, lang, tool_id, tool_name, tool_version and
            tool_configuration

    Returns:
        dict: a dictionary with the following keys:

            - id: copied from the row
            - name: copied from the row
            - kind: copied from the row
            - line: copied from the row
            - lang: copied from the row
            - tool (dict): the row's ``tool_*`` values under the keys id,
              name, version and configuration

    """
    entry = {
        'id': ctag['id'],
        'name': ctag['name'],
        'kind': ctag['kind'],
        'line': ctag['line'],
        'lang': ctag['lang'],
    }
    # The flat tool_* columns are regrouped into a nested dict.
    entry['tool'] = {
        'id': ctag['tool_id'],
        'name': ctag['tool_name'],
        'version': ctag['tool_version'],
        'configuration': ctag['tool_configuration'],
    }
    return entry
def db_to_mimetype(mimetype):
    """Convert a flat mimetype row into a mimetype entry with a nested tool.

    Args:
        mimetype (dict): row with the following keys: id, encoding,
            mimetype, tool_id, tool_name, tool_version and
            tool_configuration

    Returns:
        dict: a dictionary with the keys id, encoding, mimetype and tool,
        the latter grouping the row's ``tool_*`` values under the keys id,
        name, version and configuration

    """
    return dict(
        id=mimetype['id'],
        encoding=mimetype['encoding'],
        mimetype=mimetype['mimetype'],
        # The flat tool_* columns are regrouped into a nested dict.
        tool=dict(
            id=mimetype['tool_id'],
            name=mimetype['tool_name'],
            version=mimetype['tool_version'],
            configuration=mimetype['tool_configuration'],
        ),
    )
def db_to_language(language):
    """Convert a flat language row into a language entry with a nested tool.

    Args:
        language (dict): row with the following keys: id, lang, tool_id,
            tool_name, tool_version and tool_configuration

    Returns:
        dict: a dictionary with the keys id, lang and tool, the latter
        grouping the row's ``tool_*`` values under the keys id, name,
        version and configuration

    """
    entry = {'id': language['id'], 'lang': language['lang']}
    # The flat tool_* columns are regrouped into a nested dict.
    entry['tool'] = {
        'id': language['tool_id'],
        'name': language['tool_name'],
        'version': language['tool_version'],
        'configuration': language['tool_configuration'],
    }
    return entry
def db_to_metadata(metadata):
    """Convert a flat metadata row into a metadata entry with a nested tool.

    Args:
        metadata (dict): row with the following keys: id,
            translated_metadata, tool_id, tool_name, tool_version and
            tool_configuration

    Returns:
        dict: a dictionary with the keys id, translated_metadata and tool,
        the latter grouping the row's ``tool_*`` values under the keys id,
        name, version and configuration

    """
    return dict(
        id=metadata['id'],
        translated_metadata=metadata['translated_metadata'],
        # The flat tool_* columns are regrouped into a nested dict.
        tool=dict(
            id=metadata['tool_id'],
            name=metadata['tool_name'],
            version=metadata['tool_version'],
            configuration=metadata['tool_configuration'],
        ),
    )
def db_to_fossology_license(license):
    """Convert a flat license row into a license entry with a nested tool.

    Args:
        license (dict): row with the following keys: id, licenses, tool_id,
            tool_name, tool_version and tool_configuration

    Returns:
        dict: a dictionary with the keys id, licenses and tool, the latter
        grouping the row's ``tool_*`` values under the keys id, name,
        version and configuration

    """
    entry = {'id': license['id'], 'licenses': license['licenses']}
    # The flat tool_* columns are regrouped into a nested dict.
    entry['tool'] = {
        'id': license['tool_id'],
        'name': license['tool_name'],
        'version': license['tool_version'],
        'configuration': license['tool_configuration'],
    }
    return entry
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Thu, Apr 17, 8:36 AM (5 d, 16 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3287246
Attached To
rDSTOC swh-storage-cassandra
Event Timeline
Log In to Comment