diff --git a/setup.py b/setup.py
index 4b61328..22a9bf5 100755
--- a/setup.py
+++ b/setup.py
@@ -1,77 +1,77 @@
#!/usr/bin/env python3
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from io import open
from os import path
from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.rst"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(*names):
requirements = []
for name in names:
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
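# Illustrative note (not in the original file): with the helper above,
# parse_requirements(None, "swh") reads "requirements.txt" and
# "requirements-swh.txt", skips blank lines and "#" comments, and returns
# the remaining lines as a flat list, e.g. ["breezy", "swh.loader.core"].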
# Edit this part to match your module; replace foo with its name
# Full sample:
# https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py
setup(
name="swh.loader.bzr", # example: swh.loader.pypi
description="Software Heritage Bazaar/Breezy intent",
long_description=long_description,
long_description_content_type="text/x-rst",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DLDBZR/",
packages=find_packages(), # package's modules
install_requires=parse_requirements(None, "swh"),
tests_require=parse_requirements("test"),
setup_requires=["setuptools-scm"],
use_scm_version=True,
extras_require={"testing": parse_requirements("test")},
include_package_data=True,
entry_points="""
[swh.workers]
loader.bzr=swh.loader.bzr:register
[console_scripts]
swh-bzr-identify=swh.loader.bzr.identify:main
""",
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": ("https://forge.softwareheritage.org/source/swh-loader-bzr"),
- "Documentation": "https://docs.softwareheritage.org/devel/swh-loader-bzr/", # NoQA: E501
+ "Documentation": "https://docs.softwareheritage.org/devel/swh-loader-bzr/", # NoQA: B950
},
)
diff --git a/swh/loader/bzr/loader.py b/swh/loader/bzr/loader.py
index bb06189..004e07d 100644
--- a/swh/loader/bzr/loader.py
+++ b/swh/loader/bzr/loader.py
@@ -1,705 +1,709 @@
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This document contains a SWH loader for ingesting repository data
from Bazaar or Breezy.
"""
from datetime import datetime
from functools import lru_cache, partial
import itertools
import os
from tempfile import mkdtemp
from typing import Dict, Iterator, List, NewType, Optional, Set, Tuple, TypeVar, Union
from breezy import errors as bzr_errors
from breezy import repository, tsort
from breezy.builtins import cmd_branch, cmd_upgrade
from breezy.bzr import bzrdir
from breezy.bzr.branch import Branch as BzrBranch
from breezy.bzr.inventory import Inventory, InventoryEntry
from breezy.bzr.inventorytree import InventoryTreeChange
from breezy.revision import NULL_REVISION
from breezy.revision import Revision as BzrRevision
from breezy.tree import Tree
from swh.loader.core.loader import BaseLoader
from swh.loader.core.utils import clean_dangling_folders, clone_with_timeout
from swh.model import from_disk, swhids
from swh.model.model import (
Content,
ExtID,
ObjectType,
Origin,
Person,
Release,
Revision,
RevisionType,
Sha1Git,
Snapshot,
SnapshotBranch,
TargetType,
Timestamp,
TimestampWithTimezone,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface
TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.bzr.from_disk"
EXTID_TYPE = "bzr-nodeid"
EXTID_VERSION: int = 1
BzrRevisionId = NewType("BzrRevisionId", bytes)
T = TypeVar("T")
# These are all the old Bazaar repository formats that we might encounter
# in the wild. Bazaar's `clone` does not result in an upgrade; the upgrade
# has to be requested explicitly.
older_repository_formats = {
b"Bazaar Knit Repository Format 3 (bzr 0.15)\n",
b"Bazaar Knit Repository Format 4 (bzr 1.0)\n",
b"Bazaar RepositoryFormatKnitPack5 (bzr 1.6)\n",
b"Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6)\n",
b"Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6.1)\n",
b"Bazaar RepositoryFormatKnitPack6 (bzr 1.9)\n",
b"Bazaar RepositoryFormatKnitPack6RichRoot (bzr 1.9)\n",
b"Bazaar development format 2 with subtree support \
(needs bzr.dev from before 1.8)\n",
b"Bazaar development format 8\n",
b"Bazaar pack repository format 1 (needs bzr 0.92)\n",
b"Bazaar pack repository format 1 with rich root (needs bzr 1.0)\n",
b"Bazaar pack repository format 1 with subtree support (needs bzr 0.92)\n",
b"Bazaar-NG Knit Repository Format 1",
}
# Latest format as of this writing, unlikely to change
expected_repository_format = b"Bazaar repository format 2a (needs bzr 1.16 or later)\n"
class UnknownRepositoryFormat(Exception):
"""The repository we're trying to load is using an unknown format.
It's possible (though unlikely) that a new format has come out, so we should
check before dismissing the repository as broken or unsupported."""
class BzrDirectory(from_disk.Directory):
"""A more practical directory.
- creates missing parent directories
- removes empty directories
"""
def __setitem__(
self, path: bytes, value: Union[from_disk.Content, "BzrDirectory"]
) -> None:
if b"/" in path:
head, tail = path.split(b"/", 1)
directory = self.get(head)
if directory is None or isinstance(directory, from_disk.Content):
directory = BzrDirectory()
self[head] = directory
directory[tail] = value
else:
super().__setitem__(path, value)
def __delitem__(self, path: bytes) -> None:
super().__delitem__(path)
while b"/" in path: # remove empty parent directories
path = path.rsplit(b"/", 1)[0]
if len(self[path]) == 0:
super().__delitem__(path)
else:
break
def get(
self, path: bytes, default: Optional[T] = None
) -> Optional[Union[from_disk.Content, "BzrDirectory", T]]:
# TODO move to swh.model.from_disk.Directory
try:
return self[path]
except KeyError:
return default
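# A minimal usage sketch of BzrDirectory (illustrative; it mirrors the
# behavior exercised by test_bzr_directory in the test suite):
#
#   d = BzrDirectory()
#   d[b"a/b/c"] = from_disk.Content(b"data")  # implicitly creates b"a", b"a/b"
#   del d[b"a/b/c"]  # also prunes the now-empty b"a/b" and b"a"
#   assert d.get(b"a") is None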
def sort_changes(change: InventoryTreeChange) -> str:
"""Key function for sorting the changes by path.
Sorting allows us to group the folders together (for example "b", then "a/a",
then "a/b"). Reversing this sort in the `sorted()` call will make it
so the files appear before the folder ("a/a", then "a") if the folder has
changed. This removes a bug where the order of operations is:
- "a" goes from directory to file, removing all of its subtree
- "a/a" is removed, but our structure has already forgotten it"""
source_path, target_path = change.path
# Neither path can be the empty string
return source_path or target_path
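# Worked example (illustrative): sorting changes whose paths are
# ["a", "a/a", "b"] with key=sort_changes and reverse=True yields the
# order ["b", "a/a", "a"], so "a/a" is removed before "a" itself turns
# from a directory into a file.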
class BazaarLoader(BaseLoader):
"""Loads a Bazaar repository"""
visit_type = "bzr"
def __init__(
self,
storage: StorageInterface,
url: str,
directory: Optional[str] = None,
logging_class: str = "swh.loader.bzr.Loader",
visit_date: Optional[datetime] = None,
temp_directory: str = "/tmp",
clone_timeout_seconds: int = 7200,
max_content_size: Optional[int] = None,
):
super().__init__(
storage=storage,
logging_class=logging_class,
max_content_size=max_content_size,
)
self._temp_directory = temp_directory
self._clone_timeout = clone_timeout_seconds
self._revision_id_to_sha1git: Dict[BzrRevisionId, Sha1Git] = {}
self._last_root = BzrDirectory()
self._tags: Optional[Dict[bytes, BzrRevisionId]] = None
self._head_revision_id: Optional[bytes] = None
# Remember the previous revision to only compute the delta between
# revisions
self._prev_revision: Optional[BzrRevision] = None
self._branch: Optional[BzrBranch] = None
# Revisions that are pointed to, but don't exist in the current branch.
# Rare; they usually exist for cross-VCS references.
self._ghosts: Set[BzrRevisionId] = set()
# Set in an incremental run; this is the latest revision recorded for
# this origin
self._latest_head: Optional[BzrRevisionId] = None
self._load_status = "eventful"
self.origin_url = url
self.visit_date = visit_date
self.directory = directory
self.repo: Optional[repository.Repository] = None
def pre_cleanup(self) -> None:
"""As a first step, will try and check for dangling data to cleanup.
This should do its best to avoid raising issues.
"""
clean_dangling_folders(
self._temp_directory,
pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
log=self.log,
)
def prepare_origin_visit(self) -> None:
"""First step executed by the loader to prepare origin and visit
references. Set/update self.origin, and
optionally self.origin_url, self.visit_date.
"""
self.origin = Origin(url=self.origin_url)
def prepare(self) -> None:
"""Second step executed by the loader to prepare some state needed by
the loader.
"""
latest_snapshot = snapshot_get_latest(self.storage, self.origin_url)
if latest_snapshot:
self._set_recorded_state(latest_snapshot)
def load_status(self) -> Dict[str, str]:
"""Detailed loading status.
Defaults to logging an eventful load.
Returns: a dictionary that is eventually passed back as the task's
result to the scheduler, allowing tuning of the task recurrence
mechanism.
"""
return {
"status": self._load_status,
}
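# Hedged sketch of how BaseLoader drives the hooks above (inferred from
# their docstrings; the exact control flow lives in swh.loader.core):
#
#   pre_cleanup() -> prepare_origin_visit() -> prepare()
#   -> fetch_data()/store_data(), repeated while fetch_data() returns True
#   -> load_status() is reported back to the scheduler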
def _set_recorded_state(self, latest_snapshot: Snapshot) -> None:
if not latest_snapshot.branches:
# Last snapshot was empty
return
head = latest_snapshot.branches[b"trunk"]
bzr_head = self._get_extids_for_targets([head.target])[0].extid
self._latest_head = BzrRevisionId(bzr_head)
def _get_extids_for_targets(self, targets: List[Sha1Git]) -> List[ExtID]:
"""Get all Bzr ExtIDs for the targets in the latest snapshot"""
extids = []
for extid in self.storage.extid_get_from_target(
swhids.ObjectType.REVISION,
targets,
extid_type=EXTID_TYPE,
extid_version=EXTID_VERSION,
):
extids.append(extid)
self._revision_id_to_sha1git[
BzrRevisionId(extid.extid)
] = extid.target.object_id
if extids:
# Filter out dangling extids; we need to load their targets again
revisions_missing = self.storage.revision_missing(
[extid.target.object_id for extid in extids]
)
extids = [
extid
for extid in extids
if extid.target.object_id not in revisions_missing
]
return extids
def cleanup(self) -> None:
if self.repo is not None:
self.repo.unlock()
def get_repo_and_branch(self) -> Tuple[repository.Repository, BzrBranch]:
_, branch, repo, _ = bzrdir.BzrDir.open_containing_tree_branch_or_repository(
self._repo_directory
)
return repo, branch
def run_upgrade(self):
"""Upgrade both repository and branch to the most recent supported version
to be compatible with the loader."""
cmd_upgrade().run(self._repo_directory, clean=True)
def fetch_data(self) -> bool:
"""Fetch the data from the source the loader is currently loading
Returns:
a value that is interpreted as a boolean. If True, fetch_data needs
to be called again to complete loading.
"""
if not self.directory: # no local repository
self._repo_directory = mkdtemp(
prefix=TEMPORARY_DIR_PREFIX_PATTERN,
suffix=f"-{os.getpid()}",
dir=self._temp_directory,
)
msg = "Cloning '%s' to '%s' with timeout %s seconds"
self.log.debug(
msg, self.origin_url, self._repo_directory, self._clone_timeout
)
closure = partial(
cmd_branch().run,
self.origin_url,
self._repo_directory,
no_tree=True,
use_existing_dir=True,
)
clone_with_timeout(
self.origin_url, self._repo_directory, closure, self._clone_timeout
)
else: # existing local repository
# Allow loading an on-disk repository without cloning,
# for testing purposes.
self.log.debug("Using local directory '%s'", self.directory)
self._repo_directory = self.directory
repo, branch = self.get_repo_and_branch()
repository_format = repo._format.as_string() # despite its name, returns bytes
if repository_format != expected_repository_format:
if repository_format in older_repository_formats:
self.log.debug(
"Upgrading repository from format '%s'",
repository_format.decode("ascii").strip("\n"),
)
self.run_upgrade()
repo, branch = self.get_repo_and_branch()
else:
raise UnknownRepositoryFormat()
if not branch.supports_tags():
# Some repos have the right format marker but their branches do not
# support tags
self.log.debug("Branch does not support tags, upgrading")
self.run_upgrade()
repo, branch = self.get_repo_and_branch()
# We could set the branch here directly, but we want to run the
# sanity checks in the `self.branch` property, so let's make sure
# we invalidate the "cache".
self._branch = None
self.repo = repo
self.repo.lock_read()
self.head_revision_id # set the property
self.tags # set the property
return False
def store_data(self) -> None:
"""Store fetched data in the database."""
assert self.repo is not None
assert self.tags is not None
# Insert revisions using a topological sorting
revs = self._get_bzr_revs_to_load()
if revs and revs[0] == NULL_REVISION:
# The first rev we load isn't necessarily `NULL_REVISION` even in a
# full load, as bzr allows for ghost revisions.
revs = revs[1:]
length_ingested_revs = 0
for rev in revs:
self.store_revision(self.repo.get_revision(rev))
length_ingested_revs += 1
if length_ingested_revs == 0:
# No new revisions were ingested, so the visit is uneventful;
# we still create a snapshot, so we continue
self._load_status = "uneventful"
snapshot_branches: Dict[bytes, Optional[SnapshotBranch]] = {}
for tag_name, target in self.tags.items():
label = b"tags/%s" % tag_name
if target == NULL_REVISION:
# Some very rare repositories have meaningless tags that point
# to the null revision.
self.log.debug("Tag '%s' points to the null revision", tag_name)
snapshot_branches[label] = None
continue
try:
# Used only to detect corruption
self.branch.revision_id_to_dotted_revno(target)
except (
bzr_errors.NoSuchRevision,
bzr_errors.GhostRevisionsHaveNoRevno,
bzr_errors.UnsupportedOperation,
):
# Bad tag data/merges can lead to tagged revisions
# which are not in this branch. We cannot point a tag there.
snapshot_branches[label] = None
continue
snp_target = self._get_revision_id_from_bzr_id(target)
snapshot_branches[label] = SnapshotBranch(
target=self.store_release(tag_name, snp_target),
target_type=TargetType.RELEASE,
)
if self.head_revision_id != NULL_REVISION:
head_revision_git_hash = self._get_revision_id_from_bzr_id(
self.head_revision_id
)
snapshot_branches[b"trunk"] = SnapshotBranch(
target=head_revision_git_hash, target_type=TargetType.REVISION
)
snapshot_branches[b"HEAD"] = SnapshotBranch(
- target=b"trunk", target_type=TargetType.ALIAS,
+ target=b"trunk",
+ target_type=TargetType.ALIAS,
)
snapshot = Snapshot(branches=snapshot_branches)
self.storage.snapshot_add([snapshot])
self.flush()
self.loaded_snapshot_id = snapshot.id
def store_revision(self, bzr_rev: BzrRevision) -> None:
self.log.debug("Storing revision '%s'", bzr_rev.revision_id)
directory = self.store_directories(bzr_rev)
associated_bugs = [
(b"bug", b"%s %s" % (status.encode(), url.encode()))
for url, status in bzr_rev.iter_bugs()
]
extra_headers = [
- (b"time_offset_seconds", str(bzr_rev.timezone).encode(),),
+ (
+ b"time_offset_seconds",
+ str(bzr_rev.timezone).encode(),
+ ),
*associated_bugs,
]
timestamp = Timestamp(int(bzr_rev.timestamp), 0)
timezone = round(int(bzr_rev.timezone) / 60)
date = TimestampWithTimezone.from_numeric_offset(timestamp, timezone, False)
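# Worked example (illustrative): a bzr timezone of 3600 (seconds east of
# UTC) becomes a 60-minute offset, which from_numeric_offset() renders as
# b"+0100"; the raw seconds are kept in the b"time_offset_seconds" extra
# header above.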
# TODO (how) should we store multiple authors? (T3887)
revision = Revision(
author=Person.from_fullname(bzr_rev.get_apparent_authors()[0].encode()),
date=date,
committer=Person.from_fullname(bzr_rev.committer.encode()),
committer_date=date,
type=RevisionType.BAZAAR,
directory=directory,
message=bzr_rev.message.encode(),
extra_headers=extra_headers,
synthetic=False,
parents=self._get_revision_parents(bzr_rev),
)
self._revision_id_to_sha1git[bzr_rev.revision_id] = revision.id
self.storage.revision_add([revision])
self.storage.extid_add(
[
ExtID(
extid_type=EXTID_TYPE,
extid_version=EXTID_VERSION,
extid=bzr_rev.revision_id,
target=revision.swhid(),
)
]
)
def store_directories(self, bzr_rev: BzrRevision) -> Sha1Git:
"""Store a revision's directories."""
repo: repository.Repository = self.repo
inventory: Inventory = repo.get_inventory(bzr_rev.revision_id)
if self._prev_revision is None:
self._store_directories_slow(bzr_rev, inventory)
return self._store_tree(bzr_rev)
old_tree = self._get_revision_tree(self._prev_revision.revision_id)
new_tree = self._get_revision_tree(bzr_rev.revision_id)
delta = new_tree.changes_from(old_tree)
if delta.renamed or delta.copied:
# Figuring out all nested and possibly conflicting renames is a lot
# of effort for very few revisions; just go the slow way
self._store_directories_slow(bzr_rev, inventory)
return self._store_tree(bzr_rev)
to_remove = sorted(
delta.removed + delta.missing, key=sort_changes, reverse=True
)
for change in to_remove:
if change.kind[0] == "directory":
# empty directories will delete themselves in `self._last_root`
continue
path = change.path[0]
del self._last_root[path.encode()]
# `delta.kind_changed` needs to happen before `delta.added` since a file
# could be added under a node that changed from directory to file at the
# same time, for example
for change in itertools.chain(delta.kind_changed, delta.added, delta.modified):
path = change.path[1]
entry = inventory.get_entry(change.file_id)
content = self.store_content(bzr_rev, path, entry)
self._last_root[path.encode()] = content
self._prev_revision = bzr_rev
return self._store_tree(bzr_rev)
def store_release(self, name: bytes, target: Sha1Git) -> Sha1Git:
"""Store a release given its name and its target.
Args:
name: name of the release.
target: sha1_git of the target revision.
Returns:
the sha1_git of the stored release.
"""
release = Release(
name=name,
target=target,
target_type=ObjectType.REVISION,
message=None,
metadata=None,
synthetic=False,
author=Person(name=None, email=None, fullname=b""),
date=None,
)
self.storage.release_add([release])
return release.id
def store_content(
self, bzr_rev: BzrRevision, file_path: str, entry: InventoryEntry
) -> from_disk.Content:
if entry.executable:
perms = from_disk.DentryPerms.executable_content
elif entry.kind == "directory":
perms = from_disk.DentryPerms.directory
elif entry.kind == "symlink":
perms = from_disk.DentryPerms.symlink
elif entry.kind == "file":
perms = from_disk.DentryPerms.content
else: # pragma: no cover
raise RuntimeError("Hit unreachable condition")
data = b""
if entry.has_text():
rev_tree = self._get_revision_tree(bzr_rev.revision_id)
data = rev_tree.get_file(file_path).read()
assert len(data) == entry.text_size
content = Content.from_data(data)
self.storage.content_add([content])
return from_disk.Content({"sha1_git": content.sha1_git, "perms": perms})
def _get_bzr_revs_to_load(self) -> List[BzrRevision]:
assert self.repo is not None
repo: repository.Repository = self.repo
self.log.debug("Getting fully sorted revision tree")
if self.head_revision_id == NULL_REVISION:
return []
head_revision = repo.get_revision(self.head_revision_id)
# Bazaar's model does not allow iterating over its graph lazily from
# the bottom, but basically all DAGs (especially bzr ones) are small
# enough to fit in RAM.
ancestors_iter = self._iterate_ancestors(head_revision)
ancestry = []
for rev, parents in ancestors_iter:
if parents is None:
# Filter out ghosts; they scare the `TopoSorter`.
# Store them so we can later catch exceptions about missing parent
# revisions
self._ghosts.add(rev)
continue
ancestry.append((rev, parents))
sorter = tsort.TopoSorter(ancestry)
all_revisions = sorter.sorted()
if self._latest_head is not None:
# Breezy does not offer a generic querying system, so we do the
# filtering ourselves, which is simple enough given that bzr does
# not have multiple heads per branch
found = False
new_revisions = []
# Filter out revisions until we reach the one we've already seen
for rev in all_revisions:
if not found:
if rev == self._latest_head:
found = True
else:
new_revisions.append(rev)
if not found and all_revisions:
# The previously saved head has been uncommitted, reload
# everything
msg = "Previous head (%s) not found, loading all revisions"
self.log.debug(msg, self._latest_head)
return all_revisions
return new_revisions
return all_revisions
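# Illustration (assumed revision ids): with a topological order
# [rev1, rev2, rev3] and self._latest_head == rev2, only [rev3] is
# returned; if rev2 was uncommitted upstream and no longer appears in the
# ancestry, the full list is returned and everything is reloaded.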
def _iterate_ancestors(
self, rev: BzrRevision
) -> Iterator[Tuple[BzrRevisionId, Optional[Tuple[BzrRevisionId, ...]]]]:
"""Return an iterator over this revision's ancestors as
(revision_id, parent_ids) pairs; parent_ids is None for ghost revisions."""
assert self.repo is not None
return self.repo.get_graph().iter_ancestry([rev.revision_id])
# We want to cache at most the current revision and the previous one;
# no need to cache more than that.
@lru_cache(maxsize=2)
def _get_revision_tree(self, rev: BzrRevisionId) -> Tree:
assert self.repo is not None
return self.repo.revision_tree(rev)
def _store_tree(self, bzr_rev: BzrRevision) -> Sha1Git:
"""Save the current in-memory tree to storage."""
directories: List[from_disk.Directory] = [self._last_root]
while directories:
directory = directories.pop()
self.storage.directory_add([directory.to_model()])
directories.extend(
[
item
for item in directory.values()
if isinstance(item, from_disk.Directory)
]
)
self._prev_revision = bzr_rev
return self._last_root.hash
def _store_directories_slow(
self, bzr_rev: BzrRevision, inventory: Inventory
) -> None:
"""Store a revision's directories.
This is the slow variant: it does not use a diff from the last revision
but lists all the files. It is used for the first revision of a load
(the null revision for a full run, the last recorded head for an
incremental one), or for cases where the headache of figuring out the
delta from the Breezy primitives is not worth it.
"""
# Don't reuse the last root; we're listing everything anyway, and we
# could be keeping around deleted files
self._last_root = BzrDirectory()
for path, entry in inventory.iter_entries():
if path == "":
# the root directory is created by default
continue
content = self.store_content(bzr_rev, path, entry)
self._last_root[path.encode()] = content
def _get_revision_parents(self, bzr_rev: BzrRevision) -> Tuple[Sha1Git, ...]:
parents = []
for parent_id in bzr_rev.parent_ids:
if parent_id == NULL_REVISION:
# Paranoid, don't think that actually happens
continue
try:
revision_id = self._get_revision_id_from_bzr_id(parent_id)
except LookupError:
if parent_id in self._ghosts:
# We can't store ghosts in any meaningful way (yet?). They
# have no contents by definition, and they're pretty rare,
# so just ignore them.
continue
raise
parents.append(revision_id)
return tuple(parents)
def _get_revision_id_from_bzr_id(self, bzr_id: BzrRevisionId) -> Sha1Git:
"""Return the git sha1 of a revision given its bazaar revision id."""
from_cache = self._revision_id_to_sha1git.get(bzr_id)
if from_cache is not None:
return from_cache
# The parent was not loaded in this run, get it from storage
from_storage = self.storage.extid_get_from_extid(
EXTID_TYPE, ids=[bzr_id], version=EXTID_VERSION
)
if len(from_storage) != 1:
msg = "Expected 1 match from storage for bzr node %r, got %d"
raise LookupError(msg % (bzr_id.hex(), len(from_storage)))
return from_storage[0].target.object_id
@property
def branch(self) -> BzrBranch:
"""Returns the only branch in the current repository.
Bazaar branches are roughly equivalent to repositories in other VCSes
like Git or Mercurial. By contrast, a Bazaar repository is just a store of
revisions to optimize disk usage, with no particular semantics."""
assert self.repo is not None
branches = list(self.repo.find_branches(using=True))
msg = "Expected only 1 branch in the repository, got %d"
assert len(branches) == 1, msg % len(branches)
self._branch = branches[0]
return branches[0]
@property
def head_revision_id(self) -> BzrRevisionId:
"""Returns the Bazaar revision id of the branch's head.
Bazaar/Breezy branches do not have multiple heads."""
assert self.repo is not None
if self._head_revision_id is None:
self._head_revision_id = self.branch.last_revision()
assert self._head_revision_id is not None
return BzrRevisionId(self._head_revision_id)
@property
def tags(self) -> Optional[Dict[bytes, BzrRevisionId]]:
assert self.repo is not None
if self._tags is None:
self._tags = {
n.encode(): r for n, r in self.branch.tags.get_tag_dict().items()
}
return self._tags
diff --git a/swh/loader/bzr/tests/test_loader.py b/swh/loader/bzr/tests/test_loader.py
index 861e213..c96f5aa 100644
--- a/swh/loader/bzr/tests/test_loader.py
+++ b/swh/loader/bzr/tests/test_loader.py
@@ -1,426 +1,430 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from pathlib import Path
from breezy.builtins import cmd_uncommit
import pytest
from swh.loader.bzr.loader import BazaarLoader, BzrDirectory
from swh.loader.tests import (
assert_last_visit_matches,
get_stats,
prepare_repository_from_archive,
)
from swh.model.from_disk import Content
from swh.model.hashutil import hash_to_bytes
from swh.storage.algos.snapshot import snapshot_get_latest
# Generated repositories:
# - needs-upgrade:
# - Repository needs upgrade
# - empty:
# - Empty repo
# - renames:
# - File rename
# - Directory renames
# - Directory renames *and* file rename conflicting
# - no-branch:
# - No branch
# - metadata-and-type-changes:
# - Directory removed
# - Kind changed (file to symlink, directory to file, etc.)
# - not changed_content and not renamed and not kind_changed (so, exec file?)
# - Executable file
# - Empty commit (bzr commit --unchanged)
# - ghosts
# - Ghost revisions
# - broken-tags
# - Tags corruption
# - does-not-support-tags
# - Repo is recent but the branch does not support tags; it needs upgrading
# TODO tests:
# - Root path listed in changes (does that even happen?)
# - Parent is :null (does that even happen?)
# - Case insensitive removal (Is it actually a problem?)
# - Truly corrupted revision?
# - No match from storage (wrong topo sort or broken rev)
def do_uncommit(repo_url):
"""Remove the latest revision from the given bzr repo"""
uncommit_cmd = cmd_uncommit()
with open(os.devnull, "w") as f:
uncommit_cmd.outf = f
uncommit_cmd.run(repo_url)
@pytest.mark.parametrize("do_clone", [False, True])
def test_nominal(swh_storage, datadir, tmp_path, do_clone):
archive_path = Path(datadir, "nominal.tgz")
repo_url = prepare_repository_from_archive(archive_path, "nominal", tmp_path)
if do_clone:
# Check that the cloning mechanism works
loader = BazaarLoader(swh_storage, repo_url)
else:
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "eventful"}
assert_last_visit_matches(swh_storage, repo_url, status="full", type="bzr")
snapshot = snapshot_get_latest(swh_storage, repo_url)
expected_branches = [
b"HEAD",
b"tags/0.1",
b"tags/latest",
b"tags/other-tag",
b"trunk",
]
assert sorted(snapshot.branches.keys()) == expected_branches
stats = get_stats(swh_storage)
assert stats == {
"content": 7,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 3,
"revision": 6,
"skipped_content": 0,
"snapshot": 1,
}
# It contains associated bugs, making it a good complete candidate
example_revision = hash_to_bytes("18bb5b2c866c10c58a191afcd0b450a8727f1c62")
revision = loader.storage.revision_get([example_revision])[0]
assert revision.to_dict() == {
"message": b"fixing bugs",
"author": {
"fullname": b"Rapha\xc3\xabl Gom\xc3\xa8s <alphare@alphare-carbon.lan>",
"name": b"Rapha\xc3\xabl Gom\xc3\xa8s",
"email": b"alphare@alphare-carbon.lan",
},
"committer": {
"fullname": b"Rapha\xc3\xabl Gom\xc3\xa8s <alphare@alphare-carbon.lan>",
"name": b"Rapha\xc3\xabl Gom\xc3\xa8s",
"email": b"alphare@alphare-carbon.lan",
},
"date": {
"timestamp": {"seconds": 1643302390, "microseconds": 0},
"offset_bytes": b"+0100",
},
"committer_date": {
"timestamp": {"seconds": 1643302390, "microseconds": 0},
"offset_bytes": b"+0100",
},
"type": "bzr",
"directory": b"s0\xf3pe\xa3\x12\x05{\xc7\xbc\x86\xa6\x14.\xc1b\x1c\xeb\x05",
"synthetic": False,
"metadata": None,
"parents": (b"*V\xf5\n\xf0?\x1d{kE4\xda(\xb1\x08R\x83\x87-\xb6",),
"id": example_revision,
"extra_headers": (
(b"time_offset_seconds", b"3600"),
(b"bug", b"fixed https://launchpad.net/bugs/1234"),
(b"bug", b"fixed https://bz.example.com/?show_bug=4321"),
),
}
def test_needs_upgrade(swh_storage, datadir, tmp_path, mocker):
"""Old bzr repository format should be upgraded to latest format"""
archive_path = Path(datadir, "needs-upgrade.tgz")
repo_url = prepare_repository_from_archive(archive_path, "needs-upgrade", tmp_path)
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
upgrade_spy = mocker.spy(loader, "run_upgrade")
res = loader.load()
upgrade_spy.assert_called()
assert res == {"status": "uneventful"} # needs-upgrade is an empty repo
def test_does_not_support_tags(swh_storage, datadir, tmp_path, mocker):
"""Repository format is correct, but the branch itself does not support tags
and should be upgraded to the latest format"""
archive_path = Path(datadir, "does-not-support-tags.tgz")
path = "does-not-support-tags-repo/does-not-support-tags-branch"
- repo_url = prepare_repository_from_archive(archive_path, path, tmp_path,)
+ repo_url = prepare_repository_from_archive(
+ archive_path,
+ path,
+ tmp_path,
+ )
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
upgrade_spy = mocker.spy(loader, "run_upgrade")
res = loader.load()
upgrade_spy.assert_called()
assert res == {"status": "uneventful"} # does-not-support-tags is an empty repo
def test_no_branch(swh_storage, datadir, tmp_path):
"""This should only happen with a broken clone, so the expected result is failure"""
archive_path = Path(datadir, "no-branch.tgz")
repo_url = prepare_repository_from_archive(archive_path, "no-branch", tmp_path)
res = BazaarLoader(swh_storage, repo_url, directory=repo_url).load()
assert res == {"status": "failed"}
def test_empty(swh_storage, datadir, tmp_path):
"""An empty repository is fine, it's just got no information"""
archive_path = Path(datadir, "empty.tgz")
repo_url = prepare_repository_from_archive(archive_path, "empty", tmp_path)
res = BazaarLoader(swh_storage, repo_url, directory=repo_url).load()
assert res == {"status": "uneventful"}
# Empty snapshot does not bother the incremental code
res = BazaarLoader(swh_storage, repo_url, directory=repo_url).load()
assert res == {"status": "uneventful"}
def test_renames(swh_storage, datadir, tmp_path):
archive_path = Path(datadir, "renames.tgz")
repo_url = prepare_repository_from_archive(archive_path, "renames", tmp_path)
res = BazaarLoader(swh_storage, repo_url, directory=repo_url).load()
assert res == {"status": "eventful"}
assert_last_visit_matches(swh_storage, repo_url, status="full", type="bzr")
snapshot = snapshot_get_latest(swh_storage, repo_url)
assert sorted(snapshot.branches.keys()) == [
b"HEAD",
b"trunk",
]
stats = get_stats(swh_storage)
assert stats == {
"content": 1,
"directory": 5,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 2,
"skipped_content": 0,
"snapshot": 1,
}
def test_broken_tags(swh_storage, datadir, tmp_path):
"""A tag pointing to a the null revision should not break anything"""
archive_path = Path(datadir, "broken-tags.tgz")
repo_url = prepare_repository_from_archive(archive_path, "broken-tags", tmp_path)
res = BazaarLoader(swh_storage, repo_url, directory=repo_url).load()
assert res == {"status": "uneventful"}
assert_last_visit_matches(swh_storage, repo_url, status="full", type="bzr")
snapshot = snapshot_get_latest(swh_storage, repo_url)
assert sorted(snapshot.branches.keys()) == [
b"tags/null-tag", # broken tag does appear, but didn't cause any issues
]
stats = get_stats(swh_storage)
assert stats == {
"content": 0,
"directory": 0,
"origin": 1,
"origin_visit": 1,
"release": 0, # Does not count as a valid release
"revision": 0,
"skipped_content": 0,
"snapshot": 1,
}
def test_metadata_and_type_changes(swh_storage, datadir, tmp_path):
archive_path = Path(datadir, "metadata-and-type-changes.tgz")
repo_url = prepare_repository_from_archive(
archive_path, "metadata-and-type-changes", tmp_path
)
res = BazaarLoader(swh_storage, repo_url, directory=repo_url).load()
assert res == {"status": "eventful"}
assert_last_visit_matches(swh_storage, repo_url, status="full", type="bzr")
snapshot = snapshot_get_latest(swh_storage, repo_url)
assert sorted(snapshot.branches.keys()) == [
b"HEAD",
b"trunk",
]
stats = get_stats(swh_storage)
assert stats == {
"content": 1,
"directory": 9,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
def test_ghosts(swh_storage, datadir, tmp_path):
archive_path = Path(datadir, "ghosts.tgz")
repo_url = prepare_repository_from_archive(archive_path, "ghosts", tmp_path)
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
assert loader._ghosts == set()
res = loader.load()
assert loader._ghosts == set((b"iamaghostboo",))
assert res == {"status": "eventful"}
assert_last_visit_matches(swh_storage, repo_url, status="full", type="bzr")
snapshot = snapshot_get_latest(swh_storage, repo_url)
assert sorted(snapshot.branches.keys()) == [
b"HEAD",
b"tags/brokentag", # tag pointing to a ghost revision is tracked
b"trunk",
]
stats = get_stats(swh_storage)
assert stats == {
"content": 0, # No contents
"directory": 1, # Root directory always counts
"origin": 1,
"origin_visit": 1,
"release": 0, # Ghost tag is ignored, stored as dangling
"revision": 1, # Only one revision, the ghost is ignored
"skipped_content": 0,
"snapshot": 1,
}
def test_bzr_directory():
directory = BzrDirectory()
directory[b"a/decently/enough/nested/path"] = Content(b"whatever")
directory[b"a/decently/other_node"] = Content(b"whatever else")
directory[b"another_node"] = Content(b"contents")
assert directory[b"a/decently/enough/nested/path"] == Content(b"whatever")
assert directory[b"a/decently/other_node"] == Content(b"whatever else")
assert directory[b"another_node"] == Content(b"contents")
del directory[b"a/decently/enough/nested/path"]
assert directory.get(b"a/decently/enough/nested/path") is None
assert directory.get(b"a/decently/enough/nested/") is None
assert directory.get(b"a/decently/enough") is None
# no KeyError
directory[b"a/decently"]
directory[b"a"]
directory[b"another_node"]
def test_incremental_noop(swh_storage, datadir, tmp_path):
"""Check that nothing happens if we try to load a repo twice in a row"""
archive_path = Path(datadir, "nominal.tgz")
repo_url = prepare_repository_from_archive(archive_path, "nominal", tmp_path)
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "eventful"}
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "uneventful"}
def test_incremental_nominal(swh_storage, datadir, tmp_path):
"""Check that an updated repository does update after the second run, but
is still a noop in the third run."""
archive_path = Path(datadir, "nominal.tgz")
repo_url = prepare_repository_from_archive(archive_path, "nominal", tmp_path)
# remove 2 latest commits
do_uncommit(repo_url)
do_uncommit(repo_url)
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "eventful"}
stats = get_stats(swh_storage)
assert stats == {
"content": 6,
"directory": 4,
"origin": 1,
"origin_visit": 1,
"release": 2,
"revision": 4,
"skipped_content": 0,
"snapshot": 1,
}
# Load the complete repo now
repo_url = prepare_repository_from_archive(archive_path, "nominal", tmp_path)
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "eventful"}
stats = get_stats(swh_storage)
expected_stats = {
"content": 7,
"directory": 7,
"origin": 1,
"origin_visit": 2,
"release": 3,
"revision": 6,
"skipped_content": 0,
"snapshot": 2,
}
assert stats == expected_stats
# Nothing should change
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "uneventful"}
stats = get_stats(swh_storage)
assert stats == {**expected_stats, "origin_visit": 2 + 1}
def test_incremental_uncommitted_head(swh_storage, datadir, tmp_path):
"""Check that doing an incremental run with the saved head missing does not
error out but instead loads everything correctly"""
archive_path = Path(datadir, "nominal.tgz")
repo_url = prepare_repository_from_archive(archive_path, "nominal", tmp_path)
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "eventful"}
stats = get_stats(swh_storage)
expected_stats = {
"content": 7,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 3,
"revision": 6,
"skipped_content": 0,
"snapshot": 1,
}
assert stats == expected_stats
# Remove the previously saved head
do_uncommit(repo_url)
loader = BazaarLoader(swh_storage, repo_url, directory=repo_url)
res = loader.load()
assert res == {"status": "eventful"}
# Everything is loaded correctly
stats = get_stats(swh_storage)
assert stats == {**expected_stats, "origin_visit": 1 + 1, "snapshot": 1 + 1}
diff --git a/swh/loader/bzr/tests/test_tasks.py b/swh/loader/bzr/tests/test_tasks.py
index aa2330f..7e7158d 100644
--- a/swh/loader/bzr/tests/test_tasks.py
+++ b/swh/loader/bzr/tests/test_tasks.py
@@ -1,23 +1,27 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
def test_loader(
mocker, swh_config, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
mock_loader = mocker.patch("swh.loader.bzr.loader.BazaarLoader.load")
mock_loader.return_value = {"status": "eventful"}
res = swh_scheduler_celery_app.send_task(
"swh.loader.bzr.tasks.LoadBazaar",
- kwargs={"url": "origin_url", "directory": "/some/repo", "visit_date": "now",},
+ kwargs={
+ "url": "origin_url",
+ "directory": "/some/repo",
+ "visit_date": "now",
+ },
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
mock_loader.assert_called_once_with()