diff --git a/olasd/consistency/check_releases.py b/olasd/consistency/check_releases.py new file mode 100755 index 0000000..5ff6954 --- /dev/null +++ b/olasd/consistency/check_releases.py @@ -0,0 +1,41 @@ +#!/usr/bin/python3 + +import pprint +import sys + +import psycopg2 + +from swh.storage import converters +from swh.model import identifiers + +from utils import DSN, RELEASE_COLUMNS, process_query, copy_identifiers + +id_to_str = identifiers.identifier_to_str + +RELEASE_QUERY = ''' +copy ( + select %s + from tmp_bytea t + left join release r on t.id = r.id + left join person a on r.author = a.id +) to stdout +''' % ', '.join('%s as %s' % (column, alias) + for column, alias, _ in RELEASE_COLUMNS) + + +def releases_from_file(cursor, filename): + copy_identifiers(cursor, filename) + yield from process_query(cursor, RELEASE_QUERY, RELEASE_COLUMNS, + converters.db_to_release) + + +if __name__ == '__main__': + db = psycopg2.connect(DSN) + cursor = db.cursor() + for release in releases_from_file(cursor, sys.argv[1]): + intrinsic_id = id_to_str(release['id']) + computed_id = id_to_str(identifiers.release_identifier(release)) + if computed_id != intrinsic_id: + print(intrinsic_id) + else: + pprint.pprint(release, stream=sys.stderr) diff --git a/olasd/consistency/check_swh.py b/olasd/consistency/check_swh.py index ee4b847..bd45f72 100755 --- a/olasd/consistency/check_swh.py +++ b/olasd/consistency/check_swh.py @@ -1,122 +1,52 @@ #!/usr/bin/python3 -import os -import threading - import psycopg2 -import converters from swh.storage import converters as db_converters from swh.model import identifiers -id_to_str = identifiers.identifier_to_str - -DSN = ('host=somerset.internal.softwareheritage.org port=5433 ' - 'user=guest dbname=softwareheritage') +from utils import DSN, REVISION_COLUMNS, RELEASE_COLUMNS, process_query -REVISION_COLUMNS = [ - ("r.id", "id", converters.tobytes), - ("date", "date", converters.todate), - ("date_offset", "date_offset", converters.toint), - ("committer_date", "committer_date", converters.todate), - ("committer_date_offset", "committer_date_offset", converters.toint), - ("type", "type", converters.tostr), - ("directory", "directory", converters.tobytes), - ("message", "message", converters.tobytes), - ("synthetic", "synthetic", converters.tobool), - ("metadata", "metadata", converters.tojson), - ("date_neg_utc_offset", "date_neg_utc_offset", converters.tobool), - ("committer_date_neg_utc_offset", "committer_date_neg_utc_offset", - converters.tobool), - ("array(select parent_id::bytea from revision_history rh " - "where rh.id = r.id order by rh.parent_rank asc)", - "parents", converters.tolist), - ("a.id", "author_id", converters.toint), - ("a.name", "author_name", converters.tobytes), - ("a.email", "author_email", converters.tobytes), - ("a.fullname", "author_fullname", converters.tobytes), - ("c.id", "committer_id", converters.toint), - ("c.name", "committer_name", converters.tobytes), - ("c.email", "committer_email", converters.tobytes), - ("c.fullname", "committer_fullname", converters.tobytes), -] +id_to_str = identifiers.identifier_to_str REVISION_QUERY = ''' copy ( select %s from revision r left join person a on r.author = a.id left join person c on r.committer = c.id ) to stdout ''' % ', '.join('%s as %s' % (column, alias) for column, alias, _ in REVISION_COLUMNS) -RELEASE_COLUMNS = [ - ("r.id", "id", converters.tobytes), - ("date", "date", converters.todate), - ("date_offset", "date_offset", converters.toint), - ("comment", "comment", converters.tobytes), - ("r.name", "name", converters.tobytes), - ("synthetic", "synthetic", converters.tobool), - ("date_neg_utc_offset", "date_neg_utc_offset", converters.tobool), - ("target", "target", converters.tobytes), - ("target_type", "target_type", converters.tostr), - ("a.id", "author_id", converters.toint), - ("a.name", "author_name", converters.tobytes), - ("a.email", "author_email", converters.tobytes), - ("a.fullname", "author_fullname", converters.tobytes), -] - RELEASE_QUERY = ''' copy ( select %s from release r left join person a on r.author = a.id ) to stdout ''' % ', '.join('%s as %s' % (column, alias) for column, alias, _ in RELEASE_COLUMNS) -def process_query(query, columns, db_converter): - r_fd, w_fd = os.pipe() +if __name__ == '__main__': db = psycopg2.connect(DSN) + cursor = db.cursor() - def get_data_thread(): - cursor = db.cursor() - cursor.copy_expert(query, open(w_fd, 'wb')) - cursor.close() - db.commit() - - data_thread = threading.Thread(target=get_data_thread) - data_thread.start() - - r = open(r_fd, 'rb') - for line in r: - fields = { - alias: decoder(value) - for (_, alias, decoder), value - in zip(columns, line[:-1].decode('utf-8').split('\t')) - } - yield db_converter(fields) - - r.close() - - data_thread.join() - - -if __name__ == '__main__': with open('broken_releases', 'w') as broken_releases: - for release in process_query(RELEASE_QUERY, RELEASE_COLUMNS, + for release in process_query(cursor, RELEASE_QUERY, RELEASE_COLUMNS, db_converters.db_to_release): intrinsic_id = id_to_str(release['id']) computed_id = id_to_str(identifiers.release_identifier(release)) if intrinsic_id != computed_id: print(intrinsic_id, computed_id, file=broken_releases) with open('broken_revisions', 'w') as broken_revisions: - for revision in process_query(REVISION_QUERY, REVISION_COLUMNS, + for revision in process_query(cursor, REVISION_QUERY, REVISION_COLUMNS, db_converters.db_to_revision): intrinsic_id = id_to_str(revision['id']) computed_id = id_to_str(identifiers.revision_identifier(revision)) if intrinsic_id != computed_id: print(intrinsic_id, computed_id, file=broken_revisions) + + db.rollback() diff --git a/olasd/consistency/fix_releases.py b/olasd/consistency/fix_releases.py new file mode 100755 index 0000000..9afa734 --- /dev/null +++ b/olasd/consistency/fix_releases.py @@ -0,0 +1,38 @@ +#!/usr/bin/python3 + +import sys + +import psycopg2 + +from swh.storage import converters +from swh.model import identifiers + +from utils import DSN, RELEASE_COLUMNS, process_query, copy_identifiers + +id_to_str = identifiers.identifier_to_str + +RELEASE_QUERY = ''' +copy ( + select %s + from tmp_bytea t + left join release r on t.id = r.id + left join person a on r.author = a.id +) to stdout +''' % ', '.join('%s as %s' % (column, alias) + for column, alias, _ in RELEASE_COLUMNS) + + +def releases_from_file(cursor, filename): + copy_identifiers(cursor, filename) + yield from process_query(cursor, RELEASE_QUERY, RELEASE_COLUMNS, + converters.db_to_release) + + +if __name__ == '__main__': + db = psycopg2.connect(DSN) + cursor = db.cursor() + for release in releases_from_file(cursor, sys.argv[1]): + intrinsic_id = id_to_str(release['id']) + release['message'] = release['message'] + b'\n' + fix_computed_id = identifiers.release_identifier(release) + print('\\\\x%s\t\\\\x%s' % (intrinsic_id, fix_computed_id)) diff --git a/olasd/consistency/list_origins.py b/olasd/consistency/list_origins.py new file mode 100755 index 0000000..bc6cca7 --- /dev/null +++ b/olasd/consistency/list_origins.py @@ -0,0 +1,33 @@ +#!/usr/bin/python3 + +import psycopg2 + +from utils import copy_identifiers, DSN + + +def process_data(db, type): + query = { + 'release': """ + select id, array( + select url from origin + where origin.id in ( + select distinct oh.origin + from occurrence_history oh + where target = tmp_bytea.id and target_type='release' + order by 1) + ) + from tmp_bytea + """, + }.get(type) + filename = "broken_%ss" % type + cur = db.cursor() + copy_identifiers(cur, filename) + cur.execute(query) + for id, origins in cur.fetchall(): + print(bytes(id).hex(), ' '.join(origins)) + + +if __name__ == '__main__': + db = psycopg2.connect(DSN) + process_data(db, 'release') + db.rollback() diff --git a/olasd/consistency/check_swh.py b/olasd/consistency/utils.py old mode 100755 new mode 100644 similarity index 60% copy from olasd/consistency/check_swh.py copy to olasd/consistency/utils.py index ee4b847..7429eeb --- a/olasd/consistency/check_swh.py +++ b/olasd/consistency/utils.py @@ -1,122 +1,93 @@ -#!/usr/bin/python3 - import os import threading -import psycopg2 - import converters -from swh.storage import converters as db_converters -from swh.model import identifiers - -id_to_str = identifiers.identifier_to_str DSN = ('host=somerset.internal.softwareheritage.org port=5433 ' 'user=guest dbname=softwareheritage') REVISION_COLUMNS = [ ("r.id", "id", converters.tobytes), ("date", "date", converters.todate), ("date_offset", "date_offset", converters.toint), ("committer_date", "committer_date", converters.todate), ("committer_date_offset", "committer_date_offset", converters.toint), ("type", "type", converters.tostr), ("directory", "directory", converters.tobytes), ("message", "message", converters.tobytes), ("synthetic", "synthetic", converters.tobool), ("metadata", "metadata", converters.tojson), ("date_neg_utc_offset", "date_neg_utc_offset", converters.tobool), ("committer_date_neg_utc_offset", "committer_date_neg_utc_offset", converters.tobool), ("array(select parent_id::bytea from revision_history rh " "where rh.id = r.id order by rh.parent_rank asc)", "parents", converters.tolist), ("a.id", "author_id", converters.toint), ("a.name", "author_name", converters.tobytes), ("a.email", "author_email", converters.tobytes), ("a.fullname", "author_fullname", converters.tobytes), ("c.id", "committer_id", converters.toint), ("c.name", "committer_name", converters.tobytes), ("c.email", "committer_email", converters.tobytes), ("c.fullname", "committer_fullname", converters.tobytes), ] -REVISION_QUERY = ''' -copy ( - select %s - from revision r - left join person a on r.author = a.id - left join person c on r.committer = c.id -) to stdout -''' % ', '.join('%s as %s' % (column, alias) - for column, alias, _ in REVISION_COLUMNS) - RELEASE_COLUMNS = [ ("r.id", "id", converters.tobytes), ("date", "date", converters.todate), ("date_offset", "date_offset", converters.toint), ("comment", "comment", converters.tobytes), ("r.name", "name", converters.tobytes), ("synthetic", "synthetic", converters.tobool), ("date_neg_utc_offset", "date_neg_utc_offset", converters.tobool), ("target", "target", converters.tobytes), ("target_type", "target_type", converters.tostr), ("a.id", "author_id", converters.toint), ("a.name", "author_name", converters.tobytes), ("a.email", "author_email", converters.tobytes), ("a.fullname", "author_fullname", converters.tobytes), ] -RELEASE_QUERY = ''' -copy ( - select %s - from release r - left join person a on r.author = a.id -) to stdout -''' % ', '.join('%s as %s' % (column, alias) - for column, alias, _ in RELEASE_COLUMNS) - -def process_query(query, columns, db_converter): +def process_query(cursor, query, columns, db_converter): r_fd, w_fd = os.pipe() - db = psycopg2.connect(DSN) def get_data_thread(): - cursor = db.cursor() cursor.copy_expert(query, open(w_fd, 'wb')) cursor.close() - db.commit() data_thread = threading.Thread(target=get_data_thread) data_thread.start() r = open(r_fd, 'rb') for line in r: fields = { alias: decoder(value) for (_, alias, decoder), value in zip(columns, line[:-1].decode('utf-8').split('\t')) } yield db_converter(fields) r.close() data_thread.join() -if __name__ == '__main__': - with open('broken_releases', 'w') as broken_releases: - for release in process_query(RELEASE_QUERY, RELEASE_COLUMNS, - db_converters.db_to_release): - intrinsic_id = id_to_str(release['id']) - computed_id = id_to_str(identifiers.release_identifier(release)) - if intrinsic_id != computed_id: - print(intrinsic_id, computed_id, file=broken_releases) - - with open('broken_revisions', 'w') as broken_revisions: - for revision in process_query(REVISION_QUERY, REVISION_COLUMNS, - db_converters.db_to_revision): - intrinsic_id = id_to_str(revision['id']) - computed_id = id_to_str(identifiers.revision_identifier(revision)) - if intrinsic_id != computed_id: - print(intrinsic_id, computed_id, file=broken_revisions) +def copy_identifiers(cursor, filename): + read_fd, write_fd = os.pipe() + + def filter_data_thread(): + with open(write_fd, 'w') as output_file: + with open(filename, 'r') as input_file: + for line in input_file: + print(r'\\x', line.split()[0], file=output_file) + + filter_thread = threading.Thread(target=filter_data_thread) + filter_thread.start() + + cursor.execute('select swh_mktemp_bytea()') + with open(read_fd, 'rb') as input_file: + cursor.copy_expert("copy tmp_bytea (id) from stdin", input_file) + + filter_thread.join()