Changeset View
Changeset View
Standalone View
Standalone View
swh/clearlydefined/mapping_utils.py
Show All 14 Lines | from swh.clearlydefined.error import ( | ||||
InvalidComponents, | InvalidComponents, | ||||
WrongMetadata, | WrongMetadata, | ||||
ToolNotFound, | ToolNotFound, | ||||
NoJsonExtension, | NoJsonExtension, | ||||
RevisionNotFound, | RevisionNotFound, | ||||
ToolNotSupported, | ToolNotSupported, | ||||
) | ) | ||||
def map_sha1_with_swhid(storage, sha1: str) -> Optional[str]: | def map_sha1_with_swhid(storage, sha1: str) -> Optional[str]: | ||||
vlorentz: and undo this change | |||||
Done Inline Actionsmetadata = None, gives error TG1999: metadata = None, gives error | |||||
Done Inline ActionsUse attr.evolve vlorentz: Use `attr.evolve` | |||||
Done Inline ActionsWhere ? In orchestrator ? TG1999: Where ? In orchestrator ? | |||||
Done Inline Actionswhere you need it to be not-None, yes vlorentz: where you need it to be not-`None`, yes | |||||
""" | """ | ||||
Take sha1 and storage as input and give the corresponding | Take sha1 and storage as input and give the corresponding | ||||
swhID for that sha1 | swhID for that sha1 | ||||
""" | """ | ||||
if not sha1: | if not sha1: | ||||
return None | return None | ||||
content = storage.content_get([hash_to_bytes(sha1)])[0] | content = storage.content_get([hash_to_bytes(sha1)])[0] | ||||
if not content: | if not content: | ||||
Show All 12 Lines | def sha1_git_in_revisions(storage, sha1_git: str) -> bool: | ||||
sha1_git_bytes = hash_to_bytes(sha1_git) | sha1_git_bytes = hash_to_bytes(sha1_git) | ||||
missing_revision = storage.revision_missing([sha1_git_bytes]) | missing_revision = storage.revision_missing([sha1_git_bytes]) | ||||
if len(list(missing_revision)) == 0: | if len(list(missing_revision)) == 0: | ||||
return True | return True | ||||
return False | return False | ||||
def map_sha1_and_add_in_data(
    storage, sha1: Optional[str], data: list, file: Dict, mapping_status=True
) -> bool:
    """
    Map a single file sha1 to a SWHID and, on success, append a
    metadata row to ``data`` (the list is mutated in place).

    Arguments:
        storage: SWH storage instance used for the lookup
        sha1: hex sha1 of the file, or None/"" when the harvest did not
            provide one (treated as nothing-to-map, not as a failure)
        data: accumulator of (swhid, MetadataTargetType.CONTENT, None,
            file) tuples, extended in place
        file: raw file entry from the harvest, kept alongside the row
        mapping_status: mapping status accumulated so far by the caller

    Returns:
        False when the sha1 is present but could not be resolved to a
        SWHID, otherwise the incoming ``mapping_status`` unchanged.
    """
    if sha1:
        assert isinstance(sha1, str)
        swh_id = map_sha1_with_swhid(storage=storage, sha1=sha1)
        if swh_id:
            data.append((swh_id, MetadataTargetType.CONTENT, None, file))
        else:
            # sha1 present but unknown to the archive: flag the harvest
            # as incompletely mapped
            mapping_status = False
    return mapping_status
def map_scancode(
    storage, metadata_string: str
) -> "Tuple[bool, List[Tuple[str, MetadataTargetType, None, Dict]]]":
    """
    Map the files of a scancode harvest onto archive contents.

    Take metadata_string and storage as input and try to map the sha1
    of each file with a content object. Return the mapping status of
    the harvest (True if every sha1 was mapped, False otherwise) and
    the data to be written in storage.
    """
    metadata = json.loads(metadata_string)
    content = metadata.get("content") or {}
    # scancode reports "files" as a list of dicts; default to [] (not
    # {}) so the empty fallback has the same type as real data
    files = content.get("files") or []
    mapping_status = True
    data: list = []
    for file_entry in files:
        sha1 = file_entry.get("sha1")
        mapping_status = (
            map_sha1_and_add_in_data(storage, sha1, data, file_entry)
            and mapping_status
        )
    return mapping_status, data
def map_licensee(
    storage, metadata_string: str
) -> "Tuple[bool, List[Tuple[str, MetadataTargetType, None, Dict]]]":
    """
    Map the matched files of a licensee harvest onto archive contents.

    Take metadata_string and storage as input and try to map the sha1
    ("content_hash") of each matched file with a content object.
    Return the mapping status of the harvest (True if every sha1 was
    mapped, False otherwise) and the data to be written in storage.
    """
    metadata = json.loads(metadata_string)
    # licensee nests its report under licensee -> output -> content
    licensee = metadata.get("licensee") or {}
    output = licensee.get("output") or {}
    content = output.get("content") or {}
    files = content.get("matched_files") or []
    mapping_status = True
    data: list = []
    for file_entry in files:
        sha1 = file_entry.get("content_hash")
        mapping_status = (
            map_sha1_and_add_in_data(storage, sha1, data, file_entry)
            and mapping_status
        )
    return mapping_status, data
def map_clearlydefined(
    storage, metadata_string: str
) -> "Tuple[bool, List[Tuple[str, MetadataTargetType, None, Dict]]]":
    """
    Map the files of a clearlydefined harvest onto archive contents.

    Take metadata_string and storage as input and try to map the sha1
    (nested under each file's "hashes") with a content object. Return
    the mapping status of the harvest (True if every sha1 was mapped,
    False otherwise) and the data to be written in storage.
    """
    metadata = json.loads(metadata_string)
    files = metadata.get("files") or []
    mapping_status = True
    data: list = []
    for file_entry in files:
        # unlike scancode/licensee, the sha1 lives one level down
        hashes = file_entry.get("hashes") or {}
        sha1 = hashes.get("sha1")
        mapping_status = (
            map_sha1_and_add_in_data(storage, sha1, data, file_entry)
            and mapping_status
        )
    return mapping_status, data
def map_harvest(
    storage, tool: str, metadata_string: str
) -> "Tuple[bool, List[Tuple[str, MetadataTargetType, None, Dict]]]":
    """
    Dispatch a harvest to the mapper matching its tool.

    Take tool, metadata_string and storage as input and try to map the
    sha1 of files with content objects. Return the mapping status of
    the harvest and the data to be written in storage.

    Raises:
        ToolNotSupported: when no mapper exists for ``tool``. Note that
            get_type_of_tool accepts "fossology", which has no mapper
            here yet; previously this surfaced as a bare KeyError.
    """
    tools = {
        "scancode": map_scancode,
        "licensee": map_licensee,
        "clearlydefined": map_clearlydefined,
    }
    mapper = tools.get(tool)
    if mapper is None:
        # raise the domain error instead of leaking a KeyError
        raise ToolNotSupported(f"Tool {tool} is not supported")
    return mapper(storage=storage, metadata_string=metadata_string)
def map_definition( | def map_definition( | ||||
storage, metadata_string: str | storage, metadata_string: str | ||||
) -> Optional[Tuple[bool, List[Tuple[str, MetadataTargetType, Optional[Origin]]]]]: | ) -> Optional[ | ||||
Tuple[bool, List[Tuple[str, MetadataTargetType, Optional[Origin], Dict]]] | |||||
]: | |||||
""" | """ | ||||
Take metadata_string and storage as input and try to | Take metadata_string and storage as input and try to | ||||
map the sha1 of defintion with content/ gitSha in revision | map the sha1 of defintion with content/ gitSha in revision | ||||
return None if not able to map | return None if not able to map | ||||
else return data to be written in storage | else return data to be written in storage | ||||
""" | """ | ||||
metadata: Dict[str, Dict[str, Optional[Dict]]] = json.loads(metadata_string) | metadata: Dict[str, Dict[str, Optional[Dict]]] = json.loads(metadata_string) | ||||
described: Dict[str, Optional[Dict[str, Any]]] = metadata.get("described") or {} | described: Dict[str, Optional[Dict[str, Any]]] = metadata.get("described") or {} | ||||
Show All 21 Lines | elif sha1: | ||||
return None | return None | ||||
assert isinstance(swh_id_sha1, str) | assert isinstance(swh_id_sha1, str) | ||||
swh_id = swh_id_sha1 | swh_id = swh_id_sha1 | ||||
metadata_type = MetadataTargetType.CONTENT | metadata_type = MetadataTargetType.CONTENT | ||||
else: | else: | ||||
raise WrongMetadata("Wrong metadata") | raise WrongMetadata("Wrong metadata") | ||||
return True, [(swh_id, metadata_type, origin)] | return True, [(swh_id, metadata_type, origin, metadata)] | ||||
def get_type_of_tool(cd_path) -> str:
    """
    Classify a ClearlyDefined row path.

    Take cd_path as input; if cd_path is invalid then raise an
    exception, else return the type of tool of that row: one of
    "scancode", "licensee", "clearlydefined" or "fossology" for a
    harvest row, or "definition" for a definition row.

    Raises:
        InvalidComponents: path has neither 6 nor 9 components
        RevisionNotFound: 5th path component is not "revision"
        NoJsonExtension: path does not end with ".json"
        ToolNotFound: 9-component path whose 7th component is not "tool"
        ToolNotSupported: harvest path naming an unknown tool
    """
    list_cd_path = cd_path.split("/")
    # guard first so a short path raises a domain error, not IndexError
    if len(list_cd_path) < 5:
        raise InvalidComponents(
            "Not a supported/known ID, A valid ID should have 6 or 9 components."
        )
    # For example: maven/mavencentral/cobol-parser/abc/0.4.0.json
    if list_cd_path[4] != "revision":
        raise RevisionNotFound(
            "Not a supported/known ID, A valid ID should have "
            '5th component as "revision".'
        )
    # For example: maven/mavencentral/cobol-parser/revision/0.4.0.txt
    if not list_cd_path[-1].endswith(".json"):
        raise NoJsonExtension(
            'Not a supported/known ID, A valid ID should end with ".json" extension.'
        )
    # A 9-component ID is a harvest:
    # <package_manager>/<instance>/<namespace>/<name>/revision/<version>/tool/<tool_name>/<tool_version>.json
    if len(list_cd_path) == 9:
        # npm/npmjs/@ngtools/webpack/revision/10.2.1/abc/scancode/3.2.2.json
        if list_cd_path[6] != "tool":
            raise ToolNotFound(
                "Not a supported/known harvest ID, A valid harvest ID "
                'should have 7th component as "tool".'
            )
        tool = list_cd_path[7]
        # if the row contains an unknown tool
        if tool not in ("scancode", "licensee", "clearlydefined", "fossology"):
            raise ToolNotSupported(f"Tool for this ID {cd_path} is not supported")
        return tool
    # A 6-component ID is a definition:
    # <package_manager>/<instance>/<namespace>/<name>/revision/<version>.json
    elif len(list_cd_path) == 6:
        return "definition"
    # For example: maven/mavencentral/cobol-parser/abc/revision/def/0.4.0.json
    raise InvalidComponents(
        "Not a supported/known ID, A valid ID should have 6 or 9 components."
    )
def map_row(
    storage, row: tuple
) -> Union[
    Optional[Tuple[bool, List[Tuple[str, MetadataTargetType, Optional[Origin], Dict]]]],
    Tuple[bool, List[Tuple[str, MetadataTargetType, None, Dict]]],
]:
    """
    Map one ClearlyDefined row onto the archive.

    ``row`` is a (cd_path, gzipped_metadata) pair. Return None when the
    row carries no metadata yet (so it can be mapped later on),
    otherwise the mapping status of the row and the data to be written
    in storage.
    """
    cd_path, compressed = row[0], row[1]
    # classify first: an invalid path raises before we decompress
    tool = get_type_of_tool(cd_path)
    metadata_string = gzip.decompress(compressed).decode()
    # empty payload: nothing to map yet, leave the row for later
    if not metadata_string:
        return None
    if tool == "definition":
        return map_definition(metadata_string=metadata_string, storage=storage)
    return map_harvest(tool=tool, metadata_string=metadata_string, storage=storage)
and undo this change