diff --git a/requirements-swh.txt b/requirements-swh.txt
index 3405acd..ee21161 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
 # Add here internal Software Heritage dependencies, one per line.
 swh.core[http] >= 0.3
-swh.model >= 0.9.0
+swh.model >= 1.0.0
 swh.storage >= 0.11.1
diff --git a/swh/clearlydefined/mapping_utils.py b/swh/clearlydefined/mapping_utils.py
index b6b5161..424bb54 100644
--- a/swh/clearlydefined/mapping_utils.py
+++ b/swh/clearlydefined/mapping_utils.py
@@ -1,353 +1,347 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime
 from enum import Enum
 import gzip
 import json
 import re
 from typing import Any, Dict, List, Optional, Tuple

 from swh.clearlydefined.error import (
     InvalidComponents,
     NoJsonExtension,
     RevisionNotFound,
     ToolNotFound,
     ToolNotSupported,
 )
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.identifiers import parse_swhid
+from swh.model.identifiers import ExtendedSWHID
 from swh.model.model import (
     MetadataAuthority,
     MetadataAuthorityType,
     MetadataFetcher,
-    MetadataTargetType,
     Origin,
     RawExtrinsicMetadata,
 )


 class ToolType(Enum):
     """The type of a row"""

     DEFINITION = "definition"
     SCANCODE = "scancode"
     CLEARLYDEFINED = "clearlydefined"
     LICENSEE = "licensee"
     FOSSOLOGY = "fossology"


 class MappingStatus(Enum):
     """The mapping status of a row"""

     MAPPED = "mapped"
     UNMAPPED = "unmapped"
     IGNORE = "ignore"


 AUTHORITY = MetadataAuthority(
     type=MetadataAuthorityType.REGISTRY,
     url="https://clearlydefined.io/",
     metadata=None,
 )

 FETCHER = MetadataFetcher(
     name="swh-clearlydefined",
     version="0.0.1",
     metadata=None,
 )


 def is_sha1(s):
     return bool(re.match("^[a-fA-F0-9]+$", s))


 def map_row_data_with_metadata(
     swh_id: str,
-    type: MetadataTargetType,
     origin: Optional[Origin],
     metadata: Dict,
     date: datetime,
     format: str,
 ) -> RawExtrinsicMetadata:
     """
     Take row data as input and return a RawExtrinsicMetadata object
     to be written into swh storage
     """
     return RawExtrinsicMetadata(
-        type=type,
-        target=parse_swhid(swh_id),
+        target=ExtendedSWHID.from_string(swh_id),
         discovery_date=date,
         authority=AUTHORITY,
         fetcher=FETCHER,
         format=format,
         origin=origin.url if origin else None,
         metadata=json.dumps(metadata).encode("utf-8"),
     )
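The heart of this migration: with swh.model 1.0, the target's object type is encoded in the `ExtendedSWHID` itself, so `RawExtrinsicMetadata` no longer takes a separate `type`/`MetadataTargetType` argument. A minimal sketch of the new parsing call, assuming swh.model >= 1.0.0:

```python
from swh.model.identifiers import ExtendedSWHID

swhid = ExtendedSWHID.from_string(
    "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
)
# The "cnt"/"rev" tag now travels with the target itself, which is why
# the MetadataTargetType argument is dropped throughout this diff.
print(swhid.object_type)  # e.g. ExtendedObjectType.CONTENT
```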
 def map_sha1_with_swhid(storage, sha1: str) -> Optional[str]:
     """
     Take sha1 and storage as input and return the corresponding SWHID
     for that sha1
     """
     if not sha1:
         return None
     content = storage.content_get([hash_to_bytes(sha1)])[0]
     if not content:
         return None
     sha1_git = hash_to_hex(content.sha1_git)
     swh_id = "swh:1:cnt:{sha1_git}".format(sha1_git=sha1_git)
     return swh_id


 def sha1_git_in_revisions(storage, sha1_git: str) -> bool:
     """
     Take sha1_git and storage as input and tell whether that sha1_git
     exists in the revision table
     """
     sha1_git_bytes = hash_to_bytes(sha1_git)
     missing_revision = storage.revision_missing([sha1_git_bytes])
     if len(list(missing_revision)) == 0:
         return True
     return False


 def map_sha1_and_add_in_data(
     storage,
     sha1: Optional[str],
     data: List[RawExtrinsicMetadata],
     file: Dict,
     date: datetime,
     format: str,
     mapping_status=True,
 ) -> bool:
     """
     Take sha1, data, file, date and mapping_status as input and return
     whether the sha1 exists in content; if it does, map the sha1 to a
     SWHID and append a RawExtrinsicMetadata object built from the row
     data to ``data``
     """
     if sha1:
         assert isinstance(sha1, str)
         swh_id = map_sha1_with_swhid(storage=storage, sha1=sha1)
         if swh_id:
             data.append(
                 map_row_data_with_metadata(
                     swh_id=swh_id,
-                    type=MetadataTargetType.CONTENT,
                     origin=None,
                     metadata=file,
                     date=date,
                     format=format,
                 )
             )
         else:
             mapping_status = False
     return mapping_status


 def list_scancode_files(metadata_string: str) -> List[Tuple[str, Dict]]:
     """
     Returns (sha1, file) pairs for each ScanCode metadata file
     referenced in the metadata_string.
     """
     metadata = json.loads(metadata_string)
     content = metadata.get("content") or {}
     files = content.get("files") or {}
     files_with_sha1 = []
     for file in files:
         sha1 = file.get("sha1")
         files_with_sha1.append((sha1, file))
     return files_with_sha1


 def list_licensee_files(metadata_string: str) -> List[Tuple[str, Dict]]:
     """
     Returns (sha1, file) pairs for each Licensee metadata file
     referenced in the metadata_string.
     """
     metadata = json.loads(metadata_string)
     licensee = metadata.get("licensee") or {}
     output = licensee.get("output") or {}
     content = output.get("content") or {}
     files = content.get("matched_files") or []
     files_with_sha1 = []
     for file in files:
         sha1 = file.get("content_hash")
         files_with_sha1.append((sha1, file))
     return files_with_sha1


 def list_clearlydefined_files(metadata_string: str) -> List[Tuple[str, Dict]]:
     """
     Returns (sha1, file) pairs for each ClearlyDefined metadata file
     referenced in the metadata_string.
     """
     metadata = json.loads(metadata_string)
     files = metadata.get("files") or []
     files_with_sha1 = []
     for file in files:
         hashes = file.get("hashes") or {}
         sha1 = hashes.get("sha1")
         assert sha1
         files_with_sha1.append((sha1, file))
     return files_with_sha1
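The three harvest extractors normalize very different JSON layouts into the same `(sha1, file)` shape. A hedged illustration with made-up minimal payloads — the field names are exactly the ones the extractors above read; the values are fabricated for the example:

```python
import json

from swh.clearlydefined.mapping_utils import (
    list_clearlydefined_files,
    list_licensee_files,
    list_scancode_files,
)

# Hypothetical minimal payloads: only the fields each extractor reads.
sha1 = "34973274ccef6ab4dfaaf86599792fa9c3fe4689"
scancode_doc = {"content": {"files": [{"sha1": sha1}]}}
licensee_doc = {
    "licensee": {"output": {"content": {"matched_files": [{"content_hash": sha1}]}}}
}
clearlydefined_doc = {"files": [{"hashes": {"sha1": sha1}}]}

assert list_scancode_files(json.dumps(scancode_doc))[0][0] == sha1
assert list_licensee_files(json.dumps(licensee_doc))[0][0] == sha1
assert list_clearlydefined_files(json.dumps(clearlydefined_doc))[0][0] == sha1
```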
 def map_harvest(
     storage, tool: str, metadata_string: str, date: datetime
 ) -> Tuple[MappingStatus, List[RawExtrinsicMetadata]]:
     """
     Take tool, metadata_string and storage as input and try to map the
     sha1 of each file to a content object; return the status of the
     harvest and the data to be written into storage
     """
     tools = {
         "scancode": list_scancode_files,
         "licensee": list_licensee_files,
         "clearlydefined": list_clearlydefined_files,
     }
     formats = {
         "scancode": "clearlydefined-harvest-scancode-json",
         "licensee": "clearlydefined-harvest-licensee-json",
         "clearlydefined": "clearlydefined-harvest-clearlydefined-json",
     }

     format_ = formats[tool]

     mapping_status = True
     data: List[RawExtrinsicMetadata] = []
     for (sha1, file) in tools[tool](metadata_string):
         mapping_status = (
             map_sha1_and_add_in_data(storage, sha1, data, file, date, format_)
             and mapping_status
         )
     status = MappingStatus.UNMAPPED
     if mapping_status:
         status = MappingStatus.MAPPED
     return status, data


 def map_definition(
     storage, metadata_string: str, date: datetime
 ) -> Tuple[MappingStatus, List[RawExtrinsicMetadata]]:
     """
     Take metadata_string and storage as input and try to map the sha1
     of the definition to a content object, or its gitSha to a revision;
     return the mapping status and the data to be written into storage
     """
     metadata: Dict[str, Dict[str, Optional[Dict]]] = json.loads(metadata_string)
     described: Dict[str, Optional[Dict[str, Any]]] = metadata.get("described") or {}
     hashes: Dict[str, str] = described.get("hashes") or {}
     sha1_git = hashes.get("gitSha")
     source: Dict[str, str] = described.get("sourceLocation") or {}
     url = source.get("url")
     origin = None
     if url:
         assert isinstance(url, str)
         origin = Origin(url=url)
     if not sha1_git:
         sha1_git = source.get("revision")

     if sha1_git:
         assert isinstance(sha1_git, str)
         if len(sha1_git) != 40 and not is_sha1(sha1_git):
             return MappingStatus.IGNORE, []
         if not sha1_git_in_revisions(sha1_git=sha1_git, storage=storage):
             return MappingStatus.UNMAPPED, []
         swh_id = "swh:1:rev:{sha1_git}".format(sha1_git=sha1_git)
-        metadata_type = MetadataTargetType.REVISION
     else:
         return MappingStatus.IGNORE, []

     return MappingStatus.MAPPED, [
         map_row_data_with_metadata(
             swh_id=swh_id,
-            type=metadata_type,
             origin=origin,
             metadata=metadata,
             date=date,
             format="clearlydefined-definition-json",
         )
     ]


 def get_type_of_tool(cd_path) -> ToolType:
     """
     Take cd_path as input; raise an exception if cd_path is invalid,
     else return the tool type of that row
     """
     list_cd_path = cd_path.split("/")
     # For example: maven/mavencentral/cobol-parser/abc/0.4.0.json
     if list_cd_path[4] != "revision":
         raise RevisionNotFound(
             "Not a supported/known ID; a valid ID should have "
             '5th component as "revision".'
         )
     # For example: maven/mavencentral/cobol-parser/revision/0.4.0.txt
     if not list_cd_path[-1].endswith(".json"):
         raise NoJsonExtension(
             'Not a supported/known ID; a valid ID should end with ".json" extension.'
         )
     # if the ID of the row contains 9 components:
     # <type>/<provider>/<namespace>/<name>/revision/<version>/tool/<tool_name>/<tool_version>.json
     # then it is a harvest
     if len(list_cd_path) == 9:
         # npm/npmjs/@ngtools/webpack/revision/10.2.1/abc/scancode/3.2.2.json
         if list_cd_path[6] != "tool":
             raise ToolNotFound(
                 "Not a supported/known harvest ID; a valid harvest ID should "
                 'have 7th component as "tool".'
             )
         tool = list_cd_path[7]
         # if the row contains an unknown tool
         if tool not in ("scancode", "licensee", "clearlydefined", "fossology"):
             raise ToolNotSupported(f"Tool for this ID {cd_path} is not supported")
         return ToolType(tool)
     elif len(list_cd_path) == 6:
         return ToolType.DEFINITION
     # For example: maven/mavencentral/cobol-parser/abc/revision/def/0.4.0.json
     raise InvalidComponents(
         "Not a supported/known ID; a valid ID should have 6 or 9 components."
     )


 def map_row(
     storage, metadata: bytes, id: str, date: datetime
 ) -> Tuple[MappingStatus, List[RawExtrinsicMetadata]]:
     """
     Take a row and storage as input and try to map that row; if the ID
     of the row is invalid then raise an exception, else return the
     mapping status of that row and the data to be written into storage
     """
     tool = get_type_of_tool(id).value

     # if the row doesn't contain any metadata, return UNMAPPED so it
     # can be mapped later on
     metadata_string = gzip.decompress(metadata).decode()
     if metadata_string == "":
         return MappingStatus.UNMAPPED, []

     if tool == "definition":
         return map_definition(
             metadata_string=metadata_string, storage=storage, date=date
         )

     else:
         return map_harvest(
             tool=tool,
             metadata_string=metadata_string,
             storage=storage,
             date=date,
         )
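How a ClearlyDefined row ID drives the dispatch above: a hedged usage walk-through, where `storage` stands in for any configured swh.storage instance (the tests below use the `swh_storage` fixture):

```python
import gzip
from datetime import datetime, timezone

from swh.clearlydefined.mapping_utils import get_type_of_tool, map_row

# A 9-component ID is a harvest row; component 8 names the tool.
tool = get_type_of_tool(
    "npm/npmjs/@ngtools/webpack/revision/10.2.1/tool/scancode/3.2.2.json"
)
print(tool)  # ToolType.SCANCODE

# map_row expects the raw metadata gzip-compressed; an empty payload
# short-circuits to (MappingStatus.UNMAPPED, []) before touching storage.
status, data = map_row(
    storage=storage,  # assumed: a configured swh.storage instance
    id="maven/mavencentral/za.co.absa.cobrix/cobol-parser/revision/0.4.0.json",
    metadata=gzip.compress(b""),
    date=datetime(2021, 2, 6, tzinfo=timezone.utc),
)
```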
diff --git a/swh/clearlydefined/tests/test_mapping_utils.py b/swh/clearlydefined/tests/test_mapping_utils.py
index 153f1a0..daa3aa5 100644
--- a/swh/clearlydefined/tests/test_mapping_utils.py
+++ b/swh/clearlydefined/tests/test_mapping_utils.py
@@ -1,597 +1,587 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime, timezone
 import gzip
 import json
 import os

 import pytest

 from swh.clearlydefined.error import (
     InvalidComponents,
     NoJsonExtension,
     RevisionNotFound,
     ToolNotFound,
     ToolNotSupported,
 )
 from swh.clearlydefined.mapping_utils import (
     AUTHORITY,
     FETCHER,
     MappingStatus,
     map_definition,
     map_row,
     map_sha1_with_swhid,
 )
 from swh.model import from_disk
 from swh.model.hashutil import hash_to_bytes
-from swh.model.identifiers import parse_swhid
+from swh.model.identifiers import ExtendedSWHID
 from swh.model.model import (
     Content,
     Directory,
     DirectoryEntry,
-    MetadataTargetType,
     Person,
     RawExtrinsicMetadata,
     Revision,
     RevisionType,
     Timestamp,
     TimestampWithTimezone,
 )

 content_data = [
     Content.from_data(b"42\n"),
     Content.from_data(b"4242\n"),
 ]

 directory = Directory(
     id=hash_to_bytes("5256e856a0a0898966d6ba14feb4388b8b82d302"),
     entries=tuple(
         [
             DirectoryEntry(
                 name=b"foo",
                 type="file",
                 target=content_data[0].sha1_git,
                 perms=from_disk.DentryPerms.content,
             ),
         ],
     ),
 )

 revision_data = [
     Revision(
         id=hash_to_bytes("4c66129b968ab8122964823d1d77677f50884cf6"),
         message=b"hello",
         author=Person(
             name=b"Nicolas Dandrimont",
             email=b"nicolas@example.com",
             fullname=b"Nicolas Dandrimont <nicolas@example.com>",
         ),
         date=TimestampWithTimezone(
             timestamp=Timestamp(seconds=1234567890, microseconds=0),
             offset=120,
             negative_utc=False,
         ),
         committer=Person(
             name=b"St\xc3fano Zacchiroli",
             email=b"stefano@example.com",
             fullname=b"St\xc3fano Zacchiroli <stefano@example.com>",
         ),
         committer_date=TimestampWithTimezone(
             timestamp=Timestamp(seconds=1123456789, microseconds=0),
             offset=120,
             negative_utc=False,
         ),
         parents=(),
         type=RevisionType.GIT,
         directory=directory.id,
         metadata={
             "checksums": {
                 "sha1": "tarball-sha1",
                 "sha256": "tarball-sha256",
             },
             "signed-off-by": "some-dude",
         },
         extra_headers=(
             (b"gpgsig", b"test123"),
             (b"mergetag", b"foo\\bar"),
             (b"mergetag", b"\x22\xaf\x89\x80\x01\x00"),
         ),
         synthetic=True,
     ),
     Revision(
         id=hash_to_bytes("3c66129b968ab8122964823d1d77677f50884cf6"),
         message=b"hello again",
         author=Person(
             name=b"Roberto Dicosmo",
             email=b"roberto@example.com",
             fullname=b"Roberto Dicosmo <roberto@example.com>",
         ),
         date=TimestampWithTimezone(
             timestamp=Timestamp(
                 seconds=1234567843,
                 microseconds=220000,
             ),
             offset=-720,
             negative_utc=False,
         ),
         committer=Person(
             name=b"tony",
             email=b"ar@dumont.fr",
             fullname=b"tony <ar@dumont.fr>",
         ),
         committer_date=TimestampWithTimezone(
             timestamp=Timestamp(
                 seconds=1123456789,
                 microseconds=220000,
             ),
             offset=0,
             negative_utc=False,
         ),
         parents=(),
         type=RevisionType.GIT,
         directory=directory.id,
         metadata=None,
         extra_headers=(),
         synthetic=False,
     ),
 ]


 def file_data(file_name):
     with open(file_name) as file:
         data = file.read()
     return data


 def add_content_data(swh_storage):
     swh_storage.content_add(content_data)


 def add_revision_data(swh_storage):
     swh_storage.revision_add(revision_data)


 def test_mapping_sha1_with_swhID(swh_storage):
     add_content_data(swh_storage)
     sha1 = "34973274ccef6ab4dfaaf86599792fa9c3fe4689"
     assert "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" == map_sha1_with_swhid(
         sha1=sha1, storage=swh_storage
     )


 def test_mapping_with_empty_sha1(swh_storage):
     add_content_data(swh_storage)
     sha1 = ""
     assert map_sha1_with_swhid(sha1=sha1, storage=swh_storage) is None


 def test_mapping_with_wrong_sha1(swh_storage):
     add_content_data(swh_storage)
     sha1 = "6ac599151a7aaa8ca5d38dc5bb61b49193a3cadc1ed33de5a57e4d1ecc53c846"
     assert map_sha1_with_swhid(sha1=sha1, storage=swh_storage) is None


 def test_map_row_for_definitions_with_no_sha1_sha1git(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = MappingStatus.UNMAPPED, []
     assert (
         map_row(
             storage=swh_storage,
             id="maven/mavencentral/za.co.absa.cobrix/cobol-parser/revision/0.4.0.json",
             metadata=gzip.compress(
                 file_data(
                     os.path.join(datadir, "def_with_no_sha1_and_sha1git.json")
                 ).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_map_row_for_definitions_with_gitsha1(swh_storage, datadir):
     add_revision_data(swh_storage)
     expected = (
         MappingStatus.MAPPED,
         [
             RawExtrinsicMetadata(
-                type=MetadataTargetType.REVISION,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:rev:4c66129b968ab8122964823d1d77677f50884cf6"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-definition-json",
                 origin="http://central.maven.org/maven2/za/co/absa/cobrix/cobol-parser/"
                 "0.4.0/cobol-parser-0.4.0-sources.jar",
                 metadata=json.dumps(
                     json.loads(
                         file_data(os.path.join(datadir, "definitions_sha1git.json"))
                     )
                 ).encode("utf-8"),
             ),
         ],
     )
     assert (
         map_row(
             storage=swh_storage,
             id="maven/mavencentral/za.co.absa.cobrix/cobol-parser/revision/0.4.0.json",
             metadata=gzip.compress(
                 file_data(os.path.join(datadir, "definitions_sha1git.json")).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )
 def test_map_row_for_scancode(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = (
         MappingStatus.UNMAPPED,
         [
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-scancode-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(os.path.join(datadir, "scancode_metadata.json"))
                     )
                 ).encode("utf-8"),
             ),
         ],
     )
     assert (
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@ngtools/webpack/revision/10.2.1/tool/scancode/3.2.2.json",
             metadata=gzip.compress(
                 file_data(os.path.join(datadir, "scancode.json")).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_map_row_for_scancode_true_mapping_status(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = (
         MappingStatus.MAPPED,
         [
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-scancode-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(os.path.join(datadir, "scancode_metadata.json"))
                     )
                 ).encode("utf-8"),
             ),
         ],
     )
     assert (
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@ngtools/webpack/revision/10.2.1/tool/scancode/3.2.2.json",
             metadata=gzip.compress(
                 file_data(os.path.join(datadir, "scancode_true.json")).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_map_row_for_licensee(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = (
         MappingStatus.UNMAPPED,
         [
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:36fade77193cb6d2bd826161a0979d64c28ab4fa"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-licensee-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(os.path.join(datadir, "licensee_metadata.json"))
                     )
                 ).encode("utf-8"),
             ),
         ],
     )
     assert (
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@fluidframework/replay-driver/revision/0.31.0/tool/licensee/"
             "9.13.0.json",
             metadata=gzip.compress(
                 file_data(os.path.join(datadir, "licensee.json")).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_map_row_for_licensee_true_mapping_status(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = (
         MappingStatus.MAPPED,
         [
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:36fade77193cb6d2bd826161a0979d64c28ab4fa"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-licensee-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(os.path.join(datadir, "licensee_metadata.json"))
                     )
                 ).encode("utf-8"),
             ),
         ],
     )
     assert (
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@fluidframework/replay-driver/revision/0.31.0/tool/licensee/"
             "9.13.0.json",
             metadata=gzip.compress(
                 file_data(os.path.join(datadir, "licensee_true.json")).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )
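Worth noting why each `_true_mapping_status` variant expects the same metadata objects but a different status: `map_harvest` ANDs the per-file results, so any file whose sha1 is missing from storage flips the overall status to UNMAPPED, while the files that did resolve are still returned (presumably the `*_true.json` fixtures reference only mappable files). A minimal sketch of that aggregation, not the library code itself:

```python
from swh.clearlydefined.mapping_utils import MappingStatus

# Hypothetical per-file outcomes: True if the file's sha1 resolved to a
# content SWHID, False otherwise.
per_file_results = [True, False, True]

status = MappingStatus.MAPPED if all(per_file_results) else MappingStatus.UNMAPPED
assert status == MappingStatus.UNMAPPED  # one unresolved sha1 is enough
```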
 def test_map_row_for_clearlydefined(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = (
         MappingStatus.UNMAPPED,
         [
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:36fade77193cb6d2bd826161a0979d64c28ab4fa"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-clearlydefined-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(os.path.join(datadir, "clearlydefined_metadata.json"))
                     )
                 ).encode("utf-8"),
             ),
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-clearlydefined-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(
                             os.path.join(datadir, "clearlydefined_metadata_2.json")
                         )
                     )
                 ).encode("utf-8"),
             ),
         ],
     )
     assert (
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@pixi/mesh-extras/revision/5.3.5/tool/clearlydefined/"
             "1.3.4.json",
             metadata=gzip.compress(
                 file_data(os.path.join(datadir, "clearlydefined.json")).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_map_row_for_clearlydefined_true_mapping_status(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = (
         MappingStatus.MAPPED,
         [
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:36fade77193cb6d2bd826161a0979d64c28ab4fa"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-clearlydefined-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(os.path.join(datadir, "clearlydefined_metadata.json"))
                     )
                 ).encode("utf-8"),
             ),
             RawExtrinsicMetadata(
-                type=MetadataTargetType.CONTENT,
-                target=parse_swhid(
+                target=ExtendedSWHID.from_string(
                     "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
                 ),
                 discovery_date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
                 authority=AUTHORITY,
                 fetcher=FETCHER,
                 format="clearlydefined-harvest-clearlydefined-json",
                 origin=None,
                 metadata=json.dumps(
                     json.loads(
                         file_data(
                             os.path.join(datadir, "clearlydefined_metadata_2.json")
                         )
                     )
                 ).encode("utf-8"),
             ),
         ],
     )
     assert (
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@pixi/mesh-extras/revision/5.3.5/tool/clearlydefined/"
             "1.3.4.json",
             metadata=gzip.compress(
                 file_data(os.path.join(datadir, "clearlydefined_true.json")).encode()
             ),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )
 def test_sha1git_not_in_revision(swh_storage, datadir):
     add_revision_data(swh_storage)
     expected = MappingStatus.UNMAPPED, []
     assert (
         map_definition(
             metadata_string=file_data(
                 os.path.join(datadir, "definitions_not_mapped_sha1_git.json")
             ),
             storage=swh_storage,
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_sha1_not_in_content(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = MappingStatus.IGNORE, []
     assert (
         map_definition(
             metadata_string=file_data(
                 os.path.join(datadir, "definitions_not_mapped.json")
             ),
             storage=swh_storage,
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_map_definition_with_data_to_be_ignored(swh_storage, datadir):
     add_content_data(swh_storage)
     expected = MappingStatus.IGNORE, []
     assert (
         map_definition(
             metadata_string=file_data(os.path.join(datadir, "licensee.json")),
             storage=swh_storage,
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
         == expected
     )


 def test_map_row_with_invalid_ID(swh_storage):
     with pytest.raises(InvalidComponents):
         map_row(
             storage=swh_storage,
             id="maven/mavencentral/cobol-parser/abc/revision/def/0.4.0.json",
             metadata=gzip.compress(" ".encode()),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )


 def test_map_row_with_empty_metadata_string(swh_storage):
     assert map_row(
         storage=swh_storage,
         id="maven/mavencentral/za.co.absa.cobrix/cobol-parser/revision/0.4.0.json",
         metadata=gzip.compress("".encode()),
         date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
     ) == (MappingStatus.UNMAPPED, [])


 def test_map_row_with_invalid_ID_without_revision(swh_storage):
     with pytest.raises(RevisionNotFound):
         map_row(
             storage=swh_storage,
             id="maven/mavencentral/za.co.absa.cobrix/cobol-parser/abc/0.4.0.json",
             metadata=gzip.compress("".encode()),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )


 def test_map_row_with_invalid_ID_without_json_extension(swh_storage):
     with pytest.raises(NoJsonExtension):
         map_row(
             storage=swh_storage,
             id="maven/mavencentral/za.co.absa.cobrix/cobol-parser/revision/0.4.0.txt",
             metadata=gzip.compress("".encode()),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )


 def test_map_row_with_invalid_ID_without_6_or_9_length(swh_storage):
     with pytest.raises(InvalidComponents):
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@ngtools/webpack/revision/10.2.1/tool/3.2.2.json",
             metadata=gzip.compress("".encode()),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )


 def test_map_row_with_invalid_tool(swh_storage):
     with pytest.raises(ToolNotSupported):
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@ngtools/webpack/revision/10.2.1/tool/abc/3.2.2.json",
             metadata=gzip.compress("".encode()),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )


 def test_map_row_with_invalid_harvest_ID(swh_storage):
     with pytest.raises(ToolNotFound):
         map_row(
             storage=swh_storage,
             id="npm/npmjs/@ngtools/webpack/revision/10.2.1/abc/scancode/3.2.2.json",
             metadata=gzip.compress("".encode()),
             date=datetime(year=2021, month=2, day=6, tzinfo=timezone.utc),
         )
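None of the tests above actually persist the mapped objects; downstream, the returned `RawExtrinsicMetadata` list is meant to be written to swh storage. A hedged sketch of that write path, assuming the standard swh.storage metadata endpoints (`metadata_authority_add`, `metadata_fetcher_add`, `raw_extrinsic_metadata_add`) and placeholder `storage`/`row_*` variables that are not part of this diff:

```python
from swh.clearlydefined.mapping_utils import AUTHORITY, FETCHER, MappingStatus, map_row

# Assumed: `storage` is a configured swh.storage instance, and row_id,
# row_blob (gzipped bytes), row_date come from the ClearlyDefined dump.
status, data = map_row(storage=storage, id=row_id, metadata=row_blob, date=row_date)
if status == MappingStatus.MAPPED:
    # The authority and fetcher must exist before metadata referencing them.
    storage.metadata_authority_add([AUTHORITY])
    storage.metadata_fetcher_add([FETCHER])
    storage.raw_extrinsic_metadata_add(data)
```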