diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1c95e3d..f972cd9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,40 +1,40 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.1.0 + rev: v4.3.0 hooks: - id: trailing-whitespace - id: check-json - id: check-yaml - - repo: https://gitlab.com/pycqa/flake8 - rev: 4.0.1 + - repo: https://github.com/pycqa/flake8 + rev: 5.0.4 hooks: - id: flake8 - additional_dependencies: [flake8-bugbear==22.3.23] + additional_dependencies: [flake8-bugbear==22.9.23] - repo: https://github.com/codespell-project/codespell - rev: v2.1.0 + rev: v2.2.2 hooks: - id: codespell name: Check source code spelling stages: [commit] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] - repo: https://github.com/PyCQA/isort rev: 5.10.1 hooks: - id: isort - repo: https://github.com/python/black - rev: 22.3.0 + rev: 22.10.0 hooks: - id: black diff --git a/PKG-INFO b/PKG-INFO index 4a1f9f1..0bffcf8 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,42 +1,42 @@ Metadata-Version: 2.1 Name: swh.model -Version: 6.6.0 +Version: 6.6.1 Summary: Software Heritage data model Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-model Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: cli Provides-Extra: testing-minimal Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-model ========= Implementation of the Data model of the Software Heritage project, used to archive source code artifacts. This module defines the notion of SoftWare Heritage persistent IDentifiers (SWHIDs) and provides tools to compute them: ```sh $ swh-identify fork.c kmod.c sched/deadline.c swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c $ swh-identify --no-filename /usr/src/linux/kernel/ swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab ``` diff --git a/docs/persistent-identifiers.rst b/docs/persistent-identifiers.rst index 105f58c..01da6aa 100644 --- a/docs/persistent-identifiers.rst +++ b/docs/persistent-identifiers.rst @@ -1,427 +1,427 @@ .. _persistent-identifiers: .. _swhids: ================================================= SoftWare Heritage persistent IDentifiers (SWHIDs) ================================================= **version 1.6, last modified 2021-04-30** .. contents:: :local: :depth: 2 Overview ======== You can point to objects present in the `Software Heritage `_ `archive `_ by the means of **SoftWare Heritage persistent IDentifiers**, or **SWHIDs** for short, that are guaranteed to remain stable (persistent) over time. Their syntax, meaning, and usage is described below. Note that they are identifiers and not URLs, even though URL-based `resolvers`_ for SWHIDs are also available. A SWHID consists of two separate parts, a mandatory *core identifier* that can point to any software artifact (or "object") available in the Software Heritage archive, and an optional list of *qualifiers* that allows to specify the context where the object is meant to be seen and point to a subpart of the object itself. Objects come in different types: * contents * directories * revisions * releases * snapshots Each object is identified by an intrinsic, type-specific object identifier that is embedded in its SWHID as described below. The intrinsic identifiers embedded in SWHIDs are strong cryptographic hashes computed on the entire set of object properties. Together, these identifiers form a `Merkle structure `_, specifically a Merkle `DAG `_. See the :ref:`Software Heritage data model ` for an overview of object types and how they are linked together. See :py:mod:`swh.model.git_objects` for details on how the intrinsic identifiers embedded in SWHIDs are computed. The optional qualifiers are of two kinds: * **context qualifiers:** carry information about the context where a given object is meant to be seen. This is particularly important, as the same object can be reached in the Merkle graph following different *paths* starting from different nodes (or *anchors*), and it may have been retrieved from different *origins*, that may evolve between different *visits* * **fragment qualifiers:** allow to pinpoint specific subparts of an object .. _swhids-syntax: Syntax ====== Syntactically, SWHIDs are generated by the ```` entry point in the following grammar: .. code-block:: bnf ::= [ ] ; ::= "swh" ":" ":" ":" ; ::= "1" ; ::= "snp" (* snapshot *) | "rel" (* release *) | "rev" (* revision *) | "dir" (* directory *) | "cnt" (* content *) ; ::= 40 * ; (* intrinsic object id, as hex-encoded SHA1 *) ::= "0" | "1" | "2" | "3" | "4" | "5" | "6" | "7" | "8" | "9" ; ::= | "a" | "b" | "c" | "d" | "e" | "f" ; := ";" [ ] ; ::= | ; ::= | | | ; ::= "origin" "=" ; ::= "visit" "=" ; ::= "anchor" "=" ; ::= "path" "=" ; ::= "lines" "=" ["-" ] ; ::= + ; ::= (* RFC 3987 IRI *) ::= (* RFC 3987 absolute path *) Where: - ```` is an ```` from `RFC 3987`_, and - ```` is a `RFC 3987`_ IRI in either case all occurrences of ``;`` (and ``%``, as required by the RFC) have been percent-encoded (as ``%3B`` and ``%25`` respectively). Other characters *can* be percent-encoded, e.g., to improve readability and/or embeddability of SWHID in other contexts. .. _RFC 3987: https://tools.ietf.org/html/rfc3987 .. _swhids-semantics: Semantics ========= .. _swhids-core: Core identifiers ---------------- ``:`` is used as separator between the logical parts of core identifiers. The ``swh`` prefix makes explicit that these identifiers are related to *SoftWare Heritage*. ``1`` (````) is the current version of this identifier *scheme*. Future editions will use higher version numbers, possibly breaking backward compatibility, but without breaking the resolvability of SWHIDs that conform to previous versions of the scheme. A SWHID points to a single object, whose type is explicitly captured by ````: * ``snp`` to **snapshots**, * ``rel`` to **releases**, * ``rev`` to **revisions**, * ``dir`` to **directories**, * ``cnt`` to **contents**. The actual object pointed to is identified by the intrinsic identifier ````, which is a hex-encoded (using lowercase ASCII characters) SHA1 computed on the content and metadata of the object itself, as follows: * for **snapshots**, intrinsic identifiers are SHA1 hashes of manifests computed as per :py:func:`swh.model.git_objects.snapshot_git_object` * for **releases**, as per :py:func:`swh.model.git_objects.release_git_object` that produces the same result as a git release hash * for **revisions**, as per :py:func:`swh.model.git_objects.revision_git_object` that produces the same result as a git commit hash * for **directories**, per :py:func:`swh.model.git_objects.directory_git_object` that produces the same result as a git tree hash * for **contents**, the intrinsic identifier is the ``sha1_git`` hash returned by :py:meth:`swh.hashutil.MultiHash.digest`, i.e., the SHA1 of a byte sequence obtained by juxtaposing the ASCII string ``"blob"`` (without quotes), a space, the length of the content as decimal digits, a NULL byte, and the actual content of the file. .. _swhids-qualifiers: Qualifiers ---------- ``;`` is used as separator between the core identifier and the optional qualifiers, as well as between qualifiers. Each qualifier is specified as a key/value pair, using ``=`` as a separator. The following *context qualifiers* are available: * **origin:** the *software origin* where an object has been found or observed in the wild, as an URI; * **visit:** the core identifier of a *snapshot* corresponding to a specific *visit* of a repository containing the designated object; * **anchor:** a *designated node* in the Merkle DAG relative to which a *path to the object* is specified, as the core identifier of a directory, a revision, a release or a snapshot; * **path:** the *absolute file path*, from the *root directory* associated to the *anchor node*, to the object; when the anchor denotes a directory or a revision, and almost always when it's a release, the root directory is uniquely determined; when the anchor denotes a snapshot, the root directory is the one pointed to by ``HEAD`` (possibly indirectly), and undefined if such a reference is missing; The following *fragment qualifier* is available: * **lines:** *line number(s)* of interest, usually within a content object We recommend to equip identifiers meant to be shared with as many qualifiers as possible. While qualifiers may be listed in any order, it is good practice to present them in the order given above, i.e., ``origin``, ``visit``, ``anchor``, ``path``, ``lines``. Redundant information should be omitted: for example, if the *visit* is present, and the *path* is relative to the snapshot indicated there, then the *anchor* qualifier is superfluous; similarly, if the *path* is empty, it may be omitted. Interoperability ================ URI scheme ---------- The ``swh`` URI scheme is registered at IANA for SWHIDs. The present documents constitutes the scheme specification for such URI scheme. Git compatibility ----------------- SWHIDs for contents, directories, revisions, and releases are, at present, compatible with the `Git `_ way of `computing identifiers `_ for its objects. The ```` part of a SWHID for a content object is the Git blob identifier of any file with the same content; for a revision it is the Git commit identifier for the same revision, etc. This is not the case for snapshot identifiers, as Git does not have a corresponding object type. Note that Git compatibility is incidental and is not guaranteed to be maintained in future versions of this scheme (or Git). Automatically fixing invalid SWHIDs ----------------------------------- User interfaces may fix invalid SWHIDs, by lower-casing the ```` part of a SWHID, if it contains upper-case letters because of user errors or limitations in software displaying SWHIDs. However, implementations displaying or generating SWHIDs should not rely on this behavior, and must display or generate only valid SWHIDs when technically possible. User interfaces should show an error when such an automatic fix occurs, so users have a chance to fix their SWHID before pasting it to an other interface that does not perform the same corrections. This also makes it easier to understand issues when a case-sensitive qualifier has its casing altered. Examples ======== Core identifiers ---------------- * ``swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2`` points to the content of a file containing the full text of the GPL3 license * ``swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505`` points to a directory containing the source code of the Darktable photography application as it was at some point on 4 May 2017 * ``swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d`` points to a commit in the development history of Darktable, dated 16 January 2017, that added undo/redo supports for masks * ``swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f`` points to Darktable release 2.3.0, dated 24 December 2016 * ``swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453`` points to a snapshot of the entire Darktable Git repository taken on 4 May 2017 from GitHub Identifiers with qualifiers --------------------------- * The following :swh_web:`SWHID ` denotes the lines 9 to 15 of a file content that can be found at absolute path ``/Examples/SimpleFarm/simplefarm.ml`` from the root directory of the revision ``swh:1:rev:2db189928c94d62a3b4757b3eec68f0a4d4113f0`` that is contained in the snapshot ``swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9`` taken from the origin ``https://gitorious.org/ocamlp3l/ocamlp3l_cvs.git``:: swh:1:cnt:4d99d2d18326621ccdd70f5ea66c2e2ac236ad8b; origin=https://gitorious.org/ocamlp3l/ocamlp3l_cvs.git; visit=swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9; anchor=swh:1:rev:2db189928c94d62a3b4757b3eec68f0a4d4113f0; path=/Examples/SimpleFarm/simplefarm.ml; lines=9-15 * Here is an example of a :swh_web:`SWHID - ` + ` with a file path that requires percent-escaping:: swh:1:cnt:f10371aa7b8ccabca8479196d6cd640676fd4a04; origin=https://github.com/web-platform-tests/wpt; visit=swh:1:snp:b37d435721bbd450624165f334724e3585346499; anchor=swh:1:rev:259d0612af038d14f2cd889a14a3adb6c9e96d96; path=/html/semantics/document-metadata/the-meta-element/pragma-directives/attr-meta-http-equiv-refresh/support/x%3Burl=foo/ Implementation ============== Computing --------- An important property of any SWHID is that its core identifier is *intrinsic*: it can be *computed from the object itself*, without having to rely on any third party. An implementation of SWHID that allows to do so locally is the `swh identify `_ tool, available from the `swh.model `_ Python package under the GPL license. This package can be installed via the ``pip`` package manager with the one liner ``pip3 install swh.model[cli]`` on any machine with Python (at least version 3.7) and ``pip`` installed (on a Debian or Ubuntu system a simple ``apt install python3 python3-pip`` will suffice, see `the general instructions `_ for other platforms). SWHIDs are also automatically computed by Software Heritage for all archived objects as part of its archival activity, and can be looked up via the project :swh_web:`Web interface <>`. This has various practical implications: * when a software artifact is obtained from Software Heritage by resolving a SWHID, it is straightforward to verify that it is exactly the intended one: just compute the core identifier from the artefact itself, and check that it is the same as the core identifier part of the SHWID * the core identifier of a software artifact can be computed *before* its archival on Software Heritage Choosing what type of SWHID to use ---------------------------------- ``swh:1:dir:`` SWHIDs are the most robust SWHIDs, as they can be recomputed from the simplest objects (a directory structure on a filesystem), even when all metadata is lost, without relying on the Software Heritage archive. Therefore, we advise implementers and users to prefer this type of SWHIDs over ``swh:1:rev:`` and ``swh:1:rel:`` to reference a source code artifacts. However, since keeping the metadata is also important, you should add an anchor qualifier to ``swh:1:dir:`` SWHIDs whenever possible, so the metadata stored in the Software Heritage archive can be retrieved when needed. This means, for example, that you should prefer ``swh:1:dir:a8eded6a2d062c998ba2dcc3dcb0ce68a4e15a58;anchor=swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f`` over ``swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f``. Resolvers --------- Software Heritage resolver ~~~~~~~~~~~~~~~~~~~~~~~~~~ SWHIDs can be resolved using the Software Heritage :swh_web:`Web interface <>`. In particular, the **root endpoint** ``/`` can be given a SWHID and will lead to the browsing page of the corresponding object, like this: ``https://archive.softwareheritage.org/``. A **dedicated** ``/resolve`` **endpoint** of the Software Heritage :swh_web:`Web API ` is also available to programmatically resolve SWHIDs; see: :http:get:`/api/1/resolve/(swhid)/`. Examples: -* :swh_web:`` -* :swh_web:`` -* :swh_web:`` -* :swh_web:`` -* :swh_web:`` -* :swh_web:`` -* :swh_web:`` +* :swh_web:`swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2` +* :swh_web:`swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505` +* :swh_web:`api/1/resolve/swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d` +* :swh_web:`api/1/resolve/swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f` +* :swh_web:`api/1/resolve/swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453` +* :swh_web:`swh:1:cnt:4d99d2d18326621ccdd70f5ea66c2e2ac236ad8b;origin=https://gitorious.org/ocamlp3l/ocamlp3l_cvs.git;visit=swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9;anchor=swh:1:rev:2db189928c94d62a3b4757b3eec68f0a4d4113f0;path=/Examples/SimpleFarm/simplefarm.ml;lines=9-15` +* :swh_web:`swh:1:cnt:f10371aa7b8ccabca8479196d6cd640676fd4a04;origin=https://github.com/web-platform-tests/wpt;visit=swh:1:snp:b37d435721bbd450624165f334724e3585346499;anchor=swh:1:rev:259d0612af038d14f2cd889a14a3adb6c9e96d96;path=/html/semantics/document-metadata/the-meta-element/pragma-directives/attr-meta-http-equiv-refresh/support/x%253Burl=foo/` Third-party resolvers ~~~~~~~~~~~~~~~~~~~~~ The following **third party resolvers** support SWHID resolution: * `Identifiers.org `_; see: ``_ (registry identifier `MIR:00000655 `_). * `Name-to-Thing (N2T) `_ Note that resolution via Identifiers.org currently only supports *core identifiers* due to `syntactic incompatibilities with qualifiers `_. Examples: * ``_ * ``_ * ``_ * ``_ * ``_ * ``_ -* ``_ +* ``_ References ========== * Roberto Di Cosmo, Morane Gruenpeter, Stefano Zacchiroli. `Identifiers for Digital Objects: the Case of Software Source Code Preservation `_. In Proceedings of `iPRES 2018 `_: 15th International Conference on Digital Preservation, Boston, MA, USA, September 2018, 9 pages. * Roberto Di Cosmo, Morane Gruenpeter, Stefano Zacchiroli. `Referencing Source Code Artifacts: a Separate Concern in Software Citation `_. In Computing in Science and Engineering, volume 22, issue 2, pages 33-43. ISSN 1521-9615, IEEE. March 2020. diff --git a/mypy.ini b/mypy.ini index e3daf6d..d411c51 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,26 +1,6 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) -[mypy-attrs_strict.*] # a bit sad, but... -ignore_missing_imports = True - -[mypy-deprecated.*] -ignore_missing_imports = True - -[mypy-django.*] # false positive, only used my hypotesis' extras -ignore_missing_imports = True - -[mypy-dulwich.*] -ignore_missing_imports = True - -[mypy-iso8601.*] -ignore_missing_imports = True - -[mypy-pkg_resources.*] -ignore_missing_imports = True - -[mypy-pytest.*] -ignore_missing_imports = True diff --git a/swh.model.egg-info/PKG-INFO b/swh.model.egg-info/PKG-INFO index 4a1f9f1..0bffcf8 100644 --- a/swh.model.egg-info/PKG-INFO +++ b/swh.model.egg-info/PKG-INFO @@ -1,42 +1,42 @@ Metadata-Version: 2.1 Name: swh.model -Version: 6.6.0 +Version: 6.6.1 Summary: Software Heritage data model Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-model Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: cli Provides-Extra: testing-minimal Provides-Extra: testing License-File: LICENSE License-File: AUTHORS swh-model ========= Implementation of the Data model of the Software Heritage project, used to archive source code artifacts. This module defines the notion of SoftWare Heritage persistent IDentifiers (SWHIDs) and provides tools to compute them: ```sh $ swh-identify fork.c kmod.c sched/deadline.c swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c $ swh-identify --no-filename /usr/src/linux/kernel/ swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab ``` diff --git a/swh/model/from_disk.py b/swh/model/from_disk.py index 8bd7f5d..058e77f 100644 --- a/swh/model/from_disk.py +++ b/swh/model/from_disk.py @@ -1,592 +1,595 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Conversion from filesystem tree to SWH objects. This module allows reading a tree of directories and files from a local filesystem, and convert them to in-memory data structures, which can then be exported to SWH data model objects, as defined in :mod:`swh.model.model`. """ import datetime import enum import fnmatch import glob import os import re import stat from typing import Any, Iterable, Iterator, List, Optional, Pattern, Tuple import attr from attrs_strict import type_validator from typing_extensions import Final from . import model from .exceptions import InvalidDirectoryPath from .git_objects import directory_entry_sort_key from .hashutil import MultiHash, hash_to_hex from .merkle import MerkleLeaf, MerkleNode from .swhids import CoreSWHID, ObjectType @attr.s(frozen=True, slots=True) class DiskBackedContent(model.BaseContent): """Content-like class, which allows lazy-loading data from the disk.""" object_type: Final = "content_file" sha1 = attr.ib(type=bytes, validator=type_validator()) sha1_git = attr.ib(type=model.Sha1Git, validator=type_validator()) sha256 = attr.ib(type=bytes, validator=type_validator()) blake2s256 = attr.ib(type=bytes, validator=type_validator()) length = attr.ib(type=int, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden"]), default="visible", ) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None, eq=False, ) path = attr.ib(type=Optional[bytes], default=None) @classmethod def from_dict(cls, d): return cls(**d) def __attrs_post_init__(self): if self.path is None: raise TypeError("path must not be None.") def with_data(self) -> model.Content: args = self.to_dict() del args["path"] assert self.path is not None with open(self.path, "rb") as fd: return model.Content.from_dict({**args, "data": fd.read()}) class DentryPerms(enum.IntEnum): """Admissible permissions for directory entries.""" content = 0o100644 """Content""" executable_content = 0o100755 """Executable content (e.g. executable script)""" symlink = 0o120000 """Symbolic link""" directory = 0o040000 """Directory""" revision = 0o160000 """Revision (e.g. submodule)""" def mode_to_perms(mode): """Convert a file mode to a permission compatible with Software Heritage directory entries Args: mode (int): a file mode as returned by :func:`os.stat` in :attr:`os.stat_result.st_mode` Returns: DentryPerms: one of the following values: :const:`DentryPerms.content`: plain file :const:`DentryPerms.executable_content`: executable file :const:`DentryPerms.symlink`: symbolic link :const:`DentryPerms.directory`: directory """ if stat.S_ISLNK(mode): return DentryPerms.symlink if stat.S_ISDIR(mode): return DentryPerms.directory else: # file is executable in any way if mode & (0o111): return DentryPerms.executable_content else: return DentryPerms.content class Content(MerkleLeaf): """Representation of a Software Heritage content as a node in a Merkle tree. The current Merkle hash for the Content nodes is the `sha1_git`, which makes it consistent with what :class:`Directory` uses for its own hash computation. """ __slots__ = [] # type: List[str] object_type: Final = "content" @classmethod def from_bytes(cls, *, mode, data): """Convert data (raw :class:`bytes`) to a Software Heritage content entry Args: mode (int): a file mode (passed to :func:`mode_to_perms`) data (bytes): raw contents of the file """ ret = MultiHash.from_data(data).digest() ret["length"] = len(data) ret["perms"] = mode_to_perms(mode) ret["data"] = data ret["status"] = "visible" return cls(ret) @classmethod def from_symlink(cls, *, path, mode): """Convert a symbolic link to a Software Heritage content entry""" - return cls.from_bytes(mode=mode, data=os.readlink(path)) + content = cls.from_bytes(mode=mode, data=os.readlink(path)) + content.data["path"] = path + return content @classmethod def from_file(cls, *, path, max_content_length=None): """Compute the Software Heritage content entry corresponding to an on-disk file. The returned dictionary contains keys useful for both: - loading the content in the archive (hashes, `length`) - using the content as a directory entry in a directory Args: save_path (bool): add the file path to the entry max_content_length (Optional[int]): if given, all contents larger than this will be skipped. """ file_stat = os.lstat(path) mode = file_stat.st_mode length = file_stat.st_size too_large = max_content_length is not None and length > max_content_length if stat.S_ISLNK(mode): # Symbolic link: return a file whose contents are the link target if too_large: # Unlike large contents, we can't stream symlinks to # MultiHash, and we don't want to fit them in memory if # they exceed max_content_length either. # Thankfully, this should not happen for reasonable values of # max_content_length because of OS/filesystem limitations, # so let's just raise an error. raise Exception(f"Symlink too large ({length} bytes)") return cls.from_symlink(path=path, mode=mode) elif not stat.S_ISREG(mode): # not a regular file: return the empty file instead return cls.from_bytes(mode=mode, data=b"") if too_large: skip_reason = "Content too large" else: skip_reason = None hashes = MultiHash.from_path(path).digest() if skip_reason: ret = { **hashes, "status": "absent", "reason": skip_reason, } else: ret = { **hashes, "status": "visible", } ret["path"] = path ret["perms"] = mode_to_perms(mode) ret["length"] = length obj = cls(ret) return obj def swhid(self) -> CoreSWHID: """Return node identifier as a SWHID""" return CoreSWHID(object_type=ObjectType.CONTENT, object_id=self.hash) def __repr__(self): return "Content(id=%s)" % hash_to_hex(self.hash) def compute_hash(self): return self.data["sha1_git"] def to_model(self) -> model.BaseContent: """Builds a `model.BaseContent` object based on this leaf.""" data = self.get_data().copy() data.pop("perms", None) if data["status"] == "absent": data.pop("path", None) return model.SkippedContent.from_dict(data) elif "data" in data: + data.pop("path", None) return model.Content.from_dict(data) else: return DiskBackedContent.from_dict(data) def accept_all_directories(dirpath: str, dirname: str, entries: Iterable[Any]) -> bool: """Default filter for :func:`Directory.from_disk` accepting all directories Args: dirname (bytes): directory name entries (list): directory entries """ return True def ignore_empty_directories( dirpath: str, dirname: str, entries: Iterable[Any] ) -> bool: """Filter for :func:`directory_to_objects` ignoring empty directories Args: dirname (bytes): directory name entries (list): directory entries Returns: True if the directory is not empty, false if the directory is empty """ return bool(entries) def ignore_named_directories(names, *, case_sensitive=True): """Filter for :func:`directory_to_objects` to ignore directories named one of names. Args: names (list of bytes): names to ignore case_sensitive (bool): whether to do the filtering in a case sensitive way Returns: a directory filter for :func:`directory_to_objects` """ if not case_sensitive: names = [name.lower() for name in names] def named_filter( dirpath: str, dirname: str, entries: Iterable[Any], names: Iterable[Any] = names, case_sensitive: bool = case_sensitive, ): if case_sensitive: return dirname not in names else: return dirname.lower() not in names return named_filter # TODO: `extract_regex_objs` has been copied and adapted from `swh.scanner`. # In the future `swh.scanner` should use the `swh.model` version and remove its own. def extract_regex_objs( root_path: bytes, patterns: Iterable[bytes] ) -> Iterator[Pattern[bytes]]: """Generates a regex object for each pattern given in input and checks if the path is a subdirectory or relative to the root path. Args: root_path (bytes): path to the root directory patterns (list of byte): shell patterns to match Yields: an SRE_Pattern object """ absolute_root_path = os.path.abspath(root_path) for pattern in patterns: if os.path.isabs(pattern): pattern = os.path.relpath(pattern, root_path) # python 3.10 has a `root_dir` argument for glob, but not the previous # version. So we adjust the pattern test_pattern = os.path.join(absolute_root_path, pattern) for path in glob.glob(test_pattern): if os.path.isabs(path) and not path.startswith(absolute_root_path): error_msg = ( b'The path "' + path + b'" is not a subdirectory or relative ' b'to the root directory path: "' + root_path + b'"' ) raise InvalidDirectoryPath(error_msg) regex = fnmatch.translate((pattern.decode())) yield re.compile(regex.encode()) def ignore_directories_patterns(root_path: bytes, patterns: Iterable[bytes]): """Filter for :func:`directory_to_objects` to ignore directories matching certain patterns. Args: root_path (bytes): path of the root directory patterns (list of bytes): patterns to ignore Returns: a directory filter for :func:`directory_to_objects` """ sre_patterns = set(extract_regex_objs(root_path, patterns)) def pattern_filter( dirpath: bytes, dirname: bytes, entries: Iterable[Any], patterns: Iterable[Any] = sre_patterns, root_path: bytes = os.path.abspath(root_path), ): full_path = os.path.abspath(dirpath) relative_path = os.path.relpath(full_path, root_path) return not any([pattern.match(relative_path) for pattern in patterns]) return pattern_filter def iter_directory( directory, ) -> Tuple[List[model.Content], List[model.SkippedContent], List[model.Directory]]: """Return the directory listing from a disk-memory directory instance. Raises: TypeError in case an unexpected object type is listed. Returns: Tuple of respectively iterable of content, skipped content and directories. """ contents: List[model.Content] = [] skipped_contents: List[model.SkippedContent] = [] directories: List[model.Directory] = [] for obj in directory.iter_tree(): obj = obj.to_model() obj_type = obj.object_type if obj_type in (model.Content.object_type, DiskBackedContent.object_type): # FIXME: read the data from disk later (when the # storage buffer is flushed). obj = obj.with_data() contents.append(obj) elif obj_type == model.SkippedContent.object_type: skipped_contents.append(obj) elif obj_type == model.Directory.object_type: directories.append(obj) else: raise TypeError(f"Unexpected object type from disk: {obj}") return contents, skipped_contents, directories class Directory(MerkleNode): """Representation of a Software Heritage directory as a node in a Merkle Tree. This class can be used to generate, from an on-disk directory, all the objects that need to be sent to the Software Heritage archive. The :func:`from_disk` constructor allows you to generate the data structure from a directory on disk. The resulting :class:`Directory` can then be manipulated as a dictionary, using the path as key. The :func:`collect` method is used to retrieve all the objects that need to be added to the Software Heritage archive since the last collection, by class (contents and directories). When using the dict-like methods to update the contents of the directory, the affected levels of hierarchy are reset and can be collected again using the same method. This enables the efficient collection of updated nodes, for instance when the client is applying diffs. """ __slots__ = ["__entries", "__model_object"] object_type: Final = "directory" @classmethod def from_disk( cls, *, path, dir_filter=accept_all_directories, max_content_length=None ): """Compute the Software Heritage objects for a given directory tree Args: path (bytes): the directory to traverse data (bool): whether to add the data to the content objects save_path (bool): whether to add the path to the content objects dir_filter (function): a filter to ignore some directories by name or contents. Takes two arguments: dirname and entries, and returns True if the directory should be added, False if the directory should be ignored. max_content_length (Optional[int]): if given, all contents larger than this will be skipped. """ top_path = path dirs = {} for root, dentries, fentries in os.walk(top_path, topdown=False): entries = {} # Join fentries and dentries in the same processing, as symbolic # links to directories appear in dentries... for name in fentries + dentries: path = os.path.join(root, name) if not os.path.isdir(path) or os.path.islink(path): content = Content.from_file( path=path, max_content_length=max_content_length ) entries[name] = content else: if dir_filter(path, name, dirs[path].entries): entries[name] = dirs[path] dirs[root] = cls({"name": os.path.basename(root), "path": root}) dirs[root].update(entries) return dirs[top_path] def __init__(self, data=None): super().__init__(data=data) self.__entries = None self.__model_object = None def invalidate_hash(self): self.__entries = None self.__model_object = None super().invalidate_hash() @staticmethod def child_to_directory_entry(name, child): if child.object_type == "directory": return { "type": "dir", "perms": DentryPerms.directory, "target": child.hash, "name": name, } elif child.object_type == "content": return { "type": "file", "perms": child.data["perms"], "target": child.hash, "name": name, } else: raise ValueError(f"unknown child {child}") def get_data(self, **kwargs): return { "id": self.hash, "entries": self.entries, } @property def entries(self): """Child nodes, sorted by name in the same way :func:`swh.model.git_objects.directory_git_object` does.""" if self.__entries is None: self.__entries = sorted( ( self.child_to_directory_entry(name, child) for name, child in self.items() ), key=directory_entry_sort_key, ) return self.__entries def swhid(self) -> CoreSWHID: """Return node identifier as a SWHID""" return CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=self.hash) def compute_hash(self): return self.to_model().id def to_model(self) -> model.Directory: """Builds a `model.Directory` object based on this node; ignoring its children.""" if self.__model_object is None: DirectoryEntry = model.DirectoryEntry entries = [] for name, child in self.items(): if child.object_type == "directory": e = DirectoryEntry( type="dir", perms=DentryPerms.directory, target=child.hash, name=name, ) elif child.object_type == "content": e = DirectoryEntry( type="file", perms=child.data["perms"], target=child.hash, name=name, ) else: raise ValueError(f"unknown child {child}") entries.append(e) entries.sort(key=directory_entry_sort_key) self.__model_object = model.Directory(entries=tuple(entries)) return self.__model_object def __getitem__(self, key): if not isinstance(key, bytes): raise ValueError("Can only get a bytes from Directory") # Convenience shortcut if key == b"": return self if b"/" not in key: return super().__getitem__(key) else: key1, key2 = key.split(b"/", 1) return self.__getitem__(key1)[key2] def __setitem__(self, key, value): if not isinstance(key, bytes): raise ValueError("Can only set a bytes Directory entry") if not isinstance(value, (Content, Directory)): raise ValueError( "Can only set a Directory entry to a Content or " "Directory" ) if key == b"": raise ValueError("Directory entry must have a name") if b"\x00" in key: raise ValueError("Directory entry name must not contain nul bytes") if b"/" not in key: return super().__setitem__(key, value) else: key1, key2 = key.rsplit(b"/", 1) self[key1].__setitem__(key2, value) def __delitem__(self, key): if not isinstance(key, bytes): raise ValueError("Can only delete a bytes Directory entry") if b"/" not in key: super().__delitem__(key) else: key1, key2 = key.rsplit(b"/", 1) del self[key1][key2] def __contains__(self, key): if b"/" not in key: return super().__contains__(key) else: key1, key2 = key.split(b"/", 1) return super().__contains__(key1) and self[key1].__contains__(key2) def __repr__(self): return "Directory(id=%s, entries=[%s])" % ( hash_to_hex(self.hash), ", ".join(str(entry) for entry in self), ) diff --git a/swh/model/tests/test_from_disk.py b/swh/model/tests/test_from_disk.py index c07fef6..1ebcbbe 100644 --- a/swh/model/tests/test_from_disk.py +++ b/swh/model/tests/test_from_disk.py @@ -1,1008 +1,1010 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import os import tarfile import tempfile from typing import ClassVar, Optional import unittest import pytest from swh.model import from_disk, model from swh.model.from_disk import Content, DentryPerms, Directory, DiskBackedContent from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex TEST_DATA = os.path.join(os.path.dirname(__file__), "data") class ModeToPerms(unittest.TestCase): def setUp(self): super().setUp() # Generate a full permissions map self.perms_map = {} # Symlinks for i in range(0o120000, 0o127777 + 1): self.perms_map[i] = DentryPerms.symlink # Directories for i in range(0o040000, 0o047777 + 1): self.perms_map[i] = DentryPerms.directory # Other file types: socket, regular file, block device, character # device, fifo all map to regular files for ft in [0o140000, 0o100000, 0o060000, 0o020000, 0o010000]: for i in range(ft, ft + 0o7777 + 1): if i & 0o111: # executable bits are set self.perms_map[i] = DentryPerms.executable_content else: self.perms_map[i] = DentryPerms.content def test_exhaustive_mode_to_perms(self): for fmode, perm in self.perms_map.items(): self.assertEqual(perm, from_disk.mode_to_perms(fmode)) class TestDiskBackedContent(unittest.TestCase): def test_with_data(self): expected_content = model.Content( length=42, status="visible", data=b"foo bar", sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) with tempfile.NamedTemporaryFile(mode="w+b") as fd: content = DiskBackedContent( length=42, status="visible", path=fd.name, sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) fd.write(b"foo bar") fd.seek(0) content_with_data = content.with_data() assert expected_content == content_with_data def test_lazy_data(self): with tempfile.NamedTemporaryFile(mode="w+b") as fd: fd.write(b"foo") fd.seek(0) content = DiskBackedContent( length=42, status="visible", path=fd.name, sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) fd.write(b"bar") fd.seek(0) content_with_data = content.with_data() fd.write(b"baz") fd.seek(0) assert content_with_data.data == b"bar" def test_with_data_cannot_read(self): with tempfile.NamedTemporaryFile(mode="w+b") as fd: content = DiskBackedContent( length=42, status="visible", path=fd.name, sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) with pytest.raises(OSError): content.with_data() def test_missing_path(self): with pytest.raises(TypeError): DiskBackedContent( length=42, status="visible", sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) with pytest.raises(TypeError): DiskBackedContent( length=42, status="visible", path=None, sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) class DataMixin: maxDiff = None # type: ClassVar[Optional[int]] def setUp(self): self.tmpdir = tempfile.TemporaryDirectory(prefix="swh.model.from_disk") self.tmpdir_name = os.fsencode(self.tmpdir.name) self.contents = { b"file": { "data": b"42\n", "sha1": hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), "sha256": hash_to_bytes( "084c799cd551dd1d8d5c5f9a5d593b2e" "931f5e36122ee5c793c1d08a19839cc0" ), "sha1_git": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "blake2s256": hash_to_bytes( "d5fe1939576527e42cfd76a9455a2432" "fe7f56669564577dd93c4280e76d661d" ), "length": 3, "mode": 0o100644, }, } self.symlinks = { b"symlink": { "data": b"target", "blake2s256": hash_to_bytes( "595d221b30fdd8e10e2fdf18376e688e" "9f18d56fd9b6d1eb6a822f8c146c6da6" ), "sha1": hash_to_bytes("0e8a3ad980ec179856012b7eecf4327e99cd44cd"), "sha1_git": hash_to_bytes("1de565933b05f74c75ff9a6520af5f9f8a5a2f1d"), "sha256": hash_to_bytes( "34a04005bcaf206eec990bd9637d9fdb" "6725e0a0c0d4aebf003f17f4c956eb5c" ), "length": 6, "perms": DentryPerms.symlink, } } self.specials = { b"fifo": os.mkfifo, } self.empty_content = { "data": b"", "length": 0, "blake2s256": hash_to_bytes( "69217a3079908094e11121d042354a7c" "1f55b6482ca1a51e1b250dfd1ed0eef9" ), "sha1": hash_to_bytes("da39a3ee5e6b4b0d3255bfef95601890afd80709"), "sha1_git": hash_to_bytes("e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"), "sha256": hash_to_bytes( "e3b0c44298fc1c149afbf4c8996fb924" "27ae41e4649b934ca495991b7852b855" ), "perms": DentryPerms.content, } self.empty_directory = { "id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), "entries": [], } # Generated with generate_testdata_from_disk self.tarball_contents = { b"": { "entries": [ { "name": b"bar", "perms": DentryPerms.directory, "target": hash_to_bytes( "3c1f578394f4623f74a0ba7fe761729f59fc6ec4" ), "type": "dir", }, { "name": b"empty-folder", "perms": DentryPerms.directory, "target": hash_to_bytes( "4b825dc642cb6eb9a060e54bf8d69288fbee4904" ), "type": "dir", }, { "name": b"foo", "perms": DentryPerms.directory, "target": hash_to_bytes( "2b41c40f0d1fbffcba12497db71fba83fcca96e5" ), "type": "dir", }, { "name": b"link-to-another-quote", "perms": DentryPerms.symlink, "target": hash_to_bytes( "7d5c08111e21c8a9f71540939998551683375fad" ), "type": "file", }, { "name": b"link-to-binary", "perms": DentryPerms.symlink, "target": hash_to_bytes( "e86b45e538d9b6888c969c89fbd22a85aa0e0366" ), "type": "file", }, { "name": b"link-to-foo", "perms": DentryPerms.symlink, "target": hash_to_bytes( "19102815663d23f8b75a47e7a01965dcdc96468c" ), "type": "file", }, { "name": b"some-binary", "perms": DentryPerms.executable_content, "target": hash_to_bytes( "68769579c3eaadbe555379b9c3538e6628bae1eb" ), "type": "file", }, ], "id": hash_to_bytes("e8b0f1466af8608c8a3fb9879db172b887e80759"), }, b"bar": { "entries": [ { "name": b"barfoo", "perms": DentryPerms.directory, "target": hash_to_bytes( "c3020f6bf135a38c6df3afeb5fb38232c5e07087" ), "type": "dir", } ], "id": hash_to_bytes("3c1f578394f4623f74a0ba7fe761729f59fc6ec4"), }, b"bar/barfoo": { "entries": [ { "name": b"another-quote.org", "perms": DentryPerms.content, "target": hash_to_bytes( "133693b125bad2b4ac318535b84901ebb1f6b638" ), "type": "file", } ], "id": hash_to_bytes("c3020f6bf135a38c6df3afeb5fb38232c5e07087"), }, b"bar/barfoo/another-quote.org": { "blake2s256": hash_to_bytes( "d26c1cad82d43df0bffa5e7be11a60e3" "4adb85a218b433cbce5278b10b954fe8" ), "length": 72, "perms": DentryPerms.content, "sha1": hash_to_bytes("90a6138ba59915261e179948386aa1cc2aa9220a"), "sha1_git": hash_to_bytes("133693b125bad2b4ac318535b84901ebb1f6b638"), "sha256": hash_to_bytes( "3db5ae168055bcd93a4d08285dc99ffe" "e2883303b23fac5eab850273a8ea5546" ), }, b"empty-folder": { "entries": [], "id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), }, b"foo": { "entries": [ { "name": b"barfoo", "perms": DentryPerms.symlink, "target": hash_to_bytes( "8185dfb2c0c2c597d16f75a8a0c37668567c3d7e" ), "type": "file", }, { "name": b"quotes.md", "perms": DentryPerms.content, "target": hash_to_bytes( "7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" ), "type": "file", }, { "name": b"rel-link-to-barfoo", "perms": DentryPerms.symlink, "target": hash_to_bytes( "acac326ddd63b0bc70840659d4ac43619484e69f" ), "type": "file", }, ], "id": hash_to_bytes("2b41c40f0d1fbffcba12497db71fba83fcca96e5"), }, b"foo/barfoo": { "blake2s256": hash_to_bytes( "e1252f2caa4a72653c4efd9af871b62b" "f2abb7bb2f1b0e95969204bd8a70d4cd" ), "data": b"bar/barfoo", "length": 10, "perms": DentryPerms.symlink, "sha1": hash_to_bytes("9057ee6d0162506e01c4d9d5459a7add1fedac37"), "sha1_git": hash_to_bytes("8185dfb2c0c2c597d16f75a8a0c37668567c3d7e"), "sha256": hash_to_bytes( "29ad3f5725321b940332c78e403601af" "ff61daea85e9c80b4a7063b6887ead68" ), }, b"foo/quotes.md": { "blake2s256": hash_to_bytes( "bf7ce4fe304378651ee6348d3e9336ed" "5ad603d33e83c83ba4e14b46f9b8a80b" ), "length": 66, "perms": DentryPerms.content, "sha1": hash_to_bytes("1bf0bb721ac92c18a19b13c0eb3d741cbfadebfc"), "sha1_git": hash_to_bytes("7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"), "sha256": hash_to_bytes( "caca942aeda7b308859eb56f909ec96d" "07a499491690c453f73b9800a93b1659" ), }, b"foo/rel-link-to-barfoo": { "blake2s256": hash_to_bytes( "d9c327421588a1cf61f316615005a2e9" "c13ac3a4e96d43a24138d718fa0e30db" ), "data": b"../bar/barfoo", "length": 13, "perms": DentryPerms.symlink, "sha1": hash_to_bytes("dc51221d308f3aeb2754db48391b85687c2869f4"), "sha1_git": hash_to_bytes("acac326ddd63b0bc70840659d4ac43619484e69f"), "sha256": hash_to_bytes( "8007d20db2af40435f42ddef4b8ad76b" "80adbec26b249fdf0473353f8d99df08" ), }, b"link-to-another-quote": { "blake2s256": hash_to_bytes( "2d0e73cea01ba949c1022dc10c8a43e6" "6180639662e5dc2737b843382f7b1910" ), "data": b"bar/barfoo/another-quote.org", "length": 28, "perms": DentryPerms.symlink, "sha1": hash_to_bytes("cbeed15e79599c90de7383f420fed7acb48ea171"), "sha1_git": hash_to_bytes("7d5c08111e21c8a9f71540939998551683375fad"), "sha256": hash_to_bytes( "e6e17d0793aa750a0440eb9ad5b80b25" "8076637ef0fb68f3ac2e59e4b9ac3ba6" ), }, b"link-to-binary": { "blake2s256": hash_to_bytes( "9ce18b1adecb33f891ca36664da676e1" "2c772cc193778aac9a137b8dc5834b9b" ), "data": b"some-binary", "length": 11, "perms": DentryPerms.symlink, "sha1": hash_to_bytes("d0248714948b3a48a25438232a6f99f0318f59f1"), "sha1_git": hash_to_bytes("e86b45e538d9b6888c969c89fbd22a85aa0e0366"), "sha256": hash_to_bytes( "14126e97d83f7d261c5a6889cee73619" "770ff09e40c5498685aba745be882eff" ), }, b"link-to-foo": { "blake2s256": hash_to_bytes( "08d6cad88075de8f192db097573d0e82" "9411cd91eb6ec65e8fc16c017edfdb74" ), "data": b"foo", "length": 3, "perms": DentryPerms.symlink, "sha1": hash_to_bytes("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"), "sha1_git": hash_to_bytes("19102815663d23f8b75a47e7a01965dcdc96468c"), "sha256": hash_to_bytes( "2c26b46b68ffc68ff99b453c1d304134" "13422d706483bfa0f98a5e886266e7ae" ), }, b"some-binary": { "blake2s256": hash_to_bytes( "922e0f7015035212495b090c27577357" "a740ddd77b0b9e0cd23b5480c07a18c6" ), "length": 5, "perms": DentryPerms.executable_content, "sha1": hash_to_bytes("0bbc12d7f4a2a15b143da84617d95cb223c9b23c"), "sha1_git": hash_to_bytes("68769579c3eaadbe555379b9c3538e6628bae1eb"), "sha256": hash_to_bytes( "bac650d34a7638bb0aeb5342646d24e3" "b9ad6b44c9b383621faa482b990a367d" ), }, } def tearDown(self): self.tmpdir.cleanup() def assertContentEqual(self, left, right, *, check_path=False): # noqa if not isinstance(left, Content): raise ValueError("%s is not a Content" % left) if isinstance(right, Content): right = right.get_data() # Compare dictionaries keys = DEFAULT_ALGORITHMS | { "length", "perms", } if check_path: keys |= {"path"} failed = [] for key in keys: try: lvalue = left.data[key] if key == "perms" and "perms" not in right: rvalue = from_disk.mode_to_perms(right["mode"]) else: rvalue = right[key] except KeyError: failed.append(key) continue if lvalue != rvalue: failed.append(key) if failed: raise self.failureException( "Content mismatched:\n" + "\n".join( "content[%s] = %r != %r" % (key, left.data.get(key), right.get(key)) for key in failed ) ) def assertDirectoryEqual(self, left, right): # NoQA if not isinstance(left, Directory): raise ValueError("%s is not a Directory" % left) if isinstance(right, Directory): right = right.get_data() assert left.entries == right["entries"] assert left.hash == right["id"] assert left.to_model() == model.Directory.from_dict(right) def make_contents(self, directory): for filename, content in self.contents.items(): path = os.path.join(directory, filename) with open(path, "wb") as f: f.write(content["data"]) os.chmod(path, content["mode"]) def make_symlinks(self, directory): for filename, symlink in self.symlinks.items(): path = os.path.join(directory, filename) os.symlink(symlink["data"], path) def make_specials(self, directory): for filename, fn in self.specials.items(): path = os.path.join(directory, filename) fn(path) def make_from_tarball(self, directory): tarball = os.path.join(TEST_DATA, "dir-folders", "sample-folder.tgz") with tarfile.open(tarball, "r:gz") as f: f.extractall(os.fsdecode(directory)) class TestContent(DataMixin, unittest.TestCase): def setUp(self): super().setUp() def test_data_to_content(self): for filename, content in self.contents.items(): conv_content = Content.from_bytes( mode=content["mode"], data=content["data"] ) self.assertContentEqual(conv_content, content) self.assertIn(hash_to_hex(conv_content.hash), repr(conv_content)) def test_content_swhid(self): for _, content in self.contents.items(): content_res = Content.from_bytes(mode=content["mode"], data=content["data"]) content_swhid = "swh:1:cnt:" + hash_to_hex(content["sha1_git"]) assert str(content_res.swhid()) == content_swhid class TestDirectory(DataMixin, unittest.TestCase): def setUp(self): super().setUp() def test_directory_swhid(self): directory_swhid = "swh:1:dir:" + hash_to_hex(self.empty_directory["id"]) directory = Directory.from_disk(path=self.tmpdir_name) assert str(directory.swhid()) == directory_swhid class SymlinkToContent(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.make_symlinks(self.tmpdir_name) def test_symlink_to_content(self): for filename, symlink in self.symlinks.items(): path = os.path.join(self.tmpdir_name, filename) perms = 0o120000 conv_content = Content.from_symlink(path=path, mode=perms) - self.assertContentEqual(conv_content, symlink) + symlink_copy = symlink.copy() + symlink_copy["path"] = path + self.assertContentEqual(conv_content, symlink_copy, check_path=True) def test_symlink_to_base_model(self): for filename, symlink in self.symlinks.items(): path = os.path.join(self.tmpdir_name, filename) perms = 0o120000 model_content = Content.from_symlink(path=path, mode=perms).to_model() right = symlink.copy() for key in ("perms", "path", "mode"): right.pop(key, None) right["status"] = "visible" assert model_content == model.Content.from_dict(right) class FileToContent(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.make_contents(self.tmpdir_name) self.make_symlinks(self.tmpdir_name) self.make_specials(self.tmpdir_name) def test_symlink_to_content(self): for filename, symlink in self.symlinks.items(): path = os.path.join(self.tmpdir_name, filename) conv_content = Content.from_file(path=path) self.assertContentEqual(conv_content, symlink) def test_file_to_content(self): for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) conv_content = Content.from_file(path=path) self.assertContentEqual(conv_content, content) def test_special_to_content(self): for filename in self.specials: path = os.path.join(self.tmpdir_name, filename) conv_content = Content.from_file(path=path) self.assertContentEqual(conv_content, self.empty_content) for path in ["/dev/null", "/dev/zero"]: path = os.path.join(self.tmpdir_name, filename) conv_content = Content.from_file(path=path) self.assertContentEqual(conv_content, self.empty_content) def test_symlink_to_content_model(self): for filename, symlink in self.symlinks.items(): path = os.path.join(self.tmpdir_name, filename) model_content = Content.from_file(path=path).to_model() right = symlink.copy() for key in ("perms", "path", "mode"): right.pop(key, None) right["status"] = "visible" assert model_content == model.Content.from_dict(right) def test_file_to_content_model(self): for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) model_content = Content.from_file(path=path).to_model() right = content.copy() for key in ("perms", "mode"): right.pop(key, None) assert model_content.with_data() == model.Content.from_dict(right) right["path"] = path del right["data"] assert model_content == DiskBackedContent.from_dict(right) def test_special_to_content_model(self): for filename in self.specials: path = os.path.join(self.tmpdir_name, filename) model_content = Content.from_file(path=path).to_model() right = self.empty_content.copy() for key in ("perms", "path", "mode"): right.pop(key, None) right["status"] = "visible" assert model_content == model.Content.from_dict(right) for path in ["/dev/null", "/dev/zero"]: model_content = Content.from_file(path=path).to_model() right = self.empty_content.copy() for key in ("perms", "path", "mode"): right.pop(key, None) right["status"] = "visible" assert model_content == model.Content.from_dict(right) def test_symlink_max_length(self): for max_content_length in [4, 10]: for filename, symlink in self.symlinks.items(): path = os.path.join(self.tmpdir_name, filename) content = Content.from_file(path=path) if content.data["length"] > max_content_length: with pytest.raises(Exception, match="too large"): Content.from_file( path=path, max_content_length=max_content_length ) else: limited_content = Content.from_file( path=path, max_content_length=max_content_length ) assert content == limited_content def test_file_max_length(self): for max_content_length in [2, 4]: for filename, content in self.contents.items(): path = os.path.join(self.tmpdir_name, filename) content = Content.from_file(path=path) limited_content = Content.from_file( path=path, max_content_length=max_content_length ) assert content.data["length"] == limited_content.data["length"] assert content.data["status"] == "visible" if content.data["length"] > max_content_length: assert limited_content.data["status"] == "absent" assert limited_content.data["reason"] == "Content too large" else: assert limited_content.data["status"] == "visible" def test_special_file_max_length(self): for max_content_length in [None, 0, 1]: for filename in self.specials: path = os.path.join(self.tmpdir_name, filename) content = Content.from_file(path=path) limited_content = Content.from_file( path=path, max_content_length=max_content_length ) assert limited_content == content def test_file_to_content_with_path(self): for filename, content in self.contents.items(): content_w_path = content.copy() path = os.path.join(self.tmpdir_name, filename) content_w_path["path"] = path conv_content = Content.from_file(path=path) self.assertContentEqual(conv_content, content_w_path, check_path=True) @pytest.mark.fs class DirectoryToObjects(DataMixin, unittest.TestCase): def setUp(self): super().setUp() contents = os.path.join(self.tmpdir_name, b"contents") os.mkdir(contents) self.make_contents(contents) symlinks = os.path.join(self.tmpdir_name, b"symlinks") os.mkdir(symlinks) self.make_symlinks(symlinks) specials = os.path.join(self.tmpdir_name, b"specials") os.mkdir(specials) self.make_specials(specials) empties = os.path.join(self.tmpdir_name, b"empty1", b"empty2") os.makedirs(empties) def check_collect( self, directory, expected_directory_count, expected_content_count ): objs = directory.collect() contents = [] directories = [] for obj in objs: if isinstance(obj, Content): contents.append(obj) elif isinstance(obj, Directory): directories.append(obj) self.assertEqual(len(directories), expected_directory_count) self.assertEqual(len(contents), expected_content_count) def test_directory_to_objects(self): directory = Directory.from_disk(path=self.tmpdir_name) for name, value in self.contents.items(): self.assertContentEqual(directory[b"contents/" + name], value) for name, value in self.symlinks.items(): self.assertContentEqual(directory[b"symlinks/" + name], value) for name in self.specials: self.assertContentEqual( directory[b"specials/" + name], self.empty_content, ) self.assertEqual( directory[b"empty1/empty2"].get_data(), self.empty_directory, ) # Raise on non existent file with self.assertRaisesRegex(KeyError, "b'nonexistent'"): directory[b"empty1/nonexistent"] # Raise on non existent directory with self.assertRaisesRegex(KeyError, "b'nonexistentdir'"): directory[b"nonexistentdir/file"] self.check_collect( directory, expected_directory_count=6, expected_content_count=len(self.contents) + len(self.symlinks) + 1, ) def test_directory_to_objects_ignore_empty(self): directory = Directory.from_disk( path=self.tmpdir_name, dir_filter=from_disk.ignore_empty_directories ) for name, value in self.contents.items(): self.assertContentEqual(directory[b"contents/" + name], value) for name, value in self.symlinks.items(): self.assertContentEqual(directory[b"symlinks/" + name], value) for name in self.specials: self.assertContentEqual( directory[b"specials/" + name], self.empty_content, ) # empty directories have been ignored recursively with self.assertRaisesRegex(KeyError, "b'empty1'"): directory[b"empty1"] with self.assertRaisesRegex(KeyError, "b'empty1'"): directory[b"empty1/empty2"] self.check_collect( directory, expected_directory_count=4, expected_content_count=len(self.contents) + len(self.symlinks) + 1, ) def test_directory_to_objects_ignore_name(self): directory = Directory.from_disk( path=self.tmpdir_name, dir_filter=from_disk.ignore_named_directories([b"symlinks"]), ) for name, value in self.contents.items(): self.assertContentEqual(directory[b"contents/" + name], value) for name in self.specials: self.assertContentEqual( directory[b"specials/" + name], self.empty_content, ) self.assertEqual( directory[b"empty1/empty2"].get_data(), self.empty_directory, ) with self.assertRaisesRegex(KeyError, "b'symlinks'"): directory[b"symlinks"] self.check_collect( directory, expected_directory_count=5, expected_content_count=len(self.contents) + 1, ) def test_directory_to_objects_ignore_name_case(self): directory = Directory.from_disk( path=self.tmpdir_name, dir_filter=from_disk.ignore_named_directories( [b"symLiNks"], case_sensitive=False ), ) for name, value in self.contents.items(): self.assertContentEqual(directory[b"contents/" + name], value) for name in self.specials: self.assertContentEqual( directory[b"specials/" + name], self.empty_content, ) self.assertEqual( directory[b"empty1/empty2"].get_data(), self.empty_directory, ) with self.assertRaisesRegex(KeyError, "b'symlinks'"): directory[b"symlinks"] self.check_collect( directory, expected_directory_count=5, expected_content_count=len(self.contents) + 1, ) def test_directory_entry_order(self): with tempfile.TemporaryDirectory() as dirname: dirname = os.fsencode(dirname) open(os.path.join(dirname, b"foo."), "a") open(os.path.join(dirname, b"foo0"), "a") os.mkdir(os.path.join(dirname, b"foo")) directory = Directory.from_disk(path=dirname) assert [entry["name"] for entry in directory.entries] == [ b"foo.", b"foo", b"foo0", ] @pytest.mark.fs class TarballTest(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.make_from_tarball(self.tmpdir_name) def test_contents_match(self): directory = Directory.from_disk( path=os.path.join(self.tmpdir_name, b"sample-folder") ) for name, expected in self.tarball_contents.items(): obj = directory[name] if isinstance(obj, Content): self.assertContentEqual(obj, expected) elif isinstance(obj, Directory): self.assertDirectoryEqual(obj, expected) else: raise self.failureException("Unknown type for %s" % obj) class TarballIterDirectory(DataMixin, unittest.TestCase): def setUp(self): super().setUp() self.make_from_tarball(self.tmpdir_name) def test_iter_directory(self): """Iter from_disk.directory should yield the full arborescence tree""" directory = Directory.from_disk( path=os.path.join(self.tmpdir_name, b"sample-folder") ) contents, skipped_contents, directories = from_disk.iter_directory(directory) expected_nb = defaultdict(int) for name in self.tarball_contents.keys(): obj = directory[name] expected_nb[obj.object_type] += 1 assert len(contents) == expected_nb["content"] and len(contents) > 0 assert len(skipped_contents) == 0 assert len(directories) == expected_nb["directory"] and len(directories) > 0 class DirectoryManipulation(DataMixin, unittest.TestCase): def test_directory_access_nested(self): d = Directory() d[b"a"] = Directory() d[b"a/b"] = Directory() self.assertEqual(d[b"a/b"].get_data(), self.empty_directory) def test_directory_del_nested(self): d = Directory() d[b"a"] = Directory() d[b"a/b"] = Directory() with self.assertRaisesRegex(KeyError, "b'c'"): del d[b"a/b/c"] with self.assertRaisesRegex(KeyError, "b'level2'"): del d[b"a/level2/c"] del d[b"a/b"] self.assertEqual(d[b"a"].get_data(), self.empty_directory) def test_directory_access_self(self): d = Directory() self.assertIs(d, d[b""]) self.assertIs(d, d[b"/"]) self.assertIs(d, d[b"//"]) def test_directory_access_wrong_type(self): d = Directory() with self.assertRaisesRegex(ValueError, "bytes from Directory"): d["foo"] with self.assertRaisesRegex(ValueError, "bytes from Directory"): d[42] def test_directory_repr(self): entries = [b"a", b"b", b"c"] d = Directory() for entry in entries: d[entry] = Directory() r = repr(d) self.assertIn(hash_to_hex(d.hash), r) for entry in entries: self.assertIn(str(entry), r) def test_directory_set_wrong_type_name(self): d = Directory() with self.assertRaisesRegex(ValueError, "bytes Directory entry"): d["foo"] = Directory() with self.assertRaisesRegex(ValueError, "bytes Directory entry"): d[42] = Directory() def test_directory_set_nul_in_name(self): d = Directory() with self.assertRaisesRegex(ValueError, "nul bytes"): d[b"\x00\x01"] = Directory() def test_directory_set_empty_name(self): d = Directory() with self.assertRaisesRegex(ValueError, "must have a name"): d[b""] = Directory() with self.assertRaisesRegex(ValueError, "must have a name"): d[b"/"] = Directory() def test_directory_set_wrong_type(self): d = Directory() with self.assertRaisesRegex(ValueError, "Content or Directory"): d[b"entry"] = object() def test_directory_del_wrong_type(self): d = Directory() with self.assertRaisesRegex(ValueError, "bytes Directory entry"): del d["foo"] with self.assertRaisesRegex(ValueError, "bytes Directory entry"): del d[42] def test_directory_contains(self): d = Directory() d[b"a"] = Directory() d[b"a/b"] = Directory() d[b"a/b/c"] = Directory() d[b"a/b/c/d"] = Content() self.assertIn(b"a", d) self.assertIn(b"a/b", d) self.assertIn(b"a/b/c", d) self.assertIn(b"a/b/c/d", d) self.assertNotIn(b"b", d) self.assertNotIn(b"b/c", d) self.assertNotIn(b"b/c/d", d) diff --git a/tox.ini b/tox.ini index 5198d08..38261f4 100644 --- a/tox.ini +++ b/tox.ini @@ -1,83 +1,84 @@ [tox] envlist=black,flake8,mypy,py3-{minimal,full} [testenv] extras = full: testing minimal: testing-minimal deps = pytest-cov commands = pytest \ --doctest-modules \ full: --cov={envsitepackagesdir}/swh/model --cov-branch {posargs} \ full: {envsitepackagesdir}/swh/model minimal: {envsitepackagesdir}/swh/model/tests/test_cli.py -m 'not requires_optional_deps' [testenv:py3] skip_install = true deps = tox commands = tox -e py3-full -- {posargs} tox -e py3-minimal -- {posargs} [testenv:black] skip_install = true deps = - black==22.3.0 + black==22.10.0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = - flake8==4.0.1 - flake8-bugbear==22.3.23 + flake8==5.0.4 + flake8-bugbear==22.9.23 + pycodestyle==2.9.1 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy==0.942 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs