diff --git a/requirements-swh.txt b/requirements-swh.txt
index 969da2e..4534fd9 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
-swh.core[db,http] >= 0.14.0
+swh.core[db,http] >= 0.14.4
swh.model >= 0.3
swh.objstorage >= 0.0.17
swh.scheduler >= 0.7.0
swh.storage >= 0.29.0
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index f88f49f..5473ff6 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,149 +1,151 @@
# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import io
import logging
from typing import Optional
from psycopg2.extensions import QueryCanceledError
from swh.model import hashutil
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface
MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB
DEFAULT_CONFIG_PATH = "vault/cooker"
DEFAULT_CONFIG = {
"max_bundle_size": ("int", MAX_BUNDLE_SIZE),
}
class PolicyError(Exception):
"""Raised when the bundle violates the cooking policy."""
pass
class BundleTooLargeError(PolicyError):
"""Raised when the bundle is too large to be cooked."""
pass
class BytesIOBundleSizeLimit(io.BytesIO):
def __init__(self, *args, size_limit=None, **kwargs):
super().__init__(*args, **kwargs)
self.size_limit = size_limit
def write(self, chunk):
if (
self.size_limit is not None
and self.getbuffer().nbytes + len(chunk) > self.size_limit
):
raise BundleTooLargeError(
"The requested bundle exceeds the maximum allowed "
"size of {} bytes.".format(self.size_limit)
)
return super().write(chunk)
class BaseVaultCooker(metaclass=abc.ABCMeta):
"""Abstract base class for the vault's bundle creators
This class describes a common API for the cookers.
To define a new cooker, inherit from this class and override:
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache
- def cook(): cook the object into a bundle
"""
CACHE_TYPE_KEY = None # type: Optional[str]
def __init__(
self,
- obj_type,
- obj_id,
+ obj_type: str,
+ obj_id: Sha1Git,
backend,
- storage,
+ storage: StorageInterface,
graph=None,
objstorage=None,
- max_bundle_size=MAX_BUNDLE_SIZE,
+ max_bundle_size: int = MAX_BUNDLE_SIZE,
):
"""Initialize the cooker.
The type of the object represented by the id depends on the
concrete class. Very likely, each type of bundle will have its
own cooker class.
Args:
obj_type: type of the object to be cooked into a bundle (directory,
revision_flat or revision_gitfast; see
swh.vault.cooker.COOKER_TYPES).
obj_id: id of the object to be cooked into a bundle.
backend: the vault backend (swh.vault.backend.VaultBackend).
"""
self.obj_type = obj_type
self.obj_id = hashutil.hash_to_bytes(obj_id)
self.backend = backend
self.storage = storage
self.objstorage = objstorage
self.graph = graph
self.max_bundle_size = max_bundle_size
@abc.abstractmethod
def check_exists(self):
"""Checks that the requested object exists and can be cooked.
Override this in the cooker implementation.
"""
raise NotImplementedError
@abc.abstractmethod
def prepare_bundle(self):
"""Implementation of the cooker. Yields chunks of the bundle bytes.
Override this with the cooker implementation.
"""
raise NotImplementedError
def cache_type_key(self) -> str:
assert self.CACHE_TYPE_KEY
return self.CACHE_TYPE_KEY
def write(self, chunk):
self.fileobj.write(chunk)
def cook(self):
"""Cook the requested object into a bundle
"""
self.backend.set_status(self.obj_type, self.obj_id, "pending")
self.backend.set_progress(self.obj_type, self.obj_id, "Processing...")
self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
try:
try:
self.prepare_bundle()
except QueryCanceledError:
raise PolicyError(
"Timeout reached while assembling the requested bundle"
)
bundle = self.fileobj.getvalue()
# TODO: use proper content streaming instead of put_bundle()
self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle)
except PolicyError as e:
self.backend.set_status(self.obj_type, self.obj_id, "failed")
self.backend.set_progress(self.obj_type, self.obj_id, str(e))
except Exception:
self.backend.set_status(self.obj_type, self.obj_id, "failed")
self.backend.set_progress(
self.obj_type,
self.obj_id,
"Internal Server Error. This incident will be reported.",
)
logging.exception("Bundle cooking failed.")
else:
self.backend.set_status(self.obj_type, self.obj_id, "done")
self.backend.set_progress(self.obj_type, self.obj_id, None)
finally:
self.backend.send_notif(self.obj_type, self.obj_id)
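# A minimal (hypothetical) cooker following the contract described in the
# BaseVaultCooker docstring above -- a sketch only, not part of this patch:
# subclasses set CACHE_TYPE_KEY and implement the two abstract methods,
# check_exists() and prepare_bundle().
#
# class TrivialCooker(BaseVaultCooker):
#     CACHE_TYPE_KEY = "trivial"
#
#     def check_exists(self):
#         # e.g. not list(self.storage.directory_missing([self.obj_id]))
#         return True
#
#     def prepare_bundle(self):
#         # write bundle bytes into the size-limited buffer set up by cook()
#         self.write(b"bundle contents")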
diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py
index d6f8ae7..379f55a 100644
--- a/swh/vault/cookers/git_bare.py
+++ b/swh/vault/cookers/git_bare.py
@@ -1,468 +1,501 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
This cooker creates tarballs containing a bare .git directory,
that can be unpacked and cloned like any git repository.
It works in three steps:
1. Write objects one by one in :file:`.git/objects/`
2. Calls ``git repack`` to pack all these objects into git packfiles.
3. Creates a tarball of the resulting repository
It keeps a set of all written (or about-to-be-written) object hashes in memory
to avoid downloading and writing the same objects twice.
"""
import datetime
+import logging
import os.path
import re
import subprocess
import tarfile
import tempfile
-from typing import Any, Dict, Iterable, List, Optional, Set
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple
import zlib
-from swh.core.api.classes import stream_results
+from swh.core.api.classes import stream_results_optional
from swh.model import identifiers
from swh.model.hashutil import hash_to_bytehex, hash_to_hex
from swh.model.model import (
+ Content,
+ DirectoryEntry,
ObjectType,
Person,
Revision,
RevisionType,
Sha1Git,
TargetType,
TimestampWithTimezone,
)
from swh.storage.algos.revisions_walker import DFSRevisionsWalker
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.vault.cookers.base import BaseVaultCooker
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE
RELEASE_BATCH_SIZE = 10000
REVISION_BATCH_SIZE = 10000
DIRECTORY_BATCH_SIZE = 10000
CONTENT_BATCH_SIZE = 100
+logger = logging.getLogger(__name__)
+
+
class GitBareCooker(BaseVaultCooker):
use_fsck = True
def cache_type_key(self) -> str:
return self.obj_type
def check_exists(self):
obj_type = self.obj_type.split("_")[0]
if obj_type == "revision":
return not list(self.storage.revision_missing([self.obj_id]))
elif obj_type == "directory":
return not list(self.storage.directory_missing([self.obj_id]))
if obj_type == "snapshot":
return not list(self.storage.snapshot_missing([self.obj_id]))
else:
raise NotImplementedError(f"GitBareCooker for {obj_type}")
def obj_swhid(self) -> identifiers.CoreSWHID:
obj_type = self.obj_type.split("_")[0]
return identifiers.CoreSWHID(
object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id,
)
def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None:
assert not isinstance(obj_ids, bytes)
revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen]
self._seen.update(revision_ids)
stack.extend(revision_ids)
def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]:
obj_ids = stack[-n:]
stack[-n:] = []
return obj_ids
def prepare_bundle(self):
# Objects we will visit soon:
self._rel_stack: List[Sha1Git] = []
self._rev_stack: List[Sha1Git] = []
self._dir_stack: List[Sha1Git] = []
self._cnt_stack: List[Sha1Git] = []
# Set of objects already in any of the stacks:
self._seen: Set[Sha1Git] = set()
self._walker_state: Optional[Any] = None
# Set of errors we expect git-fsck to raise at the end:
self._expected_fsck_errors = set()
with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir:
# Initialize a Git directory
self.workdir = workdir
self.gitdir = os.path.join(workdir, "clone.git")
os.mkdir(self.gitdir)
self.init_git()
# Add the root object to the stack of objects to visit
self.push_subgraph(self.obj_type.split("_")[0], self.obj_id)
# Load and write all the objects to disk
self.load_objects()
# Write the root object as a ref (this step is skipped if it's a snapshot)
# This must be done before repacking; git-repack ignores orphan objects.
self.write_refs()
self.repack()
self.write_archive()
def init_git(self) -> None:
subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True)
# Create all possible dirs ahead of time, so we don't have to check for
# existence every time.
for byte in range(256):
os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}"))
def repack(self) -> None:
if self.use_fsck:
self.git_fsck()
# Add objects we wrote in a pack
subprocess.run(["git", "-C", self.gitdir, "repack"], check=True)
# Remove their non-packed originals
subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True)
def git_fsck(self) -> None:
proc = subprocess.run(
["git", "-C", self.gitdir, "fsck"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env={"LANG": "C.utf8"},
)
# Split on newlines not followed by a space
errors = re.split("\n(?! )", proc.stdout.decode())
errors = [
error for error in errors if error and not error.startswith("warning ")
]
unexpected_errors = set(errors) - self._expected_fsck_errors
if unexpected_errors:
raise Exception(
"\n".join(
["Unexpected errors from git-fsck:"] + sorted(unexpected_errors)
)
)
def write_refs(self, snapshot=None):
refs: Dict[bytes, bytes] # ref name -> target
obj_type = self.obj_type.split("_")[0]
if obj_type == "directory":
# We need a synthetic revision pointing to the directory
author = Person.from_fullname(
b"swh-vault, git-bare cooker <robot@softwareheritage.org>"
)
dt = datetime.datetime.now(tz=datetime.timezone.utc)
dt = dt.replace(microsecond=0) # not supported by git
date = TimestampWithTimezone.from_datetime(dt)
revision = Revision(
author=author,
committer=author,
date=date,
committer_date=date,
message=b"Initial commit",
type=RevisionType.GIT,
directory=self.obj_id,
synthetic=True,
)
self.write_revision_node(revision.to_dict())
refs = {b"refs/heads/master": hash_to_bytehex(revision.id)}
elif obj_type == "revision":
refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)}
elif obj_type == "snapshot":
if snapshot is None:
# refs were already written in a previous step
return
refs = {
branch_name: (
b"ref: " + branch.target
if branch.target_type == TargetType.ALIAS
else hash_to_bytehex(branch.target)
)
for (branch_name, branch) in snapshot.branches.items()
}
else:
assert False, obj_type
for (ref_name, ref_target) in refs.items():
path = os.path.join(self.gitdir.encode(), ref_name)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as fd:
fd.write(ref_target)
def write_archive(self):
with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf:
tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True)
def _obj_path(self, obj_id: Sha1Git):
return os.path.join(self.gitdir, self._obj_relative_path(obj_id))
def _obj_relative_path(self, obj_id: Sha1Git):
obj_id_hex = hash_to_hex(obj_id)
directory = obj_id_hex[0:2]
filename = obj_id_hex[2:]
return os.path.join("objects", directory, filename)
def object_exists(self, obj_id: Sha1Git) -> bool:
return os.path.exists(self._obj_path(obj_id))
def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool:
"""Writes a git object on disk.
Returns whether it was already written."""
# Git requires objects to be zlib-compressed; but repacking decompresses and
# removes them, so we don't need to compress them too much.
data = zlib.compress(obj, level=1)
with open(self._obj_path(obj_id), "wb") as fd:
fd.write(data)
return True
def push_subgraph(self, obj_type, obj_id) -> None:
if obj_type == "revision":
self.push_revision_subgraph(obj_id)
elif obj_type == "directory":
self._push(self._dir_stack, [obj_id])
elif obj_type == "snapshot":
self.push_snapshot_subgraph(obj_id)
else:
raise NotImplementedError(
f"GitBareCooker.queue_subgraph({obj_type!r}, ...)"
)
def load_objects(self) -> None:
while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack:
release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE)
self.push_releases_subgraphs(release_ids)
revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE)
self.load_revisions(revision_ids)
directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE)
self.load_directories(directory_ids)
content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE)
self.load_contents(content_ids)
def push_revision_subgraph(self, obj_id: Sha1Git) -> None:
"""Fetches a revision and all its children, and writes them to disk"""
loaded_from_graph = False
if self.graph:
from swh.graph.client import GraphArgumentException
# First, try to cook using swh-graph, as it is more efficient than
# swh-storage for querying the history
obj_swhid = identifiers.CoreSWHID(
object_type=identifiers.ObjectType.REVISION, object_id=obj_id,
)
try:
revision_ids = (
swhid.object_id
for swhid in map(
identifiers.CoreSWHID.from_string,
self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"),
)
)
self._push(self._rev_stack, revision_ids)
except GraphArgumentException:
# Revision not found in the graph
pass
else:
loaded_from_graph = True
if not loaded_from_graph:
# If swh-graph is not available, or the revision is not yet in
# swh-graph, fall back to self.storage.revision_log.
# self.storage.revision_log also gives us the full revisions,
# so we load them right now instead of just pushing them on the stack.
walker = DFSRevisionsWalker(self.storage, obj_id, state=self._walker_state)
for revision in walker:
self.write_revision_node(revision)
self._push(self._dir_stack, [revision["directory"]])
# Save the state, so the next call to the walker won't return the same
# revisions
self._walker_state = walker.export_state()
def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None:
"""Fetches a snapshot and all its children, and writes them to disk"""
loaded_from_graph = False
if self.graph:
revision_ids = []
release_ids = []
from swh.graph.client import GraphArgumentException
# First, try to cook using swh-graph, as it is more efficient than
# swh-storage for querying the history
obj_swhid = identifiers.CoreSWHID(
object_type=identifiers.ObjectType.SNAPSHOT, object_id=obj_id,
)
try:
swhids = map(
identifiers.CoreSWHID.from_string,
self.graph.visit_nodes(str(obj_swhid), edges="snp:*,rel:*,rev:rev"),
)
for swhid in swhids:
if swhid.object_type == identifiers.ObjectType.REVISION:
revision_ids.append(swhid.object_id)
elif swhid.object_type == identifiers.ObjectType.RELEASE:
release_ids.append(swhid.object_id)
elif swhid.object_type == identifiers.ObjectType.SNAPSHOT:
assert (
swhid.object_id == obj_id
), f"Snapshot {obj_id.hex()} references a different snapshot"
else:
raise NotImplementedError(
f"{swhid.object_type} objects in snapshot subgraphs."
)
except GraphArgumentException:
# Revision not found in the graph
pass
else:
self._push(self._rev_stack, revision_ids)
self._push(self._rel_stack, release_ids)
loaded_from_graph = True
# TODO: when self.graph is available and supports edge labels, use it
# directly to get branch names.
snapshot = snapshot_get_all_branches(self.storage, obj_id)
assert snapshot, "Unknown snapshot" # should have been caught by check_exists()
for branch in snapshot.branches.values():
if not loaded_from_graph:
if branch.target_type == TargetType.REVISION:
self.push_revision_subgraph(branch.target)
elif branch.target_type == TargetType.RELEASE:
self.push_releases_subgraphs([branch.target])
elif branch.target_type == TargetType.ALIAS:
# Nothing to do, this for loop also iterates on the target branch
# (if it exists)
pass
else:
raise NotImplementedError(f"{branch.target_type} branches")
self.write_refs(snapshot=snapshot)
def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
"""Given a list of revision ids, loads these revisions and their directories;
but not their parent revisions."""
- revisions = self.storage.revision_get(obj_ids)
+ ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids)
+
+ revisions: List[Revision] = list(filter(None, ret))
+ if len(ret) != len(revisions):
+ logger.error("Missing revision(s), ignoring them.")
+
for revision in revisions:
self.write_revision_node(revision.to_dict())
self._push(self._dir_stack, (rev.directory for rev in revisions))
def write_revision_node(self, revision: Dict[str, Any]) -> bool:
"""Writes a revision object to disk"""
git_object = identifiers.revision_git_object(revision)
return self.write_object(revision["id"], git_object)
def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None:
"""Given a list of release ids, loads these releases and adds their
target to the list of objects to visit"""
- releases = self.storage.release_get(obj_ids)
+ ret = self.storage.release_get(obj_ids)
+
+ releases = list(filter(None, ret))
+ if len(ret) != len(releases):
+ logger.error("Missing release(s), ignoring them.")
+
revision_ids: List[Sha1Git] = []
for release in releases:
self.write_release_node(release.to_dict())
if release.target_type == ObjectType.REVISION:
+ assert release.target, f"{release.swhid()} has no target"
self.push_revision_subgraph(release.target)
else:
raise NotImplementedError(f"{release.target_type} release targets")
self._push(self._rev_stack, revision_ids)
def write_release_node(self, release: Dict[str, Any]) -> bool:
"""Writes a release object to disk"""
git_object = identifiers.release_git_object(release)
return self.write_object(release["id"], git_object)
def load_directories(self, obj_ids: List[Sha1Git]) -> None:
for obj_id in obj_ids:
self.load_directory(obj_id)
def load_directory(self, obj_id: Sha1Git) -> None:
# Load the directory
- entries = [
- entry.to_dict()
- for entry in stream_results(self.storage.directory_get_entries, obj_id)
- ]
+ entries_it: Optional[Iterable[DirectoryEntry]] = stream_results_optional(
+ self.storage.directory_get_entries, obj_id
+ )
+
+ if entries_it is None:
+ logger.error("Missing swh:1:dir:%s, ignoring.", hash_to_hex(obj_id))
+ return
+
+ entries = [entry.to_dict() for entry in entries_it]
directory = {"id": obj_id, "entries": entries}
git_object = identifiers.directory_git_object(directory)
self.write_object(obj_id, git_object)
# Add children to the stack
entry_loaders: Dict[str, List[Sha1Git]] = {
"file": self._cnt_stack,
"dir": self._dir_stack,
"rev": self._rev_stack,
}
for entry in directory["entries"]:
stack = entry_loaders[entry["type"]]
self._push(stack, [entry["target"]])
def load_contents(self, obj_ids: List[Sha1Git]) -> None:
# TODO: add support of filtered objects, somehow?
# It's tricky, because, by definition, we can't write a git object with
# the expected hash, so git-fsck *will* choke on it.
contents = self.storage.content_get(obj_ids, "sha1_git")
visible_contents = []
for (obj_id, content) in zip(obj_ids, contents):
if content is None:
# FIXME: this may also happen for missing content
self.write_content(obj_id, SKIPPED_MESSAGE)
self._expect_mismatched_object_error(obj_id)
elif content.status == "visible":
visible_contents.append(content)
elif content.status == "hidden":
self.write_content(obj_id, HIDDEN_MESSAGE)
self._expect_mismatched_object_error(obj_id)
else:
assert False, (
f"unexpected status {content.status!r} "
f"for content {hash_to_hex(content.sha1_git)}"
)
+ contents_and_data: Iterator[Tuple[Content, Optional[bytes]]]
if self.objstorage is None:
- for content in visible_contents:
- data = self.storage.content_get_data(content.sha1)
- self.write_content(content.sha1_git, data)
+ contents_and_data = (
+ (content, self.storage.content_get_data(content.sha1))
+ for content in visible_contents
+ )
else:
- content_data = self.objstorage.get_batch(c.sha1 for c in visible_contents)
- for (content, data) in zip(contents, content_data):
- self.write_content(content.sha1_git, data)
+ contents_and_data = zip(
+ visible_contents,
+ self.objstorage.get_batch(c.sha1 for c in visible_contents),
+ )
+
+ for (content, datum) in contents_and_data:
+ if datum is None:
+ logger.error(
+ "%s is visible, but is missing data. Skipping.", content.swhid()
+ )
+ continue
+ self.write_content(content.sha1_git, datum)
def write_content(self, obj_id: Sha1Git, content: bytes) -> None:
header = identifiers.git_object_header("blob", len(content))
self.write_object(obj_id, header + content)
def _expect_mismatched_object_error(self, obj_id):
obj_id_hex = hash_to_hex(obj_id)
obj_path = self._obj_relative_path(obj_id)
# For Git < 2.21:
self._expected_fsck_errors.add(
f"error: sha1 mismatch for ./{obj_path} (expected {obj_id_hex})"
)
# For Git >= 2.21:
self._expected_fsck_errors.add(
f"error: hash mismatch for ./{obj_path} (expected {obj_id_hex})"
)
self._expected_fsck_errors.add(
f"error: {obj_id_hex}: object corrupt or missing: ./{obj_path}"
)
self._expected_fsck_errors.add(f"missing blob {obj_id_hex}")
