
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index 31e3f40..27e4453 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -1,885 +1,883 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import hashlib
from itertools import islice
import json
import logging
import os
import string
import sys
import tempfile
from typing import (
Any,
Dict,
Generic,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
)
import attr
import sentry_sdk
from swh.core.tarball import uncompress
from swh.loader.core.loader import BaseLoader
from swh.loader.exception import NotFound
from swh.loader.package.utils import download
from swh.model import from_disk
from swh.model.collections import ImmutableDict
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import (
CoreSWHID,
ExtendedObjectType,
ExtendedSWHID,
ObjectType,
)
from swh.model.model import (
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
Origin,
OriginVisit,
OriginVisitStatus,
RawExtrinsicMetadata,
Revision,
Sha1Git,
Snapshot,
TargetType,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface
from swh.storage.utils import now
logger = logging.getLogger(__name__)
SWH_METADATA_AUTHORITY = MetadataAuthority(
type=MetadataAuthorityType.REGISTRY,
url="https://softwareheritage.org/",
metadata={},
)
"""Metadata authority for extrinsic metadata generated by Software Heritage.
Used for metadata on "original artifacts", i.e. length, filename, and checksums
of downloaded archive files."""
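# Illustrative example (assumption, not from the diff): the
# "original-artifacts-json" payloads attached under this authority are JSON
# lists with one entry per downloaded artifact, along the lines of:
#   [{"url": "https://example.org/pkg-1.0.tar.gz",
#     "filename": "pkg-1.0.tar.gz",
#     "length": 1024,
#     "checksums": {"sha1": "...", "sha256": "..."}}]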
PartialExtID = Tuple[str, bytes]
"""The ``extid_type`` and ``extid`` fields of an :ref:py:`ExtID` object."""
@attr.s
class RawExtrinsicMetadataCore:
"""Contains the core of the metadata extracted by a loader, that will be
used to build a full RawExtrinsicMetadata object by adding object identifier,
context, and provenance information."""
format = attr.ib(type=str)
metadata = attr.ib(type=bytes)
discovery_date = attr.ib(type=Optional[datetime.datetime], default=None)
"""Defaults to the visit date."""
@attr.s
class BasePackageInfo:
"""Compute the primary key for a dict using the id_keys as primary key
composite.
Args:
d: A dict entry to compute the primary key on
id_keys: Sequence of keys to use as primary key
Returns:
The identity for that dict entry
"""
url = attr.ib(type=str)
filename = attr.ib(type=Optional[str])
MANIFEST_FORMAT: Optional[string.Template] = None
"""If not None, used by the default extid() implementation to format a manifest,
before hashing it to produce an ExtID."""
EXTID_TYPE: str = "package-manifest-sha256"
# The following attribute has kw_only=True in order to allow subclasses
# to add attributes. Without kw_only, attributes without default values cannot
# go after attributes with default values.
# See <https://github.com/python-attrs/attrs/issues/38>
directory_extrinsic_metadata = attr.ib(
type=List[RawExtrinsicMetadataCore], default=[], kw_only=True,
)
# TODO: add support for metadata for directories and contents
def extid(self) -> Optional[PartialExtID]:
"""Returns a unique intrinsic identifier of this package info,
or None if this package info is not 'deduplicatable' (meaning that
we will always load it, instead of checking the ExtID storage
to see if we already did)"""
if self.MANIFEST_FORMAT is None:
return None
else:
manifest = self.MANIFEST_FORMAT.substitute(
{k: str(v) for (k, v) in attr.asdict(self).items()}
)
return (self.EXTID_TYPE, hashlib.sha256(manifest.encode()).digest())
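# Illustrative sketch (not part of this file): a hypothetical subclass enabling
# ExtID-based deduplication through MANIFEST_FORMAT; the `version` and `sha256`
# fields are assumptions for the example only.
#
#     @attr.s
#     class ExamplePackageInfo(BasePackageInfo):
#         version = attr.ib(type=str, default="")
#         sha256 = attr.ib(type=str, default="")
#         # extid() substitutes the stringified attributes into this template,
#         # then hashes the result with SHA-256:
#         MANIFEST_FORMAT = string.Template("url $url version $version sha256 $sha256")
#
#     ExamplePackageInfo(url="https://example.org/p-1.0.tar.gz", filename=None,
#                        version="1.0", sha256="...").extid()
#     # -> ("package-manifest-sha256", <32-byte digest of the formatted manifest>)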
TPackageInfo = TypeVar("TPackageInfo", bound=BasePackageInfo)
class PackageLoader(BaseLoader, Generic[TPackageInfo]):
# Origin visit type (str) set by the loader
visit_type = ""
visit_date: datetime.datetime
def __init__(
self,
storage: StorageInterface,
url: str,
max_content_size: Optional[int] = None,
):
"""Loader's constructor. This raises exception if the minimal required
configuration is missing (cf. fn:`check` method).
Args:
storage: Storage instance
url: Origin url to load data from
"""
super().__init__(storage=storage, max_content_size=max_content_size)
self.url = url
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
def get_versions(self) -> Sequence[str]:
"""Return the list of all published package versions.
Raises:
:class:`swh.loader.exception.NotFound` when failing to read the
published package versions.
Returns:
Sequence of published versions
"""
return []
def get_package_info(self, version: str) -> Iterator[Tuple[str, TPackageInfo]]:
"""Given a release version of a package, retrieve the associated
package information for that version.
Args:
version: Package version
Yields:
(branch name, package metadata) tuples
"""
yield from {}
def build_revision(
self, p_info: TPackageInfo, uncompressed_path: str, directory: Sha1Git
) -> Optional[Revision]:
"""Build the revision from the archive metadata (extrinsic
artifact metadata) and the intrinsic metadata.
Args:
p_info: Package information
uncompressed_path: Artifact uncompressed path on disk
Returns:
Revision object
"""
raise NotImplementedError("build_revision")
def get_default_version(self) -> str:
"""Retrieve the latest release version if any.
Returns:
Latest version
"""
return ""
def last_snapshot(self) -> Optional[Snapshot]:
"""Retrieve the last snapshot out of the last visit.
"""
return snapshot_get_latest(self.storage, self.url)
def known_artifacts(
self, snapshot: Optional[Snapshot]
) -> Dict[Sha1Git, Optional[ImmutableDict[str, object]]]:
"""Retrieve the known releases/artifact for the origin.
Args
snapshot: snapshot for the visit
Returns:
Dict of keys revision id (bytes), values a metadata Dict.
"""
if not snapshot:
return {}
# retrieve only revisions (we do not want aliases here)
revs = [
rev.target
for rev in snapshot.branches.values()
if rev and rev.target_type == TargetType.REVISION
]
known_revisions = self.storage.revision_get(revs)
return {
revision.id: revision.metadata for revision in known_revisions if revision
}
def new_packageinfo_to_extid(self, p_info: TPackageInfo) -> Optional[PartialExtID]:
return p_info.extid()
def known_artifact_to_extid(self, known_artifact: Dict) -> Optional[PartialExtID]:
"""Returns a unique intrinsic identifier of a downloaded artifact,
used to check if a new artifact is the same."""
return None
def resolve_revision_from(
self, known_artifacts: Dict, p_info: TPackageInfo,
) -> Optional[bytes]:
"""Resolve the revision from a snapshot and an artifact metadata dict.
If the artifact has already been downloaded, this will return the
existing revision targeting that uncompressed artifact directory.
Otherwise, this returns None.
Args:
snapshot: Snapshot
p_info: Package information
Returns:
None or revision identifier
"""
if not known_artifacts:
# No known artifact, no need to compute the artifact's extid
return None
new_extid = self.new_packageinfo_to_extid(p_info)
if new_extid is None:
# This loader does not support deduplication, at least not for this
# artifact.
return None
for rev_id, known_artifact in known_artifacts.items():
known_extid = self.known_artifact_to_extid(known_artifact)
if new_extid == known_extid:
return rev_id
return None
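# Illustrative sketch (assumption): a loader whose ExtID is the artifact's
# sha256 could match new package info against the "original_artifact" metadata
# stored on known revisions (see _load_revision below):
#
#     def new_packageinfo_to_extid(self, p_info) -> Optional[PartialExtID]:
#         # `p_info.sha256` is a hypothetical field of a package info subclass
#         return ("artifact-sha256", bytes.fromhex(p_info.sha256))
#
#     def known_artifact_to_extid(self, known_artifact: Dict) -> Optional[PartialExtID]:
#         try:
#             sha256 = known_artifact["original_artifact"][0]["checksums"]["sha256"]
#         except (KeyError, IndexError):
#             return None
#         return ("artifact-sha256", bytes.fromhex(sha256))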
def download_package(
self, p_info: TPackageInfo, tmpdir: str
) -> List[Tuple[str, Mapping]]:
"""Download artifacts for a specific package. All downloads happen in
in the tmpdir folder.
Default implementation expects the artifacts package info to be
about one artifact per package.
Note that most implementation have 1 artifact per package. But some
implementation have multiple artifacts per package (debian), some have
none, the package is the artifact (gnu).
Args:
artifacts_package_info: Information on the package artifacts to
download (url, filename, etc...)
tmpdir: Location to retrieve such artifacts
Returns:
List of (path, computed hashes)
"""
return [download(p_info.url, dest=tmpdir, filename=p_info.filename)]
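# Illustrative sketch (assumption): a loader with several files per package
# version (as in debian) could override this along the lines of:
#
#     def download_package(self, p_info, tmpdir: str) -> List[Tuple[str, Mapping]]:
#         return [
#             download(url, dest=tmpdir, filename=name)
#             for (url, name) in p_info.files  # `files` is a hypothetical field
#         ]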
def uncompress(
self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str
) -> str:
"""Uncompress the artifact(s) in the destination folder dest.
Some implementations (debian) may need more package information to do so.
"""
uncompressed_path = os.path.join(dest, "src")
for a_path, _ in dl_artifacts:
uncompress(a_path, dest=uncompressed_path)
return uncompressed_path
def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]:
"""Return an extra dict of branches that are used to update the set of
branches.
"""
return {}
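# Illustrative sketch (assumption): using the same branch-dict layout as
# _load_snapshot below, e.g. to alias a stable name to the latest release:
#
#     def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]:
#         return {
#             b"releases/latest": {
#                 "target_type": "alias",
#                 "target": b"releases/1.1",
#             }
#         }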
def finalize_visit(
self,
*,
snapshot: Optional[Snapshot],
visit: OriginVisit,
status_visit: str,
status_load: str,
failed_branches: List[str],
) -> Dict[str, Any]:
"""Finalize the visit:
- flush any pending data to storage
- update origin visit's status
- return the task's status
"""
self.storage.flush()
snapshot_id: Optional[bytes] = None
if snapshot and snapshot.id:  # guard against an empty snapshot.id (b"")
snapshot_id = snapshot.id
assert visit.visit
visit_status = OriginVisitStatus(
origin=self.url,
visit=visit.visit,
type=self.visit_type,
date=now(),
status=status_visit,
snapshot=snapshot_id,
)
self.storage.origin_visit_status_add([visit_status])
result: Dict[str, Any] = {
"status": status_load,
}
if snapshot_id:
result["snapshot_id"] = hash_to_hex(snapshot_id)
if failed_branches:
logger.warning("%d failed branches", len(failed_branches))
for i, urls in enumerate(islice(failed_branches, 50)):
prefix_url = "Failed branches: " if i == 0 else ""
logger.warning("%s%s", prefix_url, urls)
return result
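# Example of the returned task status (illustrative values):
#     {"status": "eventful", "snapshot_id": "a1b2..."}  # a snapshot was stored
#     {"status": "failed"}                              # no snapshot id available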
def load(self) -> Dict:
"""Load for a specific origin the associated contents.
for each package version of the origin
1. Fetch the files for one package version By default, this can be
implemented as a simple HTTP request. Loaders with more specific
requirements can override this, e.g.: the PyPI loader checks the
integrity of the downloaded files; the Debian loader has to download
and check several files for one package version.
2. Extract the downloaded files By default, this would be a universal
archive/tarball extraction.
Loaders for specific formats can override this method (for instance,
the Debian loader uses dpkg-source -x).
3. Convert the extracted directory to a set of Software Heritage
objects Using swh.model.from_disk.
4. Extract the metadata from the unpacked directories This would only
be applicable for "smart" loaders like npm (parsing the
package.json), PyPI (parsing the PKG-INFO file) or Debian (parsing
debian/changelog and debian/control).
On "minimal-metadata" sources such as the GNU archive, the lister
should provide the minimal set of metadata needed to populate the
revision/release objects (authors, dates) as an argument to the
task.
5. Generate the revision/release objects for the given version. From
the data generated at steps 3 and 4.
end for each
6. Generate and load the snapshot for the visit
Using the revisions/releases collected at step 5., and the branch
information from step 0., generate a snapshot and load it into the
Software Heritage archive
"""
status_load = "uneventful" # either: eventful, uneventful, failed
status_visit = "full" # see swh.model.model.OriginVisitStatus
snapshot = None
failed_branches: List[str] = []
# Prepare origin and origin_visit
origin = Origin(url=self.url)
try:
self.storage.origin_add([origin])
visit = list(
self.storage.origin_visit_add(
[
OriginVisit(
origin=self.url, date=self.visit_date, type=self.visit_type,
)
]
)
)[0]
except Exception as e:
logger.exception("Failed to initialize origin_visit for %s", self.url)
sentry_sdk.capture_exception(e)
return {"status": "failed"}
try:
last_snapshot = self.last_snapshot()
logger.debug("last snapshot: %s", last_snapshot)
known_artifacts = self.known_artifacts(last_snapshot)
logger.debug("known artifacts: %s", known_artifacts)
except Exception as e:
logger.exception("Failed to get previous state for %s", self.url)
sentry_sdk.capture_exception(e)
return self.finalize_visit(
snapshot=snapshot,
visit=visit,
failed_branches=failed_branches,
status_visit="failed",
status_load="failed",
)
load_exceptions: List[Exception] = []
try:
versions = self.get_versions()
except NotFound:
return self.finalize_visit(
snapshot=snapshot,
visit=visit,
failed_branches=failed_branches,
status_visit="not_found",
status_load="failed",
)
except Exception:
return self.finalize_visit(
snapshot=snapshot,
visit=visit,
failed_branches=failed_branches,
status_visit="failed",
status_load="failed",
)
- packages_info = [
- (version, branch_name, p_info)
- for version in versions
- for (branch_name, p_info) in self.get_package_info(version)
- ]
-
tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = {
version: [] for version in versions
}
- for (version, branch_name, p_info) in packages_info:
- logger.debug("package_info: %s", p_info)
- revision_id = self.resolve_revision_from(known_artifacts, p_info)
- if revision_id is None:
- try:
- res = self._load_revision(p_info, origin)
- if res:
- (revision_id, directory_id) = res
- assert revision_id
- assert directory_id
- self._load_extrinsic_directory_metadata(
- p_info, revision_id, directory_id
- )
- self.storage.flush()
- status_load = "eventful"
- except Exception as e:
- self.storage.clear_buffers()
- load_exceptions.append(e)
- sentry_sdk.capture_exception(e)
- logger.exception(
- "Failed loading branch %s for %s", branch_name, self.url
- )
- failed_branches.append(branch_name)
- continue
+ for version in versions:
+ logger.debug("version: %s", version)
+ # `p_` stands for `package_`
+ for branch_name, p_info in self.get_package_info(version):
+ logger.debug("package_info: %s", p_info)
+ revision_id = self.resolve_revision_from(known_artifacts, p_info)
if revision_id is None:
- continue
+ try:
+ res = self._load_revision(p_info, origin)
+ if res:
+ (revision_id, directory_id) = res
+ assert revision_id
+ assert directory_id
+ self._load_extrinsic_directory_metadata(
+ p_info, revision_id, directory_id
+ )
+ self.storage.flush()
+ status_load = "eventful"
+ except Exception as e:
+ self.storage.clear_buffers()
+ load_exceptions.append(e)
+ sentry_sdk.capture_exception(e)
+ logger.exception(
+ "Failed loading branch %s for %s", branch_name, self.url
+ )
+ failed_branches.append(branch_name)
+ continue
+
+ if revision_id is None:
+ continue
- tmp_revisions[version].append((branch_name, revision_id))
+ tmp_revisions[version].append((branch_name, revision_id))
if load_exceptions:
status_visit = "partial"
if not tmp_revisions:
# We could not load any revisions; fail completely
return self.finalize_visit(
snapshot=snapshot,
visit=visit,
failed_branches=failed_branches,
status_visit="failed",
status_load="failed",
)
try:
# Retrieve the default release version (the "latest" one)
default_version = self.get_default_version()
logger.debug("default version: %s", default_version)
# Retrieve extra branches
extra_branches = self.extra_branches()
logger.debug("extra branches: %s", extra_branches)
snapshot = self._load_snapshot(
default_version, tmp_revisions, extra_branches
)
self.storage.flush()
except Exception as e:
logger.exception("Failed to build snapshot for origin %s", self.url)
sentry_sdk.capture_exception(e)
status_visit = "failed"
status_load = "failed"
if snapshot:
try:
metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id)
self._load_metadata_objects(metadata_objects)
except Exception as e:
logger.exception(
"Failed to load extrinsic snapshot metadata for %s", self.url
)
sentry_sdk.capture_exception(e)
status_visit = "partial"
status_load = "failed"
try:
metadata_objects = self.build_extrinsic_origin_metadata()
self._load_metadata_objects(metadata_objects)
except Exception as e:
logger.exception(
"Failed to load extrinsic origin metadata for %s", self.url
)
sentry_sdk.capture_exception(e)
status_visit = "partial"
status_load = "failed"
return self.finalize_visit(
snapshot=snapshot,
visit=visit,
failed_branches=failed_branches,
status_visit=status_visit,
status_load=status_load,
)
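# Illustrative usage (assumption: `storage` is a StorageInterface instance and
# ExampleLoader is a concrete subclass such as the sketch above):
#
#     loader = ExampleLoader(storage=storage, url="https://example.org/pkg")
#     result = loader.load()
#     # e.g. {"status": "eventful", "snapshot_id": "..."}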
def _load_directory(
self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str
) -> Tuple[str, from_disk.Directory]:
uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
logger.debug("uncompressed_path: %s", uncompressed_path)
directory = from_disk.Directory.from_disk(
path=uncompressed_path.encode("utf-8"),
max_content_length=self.max_content_size,
)
contents, skipped_contents, directories = from_disk.iter_directory(directory)
logger.debug("Number of skipped contents: %s", len(skipped_contents))
self.storage.skipped_content_add(skipped_contents)
logger.debug("Number of contents: %s", len(contents))
self.storage.content_add(contents)
logger.debug("Number of directories: %s", len(directories))
self.storage.directory_add(directories)
return (uncompressed_path, directory)
def _load_revision(
self, p_info: TPackageInfo, origin
) -> Optional[Tuple[Sha1Git, Sha1Git]]:
"""Does all the loading of a revision itself:
* downloads a package and uncompresses it
* loads it from disk
* adds contents, directories, and revision to self.storage
* returns (revision_id, directory_id)
Raises:
an exception when unable to download or uncompress artifacts
"""
with tempfile.TemporaryDirectory() as tmpdir:
dl_artifacts = self.download_package(p_info, tmpdir)
(uncompressed_path, directory) = self._load_directory(dl_artifacts, tmpdir)
# FIXME: This should be release. cf. D409
revision = self.build_revision(
p_info, uncompressed_path, directory=directory.hash
)
if not revision:
# Some artifacts are missing intrinsic metadata; skip those
return None
metadata = [metadata for (filepath, metadata) in dl_artifacts]
extra_metadata: Tuple[str, Any] = (
"original_artifact",
metadata,
)
if revision.metadata is not None:
full_metadata = list(revision.metadata.items()) + [extra_metadata]
else:
full_metadata = [extra_metadata]
# TODO: don't add these extrinsic metadata to the revision.
revision = attr.evolve(revision, metadata=ImmutableDict(full_metadata))
original_artifact_metadata = RawExtrinsicMetadata(
target=ExtendedSWHID(
object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory
),
discovery_date=self.visit_date,
authority=SWH_METADATA_AUTHORITY,
fetcher=self.get_metadata_fetcher(),
format="original-artifacts-json",
metadata=json.dumps(metadata).encode(),
origin=self.url,
revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=revision.id),
)
self._load_metadata_objects([original_artifact_metadata])
logger.debug("Revision: %s", revision)
self.storage.revision_add([revision])
assert directory.hash
return (revision.id, directory.hash)
def _load_snapshot(
self,
default_version: str,
revisions: Dict[str, List[Tuple[str, bytes]]],
extra_branches: Dict[bytes, Mapping[str, Any]],
) -> Optional[Snapshot]:
"""Build snapshot out of the current revisions stored and extra branches.
Then load it in the storage.
"""
logger.debug("revisions: %s", revisions)
# Build and load the snapshot
branches = {} # type: Dict[bytes, Mapping[str, Any]]
for version, branch_name_revisions in revisions.items():
if version == default_version and len(branch_name_revisions) == 1:
# only one branch (no ambiguity), so we can create a 'HEAD'
# alias branch
branch_name, _ = branch_name_revisions[0]
# except for some corner cases (deposit)
if branch_name != "HEAD":
branches[b"HEAD"] = {
"target_type": "alias",
"target": branch_name.encode("utf-8"),
}
for branch_name, target in branch_name_revisions:
branches[branch_name.encode("utf-8")] = {
"target_type": "revision",
"target": target,
}
# Deal with extra-branches
for name, branch_target in extra_branches.items():
if name in branches:
logger.error("Extra branch '%s' has been ignored", name)
else:
branches[name] = branch_target
snapshot_data = {"branches": branches}
logger.debug("snapshot: %s", snapshot_data)
snapshot = Snapshot.from_dict(snapshot_data)
logger.debug("snapshot: %s", snapshot)
self.storage.snapshot_add([snapshot])
return snapshot
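# Illustrative result (assumption): with default_version "1.1" and one branch
# per version, the loaded snapshot's branches look like:
#
#     {
#         b"HEAD": {"target_type": "alias", "target": b"releases/1.1"},
#         b"releases/1.0": {"target_type": "revision", "target": <revision id>},
#         b"releases/1.1": {"target_type": "revision", "target": <revision id>},
#     }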
def get_loader_name(self) -> str:
"""Returns a fully qualified name of this loader."""
return f"{self.__class__.__module__}.{self.__class__.__name__}"
def get_loader_version(self) -> str:
"""Returns the version of the current loader."""
module_name = self.__class__.__module__ or ""
module_name_parts = module_name.split(".")
# Iterate rootward through the package hierarchy until we find a parent of this
# loader's module with a __version__ attribute.
for prefix_size in range(len(module_name_parts), 0, -1):
package_name = ".".join(module_name_parts[0:prefix_size])
module = sys.modules[package_name]
if hasattr(module, "__version__"):
return module.__version__ # type: ignore
# If this loader's class has no parent package with a __version__,
# it should implement it itself.
raise NotImplementedError(
f"Could not dynamically find the version of {self.get_loader_name()}."
)
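# Example (illustrative): for a loader defined in swh.loader.package.pypi.loader,
# the lookup tries swh.loader.package.pypi.loader, swh.loader.package.pypi,
# swh.loader.package, swh.loader, then swh, and returns the first __version__
# it finds.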
def get_metadata_fetcher(self) -> MetadataFetcher:
"""Returns a MetadataFetcher instance representing this package loader;
which is used to for adding provenance information to extracted
extrinsic metadata, if any."""
return MetadataFetcher(
name=self.get_loader_name(), version=self.get_loader_version(), metadata={},
)
def get_metadata_authority(self) -> MetadataAuthority:
"""For package loaders that get extrinsic metadata, returns the authority
the metadata are coming from.
"""
raise NotImplementedError("get_metadata_authority")
def get_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadataCore]:
"""Returns metadata items, used by build_extrinsic_origin_metadata."""
return []
def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]:
"""Builds a list of full RawExtrinsicMetadata objects, using
metadata returned by get_extrinsic_origin_metadata."""
metadata_items = self.get_extrinsic_origin_metadata()
if not metadata_items:
# If this package loader doesn't write metadata, no need to require
# an implementation for get_metadata_authority.
return []
authority = self.get_metadata_authority()
fetcher = self.get_metadata_fetcher()
metadata_objects = []
for item in metadata_items:
metadata_objects.append(
RawExtrinsicMetadata(
target=Origin(self.url).swhid(),
discovery_date=item.discovery_date or self.visit_date,
authority=authority,
fetcher=fetcher,
format=item.format,
metadata=item.metadata,
)
)
return metadata_objects
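# Illustrative sketch (assumption): a loader recording forge API metadata about
# the origin would implement, for example:
#
#     def get_metadata_authority(self) -> MetadataAuthority:
#         return MetadataAuthority(
#             type=MetadataAuthorityType.FORGE,
#             url="https://example.org/",
#             metadata={},
#         )
#
#     def get_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadataCore]:
#         # `api_response` is a hypothetical attribute holding raw API data
#         return [
#             RawExtrinsicMetadataCore(
#                 format="example-project-json",
#                 metadata=json.dumps(self.api_response).encode(),
#             )
#         ]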
def get_extrinsic_snapshot_metadata(self) -> List[RawExtrinsicMetadataCore]:
"""Returns metadata items, used by build_extrinsic_snapshot_metadata."""
return []
def build_extrinsic_snapshot_metadata(
self, snapshot_id: Sha1Git
) -> List[RawExtrinsicMetadata]:
"""Builds a list of full RawExtrinsicMetadata objects, using
metadata returned by get_extrinsic_snapshot_metadata."""
metadata_items = self.get_extrinsic_snapshot_metadata()
if not metadata_items:
# If this package loader doesn't write metadata, no need to require
# an implementation for get_metadata_authority.
return []
authority = self.get_metadata_authority()
fetcher = self.get_metadata_fetcher()
metadata_objects = []
for item in metadata_items:
metadata_objects.append(
RawExtrinsicMetadata(
target=ExtendedSWHID(
object_type=ExtendedObjectType.SNAPSHOT, object_id=snapshot_id
),
discovery_date=item.discovery_date or self.visit_date,
authority=authority,
fetcher=fetcher,
format=item.format,
metadata=item.metadata,
origin=self.url,
)
)
return metadata_objects
def build_extrinsic_directory_metadata(
self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git,
) -> List[RawExtrinsicMetadata]:
if not p_info.directory_extrinsic_metadata:
# If this package loader doesn't write metadata, no need to require
# an implementation for get_metadata_authority.
return []
authority = self.get_metadata_authority()
fetcher = self.get_metadata_fetcher()
metadata_objects = []
for item in p_info.directory_extrinsic_metadata:
metadata_objects.append(
RawExtrinsicMetadata(
target=ExtendedSWHID(
object_type=ExtendedObjectType.DIRECTORY, object_id=directory_id
),
discovery_date=item.discovery_date or self.visit_date,
authority=authority,
fetcher=fetcher,
format=item.format,
metadata=item.metadata,
origin=self.url,
revision=CoreSWHID(
object_type=ObjectType.REVISION, object_id=revision_id
),
)
)
return metadata_objects
def _load_extrinsic_directory_metadata(
self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git,
) -> None:
metadata_objects = self.build_extrinsic_directory_metadata(
p_info, revision_id, directory_id
)
self._load_metadata_objects(metadata_objects)
def _load_metadata_objects(
self, metadata_objects: List[RawExtrinsicMetadata]
) -> None:
if not metadata_objects:
# If this package loader doesn't write metadata, no need to require
# an implementation for get_metadata_authority.
return
self._create_authorities(mo.authority for mo in metadata_objects)
self._create_fetchers(mo.fetcher for mo in metadata_objects)
self.storage.raw_extrinsic_metadata_add(metadata_objects)
def _create_authorities(self, authorities: Iterable[MetadataAuthority]) -> None:
deduplicated_authorities = {
(authority.type, authority.url): authority for authority in authorities
}
if authorities:
self.storage.metadata_authority_add(list(deduplicated_authorities.values()))
def _create_fetchers(self, fetchers: Iterable[MetadataFetcher]) -> None:
deduplicated_fetchers = {
(fetcher.name, fetcher.version): fetcher for fetcher in fetchers
}
if fetchers:
self.storage.metadata_fetcher_add(list(deduplicated_fetchers.values()))
