# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
import os
import sys
import tempfile

from typing import (
    Any,
    Dict,
    Generic,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
)

import attr
import sentry_sdk

from swh.core.config import SWHConfig
from swh.core.tarball import uncompress
from swh.loader.package.utils import download
from swh.model import from_disk
from swh.model.collections import ImmutableDict
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import SWHID
from swh.model.model import (
    BaseModel,
    MetadataAuthority,
    MetadataFetcher,
    MetadataTargetType,
    Origin,
    OriginVisit,
    OriginVisitStatus,
    RawExtrinsicMetadata,
    Revision,
    Sha1Git,
    Snapshot,
    TargetType,
)
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.utils import now

logger = logging.getLogger(__name__)

@attr.s
class BasePackageInfo:
    """Metadata about a downloadable artifact of a package version, as
    gathered by a package loader: the artifact's URL and filename, plus any
    extrinsic metadata collected along the way.
    """

    url = attr.ib(type=str)
    filename = attr.ib(type=Optional[str])

    # The following attribute has kw_only=True in order to allow subclasses
    # to add attributes. Without kw_only, attributes without default values
    # cannot go after attributes with default values.
    # See <https://github.com/python-attrs/attrs/issues/38>
    revision_extrinsic_metadata = attr.ib(
        type=List[Tuple[datetime.datetime, str, bytes]], default=[], kw_only=True,
    )
    """Tuple elements are respectively the 'discovery_date', 'format',
    and 'metadata' fields of RawExtrinsicMetadata"""

    # TODO: add support for metadata for origins, directories, and contents

    @property
    def ID_KEYS(self):
        raise NotImplementedError(f"{self.__class__.__name__} is missing ID_KEYS")

    def artifact_identity(self):
        """Compute this package info's identity, using the attribute names
        listed in ID_KEYS as a composite primary key.

        Returns:
            The identity (list of attribute values) for this package info
        """
        return [getattr(self, k) for k in self.ID_KEYS]
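
    # A hypothetical example of how a subclass is meant to use ID_KEYS and
    # artifact_identity; `MyPackageInfo` and its `sha256` field are
    # illustrative assumptions, not part of this module:
    #
    #     @attr.s
    #     class MyPackageInfo(BasePackageInfo):
    #         sha256 = attr.ib(type=str, default="")
    #         ID_KEYS = ["url", "sha256"]
    #
    #     p_info = MyPackageInfo(
    #         url="https://example.org/pkg-1.0.tar.gz",
    #         filename="pkg-1.0.tar.gz",
    #         sha256="...",
    #     )
    #     assert p_info.artifact_identity() == [p_info.url, p_info.sha256]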

TPackageInfo = TypeVar("TPackageInfo", bound=BasePackageInfo)


class PackageLoader(Generic[TPackageInfo]):
    # Origin visit type (str) set by the loader
    visit_type = ""

    DEFAULT_CONFIG = {
        "create_authorities": ("bool", True),
        "create_fetchers": ("bool", True),
    }

    def __init__(self, url):
        """Loader's constructor. This raises an exception if the minimal
        required configuration is missing (cf. :meth:`_check_configuration`).

        Args:
            url (str): Origin url to load data from

        """
        # This expects to use the environment variable SWH_CONFIG_FILENAME
        self.config = SWHConfig.parse_config_file()
        self._check_configuration()
        self.storage = get_storage(**self.config["storage"])
        self.url = url
        self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
        self.max_content_size = self.config["max_content_size"]

    def _check_configuration(self):
        """Checks that the minimal required configuration is set for the
        loader. If some required configuration is missing, an exception
        detailing the issue is raised.

        """
        if "storage" not in self.config:
            raise ValueError("Misconfiguration, at least the storage key should be set")

    def get_versions(self) -> Sequence[str]:
        """Return the list of all published package versions.

        Returns:
            Sequence of published versions

        """
        return []

    def get_package_info(self, version: str) -> Iterator[Tuple[str, TPackageInfo]]:
        """Given a release version of a package, retrieve the associated
        package information for such version.

        Args:
            version: Package version

        Yields:
            (branch name, package metadata) pairs

        """
        yield from {}
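
    # A minimal sketch (hypothetical names throughout) of how a concrete
    # loader typically overrides the two methods above; `self.releases` and
    # `MyPackageInfo` are illustrative assumptions:
    #
    #     def get_versions(self) -> Sequence[str]:
    #         return list(self.releases)  # e.g. ["1.0", "1.1"]
    #
    #     def get_package_info(self, version):
    #         for artifact in self.releases[version]:
    #             p_info = MyPackageInfo(
    #                 url=artifact["url"], filename=artifact["filename"],
    #             )
    #             yield f"releases/{version}", p_info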

    def build_revision(
        self, p_info: TPackageInfo, uncompressed_path: str, directory: Sha1Git
    ) -> Optional[Revision]:
        """Build the revision from the archive metadata (extrinsic
        artifact metadata) and the intrinsic metadata.

        Args:
            p_info: Package information
            uncompressed_path: Artifact uncompressed path on disk
            directory: Id of the directory loaded from the uncompressed
                artifact

        Returns:
            Revision object, or None if it could not be built (e.g. when
            intrinsic metadata is missing)

        """
        raise NotImplementedError("build_revision")
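
    # A hedged sketch of a build_revision override. Person, RevisionType and
    # TimestampWithTimezone live in swh.model.model but are not imported by
    # this module; the message and author values are illustrative
    # assumptions:
    #
    #     def build_revision(self, p_info, uncompressed_path, directory):
    #         author = Person.from_fullname(b"Jane Doe <jane@example.org>")
    #         date = TimestampWithTimezone.from_datetime(self.visit_date)
    #         return Revision(
    #             type=RevisionType.TAR,
    #             message=b"Synthetic revision for version 1.0",
    #             author=author,
    #             committer=author,
    #             date=date,
    #             committer_date=date,
    #             parents=[],
    #             directory=directory,
    #             synthetic=True,
    #         )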

    def get_default_version(self) -> str:
        """Retrieve the latest release version if any.

        Returns:
            Latest version

        """
        return ""

    def last_snapshot(self) -> Optional[Snapshot]:
        """Retrieve the last snapshot out of the last visit."""
        return snapshot_get_latest(self.storage, self.url)

    def known_artifacts(
        self, snapshot: Optional[Snapshot]
    ) -> Dict[Sha1Git, BaseModel]:
        """Retrieve the known releases/artifacts for the origin.

        Args:
            snapshot: snapshot for the visit

        Returns:
            Dict mapping revision ids (bytes) to their metadata dicts.

        """
        if not snapshot:
            return {}

        # retrieve only revisions (skip branch aliases, which we do not want
        # here)
        revs = [
            rev.target
            for rev in snapshot.branches.values()
            if rev and rev.target_type == TargetType.REVISION
        ]
        known_revisions = self.storage.revision_get(revs)
        return {
            revision["id"]: revision["metadata"]
            for revision in known_revisions
            if revision
        }

    def resolve_revision_from(
        self, known_artifacts: Dict, p_info: TPackageInfo,
    ) -> Optional[bytes]:
        """Resolve the revision from the known artifacts and the package
        information.

        If the artifact has already been downloaded, this will return the
        existing revision targeting that uncompressed artifact directory.
        Otherwise, this returns None.

        Args:
            known_artifacts: Artifacts already known for this origin
            p_info: Package information

        Returns:
            None or revision identifier

        """
        return None
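
    # A hedged sketch of an override, assuming (as _load_revision below
    # guarantees) that each known revision's metadata carries an
    # "original_artifact" entry with the download checksums; the `sha256`
    # field on p_info is an illustrative assumption:
    #
    #     def resolve_revision_from(self, known_artifacts, p_info):
    #         for rev_id, metadata in known_artifacts.items():
    #             if not metadata:
    #                 continue
    #             for artifact in metadata.get("original_artifact", []):
    #                 if artifact["checksums"]["sha256"] == p_info.sha256:
    #                     return rev_id
    #         return None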

    def download_package(
        self, p_info: TPackageInfo, tmpdir: str
    ) -> List[Tuple[str, Mapping]]:
        """Download artifacts for a specific package. All downloads happen in
        the tmpdir folder.

        The default implementation expects the package info to describe one
        artifact per package.

        Note that most implementations have one artifact per package. But
        some have multiple artifacts per package (debian), and some have
        none, the package being the artifact (gnu).

        Args:
            p_info: Information on the package artifact to download (url,
                filename, etc...)
            tmpdir: Location to retrieve such artifacts

        Returns:
            List of (path, computed hashes)

        """
        return [download(p_info.url, dest=tmpdir, filename=p_info.filename)]

    def uncompress(
        self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str
    ) -> str:
        """Uncompress the artifact(s) in the destination folder dest.

        Optionally, subclasses may need more information from p_info to do
        so (debian).

        """
        uncompressed_path = os.path.join(dest, "src")
        for a_path, _ in dl_artifacts:
            uncompress(a_path, dest=uncompressed_path)
        return uncompressed_path

    def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]:
        """Return an extra dict of branches that are used to update the set
        of branches.

        """
        return {}
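
    # A hedged sketch of an extra_branches override; the branch name and
    # target are illustrative assumptions. Entries use the same shape as the
    # branches built by _load_snapshot below:
    #
    #     def extra_branches(self):
    #         return {
    #             b"refs/heads/master": {
    #                 "target_type": "alias",
    #                 "target": b"HEAD",
    #             },
    #         }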

    def load(self) -> Dict:
        """Load for a specific origin the associated contents.

        For each package version of the origin:

        1. Fetch the files for one package version. By default, this can be
           implemented as a simple HTTP request. Loaders with more specific
           requirements can override this, e.g.: the PyPI loader checks the
           integrity of the downloaded files; the Debian loader has to
           download and check several files for one package version.

        2. Extract the downloaded files. By default, this would be a
           universal archive/tarball extraction. Loaders for specific
           formats can override this method (for instance, the Debian
           loader uses dpkg-source -x).

        3. Convert the extracted directory to a set of Software Heritage
           objects, using swh.model.from_disk.

        4. Extract the metadata from the unpacked directories. This would
           only be applicable for "smart" loaders like npm (parsing the
           package.json), PyPI (parsing the PKG-INFO file) or Debian
           (parsing debian/changelog and debian/control).

           On "minimal-metadata" sources such as the GNU archive, the
           lister should provide the minimal set of metadata needed to
           populate the revision/release objects (authors, dates) as an
           argument to the task.

        5. Generate the revision/release objects for the given version,
           from the data generated at steps 3 and 4.

        Then, once all versions have been processed:

        6. Generate the snapshot for the visit, using the revisions/releases
           collected at step 5 and the extra branch information from
           :meth:`extra_branches`, and load it into the Software Heritage
           archive.

        """
        status_load = "uneventful"  # either: eventful, uneventful, failed
        status_visit = "full"  # either: partial, full
        tmp_revisions = {}  # type: Dict[str, List]
        snapshot = None

        def finalize_visit() -> Dict[str, Any]:
            """Finalize the visit:

            - flush eventual unflushed data to storage
            - update origin visit's status
            - return the task's status

            """
            self.storage.flush()

            snapshot_id: Optional[bytes] = None
            if snapshot and snapshot.id:  # guard against an empty snapshot id (b"")
                snapshot_id = snapshot.id
            assert visit.visit
            visit_status = OriginVisitStatus(
                origin=self.url,
                visit=visit.visit,
                date=now(),
                status=status_visit,
                snapshot=snapshot_id,
            )
            self.storage.origin_visit_status_add([visit_status])
            result: Dict[str, Any] = {
                "status": status_load,
            }
            if snapshot_id:
                result["snapshot_id"] = hash_to_hex(snapshot_id)
            return result

        # Prepare origin and origin_visit
        origin = Origin(url=self.url)
        try:
            self.storage.origin_add([origin])
            visit = self.storage.origin_visit_add(
                [
                    OriginVisit(
                        origin=self.url, date=self.visit_date, type=self.visit_type,
                    )
                ]
            )[0]
        except Exception as e:
            logger.exception("Failed to initialize origin_visit for %s", self.url)
            sentry_sdk.capture_exception(e)
            return {"status": "failed"}

        try:
            last_snapshot = self.last_snapshot()
            logger.debug("last snapshot: %s", last_snapshot)
            known_artifacts = self.known_artifacts(last_snapshot)
            logger.debug("known artifacts: %s", known_artifacts)
        except Exception as e:
            logger.exception("Failed to get previous state for %s", self.url)
            sentry_sdk.capture_exception(e)
            status_visit = "partial"
            status_load = "failed"
            return finalize_visit()

        load_exceptions: List[Exception] = []

        for version in self.get_versions():  # for each
            logger.debug("version: %s", version)
            tmp_revisions[version] = []
            # `p_` stands for `package_`
            for branch_name, p_info in self.get_package_info(version):
                logger.debug("package_info: %s", p_info)
                revision_id = self.resolve_revision_from(known_artifacts, p_info)
                if revision_id is None:
                    try:
                        revision_id = self._load_revision(p_info, origin)
                        if revision_id:
                            self._load_extrinsic_revision_metadata(p_info, revision_id)
                        self.storage.flush()
                        status_load = "eventful"
                    except Exception as e:
                        self.storage.clear_buffers()
                        load_exceptions.append(e)
                        sentry_sdk.capture_exception(e)
                        logger.exception(
                            "Failed loading branch %s for %s", branch_name, self.url
                        )
                        continue

                    if revision_id is None:
                        continue

                tmp_revisions[version].append((branch_name, revision_id))

        if load_exceptions:
            status_visit = "partial"

        if not tmp_revisions:
            # We could not load any revisions; fail completely
            status_visit = "partial"
            status_load = "failed"
            return finalize_visit()

        try:
            # Retrieve the default release version (the "latest" one)
            default_version = self.get_default_version()
            logger.debug("default version: %s", default_version)
            # Retrieve extra branches
            extra_branches = self.extra_branches()
            logger.debug("extra branches: %s", extra_branches)

            snapshot = self._load_snapshot(
                default_version, tmp_revisions, extra_branches
            )
        except Exception as e:
            logger.exception("Failed to build snapshot for origin %s", self.url)
            sentry_sdk.capture_exception(e)
            status_visit = "partial"
            status_load = "failed"

        # Whether or not the snapshot was built, finalize the visit with the
        # statuses computed above.
        return finalize_visit()

    def _load_revision(self, p_info: TPackageInfo, origin) -> Optional[Sha1Git]:
        """Does all the loading of a revision itself:

        * downloads a package and uncompresses it
        * loads it from disk
        * adds contents, directories, and revision to self.storage
        * returns the revision id, or None if the revision could not be
          built from the artifact

        Raises:
            exception when unable to download or uncompress artifacts

        """
        with tempfile.TemporaryDirectory() as tmpdir:
            dl_artifacts = self.download_package(p_info, tmpdir)

            uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
            logger.debug("uncompressed_path: %s", uncompressed_path)

            directory = from_disk.Directory.from_disk(
                path=uncompressed_path.encode("utf-8"),
                max_content_length=self.max_content_size,
            )

            contents, skipped_contents, directories = from_disk.iter_directory(
                directory
            )

            logger.debug("Number of skipped contents: %s", len(skipped_contents))
            self.storage.skipped_content_add(skipped_contents)
            logger.debug("Number of contents: %s", len(contents))
            self.storage.content_add(contents)

            logger.debug("Number of directories: %s", len(directories))
            self.storage.directory_add(directories)

            # FIXME: This should be release. cf. D409
            revision = self.build_revision(
                p_info, uncompressed_path, directory=directory.hash
            )
            if not revision:
                # Some artifacts are missing intrinsic metadata
                # skipping those
                return None

        # Annotate the revision with the hashes of the original artifacts
        extra_metadata: Tuple[str, Any] = (
            "original_artifact",
            [hashes for _, hashes in dl_artifacts],
        )
        if revision.metadata is not None:
            full_metadata = list(revision.metadata.items()) + [extra_metadata]
        else:
            full_metadata = [extra_metadata]

        revision = attr.evolve(revision, metadata=ImmutableDict(full_metadata))

        logger.debug("Revision: %s", revision)

        self.storage.revision_add([revision])
        return revision.id

    def _load_snapshot(
        self,
        default_version: str,
        revisions: Dict[str, List[Tuple[str, bytes]]],
        extra_branches: Dict[bytes, Mapping[str, Any]],
    ) -> Optional[Snapshot]:
        """Build a snapshot out of the current revisions stored and extra
        branches. Then load it in the storage.

        """
        logger.debug("revisions: %s", revisions)

        # Build and load the snapshot
        branches = {}  # type: Dict[bytes, Mapping[str, Any]]
        for version, branch_name_revisions in revisions.items():
            if version == default_version and len(branch_name_revisions) == 1:
                # only 1 branch (no ambiguity), we can create an alias
                # branch 'HEAD'
                branch_name, _ = branch_name_revisions[0]
                # except for some corner case (deposit)
                if branch_name != "HEAD":
                    branches[b"HEAD"] = {
                        "target_type": "alias",
                        "target": branch_name.encode("utf-8"),
                    }

            for branch_name, target in branch_name_revisions:
                branches[branch_name.encode("utf-8")] = {
                    "target_type": "revision",
                    "target": target,
                }

        # Deal with extra-branches
        for name, branch_target in extra_branches.items():
            if name in branches:
                logger.error("Extra branch '%s' has been ignored", name)
            else:
                branches[name] = branch_target

        snapshot_data = {"branches": branches}
        logger.debug("snapshot: %s", snapshot_data)
        snapshot = Snapshot.from_dict(snapshot_data)
        logger.debug("snapshot: %s", snapshot)
        self.storage.snapshot_add([snapshot])

        return snapshot

    def get_loader_name(self) -> str:
        """Returns a fully qualified name of this loader."""
        return f"{self.__class__.__module__}.{self.__class__.__name__}"

    def get_loader_version(self) -> str:
        """Returns the version of the current loader."""
        module_name = self.__class__.__module__ or ""
        module_name_parts = module_name.split(".")

        # Iterate rootward through the package hierarchy until we find a
        # parent of this loader's module with a __version__ attribute.
        for prefix_size in range(len(module_name_parts), 0, -1):
            package_name = ".".join(module_name_parts[0:prefix_size])
            module = sys.modules[package_name]
            if hasattr(module, "__version__"):
                return module.__version__  # type: ignore

        # If this loader's class has no parent package with a __version__,
        # it should implement it itself.
        raise NotImplementedError(
            f"Could not dynamically find the version of {self.get_loader_name()}."
        )

    def get_metadata_fetcher(self) -> MetadataFetcher:
        """Returns a MetadataFetcher instance representing this package loader,
        which is used for adding provenance information to extracted
        extrinsic metadata, if any."""
        return MetadataFetcher(
            name=self.get_loader_name(), version=self.get_loader_version(), metadata={},
        )

    def get_metadata_authority(self) -> MetadataAuthority:
        """For package loaders that get extrinsic metadata, returns the
        authority the metadata are coming from.

        """
        raise NotImplementedError("get_metadata_authority")
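
    # A hedged sketch of an override; MetadataAuthorityType is importable
    # from swh.model.model (not imported by this module), and the URL is an
    # illustrative assumption:
    #
    #     def get_metadata_authority(self):
    #         return MetadataAuthority(
    #             type=MetadataAuthorityType.FORGE,
    #             url="https://example.org/",
    #             metadata={},
    #         )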

    def build_extrinsic_revision_metadata(
        self, p_info: TPackageInfo, revision_id: Sha1Git
    ) -> List[RawExtrinsicMetadata]:
        if not p_info.revision_extrinsic_metadata:
            # If this package loader doesn't write metadata, no need to require
            # an implementation for get_metadata_authority.
            return []

        authority = self.get_metadata_authority()
        fetcher = self.get_metadata_fetcher()

        metadata_objects = []

        for (discovery_date, format, metadata) in p_info.revision_extrinsic_metadata:
            metadata_objects.append(
                RawExtrinsicMetadata(
                    type=MetadataTargetType.REVISION,
                    id=SWHID(object_type="revision", object_id=revision_id),
                    discovery_date=discovery_date,
                    authority=authority,
                    fetcher=fetcher,
                    format=format,
                    metadata=metadata,
                    origin=self.url,
                )
            )

        return metadata_objects

    def _load_extrinsic_revision_metadata(
        self, p_info: TPackageInfo, revision_id: Sha1Git
    ) -> None:
        metadata_objects = self.build_extrinsic_revision_metadata(p_info, revision_id)

        authorities = {
            (
                metadata_object.authority.type,
                metadata_object.authority.url,
            ): metadata_object.authority
            for metadata_object in metadata_objects
        }
        if authorities:
            self.storage.metadata_authority_add(authorities.values())

        fetchers = {
            (
                metadata_object.fetcher.name,
                metadata_object.fetcher.version,
            ): metadata_object.fetcher
            for metadata_object in metadata_objects
        }
        if fetchers:
            self.storage.metadata_fetcher_add(fetchers.values())

        if metadata_objects:
            self.storage.object_metadata_add(metadata_objects)
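
# Usage sketch (hypothetical): a concrete subclass is instantiated with an
# origin URL and run via load(); `MyLoader` is an illustrative assumption:
#
#     loader = MyLoader(url="https://example.org/project")
#     result = loader.load()
#     # e.g. {"status": "eventful", "snapshot_id": "..."}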
