diff --git a/olasd/consistency/check_objects.py b/olasd/consistency/check_objects.py
index f2a5755..d5ec35b 100755
--- a/olasd/consistency/check_objects.py
+++ b/olasd/consistency/check_objects.py
@@ -1,68 +1,68 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 import pprint
 import sys
 
 import psycopg2
 
 from swh.storage import converters
 from swh.model import identifiers
 
 from utils import DSN, RELEASE_COLUMNS, REVISION_COLUMNS, process_query, copy_identifiers
 
 id_to_str = identifiers.identifier_to_str
 
 RELEASE_QUERY = '''
 copy (
     select %s
     from tmp_bytea t
     left join release r on t.id = r.id
     left join person a on r.author = a.id
 ) to stdout
 ''' % ', '.join('%s as %s' % (column, alias)
                 for column, alias, _ in RELEASE_COLUMNS)
 
 REVISION_QUERY = '''
 copy (
     select %s
     from tmp_bytea t
     left join revision r on t.id = r.id
     left join person a on r.author = a.id
     left join person c on r.committer = c.id
 ) to stdout
 ''' % ', '.join('%s as %s' % (column, alias)
                 for column, alias, _ in REVISION_COLUMNS)
 
 
 def releases_from_file(cursor, filename):
     copy_identifiers(cursor, filename)
     yield from process_query(cursor, RELEASE_QUERY, RELEASE_COLUMNS,
                              converters.db_to_release)
 
 
 def revisions_from_file(cursor, filename):
     copy_identifiers(cursor, filename)
     yield from process_query(cursor, REVISION_QUERY, REVISION_COLUMNS,
                              converters.db_to_revision)
 
 
 if __name__ == '__main__':
     if len(sys.argv) != 3:
         sys.exit(2)
 
     if sys.argv[1] == 'release':
         iterator = releases_from_file
         identifier_fn = identifiers.release_identifier
     elif sys.argv[1] == 'revision':
         iterator = revisions_from_file
         identifier_fn = identifiers.revision_identifier
     else:
         sys.exit(2)
 
     db = psycopg2.connect(DSN)
     cursor = db.cursor()
     for object in iterator(cursor, sys.argv[2]):
         intrinsic_id = id_to_str(object['id'])
         computed_id = id_to_str(identifier_fn(object))
         if computed_id != intrinsic_id:
             print(intrinsic_id)
diff --git a/olasd/consistency/check_swh.py b/olasd/consistency/check_swh.py
index bd45f72..9f79ec0 100755
--- a/olasd/consistency/check_swh.py
+++ b/olasd/consistency/check_swh.py
@@ -1,52 +1,52 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 import psycopg2
 
 from swh.storage import converters as db_converters
 from swh.model import identifiers
 
 from utils import DSN, REVISION_COLUMNS, RELEASE_COLUMNS, process_query
 
 id_to_str = identifiers.identifier_to_str
 
 REVISION_QUERY = '''
 copy (
     select %s
     from revision r
     left join person a on r.author = a.id
     left join person c on r.committer = c.id
 ) to stdout
 ''' % ', '.join('%s as %s' % (column, alias)
                 for column, alias, _ in REVISION_COLUMNS)
 
 RELEASE_QUERY = '''
 copy (
     select %s
     from release r
     left join person a on r.author = a.id
 ) to stdout
 ''' % ', '.join('%s as %s' % (column, alias)
                 for column, alias, _ in RELEASE_COLUMNS)
 
 
 if __name__ == '__main__':
     db = psycopg2.connect(DSN)
     cursor = db.cursor()
 
     with open('broken_releases', 'w') as broken_releases:
         for release in process_query(cursor, RELEASE_QUERY, RELEASE_COLUMNS,
                                      db_converters.db_to_release):
             intrinsic_id = id_to_str(release['id'])
             computed_id = id_to_str(identifiers.release_identifier(release))
             if intrinsic_id != computed_id:
                 print(intrinsic_id, computed_id, file=broken_releases)
 
     with open('broken_revisions', 'w') as broken_revisions:
         for revision in process_query(cursor, REVISION_QUERY, REVISION_COLUMNS,
                                       db_converters.db_to_revision):
             intrinsic_id = id_to_str(revision['id'])
             computed_id = id_to_str(identifiers.revision_identifier(revision))
             if intrinsic_id != computed_id:
                 print(intrinsic_id, computed_id, file=broken_revisions)
 
     db.rollback()
diff --git a/olasd/consistency/fix_releases.py b/olasd/consistency/fix_releases.py
index 9afa734..f10d34e 100755
--- a/olasd/consistency/fix_releases.py
+++ b/olasd/consistency/fix_releases.py
@@ -1,38 +1,38 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 import sys
 
 import psycopg2
 
 from swh.storage import converters
 from swh.model import identifiers
 
 from utils import DSN, RELEASE_COLUMNS, process_query, copy_identifiers
 
 id_to_str = identifiers.identifier_to_str
 
 RELEASE_QUERY = '''
 copy (
     select %s
     from tmp_bytea t
     left join release r on t.id = r.id
     left join person a on r.author = a.id
 ) to stdout
 ''' % ', '.join('%s as %s' % (column, alias)
                 for column, alias, _ in RELEASE_COLUMNS)
 
 
 def releases_from_file(cursor, filename):
     copy_identifiers(cursor, filename)
     yield from process_query(cursor, RELEASE_QUERY, RELEASE_COLUMNS,
                              converters.db_to_release)
 
 
 if __name__ == '__main__':
     db = psycopg2.connect(DSN)
     cursor = db.cursor()
     for release in releases_from_file(cursor, sys.argv[1]):
         intrinsic_id = id_to_str(release['id'])
         release['message'] = release['message'] + b'\n'
         fix_computed_id = identifiers.release_identifier(release)
         print('\\\\x%s\t\\\\x%s' % (intrinsic_id, fix_computed_id))
diff --git a/olasd/consistency/fix_revisions.py b/olasd/consistency/fix_revisions.py
index ccaf34a..6541e90 100755
--- a/olasd/consistency/fix_revisions.py
+++ b/olasd/consistency/fix_revisions.py
@@ -1,73 +1,73 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 import logging
 import pickle
 import os
 import sys
 
 from swh.loader.git.reader import GitCommitRemoteReader
 
 
 def get_revisions_and_origins_from_file(filename):
     revs = {}
     with open(filename, 'r') as f:
         for line in f:
             data = line.strip().split()
             revision, origins = data[0], data[1:]
             revs[revision] = set(origins)
 
     return revs
 
 
 def revisions_from_origin(origin_url):
     reader = GitCommitRemoteReader()
     return {
         id.decode(): revision
         for id, revision in reader.load(origin_url).items()
     }
 
 
 def dump_to_file(filename, data):
     with open('%s.tmp' % filename, 'wb') as f:
         pickle.dump(data, f)
     os.rename('%s.tmp' % filename, filename)
 
 
 if __name__ == '__main__':
     filename = sys.argv[1]
     snapshot = sys.argv[2]
 
     if os.path.exists(snapshot):
         with open(snapshot, 'rb') as f:
             revs, parsed_revs, origins = pickle.load(f)
     else:
         revs = get_revisions_and_origins_from_file(filename)
         origins = set()
         for urls in revs.values():
             origins |= {url for url in urls
                         if url.startswith('https://github.com/')}
         parsed_revs = {}
         dump_to_file(snapshot, [revs, parsed_revs, origins])
 
     ctr = 0
     while origins:
         print("%s origins, %s/%s revs remaining" % (
             len(origins), len(revs) - len(parsed_revs), len(revs)
         ))
         ctr += 1
         origin_url = origins.pop()
         try:
             origin_revs = revisions_from_origin(origin_url)
         except Exception as e:
             logging.exception(e)
             continue
 
         for id, revision in origin_revs.items():
             if id in revs and id not in parsed_revs:
                 parsed_revs[id] = revision
 
         if ctr >= 10:
             ctr = 0
             dump_to_file(snapshot, [revs, parsed_revs, origins])
 
     dump_to_file(snapshot, [revs, parsed_revs, origins])
diff --git a/olasd/consistency/list_origins.py b/olasd/consistency/list_origins.py
index 4aa990f..e4e0ca7 100755
--- a/olasd/consistency/list_origins.py
+++ b/olasd/consistency/list_origins.py
@@ -1,58 +1,58 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 import os
 import sys
 import threading
 
 import psycopg2
 
 from utils import copy_identifiers, DSN
 
 
 def get_easy_origins(db, type, filename):
     query = """
     COPY (select id, array(
             select url from origin where origin.id in (
                 select distinct oh.origin from occurrence_history oh
                 where target = tmp_bytea.id and target_type='%(type)s'
                 order by 1)
           ) from tmp_bytea) TO STDOUT
     """ % {'type': type}
 
     cur = db.cursor()
 
     r_fd, w_fd = os.pipe()
 
     def thread():
         copy_identifiers(cur, filename)
         with open(w_fd, 'wb') as w_file:
             cur.copy_expert(query, w_file)
 
     read_thread = threading.Thread(target=thread)
     read_thread.start()
 
     with open(r_fd, 'rb') as r_file:
         for line in r_file:
             id, origins = line.decode().strip().split('\t')
             origins = origins[1:-1]
             if origins:
                 origins = origins.split(',')
             else:
                 origins = []
             yield (id[3:], origins)
 
     read_thread.join()
 
 
 if __name__ == '__main__':
     if len(sys.argv) != 3:
         sys.exit(2)
 
     type, filename = sys.argv[1:]
 
     db = psycopg2.connect(DSN)
 
     for id, origins in get_easy_origins(db, type, filename):
         print(id, ' '.join(origins))
 
     db.rollback()
diff --git a/olasd/github/list_starred_repos.py b/olasd/github/list_starred_repos.py
index a55c340..c5177d9 100755
--- a/olasd/github/list_starred_repos.py
+++ b/olasd/github/list_starred_repos.py
@@ -1,45 +1,45 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 import sys
 import time
 
 import requests
 
 API_SEARCH_URL = 'https://api.github.com/search/repositories'
 REPOS_QUERY = 'stars:>=1000'
 
 
 def get_page(api, query, page, credentials):
     req = requests.get(api, auth=credentials, params={
         'q': query,
         'sorted': 'stars',
         'page': page,
     })
     res = req.json()
     if 'items' not in res:
         print(res)
         return None
     urls = ['https://github.com/%s' % repo['full_name']
             for repo in req.json()['items']]
 
     return {
         'urls': urls,
         'links': req.links,
     }
 
 
 if __name__ == '__main__':
     credentials = tuple(sys.argv[1:3])
 
     page = 1
     while True:
         res = get_page(API_SEARCH_URL, REPOS_QUERY, page, credentials)
         if not res:
             break
         for url in res['urls']:
             print(url)
         if not res['links'].get('next'):
             break
         page += 1
         time.sleep(5)
diff --git a/olasd/snapshots/create_snapshots.py b/olasd/snapshots/create_snapshots.py
index 9d3ad27..af1e83f 100644
--- a/olasd/snapshots/create_snapshots.py
+++ b/olasd/snapshots/create_snapshots.py
@@ -1,139 +1,139 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 from ast import literal_eval
 from collections import defaultdict
 import csv
 import os
 import psycopg2
 import sys
 import threading
 
 from swh.model.identifiers import snapshot_identifier, identifier_to_bytes
 from swh.storage.storage import Storage
 
 read_storage = None
 write_storage = None
 
 GET_OCCURRENCES_QUERY = '''COPY (
     select origin, branch, target, target_type, visits
     from occurrence_history
     where origin >= %s
     order by origin, branch
 ) TO STDOUT CSV
 '''
 
 DSN_READ = 'service=swh-s'
 DSN_WRITE = 'service=swh-swhstorage'
 
 
 def get_read_storage():
     global read_storage
     if not read_storage:
         read_storage = Storage(DSN_READ, {
             'cls': 'in-memory',
             'args': {},
         })
     return read_storage
 
 
 def get_write_storage():
     global write_storage
     if not write_storage:
         write_storage = Storage(DSN_WRITE, {
             'cls': 'in-memory',
             'args': {},
         })
     return write_storage
 
 
 def get_snapshots(origin):
     read_fd, write_fd = os.pipe()
 
     def get_data_thread():
         db = psycopg2.connect(DSN_READ)
         cursor = db.cursor()
         cursor.copy_expert(
             cursor.mogrify(GET_OCCURRENCES_QUERY, (origin, )),
             open(write_fd, 'wb')
         )
         cursor.close()
         db.close()
 
     data_thread = threading.Thread(target=get_data_thread)
     data_thread.start()
 
     snapshots = defaultdict(lambda: defaultdict(lambda: {'branches': {}}))
 
     current_origin = None
     for line in csv.reader(open(read_fd, 'r')):
         origin, branch, target, target_type, visits = line
         branch = bytes.fromhex(branch[2:])
         target = bytes.fromhex(target[2:])
         visits = literal_eval(visits)
         for visit in visits:
             snapshots[origin][visit]['branches'][branch] = {
                 'target': target,
                 'target_type': target_type,
             }
 
         if current_origin and origin != current_origin:
             # done processing the current origin; send snapshots
             for visit, snapshot in sorted(snapshots[current_origin].items()):
                 for branch, target in snapshot['branches'].copy().items():
                     if target == {
                             'target': b'\x00' * 20,
                             'target_type': 'revision',
                     }:
                         snapshot['branches'][branch] = None
                 snapshot_id = snapshot_identifier(snapshot)
                 snapshot['id'] = identifier_to_bytes(snapshot_id)
                 yield current_origin, visit, snapshot
             del snapshots[current_origin]
 
         current_origin = origin
 
     data_thread.join()
 
 
 if __name__ == '__main__':
     get_read_storage()
     get_write_storage()
 
     min_origin = 0
     if len(sys.argv) >= 2:
         min_origin = int(sys.argv[1])
 
     current_origin = None
     cursor = None
 
     for origin, visit, snapshot in get_snapshots(min_origin):
         if origin != current_origin and cursor:
             write_storage.db.conn.commit()
             cursor = None
         current_origin = origin
 
         if not cursor:
             cursor = write_storage.db.conn.cursor()
 
         cursor.execute("""\
             select snapshot_id from origin_visit
             where origin=%s and visit=%s and status in ('full', 'partial')""",
                        (origin, visit))
         data = cursor.fetchone()
         if not data:
             print('origin_visit', origin, visit, 'not found')
             continue
         if not data[0]:
             write_storage.snapshot_add(origin, visit, snapshot,
                                        back_compat=False, cur=cursor)
             cursor.execute('drop table tmp_snapshot_branch')
             print('origin_visit', origin, visit, 'ok')
         elif data[0] != snapshot['id']:
             print('origin_visit', origin, visit, 'discrepancy: db has',
                   data[0], 'computed', snapshot['id'])
             continue
         else:
             continue
diff --git a/seirl/swh-dedup/deduper/__main__.py b/seirl/swh-dedup/deduper/__main__.py
index 7df9647..c9d727d 100644
--- a/seirl/swh-dedup/deduper/__main__.py
+++ b/seirl/swh-dedup/deduper/__main__.py
@@ -1,30 +1,30 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 """compute Rabin fingerprints of Software Heritage content objects
 
 Read a list of Software Heritage content object IDs on standard input, fetch
 each of them from a (local) object storage and apply Rabin fingerprinting to
 its content. Store in a (postgres) DB the mapping between content objects and
 (Rabin-delimited) chunks.
 
 """
 
 import sys
 
 from swh.model.hashutil import hash_to_bytes
 
 from deduper.deduper import Deduper
 
 OBJS_ROOT = '/home/seirl/content-samples'
 OBJS_SLICING = '0:2/2:4'
 DB_SERVICE = 'swh-dedup'  # postgres service name
 
 
 def main():
     deduper = Deduper(DB_SERVICE, OBJS_ROOT, OBJS_SLICING)
 
     for line in sys.stdin:  # schedule tasks
         content_id = line.rstrip()
         deduper.dedup(hash_to_bytes(content_id))
 
 
 if __name__ == '__main__':
     main()
diff --git a/sql/anon-dump/anonymize-email b/sql/anon-dump/anonymize-email
index b5d974d..09755db 100755
--- a/sql/anon-dump/anonymize-email
+++ b/sql/anon-dump/anonymize-email
@@ -1,34 +1,34 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 import csv
 import hashlib
 import random
 import string
 import sys
 
 SALT_LENGTH = 2
 CSV_DELIMITER = ','
 CSV_QUOTECHAR = '"'
 
 
 def anonymize_email(email):
     salt = ''
     for i in range(0, 2):
         salt += random.choice(string.printable)
     return hashlib.sha1((salt + email).encode('ascii')).hexdigest()
 
 
 def main():
     dump_in = csv.reader(sys.stdin, delimiter=CSV_DELIMITER,
                          quotechar=CSV_QUOTECHAR)
     dump_out = csv.writer(sys.stdout, delimiter=CSV_DELIMITER,
                           quotechar=CSV_QUOTECHAR)
     for (person_id, name, email) in dump_in:
         anon_email = anonymize_email(email)
         dump_out.writerow([person_id, name, anon_email])
 
 
 if __name__ == '__main__':
     main()
diff --git a/zack/swh-dedup/swh-dedup-blocks.py b/zack/swh-dedup/swh-dedup-blocks.py
index 777f9a0..7466b0d 100755
--- a/zack/swh-dedup/swh-dedup-blocks.py
+++ b/zack/swh-dedup/swh-dedup-blocks.py
@@ -1,175 +1,175 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3
 
 """compute Rabin fingerprints of Software Heritage content objects
 
 Read a list of Software Heritage content object IDs on standard input, fetch
 each of them from a (local) object storage and apply Rabin fingerprinting to
 its content. Store in a (postgres) DB the mapping between content objects and
 (Rabin-delimited) chunks.
 
 """
 
 import logging
 import psycopg2
 import rabin
 import sys
 
 from hashlib import sha1
 from multiprocessing import Process, Queue
 from psycopg2.extras import execute_values
 
 from swh.model.hashutil import hash_to_bytes
 from swh.objstorage import PathSlicingObjStorage as ObjStorage
 from swh.objstorage.exc import ObjNotFoundError
 
 OBJS_ROOT = '/srv/softwareheritage/objects'
 OBJS_SLICING = '0:2/2:4'
 DB_SERVICE = 'swh-dedup'  # postgres service name
 
 RABIN_PARAMS = {
     'prime': 3,
     'window_size': 48,  # bytes
     'min_block_size': 512,  # bytes
     'average_block_size': 1 * 1024,  # bytes
     'max_block_size': 8 * 1024,  # bytes
 }
 
 NUM_WORKERS = 4
 
 
 def rabin_init(params):
     if 'prime' in params:
         rabin.set_prime(params['prime'])
     if 'window_size' in params:
         rabin.set_window_size(params['window_size'])
     if 'min_block_size' in params:
         rabin.set_min_block_size(params['min_block_size'])
     if 'average_block_size' in params:
         rabin.set_average_block_size(params['average_block_size'])
     if 'max_block_size' in params:
         rabin.set_max_block_size(params['max_block_size'])
 
 
 def db_insert_chunks(db_conn, content_id, length, chunks):
     with db_conn.cursor() as cur:
         cur.execute('INSERT INTO content (id, length) VALUES (%s, %s)',
                     (content_id, length))
         chunk_values = []
         chunked_content_values = []
         for (chunk_id, position, length) in chunks:
             chunk_values.append((chunk_id, length))
             chunked_content_values.append((content_id, chunk_id, position))
         execute_values(cur, '''INSERT INTO chunk (id, length) VALUES %s
                                ON CONFLICT DO NOTHING''',
                        chunk_values)
         execute_values(cur, '''INSERT INTO chunked_content
                                (content_id, chunk_id, position) VALUES %s''',
                        chunked_content_values)
 
 
 def print_summary(db_conn):
     with db_conn.cursor() as c:
         c.execute('SELECT count(*) FROM content')
         print('contents:', c.fetchone()[0])
 
         c.execute('SELECT count(*) FROM chunk')
         print('chunks:', c.fetchone()[0])
 
         c.execute('SELECT avg(length) FROM chunk')
         print('average chunk size: %.2f' % float(c.fetchone()[0]))
 
         c.execute('SELECT sum(length) FROM content')
         content_size = int(c.fetchone()[0])
         print('total content size:', content_size)
 
         c.execute('SELECT sum(length) FROM chunk')
         chunk_size = int(c.fetchone()[0])
         print('total chunk size: %d (%.2f%%)' %
               (chunk_size, float(chunk_size) / content_size * 100))
 
 
 def dedup(db_conn, content_id, content):
     with db_conn.cursor() as c:
         # skip deduplication if content is known
         c.execute('SELECT 1 FROM content WHERE id = %s', [content_id])
         if c.fetchone():
             return
 
     # do Rabin fingerprinting
     r = rabin.Rabin()
     buf = bytearray()
     for data in content:
         buf.extend(data)  # TODO avoid loading the entire content in memory
         r.update(data)
 
     with db_conn:  # transaction
         chunks = []
         if buf:  # r.fingerprints() invoked on empty objects segfaults :-(
             for (position, length, _fpr) in r.fingerprints():
                 chunk = buf[position:(position+length)]
                 chunk_id = sha1(chunk).digest()
                 chunks.append((chunk_id, position, length))
         db_insert_chunks(db_conn, content_id, len(buf), chunks)
 
     r.clear()
 
 
 def dedup_worker(task_queue, result_queue, obj_storage):
     db_conn = psycopg2.connect('service=%s' % DB_SERVICE)  # persistent conn.
 
     while True:
         content_id = task_queue.get()
         if content_id is None:  # no more tasks
             break
 
         try:
             dedup(db_conn, hash_to_bytes(content_id),
                   obj_storage.get_stream(content_id))
             result_queue.put((content_id, True))
         except ObjNotFoundError:
             logging.warning('cannot find object "%s", skipping' % content_id)
             result_queue.put((content_id, False))
 
 
 def progress_monitor(result_queue):
     obj_count = 0
     while True:
         (content_id, _success) = result_queue.get()
         obj_count += 1
         if obj_count % 1000 == 0:
             logging.info('processed %d objects, currently at %s' %
                          (obj_count, content_id))
 
 
 def main():
     rabin_init(RABIN_PARAMS)
     obj_storage = ObjStorage(OBJS_ROOT, OBJS_SLICING)
 
     task_queue = Queue()
     result_queue = Queue()
 
     workers = []
     for i in range(0, NUM_WORKERS):
         p = Process(target=dedup_worker,
                     args=(task_queue, result_queue, obj_storage))
         workers.append(p)
         p.start()
     monitor = Process(target=progress_monitor, args=(result_queue,))
     monitor.start()
 
     for line in sys.stdin:  # schedule tasks
         content_id = line.rstrip()
         task_queue.put(content_id)
 
     for i in range(0, NUM_WORKERS):  # tell workers we're done
         task_queue.put(None)
     for p in workers:  # wait for completion
         p.join()
     monitor.terminate()
 
     with psycopg2.connect('service=%s' % DB_SERVICE) as db_conn:
         print_summary(db_conn)
 
 
 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     main()