Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/graph.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | |||||
import functools | import functools | ||||
import os | import os | ||||
import os.path | import os.path | ||||
import pathlib | import pathlib | ||||
import shlex | import shlex | ||||
import subprocess | import subprocess | ||||
import tempfile | import tempfile | ||||
import uuid | import uuid | ||||
from swh.dataset.exporter import ParallelExporter | from swh.dataset.exporter import ParallelExporter | ||||
from swh.dataset.utils import ZSTFile | from swh.dataset.utils import ZSTFile, SQLiteSet | ||||
from swh.model.identifiers import origin_identifier, persistent_identifier | from swh.model.identifiers import origin_identifier, persistent_identifier | ||||
from swh.storage.fixer import fix_objects | from swh.storage.fixer import fix_objects | ||||
def process_messages(messages, config, node_writer, edge_writer): | def process_messages(messages, config, node_writer, edge_writer, node_set): | ||||
""" | """ | ||||
Args: | Args: | ||||
messages: A sequence of messages to process | messages: A sequence of messages to process | ||||
config: The exporter configuration | config: The exporter configuration | ||||
node_writer: A file-like object where to write nodes | node_writer: A file-like object where to write nodes | ||||
edge_writer: A file-like object where to write edges | edge_writer: A file-like object where to write edges | ||||
""" | """ | ||||
Show All 10 Lines | def write_edge(src, dst): | ||||
if src_id is None or dst_id is None: | if src_id is None or dst_id is None: | ||||
return | return | ||||
src_pid = persistent_identifier(object_type=src_type, object_id=src_id) | src_pid = persistent_identifier(object_type=src_type, object_id=src_id) | ||||
dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id) | dst_pid = persistent_identifier(object_type=dst_type, object_id=dst_id) | ||||
edge_writer.write("{} {}\n".format(src_pid, dst_pid)) | edge_writer.write("{} {}\n".format(src_pid, dst_pid)) | ||||
messages = {k: fix_objects(k, v) for k, v in messages.items()} | messages = {k: fix_objects(k, v) for k, v in messages.items()} | ||||
for visit in messages.get("origin_visit", []): | for visit in messages.get("origin_visit", []): | ||||
origin_id = origin_identifier({"url": visit["origin"]}) | origin_id = origin_identifier({"url": visit["origin"]}) | ||||
if not node_set.add(origin_id): | |||||
continue | |||||
write_node(("origin", origin_id)) | write_node(("origin", origin_id)) | ||||
olasd: I think you need origin and the visit id here, or you'll only get one visit per origin | |||||
Not Done Inline ActionsAnd you probably need to filter visits out to only keep the ones whose states are "final" olasd: And you probably need to filter visits out to only keep the ones whose states are "final" | |||||
Done Inline ActionsGood catch for the visit ID, thanks! seirl: Good catch for the visit ID, thanks! | |||||
write_edge(("origin", origin_id), ("snapshot", visit["snapshot"])) | write_edge(("origin", origin_id), ("snapshot", visit["snapshot"])) | ||||
for snapshot in messages.get("snapshot", []): | for snapshot in messages.get("snapshot", []): | ||||
if not node_set.add(snapshot["id"]): | |||||
continue | |||||
write_node(("snapshot", snapshot["id"])) | write_node(("snapshot", snapshot["id"])) | ||||
for branch_name, branch in snapshot["branches"].items(): | for branch_name, branch in snapshot["branches"].items(): | ||||
while branch and branch.get("target_type") == "alias": | while branch and branch.get("target_type") == "alias": | ||||
branch_name = branch["target"] | branch_name = branch["target"] | ||||
branch = snapshot["branches"][branch_name] | branch = snapshot["branches"][branch_name] | ||||
if branch is None or not branch_name: | if branch is None or not branch_name: | ||||
continue | continue | ||||
if config.get("remove_pull_requests") and ( | if config.get("remove_pull_requests") and ( | ||||
branch_name.startswith(b"refs/pull") | branch_name.startswith(b"refs/pull") | ||||
or branch_name.startswith(b"refs/merge-requests") | or branch_name.startswith(b"refs/merge-requests") | ||||
): | ): | ||||
continue | continue | ||||
write_edge( | write_edge( | ||||
("snapshot", snapshot["id"]), (branch["target_type"], branch["target"]) | ("snapshot", snapshot["id"]), (branch["target_type"], branch["target"]) | ||||
) | ) | ||||
for release in messages.get("release", []): | for release in messages.get("release", []): | ||||
if not node_set.add(release["id"]): | |||||
continue | |||||
write_node(("release", release["id"])) | write_node(("release", release["id"])) | ||||
write_edge( | write_edge( | ||||
("release", release["id"]), (release["target_type"], release["target"]) | ("release", release["id"]), (release["target_type"], release["target"]) | ||||
) | ) | ||||
for revision in messages.get("revision", []): | for revision in messages.get("revision", []): | ||||
if not node_set.add(revision["id"]): | |||||
continue | |||||
write_node(("revision", revision["id"])) | write_node(("revision", revision["id"])) | ||||
write_edge(("revision", revision["id"]), ("directory", revision["directory"])) | write_edge(("revision", revision["id"]), ("directory", revision["directory"])) | ||||
for parent in revision["parents"]: | for parent in revision["parents"]: | ||||
write_edge(("revision", revision["id"]), ("revision", parent)) | write_edge(("revision", revision["id"]), ("revision", parent)) | ||||
for directory in messages.get("directory", []): | for directory in messages.get("directory", []): | ||||
if not node_set.add(directory["id"]): | |||||
continue | |||||
write_node(("directory", directory["id"])) | write_node(("directory", directory["id"])) | ||||
for entry in directory["entries"]: | for entry in directory["entries"]: | ||||
entry_type_mapping = { | entry_type_mapping = { | ||||
"file": "content", | "file": "content", | ||||
"dir": "directory", | "dir": "directory", | ||||
"rev": "revision", | "rev": "revision", | ||||
} | } | ||||
write_edge( | write_edge( | ||||
("directory", directory["id"]), | ("directory", directory["id"]), | ||||
(entry_type_mapping[entry["type"]], entry["target"]), | (entry_type_mapping[entry["type"]], entry["target"]), | ||||
) | ) | ||||
for content in messages.get("content", []): | for content in messages.get("content", []): | ||||
if not node_set.add(content["sha1_git"]): | |||||
continue | |||||
write_node(("content", content["sha1_git"])) | write_node(("content", content["sha1_git"])) | ||||
class GraphEdgeExporter(ParallelExporter): | class GraphEdgeExporter(ParallelExporter): | ||||
""" | """ | ||||
Implementation of ParallelExporter which writes all the graph edges | Implementation of ParallelExporter which writes all the graph edges | ||||
of a specific type in a Zstandard-compressed CSV file. | of a specific type in a Zstandard-compressed CSV file. | ||||
Each row of the CSV is in the format: `<SRC PID> <DST PID> | Each row of the CSV is in the format: `<SRC PID> <DST PID> | ||||
""" | """ | ||||
def export_worker(self, export_path, **kwargs): | def export_worker(self, export_path, **kwargs): | ||||
dataset_path = pathlib.Path(export_path) | dataset_path = pathlib.Path(export_path) | ||||
dataset_path.mkdir(exist_ok=True, parents=True) | dataset_path.mkdir(exist_ok=True, parents=True) | ||||
nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(str(uuid.uuid4()))) | unique_id = str(uuid.uuid4()) | ||||
edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(str(uuid.uuid4()))) | nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) | ||||
edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) | |||||
node_set_file = dataset_path / (".set-nodes-{}.sqlite3".format(unique_id)) | |||||
with contextlib.ExitStack() as stack: | |||||
nodes_writer = stack.enter_context(ZSTFile(nodes_file, "w")) | |||||
edges_writer = stack.enter_context(ZSTFile(edges_file, "w")) | |||||
node_set = stack.enter_context(SQLiteSet(node_set_file)) | |||||
with ZSTFile(nodes_file, "w") as nodes_writer, ZSTFile( | |||||
edges_file, "w" | |||||
) as edges_writer: | |||||
process_fn = functools.partial( | process_fn = functools.partial( | ||||
process_messages, | process_messages, | ||||
config=self.config, | config=self.config, | ||||
nodes_writer=nodes_writer, | nodes_writer=nodes_writer, | ||||
edges_writer=edges_writer, | edges_writer=edges_writer, | ||||
node_set=node_set, | |||||
) | ) | ||||
self.process(process_fn, **kwargs) | self.process(process_fn, **kwargs) | ||||
def export_edges(config, export_path, export_id, processes): | def export_edges(config, export_path, export_id, processes): | ||||
"""Run the edge exporter for each edge type.""" | """Run the edge exporter for each edge type.""" | ||||
object_types = [ | object_types = [ | ||||
"origin_visit", | "origin_visit", | ||||
▲ Show 20 Lines • Show All 73 Lines • Show Last 20 Lines |
I think you need origin and the visit id here, or you'll only get one visit per origin