Page MenuHomeSoftware Heritage

tasks.py
No OneTemporary

tasks.py

# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict
from celery import shared_task
from swh.loader.core.utils import parse_visit_date
from swh.loader.git.from_disk import GitLoaderFromArchive, GitLoaderFromDisk
from swh.loader.git.loader import GitLoader
def _process_kwargs(kwargs):
if "visit_date" in kwargs:
kwargs["visit_date"] = parse_visit_date(kwargs["visit_date"])
return kwargs
@shared_task(name=__name__ + ".UpdateGitRepository")
def load_git(**kwargs) -> Dict[str, Any]:
"""Import a git repository from a remote location"""
loader = GitLoader.from_configfile(**_process_kwargs(kwargs))
return loader.load()
@shared_task(name=__name__ + ".LoadDiskGitRepository")
def load_git_from_dir(**kwargs) -> Dict[str, Any]:
"""Import a git repository from a local repository"""
loader = GitLoaderFromDisk.from_configfile(**_process_kwargs(kwargs))
return loader.load()
@shared_task(name=__name__ + ".UncompressAndLoadDiskGitRepository")
def load_git_from_zip(**kwargs) -> Dict[str, Any]:
"""Import a git repository from a zip archive
1. Uncompress an archive repository in a local and temporary folder
2. Load it through the git disk loader
3. Clean up the temporary folder
"""
loader = GitLoaderFromArchive.from_configfile(**_process_kwargs(kwargs))
return loader.load()

File Metadata

Mime Type
text/x-python
Expires
Wed, Jun 4, 7:24 PM (5 d, 19 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3239294

Event Timeline