Page MenuHomeSoftware Heritage

D4931.id17657.diff
No OneTemporary

D4931.id17657.diff

diff --git a/swh/clearlydefined/mapping_utils.py b/swh/clearlydefined/mapping_utils.py
--- a/swh/clearlydefined/mapping_utils.py
+++ b/swh/clearlydefined/mapping_utils.py
@@ -3,10 +3,17 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import json
+from typing import Dict
from typing import Optional
+from typing import Tuple
+from typing import Union
+import gzip
from swh.model.hashutil import hash_to_bytes
from swh.model.hashutil import hash_to_hex
+from swh.model.model import MetadataTargetType
+from swh.model.model import Origin
import psycopg2
@@ -27,3 +34,153 @@
sha1_git = hash_to_hex(sha1_git_tuple_data[0][0])
swh_id = "swh:1:cnt:{sha1_git}".format(sha1_git=sha1_git)
return swh_id
+
+
def sha1_git_in_revisions(sha1_git: str, dsn: str) -> bool:
    """
    Take sha1_git and dsn as input and tell whether that sha1_git
    exists in the revision table.

    Args:
        sha1_git: hexadecimal identifier; converted with ``hash_to_bytes``
            before being used as the query parameter.
        dsn: connection string handed to ``psycopg2.connect``.

    Returns:
        True when exactly one row of ``revision`` has this id,
        False otherwise.
    """
    # Convert before connecting so a failing conversion cannot leave an
    # open connection behind.
    sha1_git_bytes = hash_to_bytes(sha1_git)
    read_connection = psycopg2.connect(dsn=dsn)
    try:
        with read_connection.cursor() as cur:
            cur.execute(
                "SELECT id FROM revision WHERE id= %s;", (sha1_git_bytes,)
            )
            rows = cur.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        read_connection.close()
    return len(rows) == 1
+
+
def map_scancode(metadata_string: str, dsn: str) -> Tuple[bool, list]:
    """
    Map the files listed in a scancode harvest to SWHIDs.

    Reads ``content.files`` from the JSON document and looks up every
    entry that carries a ``sha1`` with ``map_sha1_with_swhid``.

    Returns a pair ``(all_mapped, data)``: ``data`` holds one
    ``(swh_id, MetadataTargetType.CONTENT, None)`` tuple per resolved
    file, and ``all_mapped`` is False as soon as one sha1 could not be
    resolved. Entries without a ``sha1`` are ignored.
    """
    parsed = json.loads(metadata_string)
    file_entries = (parsed.get("content") or {}).get("files") or {}
    all_mapped = True
    mapped = []
    for entry in file_entries:
        sha1 = entry.get("sha1")
        if not sha1:
            continue
        swh_id = map_sha1_with_swhid(sha1, dsn)
        if not swh_id:
            all_mapped = False
            continue
        mapped.append((swh_id, MetadataTargetType.CONTENT, None))
    return all_mapped, mapped
+
+
def map_licensee(metadata_string: str, dsn) -> Tuple[bool, list]:
    """
    Map the files listed in a licensee harvest to SWHIDs.

    Reads ``licensee.output.content.matched_files`` from the JSON
    document and looks up every entry that carries a ``content_hash``
    with ``map_sha1_with_swhid``.

    Returns a pair ``(all_mapped, data)``: ``data`` holds one
    ``(swh_id, MetadataTargetType.CONTENT, None)`` tuple per resolved
    file, and ``all_mapped`` is False as soon as one hash could not be
    resolved. Entries without a ``content_hash`` are ignored.
    """
    parsed = json.loads(metadata_string)
    # Walk down the nested document, treating missing levels as empty.
    node = parsed
    for key in ("licensee", "output", "content"):
        node = node.get(key) or {}
    matched_files = node.get("matched_files") or []

    all_mapped = True
    mapped = []
    for entry in matched_files:
        content_hash = entry.get("content_hash")
        if not content_hash:
            continue
        swh_id = map_sha1_with_swhid(content_hash, dsn)
        if not swh_id:
            all_mapped = False
            continue
        mapped.append((swh_id, MetadataTargetType.CONTENT, None))
    return all_mapped, mapped
+
+
def map_clearlydefined(metadata_string: str, dsn) -> Tuple[bool, list]:
    """
    Map the files listed in a clearlydefined harvest to SWHIDs.

    Reads the top-level ``files`` list from the JSON document and looks
    up every entry whose ``hashes.sha1`` is set with
    ``map_sha1_with_swhid``.

    Returns a pair ``(all_mapped, data)``: ``data`` holds one
    ``(swh_id, MetadataTargetType.CONTENT, None)`` tuple per resolved
    file, and ``all_mapped`` is False as soon as one sha1 could not be
    resolved. Entries without a sha1 are ignored.
    """
    parsed = json.loads(metadata_string)
    all_mapped = True
    mapped = []
    for entry in parsed.get("files") or []:
        sha1 = (entry.get("hashes") or {}).get("sha1")
        if not sha1:
            continue
        swh_id = map_sha1_with_swhid(sha1, dsn)
        if not swh_id:
            all_mapped = False
            continue
        mapped.append((swh_id, MetadataTargetType.CONTENT, None))
    return all_mapped, mapped
+
+
def map_harvest(tool, metadata_string, dsn) -> Tuple[bool, list]:
    """
    Dispatch a harvest document to the mapper of the tool that made it.

    ``tool`` selects one of ``map_scancode``, ``map_licensee`` or
    ``map_clearlydefined``; the chosen mapper is called with
    ``metadata_string`` and ``dsn`` and its result is returned as is.
    For any other tool name the result is ``(False, [])``.
    """
    mappers = {
        "scancode": map_scancode,
        "licensee": map_licensee,
        "clearlydefined": map_clearlydefined,
    }

    mapper = mappers.get(tool)
    if mapper is None:
        return False, []
    return mapper(metadata_string, dsn)
+
+
def map_definition(metadata_string: str, dsn: str) -> Tuple[bool, list]:
    """
    Map a definition document to the SWHID of the object it describes.

    The document's ``described.hashes`` is inspected: when ``gitSha`` is
    present the target is a revision (checked with
    ``sha1_git_in_revisions``), otherwise ``sha1`` is resolved to a
    content SWHID with ``map_sha1_with_swhid``.

    Args:
        metadata_string: JSON text of the definition.
        dsn: connection string forwarded to the lookup helpers.

    Returns:
        ``(True, [(swh_id, target_type, origin)])`` when the object was
        found, ``(False, [])`` when it was not found or when the
        document carries neither ``gitSha`` nor ``sha1``. ``origin`` is
        an ``Origin`` built from ``described.sourceLocation.url``, or
        None when the document has no such url.
    """
    metadata = json.loads(metadata_string)
    described = metadata.get("described") or {}
    hashes = described.get("hashes") or {}
    source = described.get("sourceLocation") or {}

    # Only build an Origin when a url is actually present.
    # NOTE(review): Origin is assumed to need a string url - confirm
    # against swh.model; None mirrors what the harvest mappers emit.
    url: Optional[str] = source.get("url")
    origin = Origin(url=url) if url else None

    sha1_git: Optional[str] = hashes.get("gitSha")
    if sha1_git:
        if not sha1_git_in_revisions(sha1_git=sha1_git, dsn=dsn):
            return False, []
        swh_id = "swh:1:rev:{sha1_git}".format(sha1_git=sha1_git)
        return True, [(swh_id, MetadataTargetType.REVISION, origin)]

    sha1: Optional[str] = hashes.get("sha1")
    if not sha1:
        # Nothing to look up: do not hand None to the sha1 lookup.
        return False, []
    swh_id = map_sha1_with_swhid(sha1=sha1, dsn=dsn)
    if not swh_id:
        return False, []
    return True, [(swh_id, MetadataTargetType.CONTENT, origin)]
+
+
def check_for_valid_ID(list_cd_path: list) -> bool:
    """
    Tell whether the components of a row ID look like a usable ID.

    The ID is accepted when it has at least 6 components, its fifth
    component is ``"revision"`` and its last component ends with
    ``".json"``.
    """
    return (
        len(list_cd_path) >= 6
        and list_cd_path[4] == "revision"
        and list_cd_path[-1].endswith(".json")
    )
+
+
def map_row(row: tuple, swh_dsn: str) -> Optional[Tuple[bool, list]]:
    """
    Map one row to the SWHIDs its metadata refers to.

    Args:
        row: ``row[0]`` is the "/"-separated ID of the row and
            ``row[1]`` its gzip-compressed metadata.
        swh_dsn: connection string forwarded to the mappers.

    Returns:
        The result of ``map_harvest`` or ``map_definition`` for rows
        that are harvests or definitions, and None for every row that
        is skipped (invalid ID, empty metadata, unknown tool, or an ID
        that has neither 6 nor 9 components).
    """
    cd_path = row[0]
    list_cd_path = cd_path.split("/")

    # Validate the ID first so that rows which are skipped anyway are
    # never decompressed.
    if not check_for_valid_ID(list_cd_path):
        return None

    metadata_string = gzip.decompress(row[1]).decode()

    # if the row doesn't contain any information in metadata return None
    if metadata_string == "":
        return None

    # if the ID of row contains 9 components:
    # <package_manager>/<instance>/<namespace>/<name>/revision/<version>/tool/<tool_name>/<tool_version>.json
    # then it is a harvest
    if len(list_cd_path) == 9:
        if list_cd_path[6] != "tool":
            return None
        tool = list_cd_path[7]
        if tool not in ("scancode", "licensee", "clearlydefined"):
            return None
        return map_harvest(tool=tool, metadata_string=metadata_string, dsn=swh_dsn)

    # if the ID of row contains 6 components:
    # <package_manager>/<instance>/<namespace>/<name>/revision/<version>.json
    # then it is a definition
    if len(list_cd_path) == 6:
        return map_definition(
            metadata_string=metadata_string,
            dsn=swh_dsn,
        )

    # Any other number of components is neither a harvest nor a definition.
    return None

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 5:52 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216181

Event Timeline