
diff --git a/setup.py b/setup.py
index e6c9616..1ccd22d 100755
--- a/setup.py
+++ b/setup.py
@@ -1,70 +1,70 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from setuptools import setup, find_packages
-
-from os import path
from io import open
+from os import path
+
+from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
setup(
name="swh.dataset",
description="Software Heritage dataset tools",
long_description=long_description,
long_description_content_type="text/markdown",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/source/swh-dataset/",
packages=find_packages(),  # package's modules
install_requires=parse_requirements() + parse_requirements("swh"),
tests_require=parse_requirements("test"),
entry_points="""
[swh.cli.subcommands]
dataset=swh.dataset.cli:cli
""",
setup_requires=["vcversioner"],
extras_require={"testing": parse_requirements("test")},
vcversioner={},
include_package_data=True,
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": "https://forge.softwareheritage.org/source/swh-dataset",
"Documentation": "https://docs.softwareheritage.org/devel/swh-dataset/",
},
)
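
Note on the requirements handling above: parse_requirements() simply reads plain requirements files, keeping every line except blanks and comments. A minimal illustration of that filtering (the requirement lines are made up for the example, not taken from this repository):

```
# Illustration only: the filtering applied by parse_requirements(),
# shown on made-up requirements lines.
lines = ["# journal access", "", "swh.journal >= 0.0.25", "swh.model"]
requirements = [
    line.strip()
    for line in lines
    if line.strip() and not line.strip().startswith("#")
]
assert requirements == ["swh.journal >= 0.0.25", "swh.model"]
```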
diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
index 3e2946a..66863da 100644
--- a/swh/dataset/exporter.py
+++ b/swh/dataset/exporter.py
@@ -1,240 +1,241 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import concurrent.futures
+from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
import multiprocessing
import time
-import tqdm
from typing import Mapping, Sequence, Tuple
-from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
+
from confluent_kafka import TopicPartition
+import tqdm
from swh.journal.client import JournalClient
class JournalClientOffsetRanges(JournalClient):
"""
A subclass of JournalClient reading only inside some specific offset
range. Partition assignments have to be manually given to the class.
This client can only read a single topic at a time.
"""
def __init__(
self,
*args,
offset_ranges: Mapping[int, Tuple[int, int]] = None,
assignment: Sequence[int] = None,
progress_queue: multiprocessing.Queue = None,
refresh_every: int = 200,
**kwargs,
):
"""
Args:
offset_ranges: A mapping of partition_id -> (low, high) offsets
that define the boundaries of the messages to consume.
assignment: The list of partitions to assign to this client.
progress_queue: a multiprocessing.Queue where the current
progress will be reported.
refresh_every: the refreshing rate of the progress reporting.
"""
self.offset_ranges = offset_ranges
self.progress_queue = progress_queue
self.refresh_every = refresh_every
self.assignment = assignment
self.count = None
self.topic_name = None
super().__init__(*args, **kwargs)
def subscribe(self):
self.topic_name = self.subscription[0]
time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
self.consumer.assign(
[TopicPartition(self.topic_name, pid) for pid in self.assignment]
)
def process(self, *args, **kwargs):
self.count = 0
try:
self.handle_committed_offsets()
super().process(*args, **kwargs)
except EOFError:
pass
finally:
self.progress_queue.put(None)
def handle_committed_offsets(self,):
"""
Handle already committed partition offsets before starting processing.
"""
committed = self.consumer.committed(
[TopicPartition(self.topic_name, pid) for pid in self.assignment]
)
for tp in committed:
self.handle_offset(tp.partition, tp.offset)
def handle_offset(self, partition_id, offset):
"""
Check whether the client has reached the end of the current
partition, and trigger a reassignment if that is the case.
Raise EOFError if all the partitions have reached the end.
"""
if offset < 0: # Uninitialized partition offset
return
if self.count % self.refresh_every == 0:
self.progress_queue.put({partition_id: offset})
if offset >= self.offset_ranges[partition_id][1] - 1:
self.assignment = [pid for pid in self.assignment if pid != partition_id]
self.subscribe()
if not self.assignment:
raise EOFError
def deserialize_message(self, message):
"""
Override of the message deserialization to hook the handling of the
message offset.
"""
self.handle_offset(message.partition(), message.offset())
self.count += 1
return super().deserialize_message(message)
class ParallelExporter:
"""
Base class for all the Journal exporters.
Each exporter should override the `export_worker` function with an
implementation of how to run the message processing.
"""
def __init__(self, config, export_id: str, obj_type, processes=1):
"""
Args:
config: the exporter config, which should also include the
JournalClient configuration.
export_id: a unique identifier for the export that will be used
as part of a Kafka consumer group ID.
obj_type: The type of SWH object to export.
processes: The number of processes to run.
"""
self.config = config
self.export_id = "swh-dataset-export-{}".format(export_id)
self.obj_type = obj_type
self.processes = processes
self.offsets = None
def get_offsets(self):
"""
First pass to fetch all the current low and high offsets of each
partition to define the consumption boundaries.
"""
if self.offsets is None:
client = JournalClient(
**self.config["journal"],
object_types=[self.obj_type],
group_id=self.export_id,
)
topic_name = client.subscription[0]
topics = client.consumer.list_topics(topic_name).topics
partitions = topics[topic_name].partitions
self.offsets = {}
for partition_id in tqdm.tqdm(
partitions.keys(), desc=" - Partition offsets"
):
tp = TopicPartition(topic_name, partition_id)
(lo, hi) = client.consumer.get_watermark_offsets(tp)
self.offsets[partition_id] = (lo, hi)
return self.offsets
def run(self, *args):
"""
Run the parallel export.
"""
offsets = self.get_offsets()
to_assign = list(offsets.keys())
manager = multiprocessing.Manager()
q = manager.Queue()
with ProcessPoolExecutor(self.processes + 1) as pool:
futures = []
for i in range(self.processes):
futures.append(
pool.submit(
self.export_worker,
*args,
assignment=to_assign[i :: self.processes],
queue=q,
)
)
futures.append(pool.submit(self.progress_worker, queue=q))
concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
for f in futures:
if f.running():
continue
exc = f.exception()
if exc:
pool.shutdown(wait=False)
f.result()
raise exc
def progress_worker(self, *args, queue=None):
"""
An additional worker process that reports the current progress of the
export, aggregated across all the parallel consumers and all the
partitions, by consuming the shared progress reporting Queue.
"""
d = {}
active_workers = self.processes
offset_diff = sum((hi - lo) for lo, hi in self.offsets.values())
with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar:
while active_workers:
item = queue.get()
if item is None:
active_workers -= 1
continue
d.update(item)
progress = sum(n - self.offsets[p][0] for p, n in d.items())
pbar.set_postfix(
active_workers=active_workers, total_workers=self.processes
)
pbar.update(progress - pbar.n)
def process(self, callback, assignment=None, queue=None):
client = JournalClientOffsetRanges(
**self.config["journal"],
object_types=[self.obj_type],
group_id=self.export_id,
debug="cgrp,broker",
offset_ranges=self.offsets,
assignment=assignment,
progress_queue=queue,
**{"message.max.bytes": str(500 * 1024 * 1024)},
)
client.process(callback)
def export_worker(self, *args, **kwargs):
"""
Override this with a custom implementation of a worker function.
A worker function should call `self.process(fn, **kwargs)` with `fn`
being a callback that will be called in the same fashion as with
`JournalClient.process()`.
A simple exporter to print all the objects in the log would look like
this:
```
class PrintExporter(ParallelExporter):
def export_worker(self, **kwargs):
self.process(print, **kwargs)
```
"""
raise NotImplementedError
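
For context, a hedged sketch of how the PrintExporter from the docstring above could be driven end to end. The broker address and the journal configuration are placeholders, not part of this diff:

```
# Sketch only: instantiating and running a ParallelExporter subclass.
from swh.dataset.exporter import ParallelExporter

class PrintExporter(ParallelExporter):
    def export_worker(self, **kwargs):
        # self.process() wires the callback into JournalClientOffsetRanges,
        # restricted to the partitions assigned to this worker.
        self.process(print, **kwargs)

config = {"journal": {"brokers": ["kafka.example.org:9092"]}}  # placeholder
# The consumer group_id is derived internally from export_id, so it is not
# set in the journal configuration here.
exporter = PrintExporter(config, export_id="demo", obj_type="revision", processes=4)
exporter.run()  # positional arguments to run() are forwarded to export_worker()
```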
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
index c3c30bd..35463ed 100644
--- a/swh/dataset/graph.py
+++ b/swh/dataset/graph.py
@@ -1,241 +1,241 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import contextlib
import functools
import os
import os.path
import pathlib
import shlex
import subprocess
import tempfile
import uuid
from swh.dataset.exporter import ParallelExporter
-from swh.dataset.utils import ZSTFile, SQLiteSet
+from swh.dataset.utils import SQLiteSet, ZSTFile
from swh.model.identifiers import origin_identifier, persistent_identifier
from swh.storage.fixer import fix_objects
def process_messages(messages, config, node_writer, edge_writer, node_set):
"""
Args:
messages: A sequence of messages to process
config: The exporter configuration
node_writer: A file-like object where to write nodes
edge_writer: A file-like object where to write edges
"""
def write_node(node):
node_type, node_id = node
if node_id is None:
return
node_pid = persistent_identifier(object_type=node_type, object_id=node_id)
node_writer.write("{}\n".format(node_pid))
def write_edge(src, dst):
src_type, src_id = src
dst_type, dst_id = dst
if src_id is None or dst_id is None:
return
src_pid = persistent_identifier(object_type=src_type, object_id=src_id)
dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id)
edge_writer.write("{} {}\n".format(src_pid, dst_pid))
messages = {k: fix_objects(k, v) for k, v in messages.items()}
for visit in messages.get("origin_visit", []):
origin_id = origin_identifier({"url": visit["origin"]})
visit_id = visit["visit"]
if not node_set.add("{}:{}".format(origin_id, visit_id).encode()):
continue
write_node(("origin", origin_id))
write_edge(("origin", origin_id), ("snapshot", visit["snapshot"]))
for snapshot in messages.get("snapshot", []):
if not node_set.add(snapshot["id"]):
continue
write_node(("snapshot", snapshot["id"]))
for branch_name, branch in snapshot["branches"].items():
while branch and branch.get("target_type") == "alias":
branch_name = branch["target"]
branch = snapshot["branches"][branch_name]
if branch is None or not branch_name:
continue
# Heuristic to filter out pull requests in snapshots: remove all
# branches that start with refs/ but do not start with refs/heads or
# refs/tags.
if config.get("remove_pull_requests") and (
branch_name.startswith(b"refs/")
and not (
branch_name.startswith(b"refs/heads")
or branch_name.startswith(b"refs/tags")
)
):
continue
write_edge(
("snapshot", snapshot["id"]), (branch["target_type"], branch["target"])
)
for release in messages.get("release", []):
if not node_set.add(release["id"]):
continue
write_node(("release", release["id"]))
write_edge(
("release", release["id"]), (release["target_type"], release["target"])
)
for revision in messages.get("revision", []):
if not node_set.add(revision["id"]):
continue
write_node(("revision", revision["id"]))
write_edge(("revision", revision["id"]), ("directory", revision["directory"]))
for parent in revision["parents"]:
write_edge(("revision", revision["id"]), ("revision", parent))
for directory in messages.get("directory", []):
if not node_set.add(directory["id"]):
continue
write_node(("directory", directory["id"]))
for entry in directory["entries"]:
entry_type_mapping = {
"file": "content",
"dir": "directory",
"rev": "revision",
}
write_edge(
("directory", directory["id"]),
(entry_type_mapping[entry["type"]], entry["target"]),
)
for content in messages.get("content", []):
if not node_set.add(content["sha1_git"]):
continue
write_node(("content", content["sha1_git"]))
class GraphEdgeExporter(ParallelExporter):
"""
Implementation of ParallelExporter which writes all the graph edges
of a specific type in a Zstandard-compressed CSV file.
Each row of the CSV is in the format: `<SRC PID> <DST PID>`
"""
def export_worker(self, export_path, **kwargs):
dataset_path = pathlib.Path(export_path)
dataset_path.mkdir(exist_ok=True, parents=True)
unique_id = str(uuid.uuid4())
nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id))
edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id))
node_set_file = dataset_path / (".set-nodes-{}.sqlite3".format(unique_id))
with contextlib.ExitStack() as stack:
node_writer = stack.enter_context(ZSTFile(nodes_file, "w"))
edge_writer = stack.enter_context(ZSTFile(edges_file, "w"))
node_set = stack.enter_context(SQLiteSet(node_set_file))
process_fn = functools.partial(
process_messages,
config=self.config,
node_writer=node_writer,
edge_writer=edge_writer,
node_set=node_set,
)
self.process(process_fn, **kwargs)
def export_edges(config, export_path, export_id, processes):
"""Run the edge exporter for each edge type."""
object_types = [
"origin_visit",
"snapshot",
"release",
"revision",
"directory",
"content",
]
for obj_type in object_types:
print("{} edges:".format(obj_type))
exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
exporter.run(os.path.join(export_path, obj_type))
def sort_graph_nodes(export_path, config):
"""
Generate the node list from the edges files.
We cannot solely rely on the object IDs that are read in the journal,
as some nodes that are referred to as destinations in the edge file
might not be present in the archive (e.g. a rev_entry referring to a
revision that has not been crawled yet).
The most efficient way of getting all the nodes that are mentioned in
the edges file is therefore to use sort(1) on the gigantic edge files
to get all the unique node IDs, while using the disk as a temporary
buffer.
This pipeline does, in order:
- concatenate and write all the compressed edges files in
graph.edges.csv.zst (using the fact that ZST compression is an additive
function);
- decompress the edges;
- count the number of edges and write it in graph.edges.count.txt;
- count the number of occurrences of each edge type and write them
in graph.edges.stats.txt;
- concatenate all the (decompressed) nodes from the export with the
destination edges, and sort the output to get the list of unique graph
nodes;
- count the number of unique graph nodes and write it in
graph.nodes.count.txt;
- count the number of occurrences of each node type and write them
in graph.nodes.stats.txt;
- compress and write the resulting nodes in graph.nodes.csv.zst.
"""
# Use awk as a replacement of `sort | uniq -c` to avoid buffering everything
# in memory
counter_command = "awk '{ t[$0]++ } END { for (i in t) print i,t[i] }'"
# Use bytes for the sorting algorithm (faster than being locale-specific)
env = {
**os.environ.copy(),
"LC_ALL": "C",
"LC_COLLATE": "C",
"LANG": "C",
}
sort_buffer_size = config.get("sort_buffer_size", "4G")
disk_buffer_dir = config.get("disk_buffer_dir", export_path)
with tempfile.TemporaryDirectory(
prefix=".graph_node_sort_", dir=disk_buffer_dir
) as buffer_path:
subprocess.run(
[
"bash",
"-c",
(
"pv {export_path}/*/*.edges.csv.zst | "
"tee {export_path}/graph.edges.csv.zst |"
"zstdcat |"
"tee >( wc -l > {export_path}/graph.edges.count.txt ) |"
"tee >( cut -d: -f3,6 | {counter_command} | sort "
" > {export_path}/graph.edges.stats.txt ) |"
"cut -d' ' -f2 | "
"cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | "
"sort -u -S{sort_buffer_size} -T{buffer_path} | "
"tee >( wc -l > {export_path}/graph.nodes.count.txt ) |"
"tee >( cut -d: -f3 | {counter_command} | sort "
" > {export_path}/graph.nodes.stats.txt ) |"
"zstdmt > {export_path}/graph.nodes.csv.zst"
).format(
export_path=shlex.quote(str(export_path)),
buffer_path=shlex.quote(str(buffer_path)),
sort_buffer_size=shlex.quote(sort_buffer_size),
counter_command=counter_command,
),
],
env=env,
)
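
The stats files produced by the pipeline in sort_graph_nodes() come from its `cut | awk` stages. A small illustration of what those stages compute, reproduced in Python on made-up PIDs:

```
# Illustration only: the per-type counting done by the shell pipeline above.
import collections

edges = [
    "swh:1:rev:aaaa swh:1:dir:bbbb",
    "swh:1:rev:cccc swh:1:rev:aaaa",
]
# `cut -d: -f3,6` reduces each edge to its (source, destination) type pair;
# awk then counts the pairs -> contents of graph.edges.stats.txt
edge_types = ["{}:{}".format(e.split(":")[2], e.split(":")[5]) for e in edges]
print(collections.Counter(edge_types))  # Counter({'rev:dir': 1, 'rev:rev': 1})

# `cut -d: -f3` does the same per node -> contents of graph.nodes.stats.txt
nodes = ["swh:1:rev:aaaa", "swh:1:dir:bbbb", "swh:1:rev:cccc"]
print(collections.Counter(n.split(":")[2] for n in nodes))  # Counter({'rev': 2, 'dir': 1})
```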
diff --git a/swh/dataset/test/test_graph.py b/swh/dataset/test/test_graph.py
index 072091b..55b7bdf 100644
--- a/swh/dataset/test/test_graph.py
+++ b/swh/dataset/test/test_graph.py
@@ -1,516 +1,516 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
import hashlib
from typing import Tuple
+from unittest.mock import Mock, call
import pytest
-from unittest.mock import Mock, call
from swh.dataset.graph import process_messages, sort_graph_nodes
from swh.dataset.utils import ZSTFile
from swh.model.hashutil import MultiHash, hash_to_bytes
DATE = {
"timestamp": {"seconds": 1234567891, "microseconds": 0},
"offset": 120,
"negative_utc": False,
}
TEST_CONTENT = {
**MultiHash.from_data(b"foo").digest(),
"length": 3,
"status": "visible",
}
TEST_REVISION = {
"id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
"message": b"hello",
"date": DATE,
"committer": {"fullname": b"foo", "name": b"foo", "email": b""},
"author": {"fullname": b"foo", "name": b"foo", "email": b""},
"committer_date": DATE,
"type": "git",
"directory": b"\x01" * 20,
"synthetic": False,
"metadata": None,
"parents": [],
}
TEST_RELEASE = {
"id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
"name": b"v0.0.1",
"date": {
"timestamp": {"seconds": 1234567890, "microseconds": 0,},
"offset": 120,
"negative_utc": False,
},
"author": {"author": {"fullname": b"foo", "name": b"foo", "email": b""}},
"target_type": "revision",
"target": b"\x04" * 20,
"message": b"foo",
"synthetic": False,
}
TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"}
TEST_ORIGIN_2 = {"url": "https://somewhere.org/den/fox/2"}
TEST_ORIGIN_VISIT = {
"origin": TEST_ORIGIN["url"],
"visit": 1,
"date": "2013-05-07 04:20:39.369271+00:00",
"snapshot": None, # TODO
"status": "ongoing", # TODO
"metadata": {"foo": "bar"},
"type": "git",
}
class FakeDiskSet(set):
"""
A set with an add() method that returns whether the item has been added
or was already there. Used to replace SQLiteSet in unittests.
"""
def add(self, v):
assert isinstance(v, bytes)
r = True
if v in self:
r = False
super().add(v)
return r
@pytest.fixture
def exporter():
def wrapped(messages, config=None) -> Tuple[Mock, Mock]:
if config is None:
config = {}
node_writer = Mock()
edge_writer = Mock()
node_set = FakeDiskSet()
process_messages(
messages,
config=config,
node_writer=node_writer,
edge_writer=edge_writer,
node_set=node_set,
)
return node_writer.write, edge_writer.write
return wrapped
def binhash(s):
return hashlib.sha1(s.encode()).digest()
def hexhash(s):
return hashlib.sha1(s.encode()).hexdigest()
def test_export_origin_visits(exporter):
node_writer, edge_writer = exporter(
{
"origin_visit": [
{
**TEST_ORIGIN_VISIT,
"origin": {"url": "ori1"},
"snapshot": binhash("snp1"),
},
{
**TEST_ORIGIN_VISIT,
"origin": {"url": "ori2"},
"snapshot": binhash("snp2"),
},
]
}
)
assert node_writer.mock_calls == [
call(f"swh:1:ori:{hexhash('ori1')}\n"),
call(f"swh:1:ori:{hexhash('ori2')}\n"),
]
assert edge_writer.mock_calls == [
call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"),
call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"),
]
def test_export_snapshot_simple(exporter):
node_writer, edge_writer = exporter(
{
"snapshot": [
{
"id": binhash("snp1"),
"branches": {
b"refs/heads/master": {
"target": binhash("rev1"),
"target_type": "revision",
},
b"HEAD": {"target": binhash("rev1"), "target_type": "revision"},
},
},
{
"id": binhash("snp2"),
"branches": {
b"refs/heads/master": {
"target": binhash("rev1"),
"target_type": "revision",
},
b"HEAD": {"target": binhash("rev2"), "target_type": "revision"},
b"bcnt": {"target": binhash("cnt1"), "target_type": "content"},
b"bdir": {
"target": binhash("dir1"),
"target_type": "directory",
},
b"brel": {"target": binhash("rel1"), "target_type": "release"},
b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"},
},
},
{"id": binhash("snp3"), "branches": {}},
]
}
)
assert node_writer.mock_calls == [
call(f"swh:1:snp:{hexhash('snp1')}\n"),
call(f"swh:1:snp:{hexhash('snp2')}\n"),
call(f"swh:1:snp:{hexhash('snp3')}\n"),
]
assert edge_writer.mock_calls == [
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}\n"),
call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}\n"),
call(f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}\n"),
call(f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}\n"),
call(f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}\n"),
call(f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}\n"),
]
def test_export_snapshot_aliases(exporter):
node_writer, edge_writer = exporter(
{
"snapshot": [
{
"id": binhash("snp1"),
"branches": {
b"origin_branch": {
"target": binhash("rev1"),
"target_type": "revision",
},
b"alias1": {"target": b"origin_branch", "target_type": "alias"},
b"alias2": {"target": b"alias1", "target_type": "alias"},
b"alias3": {"target": b"alias2", "target_type": "alias"},
},
},
]
}
)
assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")]
assert edge_writer.mock_calls == (
[call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n")] * 4
)
def test_export_snapshot_no_pull_requests(exporter):
snp = {
"id": binhash("snp1"),
"branches": {
b"refs/heads/master": {
"target": binhash("rev1"),
"target_type": "revision",
},
b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"},
b"refs/merge-requests/lol": {
"target": binhash("rev3"),
"target_type": "revision",
},
b"refs/tags/v1.0.0": {
"target": binhash("rev4"),
"target_type": "revision",
},
b"refs/patch/123456abc": {
"target": binhash("rev5"),
"target_type": "revision",
},
},
}
node_writer, edge_writer = exporter({"snapshot": [snp]})
assert edge_writer.mock_calls == [
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}\n"),
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}\n"),
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}\n"),
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev5')}\n"),
]
node_writer, edge_writer = exporter(
{"snapshot": [snp]}, config={"remove_pull_requests": True}
)
assert edge_writer.mock_calls == [
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}\n"),
]
def test_export_releases(exporter):
node_writer, edge_writer = exporter(
{
"release": [
{
**TEST_RELEASE,
"id": binhash("rel1"),
"target": binhash("rev1"),
"target_type": "revision",
},
{
**TEST_RELEASE,
"id": binhash("rel2"),
"target": binhash("rel1"),
"target_type": "release",
},
{
**TEST_RELEASE,
"id": binhash("rel3"),
"target": binhash("dir1"),
"target_type": "directory",
},
{
**TEST_RELEASE,
"id": binhash("rel4"),
"target": binhash("cnt1"),
"target_type": "content",
},
]
}
)
assert node_writer.mock_calls == [
call(f"swh:1:rel:{hexhash('rel1')}\n"),
call(f"swh:1:rel:{hexhash('rel2')}\n"),
call(f"swh:1:rel:{hexhash('rel3')}\n"),
call(f"swh:1:rel:{hexhash('rel4')}\n"),
]
assert edge_writer.mock_calls == [
call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"),
call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"),
call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"),
call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"),
]
def test_export_revision(exporter):
node_writer, edge_writer = exporter(
{
"revision": [
{
**TEST_REVISION,
"id": binhash("rev1"),
"directory": binhash("dir1"),
"parents": [binhash("rev2"), binhash("rev3"),],
},
{
**TEST_REVISION,
"id": binhash("rev2"),
"directory": binhash("dir2"),
"parents": [],
},
]
}
)
assert node_writer.mock_calls == [
call(f"swh:1:rev:{hexhash('rev1')}\n"),
call(f"swh:1:rev:{hexhash('rev2')}\n"),
]
assert edge_writer.mock_calls == [
call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"),
call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"),
call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"),
call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"),
]
def test_export_directory(exporter):
node_writer, edge_writer = exporter(
{
"directory": [
{
"id": binhash("dir1"),
"entries": [
{"type": "file", "target": binhash("cnt1")},
{"type": "dir", "target": binhash("dir2")},
{"type": "rev", "target": binhash("rev1")},
],
},
{"id": binhash("dir2"), "entries": [],},
]
}
)
assert node_writer.mock_calls == [
call(f"swh:1:dir:{hexhash('dir1')}\n"),
call(f"swh:1:dir:{hexhash('dir2')}\n"),
]
assert edge_writer.mock_calls == [
call(f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}\n"),
call(f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}\n"),
call(f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}\n"),
]
def test_export_content(exporter):
node_writer, edge_writer = exporter(
{
"content": [
{**TEST_CONTENT, "sha1_git": binhash("cnt1"),},
{**TEST_CONTENT, "sha1_git": binhash("cnt2"),},
]
}
)
assert node_writer.mock_calls == [
call(f"swh:1:cnt:{hexhash('cnt1')}\n"),
call(f"swh:1:cnt:{hexhash('cnt2')}\n"),
]
assert edge_writer.mock_calls == []
def test_export_duplicate_node(exporter):
node_writer, edge_writer = exporter(
{
"content": [
{**TEST_CONTENT, "sha1_git": binhash("cnt1")},
{**TEST_CONTENT, "sha1_git": binhash("cnt1")},
{**TEST_CONTENT, "sha1_git": binhash("cnt1")},
],
},
)
assert node_writer.mock_calls == [
call(f"swh:1:cnt:{hexhash('cnt1')}\n"),
]
assert edge_writer.mock_calls == []
def test_export_duplicate_visit(exporter):
node_writer, edge_writer = exporter(
{
"origin_visit": [
{**TEST_ORIGIN_VISIT, "origin": {"url": "ori1"}, "visit": 1},
{**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 1},
{**TEST_ORIGIN_VISIT, "origin": {"url": "ori1"}, "visit": 1},
{**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 1},
{**TEST_ORIGIN_VISIT, "origin": {"url": "ori1"}, "visit": 2},
{**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 2},
{**TEST_ORIGIN_VISIT, "origin": {"url": "ori2"}, "visit": 2},
],
},
)
assert node_writer.mock_calls == [
call(f"swh:1:ori:{hexhash('ori1')}\n"),
call(f"swh:1:ori:{hexhash('ori2')}\n"),
call(f"swh:1:ori:{hexhash('ori1')}\n"),
call(f"swh:1:ori:{hexhash('ori2')}\n"),
]
assert edge_writer.mock_calls == []
def zstwrite(fp, lines):
with ZSTFile(fp, "w") as writer:
- for l in lines:
- writer.write(l + "\n")
+ for line in lines:
+ writer.write(line + "\n")
def zstread(fp):
with ZSTFile(fp, "r") as reader:
return reader.read()
def test_sort_pipeline(tmp_path):
short_type_mapping = {
"origin_visit": "ori",
"snapshot": "snp",
"release": "rel",
"revision": "rev",
"directory": "dir",
"content": "cnt",
}
input_nodes = [
f"swh:1:{short}:{hexhash(short + str(x))}"
for short in short_type_mapping.values()
for x in range(4)
]
input_edges = [
f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}",
f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}",
f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}",
f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest
f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')}",
f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')}",
f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}",
f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}",
f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}",
f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}",
f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup
f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}", # dup
f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}",
f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest
f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}",
f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}",
f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}",
f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')}",
f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}",
]
for obj_type, short_obj_type in short_type_mapping.items():
p = tmp_path / obj_type
p.mkdir()
edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")]
zstwrite(p / "00.edges.csv.zst", edges[0::2])
zstwrite(p / "01.edges.csv.zst", edges[1::2])
nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")]
zstwrite(p / "00.nodes.csv.zst", nodes[0::2])
zstwrite(p / "01.nodes.csv.zst", nodes[1::2])
sort_graph_nodes(tmp_path, config={"sort_buffer_size": "1M"})
output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n")
output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n")
output_nodes = list(filter(bool, output_nodes))
output_edges = list(filter(bool, output_edges))
expected_nodes = set(input_nodes) | set(e.split()[1] for e in input_edges)
assert output_nodes == sorted(expected_nodes)
assert int((tmp_path / "graph.nodes.count.txt").read_text()) == len(expected_nodes)
assert sorted(output_edges) == sorted(input_edges)
assert int((tmp_path / "graph.edges.count.txt").read_text()) == len(input_edges)
actual_node_stats = (tmp_path / "graph.nodes.stats.txt").read_text().strip()
expected_node_stats = "\n".join(
sorted(
"{} {}".format(k, v)
for k, v in collections.Counter(
node.split(":")[2] for node in expected_nodes
).items()
)
)
assert actual_node_stats == expected_node_stats
actual_edge_stats = (tmp_path / "graph.edges.stats.txt").read_text().strip()
expected_edge_stats = "\n".join(
sorted(
"{} {}".format(k, v)
for k, v in collections.Counter(
"{}:{}".format(edge.split(":")[2], edge.split(":")[5])
for edge in input_edges
).items()
)
)
assert actual_edge_stats == expected_edge_stats
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
index 0e88170..f0e6e7b 100644
--- a/swh/dataset/utils.py
+++ b/swh/dataset/utils.py
@@ -1,81 +1,81 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import subprocess
-import sqlite3
import os
+import sqlite3
+import subprocess
class ZSTFile:
"""
Object-like wrapper around a ZST file. Uses a subprocess of the "zstd"
command to compress and decompress the objects.
"""
def __init__(self, path, mode="r"):
if mode not in ("r", "rb", "w", "wb"):
raise ValueError(f"ZSTFile mode {mode} is invalid.")
self.path = path
self.mode = mode
def __enter__(self):
is_text = not (self.mode in ("rb", "wb"))
writing = self.mode in ("w", "wb")
if writing:
cmd = ["zstd", "-q", "-o", self.path]
else:
cmd = ["zstdcat", self.path]
self.process = subprocess.Popen(
cmd, text=is_text, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
)
return self
def __exit__(self, exc_type, exc_value, tb):
self.process.stdin.close()
self.process.stdout.close()
self.process.wait()
def read(self, *args):
return self.process.stdout.read(*args)
def write(self, buf):
self.process.stdin.write(buf)
class SQLiteSet:
"""
On-disk Set object for hashes using SQLite as an indexer backend. Used to
deduplicate objects when processing large queues with duplicates.
"""
def __init__(self, db_path: os.PathLike):
self.db_path = db_path
def __enter__(self):
self.db = sqlite3.connect(str(self.db_path))
self.db.execute(
"CREATE TABLE tmpset (val TEXT NOT NULL PRIMARY KEY) WITHOUT ROWID"
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.db.close()
def add(self, v: bytes) -> bool:
"""
Add an item to the set.
Args:
v: The value to add to the set.
Returns:
True if the value was added to the set, False if it was already present.
"""
try:
self.db.execute("INSERT INTO tmpset(val) VALUES (?)", (v.hex(),))
except sqlite3.IntegrityError:
return False
else:
return True
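
A hedged usage sketch of the two helpers above, combined the way GraphEdgeExporter.export_worker uses them. The file names are placeholders and the `zstd` binary must be available on PATH:

```
# Sketch only: writing deduplicated node PIDs through ZSTFile + SQLiteSet.
from swh.dataset.utils import SQLiteSet, ZSTFile

with ZSTFile("nodes.csv.zst", "w") as nodes, SQLiteSet("dedup.sqlite3") as seen:
    for node_id in (b"\x01" * 20, b"\x02" * 20, b"\x01" * 20):
        if not seen.add(node_id):  # False -> already seen, skip the duplicate
            continue
        nodes.write("swh:1:cnt:{}\n".format(node_id.hex()))

with ZSTFile("nodes.csv.zst", "r") as reader:
    print(reader.read())  # two unique rows
```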
