Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9311721
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
View Options
diff --git a/ardumont/nixguix/nar.py b/ardumont/nixguix/nar.py
index 22cd219..2ed4ca7 100644
--- a/ardumont/nixguix/nar.py
+++ b/ardumont/nixguix/nar.py
@@ -1,160 +1,223 @@
# Copyright (C) 2022 @zimoun and the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import hashlib
import io
import os
import stat
from pathlib import Path
import click
class Nar:
    """NAR serializer.

    This builds the NAR structure and serializes it as described in Eelco
    Dolstra's PhD thesis (Figure 5.2, p.93).

    Every serialized byte is handed to the ``updater`` callable given to the
    constructor (typically the ``update`` method of a hashlib object).

    For example, this tree on a filesystem:

        $ tree foo
        foo
        ├── bar
        │   └── exe
        └── baz

        1 directory, 2 files

    serializes as (debug output, indented here for readability):

        nix-archive-1
          (
          type
          directory
            entry
            (
            name
            bar
            node
              (
              type
              directory
                entry
                (
                name
                exe
                node
                  (
                  type
                  regular
                  executable

                  contents
                  <_io.BufferedReader name='foo/bar/exe'>
                  )
                )
              )
            )
            entry
            (
            name
            baz
            node
              (
              type
              regular
              contents
              <_io.BufferedReader name='foo/baz'>
              )
            )
          )

    """

    # Version-control folders skipped when found directly under the root
    _IGNORED_TOP_LEVEL = (".git", ".hg", ".svn")
    # Read size used when streaming file contents (64 KiB)
    _CHUNK_SIZE = 16 * 4096

    def __init__(self, updater, isdebug=False):
        """Create a serializer.

        Args:
            updater: callable receiving each chunk of serialized bytes
            isdebug: when true, print every serialized token to stdout

        """
        self._update = updater
        self.__isdebug = isdebug
        self.__indent = 0
        # Replaced in the serialize call; starts empty so that _serialize
        # can also be used on its own (nothing is ignored in that case).
        self.__paths_to_ignore = frozenset()

    def str_(self, thing):
        """Serialize a string, an open binary file, or a list of those.

        A string or file is emitted as its byte length (64-bit little
        endian), its bytes, then zero padding up to a multiple of 8 bytes.
        A list is serialized item by item.

        Raises:
            ValueError: if ``thing`` is neither a str, an io.BufferedReader
                nor a list

        """
        # named 'str' in Figure 5.2 p.93 (page 101 of pdf)

        # ease reading of _serialize
        if isinstance(thing, list):
            for item in thing:
                self.str_(item)
            return

        if isinstance(thing, str):
            # surrogateescape restores the raw bytes of file names or link
            # targets that the OS layer could not decode, instead of raising
            payload = thing.encode("utf-8", "surrogateescape")
            length = len(payload)
        elif isinstance(thing, io.BufferedReader):
            payload = None
            # size of the file actually opened, not of whatever its name
            # points to now
            length = os.fstat(thing.fileno()).st_size
        else:
            raise ValueError("not string nor file")

        if self.__isdebug:
            print(" " * self.__indent + str(thing))

        # named 'int'
        self._update(length.to_bytes(8, byteorder="little"))  # 64-bit little endian

        # first part of 'pad'
        if payload is not None:
            self._update(payload)
        else:
            for chunk in iter(lambda: thing.read(self._CHUNK_SIZE), b""):
                self._update(chunk)

        # second part of 'pad': zero bytes up to the next multiple of 8
        self._update(bytearray(-length % 8))

    def _serialize(self, fso):
        """Serialize the node (regular file, symlink or directory) at ``fso``.

        Raises:
            ValueError: if ``fso`` is any other kind of file

        """
        if self.__isdebug:
            self.__indent += 1
        self.str_("(")

        # lstat: symlinks are serialized as such, never followed
        mode = os.lstat(fso).st_mode

        if stat.S_ISREG(mode):
            self.str_(["type", "regular"])
            # Rely on the file's own mode bits (owner-execute, as Nix's
            # reference implementation does) so the result does not depend on
            # which user computes the hash, unlike os.access.
            if mode & stat.S_IXUSR:
                self.str_(["executable", ""])
            self.str_("contents")
            with open(str(fso), "rb") as f:
                self.str_(f)
        elif stat.S_ISLNK(mode):
            self.str_(["type", "symlink", "target"])
            self.str_(os.readlink(fso))
        elif stat.S_ISDIR(mode):
            self.str_(["type", "directory"])
            for path in sorted(Path(fso).iterdir()):
                # Exact path comparison: Path.match would interpret glob
                # characters present in the root directory name.
                if path in self.__paths_to_ignore:
                    continue
                self._serializeEntry(path)
        else:
            raise ValueError("unsupported file type")

        self.str_(")")
        if self.__isdebug:
            self.__indent -= 1

    def _serializeEntry(self, fso):
        """Serialize one directory entry: its name, then its node."""
        if self.__isdebug:
            self.__indent += 1
        self.str_(["entry", "(", "name", fso.name, "node"])
        self._serialize(fso)
        self.str_(")")
        if self.__isdebug:
            self.__indent -= 1

    def serialize(self, fso):
        """Serialize the filesystem object ``fso`` as a complete NAR.

        The ``.git``, ``.hg`` and ``.svn`` folders located directly under
        ``fso`` are left out of the serialization.

        """
        self.str_("nix-archive-1")
        root = Path(fso)
        self.__paths_to_ignore = frozenset(
            root / folder for folder in self._IGNORED_TOP_LEVEL
        )
        self._serialize(fso)
        return
@click.command()
@click.argument("directory")
@click.option("--hash-algo", "-H", default="sha256")
@click.option(
    "--format-output",
    "-f",
    default="hex",
    type=click.Choice(["hex", "base32", "base64"], case_sensitive=False),
)
@click.option("--debug/--no-debug", default=lambda: os.environ.get("DEBUG", False))
def cli(directory, hash_algo, format_output, debug):
    """Compute NAR hashes on a directory."""
    # The docstring above is rendered by click as the command help text, so
    # implementation notes live in comments:
    #   directory:     filesystem object to serialize and hash
    #   hash_algo:     any algorithm name accepted by hashlib.new
    #   format_output: "hex", "base32" or "base64" (case-insensitive)
    #   debug:         print the serialized tokens while hashing

    # Build the hash object from its name; the previous code only handled
    # sha256 and bound a plain string for anything else.
    try:
        h = hashlib.new(hash_algo)
    except ValueError as exc:
        raise click.BadParameter(
            f"unsupported hash algorithm: {hash_algo}", param_hint="--hash-algo"
        ) from exc

    nar = Nar(h.update, isdebug=debug)
    nar.serialize(directory)

    fmt = format_output.lower()
    if fmt == "hex":
        rendered = h.hexdigest()
    elif fmt == "base64":
        # base64 is case-sensitive: the encoded text must not be lower-cased
        rendered = base64.b64encode(h.digest()).decode()
    else:
        # NOTE(review): this is RFC 4648 base32, lower-cased; Nix's own base32
        # rendering appears to use a different alphabet -- confirm which one
        # consumers of this output expect.
        rendered = base64.b32encode(h.digest()).decode().lower()

    print(rendered)
if __name__ == "__main__":
    # Executed as a script: hand control over to the click command
    cli()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:29 AM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3212995
Attached To
rDSNIP Code snippets
Event Timeline
Log In to Comment