diff --git a/requirements-swh.txt b/requirements-swh.txt index c1af7e51..a06e8fdd 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1 @@ -swh.core >= 0.0.75 +swh.core[http] >= 0.0.75 diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py index e6444b0f..84d1715e 100644 --- a/swh/deposit/api/private/deposit_read.py +++ b/swh/deposit/api/private/deposit_read.py @@ -1,216 +1,192 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import shutil import tempfile from contextlib import contextmanager from django.http import FileResponse from rest_framework import status from swh.core import tarball from swh.model import identifiers from swh.deposit.utils import normalize_date from . import DepositReadMixin, APIPrivateView from ...config import SWH_PERSON, ARCHIVE_TYPE from ..common import APIGet from ...models import Deposit @contextmanager def aggregate_tarballs(extraction_dir, archive_paths): """Aggregate multiple tarballs into one and returns this new archive's path. Args: extraction_dir (path): Path to use for the tarballs computation archive_paths ([str]): Deposit's archive paths Returns: Tuple (directory to clean up, archive path (aggregated or not)) """ # rebuild one zip archive from (possibly) multiple ones os.makedirs(extraction_dir, 0o755, exist_ok=True) dir_path = tempfile.mkdtemp(prefix="swh.deposit-", dir=extraction_dir) # root folder to build an aggregated tarball aggregated_tarball_rootdir = os.path.join(dir_path, "aggregate") os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True) # uncompress in a temporary location all archives for archive_path in archive_paths: tarball.uncompress(archive_path, aggregated_tarball_rootdir) # Aggregate into one big tarball the multiple smaller ones temp_tarpath = shutil.make_archive( aggregated_tarball_rootdir, "zip", aggregated_tarball_rootdir ) # can already clean up temporary directory shutil.rmtree(aggregated_tarball_rootdir) try: yield temp_tarpath finally: shutil.rmtree(dir_path) class APIReadArchives(APIPrivateView, APIGet, DepositReadMixin): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ - ADDITIONAL_CONFIG = { - "extraction_dir": ("str", "/tmp/swh-deposit/archive/"), - } - def __init__(self): super().__init__() self.extraction_dir = self.config["extraction_dir"] if not os.path.exists(self.extraction_dir): os.makedirs(self.extraction_dir) def process_get(self, request, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: request (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ archive_paths = [ r.archive.path for r in self._deposit_requests(deposit_id, request_type=ARCHIVE_TYPE) ] with aggregate_tarballs(self.extraction_dir, archive_paths) as path: return FileResponse( open(path, "rb"), status=status.HTTP_200_OK, content_type="application/zip", ) class APIReadMetadata(APIPrivateView, APIGet, DepositReadMixin): """Class in charge of aggregating metadata on a deposit. """ - ADDITIONAL_CONFIG = { - "provider": ( - "dict", - { - # 'provider_name': '', # those are not set since read from the - # 'provider_url': '', # deposit's client - "provider_type": "deposit_client", - "metadata": {}, - }, - ), - "tool": ( - "dict", - { - "name": "swh-deposit", - "version": "0.0.1", - "configuration": {"sword_version": "2"}, - }, - ), - } - def __init__(self): super().__init__() self.provider = self.config["provider"] self.tool = self.config["tool"] def _normalize_dates(self, deposit, metadata): """Normalize the date to use as a tuple of author date, committer date from the incoming metadata. Args: deposit (Deposit): Deposit model representation metadata (Dict): Metadata dict representation Returns: Tuple of author date, committer date. Those dates are swh normalized. """ commit_date = metadata.get("codemeta:datePublished") author_date = metadata.get("codemeta:dateCreated") if author_date and commit_date: pass elif commit_date: author_date = commit_date elif author_date: commit_date = author_date else: author_date = deposit.complete_date commit_date = deposit.complete_date return (normalize_date(author_date), normalize_date(commit_date)) def metadata_read(self, deposit): """Read and aggregate multiple data on deposit into one unified data dictionary. Args: deposit (Deposit): Deposit concerned by the data aggregation. Returns: Dictionary of data representing the deposit to inject in swh. """ metadata = self._metadata_get(deposit) # Read information metadata data = {"origin": {"type": "deposit", "url": deposit.origin_url,}} # metadata provider self.provider["provider_name"] = deposit.client.last_name self.provider["provider_url"] = deposit.client.provider_url author_date, commit_date = self._normalize_dates(deposit, metadata) if deposit.parent: swh_persistent_id = deposit.parent.swh_id swhid = identifiers.parse_swhid(swh_persistent_id) parent_revision = swhid.object_id parents = [parent_revision] else: parents = [] data["origin_metadata"] = { "provider": self.provider, "tool": self.tool, "metadata": metadata, } data["deposit"] = { "id": deposit.id, "client": deposit.client.username, "collection": deposit.collection.name, "author": SWH_PERSON, "author_date": author_date, "committer": SWH_PERSON, "committer_date": commit_date, "revision_parents": parents, } return data def process_get(self, request, collection_name, deposit_id): deposit = Deposit.objects.get(pk=deposit_id) data = self.metadata_read(deposit) d = {} if data: d = json.dumps(data) return status.HTTP_200_OK, d, "application/json" diff --git a/swh/deposit/config.py b/swh/deposit/config.py index 11be8c8d..e658bef3 100644 --- a/swh/deposit/config.py +++ b/swh/deposit/config.py @@ -1,110 +1,101 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os -import logging -from typing import Any, Dict, Tuple +from typing import Any, Dict -from swh.core.config import SWHConfig +from swh.core import config from swh.scheduler import get_scheduler +from swh.scheduler.interface import SchedulerInterface + # IRIs (Internationalized Resource identifier) sword 2.0 specified EDIT_SE_IRI = "edit_se_iri" EM_IRI = "em_iri" CONT_FILE_IRI = "cont_file_iri" SD_IRI = "servicedocument" COL_IRI = "upload" STATE_IRI = "state_iri" PRIVATE_GET_RAW_CONTENT = "private-download" PRIVATE_CHECK_DEPOSIT = "check-deposit" PRIVATE_PUT_DEPOSIT = "private-update" PRIVATE_GET_DEPOSIT_METADATA = "private-read" PRIVATE_LIST_DEPOSITS = "private-deposit-list" ARCHIVE_KEY = "archive" METADATA_KEY = "metadata" RAW_METADATA_KEY = "raw-metadata" ARCHIVE_TYPE = "archive" METADATA_TYPE = "metadata" AUTHORIZED_PLATFORMS = ["development", "production", "testing"] DEPOSIT_STATUS_REJECTED = "rejected" DEPOSIT_STATUS_PARTIAL = "partial" DEPOSIT_STATUS_DEPOSITED = "deposited" DEPOSIT_STATUS_VERIFIED = "verified" DEPOSIT_STATUS_LOAD_SUCCESS = "done" DEPOSIT_STATUS_LOAD_FAILURE = "failed" # Revision author for deposit SWH_PERSON = { "name": "Software Heritage", "fullname": "Software Heritage", "email": "robot@softwareheritage.org", } +DEFAULT_CONFIG = { + "max_upload_size": 209715200, + "checks": True, +} + + def setup_django_for(platform=None, config_file=None): """Setup function for command line tools (swh.deposit.create_user) to initialize the needed db access. Note: Do not import any django related module prior to this function call. Otherwise, this will raise an django.core.exceptions.ImproperlyConfigured error message. Args: platform (str): the platform the scheduling is running config_file (str): Extra configuration file (typically for the production platform) Raises: ValueError in case of wrong platform inputs. """ if platform is not None: if platform not in AUTHORIZED_PLATFORMS: raise ValueError("Platform should be one of %s" % AUTHORIZED_PLATFORMS) if "DJANGO_SETTINGS_MODULE" not in os.environ: os.environ["DJANGO_SETTINGS_MODULE"] = "swh.deposit.settings.%s" % platform if config_file: os.environ.setdefault("SWH_CONFIG_FILENAME", config_file) import django django.setup() -class APIConfig(SWHConfig): - """Mixin intended to enrich views with SWH configuration. +class APIConfig: + """API Configuration centralized class. This loads explicitly the configuration file out + of the SWH_CONFIG_FILENAME environment variable. """ - CONFIG_BASE_FILENAME = "deposit/server" - - DEFAULT_CONFIG = { - "max_upload_size": ("int", 209715200), - "checks": ("bool", True), - "scheduler": ( - "dict", - {"cls": "remote", "args": {"url": "http://localhost:5008/"}}, - ), - } - - ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] - - def __init__(self, **config): - super().__init__() - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG] - ) - self.config.update(config) - self.log = logging.getLogger("swh.deposit") - if self.config.get("scheduler"): - self.scheduler = get_scheduler(**self.config["scheduler"]) + def __init__(self): + config_file = os.environ["SWH_CONFIG_FILENAME"] + conf = config.read_raw_config(config.config_basepath(config_file)) + self.config: Dict[str, Any] = config.merge_configs(DEFAULT_CONFIG, conf) + self.scheduler: SchedulerInterface = get_scheduler(**self.config["scheduler"]) diff --git a/swh/deposit/tests/api/test_deposit_schedule.py b/swh/deposit/tests/api/test_deposit_schedule.py index 8541420a..21091cb2 100644 --- a/swh/deposit/tests/api/test_deposit_schedule.py +++ b/swh/deposit/tests/api/test_deposit_schedule.py @@ -1,91 +1,85 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import copy import datetime + from io import BytesIO -from typing import Dict from django.urls import reverse import pytest from rest_framework import status from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, ) from swh.deposit.parsers import parse_xml -from ..conftest import TEST_CONFIG - - -TEST_CONFIG_WITH_CHECKS: Dict[str, object] = { - **TEST_CONFIG, - "checks": True, -} - - @pytest.fixture() -def deposit_config(): +def deposit_config(deposit_config): """Overrides the `deposit_config` fixture define in swh/deposit/tests/conftest.py to re-enable the checks.""" - return TEST_CONFIG_WITH_CHECKS + config_d = copy.deepcopy(deposit_config) + config_d["checks"] = True + return config_d def now() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc) def test_add_deposit_schedules_check( authenticated_client, deposit_collection, sample_archive, swh_scheduler ): """Posting deposit on collection creates a checker task """ external_id = "external-id-schedules-check" url = reverse(COL_IRI, args=[deposit_collection.name]) timestamp_before_call = now() response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"]), ) timestamp_after_call = now() assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) actual_state = response_content["deposit_status"] assert actual_state == DEPOSIT_STATUS_DEPOSITED deposit_id = response_content["deposit_id"] tasks = swh_scheduler.grab_ready_tasks("check-deposit") assert len(tasks) == 1 task = tasks[0] assert timestamp_before_call <= task.pop("next_run") <= timestamp_after_call assert task == { "arguments": { "args": [], "kwargs": {"collection": "test", "deposit_id": int(deposit_id),}, }, "current_interval": datetime.timedelta(days=1), "id": 1, "policy": "oneshot", "priority": None, "retries_left": 3, "status": "next_run_scheduled", "type": "check-deposit", } diff --git a/swh/deposit/tests/conftest.py b/swh/deposit/tests/conftest.py index 6d13f8d3..8599a4c1 100644 --- a/swh/deposit/tests/conftest.py +++ b/swh/deposit/tests/conftest.py @@ -1,427 +1,425 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import base64 import pytest import psycopg2 +import yaml + from django.urls import reverse from django.test.utils import setup_databases # type: ignore # mypy is asked to ignore the import statement above because setup_databases # is not part of the d.t.utils.__all__ variable. from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from rest_framework import status from rest_framework.test import APIClient from typing import Mapping from swh.scheduler import get_scheduler from swh.model.identifiers import DIRECTORY, swhid, REVISION, SNAPSHOT from swh.deposit.config import setup_django_for from swh.deposit.parsers import parse_xml -from swh.deposit.config import APIConfig from swh.deposit.config import ( COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_LOAD_FAILURE, ) from swh.deposit.tests.common import create_arborescence_archive TEST_USER = { "username": "test", "password": "password", "email": "test@example.org", "provider_url": "https://hal-test.archives-ouvertes.fr/", "domain": "archives-ouvertes.fr/", "collection": {"name": "test"}, } -TEST_CONFIG = { - "max_upload_size": 500, - "extraction_dir": "/tmp/swh-deposit/test/extraction-dir", - "checks": False, - "provider": { - "provider_name": "", - "provider_type": "deposit_client", - "provider_url": "", - "metadata": {}, - }, - "tool": { - "name": "swh-deposit", - "version": "0.0.1", - "configuration": {"sword_version": "2"}, - }, -} - - def pytest_configure(): setup_django_for("testing") @pytest.fixture() -def deposit_config(): - return TEST_CONFIG +def deposit_config(swh_scheduler_config): + return { + "max_upload_size": 500, + "extraction_dir": "/tmp/swh-deposit/test/extraction-dir", + "checks": False, + "provider": { + "provider_name": "", + "provider_type": "deposit_client", + "provider_url": "", + "metadata": {}, + }, + "tool": { + "name": "swh-deposit", + "version": "0.0.1", + "configuration": {"sword_version": "2"}, + }, + "scheduler": {"cls": "local", "args": swh_scheduler_config,}, + } -@pytest.fixture(autouse=True) -def deposit_autoconfig(monkeypatch, deposit_config, swh_scheduler_config): - """Enforce config for deposit classes inherited from APIConfig.""" +@pytest.fixture() +def deposit_config_path(tmp_path, monkeypatch, deposit_config): + conf_path = os.path.join(tmp_path, "deposit.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(deposit_config)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path - def mock_parse_config(*args, **kw): - config = deposit_config.copy() - config["scheduler"] = { - "cls": "local", - "args": swh_scheduler_config, - } - return config - monkeypatch.setattr(APIConfig, "parse_config_file", mock_parse_config) +@pytest.fixture(autouse=True) +def deposit_autoconfig(deposit_config_path, swh_scheduler_config): + """Enforce config for deposit classes inherited from APIConfig.""" scheduler = get_scheduler("local", swh_scheduler_config) task_type = { "type": "load-deposit", "backend_name": "swh.loader.packages.deposit.tasks.LoadDeposit", - "description": "why does this have not-null constraint?", + "description": "Load deposit task", } scheduler.create_task_type(task_type) @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", "tests"), ("USER", postgresql_proc.user), # noqa ("HOST", postgresql_proc.host), # noqa ("PORT", postgresql_proc.port), # noqa } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) def execute_sql(sql): """Execute sql to postgres db""" with psycopg2.connect(database="postgres") as conn: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor() cur.execute(sql) @pytest.fixture(autouse=True, scope="session") def swh_proxy(): """Automatically inject this fixture in all tests to ensure no outside connection takes place. """ os.environ["http_proxy"] = "http://localhost:999" os.environ["https_proxy"] = "http://localhost:999" def create_deposit_collection(collection_name: str): """Create a deposit collection with name collection_name """ from swh.deposit.models import DepositCollection try: collection = DepositCollection._default_manager.get(name=collection_name) except DepositCollection.DoesNotExist: collection = DepositCollection(name=collection_name) collection.save() return collection def deposit_collection_factory(collection_name=TEST_USER["collection"]["name"]): @pytest.fixture def _deposit_collection(db, collection_name=collection_name): return create_deposit_collection(collection_name) return _deposit_collection deposit_collection = deposit_collection_factory() deposit_another_collection = deposit_collection_factory("another-collection") @pytest.fixture def deposit_user(db, deposit_collection): """Create/Return the test_user "test" """ from swh.deposit.models import DepositClient try: user = DepositClient._default_manager.get(username=TEST_USER["username"]) except DepositClient.DoesNotExist: user = DepositClient._default_manager.create_user( username=TEST_USER["username"], email=TEST_USER["email"], password=TEST_USER["password"], provider_url=TEST_USER["provider_url"], domain=TEST_USER["domain"], ) user.collections = [deposit_collection.id] user.save() return user @pytest.fixture def client(): """Override pytest-django one which does not work for djangorestframework. """ return APIClient() # <- drf's client @pytest.yield_fixture def authenticated_client(client, deposit_user): """Returned a logged client """ _token = "%s:%s" % (deposit_user.username, TEST_USER["password"]) token = base64.b64encode(_token.encode("utf-8")) authorization = "Basic %s" % token.decode("utf-8") client.credentials(HTTP_AUTHORIZATION=authorization) yield client client.logout() @pytest.fixture def sample_archive(tmp_path): """Returns a sample archive """ tmp_path = str(tmp_path) # pytest version limitation in previous version archive = create_arborescence_archive( tmp_path, "archive1", "file1", b"some content in file" ) return archive @pytest.fixture def atom_dataset(datadir) -> Mapping[str, str]: """Compute the paths to atom files. Returns: Dict of atom name per content (bytes) """ atom_path = os.path.join(datadir, "atom") data = {} for filename in os.listdir(atom_path): filepath = os.path.join(atom_path, filename) with open(filepath, "rb") as f: raw_content = f.read().decode("utf-8") # Keep the filename without extension atom_name = filename.split(".")[0] data[atom_name] = raw_content return data def create_deposit( authenticated_client, collection_name: str, sample_archive, external_id: str, deposit_status=DEPOSIT_STATUS_DEPOSITED, ): """Create a skeleton shell deposit """ url = reverse(COL_IRI, args=[collection_name]) # when response = authenticated_client.post( url, content_type="application/zip", # as zip data=sample_archive["data"], # + headers CONTENT_LENGTH=sample_archive["length"], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"]), ) # then assert response.status_code == status.HTTP_201_CREATED from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(external_id=external_id) if deposit.status != deposit_status: deposit.status = deposit_status deposit.save() assert deposit.status == deposit_status return deposit def create_binary_deposit( authenticated_client, collection_name: str, sample_archive, external_id: str, deposit_status: str = DEPOSIT_STATUS_DEPOSITED, atom_dataset: Mapping[str, bytes] = {}, ): """Create a deposit with both metadata and archive set. Then alters its status to `deposit_status`. """ deposit = create_deposit( authenticated_client, collection_name, sample_archive, external_id=external_id, deposit_status=DEPOSIT_STATUS_PARTIAL, ) response = authenticated_client.post( reverse(EDIT_SE_IRI, args=[collection_name, deposit.id]), content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % deposit.external_id.encode("utf-8"), HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED assert deposit.status == DEPOSIT_STATUS_PARTIAL from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit.id) if deposit.status != deposit_status: deposit.status = deposit_status deposit.save() assert deposit.status == deposit_status return deposit def deposit_factory(deposit_status=DEPOSIT_STATUS_DEPOSITED): """Build deposit with a specific status """ @pytest.fixture() def _deposit( sample_archive, deposit_collection, authenticated_client, deposit_status=deposit_status, ): external_id = "external-id-%s" % deposit_status return create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id=external_id, deposit_status=deposit_status, ) return _deposit deposited_deposit = deposit_factory() rejected_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_REJECTED) partial_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_PARTIAL) verified_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_VERIFIED) completed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS) failed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_FAILURE) @pytest.fixture def partial_deposit_with_metadata( sample_archive, deposit_collection, authenticated_client, atom_dataset ): """Returns deposit with archive and metadata provided, status 'partial' """ return create_binary_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id="external-id-partial", deposit_status=DEPOSIT_STATUS_PARTIAL, atom_dataset=atom_dataset, ) @pytest.fixture def partial_deposit_only_metadata( deposit_collection, authenticated_client, atom_dataset ): response = authenticated_client.post( reverse(COL_IRI, args=[deposit_collection.name]), content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG="external-id-partial", HTTP_IN_PROGRESS=True, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_id = response_content["deposit_id"] from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_PARTIAL return deposit @pytest.fixture def complete_deposit(sample_archive, deposit_collection, authenticated_client): """Returns a completed deposit (load success) """ deposit = create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id="external-id-complete", deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS, ) origin = "https://hal.archives-ouvertes.fr/hal-01727745" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10" snapshot_id = "e5e82d064a9c3df7464223042e0c55d72ccff7f0" deposit.swh_id = swhid(DIRECTORY, directory_id) deposit.swh_id_context = swhid( DIRECTORY, directory_id, metadata={ "origin": origin, "visit": swhid(SNAPSHOT, snapshot_id), "anchor": swhid(REVISION, revision_id), "path": "/", }, ) deposit.save() return deposit @pytest.fixture() def tmp_path(tmp_path): return str(tmp_path) # issue with oldstable's pytest version