diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1c95e3d..f972cd9 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,40 +1,40 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
- rev: v4.1.0
+ rev: v4.3.0
hooks:
- id: trailing-whitespace
- id: check-json
- id: check-yaml
- - repo: https://gitlab.com/pycqa/flake8
- rev: 4.0.1
+ - repo: https://github.com/pycqa/flake8
+ rev: 5.0.4
hooks:
- id: flake8
- additional_dependencies: [flake8-bugbear==22.3.23]
+ additional_dependencies: [flake8-bugbear==22.9.23]
- repo: https://github.com/codespell-project/codespell
- rev: v2.1.0
+ rev: v2.2.2
hooks:
- id: codespell
name: Check source code spelling
stages: [commit]
- repo: local
hooks:
- id: mypy
name: mypy
entry: mypy
args: [swh]
pass_filenames: false
language: system
types: [python]
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/python/black
- rev: 22.3.0
+ rev: 22.10.0
hooks:
- id: black
diff --git a/PKG-INFO b/PKG-INFO
index 4a1f9f1..0bffcf8 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,42 +1,42 @@
Metadata-Version: 2.1
Name: swh.model
-Version: 6.6.0
+Version: 6.6.1
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-model
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: cli
Provides-Extra: testing-minimal
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-model
=========
Implementation of the Data model of the Software Heritage project, used to
archive source code artifacts.
This module defines the notion of SoftWare Heritage persistent IDentifiers
(SWHIDs) and provides tools to compute them:
```sh
$ swh-identify fork.c kmod.c sched/deadline.c
swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c
swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c
swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c
$ swh-identify --no-filename /usr/src/linux/kernel/
swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
```
diff --git a/docs/persistent-identifiers.rst b/docs/persistent-identifiers.rst
index 105f58c..01da6aa 100644
--- a/docs/persistent-identifiers.rst
+++ b/docs/persistent-identifiers.rst
@@ -1,427 +1,427 @@
.. _persistent-identifiers:
.. _swhids:
=================================================
SoftWare Heritage persistent IDentifiers (SWHIDs)
=================================================
**version 1.6, last modified 2021-04-30**
.. contents::
   :local:
   :depth: 2
Overview
========
You can point to objects present in the `Software Heritage
<https://www.softwareheritage.org/>`_ `archive
<https://archive.softwareheritage.org/>`_ by the means of **SoftWare Heritage
persistent IDentifiers**, or **SWHIDs** for short, that are guaranteed to
remain stable (persistent) over time. Their syntax, meaning, and usage are
described below. Note that they are identifiers and not URLs, even though
URL-based `resolvers`_ for SWHIDs are also available.
A SWHID consists of two separate parts: a mandatory *core identifier* that can
point to any software artifact (or "object") available in the Software Heritage
archive, and an optional list of *qualifiers* that makes it possible to specify
the context where the object is meant to be seen and to point to a subpart of
the object itself.
Objects come in different types:
* contents
* directories
* revisions
* releases
* snapshots
Each object is identified by an intrinsic, type-specific object identifier that
is embedded in its SWHID as described below. The intrinsic identifiers embedded
in SWHIDs are strong cryptographic hashes computed on the entire set of object
properties. Together, these identifiers form a `Merkle structure
`_, specifically a Merkle `DAG
`_.
See the :ref:`Software Heritage data model <data-model>` for an overview of
object types and how they are linked together. See
:py:mod:`swh.model.git_objects` for details on how the intrinsic identifiers
embedded in SWHIDs are computed.
The optional qualifiers are of two kinds:

* **context qualifiers:** carry information about the context where a given
  object is meant to be seen. This is particularly important, as the same
  object can be reached in the Merkle graph following different *paths*
  starting from different nodes (or *anchors*), and it may have been retrieved
  from different *origins*, that may evolve between different *visits*
* **fragment qualifiers:** make it possible to pinpoint specific subparts of an
  object
.. _swhids-syntax:
Syntax
======
Syntactically, SWHIDs are generated by the ``<identifier>`` entry point in the
following grammar:
.. code-block:: bnf

   <identifier> ::= <identifier_core> [ <qualifiers> ] ;
   <identifier_core> ::= "swh" ":" <scheme_version> ":" <object_type> ":" <object_id> ;
   <scheme_version> ::= "1" ;
   <object_type> ::=
       "snp" (* snapshot *)
     | "rel" (* release *)
     | "rev" (* revision *)
     | "dir" (* directory *)
     | "cnt" (* content *)
     ;
   <object_id> ::= 40 * <hex_digit> ; (* intrinsic object id, as hex-encoded SHA1 *)
   <dec_digit> ::= "0" | "1" | "2" | "3" | "4" | "5" | "6" | "7" | "8" | "9" ;
   <hex_digit> ::= <dec_digit> | "a" | "b" | "c" | "d" | "e" | "f" ;
   <qualifiers> := ";" <qualifier> [ <qualifiers> ] ;
   <qualifier> ::=
       <context_qualifier>
     | <fragment_qualifier>
     ;
   <context_qualifier> ::=
       <origin_ctxt>
     | <visit_ctxt>
     | <anchor_ctxt>
     | <path_ctxt>
     ;
   <origin_ctxt> ::= "origin" "=" <url_escaped> ;
   <visit_ctxt> ::= "visit" "=" <identifier_core> ;
   <anchor_ctxt> ::= "anchor" "=" <identifier_core> ;
   <path_ctxt> ::= "path" "=" <path_absolute_escaped> ;
   <fragment_qualifier> ::= "lines" "=" <line_number> ["-" <line_number>] ;
   <line_number> ::= <dec_digit> + ;
   <url_escaped> ::= (* RFC 3987 IRI *)
   <path_absolute_escaped> ::= (* RFC 3987 absolute path *)
Where:

- ``<path_absolute_escaped>`` is an ``<ipath-absolute>`` from `RFC 3987`_, and
- ``<url_escaped>`` is a `RFC 3987`_ IRI

in either case all occurrences of ``;`` (and ``%``, as required by the RFC)
have been percent-encoded (as ``%3B`` and ``%25`` respectively). Other
characters *can* be percent-encoded, e.g., to improve readability and/or
embeddability of SWHID in other contexts.

.. _RFC 3987: https://tools.ietf.org/html/rfc3987
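As an illustrative sketch (not part of the specification, and a deliberately loose stand-in for the full grammar), the core-identifier production above can be checked with a few lines of standard-library Python; the helper name is ours:

```python
import re

# Matches only the core identifier: swh:1:<object_type>:<40 lowercase hex digits>.
# Qualifiers (everything after a ';') are intentionally out of scope here.
CORE_SWHID_RE = re.compile(r"^swh:1:(?:snp|rel|rev|dir|cnt):[0-9a-f]{40}$")

def is_core_swhid(s: str) -> bool:
    """Return True if `s` is a syntactically valid core SWHID."""
    return CORE_SWHID_RE.match(s) is not None
```

For example, ``is_core_swhid("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2")`` holds, while upper-case hex digits or an unknown scheme version are rejected, in line with the grammar.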
.. _swhids-semantics:
Semantics
=========
.. _swhids-core:
Core identifiers
----------------
``:`` is used as separator between the logical parts of core identifiers. The
``swh`` prefix makes explicit that these identifiers are related to *SoftWare
Heritage*. ``1`` (``<scheme_version>``) is the current version of this
identifier *scheme*. Future editions will use higher version numbers, possibly
breaking backward compatibility, but without breaking the resolvability of
SWHIDs that conform to previous versions of the scheme.

A SWHID points to a single object, whose type is explicitly captured by
``<object_type>``:
* ``snp`` to **snapshots**,
* ``rel`` to **releases**,
* ``rev`` to **revisions**,
* ``dir`` to **directories**,
* ``cnt`` to **contents**.
The actual object pointed to is identified by the intrinsic identifier
``<object_id>``, which is a hex-encoded (using lowercase ASCII characters) SHA1
computed on the content and metadata of the object itself, as follows:

* for **snapshots**, intrinsic identifiers are SHA1 hashes of manifests computed as per
  :py:func:`swh.model.git_objects.snapshot_git_object`
* for **releases**, as per
  :py:func:`swh.model.git_objects.release_git_object`
  that produces the same result as a git annotated tag hash
* for **revisions**, as per
  :py:func:`swh.model.git_objects.revision_git_object`
  that produces the same result as a git commit hash
* for **directories**, as per
  :py:func:`swh.model.git_objects.directory_git_object`
  that produces the same result as a git tree hash
* for **contents**, the intrinsic identifier is the ``sha1_git`` hash returned by
  :py:meth:`swh.model.hashutil.MultiHash.digest`, i.e., the SHA1 of a byte
  sequence obtained by juxtaposing the ASCII string ``"blob"`` (without
  quotes), a space, the length of the content as decimal digits, a NULL byte,
  and the actual content of the file.
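The content case can be reproduced with nothing but the standard library; the following sketch (a simplified stand-in for the ``MultiHash`` machinery, with a helper name of our choosing) computes the SWHID of a content object from its raw bytes:

```python
import hashlib

def content_swhid(data: bytes) -> str:
    # sha1 over: "blob" + space + decimal length + NUL byte + file content,
    # which is the same construction git uses for blob identifiers.
    header = b"blob " + str(len(data)).encode("ascii") + b"\x00"
    return "swh:1:cnt:" + hashlib.sha1(header + data).hexdigest()

# Agrees with `git hash-object` on the same bytes:
print(content_swhid(b"hello\n"))  # swh:1:cnt:ce013625030ba8dba906f756967f9e9ca394464a
```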
.. _swhids-qualifiers:
Qualifiers
----------
``;`` is used as separator between the core identifier and the optional
qualifiers, as well as between qualifiers. Each qualifier is specified as a
key/value pair, using ``=`` as a separator.
The following *context qualifiers* are available:

* **origin:** the *software origin* where an object has been found or observed
  in the wild, as a URI;
* **visit:** the core identifier of a *snapshot* corresponding to a specific
  *visit* of a repository containing the designated object;
* **anchor:** a *designated node* in the Merkle DAG relative to which a *path
  to the object* is specified, as the core identifier of a directory, a
  revision, a release or a snapshot;
* **path:** the *absolute file path*, from the *root directory* associated to
  the *anchor node*, to the object; when the anchor denotes a directory or a
  revision, and almost always when it's a release, the root directory is
  uniquely determined; when the anchor denotes a snapshot, the root directory
  is the one pointed to by ``HEAD`` (possibly indirectly), and undefined if
  such a reference is missing;

The following *fragment qualifier* is available:

* **lines:** *line number(s)* of interest, usually within a content object
We recommend equipping identifiers meant to be shared with as many qualifiers
as possible. While qualifiers may be listed in any order, it is good practice
to present them in the order given above, i.e., ``origin``, ``visit``,
``anchor``, ``path``, ``lines``. Redundant information should be omitted: for
example, if the *visit* is present, and the *path* is relative to the snapshot
indicated there, then the *anchor* qualifier is superfluous; similarly, if the
*path* is empty, it may be omitted.
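Since ``;`` separates the core identifier from the key/value qualifiers, a qualified SWHID can be decomposed with a few lines of Python. This is a sketch under our own helper name, not the ``swh.model`` API; real parsing should additionally handle percent-encoding (e.g. via ``urllib.parse.unquote``):

```python
def split_qualified_swhid(swhid: str):
    # Split off the core identifier, then split each qualifier on the FIRST
    # '=' only, since values (e.g. origin URLs) may themselves contain '='.
    core, *raw_qualifiers = swhid.split(";")
    qualifiers = dict(q.split("=", 1) for q in raw_qualifiers)
    return core, qualifiers

core, quals = split_qualified_swhid(
    "swh:1:cnt:4d99d2d18326621ccdd70f5ea66c2e2ac236ad8b"
    ";origin=https://gitorious.org/ocamlp3l/ocamlp3l_cvs.git"
    ";lines=9-15"
)
```

On the example above, ``core`` is the bare content identifier and ``quals`` maps ``origin`` and ``lines`` to their values.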
Interoperability
================
URI scheme
----------
The ``swh`` URI scheme is registered at IANA for SWHIDs. The present document
constitutes the specification of that URI scheme.
Git compatibility
-----------------
SWHIDs for contents, directories, revisions, and releases are, at present,
compatible with the `Git <https://git-scm.com/>`_ way of `computing identifiers
<https://git-scm.com/book/en/v2/Git-Internals-Git-Objects>`_ for its objects.
The ``<object_id>`` part of a SWHID for a content object is the Git blob
identifier of any file with the same content; for a revision it is the Git
commit identifier for the same revision, etc. This is not the case for
snapshot identifiers, as Git does not have a corresponding object type.
Note that Git compatibility is incidental and is not guaranteed to be
maintained in future versions of this scheme (or Git).
Automatically fixing invalid SWHIDs
-----------------------------------
User interfaces may fix invalid SWHIDs by lower-casing the
``<object_id>`` part of a SWHID, if it contains upper-case letters
because of user errors or limitations in software displaying SWHIDs.
However, implementations displaying or generating SWHIDs should not rely
on this behavior, and must display or generate only valid SWHIDs when
technically possible.

User interfaces should show an error when such an automatic fix occurs,
so users have a chance to fix their SWHID before pasting it to another
interface that does not perform the same corrections.
This also makes it easier to understand issues when a case-sensitive
qualifier has its casing altered.
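A user interface applying that fix might do so along the lines of the sketch below (the helper name is ours). Only the ``<object_id>`` of the core identifier is lower-cased; qualifier values such as paths stay untouched, precisely because they can be case-sensitive:

```python
def lowercase_object_id(swhid: str) -> str:
    # Separate the core identifier from any qualifiers (after the first ';'),
    # then lower-case only the hex object id after the last ':' of the core.
    core, sep, qualifiers = swhid.partition(";")
    prefix, colon, object_id = core.rpartition(":")
    return prefix + colon + object_id.lower() + sep + qualifiers
```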
Examples
========
Core identifiers
----------------
* ``swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2`` points to the content
of a file containing the full text of the GPL3 license
* ``swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505`` points to a directory
containing the source code of the Darktable photography application as it was
at some point on 4 May 2017
* ``swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d`` points to a commit in
the development history of Darktable, dated 16 January 2017, that added
undo/redo supports for masks
* ``swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f`` points to Darktable
release 2.3.0, dated 24 December 2016
* ``swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453`` points to a snapshot
of the entire Darktable Git repository taken on 4 May 2017 from GitHub
Identifiers with qualifiers
---------------------------
* The following :swh_web:`SWHID
`
denotes the lines 9 to 15 of a file content that can be found at absolute
path ``/Examples/SimpleFarm/simplefarm.ml`` from the root directory of the
revision ``swh:1:rev:2db189928c94d62a3b4757b3eec68f0a4d4113f0`` that is
contained in the snapshot
``swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9`` taken from the origin
``https://gitorious.org/ocamlp3l/ocamlp3l_cvs.git``::
swh:1:cnt:4d99d2d18326621ccdd70f5ea66c2e2ac236ad8b;
origin=https://gitorious.org/ocamlp3l/ocamlp3l_cvs.git;
visit=swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9;
anchor=swh:1:rev:2db189928c94d62a3b4757b3eec68f0a4d4113f0;
path=/Examples/SimpleFarm/simplefarm.ml;
lines=9-15
* Here is an example of a :swh_web:`SWHID
- `
+ `
with a file path that requires percent-escaping::
swh:1:cnt:f10371aa7b8ccabca8479196d6cd640676fd4a04;
origin=https://github.com/web-platform-tests/wpt;
visit=swh:1:snp:b37d435721bbd450624165f334724e3585346499;
anchor=swh:1:rev:259d0612af038d14f2cd889a14a3adb6c9e96d96;
path=/html/semantics/document-metadata/the-meta-element/pragma-directives/attr-meta-http-equiv-refresh/support/x%3Burl=foo/
Implementation
==============
Computing
---------
An important property of any SWHID is that its core identifier is *intrinsic*:
it can be *computed from the object itself*, without having to rely on any
third party. An implementation of SWHID that makes it possible to do so locally is the
`swh identify <https://docs.softwareheritage.org/devel/swh-model/cli.html>`_
tool, available from the `swh.model <https://pypi.org/project/swh.model/>`_
Python package under the GPL license. This package can be installed via the ``pip``
package manager with the one-liner ``pip3 install swh.model[cli]`` on any machine with
Python (at least version 3.7) and ``pip`` installed (on a Debian or Ubuntu system a simple
``apt install python3 python3-pip`` will suffice; see the general pip installation
instructions for other platforms).
SWHIDs are also automatically computed by Software Heritage for all archived
objects as part of its archival activity, and can be looked up via the project
:swh_web:`Web interface <>`.
This has various practical implications:
* when a software artifact is obtained from Software Heritage by resolving a
  SWHID, it is straightforward to verify that it is exactly the intended one:
  just compute the core identifier from the artifact itself, and check that it
  is the same as the core identifier part of the SWHID
* the core identifier of a software artifact can be computed *before* its
  archival on Software Heritage
Choosing what type of SWHID to use
----------------------------------
``swh:1:dir:`` SWHIDs are the most robust SWHIDs, as they can be recomputed from
the simplest objects (a directory structure on a filesystem), even when all
metadata is lost, without relying on the Software Heritage archive.
Therefore, we advise implementers and users to prefer this type of SWHIDs
over ``swh:1:rev:`` and ``swh:1:rel:`` to reference source code artifacts.
However, since keeping the metadata is also important, you should add an anchor
qualifier to ``swh:1:dir:`` SWHIDs whenever possible, so the metadata stored
in the Software Heritage archive can be retrieved when needed.
This means, for example, that you should prefer
``swh:1:dir:a8eded6a2d062c998ba2dcc3dcb0ce68a4e15a58;anchor=swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f``
over ``swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f``.
Resolvers
---------
Software Heritage resolver
~~~~~~~~~~~~~~~~~~~~~~~~~~
SWHIDs can be resolved using the Software Heritage :swh_web:`Web interface <>`.
In particular, the **root endpoint**
``/`` can be given a SWHID and will lead to the browsing page of the
corresponding object, like this:
``https://archive.softwareheritage.org/<swhid>/``.
A **dedicated** ``/resolve`` **endpoint** of the Software Heritage :swh_web:`Web API
` is also available to
programmatically resolve SWHIDs; see: :http:get:`/api/1/resolve/(swhid)/`.
Examples:
-* :swh_web:``
-* :swh_web:``
-* :swh_web:``
-* :swh_web:``
-* :swh_web:``
-* :swh_web:``
-* :swh_web:``
+* :swh_web:`swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2`
+* :swh_web:`swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505`
+* :swh_web:`api/1/resolve/swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d`
+* :swh_web:`api/1/resolve/swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f`
+* :swh_web:`api/1/resolve/swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453`
+* :swh_web:`swh:1:cnt:4d99d2d18326621ccdd70f5ea66c2e2ac236ad8b;origin=https://gitorious.org/ocamlp3l/ocamlp3l_cvs.git;visit=swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9;anchor=swh:1:rev:2db189928c94d62a3b4757b3eec68f0a4d4113f0;path=/Examples/SimpleFarm/simplefarm.ml;lines=9-15`
+* :swh_web:`swh:1:cnt:f10371aa7b8ccabca8479196d6cd640676fd4a04;origin=https://github.com/web-platform-tests/wpt;visit=swh:1:snp:b37d435721bbd450624165f334724e3585346499;anchor=swh:1:rev:259d0612af038d14f2cd889a14a3adb6c9e96d96;path=/html/semantics/document-metadata/the-meta-element/pragma-directives/attr-meta-http-equiv-refresh/support/x%253Burl=foo/`
Third-party resolvers
~~~~~~~~~~~~~~~~~~~~~
The following **third party resolvers** support SWHID resolution:
* `Identifiers.org <https://identifiers.org/>`_ (registry identifier
  MIR:00000655).
* `Name-to-Thing (N2T) <https://n2t.net/>`_
Note that resolution via Identifiers.org currently only supports *core
identifiers* due to `syntactic incompatibilities with qualifiers
`_.
Examples:
* ``_
* ``_
* ``_
* ``_
* ``_
* ``_
-* ``_
+* ``_
References
==========
* Roberto Di Cosmo, Morane Gruenpeter, Stefano Zacchiroli. `Identifiers for
Digital Objects: the Case of Software Source Code Preservation
`_. In Proceedings of `iPRES
2018 `_: 15th International Conference on Digital
Preservation, Boston, MA, USA, September 2018, 9 pages.
* Roberto Di Cosmo, Morane Gruenpeter, Stefano Zacchiroli. `Referencing Source
Code Artifacts: a Separate Concern in Software Citation
`_. In Computing in Science and
Engineering, volume 22, issue 2, pages 33-43. ISSN 1521-9615,
IEEE. March 2020.
diff --git a/mypy.ini b/mypy.ini
index e3daf6d..d411c51 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,26 +1,6 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True
# 3rd party libraries without stubs (yet)
-[mypy-attrs_strict.*] # a bit sad, but...
-ignore_missing_imports = True
-
-[mypy-deprecated.*]
-ignore_missing_imports = True
-
-[mypy-django.*] # false positive, only used my hypotesis' extras
-ignore_missing_imports = True
-
-[mypy-dulwich.*]
-ignore_missing_imports = True
-
-[mypy-iso8601.*]
-ignore_missing_imports = True
-
-[mypy-pkg_resources.*]
-ignore_missing_imports = True
-
-[mypy-pytest.*]
-ignore_missing_imports = True
diff --git a/swh.model.egg-info/PKG-INFO b/swh.model.egg-info/PKG-INFO
index 4a1f9f1..0bffcf8 100644
--- a/swh.model.egg-info/PKG-INFO
+++ b/swh.model.egg-info/PKG-INFO
@@ -1,42 +1,42 @@
Metadata-Version: 2.1
Name: swh.model
-Version: 6.6.0
+Version: 6.6.1
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-model
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: cli
Provides-Extra: testing-minimal
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-model
=========
Implementation of the Data model of the Software Heritage project, used to
archive source code artifacts.
This module defines the notion of SoftWare Heritage persistent IDentifiers
(SWHIDs) and provides tools to compute them:
```sh
$ swh-identify fork.c kmod.c sched/deadline.c
swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c
swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c
swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c
$ swh-identify --no-filename /usr/src/linux/kernel/
swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
```
diff --git a/swh/model/from_disk.py b/swh/model/from_disk.py
index 8bd7f5d..058e77f 100644
--- a/swh/model/from_disk.py
+++ b/swh/model/from_disk.py
@@ -1,592 +1,595 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Conversion from filesystem tree to SWH objects.
This module allows reading a tree of directories and files from a local
filesystem, and converting them to in-memory data structures, which can then
be exported to SWH data model objects, as defined in :mod:`swh.model.model`.
"""
import datetime
import enum
import fnmatch
import glob
import os
import re
import stat
from typing import Any, Iterable, Iterator, List, Optional, Pattern, Tuple
import attr
from attrs_strict import type_validator
from typing_extensions import Final
from . import model
from .exceptions import InvalidDirectoryPath
from .git_objects import directory_entry_sort_key
from .hashutil import MultiHash, hash_to_hex
from .merkle import MerkleLeaf, MerkleNode
from .swhids import CoreSWHID, ObjectType
@attr.s(frozen=True, slots=True)
class DiskBackedContent(model.BaseContent):
"""Content-like class, which allows lazy-loading data from the disk."""
object_type: Final = "content_file"
sha1 = attr.ib(type=bytes, validator=type_validator())
sha1_git = attr.ib(type=model.Sha1Git, validator=type_validator())
sha256 = attr.ib(type=bytes, validator=type_validator())
blake2s256 = attr.ib(type=bytes, validator=type_validator())
length = attr.ib(type=int, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(["visible", "hidden"]),
default="visible",
)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
path = attr.ib(type=Optional[bytes], default=None)
@classmethod
def from_dict(cls, d):
return cls(**d)
def __attrs_post_init__(self):
if self.path is None:
raise TypeError("path must not be None.")
def with_data(self) -> model.Content:
args = self.to_dict()
del args["path"]
assert self.path is not None
with open(self.path, "rb") as fd:
return model.Content.from_dict({**args, "data": fd.read()})
class DentryPerms(enum.IntEnum):
"""Admissible permissions for directory entries."""
content = 0o100644
"""Content"""
executable_content = 0o100755
"""Executable content (e.g. executable script)"""
symlink = 0o120000
"""Symbolic link"""
directory = 0o040000
"""Directory"""
revision = 0o160000
"""Revision (e.g. submodule)"""
def mode_to_perms(mode):
"""Convert a file mode to a permission compatible with Software Heritage
directory entries
Args:
mode (int): a file mode as returned by :func:`os.stat` in
:attr:`os.stat_result.st_mode`
Returns:
DentryPerms: one of the following values:
:const:`DentryPerms.content`: plain file
:const:`DentryPerms.executable_content`: executable file
:const:`DentryPerms.symlink`: symbolic link
:const:`DentryPerms.directory`: directory
"""
if stat.S_ISLNK(mode):
return DentryPerms.symlink
if stat.S_ISDIR(mode):
return DentryPerms.directory
else:
# file is executable in any way
if mode & (0o111):
return DentryPerms.executable_content
else:
return DentryPerms.content
class Content(MerkleLeaf):
"""Representation of a Software Heritage content as a node in a Merkle tree.
The current Merkle hash for the Content nodes is the `sha1_git`, which
makes it consistent with what :class:`Directory` uses for its own hash
computation.
"""
__slots__ = [] # type: List[str]
object_type: Final = "content"
@classmethod
def from_bytes(cls, *, mode, data):
"""Convert data (raw :class:`bytes`) to a Software Heritage content entry
Args:
mode (int): a file mode (passed to :func:`mode_to_perms`)
data (bytes): raw contents of the file
"""
ret = MultiHash.from_data(data).digest()
ret["length"] = len(data)
ret["perms"] = mode_to_perms(mode)
ret["data"] = data
ret["status"] = "visible"
return cls(ret)
@classmethod
def from_symlink(cls, *, path, mode):
"""Convert a symbolic link to a Software Heritage content entry"""
- return cls.from_bytes(mode=mode, data=os.readlink(path))
+ content = cls.from_bytes(mode=mode, data=os.readlink(path))
+ content.data["path"] = path
+ return content
@classmethod
def from_file(cls, *, path, max_content_length=None):
"""Compute the Software Heritage content entry corresponding to an
on-disk file.
        The returned dictionary contains keys useful for both:

        - loading the content in the archive (hashes, `length`)
        - using the content as a directory entry in a directory

        Args:
            path (bytes): path to the file on disk
            max_content_length (Optional[int]): if given, all contents larger
                than this will be skipped.
"""
file_stat = os.lstat(path)
mode = file_stat.st_mode
length = file_stat.st_size
too_large = max_content_length is not None and length > max_content_length
if stat.S_ISLNK(mode):
# Symbolic link: return a file whose contents are the link target
if too_large:
# Unlike large contents, we can't stream symlinks to
# MultiHash, and we don't want to fit them in memory if
# they exceed max_content_length either.
# Thankfully, this should not happen for reasonable values of
# max_content_length because of OS/filesystem limitations,
# so let's just raise an error.
raise Exception(f"Symlink too large ({length} bytes)")
return cls.from_symlink(path=path, mode=mode)
elif not stat.S_ISREG(mode):
# not a regular file: return the empty file instead
return cls.from_bytes(mode=mode, data=b"")
if too_large:
skip_reason = "Content too large"
else:
skip_reason = None
hashes = MultiHash.from_path(path).digest()
if skip_reason:
ret = {
**hashes,
"status": "absent",
"reason": skip_reason,
}
else:
ret = {
**hashes,
"status": "visible",
}
ret["path"] = path
ret["perms"] = mode_to_perms(mode)
ret["length"] = length
obj = cls(ret)
return obj
def swhid(self) -> CoreSWHID:
"""Return node identifier as a SWHID"""
return CoreSWHID(object_type=ObjectType.CONTENT, object_id=self.hash)
def __repr__(self):
return "Content(id=%s)" % hash_to_hex(self.hash)
def compute_hash(self):
return self.data["sha1_git"]
def to_model(self) -> model.BaseContent:
"""Builds a `model.BaseContent` object based on this leaf."""
data = self.get_data().copy()
data.pop("perms", None)
if data["status"] == "absent":
data.pop("path", None)
return model.SkippedContent.from_dict(data)
elif "data" in data:
+ data.pop("path", None)
return model.Content.from_dict(data)
else:
return DiskBackedContent.from_dict(data)
def accept_all_directories(dirpath: str, dirname: str, entries: Iterable[Any]) -> bool:
"""Default filter for :func:`Directory.from_disk` accepting all
directories
    Args:
        dirpath (str): directory path
        dirname (str): directory name
        entries (list): directory entries
"""
return True
def ignore_empty_directories(
dirpath: str, dirname: str, entries: Iterable[Any]
) -> bool:
"""Filter for :func:`directory_to_objects` ignoring empty directories
Args:
dirname (bytes): directory name
entries (list): directory entries
Returns:
True if the directory is not empty, false if the directory is empty
"""
return bool(entries)
def ignore_named_directories(names, *, case_sensitive=True):
"""Filter for :func:`directory_to_objects` to ignore directories named one
of names.
Args:
names (list of bytes): names to ignore
case_sensitive (bool): whether to do the filtering in a case sensitive
way
Returns:
a directory filter for :func:`directory_to_objects`
"""
if not case_sensitive:
names = [name.lower() for name in names]
def named_filter(
dirpath: str,
dirname: str,
entries: Iterable[Any],
names: Iterable[Any] = names,
case_sensitive: bool = case_sensitive,
):
if case_sensitive:
return dirname not in names
else:
return dirname.lower() not in names
return named_filter
# TODO: `extract_regex_objs` has been copied and adapted from `swh.scanner`.
# In the future `swh.scanner` should use the `swh.model` version and remove its own.
def extract_regex_objs(
root_path: bytes, patterns: Iterable[bytes]
) -> Iterator[Pattern[bytes]]:
"""Generates a regex object for each pattern given in input and checks if
the path is a subdirectory or relative to the root path.
Args:
root_path (bytes): path to the root directory
        patterns (list of bytes): shell patterns to match
Yields:
an SRE_Pattern object
"""
absolute_root_path = os.path.abspath(root_path)
for pattern in patterns:
if os.path.isabs(pattern):
pattern = os.path.relpath(pattern, root_path)
# python 3.10 has a `root_dir` argument for glob, but not the previous
# version. So we adjust the pattern
test_pattern = os.path.join(absolute_root_path, pattern)
for path in glob.glob(test_pattern):
if os.path.isabs(path) and not path.startswith(absolute_root_path):
error_msg = (
b'The path "' + path + b'" is not a subdirectory or relative '
b'to the root directory path: "' + root_path + b'"'
)
raise InvalidDirectoryPath(error_msg)
regex = fnmatch.translate((pattern.decode()))
yield re.compile(regex.encode())
def ignore_directories_patterns(root_path: bytes, patterns: Iterable[bytes]):
"""Filter for :func:`directory_to_objects` to ignore directories
matching certain patterns.
Args:
root_path (bytes): path of the root directory
patterns (list of bytes): patterns to ignore
Returns:
a directory filter for :func:`directory_to_objects`
"""
sre_patterns = set(extract_regex_objs(root_path, patterns))
def pattern_filter(
dirpath: bytes,
dirname: bytes,
entries: Iterable[Any],
patterns: Iterable[Any] = sre_patterns,
root_path: bytes = os.path.abspath(root_path),
):
full_path = os.path.abspath(dirpath)
relative_path = os.path.relpath(full_path, root_path)
return not any([pattern.match(relative_path) for pattern in patterns])
return pattern_filter
def iter_directory(
directory,
) -> Tuple[List[model.Content], List[model.SkippedContent], List[model.Directory]]:
"""Return the directory listing from a disk-memory directory instance.
Raises:
TypeError in case an unexpected object type is listed.
Returns:
Tuple of respectively iterable of content, skipped content and directories.
"""
contents: List[model.Content] = []
skipped_contents: List[model.SkippedContent] = []
directories: List[model.Directory] = []
for obj in directory.iter_tree():
obj = obj.to_model()
obj_type = obj.object_type
if obj_type in (model.Content.object_type, DiskBackedContent.object_type):
# FIXME: read the data from disk later (when the
# storage buffer is flushed).
obj = obj.with_data()
contents.append(obj)
elif obj_type == model.SkippedContent.object_type:
skipped_contents.append(obj)
elif obj_type == model.Directory.object_type:
directories.append(obj)
else:
raise TypeError(f"Unexpected object type from disk: {obj}")
return contents, skipped_contents, directories
class Directory(MerkleNode):
"""Representation of a Software Heritage directory as a node in a Merkle Tree.
This class can be used to generate, from an on-disk directory, all the
objects that need to be sent to the Software Heritage archive.
The :func:`from_disk` constructor allows you to generate the data structure
from a directory on disk. The resulting :class:`Directory` can then be
manipulated as a dictionary, using the path as key.
The :func:`collect` method is used to retrieve all the objects that need to
be added to the Software Heritage archive since the last collection, by
class (contents and directories).
When using the dict-like methods to update the contents of the directory,
the affected levels of hierarchy are reset and can be collected again using
the same method. This enables the efficient collection of updated nodes,
for instance when the client is applying diffs.
"""
__slots__ = ["__entries", "__model_object"]
object_type: Final = "directory"
@classmethod
def from_disk(
cls, *, path, dir_filter=accept_all_directories, max_content_length=None
):
"""Compute the Software Heritage objects for a given directory tree
Args:
path (bytes): the directory to traverse
data (bool): whether to add the data to the content objects
save_path (bool): whether to add the path to the content objects
dir_filter (function): a filter to ignore some directories by
name or contents. Takes two arguments: dirname and entries, and
returns True if the directory should be added, False if the
directory should be ignored.
max_content_length (Optional[int]): if given, all contents larger
than this will be skipped.
"""
top_path = path
dirs = {}
for root, dentries, fentries in os.walk(top_path, topdown=False):
entries = {}
            # Process fentries and dentries together, as symbolic
            # links to directories appear in dentries
for name in fentries + dentries:
path = os.path.join(root, name)
if not os.path.isdir(path) or os.path.islink(path):
content = Content.from_file(
path=path, max_content_length=max_content_length
)
entries[name] = content
else:
if dir_filter(path, name, dirs[path].entries):
entries[name] = dirs[path]
dirs[root] = cls({"name": os.path.basename(root), "path": root})
dirs[root].update(entries)
return dirs[top_path]
def __init__(self, data=None):
super().__init__(data=data)
self.__entries = None
self.__model_object = None
def invalidate_hash(self):
self.__entries = None
self.__model_object = None
super().invalidate_hash()
@staticmethod
def child_to_directory_entry(name, child):
if child.object_type == "directory":
return {
"type": "dir",
"perms": DentryPerms.directory,
"target": child.hash,
"name": name,
}
elif child.object_type == "content":
return {
"type": "file",
"perms": child.data["perms"],
"target": child.hash,
"name": name,
}
else:
raise ValueError(f"unknown child {child}")
def get_data(self, **kwargs):
return {
"id": self.hash,
"entries": self.entries,
}
@property
def entries(self):
"""Child nodes, sorted by name in the same way
:func:`swh.model.git_objects.directory_git_object` does."""
if self.__entries is None:
self.__entries = sorted(
(
self.child_to_directory_entry(name, child)
for name, child in self.items()
),
key=directory_entry_sort_key,
)
return self.__entries
def swhid(self) -> CoreSWHID:
"""Return node identifier as a SWHID"""
return CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=self.hash)
def compute_hash(self):
return self.to_model().id
def to_model(self) -> model.Directory:
"""Builds a `model.Directory` object based on this node;
ignoring its children."""
if self.__model_object is None:
DirectoryEntry = model.DirectoryEntry
entries = []
for name, child in self.items():
if child.object_type == "directory":
e = DirectoryEntry(
type="dir",
perms=DentryPerms.directory,
target=child.hash,
name=name,
)
elif child.object_type == "content":
e = DirectoryEntry(
type="file",
perms=child.data["perms"],
target=child.hash,
name=name,
)
else:
raise ValueError(f"unknown child {child}")
entries.append(e)
entries.sort(key=directory_entry_sort_key)
self.__model_object = model.Directory(entries=tuple(entries))
return self.__model_object
def __getitem__(self, key):
if not isinstance(key, bytes):
raise ValueError("Can only get a bytes from Directory")
# Convenience shortcut
if key == b"":
return self
if b"/" not in key:
return super().__getitem__(key)
else:
key1, key2 = key.split(b"/", 1)
return self.__getitem__(key1)[key2]
def __setitem__(self, key, value):
if not isinstance(key, bytes):
raise ValueError("Can only set a bytes Directory entry")
if not isinstance(value, (Content, Directory)):
raise ValueError(
"Can only set a Directory entry to a Content or " "Directory"
)
if key == b"":
raise ValueError("Directory entry must have a name")
if b"\x00" in key:
raise ValueError("Directory entry name must not contain nul bytes")
if b"/" not in key:
return super().__setitem__(key, value)
else:
key1, key2 = key.rsplit(b"/", 1)
self[key1].__setitem__(key2, value)
def __delitem__(self, key):
if not isinstance(key, bytes):
raise ValueError("Can only delete a bytes Directory entry")
if b"/" not in key:
super().__delitem__(key)
else:
key1, key2 = key.rsplit(b"/", 1)
del self[key1][key2]
def __contains__(self, key):
if b"/" not in key:
return super().__contains__(key)
else:
key1, key2 = key.split(b"/", 1)
return super().__contains__(key1) and self[key1].__contains__(key2)
def __repr__(self):
return "Directory(id=%s, entries=[%s])" % (
hash_to_hex(self.hash),
", ".join(str(entry) for entry in self),
)
diff --git a/swh/model/tests/test_from_disk.py b/swh/model/tests/test_from_disk.py
index c07fef6..1ebcbbe 100644
--- a/swh/model/tests/test_from_disk.py
+++ b/swh/model/tests/test_from_disk.py
@@ -1,1008 +1,1010 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import os
import tarfile
import tempfile
from typing import ClassVar, Optional
import unittest
import pytest
from swh.model import from_disk, model
from swh.model.from_disk import Content, DentryPerms, Directory, DiskBackedContent
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
TEST_DATA = os.path.join(os.path.dirname(__file__), "data")
class ModeToPerms(unittest.TestCase):
def setUp(self):
super().setUp()
# Generate a full permissions map
self.perms_map = {}
# Symlinks
for i in range(0o120000, 0o127777 + 1):
self.perms_map[i] = DentryPerms.symlink
# Directories
for i in range(0o040000, 0o047777 + 1):
self.perms_map[i] = DentryPerms.directory
# Other file types: socket, regular file, block device, character
# device, fifo all map to regular files
for ft in [0o140000, 0o100000, 0o060000, 0o020000, 0o010000]:
for i in range(ft, ft + 0o7777 + 1):
if i & 0o111:
# executable bits are set
self.perms_map[i] = DentryPerms.executable_content
else:
self.perms_map[i] = DentryPerms.content
def test_exhaustive_mode_to_perms(self):
for fmode, perm in self.perms_map.items():
self.assertEqual(perm, from_disk.mode_to_perms(fmode))
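The exhaustive permissions map built above collapses to a small normalization rule; here is a self-contained sketch (a hypothetical re-implementation of `from_disk.mode_to_perms`, using raw octal values rather than the `DentryPerms` enum):

```python
def mode_to_perms(mode: int) -> int:
    fmt = mode & 0o170000  # file-type bits of st_mode
    if fmt == 0o120000:
        return 0o120000  # symlink
    if fmt == 0o040000:
        return 0o040000  # directory
    # sockets, fifos, devices and regular files are all stored as regular
    # files; only the executable/non-executable distinction is kept
    return 0o100755 if mode & 0o111 else 0o100644
```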
class TestDiskBackedContent(unittest.TestCase):
def test_with_data(self):
expected_content = model.Content(
length=42,
status="visible",
data=b"foo bar",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
content = DiskBackedContent(
length=42,
status="visible",
path=fd.name,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
fd.write(b"foo bar")
fd.seek(0)
content_with_data = content.with_data()
assert expected_content == content_with_data
def test_lazy_data(self):
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
fd.write(b"foo")
fd.seek(0)
content = DiskBackedContent(
length=42,
status="visible",
path=fd.name,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
fd.write(b"bar")
fd.seek(0)
content_with_data = content.with_data()
fd.write(b"baz")
fd.seek(0)
assert content_with_data.data == b"bar"
def test_with_data_cannot_read(self):
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
content = DiskBackedContent(
length=42,
status="visible",
path=fd.name,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with pytest.raises(OSError):
content.with_data()
def test_missing_path(self):
with pytest.raises(TypeError):
DiskBackedContent(
length=42,
status="visible",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with pytest.raises(TypeError):
DiskBackedContent(
length=42,
status="visible",
path=None,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
class DataMixin:
maxDiff = None # type: ClassVar[Optional[int]]
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory(prefix="swh.model.from_disk")
self.tmpdir_name = os.fsencode(self.tmpdir.name)
self.contents = {
b"file": {
"data": b"42\n",
"sha1": hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
"sha256": hash_to_bytes(
"084c799cd551dd1d8d5c5f9a5d593b2e"
"931f5e36122ee5c793c1d08a19839cc0"
),
"sha1_git": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
"blake2s256": hash_to_bytes(
"d5fe1939576527e42cfd76a9455a2432"
"fe7f56669564577dd93c4280e76d661d"
),
"length": 3,
"mode": 0o100644,
},
}
self.symlinks = {
b"symlink": {
"data": b"target",
"blake2s256": hash_to_bytes(
"595d221b30fdd8e10e2fdf18376e688e"
"9f18d56fd9b6d1eb6a822f8c146c6da6"
),
"sha1": hash_to_bytes("0e8a3ad980ec179856012b7eecf4327e99cd44cd"),
"sha1_git": hash_to_bytes("1de565933b05f74c75ff9a6520af5f9f8a5a2f1d"),
"sha256": hash_to_bytes(
"34a04005bcaf206eec990bd9637d9fdb"
"6725e0a0c0d4aebf003f17f4c956eb5c"
),
"length": 6,
"perms": DentryPerms.symlink,
}
}
self.specials = {
b"fifo": os.mkfifo,
}
self.empty_content = {
"data": b"",
"length": 0,
"blake2s256": hash_to_bytes(
"69217a3079908094e11121d042354a7c" "1f55b6482ca1a51e1b250dfd1ed0eef9"
),
"sha1": hash_to_bytes("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
"sha1_git": hash_to_bytes("e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"),
"sha256": hash_to_bytes(
"e3b0c44298fc1c149afbf4c8996fb924" "27ae41e4649b934ca495991b7852b855"
),
"perms": DentryPerms.content,
}
self.empty_directory = {
"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
"entries": [],
}
# Generated with generate_testdata_from_disk
self.tarball_contents = {
b"": {
"entries": [
{
"name": b"bar",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"3c1f578394f4623f74a0ba7fe761729f59fc6ec4"
),
"type": "dir",
},
{
"name": b"empty-folder",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"4b825dc642cb6eb9a060e54bf8d69288fbee4904"
),
"type": "dir",
},
{
"name": b"foo",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"2b41c40f0d1fbffcba12497db71fba83fcca96e5"
),
"type": "dir",
},
{
"name": b"link-to-another-quote",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"7d5c08111e21c8a9f71540939998551683375fad"
),
"type": "file",
},
{
"name": b"link-to-binary",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"e86b45e538d9b6888c969c89fbd22a85aa0e0366"
),
"type": "file",
},
{
"name": b"link-to-foo",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"19102815663d23f8b75a47e7a01965dcdc96468c"
),
"type": "file",
},
{
"name": b"some-binary",
"perms": DentryPerms.executable_content,
"target": hash_to_bytes(
"68769579c3eaadbe555379b9c3538e6628bae1eb"
),
"type": "file",
},
],
"id": hash_to_bytes("e8b0f1466af8608c8a3fb9879db172b887e80759"),
},
b"bar": {
"entries": [
{
"name": b"barfoo",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"c3020f6bf135a38c6df3afeb5fb38232c5e07087"
),
"type": "dir",
}
],
"id": hash_to_bytes("3c1f578394f4623f74a0ba7fe761729f59fc6ec4"),
},
b"bar/barfoo": {
"entries": [
{
"name": b"another-quote.org",
"perms": DentryPerms.content,
"target": hash_to_bytes(
"133693b125bad2b4ac318535b84901ebb1f6b638"
),
"type": "file",
}
],
"id": hash_to_bytes("c3020f6bf135a38c6df3afeb5fb38232c5e07087"),
},
b"bar/barfoo/another-quote.org": {
"blake2s256": hash_to_bytes(
"d26c1cad82d43df0bffa5e7be11a60e3"
"4adb85a218b433cbce5278b10b954fe8"
),
"length": 72,
"perms": DentryPerms.content,
"sha1": hash_to_bytes("90a6138ba59915261e179948386aa1cc2aa9220a"),
"sha1_git": hash_to_bytes("133693b125bad2b4ac318535b84901ebb1f6b638"),
"sha256": hash_to_bytes(
"3db5ae168055bcd93a4d08285dc99ffe"
"e2883303b23fac5eab850273a8ea5546"
),
},
b"empty-folder": {
"entries": [],
"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
},
b"foo": {
"entries": [
{
"name": b"barfoo",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"8185dfb2c0c2c597d16f75a8a0c37668567c3d7e"
),
"type": "file",
},
{
"name": b"quotes.md",
"perms": DentryPerms.content,
"target": hash_to_bytes(
"7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"
),
"type": "file",
},
{
"name": b"rel-link-to-barfoo",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"acac326ddd63b0bc70840659d4ac43619484e69f"
),
"type": "file",
},
],
"id": hash_to_bytes("2b41c40f0d1fbffcba12497db71fba83fcca96e5"),
},
b"foo/barfoo": {
"blake2s256": hash_to_bytes(
"e1252f2caa4a72653c4efd9af871b62b"
"f2abb7bb2f1b0e95969204bd8a70d4cd"
),
"data": b"bar/barfoo",
"length": 10,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("9057ee6d0162506e01c4d9d5459a7add1fedac37"),
"sha1_git": hash_to_bytes("8185dfb2c0c2c597d16f75a8a0c37668567c3d7e"),
"sha256": hash_to_bytes(
"29ad3f5725321b940332c78e403601af"
"ff61daea85e9c80b4a7063b6887ead68"
),
},
b"foo/quotes.md": {
"blake2s256": hash_to_bytes(
"bf7ce4fe304378651ee6348d3e9336ed"
"5ad603d33e83c83ba4e14b46f9b8a80b"
),
"length": 66,
"perms": DentryPerms.content,
"sha1": hash_to_bytes("1bf0bb721ac92c18a19b13c0eb3d741cbfadebfc"),
"sha1_git": hash_to_bytes("7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"),
"sha256": hash_to_bytes(
"caca942aeda7b308859eb56f909ec96d"
"07a499491690c453f73b9800a93b1659"
),
},
b"foo/rel-link-to-barfoo": {
"blake2s256": hash_to_bytes(
"d9c327421588a1cf61f316615005a2e9"
"c13ac3a4e96d43a24138d718fa0e30db"
),
"data": b"../bar/barfoo",
"length": 13,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("dc51221d308f3aeb2754db48391b85687c2869f4"),
"sha1_git": hash_to_bytes("acac326ddd63b0bc70840659d4ac43619484e69f"),
"sha256": hash_to_bytes(
"8007d20db2af40435f42ddef4b8ad76b"
"80adbec26b249fdf0473353f8d99df08"
),
},
b"link-to-another-quote": {
"blake2s256": hash_to_bytes(
"2d0e73cea01ba949c1022dc10c8a43e6"
"6180639662e5dc2737b843382f7b1910"
),
"data": b"bar/barfoo/another-quote.org",
"length": 28,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("cbeed15e79599c90de7383f420fed7acb48ea171"),
"sha1_git": hash_to_bytes("7d5c08111e21c8a9f71540939998551683375fad"),
"sha256": hash_to_bytes(
"e6e17d0793aa750a0440eb9ad5b80b25"
"8076637ef0fb68f3ac2e59e4b9ac3ba6"
),
},
b"link-to-binary": {
"blake2s256": hash_to_bytes(
"9ce18b1adecb33f891ca36664da676e1"
"2c772cc193778aac9a137b8dc5834b9b"
),
"data": b"some-binary",
"length": 11,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("d0248714948b3a48a25438232a6f99f0318f59f1"),
"sha1_git": hash_to_bytes("e86b45e538d9b6888c969c89fbd22a85aa0e0366"),
"sha256": hash_to_bytes(
"14126e97d83f7d261c5a6889cee73619"
"770ff09e40c5498685aba745be882eff"
),
},
b"link-to-foo": {
"blake2s256": hash_to_bytes(
"08d6cad88075de8f192db097573d0e82"
"9411cd91eb6ec65e8fc16c017edfdb74"
),
"data": b"foo",
"length": 3,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"),
"sha1_git": hash_to_bytes("19102815663d23f8b75a47e7a01965dcdc96468c"),
"sha256": hash_to_bytes(
"2c26b46b68ffc68ff99b453c1d304134"
"13422d706483bfa0f98a5e886266e7ae"
),
},
b"some-binary": {
"blake2s256": hash_to_bytes(
"922e0f7015035212495b090c27577357"
"a740ddd77b0b9e0cd23b5480c07a18c6"
),
"length": 5,
"perms": DentryPerms.executable_content,
"sha1": hash_to_bytes("0bbc12d7f4a2a15b143da84617d95cb223c9b23c"),
"sha1_git": hash_to_bytes("68769579c3eaadbe555379b9c3538e6628bae1eb"),
"sha256": hash_to_bytes(
"bac650d34a7638bb0aeb5342646d24e3"
"b9ad6b44c9b383621faa482b990a367d"
),
},
}
def tearDown(self):
self.tmpdir.cleanup()
def assertContentEqual(self, left, right, *, check_path=False): # noqa
if not isinstance(left, Content):
raise ValueError("%s is not a Content" % left)
if isinstance(right, Content):
right = right.get_data()
# Compare dictionaries
keys = DEFAULT_ALGORITHMS | {
"length",
"perms",
}
if check_path:
keys |= {"path"}
failed = []
for key in keys:
try:
lvalue = left.data[key]
if key == "perms" and "perms" not in right:
rvalue = from_disk.mode_to_perms(right["mode"])
else:
rvalue = right[key]
except KeyError:
failed.append(key)
continue
if lvalue != rvalue:
failed.append(key)
if failed:
raise self.failureException(
"Content mismatched:\n"
+ "\n".join(
"content[%s] = %r != %r" % (key, left.data.get(key), right.get(key))
for key in failed
)
)
def assertDirectoryEqual(self, left, right): # NoQA
if not isinstance(left, Directory):
raise ValueError("%s is not a Directory" % left)
if isinstance(right, Directory):
right = right.get_data()
assert left.entries == right["entries"]
assert left.hash == right["id"]
assert left.to_model() == model.Directory.from_dict(right)
def make_contents(self, directory):
for filename, content in self.contents.items():
path = os.path.join(directory, filename)
with open(path, "wb") as f:
f.write(content["data"])
os.chmod(path, content["mode"])
def make_symlinks(self, directory):
for filename, symlink in self.symlinks.items():
path = os.path.join(directory, filename)
os.symlink(symlink["data"], path)
def make_specials(self, directory):
for filename, fn in self.specials.items():
path = os.path.join(directory, filename)
fn(path)
def make_from_tarball(self, directory):
tarball = os.path.join(TEST_DATA, "dir-folders", "sample-folder.tgz")
with tarfile.open(tarball, "r:gz") as f:
f.extractall(os.fsdecode(directory))
class TestContent(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
def test_data_to_content(self):
for filename, content in self.contents.items():
conv_content = Content.from_bytes(
mode=content["mode"], data=content["data"]
)
self.assertContentEqual(conv_content, content)
self.assertIn(hash_to_hex(conv_content.hash), repr(conv_content))
def test_content_swhid(self):
for _, content in self.contents.items():
content_res = Content.from_bytes(mode=content["mode"], data=content["data"])
content_swhid = "swh:1:cnt:" + hash_to_hex(content["sha1_git"])
assert str(content_res.swhid()) == content_swhid
class TestDirectory(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
def test_directory_swhid(self):
directory_swhid = "swh:1:dir:" + hash_to_hex(self.empty_directory["id"])
directory = Directory.from_disk(path=self.tmpdir_name)
assert str(directory.swhid()) == directory_swhid
class SymlinkToContent(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_symlinks(self.tmpdir_name)
def test_symlink_to_content(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
perms = 0o120000
conv_content = Content.from_symlink(path=path, mode=perms)
- self.assertContentEqual(conv_content, symlink)
+ symlink_copy = symlink.copy()
+ symlink_copy["path"] = path
+ self.assertContentEqual(conv_content, symlink_copy, check_path=True)
def test_symlink_to_base_model(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
perms = 0o120000
model_content = Content.from_symlink(path=path, mode=perms).to_model()
right = symlink.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
class FileToContent(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_contents(self.tmpdir_name)
self.make_symlinks(self.tmpdir_name)
self.make_specials(self.tmpdir_name)
def test_symlink_to_content(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, symlink)
def test_file_to_content(self):
for filename, content in self.contents.items():
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, content)
def test_special_to_content(self):
for filename in self.specials:
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, self.empty_content)
for path in ["/dev/null", "/dev/zero"]:
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, self.empty_content)
def test_symlink_to_content_model(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
model_content = Content.from_file(path=path).to_model()
right = symlink.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
def test_file_to_content_model(self):
for filename, content in self.contents.items():
path = os.path.join(self.tmpdir_name, filename)
model_content = Content.from_file(path=path).to_model()
right = content.copy()
for key in ("perms", "mode"):
right.pop(key, None)
assert model_content.with_data() == model.Content.from_dict(right)
right["path"] = path
del right["data"]
assert model_content == DiskBackedContent.from_dict(right)
def test_special_to_content_model(self):
for filename in self.specials:
path = os.path.join(self.tmpdir_name, filename)
model_content = Content.from_file(path=path).to_model()
right = self.empty_content.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
for path in ["/dev/null", "/dev/zero"]:
model_content = Content.from_file(path=path).to_model()
right = self.empty_content.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
def test_symlink_max_length(self):
for max_content_length in [4, 10]:
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
content = Content.from_file(path=path)
if content.data["length"] > max_content_length:
with pytest.raises(Exception, match="too large"):
Content.from_file(
path=path, max_content_length=max_content_length
)
else:
limited_content = Content.from_file(
path=path, max_content_length=max_content_length
)
assert content == limited_content
def test_file_max_length(self):
for max_content_length in [2, 4]:
for filename, content in self.contents.items():
path = os.path.join(self.tmpdir_name, filename)
content = Content.from_file(path=path)
limited_content = Content.from_file(
path=path, max_content_length=max_content_length
)
assert content.data["length"] == limited_content.data["length"]
assert content.data["status"] == "visible"
if content.data["length"] > max_content_length:
assert limited_content.data["status"] == "absent"
assert limited_content.data["reason"] == "Content too large"
else:
assert limited_content.data["status"] == "visible"
def test_special_file_max_length(self):
for max_content_length in [None, 0, 1]:
for filename in self.specials:
path = os.path.join(self.tmpdir_name, filename)
content = Content.from_file(path=path)
limited_content = Content.from_file(
path=path, max_content_length=max_content_length
)
assert limited_content == content
def test_file_to_content_with_path(self):
for filename, content in self.contents.items():
content_w_path = content.copy()
path = os.path.join(self.tmpdir_name, filename)
content_w_path["path"] = path
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, content_w_path, check_path=True)
@pytest.mark.fs
class DirectoryToObjects(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
contents = os.path.join(self.tmpdir_name, b"contents")
os.mkdir(contents)
self.make_contents(contents)
symlinks = os.path.join(self.tmpdir_name, b"symlinks")
os.mkdir(symlinks)
self.make_symlinks(symlinks)
specials = os.path.join(self.tmpdir_name, b"specials")
os.mkdir(specials)
self.make_specials(specials)
empties = os.path.join(self.tmpdir_name, b"empty1", b"empty2")
os.makedirs(empties)
def check_collect(
self, directory, expected_directory_count, expected_content_count
):
objs = directory.collect()
contents = []
directories = []
for obj in objs:
if isinstance(obj, Content):
contents.append(obj)
elif isinstance(obj, Directory):
directories.append(obj)
self.assertEqual(len(directories), expected_directory_count)
self.assertEqual(len(contents), expected_content_count)
def test_directory_to_objects(self):
directory = Directory.from_disk(path=self.tmpdir_name)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name, value in self.symlinks.items():
self.assertContentEqual(directory[b"symlinks/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
self.assertEqual(
directory[b"empty1/empty2"].get_data(),
self.empty_directory,
)
        # Raise on non-existent file
with self.assertRaisesRegex(KeyError, "b'nonexistent'"):
directory[b"empty1/nonexistent"]
        # Raise on non-existent directory
with self.assertRaisesRegex(KeyError, "b'nonexistentdir'"):
directory[b"nonexistentdir/file"]
self.check_collect(
directory,
expected_directory_count=6,
expected_content_count=len(self.contents) + len(self.symlinks) + 1,
)
def test_directory_to_objects_ignore_empty(self):
directory = Directory.from_disk(
path=self.tmpdir_name, dir_filter=from_disk.ignore_empty_directories
)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name, value in self.symlinks.items():
self.assertContentEqual(directory[b"symlinks/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
# empty directories have been ignored recursively
with self.assertRaisesRegex(KeyError, "b'empty1'"):
directory[b"empty1"]
with self.assertRaisesRegex(KeyError, "b'empty1'"):
directory[b"empty1/empty2"]
self.check_collect(
directory,
expected_directory_count=4,
expected_content_count=len(self.contents) + len(self.symlinks) + 1,
)
def test_directory_to_objects_ignore_name(self):
directory = Directory.from_disk(
path=self.tmpdir_name,
dir_filter=from_disk.ignore_named_directories([b"symlinks"]),
)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
self.assertEqual(
directory[b"empty1/empty2"].get_data(),
self.empty_directory,
)
with self.assertRaisesRegex(KeyError, "b'symlinks'"):
directory[b"symlinks"]
self.check_collect(
directory,
expected_directory_count=5,
expected_content_count=len(self.contents) + 1,
)
def test_directory_to_objects_ignore_name_case(self):
directory = Directory.from_disk(
path=self.tmpdir_name,
dir_filter=from_disk.ignore_named_directories(
[b"symLiNks"], case_sensitive=False
),
)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
self.assertEqual(
directory[b"empty1/empty2"].get_data(),
self.empty_directory,
)
with self.assertRaisesRegex(KeyError, "b'symlinks'"):
directory[b"symlinks"]
self.check_collect(
directory,
expected_directory_count=5,
expected_content_count=len(self.contents) + 1,
)
def test_directory_entry_order(self):
with tempfile.TemporaryDirectory() as dirname:
dirname = os.fsencode(dirname)
open(os.path.join(dirname, b"foo."), "a")
open(os.path.join(dirname, b"foo0"), "a")
os.mkdir(os.path.join(dirname, b"foo"))
directory = Directory.from_disk(path=dirname)
assert [entry["name"] for entry in directory.entries] == [
b"foo.",
b"foo",
b"foo0",
]
@pytest.mark.fs
class TarballTest(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_from_tarball(self.tmpdir_name)
def test_contents_match(self):
directory = Directory.from_disk(
path=os.path.join(self.tmpdir_name, b"sample-folder")
)
for name, expected in self.tarball_contents.items():
obj = directory[name]
if isinstance(obj, Content):
self.assertContentEqual(obj, expected)
elif isinstance(obj, Directory):
self.assertDirectoryEqual(obj, expected)
else:
raise self.failureException("Unknown type for %s" % obj)
class TarballIterDirectory(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_from_tarball(self.tmpdir_name)
def test_iter_directory(self):
"""Iter from_disk.directory should yield the full arborescence tree"""
directory = Directory.from_disk(
path=os.path.join(self.tmpdir_name, b"sample-folder")
)
contents, skipped_contents, directories = from_disk.iter_directory(directory)
expected_nb = defaultdict(int)
for name in self.tarball_contents.keys():
obj = directory[name]
expected_nb[obj.object_type] += 1
assert len(contents) == expected_nb["content"] and len(contents) > 0
assert len(skipped_contents) == 0
assert len(directories) == expected_nb["directory"] and len(directories) > 0
class DirectoryManipulation(DataMixin, unittest.TestCase):
def test_directory_access_nested(self):
d = Directory()
d[b"a"] = Directory()
d[b"a/b"] = Directory()
self.assertEqual(d[b"a/b"].get_data(), self.empty_directory)
def test_directory_del_nested(self):
d = Directory()
d[b"a"] = Directory()
d[b"a/b"] = Directory()
with self.assertRaisesRegex(KeyError, "b'c'"):
del d[b"a/b/c"]
with self.assertRaisesRegex(KeyError, "b'level2'"):
del d[b"a/level2/c"]
del d[b"a/b"]
self.assertEqual(d[b"a"].get_data(), self.empty_directory)
def test_directory_access_self(self):
d = Directory()
self.assertIs(d, d[b""])
self.assertIs(d, d[b"/"])
self.assertIs(d, d[b"//"])
def test_directory_access_wrong_type(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "bytes from Directory"):
d["foo"]
with self.assertRaisesRegex(ValueError, "bytes from Directory"):
d[42]
def test_directory_repr(self):
entries = [b"a", b"b", b"c"]
d = Directory()
for entry in entries:
d[entry] = Directory()
r = repr(d)
self.assertIn(hash_to_hex(d.hash), r)
for entry in entries:
self.assertIn(str(entry), r)
def test_directory_set_wrong_type_name(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
d["foo"] = Directory()
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
d[42] = Directory()
def test_directory_set_nul_in_name(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "nul bytes"):
d[b"\x00\x01"] = Directory()
def test_directory_set_empty_name(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "must have a name"):
d[b""] = Directory()
with self.assertRaisesRegex(ValueError, "must have a name"):
d[b"/"] = Directory()
def test_directory_set_wrong_type(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "Content or Directory"):
d[b"entry"] = object()
def test_directory_del_wrong_type(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
del d["foo"]
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
del d[42]
def test_directory_contains(self):
d = Directory()
d[b"a"] = Directory()
d[b"a/b"] = Directory()
d[b"a/b/c"] = Directory()
d[b"a/b/c/d"] = Content()
self.assertIn(b"a", d)
self.assertIn(b"a/b", d)
self.assertIn(b"a/b/c", d)
self.assertIn(b"a/b/c/d", d)
self.assertNotIn(b"b", d)
self.assertNotIn(b"b/c", d)
self.assertNotIn(b"b/c/d", d)
diff --git a/tox.ini b/tox.ini
index 5198d08..38261f4 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,83 +1,84 @@
[tox]
envlist=black,flake8,mypy,py3-{minimal,full}
[testenv]
extras =
full: testing
minimal: testing-minimal
deps =
pytest-cov
commands =
pytest \
--doctest-modules \
full: --cov={envsitepackagesdir}/swh/model --cov-branch {posargs} \
full: {envsitepackagesdir}/swh/model
minimal: {envsitepackagesdir}/swh/model/tests/test_cli.py -m 'not requires_optional_deps'
[testenv:py3]
skip_install = true
deps = tox
commands =
tox -e py3-full -- {posargs}
tox -e py3-minimal -- {posargs}
[testenv:black]
skip_install = true
deps =
- black==22.3.0
+ black==22.10.0
commands =
{envpython} -m black --check swh
[testenv:flake8]
skip_install = true
deps =
- flake8==4.0.1
- flake8-bugbear==22.3.23
+ flake8==5.0.4
+ flake8-bugbear==22.9.23
+ pycodestyle==2.9.1
commands =
{envpython} -m flake8
[testenv:mypy]
extras =
testing
deps =
mypy==0.942
commands =
mypy swh
# build documentation outside swh-environment using the current
# git HEAD of swh-docs, is executed on CI for each diff to prevent
# breaking doc build
[testenv:sphinx]
whitelist_externals = make
usedevelop = true
extras =
testing
deps =
# fetch and install swh-docs in develop mode
-e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
setenv =
SWH_PACKAGE_DOC_TOX_BUILD = 1
# turn warnings into errors
SPHINXOPTS = -W
commands =
make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
# build documentation only inside swh-environment using local state
# of swh-docs package
[testenv:sphinx-dev]
whitelist_externals = make
usedevelop = true
extras =
testing
deps =
# install swh-docs in develop mode
-e ../swh-docs
setenv =
SWH_PACKAGE_DOC_TOX_BUILD = 1
# turn warnings into errors
SPHINXOPTS = -W
commands =
make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs