diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
index b0783f9..07ca8c6 100644
--- a/swh/dataset/exporter.py
+++ b/swh/dataset/exporter.py
@@ -1,66 +1,75 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import contextlib
import pathlib
from types import TracebackType
from typing import Any, Dict, Optional, Type
+import uuid


class Exporter:
    """
    Base class for all the exporters.

    Each export can have multiple exporters, so we can read the journal a
    single time, then export the objects we read in different formats without
    having to re-read them every time.

    Override this class with the behavior for an export in a specific export
    format. You have to override process_object() to make it write to the
    appropriate export files.

    You can also put setup and teardown logic in __enter__ and __exit__, and
    it will be called automatically.
    """

    def __init__(
        self, config: Dict[str, Any], export_path, *args: Any, **kwargs: Any
    ) -> None:
        self.config: Dict[str, Any] = config
        self.export_path = pathlib.Path(export_path)
        self.exit_stack = contextlib.ExitStack()

    def __enter__(self) -> "Exporter":
        self.export_path.mkdir(exist_ok=True, parents=True)
        self.exit_stack.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        return self.exit_stack.__exit__(exc_type, exc_value, traceback)

    def process_object(self, object_type: str, obj: Dict[str, Any]) -> None:
        """
        Process a SWH object to export.

        Override this with your custom exporter.
        """
        raise NotImplementedError

+    def get_unique_file_id(self) -> str:
+        """
+        Return a unique random file id for the current process.
+
+        If config['test_unique_file_id'] is set, it will be used instead.
+        """
+        return str(self.config.get("test_unique_file_id", uuid.uuid4()))
+

class ExporterDispatch(Exporter):
    """
    Like Exporter, but dispatches each object type to a different function
    (e.g. you can override `process_origin(self, object)` to process origins.)
    """

    def process_object(self, object_type: str, obj: Dict[str, Any]) -> None:
        method_name = "process_" + object_type
        if hasattr(self, method_name):
            getattr(self, method_name)(obj)
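The new get_unique_file_id() hook is what lets tests pin the otherwise random file names through the test_unique_file_id config key. A minimal sketch of how a test might rely on it; DummyExporter, the config values and the tmp_path fixture are illustrative, not part of this patch:

# Illustrative test sketch only: pin output file ids via test_unique_file_id.
from swh.dataset.exporter import ExporterDispatch


class DummyExporter(ExporterDispatch):
    def process_origin(self, origin):
        pass  # a real exporter would write `origin` under self.export_path


def test_unique_file_id_override(tmp_path):
    with DummyExporter({"test_unique_file_id": "fixed-id"}, tmp_path) as exporter:
        # Deterministic: every call returns the configured value.
        assert exporter.get_unique_file_id() == "fixed-id"

    with DummyExporter({}, tmp_path) as exporter:
        # Without the override, each call returns a fresh random UUID string.
        assert exporter.get_unique_file_id() != exporter.get_unique_file_id()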
return f"swh:1:{object_type.value}:{hash_to_hex(object_id)}" class GraphEdgesExporter(ExporterDispatch): """ Implementation of an exporter which writes all the graph edges of a specific type to a Zstandard-compressed CSV file. Each row of the CSV is in the format: `` ``. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.writers = {} def get_writers_for(self, obj_type: ExtendedObjectType): if obj_type not in self.writers: dataset_path = self.export_path / obj_type.name.lower() dataset_path.mkdir(exist_ok=True) - unique_id = str(uuid.uuid4()) + unique_id = self.get_unique_file_id() nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id)) edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id)) node_writer = self.exit_stack.enter_context(ZSTFile(str(nodes_file), "w")) edge_writer = self.exit_stack.enter_context(ZSTFile(str(edges_file), "w")) self.writers[obj_type] = (node_writer, edge_writer) return self.writers[obj_type] def get_node_writer_for(self, obj_type: ExtendedObjectType): return self.get_writers_for(obj_type)[0] def get_edge_writer_for(self, obj_type: ExtendedObjectType): return self.get_writers_for(obj_type)[1] def write_node(self, node: Tuple[ExtendedObjectType, bytes]): node_type, node_id = node if node_id is None: return node_swhid = swhid(object_type=node_type, object_id=node_id) node_writer = self.get_node_writer_for(node_type) node_writer.write("{}\n".format(node_swhid)) def write_edge( self, src: Tuple[ExtendedObjectType, bytes], dst: Tuple[ExtendedObjectType, bytes], *, labels=None, ): src_type, src_id = src dst_type, dst_id = dst if src_id is None or dst_id is None: return src_swhid = swhid(object_type=src_type, object_id=src_id) dst_swhid = swhid(object_type=dst_type, object_id=dst_id) edge_line = " ".join([src_swhid, dst_swhid] + (labels if labels else [])) edge_writer = self.get_edge_writer_for(src_type) edge_writer.write("{}\n".format(edge_line)) def process_origin(self, origin): origin_id = Origin(url=origin["url"]).id self.write_node((ExtendedObjectType.ORIGIN, origin_id)) def process_origin_visit_status(self, visit_status): origin_id = Origin(url=visit_status["origin"]).id self.write_edge( (ExtendedObjectType.ORIGIN, origin_id), (ExtendedObjectType.SNAPSHOT, visit_status["snapshot"]), ) def process_snapshot(self, snapshot): if self.config.get("remove_pull_requests"): remove_pull_requests(snapshot) self.write_node((ExtendedObjectType.SNAPSHOT, snapshot["id"])) for branch_name, branch in snapshot["branches"].items(): original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] branch = snapshot["branches"].get(branch_name) if branch is None or not branch_name: continue self.write_edge( (ExtendedObjectType.SNAPSHOT, snapshot["id"]), (ExtendedObjectType[branch["target_type"].upper()], branch["target"]), labels=[base64.b64encode(original_branch_name).decode(),], ) def process_release(self, release): self.write_node((ExtendedObjectType.RELEASE, release["id"])) self.write_edge( (ExtendedObjectType.RELEASE, release["id"]), (ExtendedObjectType[release["target_type"].upper()], release["target"]), ) def process_revision(self, revision): self.write_node((ExtendedObjectType.REVISION, revision["id"])) self.write_edge( (ExtendedObjectType.REVISION, revision["id"]), (ExtendedObjectType.DIRECTORY, revision["directory"]), ) for parent in revision["parents"]: self.write_edge( (ExtendedObjectType.REVISION, revision["id"]), (ExtendedObjectType.REVISION, parent), ) 
    def process_directory(self, directory):
        self.write_node((ExtendedObjectType.DIRECTORY, directory["id"]))
        for entry in directory["entries"]:
            entry_type_mapping = {
                "file": ExtendedObjectType.CONTENT,
                "dir": ExtendedObjectType.DIRECTORY,
                "rev": ExtendedObjectType.REVISION,
            }
            self.write_edge(
                (ExtendedObjectType.DIRECTORY, directory["id"]),
                (entry_type_mapping[entry["type"]], entry["target"]),
                labels=[base64.b64encode(entry["name"]).decode(), str(entry["perms"])],
            )

    def process_content(self, content):
        self.write_node((ExtendedObjectType.CONTENT, content["sha1_git"]))
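To make the per-type layout concrete, here is an illustrative sketch of driving the exporter by hand with a pinned file id; the sample directory dict, the export path and the all-zero/one ids are made up for the example, and the zstd binary must be on PATH since ZSTFile shells out to it:

# Illustrative sketch only, not part of this patch.
from swh.dataset.exporters.edges import GraphEdgesExporter

config = {"test_unique_file_id": "0"}
with GraphEdgesExporter(config, "/tmp/example-export") as exporter:
    exporter.process_object(
        "directory",
        {
            "id": b"\x00" * 20,
            "entries": [
                {
                    "type": "file",
                    "target": b"\x01" * 20,
                    "name": b"README",
                    "perms": 0o100644,
                },
            ],
        },
    )
# Expected layout, one nodes file and one edges file per source object type:
#   /tmp/example-export/directory/graph-0.nodes.csv.zst
#   /tmp/example-export/directory/graph-0.edges.csv.zst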
def sort_graph_nodes(export_path, config):
    """
    Generate the node list from the edges files.

    We cannot solely rely on the object IDs that are read in the journal, as
    some nodes that are referred to as destinations in the edge file might not
    be present in the archive (e.g. a rev_entry referring to a revision that we
    have not crawled yet).

    The most efficient way of getting all the nodes that are mentioned in the
    edges file is therefore to use sort(1) on the gigantic edge files to get
    all the unique node IDs, while using the disk as a temporary buffer.

    This pipeline does, in order:

     - concatenate and write all the compressed edges files in
       graph.edges.csv.zst (using the fact that ZST compression is an additive
       function) ;
     - decompress the edges ;
     - count the number of edges and write it in graph.edges.count.txt ;
     - count the number of occurrences of each edge type and write them in
       graph.edges.stats.txt ;
     - concatenate all the (decompressed) nodes from the export with the
       destination edges, and sort the output to get the list of unique graph
       nodes ;
     - count the number of unique graph nodes and write it in
       graph.nodes.count.txt ;
     - count the number of occurrences of each node type and write them in
       graph.nodes.stats.txt ;
     - compress and write the resulting nodes in graph.nodes.csv.zst.
    """

    # Use awk as a replacement of `sort | uniq -c` to avoid buffering everything
    # in memory
    counter_command = "awk '{ t[$0]++ } END { for (i in t) print i,t[i] }'"

    sort_script = """
    pv {export_path}/*/*.edges.csv.zst |
        tee {export_path}/graph.edges.csv.zst |
        zstdcat |
        tee >( wc -l > {export_path}/graph.edges.count.txt ) |
        tee >( cut -d: -f3,6 | {counter_command} | sort \
            > {export_path}/graph.edges.stats.txt ) |
        tee >( cut -d' ' -f3 | grep . | \
            sort -u -S{sort_buffer_size} -T{buffer_path} | \
            zstdmt > {export_path}/graph.labels.csv.zst ) |
        cut -d' ' -f2 |
        cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) |
        sort -u -S{sort_buffer_size} -T{buffer_path} |
        tee >( wc -l > {export_path}/graph.nodes.count.txt ) |
        tee >( cut -d: -f3 | {counter_command} | sort \
            > {export_path}/graph.nodes.stats.txt ) |
        zstdmt > {export_path}/graph.nodes.csv.zst
    """

    # Use bytes for the sorting algorithm (faster than being locale-specific)
    env = {
        **os.environ.copy(),
        "LC_ALL": "C",
        "LC_COLLATE": "C",
        "LANG": "C",
    }
    sort_buffer_size = config.get("sort_buffer_size", "4G")
    disk_buffer_dir = config.get("disk_buffer_dir", export_path)
    with tempfile.TemporaryDirectory(
        prefix=".graph_node_sort_", dir=disk_buffer_dir
    ) as buffer_path:
        subprocess.run(
            [
                "bash",
                "-c",
                sort_script.format(
                    export_path=shlex.quote(str(export_path)),
                    buffer_path=shlex.quote(str(buffer_path)),
                    sort_buffer_size=shlex.quote(sort_buffer_size),
                    counter_command=counter_command,
                ),
            ],
            env=env,
        )
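Once every worker has written its per-process node and edge files, sort_graph_nodes() collapses them into the final node list. A usage sketch follows; the paths and config values are illustrative only, and the pipeline requires pv, zstd/zstdmt, awk, bash and sort to be installed since it shells out to them:

# Illustrative sketch only, not part of this patch.
from swh.dataset.exporters.edges import sort_graph_nodes

export_path = "/srv/softwareheritage/export"  # hypothetical export directory
config = {"sort_buffer_size": "1G", "disk_buffer_dir": "/srv/tmp"}
sort_graph_nodes(export_path, config)
# Produces, next to the per-object-type directories:
#   graph.edges.csv.zst, graph.edges.count.txt, graph.edges.stats.txt
#   graph.nodes.csv.zst, graph.nodes.count.txt, graph.nodes.stats.txt
#   graph.labels.csv.zst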
""" # use Any as timezone annotation to make it easier to run mypy on python < # 3.9, plus we do not use the timezone argument here... @staticmethod def from_orc(seconds: int, nanoseconds: int, timezone: Any,) -> Tuple[int, int]: return (seconds, nanoseconds // 1000) @staticmethod def to_orc( obj: Optional[Tuple[int, int]], timezone: Any, ) -> Optional[Tuple[int, int]]: if obj is None: return None return (obj[0], obj[1] * 1000 if obj[1] is not None else None) class ORCExporter(ExporterDispatch): """ Implementation of an exporter which writes the entire graph dataset as ORC files. Useful for large scale processing, notably on cloud instances (e.g BigQuery, Amazon Athena, Azure). """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.writers = {} def get_writer_for(self, table_name: str): if table_name not in self.writers: object_type_dir = self.export_path / table_name object_type_dir.mkdir(exist_ok=True) - unique_id = str(uuid.uuid4()) + unique_id = self.get_unique_file_id() export_file = object_type_dir / ("graph-{}.orc".format(unique_id)) export_obj = self.exit_stack.enter_context(export_file.open("wb")) self.writers[table_name] = self.exit_stack.enter_context( Writer( export_obj, EXPORT_SCHEMA[table_name], compression=CompressionKind.ZSTD, converters={ TypeKind.TIMESTAMP: cast( Type[ORCConverter], SWHTimestampConverter ) }, ) ) return self.writers[table_name] def process_origin(self, origin): origin_writer = self.get_writer_for("origin") origin_writer.write((origin["url"],)) def process_origin_visit(self, visit): origin_visit_writer = self.get_writer_for("origin_visit") origin_visit_writer.write( ( visit["origin"], visit["visit"], datetime_to_tuple(visit["date"]), visit["type"], ) ) def process_origin_visit_status(self, visit_status): origin_visit_status_writer = self.get_writer_for("origin_visit_status") origin_visit_status_writer.write( ( visit_status["origin"], visit_status["visit"], datetime_to_tuple(visit_status["date"]), visit_status["status"], hash_to_hex_or_none(visit_status["snapshot"]), ) ) def process_snapshot(self, snapshot): if self.config.get("remove_pull_requests"): remove_pull_requests(snapshot) snapshot_writer = self.get_writer_for("snapshot") snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),)) snapshot_branch_writer = self.get_writer_for("snapshot_branch") for branch_name, branch in snapshot["branches"].items(): if branch is None: continue snapshot_branch_writer.write( ( hash_to_hex_or_none(snapshot["id"]), branch_name, hash_to_hex_or_none(branch["target"]), branch["target_type"], ) ) def process_release(self, release): release_writer = self.get_writer_for("release") release_writer.write( ( hash_to_hex_or_none(release["id"]), release["name"], release["message"], hash_to_hex_or_none(release["target"]), release["target_type"], (release.get("author") or {}).get("fullname"), *swh_date_to_tuple(release["date"]), ) ) def process_revision(self, revision): release_writer = self.get_writer_for("revision") release_writer.write( ( hash_to_hex_or_none(revision["id"]), revision["message"], revision["author"]["fullname"], *swh_date_to_tuple(revision["date"]), revision["committer"]["fullname"], *swh_date_to_tuple(revision["committer_date"]), hash_to_hex_or_none(revision["directory"]), ) ) revision_history_writer = self.get_writer_for("revision_history") for i, parent_id in enumerate(revision["parents"]): revision_history_writer.write( ( hash_to_hex_or_none(revision["id"]), hash_to_hex_or_none(parent_id), i, ) ) def process_directory(self, 
class SWHTimestampConverter:
    """This is an ORCConverter compatible class to convert timestamps from/to
    ORC files.

    Timestamps in python are given as a couple (seconds, microseconds) and are
    serialized as a couple (seconds, nanoseconds) in the ORC file.

    Reimplemented because we do not want the values read from ORC timestamps
    to be Python datetime objects, since swh.model's Timestamp cannot be
    converted to a Python datetime object without loss.
    """

    # use Any as timezone annotation to make it easier to run mypy on python <
    # 3.9, plus we do not use the timezone argument here...

    @staticmethod
    def from_orc(seconds: int, nanoseconds: int, timezone: Any,) -> Tuple[int, int]:
        return (seconds, nanoseconds // 1000)

    @staticmethod
    def to_orc(
        obj: Optional[Tuple[int, int]], timezone: Any,
    ) -> Optional[Tuple[int, int]]:
        if obj is None:
            return None
        return (obj[0], obj[1] * 1000 if obj[1] is not None else None)


class ORCExporter(ExporterDispatch):
    """
    Implementation of an exporter which writes the entire graph dataset as
    ORC files. Useful for large scale processing, notably on cloud instances
    (e.g. BigQuery, Amazon Athena, Azure).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.writers = {}

    def get_writer_for(self, table_name: str):
        if table_name not in self.writers:
            object_type_dir = self.export_path / table_name
            object_type_dir.mkdir(exist_ok=True)
-            unique_id = str(uuid.uuid4())
+            unique_id = self.get_unique_file_id()
            export_file = object_type_dir / ("graph-{}.orc".format(unique_id))
            export_obj = self.exit_stack.enter_context(export_file.open("wb"))
            self.writers[table_name] = self.exit_stack.enter_context(
                Writer(
                    export_obj,
                    EXPORT_SCHEMA[table_name],
                    compression=CompressionKind.ZSTD,
                    converters={
                        TypeKind.TIMESTAMP: cast(
                            Type[ORCConverter], SWHTimestampConverter
                        )
                    },
                )
            )
        return self.writers[table_name]

    def process_origin(self, origin):
        origin_writer = self.get_writer_for("origin")
        origin_writer.write((origin["url"],))

    def process_origin_visit(self, visit):
        origin_visit_writer = self.get_writer_for("origin_visit")
        origin_visit_writer.write(
            (
                visit["origin"],
                visit["visit"],
                datetime_to_tuple(visit["date"]),
                visit["type"],
            )
        )

    def process_origin_visit_status(self, visit_status):
        origin_visit_status_writer = self.get_writer_for("origin_visit_status")
        origin_visit_status_writer.write(
            (
                visit_status["origin"],
                visit_status["visit"],
                datetime_to_tuple(visit_status["date"]),
                visit_status["status"],
                hash_to_hex_or_none(visit_status["snapshot"]),
            )
        )

    def process_snapshot(self, snapshot):
        if self.config.get("remove_pull_requests"):
            remove_pull_requests(snapshot)
        snapshot_writer = self.get_writer_for("snapshot")
        snapshot_writer.write((hash_to_hex_or_none(snapshot["id"]),))

        snapshot_branch_writer = self.get_writer_for("snapshot_branch")
        for branch_name, branch in snapshot["branches"].items():
            if branch is None:
                continue
            snapshot_branch_writer.write(
                (
                    hash_to_hex_or_none(snapshot["id"]),
                    branch_name,
                    hash_to_hex_or_none(branch["target"]),
                    branch["target_type"],
                )
            )

    def process_release(self, release):
        release_writer = self.get_writer_for("release")
        release_writer.write(
            (
                hash_to_hex_or_none(release["id"]),
                release["name"],
                release["message"],
                hash_to_hex_or_none(release["target"]),
                release["target_type"],
                (release.get("author") or {}).get("fullname"),
                *swh_date_to_tuple(release["date"]),
            )
        )

    def process_revision(self, revision):
        release_writer = self.get_writer_for("revision")
        release_writer.write(
            (
                hash_to_hex_or_none(revision["id"]),
                revision["message"],
                revision["author"]["fullname"],
                *swh_date_to_tuple(revision["date"]),
                revision["committer"]["fullname"],
                *swh_date_to_tuple(revision["committer_date"]),
                hash_to_hex_or_none(revision["directory"]),
            )
        )

        revision_history_writer = self.get_writer_for("revision_history")
        for i, parent_id in enumerate(revision["parents"]):
            revision_history_writer.write(
                (
                    hash_to_hex_or_none(revision["id"]),
                    hash_to_hex_or_none(parent_id),
                    i,
                )
            )

    def process_directory(self, directory):
        directory_writer = self.get_writer_for("directory")
        directory_writer.write((hash_to_hex_or_none(directory["id"]),))

        directory_entry_writer = self.get_writer_for("directory_entry")
        for entry in directory["entries"]:
            directory_entry_writer.write(
                (
                    hash_to_hex_or_none(directory["id"]),
                    entry["name"],
                    entry["type"],
                    hash_to_hex_or_none(entry["target"]),
                    entry["perms"],
                )
            )

    def process_content(self, content):
        content_writer = self.get_writer_for("content")
        content_writer.write(
            (
                hash_to_hex_or_none(content["sha1"]),
                hash_to_hex_or_none(content["sha1_git"]),
                hash_to_hex_or_none(content["sha256"]),
                hash_to_hex_or_none(content["blake2s256"]),
                content["length"],
                content["status"],
            )
        )

    def process_skipped_content(self, skipped_content):
        skipped_content_writer = self.get_writer_for("skipped_content")
        skipped_content_writer.write(
            (
                hash_to_hex_or_none(skipped_content["sha1"]),
                hash_to_hex_or_none(skipped_content["sha1_git"]),
                hash_to_hex_or_none(skipped_content["sha256"]),
                hash_to_hex_or_none(skipped_content["blake2s256"]),
                skipped_content["length"],
                skipped_content["status"],
                skipped_content["reason"],
            )
        )
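With test_unique_file_id pinned, the ORC output path becomes predictable, which is what makes read-back assertions practical in tests. An illustrative sketch; the config, path and content dict are examples, not part of this patch, and it assumes pyorc's default tuple row representation:

# Illustrative sketch only, not part of this patch.
import pyorc

from swh.dataset.exporters.orc import ORCExporter

config = {"test_unique_file_id": "test"}
with ORCExporter(config, "/tmp/example-orc") as exporter:
    exporter.process_object(
        "content",
        {
            "sha1": b"\x01" * 20,
            "sha1_git": b"\x02" * 20,
            "sha256": b"\x03" * 32,
            "blake2s256": b"\x04" * 32,
            "length": 42,
            "status": "visible",
        },
    )

# The file name is deterministic, so the test can open it directly.
with open("/tmp/example-orc/content/graph-test.orc", "rb") as f:
    rows = list(pyorc.Reader(f))
assert rows[0][4:] == (42, "visible")  # length and status columns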
""" try: self.db.execute("INSERT INTO tmpset(val) VALUES (?)", (v.hex(),)) except sqlite3.IntegrityError: return False else: return True class LevelDBSet: """ On-disk Set object for hashes using LevelDB as an indexer backend. Used to deduplicate objects when processing large queues with duplicates. """ def __init__(self, db_path): self.db_path = db_path if plyvel is None: raise ImportError("Plyvel library not found, required for LevelDBSet") def __enter__(self): self.db = plyvel.DB(str(self.db_path), create_if_missing=True) return self def __exit__(self, exc_type, exc_val, exc_tb): self.db.close() def add(self, v: bytes) -> bool: """ Add an item to the set. Args: v: The value to add to the set. Returns: True if the value was added to the set, False if it was already present. """ if self.db.get(v): return False else: self.db.put(v, b"T") return True def remove_pull_requests(snapshot): """ Heuristic to filter out pull requests in snapshots: remove all branches that start with refs/ but do not start with refs/heads or refs/tags. """ # Copy the items with list() to remove items during iteration for branch_name, branch in list(snapshot["branches"].items()): original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] branch = snapshot["branches"].get(branch_name) if branch is None or not branch_name: continue if branch_name.startswith(b"refs/") and not ( branch_name.startswith(b"refs/heads") or branch_name.startswith(b"refs/tags") ): snapshot["branches"].pop(original_branch_name)