diff --git a/swh/loader/package/deposit/loader.py b/swh/loader/package/deposit/loader.py index a95a0a3..3e63d05 100644 --- a/swh/loader/package/deposit/loader.py +++ b/swh/loader/package/deposit/loader.py @@ -1,388 +1,383 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from datetime import timezone import json import logging from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple, Union import attr import requests from swh.core.config import load_from_envvar from swh.loader.core.loader import DEFAULT_CONFIG from swh.loader.package.loader import ( BasePackageInfo, PackageLoader, RawExtrinsicMetadataCore, ) from swh.loader.package.utils import cached_method, download from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, Person, Revision, RevisionType, Sha1Git, TimestampWithTimezone, ) from swh.storage.algos.snapshot import snapshot_get_all_branches from swh.storage.interface import StorageInterface logger = logging.getLogger(__name__) def now() -> datetime.datetime: return datetime.datetime.now(tz=timezone.utc) @attr.s class DepositPackageInfo(BasePackageInfo): filename = attr.ib(type=str) # instead of Optional[str] raw_info = attr.ib(type=Dict[str, Any]) author_date = attr.ib(type=datetime.datetime) """codemeta:dateCreated if any, deposit completed_date otherwise""" commit_date = attr.ib(type=datetime.datetime) """codemeta:datePublished if any, deposit completed_date otherwise""" client = attr.ib(type=str) id = attr.ib(type=int) """Internal ID of the deposit in the deposit DB""" collection = attr.ib(type=str) """The collection in the deposit; see SWORD specification.""" author = attr.ib(type=Person) committer = attr.ib(type=Person) - revision_parents = attr.ib(type=Tuple[Sha1Git, ...]) - """Revisions created from previous deposits, that will be used as parents of the - revision created for this deposit.""" @classmethod def from_metadata( cls, metadata: Dict[str, Any], url: str, filename: str ) -> "DepositPackageInfo": # Note: # `date` and `committer_date` are always transmitted by the deposit read api # which computes itself the values. The loader needs to use those to create the # revision. all_metadata_raw: List[str] = metadata["metadata_raw"] raw_info = { "origin": metadata["origin"], "origin_metadata": { "metadata": metadata["metadata_dict"], "provider": metadata["provider"], "tool": metadata["tool"], }, } depo = metadata["deposit"] return cls( url=url, filename=filename, author_date=depo["author_date"], commit_date=depo["committer_date"], client=depo["client"], id=depo["id"], collection=depo["collection"], author=parse_author(depo["author"]), committer=parse_author(depo["committer"]), - revision_parents=tuple(hash_to_bytes(p) for p in depo["revision_parents"]), raw_info=raw_info, directory_extrinsic_metadata=[ RawExtrinsicMetadataCore( discovery_date=now(), metadata=raw_metadata.encode(), format="sword-v2-atom-codemeta-v2", ) for raw_metadata in all_metadata_raw ], ) def extid(self) -> None: # For now, we don't try to deduplicate deposits. There is little point anyway, # as it only happens when the exact same tarball was deposited twice. return None class DepositLoader(PackageLoader[DepositPackageInfo]): """Load a deposited artifact into swh archive. """ visit_type = "deposit" def __init__( self, storage: StorageInterface, url: str, deposit_id: str, deposit_client: "ApiClient", max_content_size: Optional[int] = None, default_filename: str = "archive.tar", ): """Constructor Args: url: Origin url to associate the artifacts/metadata to deposit_id: Deposit identity deposit_client: Deposit api client """ super().__init__(storage=storage, url=url, max_content_size=max_content_size) self.deposit_id = deposit_id self.client = deposit_client self.default_filename = default_filename @classmethod def from_configfile(cls, **kwargs: Any): """Instantiate a loader from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to the loader instantiation """ config = dict(load_from_envvar(DEFAULT_CONFIG)) config.update({k: v for k, v in kwargs.items() if v is not None}) deposit_client = ApiClient(**config.pop("deposit")) return cls.from_config(deposit_client=deposit_client, **config) def get_versions(self) -> Sequence[str]: # only 1 branch 'HEAD' with no alias since we only have 1 snapshot # branch return ["HEAD"] def get_metadata_authority(self) -> MetadataAuthority: provider = self.metadata()["provider"] assert provider["provider_type"] == MetadataAuthorityType.DEPOSIT_CLIENT.value return MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url=provider["provider_url"], metadata={ "name": provider["provider_name"], **(provider["metadata"] or {}), }, ) def get_metadata_fetcher(self) -> MetadataFetcher: tool = self.metadata()["tool"] return MetadataFetcher( name=tool["name"], version=tool["version"], metadata=tool["configuration"], ) def get_package_info( self, version: str ) -> Iterator[Tuple[str, DepositPackageInfo]]: p_info = DepositPackageInfo.from_metadata( self.metadata(), url=self.url, filename=self.default_filename, ) yield "HEAD", p_info def download_package( self, p_info: DepositPackageInfo, tmpdir: str ) -> List[Tuple[str, Mapping]]: """Override to allow use of the dedicated deposit client """ return [self.client.archive_get(self.deposit_id, tmpdir, p_info.filename)] def build_revision( self, p_info: DepositPackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: message = ( f"{p_info.client}: Deposit {p_info.id} in collection {p_info.collection}" ).encode("utf-8") return Revision( type=RevisionType.TAR, message=message, author=p_info.author, date=TimestampWithTimezone.from_dict(p_info.author_date), committer=p_info.committer, committer_date=TimestampWithTimezone.from_dict(p_info.commit_date), - parents=p_info.revision_parents, directory=directory, synthetic=True, ) def get_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadataCore]: metadata = self.metadata() all_metadata_raw: List[str] = metadata["metadata_raw"] origin_metadata = json.dumps( { "metadata": all_metadata_raw, "provider": metadata["provider"], "tool": metadata["tool"], } ).encode() return [ RawExtrinsicMetadataCore( discovery_date=now(), metadata=raw_meta.encode(), format="sword-v2-atom-codemeta-v2", ) for raw_meta in all_metadata_raw ] + [ RawExtrinsicMetadataCore( discovery_date=now(), metadata=origin_metadata, format="original-artifacts-json", ) ] @cached_method def metadata(self): """Returns metadata from the deposit server""" return self.client.metadata_get(self.deposit_id) def load(self) -> Dict: # First making sure the deposit is known on the deposit's RPC server # prior to trigger a loading try: self.metadata() except ValueError: logger.error(f"Unknown deposit {self.deposit_id}, ignoring") return {"status": "failed"} # Then usual loading return super().load() def finalize_visit( self, status_visit: str, errors: Optional[List[str]] = None, **kwargs ) -> Dict[str, Any]: r = super().finalize_visit(status_visit=status_visit, **kwargs) success = status_visit == "full" # Update deposit status try: if not success: self.client.status_update( self.deposit_id, status="failed", errors=errors, ) return r snapshot_id = hash_to_bytes(r["snapshot_id"]) snapshot = snapshot_get_all_branches(self.storage, snapshot_id) if not snapshot: return r branches = snapshot.branches logger.debug("branches: %s", branches) if not branches: return r rev_id = branches[b"HEAD"].target revision = self.storage.revision_get([rev_id])[0] if not revision: return r # update the deposit's status to success with its # revision-id and directory-id self.client.status_update( self.deposit_id, status="done", revision_id=hash_to_hex(rev_id), directory_id=hash_to_hex(revision.directory), snapshot_id=r["snapshot_id"], origin_url=self.url, ) except Exception: logger.exception("Problem when trying to update the deposit's status") return {"status": "failed"} return r def parse_author(author) -> Person: """See prior fixme """ return Person( fullname=author["fullname"].encode("utf-8"), name=author["name"].encode("utf-8"), email=author["email"].encode("utf-8"), ) class ApiClient: """Private Deposit Api client """ def __init__(self, url, auth: Optional[Mapping[str, str]]): self.base_url = url.rstrip("/") self.auth = None if not auth else (auth["username"], auth["password"]) def do(self, method: str, url: str, *args, **kwargs): """Internal method to deal with requests, possibly with basic http authentication. Args: method (str): supported http methods as in get/post/put Returns: The request's execution output """ method_fn = getattr(requests, method) if self.auth: kwargs["auth"] = self.auth return method_fn(url, *args, **kwargs) def archive_get( self, deposit_id: Union[int, str], tmpdir: str, filename: str ) -> Tuple[str, Dict]: """Retrieve deposit's archive artifact locally """ url = f"{self.base_url}/{deposit_id}/raw/" return download(url, dest=tmpdir, filename=filename, auth=self.auth) def metadata_url(self, deposit_id: Union[int, str]) -> str: return f"{self.base_url}/{deposit_id}/meta/" def metadata_get(self, deposit_id: Union[int, str]) -> Dict[str, Any]: """Retrieve deposit's metadata artifact as json """ url = self.metadata_url(deposit_id) r = self.do("get", url) if r.ok: return r.json() msg = f"Problem when retrieving deposit metadata at {url}" logger.error(msg) raise ValueError(msg) def status_update( self, deposit_id: Union[int, str], status: str, errors: Optional[List[str]] = None, revision_id: Optional[str] = None, directory_id: Optional[str] = None, snapshot_id: Optional[str] = None, origin_url: Optional[str] = None, ): """Update deposit's information including status, and persistent identifiers result of the loading. """ url = f"{self.base_url}/{deposit_id}/update/" payload: Dict[str, Any] = {"status": status} if revision_id: payload["revision_id"] = revision_id if directory_id: payload["directory_id"] = directory_id if snapshot_id: payload["snapshot_id"] = snapshot_id if origin_url: payload["origin_url"] = origin_url if errors: payload["status_detail"] = {"loading": errors} self.do("put", url, json=payload) diff --git a/swh/loader/package/deposit/tests/test_deposit.py b/swh/loader/package/deposit/tests/test_deposit.py index 192efba..80a4ff7 100644 --- a/swh/loader/package/deposit/tests/test_deposit.py +++ b/swh/loader/package/deposit/tests/test_deposit.py @@ -1,454 +1,482 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import re from typing import List import pytest from swh.core.pytest_plugin import requests_mock_datadir_factory from swh.loader.package.deposit.loader import ApiClient, DepositLoader from swh.loader.package.loader import now from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, Origin, + Person, RawExtrinsicMetadata, + Revision, + RevisionType, Snapshot, SnapshotBranch, TargetType, + Timestamp, + TimestampWithTimezone, ) from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID, ObjectType DEPOSIT_URL = "https://deposit.softwareheritage.org/1/private" @pytest.fixture def requests_mock_datadir(requests_mock_datadir): """Enhance default mock data to mock put requests as the loader does some internal update queries there. """ requests_mock_datadir.put(re.compile("https")) return requests_mock_datadir def test_deposit_init_ok(swh_storage, deposit_client, swh_loader_config): url = "some-url" deposit_id = 999 loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) # Something that does not exist assert loader.url == url assert loader.client is not None assert loader.client.base_url == swh_loader_config["deposit"]["url"] def test_deposit_from_configfile(swh_config): """Ensure the deposit instantiation is ok """ loader = DepositLoader.from_configfile( url="some-url", deposit_id="666", default_filename="archive.zip" ) assert isinstance(loader.client, ApiClient) def test_deposit_loading_unknown_deposit( swh_storage, deposit_client, requests_mock_datadir ): """Loading an unknown deposit should fail no origin, no visit, no snapshot """ # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' url = "some-url" unknown_deposit_id = 667 loader = DepositLoader( swh_storage, url, unknown_deposit_id, deposit_client, default_filename="archive.zip", ) # does not exist actual_load_status = loader.load() assert actual_load_status == {"status": "failed"} stats = get_stats(loader.storage) assert { "content": 0, "directory": 0, "origin": 0, "origin_visit": 0, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 0, } == stats requests_mock_datadir_missing_one = requests_mock_datadir_factory( ignore_urls=[f"{DEPOSIT_URL}/666/raw/",] ) def test_deposit_loading_failure_to_retrieve_1_artifact( swh_storage, deposit_client, requests_mock_datadir_missing_one ): """Deposit with missing artifact ends up with an uneventful/partial visit """ # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' url = "some-url-2" deposit_id = 666 requests_mock_datadir_missing_one.put(re.compile("https")) loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) actual_load_status = loader.load() assert actual_load_status["status"] == "uneventful" assert actual_load_status["snapshot_id"] is not None assert_last_visit_matches(loader.storage, url, status="partial", type="deposit") stats = get_stats(loader.storage) assert { "content": 0, "directory": 0, "origin": 1, "origin_visit": 1, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 1, } == stats # Retrieve the information for deposit status update query to the deposit urls = [ m for m in requests_mock_datadir_missing_one.request_history if m.url == f"{DEPOSIT_URL}/{deposit_id}/update/" ] assert len(urls) == 1 update_query = urls[0] body = update_query.json() expected_body = { "status": "failed", "status_detail": { "loading": [ "Failed to load branch HEAD for some-url-2: Fail to query " "'https://deposit.softwareheritage.org/1/private/666/raw/'. Reason: 404" ] }, } assert body == expected_body def test_deposit_loading_ok(swh_storage, deposit_client, requests_mock_datadir): url = "https://hal-test.archives-ouvertes.fr/some-external-id" deposit_id = 666 loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) actual_load_status = loader.load() expected_snapshot_id = "b2b327b33dc85818bd23c3ccda8b7e675a66ecbd" assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id, } assert_last_visit_matches(loader.storage, url, status="full", type="deposit") stats = get_stats(loader.storage) assert { "content": 303, "directory": 12, "origin": 1, "origin_visit": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } == stats revision_id_hex = "637318680351f5d78856d13264faebbd91efe9bb" revision_id = hash_to_bytes(revision_id_hex) expected_snapshot = Snapshot( id=hash_to_bytes(expected_snapshot_id), branches={ b"HEAD": SnapshotBranch( target=revision_id, target_type=TargetType.REVISION, ), }, ) check_snapshot(expected_snapshot, storage=loader.storage) revision = loader.storage.revision_get([revision_id])[0] - assert revision is not None + date = TimestampWithTimezone( + timestamp=Timestamp(seconds=1507389428, microseconds=0), + offset=0, + negative_utc=False, + ) + person = Person( + fullname=b"Software Heritage", + name=b"Software Heritage", + email=b"robot@softwareheritage.org", + ) + assert revision == Revision( + id=revision_id, + message=b"hal: Deposit 666 in collection hal", + author=person, + committer=person, + date=date, + committer_date=date, + type=RevisionType.TAR, + directory=b"\xfd-\xf1-\xc5SL\x1d\xa1\xe9\x18\x0b\x91Q\x02\xfbo`\x1d\x19", + synthetic=True, + metadata=None, + parents=(), + extra_headers=(), + ) # check metadata fetcher = MetadataFetcher(name="swh-deposit", version="0.0.1",) authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url="https://hal-test.archives-ouvertes.fr/", ) # Check origin metadata orig_meta = loader.storage.raw_extrinsic_metadata_get( Origin(url).swhid(), authority ) assert orig_meta.next_page_token is None raw_meta = loader.client.metadata_get(deposit_id) all_metadata_raw: List[str] = raw_meta["metadata_raw"] # 2 raw metadata xml + 1 json dict assert len(orig_meta.results) == len(all_metadata_raw) + 1 orig_meta0 = orig_meta.results[0] assert orig_meta0.authority == authority assert orig_meta0.fetcher == fetcher # Check directory metadata directory_swhid = CoreSWHID( object_type=ObjectType.DIRECTORY, object_id=revision.directory ) actual_dir_meta = loader.storage.raw_extrinsic_metadata_get( directory_swhid, authority ) assert actual_dir_meta.next_page_token is None assert len(actual_dir_meta.results) == len(all_metadata_raw) for dir_meta in actual_dir_meta.results: assert dir_meta.authority == authority assert dir_meta.fetcher == fetcher assert dir_meta.metadata.decode() in all_metadata_raw # Retrieve the information for deposit status update query to the deposit urls = [ m for m in requests_mock_datadir.request_history if m.url == f"{DEPOSIT_URL}/{deposit_id}/update/" ] assert len(urls) == 1 update_query = urls[0] body = update_query.json() expected_body = { "status": "done", "revision_id": revision_id_hex, "directory_id": hash_to_hex(revision.directory), "snapshot_id": expected_snapshot_id, "origin_url": url, } assert body == expected_body def test_deposit_loading_ok_2(swh_storage, deposit_client, requests_mock_datadir): """Field dates should be se appropriately """ external_id = "some-external-id" url = f"https://hal-test.archives-ouvertes.fr/{external_id}" deposit_id = 777 loader = DepositLoader( swh_storage, url, deposit_id, deposit_client, default_filename="archive.zip" ) actual_load_status = loader.load() expected_snapshot_id = "3e68440fdd7c81d283f8f3aebb6f0c8657864192" assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id, } assert_last_visit_matches(loader.storage, url, status="full", type="deposit") revision_id = "564d18943d71be80d0d73b43a77cfb205bcde96c" expected_snapshot = Snapshot( id=hash_to_bytes(expected_snapshot_id), branches={ b"HEAD": SnapshotBranch( target=hash_to_bytes(revision_id), target_type=TargetType.REVISION ) }, ) check_snapshot(expected_snapshot, storage=loader.storage) raw_meta = loader.client.metadata_get(deposit_id) # Ensure the date fields are set appropriately in the revision # Retrieve the revision revision = loader.storage.revision_get([hash_to_bytes(revision_id)])[0] assert revision assert revision.date.to_dict() == raw_meta["deposit"]["author_date"] assert revision.committer_date.to_dict() == raw_meta["deposit"]["committer_date"] assert not revision.metadata provider = { "provider_name": "hal", "provider_type": "deposit_client", "provider_url": "https://hal-test.archives-ouvertes.fr/", "metadata": None, } tool = { "name": "swh-deposit", "version": "0.0.1", "configuration": {"sword_version": "2"}, } fetcher = MetadataFetcher(name="swh-deposit", version="0.0.1",) authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url="https://hal-test.archives-ouvertes.fr/", ) # Check the origin metadata swh side origin_extrinsic_metadata = loader.storage.raw_extrinsic_metadata_get( Origin(url).swhid(), authority ) assert origin_extrinsic_metadata.next_page_token is None all_metadata_raw: List[str] = raw_meta["metadata_raw"] # 1 raw metadata xml + 1 json dict assert len(origin_extrinsic_metadata.results) == len(all_metadata_raw) + 1 origin_swhid = Origin(url).swhid() expected_metadata = [] for idx, raw_meta in enumerate(all_metadata_raw): origin_meta = origin_extrinsic_metadata.results[idx] expected_metadata.append( RawExtrinsicMetadata( target=origin_swhid, discovery_date=origin_meta.discovery_date, metadata=raw_meta.encode(), format="sword-v2-atom-codemeta-v2", authority=authority, fetcher=fetcher, ) ) origin_metadata = { "metadata": all_metadata_raw, "provider": provider, "tool": tool, } expected_metadata.append( RawExtrinsicMetadata( target=origin_swhid, discovery_date=origin_extrinsic_metadata.results[-1].discovery_date, metadata=json.dumps(origin_metadata).encode(), format="original-artifacts-json", authority=authority, fetcher=fetcher, ) ) assert sorted(origin_extrinsic_metadata.results) == sorted(expected_metadata) # Check the revision metadata swh side directory_swhid = ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory ) actual_directory_metadata = loader.storage.raw_extrinsic_metadata_get( directory_swhid, authority ) assert actual_directory_metadata.next_page_token is None assert len(actual_directory_metadata.results) == len(all_metadata_raw) revision_swhid = CoreSWHID( object_type=ObjectType.REVISION, object_id=hash_to_bytes(revision_id) ) dir_metadata_template = RawExtrinsicMetadata( target=directory_swhid, format="sword-v2-atom-codemeta-v2", authority=authority, fetcher=fetcher, origin=url, revision=revision_swhid, # to satisfy the constructor discovery_date=now(), metadata=b"", ) expected_directory_metadata = [] for idx, raw_meta in enumerate(all_metadata_raw): dir_metadata = actual_directory_metadata.results[idx] expected_directory_metadata.append( RawExtrinsicMetadata.from_dict( { **{ k: v for (k, v) in dir_metadata_template.to_dict().items() if k != "id" }, "discovery_date": dir_metadata.discovery_date, "metadata": raw_meta.encode(), } ) ) assert sorted(actual_directory_metadata.results) == sorted( expected_directory_metadata ) # Retrieve the information for deposit status update query to the deposit urls = [ m for m in requests_mock_datadir.request_history if m.url == f"{DEPOSIT_URL}/{deposit_id}/update/" ] assert len(urls) == 1 update_query = urls[0] body = update_query.json() expected_body = { "status": "done", "revision_id": revision_id, "directory_id": hash_to_hex(revision.directory), "snapshot_id": expected_snapshot_id, "origin_url": url, } assert body == expected_body def test_deposit_loading_ok_3(swh_storage, deposit_client, requests_mock_datadir): """Deposit loading can happen on tarball artifacts as well The latest deposit changes introduce the internal change. """ external_id = "hal-123456" url = f"https://hal-test.archives-ouvertes.fr/{external_id}" deposit_id = 888 loader = DepositLoader(swh_storage, url, deposit_id, deposit_client) actual_load_status = loader.load() expected_snapshot_id = "0ac7b54c042a026389f2087dc16f1d5c644ed0e4" assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id, } assert_last_visit_matches(loader.storage, url, status="full", type="deposit")