"""Stream the ``revision`` table from Cassandra into graph CSV files.

Added as ``vlorentz/cassandra_stream_graph.py``.

The script pages through the ``revision`` table asynchronously and writes:

* ``nodes.csv``: one ``swh:1:rev:<hex id>`` line per revision;
* ``edges.csv``: one ``<revision> <target>`` line for the revision's
  directory and for each of its parents.
"""

from threading import Event
import time

from cassandra.query import SimpleStatement

from swh.storage import get_storage

storage = get_storage('cassandra', {
    'hosts': [
        '128.93.66.190',
        '128.93.66.191',
        '128.93.66.187',
        '128.93.64.42',
    ],
    'keyspace': 'swh_test',
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
})

# NOTE(review): this reaches into private attributes of the storage object
# to obtain the object used for ``execute_async``; it may break if the
# storage internals change.
session = storage._proxy._session


class PagedResultHandler(object):
    """Run ``query`` asynchronously and feed every row to ``callback``.

    The query is started from the constructor. ``finished_event`` is set
    once the last page has been processed or as soon as an error occurs;
    in the latter case the exception is stored in ``error``.

    Args:
        query: CQL query string to execute.
        callback: callable invoked once per result row.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, query, callback):
        self.callback = callback
        # Number of rows received so far, across all pages.
        self.total_rows = 0
        # First exception seen (query failure or callback failure), if any.
        self.error = None
        # Set when the stream is over, successfully or not.
        self.finished_event = Event()

        statement = SimpleStatement(query)
        self.start_time = time.time()
        self.future = session.execute_async(statement)

        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Process one page of results and request the next one, if any.

        Any exception raised while handling the page (including by the
        user callback) is routed to ``handle_error`` so that waiters on
        ``finished_event`` are always released.
        """
        if self.error is not None:
            # A previous page already failed; a prefetched page may still
            # be delivered afterwards, but it must not be processed.
            return

        try:
            # Read the flag once, before requesting the next page, so the
            # end-of-stream decision below describes *this* page.
            # NOTE(review): presumably the driver updates this state when
            # the next response arrives -- confirm against the driver docs.
            has_more_pages = self.future.has_more_pages
            if has_more_pages:
                # Request the next page before processing the current one
                # so the fetch overlaps with the row callbacks.
                self.future.start_fetching_next_page()

            self.total_rows += len(rows)
            elapsed = time.time() - self.start_time
            # Guard against a zero elapsed time on coarse clocks.
            rate = self.total_rows / elapsed / 1000 if elapsed > 0 else 0
            print('total rows: %s (%dk/s)' % (self.total_rows, rate))

            for row in rows:
                self.callback(row)
        except Exception as exc:
            # Without this, a failing callback would leave the waiter
            # blocked forever on ``finished_event``.
            self.handle_error(exc)
            return

        if not has_more_pages:
            self.finished_event.set()

    def handle_error(self, exc):
        """Record ``exc``, print it, and release waiters."""
        if self.error is None:
            self.error = exc
        print('%s' % exc)
        self.finished_event.set()


with open('nodes.csv', 'w') as nodes_fd, open('edges.csv', 'w') as edges_fd:
    def revision_callback(row):
        """Write the node line and the outgoing edge lines of one revision."""
        rev_id = 'swh:1:rev:%s' % row.id.hex()
        nodes_fd.write(rev_id + '\n')
        edges_fd.write('%s swh:1:dir:%s\n' % (rev_id, row.directory.hex()))
        for parent in row.parents:
            edges_fd.write('%s swh:1:rev:%s\n' % (rev_id, parent.hex()))

    # ``tok`` is selected by the query but not read by ``revision_callback``.
    handler = PagedResultHandler(
        "SELECT token(id) as tok, id, directory, parents FROM revision",
        revision_callback)
    handler.finished_event.wait()

    # Surface failures instead of silently leaving truncated CSV files.
    if handler.error is not None:
        raise handler.error