swh/vault/cookers/git_bare.py
[... 9 lines not shown ...]
It works in three steps:

1. Writes objects one by one in :file:`.git/objects/`.
2. Calls ``git repack`` to pack all these objects into git packfiles.
3. Creates a tarball of the resulting repository.

It keeps a set of all written (or about-to-be-written) object hashes in memory
to avoid downloading and writing the same objects twice.
The first step is the most complex. When swh-graph is available, this roughly does
the following (see the sketch below):

1. Find all the revisions and releases in the induced subgraph, and add them to
   the todo-lists
2. Grab a batch from the (release/revision/directory/content) todo-lists and load
   the objects; add the directory and content objects they reference to the
   todo-lists
3. If any todo-list is not empty, go back to step 2

When swh-graph is not available, steps 1 and 2 are merged, because revisions need
to be loaded in order to compute the subgraph.
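
A rough sketch of the loop formed by steps 2 and 3 (illustrative only; the helper
names and batch-size constants here are shorthand for what
``GitBareCooker.load_objects`` does further down in this module)::

    while rel_stack or rev_stack or dir_stack or cnt_stack:
        load_releases(pop(rel_stack, RELEASE_BATCH_SIZE))
        load_revisions(pop(rev_stack, REVISION_BATCH_SIZE))      # pushes root directories
        load_directories(pop(dir_stack, DIRECTORY_BATCH_SIZE))   # pushes entries
        load_contents(pop(cnt_stack, CONTENT_BATCH_SIZE))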
""" | """ | ||||
import datetime | import datetime | ||||
import enum | import enum | ||||
import glob | import glob | ||||
import logging | import logging | ||||
import multiprocessing.dummy | import multiprocessing.dummy | ||||
import os.path | import os.path | ||||
[... 58 lines not shown ...]

class GitBareCooker(BaseVaultCooker):
use_fsck = True
obj_type: RootObjectType

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_type = RootObjectType[self.swhid.object_type.name]

def check_exists(self) -> bool:
ardumont: should be in the imperative form to match consistently the remaining part of our docstring.
vlorentz: Other methods in this module already use present continuous, so I'll keep it like this for this diff. Maybe change it later.
"""Returns whether the root object is present in the archive.""" | |||||
if self.obj_type is RootObjectType.REVISION: | if self.obj_type is RootObjectType.REVISION: | ||||
return not list(self.storage.revision_missing([self.obj_id])) | return not list(self.storage.revision_missing([self.obj_id])) | ||||
elif self.obj_type is RootObjectType.DIRECTORY: | elif self.obj_type is RootObjectType.DIRECTORY: | ||||
return not list(self.storage.directory_missing([self.obj_id])) | return not list(self.storage.directory_missing([self.obj_id])) | ||||
elif self.obj_type is RootObjectType.SNAPSHOT: | elif self.obj_type is RootObjectType.SNAPSHOT: | ||||
return not list(self.storage.snapshot_missing([self.obj_id])) | return not list(self.storage.snapshot_missing([self.obj_id])) | ||||
else: | else: | ||||
assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") | assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") | ||||

def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None:
"""Adds all the given ``obj_ids`` to the given ``stack``, unless they are
already in ``self._seen``, and adds them to ``self._seen``."""
assert not isinstance(obj_ids, bytes)
revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen]
self._seen.update(revision_ids)
stack.extend(revision_ids)

def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]:
"""Removes ``n`` object from the ``stack`` and returns them.""" | |||||
obj_ids = stack[-n:]
stack[-n:] = []
return obj_ids
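# Illustrative example (not part of the module): pushing [a, b] and later [b, c]
# onto the same stack only enqueues c the second time, because b is already in
# self._seen; _pop(stack, 2) would then return [b, c] and leave [a] behind.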

def prepare_bundle(self):
"""Main entry point. Initializes the state, creates the bundle, and
sends it to the backend."""
# Objects we will visit soon (aka. "todo-lists"):
self._rel_stack: List[Sha1Git] = []
self._rev_stack: List[Sha1Git] = []
self._dir_stack: List[Sha1Git] = []
self._cnt_stack: List[Sha1Git] = []
# Set of objects already in any of the stacks:
self._seen: Set[Sha1Git] = set()
self._walker_state: Optional[Any] = None
[... 37 lines not shown (still in prepare_bundle) ...]
self.repack()
self.write_archive()
self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, "Uploading bundle")

def init_git(self) -> None:
"""Creates an empty :file:`.git` directory."""
subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True)
self.create_object_dirs()
# Remove example hooks; they take ~40KB and we don't use them
for filename in glob.glob(os.path.join(self.gitdir, "hooks", "*.sample")):
os.unlink(filename)

def create_object_dirs(self) -> None:
"""Creates all possible subdirectories of :file:`.git/objects/`"""
# Create all possible dirs ahead of time, so we don't have to check for
# existence every time.
for byte in range(256):
try:
os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}"))
except FileExistsError:
pass

def repack(self) -> None:
"""Moves all objects from :file:`.git/objects/` to a packfile."""
try:
subprocess.run(["git", "-C", self.gitdir, "repack", "-d"], check=True)
except subprocess.CalledProcessError:
logging.exception("git-repack failed with:")
# Remove their non-packed originals
subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True)

def git_fsck(self) -> None:
"""Runs git-fsck and ignores expected errors (e.g. because of missing
objects)."""
proc = subprocess.run(
["git", "-C", self.gitdir, "fsck"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env={"LANG": "C.utf8"},
)
# Split on newlines not followed by a space
errors = re.split("\n(?! )", proc.stdout.decode())
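# Illustrative example (assumed fsck output, not from the source): given
#   "missing blob 1234\ndangling commit abcd\n  in some detail"
# the split above yields ["missing blob 1234", "dangling commit abcd\n  in some detail"],
# i.e. continuation lines that start with a space stay attached to their error.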
errors = [
error for error in errors if error and not error.startswith("warning ")
]
unexpected_errors = set(errors) - self._expected_fsck_errors
if unexpected_errors:
logging.error(
"Unexpected errors from git-fsck after cooking %s: %s",
self.swhid,
"\n".join(sorted(unexpected_errors)),
)

def write_refs(self, snapshot=None):
"""Writes all files in :file:`.git/refs/`.

For non-snapshot objects, this is only ``master``."""
refs: Dict[bytes, bytes]  # ref name -> target
if self.obj_type == RootObjectType.DIRECTORY:
# We need a synthetic revision pointing to the directory
author = Person.from_fullname(
b"swh-vault, git-bare cooker <robot@softwareheritage.org>"
)
dt = datetime.datetime.now(tz=datetime.timezone.utc)
dt = dt.replace(microsecond=0)  # not supported by git
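# (Hedged sketch, not the actual elided code below: the hidden lines presumably
# build a synthetic swh.model Revision from ``author`` and ``dt`` and point
# ``refs/heads/master`` at it, roughly along these lines; all field values here
# are assumptions made for illustration.)
#
#     revision = Revision(
#         message=b"Initial commit",
#         author=author,
#         committer=author,
#         date=TimestampWithTimezone.from_datetime(dt),
#         committer_date=TimestampWithTimezone.from_datetime(dt),
#         type=RevisionType.GIT,
#         directory=self.obj_id,
#         synthetic=True,
#     )
#     refs = {b"refs/heads/master": hash_to_bytehex(revision.id)}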
[... 37 lines not shown (still in write_refs) ...]
for (ref_name, ref_target) in refs.items():
path = os.path.join(self.gitdir.encode(), ref_name)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as fd:
fd.write(ref_target)

def write_archive(self):
"""Creates the final .tar file."""
with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf:
tf.add(self.gitdir, arcname=f"{self.swhid}.git", recursive=True)

def _obj_path(self, obj_id: Sha1Git):
"""Returns the absolute path of the file (in :file:`.git/objects/`) that will
contain the git object identified by ``obj_id``."""
return os.path.join(self.gitdir, self._obj_relative_path(obj_id))

def _obj_relative_path(self, obj_id: Sha1Git):
"""Same as :meth:`_obj_path`, but relative."""
obj_id_hex = hash_to_hex(obj_id)
directory = obj_id_hex[0:2]
filename = obj_id_hex[2:]
return os.path.join("objects", directory, filename)

def object_exists(self, obj_id: Sha1Git) -> bool:
"""Returns whether the object identified by the given ``obj_id`` was already
written to a file in :file:`.git/objects/`.

This function ignores objects contained in a git pack."""
return os.path.exists(self._obj_path(obj_id))

def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool:
"""Writes a git object on disk.

Returns whether it was already written."""
# Git requires objects to be zlib-compressed; but repacking decompresses and
# removes them, so we don't need to compress them too much.
data = zlib.compress(obj, level=1)
with open(self._obj_path(obj_id), "wb") as fd:
fd.write(data)
return True
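# Illustrative example (not part of the module): ``obj`` is expected to be the
# uncompressed loose-object bytes, header included, e.g. b"blob 11\x00hello world"
# for an 11-byte blob; ``obj_id`` is the sha1 of exactly those bytes.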

def push_subgraph(self, obj_type: RootObjectType, obj_id) -> None:
"""Adds the subgraph induced by the given ``obj_id`` to the todo-lists,
without recursing through directories.

If swh-graph is not available, this immediately loads revisions, as they
need to be fetched in order to compute the subgraph, and fetching them
immediately avoids duplicate fetches."""
if self.obj_type is RootObjectType.REVISION:
self.push_revision_subgraph(obj_id)
elif self.obj_type is RootObjectType.DIRECTORY:
self._push(self._dir_stack, [obj_id])
elif self.obj_type is RootObjectType.SNAPSHOT:
self.push_snapshot_subgraph(obj_id)
else:
assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}")
[... 34 lines not shown; the lines below are inside def load_objects(self) -> None ...]
self.nb_loaded += len(directory_ids)
content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE)
if content_ids:
self.load_contents(content_ids)
self.nb_loaded += len(content_ids)

def push_revision_subgraph(self, obj_id: Sha1Git) -> None:
"""Fetches the graph of revisions induced by the given ``obj_id`` and adds
them to ``self._rev_stack``.

If swh-graph is not available, this requires fetching the revisions themselves,
so they are directly loaded instead."""
loaded_from_graph = False
if self.graph:
from swh.graph.client import GraphArgumentException

# First, try to cook using swh-graph, as it is more efficient than
# swh-storage for querying the history
obj_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id,)
[... 26 lines not shown (still in push_revision_subgraph) ...]
self.write_revision_node(revision)
self.nb_loaded += 1
self._push(self._dir_stack, [revision["directory"]])
# Save the state, so the next call to the walker won't return the same
# revisions
self._walker_state = walker.export_state()

def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None:
"""Fetches a snapshot and all its children, excluding directories and contents,
and pushes them to the todo-lists.

Also loads revisions if swh-graph is not available, see
:meth:`push_revision_subgraph`."""
loaded_from_graph = False
if self.graph:
revision_ids = []
release_ids = []
directory_ids = []
content_ids = []
[... 69 lines not shown (still in push_snapshot_subgraph) ...]
assert_never(
branch.target_type, f"Unexpected target type: {self.obj_type}"
)
self.write_refs(snapshot=snapshot)

def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
"""Given a list of revision ids, loads these revisions and their directories;
but not their parent revisions (i.e. this is not recursive)."""
ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids)
revisions: List[Revision] = list(filter(None, ret))
if len(ret) != len(revisions):
logger.error("Missing revision(s), ignoring them.")
for revision in revisions:
self.write_revision_node(revision.to_dict())
[... 148 lines not shown ...]