Page MenuHomeSoftware Heritage

converters.py
No OneTemporary

converters.py

# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Convert dulwich objects to dictionaries suitable for swh.storage"""
from typing import Any, Dict, Optional, cast
from dulwich.objects import Blob, Commit, ShaFile, Tag, Tree
from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes
from swh.model.model import (
BaseContent,
Content,
Directory,
DirectoryEntry,
HashableObject,
ObjectType,
Person,
Release,
Revision,
RevisionType,
SkippedContent,
TargetType,
Timestamp,
TimestampWithTimezone,
)
class HashMismatch(Exception):
    """Raised when a recomputed object hash differs from the expected id."""
def check_id(obj: HashableObject) -> None:
    """Ensure the id stored on ``obj`` equals the hash recomputed from it.

    Args:
        obj: object exposing an ``id`` attribute and a ``compute_hash()`` method.

    Raises:
        HashMismatch: if ``obj.id`` and ``obj.compute_hash()`` differ.
    """
    recomputed = obj.compute_hash()
    if obj.id == recomputed:
        return

    raise HashMismatch(
        f"Expected {type(obj).__name__} hash to be {obj.id.hex()}, "
        f"got {recomputed.hex()}"
    )
def dulwich_blob_to_content_id(obj: ShaFile) -> Dict[str, Any]:
    """Compute the Software Heritage content identifiers of a dulwich blob.

    Args:
        obj: a dulwich object whose ``type_name`` is ``b"blob"``.

    Returns:
        The digests computed by ``MultiHash`` for ``DEFAULT_ALGORITHMS``, plus a
        ``"length"`` key holding the blob's raw length.

    Raises:
        ValueError: if ``obj`` is not a blob.
        HashMismatch: if the recomputed ``sha1_git`` differs from the digest
            reported by dulwich for this blob.
    """
    if obj.type_name != b"blob":
        raise ValueError("Argument is not a blob.")
    blob = cast(Blob, obj)

    length = blob.raw_length()
    raw = blob.as_raw_string()
    content_id = MultiHash.from_data(raw, DEFAULT_ALGORITHMS).digest()

    # Cross-check our own sha1_git against the id dulwich computed.
    dulwich_digest = blob.sha().digest()
    computed_digest = content_id["sha1_git"]
    if computed_digest != dulwich_digest:
        raise HashMismatch(
            f"Expected Content hash to be {dulwich_digest.hex()}, "
            f"got {computed_digest.hex()}"
        )

    content_id["length"] = length
    return content_id
def dulwich_blob_to_content(obj: ShaFile, max_content_size=None) -> BaseContent:
    """Turn a dulwich blob into a Software Heritage content object.

    Args:
        obj: a dulwich object whose ``type_name`` is ``b"blob"``.
        max_content_size: optional size limit; when set, a blob whose length is
            greater than or equal to it is returned as a ``SkippedContent``.

    Returns:
        A ``SkippedContent`` (status ``"absent"``) for blobs at or above the
        limit, otherwise a ``Content`` (status ``"visible"``) carrying the data.

    Raises:
        ValueError: if ``obj`` is not a blob.
    """
    if obj.type_name != b"blob":
        raise ValueError("Argument is not a blob.")
    blob = cast(Blob, obj)

    content_id = dulwich_blob_to_content_id(blob)

    # Note the inclusive comparison: a blob exactly at the limit is skipped.
    too_large = (
        max_content_size is not None and content_id["length"] >= max_content_size
    )
    if too_large:
        return SkippedContent(status="absent", reason="Content too large", **content_id)

    return Content(data=blob.as_raw_string(), status="visible", **content_id)
def dulwich_tree_to_directory(obj: ShaFile) -> Directory:
    """Convert a dulwich tree into a Software Heritage directory.

    Args:
        obj: a dulwich object whose ``type_name`` is ``b"tree"``.

    Returns:
        A ``Directory`` whose id is the tree's sha digest and whose entries
        mirror the tree's items.

    Raises:
        ValueError: if ``obj`` is not a tree.
        HashMismatch: if the directory's recomputed hash differs from its id.
    """
    if obj.type_name != b"tree":
        raise ValueError("Argument is not a tree.")
    tree = cast(Tree, obj)

    # Entry type for each known mode; any other mode is treated as a "file".
    type_by_mode = {
        0o040000: "dir",
        0o160000: "rev",
        0o100644: "file",
        0o100755: "file",
        0o120000: "file",
    }

    converted = tuple(
        DirectoryEntry(
            type=type_by_mode.get(item.mode, "file"),
            perms=item.mode,
            name=item.path,
            target=hash_to_bytes(item.sha.decode("ascii")),
        )
        for item in tree.iteritems()
    )

    directory = Directory(id=tree.sha().digest(), entries=converted)
    check_id(directory)
    return directory
def parse_author(name_email: bytes) -> Person:
    """Build a ``Person`` from a raw author line via ``Person.from_fullname``."""
    person = Person.from_fullname(name_email)
    return person
def dulwich_tsinfo_to_timestamp(
    timestamp, timezone, timezone_neg_utc
) -> TimestampWithTimezone:
    """Build a Software Heritage timestamp from dulwich's time information.

    Args:
        timestamp: the point in time; truncated to an ``int`` number of seconds.
        timezone: the timezone offset; floor-divided by 60 to get ``offset``
            (presumably seconds converted to minutes -- confirm against dulwich).
        timezone_neg_utc: flag kept as ``negative_utc`` only when ``timezone``
            is 0; for any other offset ``negative_utc`` is ``False``.

    Returns:
        The corresponding ``TimestampWithTimezone`` with zero microseconds.
    """
    moment = Timestamp(seconds=int(timestamp), microseconds=0)
    offset = timezone // 60

    if timezone == 0:
        negative_utc = timezone_neg_utc
    else:
        negative_utc = False

    return TimestampWithTimezone(
        timestamp=moment, offset=offset, negative_utc=negative_utc,
    )
def dulwich_commit_to_revision(obj: ShaFile) -> Revision:
    """Convert a dulwich commit into a Software Heritage revision.

    Args:
        obj: a dulwich object whose ``type_name`` is ``b"commit"``.

    Returns:
        A non-synthetic ``Revision`` of type ``RevisionType.GIT`` whose id is
        the commit's sha digest.

    Raises:
        ValueError: if ``obj`` is not a commit.
        HashMismatch: if the revision's recomputed hash differs from its id.
    """
    if obj.type_name != b"commit":
        raise ValueError("Argument is not a commit.")
    commit = cast(Commit, obj)

    # Headers are collected in a fixed order: encoding, mergetags, extra
    # headers, then gpgsig. NOTE(review): presumably this order matters for the
    # id check below -- do not reorder without confirming.
    headers = []
    if commit.encoding is not None:
        headers.append((b"encoding", commit.encoding))
    for mergetag in commit.mergetag or ():
        serialized = mergetag.as_raw_string()
        assert serialized.endswith(b"\n")
        # Drop the trailing newline of the serialized tag.
        headers.append((b"mergetag", serialized[:-1]))
    for key, value in commit.extra or ():
        headers.append((key, value))
    if commit.gpgsig:
        headers.append((b"gpgsig", commit.gpgsig))

    revision_id = commit.sha().digest()
    author = parse_author(commit.author)
    author_date = dulwich_tsinfo_to_timestamp(
        commit.author_time, commit.author_timezone, commit._author_timezone_neg_utc,
    )
    committer = parse_author(commit.committer)
    committer_date = dulwich_tsinfo_to_timestamp(
        commit.commit_time, commit.commit_timezone, commit._commit_timezone_neg_utc,
    )
    directory = bytes.fromhex(commit.tree.decode())
    parents = tuple(bytes.fromhex(parent.decode()) for parent in commit.parents)

    revision = Revision(
        id=revision_id,
        author=author,
        date=author_date,
        committer=committer,
        committer_date=committer_date,
        type=RevisionType.GIT,
        directory=directory,
        message=commit.message,
        metadata=None,
        extra_headers=tuple(headers),
        synthetic=False,
        parents=parents,
    )
    check_id(revision)
    return revision
# Maps a dulwich object type name (bytes) to the matching TargetType member.
# Not referenced by the functions visible in this file.
DULWICH_TARGET_TYPES = {
    b"blob": TargetType.CONTENT,
    b"tree": TargetType.DIRECTORY,
    b"commit": TargetType.REVISION,
    b"tag": TargetType.RELEASE,
}
# Maps a dulwich object type name (bytes) to the matching ObjectType member.
# dulwich_tag_to_release uses it to set the target_type of a release.
DULWICH_OBJECT_TYPES = {
    b"blob": ObjectType.CONTENT,
    b"tree": ObjectType.DIRECTORY,
    b"commit": ObjectType.REVISION,
    b"tag": ObjectType.RELEASE,
}
def dulwich_tag_to_release(obj: ShaFile) -> Release:
    """Convert a dulwich tag into a Software Heritage release.

    Args:
        obj: a dulwich object whose ``type_name`` is ``b"tag"``.

    Returns:
        A non-synthetic ``Release`` whose id is the tag's sha digest. ``author``
        and ``date`` are ``None`` when the tag has no tagger; ``date`` alone is
        ``None`` when the tagger carries no timestamp.

    Raises:
        ValueError: if ``obj`` is not a tag.
        KeyError: if the tag's target type is not in ``DULWICH_OBJECT_TYPES``.
        HashMismatch: if the release's recomputed hash differs from its id.
    """
    if obj.type_name != b"tag":
        raise ValueError("Argument is not a tag.")
    tag = cast(Tag, obj)

    target_type, target = tag.object

    author: Optional[Person] = None
    date = None
    if tag.tagger:
        author = parse_author(tag.tagger)
        # Only a missing timestamp (None) means "no date". A tag_time of 0 is
        # the Unix epoch, a valid date, so a truthiness test would be wrong.
        if tag.tag_time is not None:
            date = dulwich_tsinfo_to_timestamp(
                tag.tag_time, tag.tag_timezone, tag._tag_timezone_neg_utc,
            )

    # The signature, when present, is appended to the message.
    message = tag.message
    if tag.signature:
        message += tag.signature

    rel = Release(
        id=tag.sha().digest(),
        author=author,
        date=date,
        name=tag.name,
        target=bytes.fromhex(target.decode()),
        target_type=DULWICH_OBJECT_TYPES[target_type.type_name],
        message=message,
        metadata=None,
        synthetic=False,
    )
    check_id(rel)
    return rel

File Metadata

Mime Type
text/x-python
Expires
Fri, Jul 4, 11:00 AM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3296450

Event Timeline