Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9338505
metadata_dictionary.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Subscribers
None
metadata_dictionary.py
View Options
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
os
import
re
import
abc
import
json
import
logging
import
xmltodict
from
swh.indexer.codemeta
import
CROSSWALK_TABLE
,
SCHEMA_URI
from
swh.indexer.codemeta
import
compact
,
expand
# Registry of mapping instances, keyed by the name of their class.
MAPPINGS = {}


def register_mapping(cls):
    """Class decorator recording an instance of ``cls`` in ``MAPPINGS``.

    The class is instantiated without arguments and the instance is stored
    under ``cls.__name__``; the class itself is handed back unchanged so the
    function can be used as a decorator.
    """
    instance = cls()
    MAPPINGS[cls.__name__] = instance
    return cls
class BaseMapping(metaclass=abc.ABCMeta):
    """Abstract parent of every metadata mapping.

    To implement a new mapping, inherit from this class and override
    ``detect_metadata_files`` and ``translate``.
    """

    def __init__(self):
        # One logger per concrete mapping, named "<module>.<class>".
        klass = self.__class__
        logger_name = '%s.%s' % (klass.__module__, klass.__name__)
        self.log = logging.getLogger(logger_name)

    @abc.abstractmethod
    def detect_metadata_files(self, files):
        """Detect the files potentially containing metadata.

        Args:
            files (list): list of files

        Returns:
            list: empty if nothing was found, list of sha1 otherwise
        """
        pass

    @abc.abstractmethod
    def translate(self, file_content):
        """Translate ``file_content``; concrete mappings define how."""
        pass

    def normalize_translation(self, metadata):
        """Return ``metadata`` passed through ``compact``."""
        normalized = compact(metadata)
        return normalized
class SingleFileMapping(BaseMapping):
    """Base class for all mappings that use a single file as input."""

    @property
    @abc.abstractmethod
    def filename(self):
        """Name of the file to extract metadata from.

        It is compared for equality with ``entry['name']`` in
        ``detect_metadata_files``.
        """
        pass

    def detect_metadata_files(self, file_entries):
        """Return the sha1 of the first entry named ``self.filename``.

        Args:
            file_entries: iterable of entries indexable by ``'name'`` and
                ``'sha1'``

        Returns:
            list: a one-element list holding the ``'sha1'`` of the first
            matching entry, or an empty list when no entry matches
        """
        for candidate in file_entries:
            if candidate['name'] != self.filename:
                continue
            # Only the first match is reported.
            return [candidate['sha1']]
        return []
class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""

    # Attributes that start with the hook prefixes but belong to the class
    # machinery. Without this exclusion an input key such as "dict" would
    # resolve to ``translate_dict`` itself and be called with the wrong
    # arguments.
    _reserved_hook_names = frozenset((
        'translate_dict',
        'normalize_translation',
    ))

    @property
    @abc.abstractmethod
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        pass

    def _get_hook(self, prefix, key):
        """Return the callable named ``prefix + key`` on self, or None.

        None is returned when the key is not a string, when the resulting
        name is one of the reserved (non-hook) names, or when the attribute
        is missing or not callable.
        """
        if not isinstance(key, str):
            return None
        name = prefix + key
        if name in self._reserved_hook_names:
            return None
        hook = getattr(self, name, None)
        return hook if callable(hook) else None

    def translate_dict(self, content_dict, *, normalize=True):
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping

        For each key ``k`` of ``content_dict``:

        - if a ``translate_<k>`` method exists, it is called with the
          metadata being built and the value, and is responsible for
          updating the metadata itself;
        - otherwise, if ``k`` is in ``self.mapping``, the value (passed
          through ``normalize_<k>`` when such a method exists) is stored
          under ``self.mapping[k]``;
        - otherwise the key is ignored.

        Args:
            content_dict (dict)
            normalize (bool): whether to pass the result through
                ``normalize_translation`` before returning it

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer
        """
        translated_metadata = {'@type': SCHEMA_URI + 'SoftwareSourceCode'}
        for k, v in content_dict.items():
            # First, check if there is a specific translation
            # method for this key
            translation_method = self._get_hook('translate_', k)
            if translation_method:
                translation_method(translated_metadata, v)
            elif k in self.mapping:
                # if there is no method, but the key is known from the
                # crosswalk table

                # if there is a normalization method, use it on the value
                normalization_method = self._get_hook('normalize_', k)
                if normalization_method:
                    v = normalization_method(v)

                # set the translation metadata with the normalized value
                translated_metadata[self.mapping[k]] = v
        if normalize:
            return self.normalize_translation(translated_metadata)
        else:
            return translated_metadata
class JsonMapping(DictMapping, SingleFileMapping):
    """Base class for all mappings that use a JSON file as input."""

    def translate(self, raw_content):
        """
        Translates content by parsing content from a bytestring containing
        json data and translating with the appropriate mapping

        Args:
            raw_content: bytes

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer, or None (after logging a warning) when the content
            cannot be decoded, is not valid JSON, or is valid JSON whose
            top-level value is not an object
        """
        try:
            raw_content = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning('Error unidecoding %r', raw_content)
            return None
        try:
            content_dict = json.loads(raw_content)
        except json.JSONDecodeError:
            self.log.warning('Error unjsoning %r', raw_content)
            return None
        if not isinstance(content_dict, dict):
            # e.g. a bare list or string: translate_dict needs .items()
            self.log.warning('Unexpected JSON type %s in %r',
                             type(content_dict).__name__, raw_content)
            return None
        return self.translate_dict(content_dict)
@register_mapping
class NpmMapping(JsonMapping):
    """
    dedicated class for NPM (package.json) mapping and translation
    """
    mapping = CROSSWALK_TABLE['NodeJS']
    filename = b'package.json'

    # Prefixes accepted in the "<shortcut>:<path>" form of `repository`.
    _schema_shortcuts = {
        'github': 'https://github.com/',
        'gist': 'https://gist.github.com/',
        'bitbucket': 'https://bitbucket.org/',
        'gitlab': 'https://gitlab.com/',
    }

    def normalize_repository(self, d):
        """https://docs.npmjs.com/files/package.json#repository

        Args:
            d: value of the `repository` field: a dict with `type` and
                `url` keys, a full URL, a `<shortcut>:<path>` string, or a
                bare path (resolved against the `github` shortcut)

        Returns:
            str: the normalized repository, or None when `d` has another
            type, uses an unknown shortcut, or is a dict lacking `type`
            or `url`
        """
        if isinstance(d, dict):
            if 'type' not in d or 'url' not in d:
                # Incomplete description; formatting would raise KeyError.
                return None
            return '{type}+{url}'.format(type=d['type'], url=d['url'])
        elif isinstance(d, str):
            if '://' in d:
                return d
            elif ':' in d:
                (schema, rest) = d.split(':', 1)
                if schema in self._schema_shortcuts:
                    return self._schema_shortcuts[schema] + rest
                else:
                    return None
            else:
                return self._schema_shortcuts['github'] + d
        else:
            return None

    def normalize_bugs(self, d):
        """https://docs.npmjs.com/files/package.json#bugs

        Args:
            d: value of the `bugs` field: either a dict that may hold a
                `url` key, or directly the URL as a string

        Returns:
            str: the bug tracker URL, or None when none can be extracted
        """
        if isinstance(d, dict):
            if 'url' not in d:
                # e.g. only an `email` was provided
                return None
            return '{url}'.format(url=d['url'])
        elif isinstance(d, str):
            return d
        else:
            return None

    _parse_author = re.compile(r'^ *'
                               r'(?P<name>.*?)'
                               r'( +<(?P<email>.*)>)?'
                               r'( +\((?P<url>.*)\))?'
                               r' *$')

    def normalize_author(self, d):
        """https://docs.npmjs.com/files/package.json#people-fields-author-contributors

        Args:
            d: value of the `author` field: a dict with optional `name`,
                `email` and `url` keys, or a string of the form
                ``name <email> (url)`` where the last two parts are optional

        Returns:
            dict: a Person node holding the name/email/url that were found,
            or None when `d` has another type or cannot be parsed
        """
        author = {'@type': SCHEMA_URI + 'Person'}
        if isinstance(d, dict):
            name = d.get('name', None)
            email = d.get('email', None)
            url = d.get('url', None)
        elif isinstance(d, str):
            match = self._parse_author.match(d)
            if match is None:
                # The pattern cannot span lines, so a string with an
                # embedded newline does not match.
                return None
            name = match.group('name')
            email = match.group('email')
            url = match.group('url')
        else:
            return None
        if name:
            author[SCHEMA_URI + 'name'] = name
        if email:
            author[SCHEMA_URI + 'email'] = email
        if url:
            author[SCHEMA_URI + 'url'] = url
        return author
@register_mapping
class CodemetaMapping(SingleFileMapping):
    """
    dedicated class for CodeMeta (codemeta.json) mapping and translation
    """
    filename = b'codemeta.json'

    def translate(self, content):
        """Decode and parse ``content`` as JSON, then expand and normalize it.

        Args:
            content: bytes

        Returns:
            the result of ``normalize_translation(expand(...))`` on the
            parsed JSON, or None (after logging a warning) when the content
            cannot be decoded or is not valid JSON, as JsonMapping.translate
            does
        """
        try:
            text = content.decode()
        except UnicodeDecodeError:
            self.log.warning('Error unidecoding %r', content)
            return None
        try:
            document = json.loads(text)
        except json.JSONDecodeError:
            self.log.warning('Error unjsoning %r', text)
            return None
        return self.normalize_translation(expand(document))
@register_mapping
class MavenMapping(DictMapping, SingleFileMapping):
    """
    dedicated class for Maven (pom.xml) mapping and translation
    """
    filename = b'pom.xml'
    mapping = CROSSWALK_TABLE['Java (Maven)']

    def translate(self, content):
        """Translate the content of a pom.xml file.

        Args:
            content: the XML document, as accepted by ``xmltodict.parse``

        Returns:
            the normalized translated metadata, including a
            ``codeRepository`` entry built by ``parse_repositories``; or
            None (after logging a warning) when the XML cannot be parsed or
            has no non-empty ``project`` root element
        """
        # Local import: the module-level imports do not provide this name.
        from xml.parsers.expat import ExpatError

        try:
            document = xmltodict.parse(content)
        except ExpatError:
            self.log.warning('Error parsing XML from %r', content)
            return None
        d = document.get('project')
        if not isinstance(d, dict):
            # Other root element, or an empty <project/> (parsed as None).
            self.log.warning('No usable <project> element in %r', content)
            return None
        metadata = self.translate_dict(d, normalize=False)
        metadata[SCHEMA_URI + 'codeRepository'] = self.parse_repositories(d)
        return self.normalize_translation(metadata)

    # Repository assumed when the POM does not declare any.
    _default_repository = {'url': 'https://repo.maven.apache.org/maven2/'}

    def parse_repositories(self, d):
        """https://maven.apache.org/pom.html#Repositories

        Args:
            d (dict): the parsed ``project`` element

        Returns:
            list: one URL per usable repository; built from the default
            repository when the POM has no ``repositories`` element
        """
        if 'repositories' not in d:
            candidates = [self._default_repository]
        else:
            container = d['repositories']
            # An empty <repositories/> element is parsed as None.
            if not isinstance(container, dict):
                container = {}
            candidates = container.get('repository') or []
            # A single <repository> is parsed as a dict, several as a list.
            if not isinstance(candidates, list):
                candidates = [candidates]
        urls = (self.parse_repository(d, repo) for repo in candidates)
        return [url for url in urls if url]

    def parse_repository(self, d, repo):
        """Build the URL of the project inside one repository.

        The URL is the repository URL followed by one path segment per
        dot-separated part of ``groupId``, then ``artifactId``; either may
        be absent from ``d`` (a POM can inherit them from its parent).

        Args:
            d (dict): the parsed ``project`` element
            repo (dict): the parsed ``repository`` element

        Returns:
            str: the URL, or None when the repository is not a dict, does
            not use the ``default`` layout, or has no URL
        """
        if not isinstance(repo, dict):
            return None
        if repo.get('layout', 'default') != 'default':
            return None  # TODO ?
        url = repo.get('url')
        if not url or not isinstance(url, str):
            return None
        segments = []
        group_id = d.get('groupId')
        if group_id and isinstance(group_id, str):
            segments.extend(group_id.split('.'))
        artifact_id = d.get('artifactId')
        if artifact_id and isinstance(artifact_id, str):
            segments.append(artifact_id)
        # Join with '/' explicitly: this is a URL, not a filesystem path,
        # so the result must not depend on the platform's path separator.
        for segment in segments:
            segment = segment.strip('/')
            if segment:
                url = url.rstrip('/') + '/' + segment
        return url
def main():
    """Run the NPM and Maven mappings on small samples and print the results.

    Both ``translate`` methods take the raw file content as bytes:
    JSON for ``NpmMapping`` and XML for ``MavenMapping``.
    """
    # NpmMapping.translate calls .decode(), so the sample must be bytes.
    raw_content = b"""{"name": "test_name", "unknown_term": "ut"}"""
    # MavenMapping.translate parses XML, so the sample must be a POM.
    raw_content1 = b"""<project>
      <name>test_name</name>
      <groupId>org.example</groupId>
      <artifactId>packageXYZ</artifactId>
    </project>"""
    result = MAPPINGS["NpmMapping"].translate(raw_content)
    result1 = MAPPINGS["MavenMapping"].translate(raw_content1)
    print(result)
    print(result1)


if __name__ == "__main__":
    main()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 8:52 AM (6 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3360158
Attached To
rDCIDX Metadata indexer
Event Timeline
Log In to Comment