diff --git a/takedowns/gen_removal_sql.py b/takedowns/gen_removal_sql.py
index 488acfb..7704d79 100644
--- a/takedowns/gen_removal_sql.py
+++ b/takedowns/gen_removal_sql.py
@@ -1,208 +1,207 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pickle
 import sys
 
 from list_objects import Graph
 
-SQL_TEMPLATE = '''
+SQL_TEMPLATE = """
 BEGIN;
 
 create temp table objects_to_remove (type text not null, id bytea not null) on commit drop;
 insert into objects_to_remove (type, id) values %s;
 
 create temp table origins_to_remove on commit drop as select origin.id from origin inner join objects_to_remove otr on digest(url, 'sha1') = otr.id and otr.type = 'ori';
 
 copy (select '-- origin_visit_status') to stdout;
 copy (select '') to stdout;
 copy (select * from origin_visit_status ovs where ovs.origin in (select id from origins_to_remove)) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from origin_visit_status ovs
 -- where ovs.origin in (select id from origins_to_remove);
 
 copy (select '-- origin_visit') to stdout;
 copy (select '') to stdout;
 copy (select * from origin_visit ov where ov.origin in (select id from origins_to_remove)) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from origin_visit ov
 -- where ov.origin in (select id from origins_to_remove);
 
 copy (select '-- origin') to stdout;
 copy (select '') to stdout;
 copy (select * from origin o where o.id in (select id from origins_to_remove)) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from origin o
 -- where o.id in (select id from origins_to_remove);
 
 copy (select '-- full snapshot data') to stdout;
 copy (select '') to stdout;
 copy (select s.id, sb.name, sb.target_type, sb.target from snapshot s left join snapshot_branches sbs on s.object_id = sbs.snapshot_id inner join snapshot_branch sb on sbs.branch_id = sb.object_id where s.id in (select id from objects_to_remove where type='snp')) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 create temp table snp_to_remove on commit drop as select object_id as snapshot_id from snapshot s inner join objects_to_remove otr on s.id = otr.id and otr.type = 'snp';
 create temp table snp_branch_to_remove on commit drop as select branch_id from snapshot_branches sb where snapshot_id in (select snapshot_id from snp_to_remove);
 
 copy (select '-- snapshot_branches') to stdout;
 copy (select '') to stdout;
 copy (select * from snapshot_branches where snapshot_id in (select snapshot_id from snp_to_remove)) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from snapshot_branches where snapshot_id in (select snapshot_id from snp_to_remove);
 
 copy (select '-- snapshot_branch') to stdout;
 copy (select '') to stdout;
 copy (select * from snapshot_branch sb where sb.object_id in (select branch_id from snp_branch_to_remove) and not exists (select 1 from snapshot_branches sbs where sbs.branch_id = sb.object_id and sbs.snapshot_id not in (select snapshot_id from snp_to_remove))) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from snapshot_branch sb where sb.object_id in (select branch_id from snp_branch_to_remove) and not exists (select 1 from snapshot_branches sbs where sbs.branch_id = sb.object_id and sbs.snapshot_id not in (select snapshot_id from snp_to_remove));
 
 copy (select '-- snapshot') to stdout;
 copy (select '') to stdout;
 copy (select * from snapshot where object_id in (select snapshot_id from snp_to_remove)) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from snapshot where object_id in (select snapshot_id from snp_to_remove);
 
 copy (select '-- release') to stdout;
 copy (select '') to stdout;
 copy (select * from release where id in (select id from objects_to_remove where type = 'rel')) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from release where id in (select id from objects_to_remove where type = 'rel');
 
 copy (select '-- revision_history') to stdout;
 copy (select '') to stdout;
 copy (select * from revision_history where id in (select id from objects_to_remove where type = 'rev')) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from revision_history where id in (select id from objects_to_remove where type = 'rev');
 
 copy (select '-- revision') to stdout;
 copy (select '') to stdout;
 copy (select * from revision where id in (select id from objects_to_remove where type = 'rev')) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from revision where id in (select id from objects_to_remove where type = 'rev');
 
 create temp table de_dir_to_remove on commit drop as select unnest(dir_entries) as id from directory inner join objects_to_remove otr on directory.id = otr.id and otr.type = 'dir';
 create temp table de_file_to_remove on commit drop as select unnest(file_entries) as id from directory inner join objects_to_remove otr on directory.id = otr.id and otr.type = 'dir';
 create temp table de_rev_to_remove on commit drop as select unnest(rev_entries) as id from directory inner join objects_to_remove otr on directory.id = otr.id and otr.type = 'dir';
 
 copy (select '-- directory') to stdout;
 copy (select '') to stdout;
 copy (select dir_id as id, name, perms, w.type, target from objects_to_remove otr join lateral swh_directory_walk_one(id) w on true where otr.type = 'dir') to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from directory where id in (select id from objects_to_remove where type = 'dir');
 
 copy (select '-- content') to stdout;
 copy (select '') to stdout;
 copy (select * from content where sha1_git in (select id from objects_to_remove where type = 'cnt')) to stdout with (format csv, header);
 copy (select '') to stdout;
 
 --delete from content where sha1_git in (select id from objects_to_remove where type = 'cnt');
 
-ROLLBACK;'''
+ROLLBACK;"""
+
+if __name__ == "__main__":
+    graph = pickle.load(open(sys.argv[1], "rb"))
 
-if __name__ == '__main__':
-    graph = pickle.load(open(sys.argv[1], 'rb'))
-
-    nodes = graph.vs.select(predecessors_outside_subgraph_eq=False)["swhid"]
+    nodes = graph.vs.select(has_inbound_edges_outside_subgraph_eq=False)["swhid"]
 
     values = ", ".join(
         f"('{swhid.object_type.value}', '\\x{swhid.object_id.hex()}')"
        for swhid in nodes
     )
     print(SQL_TEMPLATE % values)
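For review, a minimal sketch (not part of the patch) of the tuple rendering that gen_removal_sql.py interpolates into the template's %s placeholder, assuming swh.model is installed; the SWHID below is a dummy value:

    from swh.model.swhids import ExtendedSWHID

    swhid = ExtendedSWHID.from_string("swh:1:cnt:" + "00" * 20)
    print(f"('{swhid.object_type.value}', '\\x{swhid.object_id.hex()}')")
    # ('cnt', '\x0000000000000000000000000000000000000000')

Since the template ends in ROLLBACK and keeps every delete statement commented out, running the generated SQL only dumps CSV previews of the rows that would be affected.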
diff --git a/takedowns/list_objects.py b/takedowns/list_objects.py
index bc4475b..bb3a79a 100644
--- a/takedowns/list_objects.py
+++ b/takedowns/list_objects.py
@@ -1,676 +1,777 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import functools
 import logging
 import os
 import pickle
 import sys
-from typing import (Callable, Collection, Dict, Iterable, Iterator, Optional,
+from typing import (Callable, Collection, Dict, Iterator, List, Optional,
                     Tuple, Union)
 
 from igraph import Graph as _Graph
 from igraph import Vertex, plot, summary
 from requests import Session
-from swh.model.identifiers import ExtendedObjectType as ObjectType
-from swh.model.identifiers import ExtendedSWHID as SWHID
 from swh.model.model import Revision, TargetType
+from swh.model.swhids import ExtendedObjectType as ObjectType
+from swh.model.swhids import ExtendedSWHID as SWHID
 from swh.storage import get_storage
 from swh.storage.algos.origin import (iter_origin_visit_statuses,
                                       iter_origin_visits)
 from swh.storage.algos.snapshot import snapshot_get_all_branches
 from swh.storage.interface import StorageInterface
 from swh.storage.postgresql.storage import Storage as PgStorage
 
 logger = logging.getLogger(__name__)
 
 
 @functools.lru_cache(32)
 def object_type(tt: Union[str, TargetType]) -> ObjectType:
     value: Optional[str] = None
     if tt == "file":
         return ObjectType.CONTENT
     elif isinstance(tt, str):
         value = tt
         name = tt
     else:
         name = tt.name
 
     if value is not None:
         try:
             return ObjectType(value)
         except ValueError:
             pass
 
     return ObjectType[name]
 
 
 class Graph(_Graph):
     """A graph, backed by igraph, with uniquely named vertices"""
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self._vertices: Dict[str, int] = {}
 
     def add_vertex(self, name, *args, **kwargs):
         try:
             return self.vs[self._vertices[name]]
         except (KeyError, IndexError):
             ret = super().add_vertex(name, *args, **kwargs)
             self._vertices[name] = ret.index
             return ret
 
 
 def add_node_to_graph(graph: Graph, swhid: SWHID) -> Tuple[Vertex, bool]:
     """Add the given `swhid` as a new vertex in the `graph`.
 
     Arguments:
       - graph: the graph on which to do the processing
       - swhid: the new object to add to the graph
 
     Returns:
       - the new Vertex for the given `swhid` (on which one can set attributes
         after processing, if needed)
       - a boolean whether the vertex was created or not
     """
     node_name = str(swhid)
     created = node_name not in graph._vertices
     vertex = graph.add_vertex(
         node_name, swhid=swhid, fetched=swhid.object_type == ObjectType.CONTENT
     )
 
     return vertex, created
 
 
-def init_graph(swhid: SWHID, pickle_filename: str) -> Graph:
+def init_graph(swhids: List[SWHID], pickle_filename: str) -> Graph:
+    """Initialize the Graph structure, either from a pickled object or from
+    the given swhids.
+
+    Arguments:
+      - swhids: the anchor swhids for the graph in question
+      - pickle_filename: the filename of a pickle file that we'd load to
+        initialize the graph
+
+    """
     if os.path.exists(pickle_filename):
         try:
             ret = pickle.load(open(pickle_filename, "rb"))
             if not isinstance(ret, Graph):
                 raise TypeError("Unknown pickle data")
             logger.debug("Known nodes: %s", len(ret._vertices))
             return ret
         except Exception:
             logger.exception("Could not load pickle file, fallback to basic graph")
 
     # Initialize the graph with the given swhids
     ret = Graph(directed=True)
-    add_node_to_graph(ret, swhid)
+    for swhid in swhids:
+        add_node_to_graph(ret, swhid)
 
     return ret
 
 
-def get_descendents_graph(
-    graph: Graph,
-    storage: StorageInterface,
-    graph_baseurl: Optional[str] = None,
+def populate_subtrees(
+    graph: Graph, storage: StorageInterface, graph_baseurl: Optional[str] = None,
 ):
-    """Get a graph containing all descendents of the set of `swhids`.
+    """Populate the graph data structure with the subtrees of all the swhids
+    already contained in the graph.
 
     Arguments:
-      - swhids: the set of root swhids to process
-      - storage: the instance of swh.storage on which to search the swhids and their descendents.
+      - graph: the graph whose nodes are the root swhids to process
+      - storage: the instance of swh.storage on which to search the swhids and their outbound edges.
       - graph_baseurl: the base URL of an instance of swh.graph
     """
     if "fetched" not in graph.vs.attribute_names():
-        logger.info("Descendents already fetched")
+        logger.info("Outbound edges already fetched")
         return
 
     session = Session()
 
     iteration = 0
     while True:
         iteration += 1
         vertices = graph.vs.select(fetched_eq=False)
         logger.debug(
             "Iteration %s: %s nodes to fetch (%s known)",
             iteration,
             len(vertices),
             len(graph.vs),
         )
         if not vertices:
             break
         for i, vertex in enumerate(vertices):
             if vertex["fetched"]:
                 continue
-            add_descendents_to_graph(
+            add_outbound_edges_to_graph(
                 graph, vertex["swhid"], storage, graph_baseurl, session
             )
             vertex["fetched"] = True
             if (i + 1) % 100 == 0:
                 logger.debug(
                     "Iteration %s: fetched %s nodes (%s known)",
                     iteration,
                     i + 1,
                     len(graph.vs),
                 )
 
     del graph.vs["fetched"]
 
     return graph
 
 
-def descendents_from_swh_graph(
-    graph: Graph,
-    swhid: SWHID,
-    graph_baseurl: str,
-    session: Optional[Session] = None,
+def outbound_edges_from_swh_graph(
+    graph: Graph, swhid: SWHID, graph_baseurl: str, session: Optional[Session] = None,
 ):
-    """Get the descendents of an object from swh.graph (based at `graph_baseurl`).
+    """Get the subtree of objects referenced by a given one, from swh.graph
+    (based at `graph_baseurl`).
 
-    Returns `True` if the list of descendents found is (supposed to be) exhaustive.
+    Returns `True` if the list of outbound edges found is (expected to be) exhaustive.
     """
     if not session:
         session = Session()
 
-    edges_url = f"{graph_baseurl.rstrip('/')}/graph/visit/edges/{swhid}"
+    edges_url = f"{graph_baseurl.rstrip('/')}/graph/visit/edges/{swhid}/"
     response = session.get(edges_url, stream=True)
     if response.status_code == 404:
         logger.debug("Object %s not found in swh.graph", swhid)
         return False
 
     response.raise_for_status()
 
-    logger.debug("Processing descendents of %s from swh.graph...", swhid)
+    logger.debug("Processing outbound edges of %s from swh.graph...", swhid)
 
     new_nodes = 0
     known_nodes: Dict[str, Tuple[int, bool]] = {}
 
     def add_vertex(name: str):
         nonlocal new_nodes
         swhid = SWHID.from_string(name)
         vertex, created = add_node_to_graph(graph, swhid)
         if created and swhid.object_type != ObjectType.ORIGIN:
             vertex["fetched"] = True
         new_nodes += created
         known_nodes[name] = vertex.index, created
 
     def add_edges():
         for node in node_batch:
             if node not in known_nodes:
                 add_vertex(node)
 
         edges_to_add = []
         for left, right in edge_batch:
             left_id, created = known_nodes[left]
             if not created:
                 continue
             right_id, _ = known_nodes[right]
             edges_to_add.append((left_id, right_id))
 
         graph.add_edges(edges_to_add)
         edge_batch[:] = []
         node_batch.clear()
 
     edge_batch = []
     node_batch = set()
     edges = 0
     for line in response.iter_lines():
         edges += 1
         left, right = line.decode().strip().split()
         if left not in node_batch and left not in known_nodes:
             node_batch.add(left)
         if right not in node_batch and right not in known_nodes:
             node_batch.add(right)
         edge_batch.append((left, right))
         if edges % 1000 == 0:
             logger.debug("Read %s edges (%s nodes added so far)", edges, new_nodes)
         if len(edge_batch) >= 1000000 or len(node_batch) >= 100000:
             logger.debug(
                 "Registering %s new edges (%s new nodes)...",
                 len(edge_batch),
                 len(node_batch),
             )
             add_edges()
 
     add_edges()
 
     logger.info(
-        "Found %s new nodes from %s in swh.graph",
-        new_nodes,
-        swhid,
+        "Found %s new nodes from %s in swh.graph", new_nodes, swhid,
     )
 
     return swhid.object_type != ObjectType.ORIGIN
 
 
-def content_descendents_from_storage(
-    graph: Graph, swhid: SWHID, storage: StorageInterface
+def content_outbound_edges_from_storage(
+    storage: StorageInterface, swhid: SWHID,
 ):
     return
 
 
-def directory_descendents_from_storage(
-    graph: Graph, swhid: SWHID, storage: StorageInterface
+def directory_outbound_edges_from_storage(
+    storage: StorageInterface, swhid: SWHID
 ) -> Iterator[Tuple[SWHID, SWHID]]:
     for entry in storage.directory_ls(swhid.object_id):
         succ_type = object_type(entry["type"])
         succ_swhid = SWHID(object_type=succ_type, object_id=entry["target"])
         if succ_type == ObjectType.REVISION:
             logger.info("Ignored submodule edge %s -> %s", swhid, succ_swhid)
             continue
         yield (swhid, succ_swhid)
 
 
-def revision_descendents_from_storage(
-    graph: Graph, swhid: SWHID, storage: StorageInterface
+def revision_outbound_edges_from_storage(
+    storage: StorageInterface, swhid: SWHID,
 ) -> Iterator[Tuple[SWHID, SWHID]]:
     for rev_d in storage.revision_log([swhid.object_id], limit=100):
         rev = Revision.from_dict(rev_d)
         rev_swhid = rev.swhid().to_extended()
         dir_swhid = SWHID(object_type=ObjectType.DIRECTORY, object_id=rev.directory)
         yield (rev_swhid, dir_swhid)
         for succ_rev in rev.parents:
             succ_swhid = SWHID(object_type=ObjectType.REVISION, object_id=succ_rev)
             yield (rev_swhid, succ_swhid)
 
 
-def release_descendents_from_storage(
-    graph: Graph, swhid: SWHID, storage: StorageInterface
+def release_outbound_edges_from_storage(
+    storage: StorageInterface, swhid: SWHID
 ) -> Iterator[Tuple[SWHID, SWHID]]:
     [rel] = storage.release_get([swhid.object_id])
     if not rel:
         logger.warning("Release %s not found", swhid)
         return
     assert rel.target
     target_swhid = SWHID(object_id=rel.target, object_type=object_type(rel.target_type))
     yield (swhid, target_swhid)
 
 
-def snapshot_descendents_from_storage(
-    graph: Graph, swhid: SWHID, storage: StorageInterface
+def snapshot_outbound_edges_from_storage(
+    storage: StorageInterface, swhid: SWHID
 ) -> Iterator[Tuple[SWHID, SWHID]]:
     snp = snapshot_get_all_branches(storage, swhid.object_id)
     if not snp:
         logger.warning("Snapshot %s not found", swhid)
         return
     for branch in snp.branches.values():
         if branch is None or branch.target_type.value == "alias":
             continue
         target_swhid = SWHID(
             object_id=branch.target, object_type=object_type(branch.target_type)
         )
         yield (swhid, target_swhid)
 
 
-def origin_descendents_from_storage(
-    graph: Graph, swhid: SWHID, storage: StorageInterface
+def origin_outbound_edges_from_storage(
+    storage: StorageInterface, swhid: SWHID,
 ) -> Iterator[Tuple[SWHID, SWHID]]:
     [origin] = storage.origin_get_by_sha1([swhid.object_id])
     if not origin:
         logger.warning("Origin %s not found", swhid)
         return
     for visit in iter_origin_visits(storage, origin["url"]):
         for status in iter_origin_visit_statuses(storage, visit.origin, visit.visit):
             if status.snapshot:
                 snapshot_swhid = SWHID(
                     object_id=status.snapshot, object_type=ObjectType.SNAPSHOT
                 )
                 yield (swhid, snapshot_swhid)
 
 
-descendents_from_storage_map: Dict[
-    ObjectType,
-    Callable[[Graph, SWHID, StorageInterface], Iterator[Tuple[SWHID, SWHID]]],
+outbound_edges_from_storage_map: Dict[
+    ObjectType, Callable[[StorageInterface, SWHID], Iterator[Tuple[SWHID, SWHID]]],
 ] = {
-    ObjectType.CONTENT: content_descendents_from_storage,
-    ObjectType.DIRECTORY: directory_descendents_from_storage,
-    ObjectType.REVISION: revision_descendents_from_storage,
-    ObjectType.RELEASE: release_descendents_from_storage,
-    ObjectType.SNAPSHOT: snapshot_descendents_from_storage,
-    ObjectType.ORIGIN: origin_descendents_from_storage,
+    ObjectType.CONTENT: content_outbound_edges_from_storage,
+    ObjectType.DIRECTORY: directory_outbound_edges_from_storage,
+    ObjectType.REVISION: revision_outbound_edges_from_storage,
+    ObjectType.RELEASE: release_outbound_edges_from_storage,
+    ObjectType.SNAPSHOT: snapshot_outbound_edges_from_storage,
+    ObjectType.ORIGIN: origin_outbound_edges_from_storage,
 }
 
 
-def add_descendents_to_graph(
+def add_outbound_edges_to_graph(
     graph: Graph,
     swhid: SWHID,
     storage: StorageInterface,
     graph_baseurl: Optional[str] = None,
     session: Optional[Session] = None,
 ):
     obj_type = swhid.object_type
 
     if graph_baseurl:
-        all_found = descendents_from_swh_graph(graph, swhid, graph_baseurl, session)
+        all_found = outbound_edges_from_swh_graph(graph, swhid, graph_baseurl, session)
         if all_found:
             return
         else:
-            logger.debug("Getting other descendents of %s from swh.storage", swhid)
+            logger.debug("Getting other outbound edges of %s from swh.storage", swhid)
 
-    descendents_from_storage = descendents_from_storage_map.get(obj_type)
-    if descendents_from_storage:
+    outbound_edges_from_storage_fn = outbound_edges_from_storage_map.get(obj_type)
+    if outbound_edges_from_storage_fn:
         new_edges = []
-        for left, right in descendents_from_storage(graph, swhid, storage):
+        for left, right in outbound_edges_from_storage_fn(storage, swhid):
             left_vertex, created = add_node_to_graph(graph, left)
             if left_vertex["fetched"]:
                 continue
             right_vertex, created = add_node_to_graph(graph, right)
             new_edges.append((left_vertex.index, right_vertex.index))
 
         graph.add_edges(new_edges)
         for index in set(left for left, _ in new_edges):
             graph.vs[index]["fetched"] = True
     else:
         raise ValueError("Unknown object type: %s" % obj_type)
 
 
-def check_predecessors_outside_graph(
+def record_inbound_edges_outside_graph(
     graph: Graph,
     storage: StorageInterface,
     graph_baseurl: Optional[str] = None,
     session: Optional[Session] = None,
 ):
+    """Check if objects have inbound edges outside the subgraph"""
     if graph_baseurl and session is None:
         session = Session()
 
-    graph.vs["predecessors_outside_subgraph"] = False
-    graph.vs["touched"] = False
+    if "inbound_edges_checked" not in graph.vs.attribute_names():
+        graph.vs["inbound_edges_checked"] = False
+
+    if "has_inbound_edges_outside_subgraph" not in graph.vs.attribute_names():
+        graph.vs["has_inbound_edges_outside_subgraph"] = False
+
+    total_nodes = len(graph.vs)
+    inbound_unknown = len(graph.vs.select(inbound_edges_checked=False))
+    logger.info(
+        "Total nodes: %s, nodes with unchecked inbound edges: %s",
+        total_nodes,
+        inbound_unknown,
+    )
 
-    for vid in graph.topological_sorting():
+    for count, vid in enumerate(graph.topological_sorting()):
         vertex = graph.vs[vid]
-        vertex["touched"] = True
         swhid = vertex["swhid"]
 
+        if count == 0 or count + 1 == total_nodes or count % 100 == 99:
+            logger.info("Checking inbound edges for node %s/%s", count + 1, total_nodes)
+
+        if vertex["inbound_edges_checked"]:
+            continue
+
         if swhid.object_type == ObjectType.ORIGIN:
+            vertex["inbound_edges_checked"] = True
             continue
 
         for pred in vertex.predecessors():
-            if not pred["touched"]:
-                logger.warning("Processing %s: predecessor %s has not been touched!", swhid, pred["swhid"])
+            if not pred["inbound_edges_checked"]:
+                logger.warning(
+                    "record_inbound_edges_outside_graph %s: predecessor %s has not been checked yet!",
+                    swhid,
+                    pred["swhid"],
+                )
                 raise ValueError("toposort broken!")
 
-        if any(pred["predecessors_outside_subgraph"] for pred in vertex.predecessors()):
-            vertex["predecessors_outside_subgraph"] = True
+        if any(
+            pred["has_inbound_edges_outside_subgraph"] for pred in vertex.predecessors()
+        ):
+            vertex["has_inbound_edges_outside_subgraph"] = True
+            vertex["inbound_edges_checked"] = True
             continue
 
-        if graph_baseurl:
-            if check_antecedents_swh_graph(vertex, graph_baseurl, session):
-                logger.info("Found antecedent for %s in swh.graph", swhid)
-                vertex["predecessors_outside_subgraph"] = True
-                continue
-
         predecessors = {pred["swhid"] for pred in vertex.predecessors()}
-        if swhid.object_type == ObjectType.SNAPSHOT:
-            if check_antecedents_storage_visit(storage, swhid, predecessors):
-                logger.info("Found visit with snapshot %s", swhid)
-                vertex["predecessors_outside_subgraph"] = True
-                continue
+        result = find_one_inbound_edge(
+            swhid, predecessors, storage, graph_baseurl, session
+        )
+        if result:
+            found, reason = result
+            logger.debug("Found inbound edge for %s using %s: %s", swhid, reason, found)
+            vertex["has_inbound_edges_outside_subgraph"] = True
+            vertex["inbound_edge_outside_subgraph"] = found
 
-        if check_antecedents_storage_snapshot(storage, swhid, predecessors):
-            logger.info("Found snapshot containing %s", swhid)
-            vertex["predecessors_outside_subgraph"] = True
-            continue
+        vertex["inbound_edges_checked"] = True
 
-        if swhid.object_type != ObjectType.SNAPSHOT:
-            if check_antecedents_storage_release(storage, swhid, predecessors):
-                logger.info("Found release containing %s", swhid)
-                vertex["predecessors_outside_subgraph"] = True
-                continue
 
-        if swhid.object_type == ObjectType.REVISION:
-            if check_antecedents_storage_revision(storage, swhid, predecessors):
-                logger.info("Found revision with %s as parent", swhid)
-                vertex["predecessors_outside_subgraph"] = True
-                continue
+def find_one_inbound_edge(
+    swhid: SWHID,
+    known_predecessors: Collection[SWHID],
+    storage: StorageInterface,
+    graph_baseurl: Optional[str] = None,
+    session: Optional[Session] = None,
+) -> Optional[Tuple[SWHID, str]]:
+    """Find one inbound edge for `swhid` outside of `known_predecessors`.
 
-        if swhid.object_type == ObjectType.DIRECTORY:
-            if check_antecedents_storage_dir_in_rev(storage, swhid, predecessors):
-                logger.info("Found revision with %s as directory", swhid)
-                vertex["predecessors_outside_subgraph"] = True
-                continue
+    Arguments:
+      swhid: the object for which we should look for inbound edges
+      known_predecessors: known objects that we should ignore as predecessors
+        for `swhid`
+      storage: the storage to check against
+      graph_baseurl: the url of the swh.graph API to query against
+      session: a common requests session for swh.graph API calls
 
-        if check_antecedents_storage_directory(storage, swhid, predecessors):
-            logger.info("Found directory with %s in it", swhid)
-            vertex["predecessors_outside_subgraph"] = True
-            continue
+    Returns:
+      a pair with the `swhid` of a predecessor that was not previously known,
+      and the way it was found
+    """
+
+    if graph_baseurl:
+        found = get_one_inbound_edge_swh_graph(
+            graph_baseurl, swhid, known_predecessors, session=session
+        )
+        if found:
+            return found, "swh.graph"
+
+    if swhid.object_type == ObjectType.SNAPSHOT:
+        found = get_one_inbound_edge_storage_visit(storage, swhid, known_predecessors)
+        if found:
+            return found, "swh.storage:snapshot_in_visit"
+
+    found = get_one_inbound_edge_storage_snapshot(storage, swhid, known_predecessors)
+    if found:
+        return found, "swh.storage:object_in_snapshot"
+
+    if swhid.object_type != ObjectType.SNAPSHOT:
+        found = get_one_inbound_edge_storage_release(storage, swhid, known_predecessors)
+        if found:
+            return found, "swh.storage:object_in_release"
+
+    if swhid.object_type == ObjectType.REVISION:
+        found = get_one_inbound_edge_storage_revision(
+            storage, swhid, known_predecessors
+        )
+        if found:
+            return found, "swh.storage:revision_has_parent"
+
+    if swhid.object_type == ObjectType.DIRECTORY:
+        found = get_one_inbound_edge_storage_dir_in_rev(
+            storage, swhid, known_predecessors
+        )
+        if found:
+            return found, "swh.storage:directory_in_revision"
 
-    logger.info("No predecessors found for %s", swhid)
+    found = get_one_inbound_edge_storage_directory(storage, swhid, known_predecessors)
+    if found:
+        return found, "swh.storage:object_in_directory"
+
+    return None
 
 
-def check_antecedents_swh_graph(
-    vertex: Vertex, graph_baseurl: str, session: Optional[Session] = None
-) -> bool:
+def get_one_inbound_edge_swh_graph(
+    graph_baseurl: str,
+    swhid: SWHID,
+    known_predecessors: Collection[SWHID],
+    session: Optional[Session] = None,
+) -> Optional[SWHID]:
     if not session:
         session = Session()
 
-    swhid = vertex["swhid"]
+    nodes_url = f"{graph_baseurl.rstrip('/')}/graph/neighbors/{swhid}/"
+    params = {"direction": "backward", "max_edges": str(len(known_predecessors) + 1)}
 
-    count_url = f"{graph_baseurl.rstrip('/')}/graph/neighbors/count/{swhid}"
-    nodes_url = f"{graph_baseurl.rstrip('/')}/graph/neighbors/{swhid}"
-    params = {"direction": "backward"}
-
-    count_response = session.get(count_url, params=params)
-    if count_response.status_code == 404:
-        logger.debug("Object %s not found in swh.graph", swhid)
-        return False
+    response = session.get(nodes_url, params=params, stream=True)
 
-    count_response.raise_for_status()
-    known_neighbors = int(count_response.content)
-    logger.debug(
-        "%s has %s predecessors in swh.graph, %s in the current subgraph",
-        swhid,
-        known_neighbors,
-        vertex.indegree(),
-    )
-    if known_neighbors > vertex.indegree():
-        return True
+    if response.status_code == 404:
+        return None
 
-    response = session.get(nodes_url, params=params, stream=True)
+    response.raise_for_status()
 
-    found = False
+    found: Optional[SWHID] = None
     s_swhid = str(swhid)
-    s_pred = set(str(pred["swhid"]) for pred in vertex.predecessors())
+    s_pred = {str(p_swhid) for p_swhid in known_predecessors}
     for node in response.iter_lines(decode_unicode=True):
-        if found or node == s_swhid:
+        node = node.strip()
+        if not node or found or node == s_swhid:
            continue
         if node not in s_pred:
-            found = True
+            found = SWHID.from_string(node)
 
     return found
 
 
-def check_antecedents_storage_visit(
-    storage: StorageInterface, swhid: SWHID, predecessors: Collection[SWHID]
-):
-    assert isinstance(storage, PgStorage), "Need to use a `local` storage instance"
+def get_one_inbound_edge_storage_visit(
+    storage: StorageInterface, swhid: SWHID, known_predecessors: Collection[SWHID]
+) -> Optional[SWHID]:
+    assert isinstance(storage, PgStorage), "Need to use a `postgresql` storage instance"
 
     with storage.db() as db:
         with db.transaction() as cur:
-            cur.execute("""
+            cur.execute(
+                """
                 select distinct(digest(origin.url, 'sha1'))
                 from origin
                 inner join origin_visit_status on origin.id = origin_visit_status.origin
                 where snapshot = %s
                 limit %s
-            """, (swhid.object_id, len(predecessors) + 1))
-            results = {SWHID(object_id=line[0], object_type=ObjectType.ORIGIN) for line in cur.fetchall()}
+            """,
+                (swhid.object_id, len(known_predecessors) + 1),
+            )
+            results = {
+                SWHID(object_id=line[0], object_type=ObjectType.ORIGIN)
+                for line in cur.fetchall()
+            }
 
-    return bool(results - set(predecessors))
+    outside = results - set(known_predecessors)
+    return next(iter(outside)) if outside else None
 
 
-def check_antecedents_storage_snapshot(
-    storage: StorageInterface, swhid: SWHID, predecessors: Collection[SWHID]
-):
-    assert isinstance(storage, PgStorage), "Need to use a `local` storage instance"
+def get_one_inbound_edge_storage_snapshot(
+    storage: StorageInterface, swhid: SWHID, known_predecessors: Collection[SWHID]
+) -> Optional[SWHID]:
+    assert isinstance(storage, PgStorage), "Need to use a `postgresql` storage instance"
 
     with storage.db() as db:
         with db.transaction() as cur:
-            cur.execute("""
+            cur.execute(
+                """
                 select distinct(snapshot.id)
                 from snapshot
                 inner join snapshot_branches on snapshot.object_id = snapshot_branches.snapshot_id
                 inner join snapshot_branch on snapshot_branches.branch_id = snapshot_branch.object_id
                 where snapshot_branch.target = %s and snapshot_branch.target_type = %s
                 limit %s
-            """, (swhid.object_id, swhid.object_type.name.lower(), len(predecessors) + 1))
-            results = {SWHID(object_id=line[0], object_type=ObjectType.SNAPSHOT) for line in cur.fetchall()}
+            """,
+                (
+                    swhid.object_id,
+                    swhid.object_type.name.lower(),
+                    len(known_predecessors) + 1,
+                ),
+            )
+            results = {
+                SWHID(object_id=line[0], object_type=ObjectType.SNAPSHOT)
+                for line in cur.fetchall()
+            }
 
-    return bool(results - set(predecessors))
+    outside = results - set(known_predecessors)
+    return next(iter(outside)) if outside else None
 
 
-def check_antecedents_storage_release(
-    storage: StorageInterface, swhid: SWHID, predecessors: Collection[SWHID]
-):
-    assert isinstance(storage, PgStorage), "Need to use a `local` storage instance"
+def get_one_inbound_edge_storage_release(
+    storage: StorageInterface, swhid: SWHID, known_predecessors: Collection[SWHID]
+) -> Optional[SWHID]:
+    assert isinstance(storage, PgStorage), "Need to use a `postgresql` storage instance"
 
     with storage.db() as db:
         with db.transaction() as cur:
-            cur.execute("""
+            cur.execute(
+                """
                 select distinct(release.id)
                 from release
                 where release.target = %s and release.target_type = %s
                 limit %s
-            """, (swhid.object_id, swhid.object_type.name.lower(), len(predecessors) + 1))
-            results = {SWHID(object_id=line[0], object_type=ObjectType.RELEASE) for line in cur.fetchall()}
+            """,
+                (
+                    swhid.object_id,
+                    swhid.object_type.name.lower(),
+                    len(known_predecessors) + 1,
+                ),
+            )
+            results = {
+                SWHID(object_id=line[0], object_type=ObjectType.RELEASE)
+                for line in cur.fetchall()
+            }
 
-    return bool(results - set(predecessors))
+    outside = results - set(known_predecessors)
+    return next(iter(outside)) if outside else None
 
 
-def check_antecedents_storage_revision(
-    storage: StorageInterface, swhid: SWHID, predecessors: Collection[SWHID]
-):
-    assert isinstance(storage, PgStorage), "Need to use a `local` storage instance"
+def get_one_inbound_edge_storage_revision(
+    storage: StorageInterface, swhid: SWHID, known_predecessors: Collection[SWHID]
+) -> Optional[SWHID]:
+    assert isinstance(storage, PgStorage), "Need to use a `postgresql` storage instance"
 
     with storage.db() as db:
         with db.transaction() as cur:
-            cur.execute("""
+            cur.execute(
+                """
                 select distinct(revision.id)
                 from revision
                 inner join revision_history using (id)
                 where revision_history.parent_id = %s
                 limit %s
-            """, (swhid.object_id, len(predecessors) + 1))
-            results = {SWHID(object_id=line[0], object_type=ObjectType.REVISION) for line in cur.fetchall()}
+            """,
+                (swhid.object_id, len(known_predecessors) + 1),
+            )
+            results = {
+                SWHID(object_id=line[0], object_type=ObjectType.REVISION)
+                for line in cur.fetchall()
+            }
 
-    return bool(results - set(predecessors))
+    outside = results - set(known_predecessors)
+    return next(iter(outside)) if outside else None
 
 
-def check_antecedents_storage_dir_in_rev(
-    storage: StorageInterface, swhid: SWHID, predecessors: Collection[SWHID]
-):
-    assert isinstance(storage, PgStorage), "Need to use a `local` storage instance"
+def get_one_inbound_edge_storage_dir_in_rev(
+    storage: StorageInterface, swhid: SWHID, known_predecessors: Collection[SWHID]
+) -> Optional[SWHID]:
+    assert isinstance(storage, PgStorage), "Need to use a `postgresql` storage instance"
 
     with storage.db() as db:
         with db.transaction() as cur:
-            cur.execute("""
+            cur.execute(
+                """
                 select distinct(revision.id)
                 from revision
                 where directory = %s
                 limit %s
-            """, (swhid.object_id, len(predecessors) + 1))
-            results = {SWHID(object_id=line[0], object_type=ObjectType.REVISION) for line in cur.fetchall()}
+            """,
+                (swhid.object_id, len(known_predecessors) + 1),
+            )
+            results = {
+                SWHID(object_id=line[0], object_type=ObjectType.REVISION)
+                for line in cur.fetchall()
+            }
 
-    return bool(results - set(predecessors))
+    outside = results - set(known_predecessors)
+    return next(iter(outside)) if outside else None
 
 
-def check_antecedents_storage_directory(
-    storage: StorageInterface, swhid: SWHID, predecessors: Collection[SWHID]
-):
+def get_one_inbound_edge_storage_directory(
+    storage: StorageInterface, swhid: SWHID, known_predecessors: Collection[SWHID]
+) -> Optional[SWHID]:
+    assert isinstance(storage, PgStorage), "Need to use a `postgresql` storage instance"
+
     if swhid.object_type == ObjectType.DIRECTORY:
-        entries_table = 'directory_entry_dir'
-        entries_column = 'dir_entries'
+        entries_table = "directory_entry_dir"
+        entries_column = "dir_entries"
         min_limit = 1000
     elif swhid.object_type == ObjectType.CONTENT:
-        entries_table = 'directory_entry_file'
-        entries_column = 'file_entries'
+        entries_table = "directory_entry_file"
+        entries_column = "file_entries"
         min_limit = 10000
     else:
-        return False
+        return None
 
     with storage.db() as db:
         with db.transaction() as cur:
-            cur.execute(f"""
+            cur.execute(
+                f"""
                 select id
                 from {entries_table}
                 where target = %s
-            """, (swhid.object_id,))
+            """,
+                (swhid.object_id,),
+            )
             entry_ids = {line[0] for line in cur.fetchall()}
 
     if not entry_ids:
-        return False
+        return None
 
     # needed to force an index scan
-    full_limit = len(predecessors) + 1
+    full_limit = len(known_predecessors) + 1
     base_limit = max(min_limit, full_limit)
 
     for entry_id in entry_ids:
         with storage.db() as db:
             with db.transaction() as cur:
-                cur.execute(f"""
+                cur.execute(
+                    f"""
                     select distinct(id)
                     from directory
                     where ARRAY[%s]::bigint[] <@ {entries_column}
                     limit %s
-                """, (entry_id, base_limit))
+                """,
+                    (entry_id, base_limit),
+                )
                 for line in cur:
-                    if SWHID(object_id=line[0], object_type=ObjectType.DIRECTORY) not in predecessors:
-                        return True
-
-    return False
+                    found = SWHID(object_id=line[0], object_type=ObjectType.DIRECTORY)
+                    if found not in known_predecessors:
+                        return found
 
 
 def plot_graph(graph: Graph):
     color_dict = {
         ObjectType.CONTENT: "pink",
         ObjectType.DIRECTORY: "lavender",
         ObjectType.REVISION: "orchid",
         ObjectType.RELEASE: "olive",
         ObjectType.SNAPSHOT: "aqua",
         ObjectType.ORIGIN: "khaki",
     }
     graph.vs["label"] = [swhid.object_id.hex()[:8] for swhid in graph.vs["swhid"]]
     plot(
         graph,
         "%s.svg" % graph.vs[0]["swhid"],
         layout=graph.layout_sugiyama(),
         vertex_color=[
             color_dict.get(swhid.object_type, "red") for swhid in graph.vs["swhid"]
         ],
         bbox=(10000, 100000),
     )
     del graph.vs["label"]
 
 
 if __name__ == "__main__":
     logging.basicConfig(
         level=logging.INFO, format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
     )
     logger.setLevel(logging.DEBUG)
 
     graph: Optional[Graph] = None
     try:
         storage = get_storage(
-            cls="local", db="service=swh", objstorage={"cls": "memory"}
+            cls="postgresql", db="service=swh", objstorage={"cls": "memory"}
         )
-        swhid = SWHID.from_string(sys.argv[1])
+        swhids = [SWHID.from_string(arg) for arg in sys.argv[1:]]
+        swhid = swhids[0]
         graph_baseurl = "http://granet.internal.softwareheritage.org:5009/"
 
-        graph = init_graph(swhid, pickle_filename=f"{swhid}.pickle")
+        graph = init_graph(swhids, pickle_filename=f"{swhid}.pickle")
 
-        get_descendents_graph(
-            graph,
-            storage,
-            graph_baseurl,
+        populate_subtrees(
+            graph, storage, graph_baseurl,
         )
-        check_predecessors_outside_graph(graph, storage, graph_baseurl)
+        record_inbound_edges_outside_graph(graph, storage, graph_baseurl)
     finally:
         if graph:
             date = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
             pickle_name = f"{swhid}.{date}.pickle"
             logger.info("Dumping to %s", pickle_name)
             graph.write_pickle(pickle_name)
             # plot_graph(graph)
             summary(graph)
-            logger.info("Predecessors found: %s", len(graph.vs.select(predecessors_outside_subgraph_eq=True)))
-            logger.info("Predecessors not found: %s", len(graph.vs.select(predecessors_outside_subgraph_eq=False)))
+            logger.info(
+                "Nodes with external inbound edges found: %s",
+                len(graph.vs.select(has_inbound_edges_outside_subgraph_eq=True)),
+            )
+            logger.info(
+                "Nodes with no external inbound edges found: %s",
+                len(graph.vs.select(has_inbound_edges_outside_subgraph_eq=False)),
+            )
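For review, a toy illustration (not part of the patch) of the propagation rule that record_inbound_edges_outside_graph applies while walking the graph in topological order: once any predecessor of a node is known to be reachable from outside the subgraph, the node itself is marked as well. Plain igraph is used here instead of the module's Graph subclass:

    from igraph import Graph

    g = Graph(directed=True)
    g.add_vertices(3)
    g.add_edges([(0, 1), (1, 2)])  # 0 -> 1 -> 2
    # vertex 0 stands for an object with an inbound edge from outside
    g.vs["has_inbound_edges_outside_subgraph"] = [True, False, False]
    for vid in g.topological_sorting():
        vertex = g.vs[vid]
        if any(p["has_inbound_edges_outside_subgraph"] for p in vertex.predecessors()):
            vertex["has_inbound_edges_outside_subgraph"] = True
    print(g.vs["has_inbound_edges_outside_subgraph"])  # [True, True, True]

This is also why the function raises "toposort broken!" on an unchecked predecessor: the invariant only holds if predecessors are always processed first.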
diff --git a/takedowns/yaml_manifest.py b/takedowns/yaml_manifest.py
index 65b75c5..b79136e 100644
--- a/takedowns/yaml_manifest.py
+++ b/takedowns/yaml_manifest.py
@@ -1,40 +1,34 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pickle
 import sys
 
 import yaml
 
 from list_objects import Graph
 
 if __name__ == "__main__":
     graph = pickle.load(open(sys.argv[1], "rb"))
 
     nodes_to_remove = [
-        str(s) for s in graph.vs.select(predecessors_outside_subgraph_eq=False)["swhid"]
+        str(s) for s in graph.vs.select(has_inbound_edges_outside_subgraph_eq=False)["swhid"]
     ]
     nodes_to_keep = [
-        str(s) for s in graph.vs.select(predecessors_outside_subgraph_eq=True)["swhid"]
+        str(s) for s in graph.vs.select(has_inbound_edges_outside_subgraph_eq=True)["swhid"]
     ]
 
     yaml.dump(
         {
             "version": 1,
-            "request": {
-                "date": None,
-                "object": None,
-            },
-            "decision": {
-                "date": None,
-                "action": None,
-            },
+            "request": {"date": None, "object": None,},
+            "decision": {"date": None, "action": None,},
             "affected_nodes": {
                 "kept": sorted(nodes_to_keep),
                 "removed": sorted(nodes_to_remove),
             },
         },
-        sys.stdout
+        sys.stdout,
     )
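Taken together, the three scripts form a pipeline: list_objects.py builds and annotates the pickled graph, yaml_manifest.py renders the kept/removed manifest from it, and gen_removal_sql.py emits the ROLLBACK-guarded SQL. A minimal sketch (not part of the patch) of inspecting such a pickle, mirroring how the other two scripts load it; the Graph import is required so the pickled class can be resolved:

    import pickle
    import sys

    from list_objects import Graph  # noqa: F401 -- needed to unpickle the graph

    graph = pickle.load(open(sys.argv[1], "rb"))
    removable = graph.vs.select(has_inbound_edges_outside_subgraph_eq=False)
    kept = graph.vs.select(has_inbound_edges_outside_subgraph_eq=True)
    print(f"{len(removable)} removable, {len(kept)} kept")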