Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/README.md b/README.md
index 051f188..1fad711 100644
--- a/README.md
+++ b/README.md
@@ -1,80 +1,81 @@
swh-loader-git
==============
The Software Heritage Git Loader is a tool and a library to walk a local
Git repository and inject into the SWH dataset all contained files that
weren't known before.
-The main entry points are
+The main entry points are:
-- :class:`swh.loader.git.loader.GitLoader` for the main loader which ingests a remote git
- repository's contents.
+- :class:`swh.loader.git.loader.GitLoader` for the main loader, which can ingest the
+ contents of either a local or a remote git repository. This is the main implementation
+ deployed in production.
-- :class:`swh.loader.git.from_disk.GitLoaderFromDisk` which ingests a local git clone
+- :class:`swh.loader.git.from_disk.GitLoaderFromDisk` which only ingests a local clone
of a git repository.
- :class:`swh.loader.git.loader.GitLoaderFromArchive` which ingests a git repository
wrapped in an archive.
License
-------
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
See top-level LICENSE file for the full text of the GNU General Public
License along with this program.
Dependencies
------------
### Runtime
- python3
- python3-dulwich
- python3-retrying
- python3-swh.core
- python3-swh.model
- python3-swh.storage
- python3-swh.scheduler
### Test
- python3-nose
Requirements
------------
- implementation language, Python3
- coding guidelines: conform to PEP8
- Git access: via dulwich
CLI Run
----------
You can run the loader from a remote origin (*loader*) or from an origin on disk
(*from_disk*) directly by calling:
```
swh loader -C <config-file> run git <git-repository-url>
```
Use "git_disk" in place of "git" to load from an origin on disk.
## Configuration sample
/tmp/git.yml:
```
storage:
cls: remote
args:
url: http://localhost:5002/
```
diff --git a/requirements-test.txt b/requirements-test.txt
index f9a850e..78f8c9b 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,6 +1,7 @@
pytest
pytest-mock
swh.scheduler[testing] >= 0.5.0
swh.storage[testing]
types-click
+types-Deprecated
types-python-dateutil
diff --git a/swh/loader/git/from_disk.py b/swh/loader/git/from_disk.py
index fb64803..41fed95 100644
--- a/swh/loader/git/from_disk.py
+++ b/swh/loader/git/from_disk.py
@@ -1,450 +1,452 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from datetime import datetime
import logging
import os
import shutil
from typing import Dict, Optional
from dulwich.errors import ObjectFormatException
try:
from dulwich.errors import EmptyFileException # type: ignore
except ImportError:
# dulwich >= 0.20
from dulwich.objects import EmptyFileException
+from deprecated import deprecated
import dulwich.objects
import dulwich.repo
from swh.loader.core.loader import DVCSLoader
from swh.model import hashutil
from swh.model.model import Origin, Snapshot, SnapshotBranch, TargetType
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.interface import StorageInterface
from . import converters, utils
logger = logging.getLogger(__name__)
def _check_tag(tag):
    """Validate a dulwich tag object with relaxed tagger and time checks.

    Adapted from the checks of ``dulwich.objects.Tag``, whose tagger and time
    validation is too strict and errors on old tags. Here the tag time is only
    validated when it is present.

    Args:
        tag: the dulwich tag object to validate.

    Raises:
        ObjectFormatException: when the tag name is empty or when the
            object/type/tag/tagger headers do not appear in that order.
            The dulwich helpers called here may raise as well.

    """
    # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
    # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
    #
    # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
    # General Public License as public by the Free Software Foundation; version 2.0
    # or (at your option) any later version. You can redistribute it and/or
    # modify it under the terms of either of these two licenses.
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # You should have received a copy of the licenses; if not, see
    # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
    # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
    # License, Version 2.0.
    dulwich.objects.ShaFile.check(tag)

    for member, problem in (
        ("_object_sha", "missing object sha"),
        ("_object_class", "missing object type"),
        ("_name", "missing tag name"),
    ):
        tag._check_has_member(member, problem)

    if not tag._name:
        raise ObjectFormatException("empty tag name")

    dulwich.objects.check_hexsha(tag._object_sha, "invalid object sha")

    if tag._tag_time is not None:
        dulwich.objects.check_time(tag._tag_time)

    from dulwich.objects import (
        _OBJECT_HEADER,
        _TAG_HEADER,
        _TAGGER_HEADER,
        _TYPE_HEADER,
    )

    # For each header: the header that must immediately precede it, and the
    # error reported otherwise.
    required_predecessor = {
        _TYPE_HEADER: (_OBJECT_HEADER, "unexpected type"),
        _TAG_HEADER: (_TYPE_HEADER, "unexpected tag name"),
        _TAGGER_HEADER: (_TAG_HEADER, "unexpected tagger"),
    }

    previous = None
    for header, _ in dulwich.objects._parse_message(tag._chunked_text):
        if header == _OBJECT_HEADER:
            # The object header is only valid as the very first field.
            if previous is not None:
                raise ObjectFormatException("unexpected object")
        elif header in required_predecessor:
            expected, problem = required_predecessor[header]
            if previous != expected:
                raise ObjectFormatException(problem)
        previous = header
+@deprecated(version="1.1", reason="Use `swh.loader.git.loader.GitLoader` instead")
class GitLoaderFromDisk(DVCSLoader):
"""Load a git repository from a directory.
"""
visit_type = "git"
def __init__(
    self,
    storage: StorageInterface,
    url: str,
    visit_date: Optional[datetime] = None,
    directory: Optional[str] = None,
    save_data_path: Optional[str] = None,
    max_content_size: Optional[int] = None,
):
    """Set up a loader for the git repository stored in ``directory``.

    Args:
        storage: storage forwarded to the parent loader.
        url: origin url recorded for the visit.
        visit_date: date of the visit, if any.
        directory: path of the on-disk git repository to load.
        save_data_path: forwarded to the parent loader.
        max_content_size: forwarded to the parent loader.

    """
    parent_options = {
        "storage": storage,
        "save_data_path": save_data_path,
        "max_content_size": max_content_size,
    }
    super().__init__(**parent_options)

    self.origin_url, self.visit_date, self.directory = url, visit_date, directory
def prepare_origin_visit(self):
    """Create the ``Origin`` object for the url being visited."""
    origin = Origin(url=self.origin_url)
    self.origin = origin
def prepare(self):
    """Open the repository located at ``self.directory`` with dulwich."""
    repo_path = self.directory
    self.repo = dulwich.repo.Repo(repo_path)
def iter_objects(self):
    """Yield the identifiers of the objects held by the repository.

    Objects from pack files come first, each pack's index entries being
    sorted on their second field (unpacked below as the offset); then come
    the loose objects and finally the objects from alternates.

    """
    store = self.repo.object_store

    for pack in store.packs:
        entries = sorted(pack.index.iterentries(), key=lambda entry: entry[1])
        for sha, _offset, _crc32 in entries:
            yield hashutil.hash_to_bytehex(sha)

    yield from store._iter_loose_objects()
    yield from store._iter_alternate_objects()
def _check(self, obj):
    """Validate the repository representation of an object.

    Tags go through the relaxed ``_check_tag``; every other object uses its
    own ``check`` method. Commit and tag timestamps are then validated with
    ``utils.check_date_time``.

    Args:
        obj (object): Dulwich object read from the repository.

    Raises:
        ObjectFormatException: if any of the checks fails.

    """
    is_tag = isinstance(obj, dulwich.objects.Tag)

    if is_tag:
        _check_tag(obj)
    else:
        obj.check()

    try:
        # Additional checks on dulwich objects carrying dates; for now only
        # the *time fields are checked.
        if isinstance(obj, dulwich.objects.Commit):
            utils.check_date_time(obj._commit_time)
            utils.check_date_time(obj._author_time)
        elif is_tag:
            tag_time = obj._tag_time
            if tag_time:
                utils.check_date_time(tag_time)
    except Exception as e:
        raise ObjectFormatException(e)
def get_object(self, oid):
    """Given an object id, return the object if it is found and not
    malformed in some way.

    Missing, malformed and corrupted (empty file) objects are logged as
    warnings and skipped.

    Args:
        oid (bytes): the object's identifier

    Returns:
        The object if found without malformation, None otherwise

    """
    try:
        # some errors are raised when reading the object
        obj = self.repo[oid]
        # some we need to check ourselves
        self._check(obj)
    except KeyError:
        _id = oid.decode("utf-8")
        logger.warning(
            "object %s not found, skipping",
            _id,
            extra={
                "swh_type": "swh_loader_git_missing_object",
                "swh_object_id": _id,
                "origin_url": self.origin.url,
            },
        )
        return None
    except ObjectFormatException as e:
        id_ = oid.decode("utf-8")
        logger.warning(
            "object %s malformed (%s), skipping",
            id_,
            e.args[0],
            extra={
                "swh_type": "swh_loader_git_missing_object",
                "swh_object_id": id_,
                "origin_url": self.origin.url,
            },
        )
        return None
    except EmptyFileException:
        id_ = oid.decode("utf-8")
        logger.warning(
            "object %s corrupted (empty file), skipping",
            id_,
            extra={
                "swh_type": "swh_loader_git_missing_object",
                "swh_object_id": id_,
                "origin_url": self.origin.url,
            },
        )
        # Explicit, to match the other skip branches.
        return None

    return obj
def fetch_data(self):
    """Record the previous snapshot id and index the repository objects.

    Sets ``self.previous_snapshot_id`` from the latest visit status that has
    a snapshot (None when there is none), and ``self.type_to_ids``, which maps
    each dulwich type name to the ids of the readable objects of that type.

    """
    latest_status = origin_get_latest_visit_status(
        self.storage, self.origin_url, require_snapshot=True
    )
    if latest_status is None:
        self.previous_snapshot_id = None
    else:
        self.previous_snapshot_id = latest_status.snapshot

    ids_by_type = defaultdict(list)
    for oid in self.iter_objects():
        obj = self.get_object(oid)
        if obj is not None:
            ids_by_type[obj.type_name].append(oid)

    self.type_to_ids = ids_by_type
def has_contents(self):
    """Tell whether at least one blob was found in the repository."""
    blob_ids = self.type_to_ids[b"blob"]
    return bool(blob_ids)
def get_content_ids(self):
    """Yield the content identifier of each blob found in the repository."""
    yield from (
        converters.dulwich_blob_to_content_id(self.repo[blob_id])
        for blob_id in self.type_to_ids[b"blob"]
    )
def get_contents(self):
    """Yield the contents that the storage reports as missing."""
    candidate_ids = self.get_content_ids()
    missing = set(self.storage.content_missing(candidate_ids, "sha1_git"))

    for sha1_git in missing:
        blob = self.repo[hashutil.hash_to_bytehex(sha1_git)]
        yield converters.dulwich_blob_to_content(blob)
def has_directories(self):
    """Tell whether at least one tree was found in the repository."""
    tree_ids = self.type_to_ids[b"tree"]
    return bool(tree_ids)
def get_directory_ids(self):
    """Return a generator of the raw identifiers of the repository's trees."""
    tree_ids = self.type_to_ids[b"tree"]
    return (hashutil.hash_to_bytes(hex_id.decode()) for hex_id in tree_ids)
def get_directories(self):
    """Yield the directories that the storage reports as missing."""
    candidate_ids = sorted(self.get_directory_ids())
    missing = set(self.storage.directory_missing(candidate_ids))

    for dir_id in missing:
        tree = self.repo[hashutil.hash_to_bytehex(dir_id)]
        yield converters.dulwich_tree_to_directory(tree)
def has_revisions(self):
    """Tell whether at least one commit was found in the repository."""
    commit_ids = self.type_to_ids[b"commit"]
    return bool(commit_ids)
def get_revision_ids(self):
    """Return a generator of the raw identifiers of the repository's commits."""
    commit_ids = self.type_to_ids[b"commit"]
    return (hashutil.hash_to_bytes(hex_id.decode()) for hex_id in commit_ids)
def get_revisions(self):
    """Yield the revisions that the storage reports as missing."""
    candidate_ids = sorted(self.get_revision_ids())
    missing = set(self.storage.revision_missing(candidate_ids))

    for rev_id in missing:
        commit = self.repo[hashutil.hash_to_bytehex(rev_id)]
        yield converters.dulwich_commit_to_revision(commit)
def has_releases(self):
    """Tell whether at least one tag object was found in the repository."""
    tag_ids = self.type_to_ids[b"tag"]
    return bool(tag_ids)
def get_release_ids(self):
    """Return a generator of the raw identifiers of the repository's tags."""
    tag_ids = self.type_to_ids[b"tag"]
    return (hashutil.hash_to_bytes(hex_id.decode()) for hex_id in tag_ids)
def get_releases(self):
    """Yield the releases that the storage reports as missing."""
    candidate_ids = sorted(self.get_release_ids())
    missing = set(self.storage.release_missing(candidate_ids))

    for rel_id in missing:
        tag = self.repo[hashutil.hash_to_bytehex(rel_id)]
        yield converters.dulwich_tag_to_release(tag)
def get_snapshot(self):
    """Build the snapshot of the repository from its references.

    Regular references become branches targeting the referenced object, or
    ``None`` when ``get_object`` yields nothing usable for it. Symbolic
    references become alias branches; an alias whose target is not a known
    branch is recorded as dangling and its target is mapped to ``None``.

    Returns:
        The snapshot, which is also stored in ``self.snapshot``.

    """
    branches: Dict[bytes, Optional[SnapshotBranch]] = {}

    for ref, target in self.repo.refs.as_dict().items():
        if utils.ignore_branch_name(ref):
            continue

        obj = self.get_object(target)
        if not obj:
            branches[ref] = None
            continue

        target_type = converters.DULWICH_TARGET_TYPES[obj.type_name]
        branches[ref] = SnapshotBranch(
            target=hashutil.bytehex_to_hash(target), target_type=target_type,
        )

    dangling_branches = {}
    for ref, target in self.repo.refs.get_symrefs().items():
        if utils.ignore_branch_name(ref):
            continue

        branches[ref] = SnapshotBranch(target=target, target_type=TargetType.ALIAS)
        if target in branches:
            continue

        # The pointer is "dangling". A further symbolic reference may still
        # override this default value, which is totally fine.
        dangling_branches[target] = ref
        branches[target] = None

    utils.warn_dangling_branches(
        branches, dangling_branches, logger, self.origin_url
    )

    snapshot = Snapshot(branches=branches)
    self.snapshot = snapshot
    return snapshot
def save_data(self):
    """Do nothing: the data is already available locally."""
    return None
def load_status(self):
    """Report whether this load was eventful.

    With a previous snapshot, the load is eventful when the new snapshot id
    differs from it; without one, it is eventful when the new snapshot has
    branches.

    """
    previous_id = self.previous_snapshot_id
    if previous_id:
        changed = self.snapshot.id != previous_id
    else:
        changed = bool(self.snapshot.branches)

    status = "eventful" if changed else "uneventful"
    return {"status": status}
class GitLoaderFromArchive(GitLoaderFromDisk):
"""Load a git repository from an archive.
This loader ingests a git repository compressed into an archive.
The supported archive formats are ``.zip`` and ``.tar.gz``.
From an input tarball named ``my-git-repo.zip``, the following layout is
expected in it::
my-git-repo/
├── .git
│ ├── branches
│ ├── COMMIT_EDITMSG
│ ├── config
│ ├── description
│ ├── HEAD
...
Nevertheless, the loader is able to ingest tarballs with the following
layouts too::
.
├── .git
│ ├── branches
│ ├── COMMIT_EDITMSG
│ ├── config
│ ├── description
│ ├── HEAD
...
or::
other-repo-name/
├── .git
│ ├── branches
│ ├── COMMIT_EDITMSG
│ ├── config
│ ├── description
│ ├── HEAD
...
"""
def __init__(self, *args, archive_path, **kwargs):
    """Set up a loader for the git repository held in ``archive_path``.

    All other arguments are forwarded to the parent loader. The temporary
    directory and repository path stay unset until ``prepare`` runs.

    """
    super().__init__(*args, **kwargs)
    self.temp_dir = None
    self.repo_path = None
    self.archive_path = archive_path
def project_name_from_archive(self, archive_path):
    """Derive the project name from the archive's path.

    The name is the archive's base name with its ``.zip``, ``.tar.gz`` or
    ``.tgz`` extension (matched case-insensitively) removed; a base name
    with any other extension is returned unchanged.

    """
    base_name = os.path.basename(archive_path)
    lowered = base_name.lower()

    matching_ext = next(
        (ext for ext in (".zip", ".tar.gz", ".tgz") if lowered.endswith(ext)), None
    )
    if matching_ext is None:
        return base_name
    return base_name[: -len(matching_ext)]
def prepare(self):
    """Extract the archive, then prepare as ``GitLoaderFromDisk`` does.

    1. Uncompress the archive in a temporary location.
    2. Point ``self.directory`` at the extracted repository.
    3. Delegate to the parent ``prepare``.

    """
    archive = self.archive_path
    project_name = self.project_name_from_archive(archive)

    extracted = utils.init_git_repo_from_archive(project_name, archive)
    self.temp_dir, self.repo_path = extracted

    logger.info(
        "Project %s - Uncompressing archive %s at %s",
        self.origin_url,
        os.path.basename(self.archive_path),
        self.repo_path,
    )

    self.directory = self.repo_path
    super().prepare()
def cleanup(self):
    """Remove the temporary extraction directory (if it exists) and log
    the end of the loading.

    """
    if self.temp_dir and os.path.exists(self.temp_dir):
        shutil.rmtree(self.temp_dir)

    # Lazy %-style arguments, consistent with the logging done in prepare().
    logger.info(
        "Project %s - Done injecting %s", self.origin_url, self.repo_path
    )
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
index dfcc00d..929477a 100644
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -1,517 +1,520 @@
# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
import datetime
import logging
import os
import pickle
import sys
from tempfile import SpooledTemporaryFile
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type
import dulwich.client
from dulwich.errors import GitProtocolError, NotGitRepository
from dulwich.object_store import ObjectStoreGraphWalker
from dulwich.objects import ShaFile
from dulwich.pack import PackData, PackInflater
from swh.loader.core.loader import DVCSLoader
from swh.loader.exception import NotFound
from swh.model import hashutil
from swh.model.model import (
BaseContent,
Directory,
Origin,
Release,
Revision,
Snapshot,
SnapshotBranch,
TargetType,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface
from . import converters, dumb, utils
from .utils import HexBytes
logger = logging.getLogger(__name__)
class RepoRepresentation:
    """Repository representation for a Software Heritage origin.

    Holds the base snapshot known for the origin and uses it to decide which
    remote references still have to be fetched.

    """

    def __init__(
        self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False
    ):
        """Keep the storage and pick the base snapshot.

        The base snapshot is an empty one when none is given or when
        ``ignore_history`` is set.

        """
        self.storage = storage
        self.ignore_history = ignore_history

        if ignore_history or not base_snapshot:
            self.base_snapshot: Snapshot = Snapshot(branches={})
        else:
            self.base_snapshot = base_snapshot

        self.heads: Set[HexBytes] = set()

    def get_parents(self, commit: bytes) -> List[bytes]:
        """This method should return the list of known parents"""
        return []

    def graph_walker(self) -> ObjectStoreGraphWalker:
        """Return a dulwich graph walker over the currently known heads."""
        return ObjectStoreGraphWalker(self.heads, self.get_parents)

    def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
        """Get the list of bytehex sha1s that the git loader should fetch.

        This compares the remote refs sent by the server with the base snapshot
        provided by the loader. As a side effect, ``self.heads`` is set to the
        targets of the non-alias branches of the base snapshot.

        """
        if not refs:
            return []

        # Cache existing heads
        local_heads: Set[HexBytes] = {
            hashutil.hash_to_hex(branch.target).encode()
            for branch in self.base_snapshot.branches.values()
            if branch and branch.target_type != TargetType.ALIAS
        }
        self.heads = local_heads

        # Get the remote heads that we want to fetch
        remote_heads: Set[HexBytes] = {
            ref_target
            for ref_name, ref_target in refs.items()
            if not utils.ignore_branch_name(ref_name)
        }

        logger.debug("local_heads_count=%s", len(local_heads))
        logger.debug("remote_heads_count=%s", len(remote_heads))

        wanted_refs = list(remote_heads.difference(local_heads))
        logger.debug("wanted_refs_count=%s", len(wanted_refs))
        return wanted_refs
@dataclass
class FetchPackReturn:
    """Outcome of fetching a pack from an origin."""

    # References advertised by the remote, mapped to the id they point to.
    remote_refs: Dict[bytes, HexBytes]
    # Symbolic references reported by the remote, mapped to their target.
    symbolic_refs: Dict[bytes, HexBytes]
    # Spooled temporary file holding the fetched pack data.
    pack_buffer: SpooledTemporaryFile
    # Number of bytes written to pack_buffer.
    pack_size: int
class GitLoader(DVCSLoader):
"""A bulk loader for a git repository"""
visit_type = "git"
def __init__(
    self,
    storage: StorageInterface,
    url: str,
    base_url: Optional[str] = None,
    ignore_history: bool = False,
    repo_representation: Type[RepoRepresentation] = RepoRepresentation,
    pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
    temp_file_cutoff: int = 100 * 1024 * 1024,
    save_data_path: Optional[str] = None,
    max_content_size: Optional[int] = None,
):
    """Initialize the bulk updater.

    Args:
        storage: storage forwarded to the parent loader.
        url: url of the origin to load.
        base_url: optional url of another origin; stored as ``self.base_url``.
        ignore_history: stored as ``self.ignore_history``.
        repo_representation: swh's repository representation
            which is in charge of filtering between known and remote
            data.
        pack_size_bytes: stored as ``self.pack_size_bytes``.
        temp_file_cutoff: stored as ``self.temp_file_cutoff``.
        save_data_path: forwarded to the parent loader.
        max_content_size: forwarded to the parent loader.

    """
    parent_options = {
        "storage": storage,
        "save_data_path": save_data_path,
        "max_content_size": max_content_size,
    }
    super().__init__(**parent_options)

    self.origin_url, self.base_url = url, base_url
    self.ignore_history = ignore_history
    self.repo_representation = repo_representation
    self.pack_size_bytes, self.temp_file_cutoff = pack_size_bytes, temp_file_cutoff

    # state initialized in fetch_data
    self.remote_refs: Dict[bytes, HexBytes] = {}
    self.symbolic_refs: Dict[bytes, HexBytes] = {}
    self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
def fetch_pack_from_origin(
    self,
    origin_url: str,
    base_repo: RepoRepresentation,
    do_activity: Callable[[bytes], None],
) -> FetchPackReturn:
    """Fetch a pack from the origin.

    The pack data is written to a spooled temporary file, which spills to
    disk once it grows past ``self.temp_file_cutoff`` bytes. As a side
    effect, ``self.dumb`` is set to tell whether the remote only supports
    the dumb transfer protocol.

    Args:
        origin_url: url of the repository to fetch from.
        base_repo: representation of what is already known; it provides the
            wanted references and the graph walker.
        do_activity: callback receiving the progress messages of the fetch.

    Returns:
        The filtered remote and symbolic references, plus the pack buffer
        (rewound to its start) and its size.

    Raises:
        IOError: when the pack would grow past ``self.pack_size_bytes``.

    """
    pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)

    try:
        # Hardcode the use of the tcp transport (for GitHub origins)

        # Even if the Dulwich API lets us process the packfile in chunks as it's
        # received, the HTTP transport implementation needs to entirely allocate
        # the packfile in memory *twice*, once in the HTTP library, and once in
        # a BytesIO managed by Dulwich, before passing chunks to the `do_pack`
        # method. Overall this triples the memory usage before we can even try
        # to interrupt the loader before it overruns its memory limit.

        # In contrast, the Dulwich TCP transport just gives us the read handle
        # on the underlying socket, doing no processing or copying of the bytes.
        # We can interrupt it as soon as we've received too many bytes.

        transport_url = origin_url
        if transport_url.startswith("https://github.com/"):
            transport_url = "git" + transport_url[5:]

        logger.debug("Transport url to communicate with server: %s", transport_url)

        client, path = dulwich.client.get_transport_and_path(
            transport_url, thin_packs=False
        )

        logger.debug("Client %s to fetch pack at %s", client, path)

        size_limit = self.pack_size_bytes

        def do_pack(data: bytes) -> None:
            cur_size = pack_buffer.tell()
            would_write = len(data)
            if cur_size + would_write > size_limit:
                raise IOError(
                    f"Pack file too big for repository {origin_url}, "
                    f"limit is {size_limit} bytes, current size is {cur_size}, "
                    f"would write {would_write}"
                )

            pack_buffer.write(data)

        pack_result = client.fetch_pack(
            path,
            base_repo.determine_wants,
            base_repo.graph_walker(),
            do_pack,
            progress=do_activity,
        )

        remote_refs = pack_result.refs or {}
        symbolic_refs = pack_result.symrefs or {}

        pack_buffer.flush()
        pack_size = pack_buffer.tell()
        pack_buffer.seek(0)

        logger.debug("fetched_pack_size=%s", pack_size)

        # check if repository only supports git dumb transfer protocol,
        # fetched pack file will be empty in that case as dulwich does
        # not support it and does not fetch any refs
        self.dumb = transport_url.startswith("http") and client.dumb

        return FetchPackReturn(
            remote_refs=utils.filter_refs(remote_refs),
            symbolic_refs=utils.filter_refs(symbolic_refs),
            pack_buffer=pack_buffer,
            pack_size=pack_size,
        )
    except BaseException:
        # The buffer is only handed over to the caller on success: release
        # it (and its on-disk spill file, if any) when the fetch fails.
        pack_buffer.close()
        raise
def prepare_origin_visit(self) -> None:
    """Stamp the visit with the current UTC time and create the origin."""
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    self.visit_date = utc_now
    self.origin = Origin(url=self.origin_url)
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
    """Return the latest snapshot stored for ``origin_url``, if any."""
    latest = snapshot_get_latest(self.storage, origin_url)
    return latest
def prepare(self) -> None:
    """Select the base snapshot used to compute what has to be fetched.

    Unless history is ignored, the latest snapshot of the origin is used.
    When no snapshot was found and a base url is set, the latest snapshot of
    that base origin is tried. Without any snapshot, an empty one is used.

    """
    assert self.origin is not None

    found: Optional[Snapshot] = None

    if not self.ignore_history:
        found = self.get_full_snapshot(self.origin.url)

    if self.base_url and found is None:
        base_origin = list(self.storage.origin_get([self.base_url]))[0]
        if base_origin:
            found = self.get_full_snapshot(base_origin.url)

    self.base_snapshot = found if found is not None else Snapshot(branches={})
def fetch_data(self) -> bool:
    """Fetch the references and objects of the origin.

    A pack is fetched through dulwich; when that fails in a way that may
    indicate a dumb-protocol repository, and the protocol check confirms it,
    the dumb fetcher is used instead. Remote and symbolic references are
    stored on the loader, and ``self.ref_object_types`` is reset with one
    ``None`` entry per referenced object.

    Returns:
        Always False: there is no more data to fetch.

    Raises:
        NotFound: when the origin is not a git repository, or when the
            protocol error message matches a known "not found" case.

    """
    assert self.origin is not None

    base_repo = self.repo_representation(
        storage=self.storage,
        base_snapshot=self.base_snapshot,
        ignore_history=self.ignore_history,
    )

    def do_progress(msg: bytes) -> None:
        sys.stderr.buffer.write(msg)
        sys.stderr.flush()

    # unfortunately, GitProtocolError is not specific to a not found
    # scenario... It depends on the value of message within the exception.
    not_found_markers = (
        "Repository unavailable",  # e.g DMCA takedown
        "Repository not found",
        "unexpected http resp 401",
    )

    try:
        fetch_info = self.fetch_pack_from_origin(
            self.origin.url, base_repo, do_progress
        )
    except NotGitRepository as e:
        raise NotFound(e)
    except GitProtocolError as e:
        if any(marker in e.args[0] for marker in not_found_markers):
            raise NotFound(e)
        # otherwise transmit the error
        raise
    except (AttributeError, NotImplementedError, ValueError):
        # with old dulwich versions, those exceptions types can be raised
        # by the fetch_pack operation when encountering a repository with
        # dumb transfer protocol so we check if the repository supports it
        # here to continue the loading if it is the case
        self.dumb = dumb.check_protocol(self.origin_url)
        if not self.dumb:
            raise

    protocol = "dumb" if self.dumb else "smart"
    logger.debug("Protocol used for communication: %s", protocol)

    if not self.dumb:
        self.pack_buffer = fetch_info.pack_buffer
        self.pack_size = fetch_info.pack_size
        self.remote_refs = fetch_info.remote_refs
        self.symbolic_refs = fetch_info.symbolic_refs
    else:
        self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin_url, base_repo)
        self.dumb_fetcher.fetch_object_ids()
        self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)  # type: ignore
        self.symbolic_refs = self.dumb_fetcher.head

    self.ref_object_types = dict.fromkeys(self.remote_refs.values())

    ref_count = len(self.remote_refs)
    logger.info(
        "Listed %d refs for repo %s",
        ref_count,
        self.origin.url,
        extra={
            "swh_type": "git_repo_list_refs",
            "swh_repo": self.origin.url,
            "swh_num_refs": ref_count,
        },
    )

    # No more data to fetch
    return False
def save_data(self) -> None:
    """Store the fetched pack and the remote references for archival.

    Two files named after the visit date are created in the save data
    directory: ``<date>.pack`` with the raw pack data and ``<date>.refs``
    with the pickled remote references. Both are opened in exclusive mode,
    so an already existing file makes the call fail.

    """
    assert isinstance(self.visit_date, datetime.datetime)

    chunk_size = 8192
    target_dir = self.get_save_data_path()
    stamp = self.visit_date.isoformat()
    pack_path = os.path.join(target_dir, "%s.pack" % stamp)
    refs_path = os.path.join(target_dir, "%s.refs" % stamp)

    with open(pack_path, "xb") as pack_file:
        self.pack_buffer.seek(0)
        # Copy chunk by chunk until read() returns an empty result.
        for chunk in iter(lambda: self.pack_buffer.read(chunk_size), b""):
            pack_file.write(chunk)

    self.pack_buffer.seek(0)

    with open(refs_path, "xb") as refs_file:
        pickle.dump(self.remote_refs, refs_file)
def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
    """Yield every fetched object whose type name is ``object_type``.

    Objects come from the dumb fetcher when the dumb protocol is in use,
    and from the fetched packfile otherwise.

    """
    if self.dumb:
        yield from self.dumb_fetcher.iter_objects(object_type)
        return

    self.pack_buffer.seek(0)
    pack_data = PackData.from_file(self.pack_buffer, self.pack_size)

    count = 0
    for obj in PackInflater.for_pack_data(pack_data):
        if obj.type_name == object_type:
            yield obj
            count += 1

    logger.debug("packfile_read_count_%s=%s", object_type.decode(), count)
def get_contents(self) -> Iterable[BaseContent]:
    """Yield the fetched blobs converted to swh contents.

    Blobs directly targeted by a remote reference get their type recorded
    in ``self.ref_object_types``.

    """
    for blob in self.iter_objects(b"blob"):
        blob_id = blob.id
        if blob_id in self.ref_object_types:
            self.ref_object_types[blob_id] = TargetType.CONTENT

        yield converters.dulwich_blob_to_content(
            blob, max_content_size=self.max_content_size
        )
def get_directories(self) -> Iterable[Directory]:
    """Yield the fetched trees converted to swh directories.

    Trees directly targeted by a remote reference get their type recorded
    in ``self.ref_object_types``.

    """
    for tree in self.iter_objects(b"tree"):
        tree_id = tree.id
        if tree_id in self.ref_object_types:
            self.ref_object_types[tree_id] = TargetType.DIRECTORY

        yield converters.dulwich_tree_to_directory(tree)
def get_revisions(self) -> Iterable[Revision]:
    """Yield the fetched commits converted to swh revisions.

    Commits directly targeted by a remote reference get their type recorded
    in ``self.ref_object_types``.

    """
    for commit in self.iter_objects(b"commit"):
        commit_id = commit.id
        if commit_id in self.ref_object_types:
            self.ref_object_types[commit_id] = TargetType.REVISION

        yield converters.dulwich_commit_to_revision(commit)
def get_releases(self) -> Iterable[Release]:
    """Yield the fetched tag objects converted to swh releases.

    Tags directly targeted by a remote reference get their type recorded
    in ``self.ref_object_types``.

    """
    for tag in self.iter_objects(b"tag"):
        tag_id = tag.id
        if tag_id in self.ref_object_types:
            self.ref_object_types[tag_id] = TargetType.RELEASE

        yield converters.dulwich_tag_to_release(tag)
def get_snapshot(self) -> Snapshot:
    """Get the snapshot for the current visit.

    The main complexity of this function is mapping target objects to their
    types, as the `refs` dictionaries returned by the git server only give
    us the identifiers for the target objects, and not their types.

    The loader itself only knows the types of the objects that it has
    fetched from the server (as it has parsed them while loading them to
    the archive). As we only fetched an increment between the previous
    snapshot and the current state of the server, we are missing the type
    information for the objects that would already have been referenced by
    the previous snapshot, and that the git server didn't send us. We infer
    the type of these objects from the previous snapshot.

    Raises:
        RuntimeError: when a remote ref targets an object that was neither
            fetched nor referenced by the base snapshot.

    """
    branches: Dict[bytes, Optional[SnapshotBranch]] = {}
    unfetched_refs: Dict[bytes, bytes] = {}

    # Retrieve types from the objects loaded by the current loader
    for ref_name, ref_object in self.remote_refs.items():
        if ref_name in self.symbolic_refs:
            continue

        target = hashutil.hash_to_bytes(ref_object.decode())
        target_type = self.ref_object_types.get(ref_object)
        if not target_type:
            # The object pointed at by this ref was not fetched, supposedly
            # because it existed in the base snapshot. We record it here,
            # and we can get it from the base snapshot later.
            unfetched_refs[ref_name] = target
            continue

        branches[ref_name] = SnapshotBranch(target=target, target_type=target_type)

    dangling_branches = {}
    # Handle symbolic references as alias branches
    for ref_name, target in self.symbolic_refs.items():
        branches[ref_name] = SnapshotBranch(
            target_type=TargetType.ALIAS, target=target,
        )
        if target in branches or target in unfetched_refs:
            continue

        # This handles the case where the pointer is "dangling".
        # There's a chance that a further symbolic reference
        # override this default value, which is totally fine.
        dangling_branches[target] = ref_name
        branches[target] = None

    if unfetched_refs:
        # Handle inference of object types from the contents of the
        # previous snapshot
        unknown_objects = {}

        base_branch_by_target = {}
        for base_branch in self.base_snapshot.branches.values():
            if base_branch and base_branch.target_type != TargetType.ALIAS:
                base_branch_by_target[base_branch.target] = base_branch

        for ref_name, target in unfetched_refs.items():
            known_branch = base_branch_by_target.get(target)
            branches[ref_name] = known_branch
            if not known_branch:
                unknown_objects[ref_name] = target

        if unknown_objects:
            # This object was referenced by the server; We did not fetch
            # it, and we do not know it from the previous snapshot. This is
            # likely a bug in the loader.
            details = ", ".join(
                f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
                for name, obj in unknown_objects.items()
            )
            raise RuntimeError(
                "Unknown objects referenced by remote refs: %s" % details
            )

    utils.warn_dangling_branches(
        branches, dangling_branches, logger, self.origin_url
    )

    snapshot = Snapshot(branches=branches)
    self.snapshot = snapshot
    return snapshot
def load_status(self) -> Dict[str, Any]:
    """Report whether this load was eventful.

    With both a base snapshot and a new snapshot, the load is eventful when
    their ids differ. With only a new snapshot, it is eventful when that
    snapshot has branches. Otherwise it is uneventful.

    """
    base, current = self.base_snapshot, self.snapshot

    if base and current:
        changed = current.id != base.id
    elif current:
        changed = bool(current.branches)
    else:
        changed = False

    status = "eventful" if changed else "uneventful"
    return {"status": status}
if __name__ == "__main__":
    # Deprecated ad-hoc entry point: loads a single origin into an in-memory
    # storage, with debug logging enabled.
    import click

    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s"
    )

    from deprecated import deprecated

    @deprecated(version="1.1", reason="Use `swh loader run git --help` instead")
    @click.command()
    @click.option("--origin-url", help="Origin url", required=True)
    @click.option("--base-url", default=None, help="Optional Base url")
    @click.option(
        "--ignore-history/--no-ignore-history",
        help="Ignore the repository history",
        default=False,
    )
    def main(
        origin_url: str, base_url: Optional[str], ignore_history: bool
    ) -> Dict[str, Any]:
        # No docstring on purpose: click would use it as the command's help
        # text. `base_url` is None when --base-url is not given.
        from swh.storage import get_storage

        storage = get_storage(cls="memory")
        loader = GitLoader(
            storage, origin_url, base_url=base_url, ignore_history=ignore_history,
        )
        return loader.load()

    main()

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jun 21, 7:52 PM (3 w, 4 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3262802

Event Timeline