#!/usr/bin/python3
"""Check the intrinsic identifiers of releases or revisions.

Usage: check_objects.py {release|revision} <filename>

For every object produced for ``<filename>``, the identifier is recomputed
with the matching ``swh.model.identifiers`` function and compared with the
``id`` carried by the object.  The ids of the objects for which both values
differ are printed on stdout, one per line.
"""

import pprint
import sys

import psycopg2

from swh.storage import converters
from swh.model import identifiers

from utils import (DSN, RELEASE_COLUMNS, REVISION_COLUMNS, process_query,
                   copy_identifiers)

id_to_str = identifiers.identifier_to_str


def _select_list(columns):
    """Build the ``<column> as <alias>, ...`` select list for `columns`.

    `columns` is an iterable of 3-tuples; only the first two items (the
    column expression and its alias) are used here.
    """
    return ', '.join('%s as %s' % (column, alias)
                     for column, alias, _ in columns)


# NOTE(review): both queries left join from tmp_bytea, so an identifier that
# is absent from the release/revision table still produces a row (with NULL
# columns).  How process_query and the converters deal with such rows is not
# visible from this file -- confirm in utils / swh.storage.converters.
RELEASE_QUERY = '''
copy (
    select %s
    from tmp_bytea t
    left join release r on t.id = r.id
    left join person a on r.author = a.id
) to stdout
''' % _select_list(RELEASE_COLUMNS)

REVISION_QUERY = '''
copy (
    select %s
    from tmp_bytea t
    left join revision r on t.id = r.id
    left join person a on r.author = a.id
    left join person c on r.committer = c.id
) to stdout
''' % _select_list(REVISION_COLUMNS)


def releases_from_file(cursor, filename):
    """Yield the releases matching the identifiers listed in `filename`.

    `filename` is handed to ``copy_identifiers`` (presumably filling the
    ``tmp_bytea`` table used by the query -- confirm in utils), then the
    results of RELEASE_QUERY are passed through
    ``converters.db_to_release`` by ``process_query``.
    """
    copy_identifiers(cursor, filename)
    yield from process_query(cursor, RELEASE_QUERY, RELEASE_COLUMNS,
                             converters.db_to_release)


def revisions_from_file(cursor, filename):
    """Yield the revisions matching the identifiers listed in `filename`.

    Same as `releases_from_file`, using REVISION_QUERY and
    ``converters.db_to_revision``.
    """
    copy_identifiers(cursor, filename)
    yield from process_query(cursor, REVISION_QUERY, REVISION_COLUMNS,
                             converters.db_to_revision)


# Object type (first command line argument) -> (object iterator,
# function recomputing the intrinsic identifier of one object).
_CHECKERS = {
    'release': (releases_from_file, identifiers.release_identifier),
    'revision': (revisions_from_file, identifiers.revision_identifier),
}


def main(argv):
    """Run the check for the object type and file named in `argv`.

    Returns the process exit status: 2 when the arguments are invalid,
    0 otherwise.
    """
    if len(argv) != 3 or argv[1] not in _CHECKERS:
        print('usage: %s {%s} <filename>'
              % (argv[0] if argv else 'check_objects.py',
                 '|'.join(sorted(_CHECKERS))),
              file=sys.stderr)
        return 2

    iterator, identifier_fn = _CHECKERS[argv[1]]

    db = psycopg2.connect(DSN)
    try:
        cursor = db.cursor()
        # `obj` rather than `object`, to avoid shadowing the builtin.
        for obj in iterator(cursor, argv[2]):
            intrinsic_id = id_to_str(obj['id'])
            computed_id = id_to_str(identifier_fn(obj))
            if computed_id != intrinsic_id:
                print(intrinsic_id)
    finally:
        # Nothing is committed; closing discards the open transaction.
        db.close()
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
#!/usr/bin/python3
"""List the origin URLs referencing a set of objects.

Usage: list_origins.py <type> <filename>

Prints one line per identifier: the identifier in hexadecimal followed by
the space-separated URLs of the origins whose occurrence history targets
it with the given target type.
"""

import os
import sys
import threading

import psycopg2

from utils import copy_identifiers, DSN


def get_easy_origins(db, type, filename):
    """Yield ``(hex_id, origin_urls)`` pairs for the identifiers of `filename`.

    Args:
        db: an open psycopg2 connection.
        type: value compared with ``occurrence_history.target_type``.
        filename: handed to ``copy_identifiers`` (presumably loading the
            identifiers into the ``tmp_bytea`` table -- confirm in utils).

    Yields:
        tuples ``(hex_id, origins)`` where ``origins`` is a (possibly
        empty) list of URLs.

    Raises:
        Any exception raised while loading the identifiers or running the
        COPY query; it is re-raised here once the output has been drained.

    The COPY output is streamed through a pipe: a worker thread writes to
    it while this generator parses the lines as they arrive.
    """
    query = """
        COPY (select id, array(
            select url
            from origin
            where origin.id in (
                select distinct oh.origin
                from occurrence_history oh
                where target = tmp_bytea.id and target_type=%(type)s
                order by 1)
        )
        from tmp_bytea) TO STDOUT
    """
    cur = db.cursor()
    # copy_expert() does no parameter substitution, so let the driver quote
    # the value instead of splicing the raw command line argument into SQL.
    bound_query = cur.mogrify(query, {'type': type})

    r_fd, w_fd = os.pipe()
    errors = []

    def writer():
        try:
            # Own the write end for the whole lifetime of the thread: it
            # must be closed even when copy_identifiers() fails, otherwise
            # the reader below would wait for end-of-file forever.
            with open(w_fd, 'wb') as w_file:
                copy_identifiers(cur, filename)
                cur.copy_expert(bound_query, w_file)
        except Exception as exc:
            # Hand the failure over to the consuming thread.
            errors.append(exc)

    write_thread = threading.Thread(target=writer)
    write_thread.start()

    try:
        with open(r_fd, 'rb') as r_file:
            for line in r_file:
                object_id, origins = line.decode().strip().split('\t')
                # Drop the surrounding braces of the array literal.
                # NOTE(review): splitting on ',' does not handle array
                # elements that PostgreSQL quotes (e.g. URLs containing a
                # comma) -- same limitation as before.
                origins = origins[1:-1]
                origins = origins.split(',') if origins else []
                # The bytea column is rendered with a three character
                # prefix (the escaped backslash followed by 'x'); skip it
                # to keep only the hex digits.
                yield (object_id[3:], origins)
    finally:
        # Reached on exhaustion and on early close of the generator; in
        # the latter case the read end is already closed, which makes the
        # writer fail and terminate.
        write_thread.join()

    if errors:
        raise errors[0]


def main(argv):
    """Print the origins of the objects described by `argv`.

    Returns the process exit status: 2 when the arguments are invalid,
    0 otherwise.
    """
    if len(argv) != 3:
        print('usage: %s <type> <filename>'
              % (argv[0] if argv else 'list_origins.py'),
              file=sys.stderr)
        return 2

    target_type, filename = argv[1:]
    db = psycopg2.connect(DSN)
    try:
        for object_id, origins in get_easy_origins(db, target_type, filename):
            print(object_id, ' '.join(origins))
        db.rollback()
    finally:
        db.close()
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))