# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import gzip
import json
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Tuple
from typing import Union

from swh.model.hashutil import hash_to_bytes
from swh.model.hashutil import hash_to_hex
from swh.model.model import MetadataTargetType
from swh.model.model import Origin

import psycopg2

# NOTE: ``map_sha1_with_swhid(sha1, dsn)`` is defined earlier in this module.
# Only its tail is visible in this excerpt, so it is not reproduced here.
# The callers below treat a falsy return value as "no matching content".


def sha1_git_in_revisions(sha1_git: str, dsn: str) -> bool:
    """
    Tell whether a revision with the given sha1_git exists in the
    ``revision`` table of the database reachable through ``dsn``.

    Args:
        sha1_git: hexadecimal identifier of the revision to look up
        dsn: connection string handed to ``psycopg2.connect``

    Returns:
        True if exactly one row matches, False otherwise.
    """
    # Convert first so that an invalid hash never opens a connection.
    sha1_git_bytes = hash_to_bytes(sha1_git)
    read_connection = psycopg2.connect(dsn=dsn)
    try:
        with read_connection.cursor() as cur:
            cur.execute("SELECT id FROM revision WHERE id= %s;", (sha1_git_bytes,))
            return len(cur.fetchall()) == 1
    finally:
        # Leaving the cursor's ``with`` block does not close the connection,
        # so release it explicitly on every path.
        read_connection.close()


def _map_sha1s_to_contents(
    sha1s: Iterable[Optional[str]], dsn: str
) -> Tuple[bool, list]:
    """
    Map every sha1 of ``sha1s`` to a content SWHID.

    Returns:
        A ``(flag, data)`` pair. ``data`` holds one
        ``(swh_id, MetadataTargetType.CONTENT, None)`` tuple per sha1 that
        could be mapped; ``flag`` is False as soon as one sha1 could not be
        mapped. Missing/empty sha1 values are skipped and leave ``flag``
        untouched.
    """
    mapped_all = True
    data = []
    for sha1 in sha1s:
        if not sha1:
            continue
        swh_id = map_sha1_with_swhid(sha1, dsn)
        if swh_id:
            data.append((swh_id, MetadataTargetType.CONTENT, None))
        else:
            mapped_all = False
    return mapped_all, data


def map_scancode(metadata_string: str, dsn: str) -> Tuple[bool, list]:
    """
    Map a scancode harvest (JSON string) to content SWHIDs.

    The hashes are read from ``content.files[*].sha1``.

    Returns:
        ``(flag, data)``: ``flag`` is True when every sha1 found was mapped,
        ``data`` lists the ``(swh_id, MetadataTargetType.CONTENT, None)``
        tuples of the mapped files.
    """
    metadata = json.loads(metadata_string)
    content = metadata.get("content") or {}
    files = content.get("files") or []
    return _map_sha1s_to_contents((file.get("sha1") for file in files), dsn)


def map_licensee(metadata_string: str, dsn: str) -> Tuple[bool, list]:
    """
    Map a licensee harvest (JSON string) to content SWHIDs.

    The hashes are read from
    ``licensee.output.content.matched_files[*].content_hash`` and looked up
    the same way as the sha1 values of the other harvest tools.

    Returns:
        ``(flag, data)``: ``flag`` is True when every hash found was mapped,
        ``data`` lists the ``(swh_id, MetadataTargetType.CONTENT, None)``
        tuples of the mapped files.
    """
    metadata = json.loads(metadata_string)
    licensee = metadata.get("licensee") or {}
    output = licensee.get("output") or {}
    content = output.get("content") or {}
    files = content.get("matched_files") or []
    return _map_sha1s_to_contents((file.get("content_hash") for file in files), dsn)


def map_clearlydefined(metadata_string: str, dsn: str) -> Tuple[bool, list]:
    """
    Map a clearlydefined harvest (JSON string) to content SWHIDs.

    The hashes are read from ``files[*].hashes.sha1``.

    Returns:
        ``(flag, data)``: ``flag`` is True when every sha1 found was mapped,
        ``data`` lists the ``(swh_id, MetadataTargetType.CONTENT, None)``
        tuples of the mapped files.
    """
    metadata = json.loads(metadata_string)
    files = metadata.get("files") or []
    return _map_sha1s_to_contents(
        ((file.get("hashes") or {}).get("sha1") for file in files), dsn
    )


# Harvest tools that can be mapped, keyed by the tool name found in the row ID.
# Shared by map_harvest (dispatch) and map_row (validation) so the two cannot
# drift apart.
_HARVEST_MAPPERS: Dict[str, Callable[[str, str], Tuple[bool, list]]] = {
    "scancode": map_scancode,
    "licensee": map_licensee,
    "clearlydefined": map_clearlydefined,
}


def map_harvest(tool: str, metadata_string: str, dsn: str) -> Tuple[bool, list]:
    """
    Dispatch a harvest to the mapping function of the tool that produced it.

    Args:
        tool: name of the harvest tool ("scancode", "licensee" or
            "clearlydefined")
        metadata_string: JSON document of the harvest
        dsn: connection string of the database used for the lookups

    Returns:
        The ``(flag, data)`` pair of the tool specific mapper, or
        ``(False, [])`` when the tool is not supported.
    """
    mapper = _HARVEST_MAPPERS.get(tool)
    if mapper is None:
        return False, []
    return mapper(metadata_string, dsn)


def map_definition(metadata_string: str, dsn: str) -> Tuple[bool, list]:
    """
    Map a definition (JSON string) to a revision or content SWHID.

    ``described.hashes.gitSha`` is preferred and mapped to a revision;
    otherwise ``described.hashes.sha1`` is mapped to a content.

    Returns:
        ``(True, [(swh_id, target_type, origin)])`` when the object was
        found, ``(False, [])`` otherwise (including when the definition
        carries no usable hash). ``origin`` is built from
        ``described.sourceLocation.url`` and is None when no URL is given.
    """
    metadata: Dict[str, Any] = json.loads(metadata_string)
    described: Dict[str, Any] = metadata.get("described") or {}
    hashes: Dict[str, str] = described.get("hashes") or {}
    source: Dict[str, str] = described.get("sourceLocation") or {}

    # Only build an Origin when the definition actually carries a source URL.
    url: Optional[str] = source.get("url")
    origin = Origin(url=url) if url else None

    sha1_git: Optional[str] = hashes.get("gitSha")
    if sha1_git:
        if not sha1_git_in_revisions(sha1_git=sha1_git, dsn=dsn):
            return False, []
        swh_id = "swh:1:rev:{sha1_git}".format(sha1_git=sha1_git)
        return True, [(swh_id, MetadataTargetType.REVISION, origin)]

    sha1: Optional[str] = hashes.get("sha1")
    if not sha1:
        # Nothing to look up: neither a git sha nor a sha1 is available.
        return False, []
    swh_id = map_sha1_with_swhid(sha1=sha1, dsn=dsn)
    if not swh_id:
        return False, []
    return True, [(swh_id, MetadataTargetType.CONTENT, origin)]


def check_for_valid_ID(list_cd_path: list) -> bool:
    """
    Tell whether the components of a row ID (the ID split on "/") look valid:
    at least 6 components, "revision" as the fifth one, and a last component
    ending with ".json".
    """
    return (
        len(list_cd_path) >= 6
        and list_cd_path[4] == "revision"
        and list_cd_path[-1].endswith(".json")
    )


def map_row(row: tuple, swh_dsn: str) -> Optional[Tuple[bool, list]]:
    """
    Map one row to SWHIDs.

    Args:
        row: ``row[0]`` is the "/"-separated ID of the row, ``row[1]`` its
            gzip-compressed metadata
        swh_dsn: connection string of the database used for the lookups

    Returns:
        The ``(flag, data)`` pair of map_harvest or map_definition, or None
        when the row cannot be handled (invalid ID, empty metadata,
        unsupported tool or unexpected number of ID components).
    """
    cd_path = row[0]
    list_cd_path = cd_path.split("/")

    # Validate the ID before touching the payload, so rows that are skipped
    # anyway are never decompressed.
    if not check_for_valid_ID(list_cd_path):
        return None

    metadata_string = gzip.decompress(row[1]).decode()

    # if the row doesn't contain any information in metadata return None
    if metadata_string == "":
        return None

    # if the ID of row contains 9 components:
    # <type>/<provider>/<namespace>/<name>/revision/<revision>/tool/<tool>/<version>.json
    # then it is a harvest
    # (names of the first four components presumed from ClearlyDefined
    # coordinates; the code only relies on positions 4, 6, 7 and the last one)
    if len(list_cd_path) == 9:
        if list_cd_path[6] != "tool":
            return None
        tool = list_cd_path[7]
        if tool not in _HARVEST_MAPPERS:
            return None
        return map_harvest(tool=tool, metadata_string=metadata_string, dsn=swh_dsn)

    # if the ID of row contains 6 components:
    # <type>/<provider>/<namespace>/<name>/revision/<revision>.json
    # then it is a definition
    if len(list_cd_path) == 6:
        return map_definition(
            metadata_string=metadata_string,
            dsn=swh_dsn,
        )

    # Any other number of components is neither a harvest nor a definition.
    return None