D4718.id16750.diff

diff --git a/swh/dataset/exporter.py b/swh/dataset/exporter.py
--- a/swh/dataset/exporter.py
+++ b/swh/dataset/exporter.py
@@ -3,239 +3,40 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import concurrent.futures
-from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
-import multiprocessing
-import time
-from typing import Mapping, Sequence, Tuple
+from types import TracebackType
+from typing import Any, Dict, Optional, Type
-from confluent_kafka import TopicPartition
-import tqdm
-from swh.journal.client import JournalClient
+class Exporter:
+ def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any) -> None:
+ self.config: Dict[str, Any] = config
+ def __enter__(self) -> "Exporter":
+ return self
-class JournalClientOffsetRanges(JournalClient):
- """
- A subclass of JournalClient reading only inside some specific offset
- range. Partition assignments have to be manually given to the class.
-
- This client can only read a single topic at a time.
- """
-
- def __init__(
+ def __exit__(
self,
- *args,
- offset_ranges: Mapping[int, Tuple[int, int]] = None,
- assignment: Sequence[int] = None,
- progress_queue: multiprocessing.Queue = None,
- refresh_every: int = 200,
- **kwargs,
- ):
- """
- Args:
- offset_ranges: A mapping of partition_id -> (low, high) offsets
- that define the boundaries of the messages to consume.
- assignment: The list of partitions to assign to this client.
- progress_queue: a multiprocessing.Queue where the current
- progress will be reported.
- refresh_every: the refreshing rate of the progress reporting.
- """
- self.offset_ranges = offset_ranges
- self.progress_queue = progress_queue
- self.refresh_every = refresh_every
- self.assignment = assignment
- self.count = None
- self.topic_name = None
- super().__init__(*args, **kwargs)
+ exc_type: Optional[Type[BaseException]],
+ exc_value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> Optional[bool]:
+ pass
- def subscribe(self):
- self.topic_name = self.subscription[0]
- time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
- self.consumer.assign(
- [TopicPartition(self.topic_name, pid) for pid in self.assignment]
- )
-
- def process(self, *args, **kwargs):
- self.count = 0
- try:
- self.handle_committed_offsets()
- super().process(*args, **kwargs)
- except EOFError:
- pass
- finally:
- self.progress_queue.put(None)
-
- def handle_committed_offsets(self,):
- """
- Handle already committed partition offsets before starting processing.
+ def process_object(self, object_type: str, object: Dict[str, Any]) -> None:
"""
- committed = self.consumer.committed(
- [TopicPartition(self.topic_name, pid) for pid in self.assignment]
- )
- for tp in committed:
- self.handle_offset(tp.partition, tp.offset)
+ Process a SWH object to export.
- def handle_offset(self, partition_id, offset):
+ Override this in subclasses to handle each exported object.
"""
- Check whether the client has reached the end of the current
- partition, and trigger a reassignment if that is the case.
- Raise EOFError if all the partitions have reached the end.
- """
- if offset < 0: # Uninitialized partition offset
- return
-
- if self.count % self.refresh_every == 0:
- self.progress_queue.put({partition_id: offset})
-
- if offset >= self.offset_ranges[partition_id][1] - 1:
- self.assignment = [pid for pid in self.assignment if pid != partition_id]
- self.subscribe()
-
- if not self.assignment:
- raise EOFError
-
- def deserialize_message(self, message):
- """
- Override of the message deserialization to hook the handling of the
- message offset.
- """
- self.handle_offset(message.partition(), message.offset())
- self.count += 1
- return super().deserialize_message(message)
+ raise NotImplementedError
-class ParallelExporter:
+class ExporterDispatch(Exporter):
"""
- Base class for all the Journal exporters.
-
- Each exporter should override the `export_worker` function with an
- implementation of how to run the message processing.
+ Like Exporter, but dispatches each object type to a dedicated process_<object_type> method.
"""
- def __init__(self, config, export_id: str, obj_type, processes=1):
- """
- Args:
- config: the exporter config, which should also include the
- JournalClient configuration.
- export_id: a unique identifier for the export that will be used
- as part of a Kafka consumer group ID.
- obj_type: The type of SWH object to export.
- processes: The number of processes to run.
- """
- self.config = config
- self.export_id = "swh-dataset-export-{}".format(export_id)
- self.obj_type = obj_type
- self.processes = processes
- self.offsets = None
-
- def get_offsets(self):
- """
- First pass to fetch all the current low and high offsets of each
- partition to define the consumption boundaries.
- """
- if self.offsets is None:
- client = JournalClient(
- **self.config["journal"],
- object_types=[self.obj_type],
- group_id=self.export_id,
- )
- topic_name = client.subscription[0]
- topics = client.consumer.list_topics(topic_name).topics
- partitions = topics[topic_name].partitions
-
- self.offsets = {}
- for partition_id in tqdm.tqdm(
- partitions.keys(), desc=" - Partition offsets"
- ):
- tp = TopicPartition(topic_name, partition_id)
- (lo, hi) = client.consumer.get_watermark_offsets(tp)
- self.offsets[partition_id] = (lo, hi)
- return self.offsets
-
- def run(self, *args):
- """
- Run the parallel export.
- """
- offsets = self.get_offsets()
- to_assign = list(offsets.keys())
-
- manager = multiprocessing.Manager()
- q = manager.Queue()
-
- with ProcessPoolExecutor(self.processes + 1) as pool:
- futures = []
- for i in range(self.processes):
- futures.append(
- pool.submit(
- self.export_worker,
- *args,
- assignment=to_assign[i :: self.processes],
- queue=q,
- )
- )
- futures.append(pool.submit(self.progress_worker, queue=q))
-
- concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
- for f in futures:
- if f.running():
- continue
- exc = f.exception()
- if exc:
- pool.shutdown(wait=False)
- f.result()
- raise exc
-
- def progress_worker(self, *args, queue=None):
- """
- An additional worker process that reports the current progress of the
- export between all the different parallel consumers and across all the
- partitions, by consuming the shared progress reporting Queue.
- """
- d = {}
- active_workers = self.processes
- offset_diff = sum((hi - lo) for lo, hi in self.offsets.values())
- with tqdm.tqdm(total=offset_diff, desc=" - Journal export") as pbar:
- while active_workers:
- item = queue.get()
- if item is None:
- active_workers -= 1
- continue
- d.update(item)
- progress = sum(n - self.offsets[p][0] for p, n in d.items())
- pbar.set_postfix(
- active_workers=active_workers, total_workers=self.processes
- )
- pbar.update(progress - pbar.n)
-
- def process(self, callback, assignment=None, queue=None):
- client = JournalClientOffsetRanges(
- **self.config["journal"],
- object_types=[self.obj_type],
- group_id=self.export_id,
- debug="cgrp,broker",
- offset_ranges=self.offsets,
- assignment=assignment,
- progress_queue=queue,
- **{"message.max.bytes": str(500 * 1024 * 1024)},
- )
- client.process(callback)
-
- def export_worker(self, *args, **kwargs):
- """
- Override this with a custom implementation of a worker function.
-
- A worker function should call `self.process(fn, **kwargs)` with `fn`
- being a callback that will be called in the same fashion as with
- `JournalClient.process()`.
-
- A simple exporter to print all the objects in the log would look like
- this:
-
- ```
- class PrintExporter(ParallelExporter):
- def export_worker(self, **kwargs):
- self.process(print, **kwargs)
- ```
- """
- raise NotImplementedError
+ def process_object(self, object_type: str, object: Dict[str, Any]) -> None:
+ method_name = "process_" + object_type
+ if hasattr(self, method_name):
+ getattr(self, method_name)(object)
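For illustration, a minimal exporter built on the new dispatch mechanism could look like the sketch below. The PrintExporter class is hypothetical and not part of this diff; it only shows how ExporterDispatch routes each object type to a process_<object_type> method:

    from typing import Any, Dict

    from swh.dataset.exporter import ExporterDispatch

    class PrintExporter(ExporterDispatch):
        """Toy exporter: print the identifier of every revision and release."""

        def process_revision(self, revision: Dict[str, Any]) -> None:
            # Called by ExporterDispatch.process_object("revision", revision)
            print("revision", revision["id"])

        def process_release(self, release: Dict[str, Any]) -> None:
            # Called by ExporterDispatch.process_object("release", release)
            print("release", release["id"])
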
diff --git a/swh/dataset/graph.py b/swh/dataset/graph.py
--- a/swh/dataset/graph.py
+++ b/swh/dataset/graph.py
@@ -3,8 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import contextlib
-import functools
+import base64
import os
import os.path
import pathlib
@@ -13,143 +12,138 @@
import tempfile
import uuid
-from swh.dataset.exporter import ParallelExporter
-from swh.dataset.utils import SQLiteSet, ZSTFile
+from swh.dataset.exporter import ExporterDispatch
+from swh.dataset.journalprocessor import ParallelJournalProcessor
+from swh.dataset.utils import ZSTFile
from swh.model.identifiers import origin_identifier, swhid
-from swh.storage.fixer import fix_objects
-def process_messages(messages, config, node_writer, edge_writer, node_set):
+class GraphEdgesExporter(ExporterDispatch):
"""
- Args:
- messages: A sequence of messages to process
- config: The exporter configuration
- node_writer: A file-like object where to write nodes
- edge_writer: A file-like object where to write edges
+ Implementation of an exporter which writes all the graph edges
+ of a specific type in a Zstandard-compressed CSV file.
+
+ Each row of the CSV is in the format: `<SRC SWHID> <DST SWHID> [<LABELS>...]`
"""
- def write_node(node):
+ def __init__(self, config, export_path, **kwargs):
+ super().__init__(config)
+ self.export_path = export_path
+
+ def __enter__(self):
+ dataset_path = pathlib.Path(self.export_path)
+ dataset_path.mkdir(exist_ok=True, parents=True)
+ unique_id = str(uuid.uuid4())
+ nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id))
+ edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id))
+ self.node_writer = ZSTFile(nodes_file, "w")
+ self.edge_writer = ZSTFile(edges_file, "w")
+ self.node_writer.__enter__()
+ self.edge_writer.__enter__()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.node_writer.__exit__(exc_type, exc_value, traceback)
+ self.edge_writer.__exit__(exc_type, exc_value, traceback)
+
+ def write_node(self, node):
node_type, node_id = node
if node_id is None:
return
node_swhid = swhid(object_type=node_type, object_id=node_id)
- node_writer.write("{}\n".format(node_swhid))
+ self.node_writer.write("{}\n".format(node_swhid))
- def write_edge(src, dst):
+ def write_edge(self, src, dst, *, labels=None):
src_type, src_id = src
dst_type, dst_id = dst
if src_id is None or dst_id is None:
return
src_swhid = swhid(object_type=src_type, object_id=src_id)
dst_swhid = swhid(object_type=dst_type, object_id=dst_id)
- edge_writer.write("{} {}\n".format(src_swhid, dst_swhid))
+ edge_line = " ".join([src_swhid, dst_swhid] + (labels if labels else []))
+ self.edge_writer.write("{}\n".format(edge_line))
- messages = {k: fix_objects(k, v) for k, v in messages.items()}
+ def process_origin(self, origin):
+ origin_id = origin_identifier({"url": origin["url"]})
+ self.write_node(("origin", origin_id))
- for visit_status in messages.get("origin_visit_status", []):
+ def process_origin_visit_status(self, visit_status):
origin_id = origin_identifier({"url": visit_status["origin"]})
- visit_id = visit_status["visit"]
- if not node_set.add("{}:{}".format(origin_id, visit_id).encode()):
- continue
- write_node(("origin", origin_id))
- write_edge(("origin", origin_id), ("snapshot", visit_status["snapshot"]))
-
- for snapshot in messages.get("snapshot", []):
- if not node_set.add(snapshot["id"]):
- continue
- write_node(("snapshot", snapshot["id"]))
+ self.write_edge(("origin", origin_id), ("snapshot", visit_status["snapshot"]))
+
+ def process_snapshot(self, snapshot):
+ if self.config.get("remove_pull_requests"):
+ self.remove_pull_requests(snapshot)
+
+ self.write_node(("snapshot", snapshot["id"]))
for branch_name, branch in snapshot["branches"].items():
+ original_branch_name = branch_name
while branch and branch.get("target_type") == "alias":
branch_name = branch["target"]
branch = snapshot["branches"][branch_name]
if branch is None or not branch_name:
continue
- # Heuristic to filter out pull requests in snapshots: remove all
- # branches that start with refs/ but do not start with refs/heads or
- # refs/tags.
- if config.get("remove_pull_requests") and (
- branch_name.startswith(b"refs/")
- and not (
- branch_name.startswith(b"refs/heads")
- or branch_name.startswith(b"refs/tags")
- )
- ):
- continue
- write_edge(
- ("snapshot", snapshot["id"]), (branch["target_type"], branch["target"])
+ self.write_edge(
+ ("snapshot", snapshot["id"]),
+ (branch["target_type"], branch["target"]),
+ labels=[base64.b64encode(original_branch_name).decode(),],
)
- for release in messages.get("release", []):
- if not node_set.add(release["id"]):
- continue
- write_node(("release", release["id"]))
- write_edge(
+ def process_release(self, release):
+ self.write_node(("release", release["id"]))
+ self.write_edge(
("release", release["id"]), (release["target_type"], release["target"])
)
- for revision in messages.get("revision", []):
- if not node_set.add(revision["id"]):
- continue
- write_node(("revision", revision["id"]))
- write_edge(("revision", revision["id"]), ("directory", revision["directory"]))
+ def process_revision(self, revision):
+ self.write_node(("revision", revision["id"]))
+ self.write_edge(
+ ("revision", revision["id"]), ("directory", revision["directory"])
+ )
for parent in revision["parents"]:
- write_edge(("revision", revision["id"]), ("revision", parent))
+ self.write_edge(("revision", revision["id"]), ("revision", parent))
- for directory in messages.get("directory", []):
- if not node_set.add(directory["id"]):
- continue
- write_node(("directory", directory["id"]))
+ def process_directory(self, directory):
+ self.write_node(("directory", directory["id"]))
for entry in directory["entries"]:
entry_type_mapping = {
"file": "content",
"dir": "directory",
"rev": "revision",
}
- write_edge(
+ self.write_edge(
("directory", directory["id"]),
(entry_type_mapping[entry["type"]], entry["target"]),
+ labels=[base64.b64encode(entry["name"]).decode(), str(entry["perms"]),],
)
- for content in messages.get("content", []):
- if not node_set.add(content["sha1_git"]):
- continue
- write_node(("content", content["sha1_git"]))
-
-
-class GraphEdgeExporter(ParallelExporter):
- """
- Implementation of ParallelExporter which writes all the graph edges
- of a specific type in a Zstandard-compressed CSV file.
-
- Each row of the CSV is in the format: `<SRC SWHID> <DST SWHID>
- """
-
- def export_worker(self, export_path, **kwargs):
- dataset_path = pathlib.Path(export_path)
- dataset_path.mkdir(exist_ok=True, parents=True)
- unique_id = str(uuid.uuid4())
- nodes_file = dataset_path / ("graph-{}.nodes.csv.zst".format(unique_id))
- edges_file = dataset_path / ("graph-{}.edges.csv.zst".format(unique_id))
- node_set_file = dataset_path / (".set-nodes-{}.sqlite3".format(unique_id))
-
- with contextlib.ExitStack() as stack:
- node_writer = stack.enter_context(ZSTFile(nodes_file, "w"))
- edge_writer = stack.enter_context(ZSTFile(edges_file, "w"))
- node_set = stack.enter_context(SQLiteSet(node_set_file))
-
- process_fn = functools.partial(
- process_messages,
- config=self.config,
- node_writer=node_writer,
- edge_writer=edge_writer,
- node_set=node_set,
- )
- self.process(process_fn, **kwargs)
+ def process_content(self, content):
+ self.write_node(("content", content["sha1_git"]))
+
+ def remove_pull_requests(self, snapshot):
+ """
+ Heuristic to filter out pull requests in snapshots: remove all branches
+ that start with refs/ but do not start with refs/heads or refs/tags.
+ """
+ # Copy the items with list() to remove items during iteration
+ for branch_name, branch in list(snapshot["branches"].items()):
+ original_branch_name = branch_name
+ while branch and branch.get("target_type") == "alias":
+ branch_name = branch["target"]
+ branch = snapshot["branches"][branch_name]
+ if branch is None or not branch_name:
+ continue
+ if branch_name.startswith(b"refs/") and not (
+ branch_name.startswith(b"refs/heads")
+ or branch_name.startswith(b"refs/tags")
+ ):
+ snapshot["branches"].pop(original_branch_name)
def export_edges(config, export_path, export_id, processes):
"""Run the edge exporter for each edge type."""
object_types = [
+ "origin",
"origin_visit_status",
"snapshot",
"release",
@@ -159,8 +153,18 @@
]
for obj_type in object_types:
print("{} edges:".format(obj_type))
- exporter = GraphEdgeExporter(config, export_id, obj_type, processes)
- exporter.run(os.path.join(export_path, obj_type))
+ exporters = [
+ (GraphEdgesExporter, {"export_path": os.path.join(export_path, obj_type)}),
+ ]
+ parallel_exporter = ParallelJournalProcessor(
+ config,
+ exporters,
+ export_id,
+ obj_type,
+ node_sets_path=pathlib.Path(export_path) / ".node_sets",
+ processes=processes,
+ )
+ parallel_exporter.run()
def sort_graph_nodes(export_path, config):
@@ -200,6 +204,25 @@
# in memory
counter_command = "awk '{ t[$0]++ } END { for (i in t) print i,t[i] }'"
+ sort_script = """
+ pv {export_path}/*/*.edges.csv.zst |
+ tee {export_path}/graph.edges.csv.zst |
+ zstdcat |
+ tee >( wc -l > {export_path}/graph.edges.count.txt ) |
+ tee >( cut -d: -f3,6 | {counter_command} | sort \
+ > {export_path}/graph.edges.stats.txt ) |
+ tee >( cut -d' ' -f3 | grep . | \
+ sort -u -S{sort_buffer_size} -T{buffer_path} | \
+ zstdmt > {export_path}/graph.labels.csv.zst ) |
+ cut -d' ' -f2 |
+ cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) |
+ sort -u -S{sort_buffer_size} -T{buffer_path} |
+ tee >( wc -l > {export_path}/graph.nodes.count.txt ) |
+ tee >( cut -d: -f3 | {counter_command} | sort \
+ > {export_path}/graph.nodes.stats.txt ) |
+ zstdmt > {export_path}/graph.nodes.csv.zst
+ """
+
# Use bytes for the sorting algorithm (faster than being locale-specific)
env = {
**os.environ.copy(),
@@ -216,21 +239,7 @@
[
"bash",
"-c",
- (
- "pv {export_path}/*/*.edges.csv.zst | "
- "tee {export_path}/graph.edges.csv.zst |"
- "zstdcat |"
- "tee >( wc -l > {export_path}/graph.edges.count.txt ) |"
- "tee >( cut -d: -f3,6 | {counter_command} | sort "
- " > {export_path}/graph.edges.stats.txt ) |"
- "cut -d' ' -f2 | "
- "cat - <( zstdcat {export_path}/*/*.nodes.csv.zst ) | "
- "sort -u -S{sort_buffer_size} -T{buffer_path} | "
- "tee >( wc -l > {export_path}/graph.nodes.count.txt ) |"
- "tee >( cut -d: -f3 | {counter_command} | sort "
- " > {export_path}/graph.nodes.stats.txt ) |"
- "zstdmt > {export_path}/graph.nodes.csv.zst"
- ).format(
+ sort_script.format(
export_path=shlex.quote(str(export_path)),
buffer_path=shlex.quote(str(buffer_path)),
sort_buffer_size=shlex.quote(sort_buffer_size),
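To make the new labelled edge format concrete, the standalone sketch below reproduces the line layout produced by GraphEdgesExporter.write_edge for a directory entry; the identifiers and entry values are made up, and the perms handling mirrors str(entry["perms"]) above:

    import base64

    from swh.model.identifiers import swhid

    src = swhid(object_type="directory", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2")
    dst = swhid(object_type="content", object_id="8624bcdae55baeef00cd11d5dfcfa60f68710a02")
    label = base64.b64encode(b"README.md").decode()  # entry name, base64-encoded
    perms = str(0o100644)                            # entry permissions, as a decimal string
    print(" ".join([src, dst, label, perms]))
    # -> swh:1:dir:94a9ed02... swh:1:cnt:8624bcda... UkVBRE1FLm1k 33188  (hashes abbreviated)
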
diff --git a/swh/dataset/exporter.py b/swh/dataset/journalprocessor.py
copy from swh/dataset/exporter.py
copy to swh/dataset/journalprocessor.py
--- a/swh/dataset/exporter.py
+++ b/swh/dataset/journalprocessor.py
@@ -3,16 +3,25 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import collections
import concurrent.futures
from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
+import contextlib
+from hashlib import sha1
import multiprocessing
+from pathlib import Path
import time
-from typing import Mapping, Sequence, Tuple
+from typing import Any, Dict, Mapping, Sequence, Tuple, Type
from confluent_kafka import TopicPartition
import tqdm
+from swh.dataset.exporter import Exporter
+from swh.dataset.utils import SQLiteSet
from swh.journal.client import JournalClient
+from swh.journal.serializers import kafka_to_value
+from swh.model.identifiers import origin_identifier
+from swh.storage.fixer import fix_objects
class JournalClientOffsetRanges(JournalClient):
@@ -99,34 +108,51 @@
"""
Override of the message deserialization to hook the handling of the
message offset.
+ We also return the raw objects instead of deserializing them because we
+ will need the partition ID later.
"""
+ # XXX: this is a bad hack that we have to do because of how the
+ # journal API works. Ideally it would be better to change the API so
+ # that journal clients can know the partition of the message they are
+ # handling.
self.handle_offset(message.partition(), message.offset())
self.count += 1
- return super().deserialize_message(message)
+ # return super().deserialize_message(message)
+ return message
-class ParallelExporter:
+class ParallelJournalProcessor:
"""
- Base class for all the Journal exporters.
-
- Each exporter should override the `export_worker` function with an
- implementation of how to run the message processing.
+ Reads the given object type from the journal in parallel.
+ It creates one JournalProcessorWorker per process.
"""
- def __init__(self, config, export_id: str, obj_type, processes=1):
+ def __init__(
+ self,
+ config,
+ exporters: Sequence[Tuple[Type[Exporter], Dict[str, Any]]],
+ export_id: str,
+ obj_type: str,
+ node_sets_path: Path,
+ processes: int = 1,
+ ):
"""
Args:
config: the exporter config, which should also include the
JournalClient configuration.
+ exporters: a list of Exporter to process the objects
export_id: a unique identifier for the export that will be used
as part of a Kafka consumer group ID.
obj_type: The type of SWH object to export.
processes: The number of processes to run.
+ node_sets_path: A directory in which to store the per-partition node sets.
"""
self.config = config
+ self.exporters = exporters
self.export_id = "swh-dataset-export-{}".format(export_id)
self.obj_type = obj_type
self.processes = processes
+ self.node_sets_path = node_sets_path
self.offsets = None
def get_offsets(self):
@@ -153,7 +179,7 @@
self.offsets[partition_id] = (lo, hi)
return self.offsets
- def run(self, *args):
+ def run(self):
"""
Run the parallel export.
"""
@@ -169,9 +195,8 @@
futures.append(
pool.submit(
self.export_worker,
- *args,
assignment=to_assign[i :: self.processes],
- queue=q,
+ progress_queue=q,
)
)
futures.append(pool.submit(self.progress_worker, queue=q))
@@ -208,34 +233,134 @@
)
pbar.update(progress - pbar.n)
- def process(self, callback, assignment=None, queue=None):
+ def export_worker(self, assignment, progress_queue):
+ worker = JournalProcessorWorker(
+ self.config,
+ self.exporters,
+ self.export_id,
+ self.obj_type,
+ self.offsets,
+ assignment,
+ progress_queue,
+ self.node_sets_path,
+ )
+ with worker:
+ worker.run()
+
+
+class JournalProcessorWorker:
+ """
+ Worker process that processes all the messages and calls the given exporters
+ for each object read from the journal.
+ """
+
+ def __init__(
+ self,
+ config,
+ exporters: Sequence[Tuple[Type[Exporter], Dict[str, Any]]],
+ export_id: str,
+ obj_type: str,
+ offsets: Dict[int, Tuple[int, int]],
+ assignment: Sequence[int],
+ progress_queue: multiprocessing.Queue,
+ node_sets_path: Path,
+ ):
+ self.config = config
+ self.export_id = export_id
+ self.obj_type = obj_type
+ self.offsets = offsets
+ self.assignment = assignment
+ self.progress_queue = progress_queue
+
+ self.node_sets_path = node_sets_path
+ self.node_sets_path.mkdir(exist_ok=True, parents=True)
+ self.node_sets: Dict[int, SQLiteSet] = {}
+
+ self.exporters = [
+ exporter_class(config, **kwargs) for exporter_class, kwargs in exporters
+ ]
+ self.exit_stack: contextlib.ExitStack = contextlib.ExitStack()
+
+ def __enter__(self):
+ self.exit_stack.__enter__()
+ for exporter in self.exporters:
+ self.exit_stack.enter_context(exporter)
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.exit_stack.__exit__(exc_type, exc_value, traceback)
+
+ def get_node_set_for_partition(self, partition_id: int):
+ """
+ Return an on-disk set object, which stores the nodes that have
+ already been processed.
+
+ Node sets are sharded by partition ID, as each object is guaranteed to
+ be assigned to a deterministic Kafka partition.
+ """
+ if partition_id not in self.node_sets:
+ node_set_file = self.node_sets_path / "nodes-part-{}.sqlite".format(
+ partition_id
+ )
+ node_set = SQLiteSet(node_set_file)
+ self.exit_stack.enter_context(node_set)
+ self.node_sets[partition_id] = node_set
+ return self.node_sets[partition_id]
+
+ def run(self):
+ """
+ Start a Journal client on the given assignment and process all the
+ incoming messages.
+ """
client = JournalClientOffsetRanges(
**self.config["journal"],
object_types=[self.obj_type],
group_id=self.export_id,
debug="cgrp,broker",
offset_ranges=self.offsets,
- assignment=assignment,
- progress_queue=queue,
+ assignment=self.assignment,
+ progress_queue=self.progress_queue,
**{"message.max.bytes": str(500 * 1024 * 1024)},
)
- client.process(callback)
+ client.process(self.process_messages)
- def export_worker(self, *args, **kwargs):
+ def process_messages(self, messages):
"""
- Override this with a custom implementation of a worker function.
-
- A worker function should call `self.process(fn, **kwargs)` with `fn`
- being a callback that will be called in the same fashion as with
- `JournalClient.process()`.
+ Process the incoming Kafka messages.
+ """
+ for object_type, message_list in messages.items():
+ fixed_objects_by_partition = collections.defaultdict(list)
+ for message in message_list:
+ fixed_objects_by_partition[message.partition()].extend(
+ fix_objects(object_type, (kafka_to_value(message.value()),))
+ )
+ for partition, objects in fixed_objects_by_partition.items():
+ for object in objects:
+ self.process_message(object_type, partition, object)
- A simple exporter to print all the objects in the log would look like
- this:
+ def process_message(self, object_type, partition, object):
+ """
+ Process a single incoming Kafka message if the object it refers to has
+ not been processed yet.
- ```
- class PrintExporter(ParallelExporter):
- def export_worker(self, **kwargs):
- self.process(print, **kwargs)
- ```
+ It uses an on-disk set to make sure that each object is only ever
+ processed once.
"""
- raise NotImplementedError
+ node_set = self.get_node_set_for_partition(partition)
+
+ if object_type == "origin_visit_status":
+ origin_id = origin_identifier({"url": object["origin"]})
+ visit = object["visit"]
+ node_id = sha1("{}:{}".format(origin_id, visit).encode()).digest()
+ elif object_type == "origin":
+ node_id = sha1(object["url"].encode()).digest()
+ elif object_type == "content":
+ node_id = object["sha1_git"]
+ else:
+ node_id = object["id"]
+ if not node_set.add(node_id):
+ # Node already processed, skipping.
+ return
+
+ for exporter in self.exporters:
+ exporter.process_object(object_type, object)
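Putting the pieces together, the new ParallelJournalProcessor is driven roughly as in the sketch below, mirroring the export_edges() change earlier in this diff; the journal configuration, paths and export_id are placeholders:

    import pathlib

    from swh.dataset.graph import GraphEdgesExporter
    from swh.dataset.journalprocessor import ParallelJournalProcessor

    config = {"journal": {"brokers": ["kafka1:9092"]}}  # placeholder journal config
    exporters = [
        (GraphEdgesExporter, {"export_path": "/srv/export/revision"}),
    ]
    processor = ParallelJournalProcessor(
        config,
        exporters,
        export_id="example-2020-12",
        obj_type="revision",
        node_sets_path=pathlib.Path("/srv/export/.node_sets"),
        processes=4,
    )
    processor.run()
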
diff --git a/swh/dataset/test/test_graph.py b/swh/dataset/test/test_graph.py
--- a/swh/dataset/test/test_graph.py
+++ b/swh/dataset/test/test_graph.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from base64 import b64encode
import collections
import hashlib
from typing import Tuple
@@ -10,7 +11,7 @@
import pytest
-from swh.dataset.graph import process_messages, sort_graph_nodes
+from swh.dataset.graph import GraphEdgesExporter, sort_graph_nodes
from swh.dataset.utils import ZSTFile
from swh.model.hashutil import MultiHash, hash_to_bytes
@@ -88,17 +89,13 @@
def wrapped(messages, config=None) -> Tuple[Mock, Mock]:
if config is None:
config = {}
- node_writer = Mock()
- edge_writer = Mock()
- node_set = FakeDiskSet()
- process_messages(
- messages,
- config=config,
- node_writer=node_writer,
- edge_writer=edge_writer,
- node_set=node_set,
- )
- return node_writer.write, edge_writer.write
+ exporter = GraphEdgesExporter(config, "/dummy_path")
+ exporter.node_writer = Mock()
+ exporter.edge_writer = Mock()
+ for object_type, objects in messages.items():
+ for obj in objects:
+ exporter.process_object(object_type, obj)
+ return exporter.node_writer.write, exporter.edge_writer.write
return wrapped
@@ -111,6 +108,21 @@
return hashlib.sha1(s.encode()).hexdigest()
+def b64e(s: str) -> str:
+ return b64encode(s.encode()).decode()
+
+
+def test_export_origin(exporter):
+ node_writer, edge_writer = exporter(
+ {"origin": [{"origin": "ori1"}, {"origin": "ori2"},]}
+ )
+ assert node_writer.mock_calls == [
+ call(f"swh:1:ori:{hexhash('ori1')}\n"),
+ call(f"swh:1:ori:{hexhash('ori2')}\n"),
+ ]
+ assert edge_writer.mock_calls == []
+
+
def test_export_origin_visit_status(exporter):
node_writer, edge_writer = exporter(
{
@@ -128,10 +140,7 @@
]
}
)
- assert node_writer.mock_calls == [
- call(f"swh:1:ori:{hexhash('ori1')}\n"),
- call(f"swh:1:ori:{hexhash('ori2')}\n"),
- ]
+ assert node_writer.mock_calls == []
assert edge_writer.mock_calls == [
call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"),
call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"),
@@ -179,14 +188,38 @@
call(f"swh:1:snp:{hexhash('snp3')}\n"),
]
assert edge_writer.mock_calls == [
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
- call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}\n"),
- call(f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}\n"),
- call(f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}\n"),
- call(f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}\n"),
- call(f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}\n"),
- call(f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}\n"),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('refs/heads/master')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('HEAD')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('refs/heads/master')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}"
+ f" {b64e('HEAD')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}"
+ f" {b64e('bcnt')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}"
+ f" {b64e('bdir')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}"
+ f" {b64e('brel')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}"
+ f" {b64e('bsnp')}\n"
+ ),
]
@@ -210,9 +243,24 @@
}
)
assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")]
- assert edge_writer.mock_calls == (
- [call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n")] * 4
- )
+ assert edge_writer.mock_calls == [
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('origin_branch')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('alias1')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('alias2')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('alias3')}\n"
+ ),
+ ]
def test_export_snapshot_no_pull_requests(exporter):
@@ -241,19 +289,40 @@
node_writer, edge_writer = exporter({"snapshot": [snp]})
assert edge_writer.mock_calls == [
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}\n"),
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}\n"),
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}\n"),
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev5')}\n"),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('refs/heads/master')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}"
+ f" {b64e('refs/pull/42')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}"
+ f" {b64e('refs/merge-requests/lol')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}"
+ f" {b64e('refs/tags/v1.0.0')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev5')}"
+ f" {b64e('refs/patch/123456abc')}\n"
+ ),
]
node_writer, edge_writer = exporter(
{"snapshot": [snp]}, config={"remove_pull_requests": True}
)
assert edge_writer.mock_calls == [
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}\n"),
- call(f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}\n"),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('refs/heads/master')}\n"
+ ),
+ call(
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}"
+ f" {b64e('refs/tags/v1.0.0')}\n"
+ ),
]
@@ -340,9 +409,24 @@
{
"id": binhash("dir1"),
"entries": [
- {"type": "file", "target": binhash("cnt1")},
- {"type": "dir", "target": binhash("dir2")},
- {"type": "rev", "target": binhash("rev1")},
+ {
+ "type": "file",
+ "target": binhash("cnt1"),
+ "name": b"cnt1",
+ "perms": 0o644,
+ },
+ {
+ "type": "dir",
+ "target": binhash("dir2"),
+ "name": b"dir2",
+ "perms": 0o755,
+ },
+ {
+ "type": "rev",
+ "target": binhash("rev1"),
+ "name": b"rev1",
+ "perms": 0o160000,
+ },
],
},
{"id": binhash("dir2"), "entries": [],},
@@ -354,9 +438,18 @@
call(f"swh:1:dir:{hexhash('dir2')}\n"),
]
assert edge_writer.mock_calls == [
- call(f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}\n"),
- call(f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}\n"),
- call(f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}\n"),
+ call(
+ f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}"
+ f" {b64e('cnt1')} {0o644}\n"
+ ),
+ call(
+ f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}"
+ f" {b64e('dir2')} {0o755}\n"
+ ),
+ call(
+ f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}"
+ f" {b64e('rev1')} {0o160000}\n"
+ ),
]
@@ -376,45 +469,6 @@
assert edge_writer.mock_calls == []
-def test_export_duplicate_node(exporter):
- node_writer, edge_writer = exporter(
- {
- "content": [
- {**TEST_CONTENT, "sha1_git": binhash("cnt1")},
- {**TEST_CONTENT, "sha1_git": binhash("cnt1")},
- {**TEST_CONTENT, "sha1_git": binhash("cnt1")},
- ],
- },
- )
- assert node_writer.mock_calls == [
- call(f"swh:1:cnt:{hexhash('cnt1')}\n"),
- ]
- assert edge_writer.mock_calls == []
-
-
-def test_export_duplicate_visit(exporter):
- node_writer, edge_writer = exporter(
- {
- "origin_visit_status": [
- {**TEST_ORIGIN_VISIT_STATUS, "origin": "ori1", "visit": 1},
- {**TEST_ORIGIN_VISIT_STATUS, "origin": "ori2", "visit": 1},
- {**TEST_ORIGIN_VISIT_STATUS, "origin": "ori1", "visit": 1},
- {**TEST_ORIGIN_VISIT_STATUS, "origin": "ori2", "visit": 1},
- {**TEST_ORIGIN_VISIT_STATUS, "origin": "ori1", "visit": 2},
- {**TEST_ORIGIN_VISIT_STATUS, "origin": "ori2", "visit": 2},
- {**TEST_ORIGIN_VISIT_STATUS, "origin": "ori2", "visit": 2},
- ],
- },
- )
- assert node_writer.mock_calls == [
- call(f"swh:1:ori:{hexhash('ori1')}\n"),
- call(f"swh:1:ori:{hexhash('ori2')}\n"),
- call(f"swh:1:ori:{hexhash('ori1')}\n"),
- call(f"swh:1:ori:{hexhash('ori2')}\n"),
- ]
- assert edge_writer.mock_calls == []
-
-
def zstwrite(fp, lines):
with ZSTFile(fp, "w") as writer:
for line in lines:
@@ -447,10 +501,10 @@
f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}",
f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}",
f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}", # missing dest
- f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
- f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}", # dup
- f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')}",
- f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')}",
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup1')}",
+ f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup2')}",
+ f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')}",
+ f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')} {b64e('r1')}",
f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}",
f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}",
f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}",
@@ -461,9 +515,9 @@
f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}", # missing dest
f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}",
f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}",
- f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}",
- f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')}",
- f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}",
+ f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')} 42",
+ f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')} {b64e('d1')} 1337",
+ f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')} {b64e('r1')} 0",
]
for obj_type, short_obj_type in short_type_mapping.items():
@@ -481,8 +535,10 @@
output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n")
output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n")
+ output_labels = zstread(tmp_path / "graph.labels.csv.zst").split("\n")
output_nodes = list(filter(bool, output_nodes))
output_edges = list(filter(bool, output_edges))
+ output_labels = list(filter(bool, output_labels))
expected_nodes = set(input_nodes) | set(e.split()[1] for e in input_edges)
assert output_nodes == sorted(expected_nodes)
@@ -491,6 +547,9 @@
assert sorted(output_edges) == sorted(input_edges)
assert int((tmp_path / "graph.edges.count.txt").read_text()) == len(input_edges)
+ expected_labels = set(e[2] for e in [e.split() for e in input_edges] if len(e) > 2)
+ assert output_labels == sorted(expected_labels)
+
actual_node_stats = (tmp_path / "graph.nodes.stats.txt").read_text().strip()
expected_node_stats = "\n".join(
sorted(
diff --git a/swh/dataset/utils.py b/swh/dataset/utils.py
--- a/swh/dataset/utils.py
+++ b/swh/dataset/utils.py
@@ -14,13 +14,13 @@
command to compress and deflate the objects.
"""
- def __init__(self, path, mode="r"):
+ def __init__(self, path: str, mode: str = "r"):
if mode not in ("r", "rb", "w", "wb"):
raise ValueError(f"ZSTFile mode {mode} is invalid.")
self.path = path
self.mode = mode
- def __enter__(self):
+ def __enter__(self) -> "ZSTFile":
is_text = not (self.mode in ("rb", "wb"))
writing = self.mode in ("w", "wb")
if writing:
@@ -50,17 +50,20 @@
deduplicate objects when processing large queues with duplicates.
"""
- def __init__(self, db_path: os.PathLike):
+ def __init__(self, db_path: os.PathLike[str]):
self.db_path = db_path
def __enter__(self):
self.db = sqlite3.connect(str(self.db_path))
self.db.execute(
- "CREATE TABLE tmpset (val TEXT NOT NULL PRIMARY KEY) WITHOUT ROWID"
+ "CREATE TABLE IF NOT EXISTS"
+ " tmpset (val TEXT NOT NULL PRIMARY KEY)"
+ " WITHOUT ROWID"
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
+ self.db.commit()
self.db.close()
def add(self, v: bytes) -> bool:
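Finally, for reference, the on-disk deduplication set patched above is used as follows: add() returns True only the first time a value is inserted, which is how JournalProcessorWorker skips already-processed nodes. The path and values below are placeholders:

    import pathlib

    from swh.dataset.utils import SQLiteSet

    with SQLiteSet(pathlib.Path("/tmp/seen-nodes.sqlite")) as seen:
        for node_id in [b"\x01" * 20, b"\x02" * 20, b"\x01" * 20]:
            if not seen.add(node_id):
                continue  # duplicate, already processed
            print("processing", node_id.hex())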
