Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/mongo/backend.py
- This file was added.
""" | |||||
Experimental implementation | |||||
""" | |||||
from datetime import datetime, timezone | |||||
from typing import Dict, Generator, Iterable, Optional, Set, Tuple | |||||
from pymongo import MongoClient, ASCENDING | |||||
from bson import ObjectId | |||||
from swh.model.model import Sha1Git | |||||
from ..interface import ( | |||||
EntityType, | |||||
ProvenanceResult, | |||||
RelationData, | |||||
RelationType, | |||||
RevisionData, | |||||
) | |||||
def get_mongo_db(): | |||||
client = MongoClient() | |||||
return client.provenance | |||||
class MongoBackEnd: #Rename to match ProvenanceStorageMongoDb | |||||
def __init__(self): | |||||
self.db = get_mongo_db() | |||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | |||||
from swh.provenance.interface import ProvenanceResult, RelationType | |||||
# get all the revisions | |||||
# iterate and find the earliest | |||||
content = self.db.content.find_one({'sha1': id}) | |||||
if not content: | |||||
return None | |||||
qry = {"_id": {'$in': [ObjectId(obj_id) for obj_id in content['revision']]}} | |||||
lst = [] | |||||
for revision in self.db.revision.find(qry): | |||||
for obj_id in content['revision']: | |||||
if ObjectId(obj_id) == revision['_id']: | |||||
path = content['revision'][str(revision['_id'])][0] | |||||
break | |||||
lst.append(ProvenanceResult(content=id, revision=revision['sha1'], | |||||
date=datetime.fromtimestamp(revision['ts'], timezone.utc), | |||||
origin=self.db.origin.find_one({'sha1': revision['preferred']})['url'], | |||||
path=path)) | |||||
print(lst) | |||||
return sorted(lst, key=lambda x: (x.date, x.revision, x.origin, x.path))[0] | |||||
def content_find_all(self, id: Sha1Git, limit: Optional[int] = None | |||||
) -> Generator[ProvenanceResult, None, None]: | |||||
content = self.db.content.find_one({'sha1': id}) | |||||
if not content: | |||||
return None | |||||
lst = [] | |||||
qry = {"_id": {'$in': [ObjectId(obj_id) for obj_id in content['revision']]}} | |||||
for revision in self.db.revision.find(qry): | |||||
for obj_id in content['revision']: | |||||
if ObjectId(obj_id) == revision['_id']: | |||||
path = content['revision'][str(revision['_id'])][0] | |||||
break | |||||
lst.append(ProvenanceResult(content=id, revision=revision['sha1'], | |||||
date=datetime.fromtimestamp(revision['ts'], timezone.utc), | |||||
origin=self.db.origin.find_one({'sha1': revision['preferred']})['url'], | |||||
path=path)) | |||||
yield from sorted(lst, key=lambda x: (x.date, x.revision, x.origin, x.path)) | |||||
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | |||||
from bson.codec_options import CodecOptions | |||||
from datetime import timezone, datetime | |||||
qry = {"sha1": {'$in': list(ids)}, 'ts': {'$ne':None}} | |||||
return {x['sha1']: datetime.fromtimestamp(x['ts'], timezone.utc) for x in self.db.content.find(qry, {'sha1': 1, 'ts': 1, '_id': 0})} | |||||
def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | |||||
from datetime import datetime | |||||
# get all the docuemtns with the id, add date, add missing records | |||||
qry = {"sha1": {'$in': list(dates)}, 'ts': {'$ne':None}} | |||||
contents = {x['sha1']: x for x in self.db.content.find(qry, {'sha1': 1, 'ts': 1, '_id': 1})} | |||||
for sha1, date in dates.items(): | |||||
ts = datetime.timestamp(date) | |||||
if sha1 in contents: | |||||
#update | |||||
if ts < contents[sha1]['ts']: | |||||
self.db.content.update_one({'_id': contents[sha1]['_id']}, {'$set': {'ts': ts}}) | |||||
else: | |||||
#add new content | |||||
self.db.content.save({ | |||||
'sha1': sha1, | |||||
'ts': ts, | |||||
'revision': {}, | |||||
'directory': {}, | |||||
}) | |||||
return True | |||||
def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | |||||
from datetime import datetime | |||||
qry = {"sha1": {'$in': list(dates)}, 'ts': {'$ne':None}} | |||||
dirs = {x['sha1']: x for x in self.db.directory.find(qry, {'sha1': 1, 'ts': 1, '_id': 1})} | |||||
for sha1, date in dates.items(): | |||||
ts = datetime.timestamp(date) | |||||
if sha1 in dirs: | |||||
#update | |||||
if ts < dirs[sha1]['ts']: | |||||
self.db.directory.update_one({'_id': dirs[sha1]['_id']}, {'$set': {'ts': ts}}) | |||||
else: | |||||
#add new dir | |||||
self.db.directory.save({ | |||||
'sha1': sha1, | |||||
'ts': ts, | |||||
'revision': {} | |||||
}) | |||||
return True | |||||
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | |||||
from datetime import timezone, datetime | |||||
qry = {"sha1": {'$in': list(ids)}, 'ts': {'$ne':None}} | |||||
return {x['sha1']: datetime.fromtimestamp(x['ts'], timezone.utc) for x in self.db.directory.find(qry, {'sha1': 1, 'ts': 1, '_id': 0})} | |||||
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | |||||
return {x['sha1'] for x in self.db.get_collection(entity.value).find({}, {'sha1': 1, '_id': 0})} | |||||
def location_get(self) -> Set[bytes]: | |||||
contents = self.db.content.find({}, {'revision': 1, '_id': 0, 'directory': 1}) | |||||
paths = [] | |||||
for content in contents: | |||||
paths.extend(value for key, value in content['revision'].items()) | |||||
paths.extend(value for key, value in content['directory'].items()) | |||||
dirs = self.db.directory.find({}, {'revision': 1, '_id': 0}) | |||||
for each_dir in dirs: | |||||
paths.extend(value for key, value in each_dir['revision'].items()) | |||||
return set(sum(paths, [])) | |||||
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | |||||
qry = {"sha1": {'$in': list(urls)}} | |||||
origins = {x['sha1']: x for x in self.db.origin.find(qry, {'sha1': 1, 'url': 1, '_id': 1})} | |||||
for sha1, url in urls.items(): | |||||
if sha1 not in origins: | |||||
#add new origin | |||||
self.db.origin.save({ | |||||
'sha1': sha1, | |||||
'url': url | |||||
}) | |||||
return True | |||||
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | |||||
qry = {"sha1": {'$in': list(ids)}} | |||||
return {x['sha1']: x['url'] for x in self.db.origin.find(qry, {'sha1': 1, 'url': 1, '_id': 0})} | |||||
def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | |||||
from datetime import datetime | |||||
qry = {"sha1": {'$in': list(dates)}} | |||||
revs = {x['sha1']: x for x in self.db.revision.find(qry, {'sha1': 1, 'ts': 1, '_id': 1})} | |||||
for sha1, date in dates.items(): | |||||
ts = datetime.timestamp(date) | |||||
if sha1 in revs: | |||||
#update | |||||
if revs[sha1]['ts'] is None or ts < revs[sha1]['ts']: | |||||
self.db.revision.update_one({'_id': revs[sha1]['_id']}, {'$set': {'ts': ts}}) | |||||
else: | |||||
#add new rev | |||||
self.db.revision.save({ | |||||
'sha1': sha1, | |||||
'preferred': None, | |||||
'origin': [], | |||||
'revision': [], | |||||
'ts': ts | |||||
}) | |||||
return True | |||||
def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | |||||
qry = {"sha1": {'$in': list(origins)}} | |||||
revs = {x['sha1']: x for x in self.db.revision.find(qry, {'sha1': 1, 'preferred': 1, '_id': 1})} | |||||
for sha1, origin in origins.items(): | |||||
if sha1 in revs: | |||||
self.db.revision.update_one({'_id': revs[sha1]['_id']}, {'$set': {'preferred': origin}}) | |||||
else: | |||||
#add new rev | |||||
self.db.revision.save({ | |||||
'sha1': sha1, | |||||
'preferred': origin, | |||||
'origin': [], | |||||
'revision': [], | |||||
'ts': None | |||||
}) | |||||
return True | |||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | |||||
from datetime import datetime, timezone | |||||
qry = {"sha1": {'$in': list(ids)}} | |||||
return {x['sha1']: RevisionData(date=datetime.fromtimestamp(x['ts'], timezone.utc) if x['ts'] else None, origin=x['preferred']) for x | |||||
in self.db.revision.find(qry, {'sha1': 1, 'preferred': 1, 'ts': 1, '_id': 0})} | |||||
def relation_add( | |||||
self, relation: RelationType, data: Iterable[RelationData] | |||||
) -> bool: | |||||
src, *_, dst = relation.value.split('_') | |||||
set_data = set(data) | |||||
dst_sha1s = {x.dst for x in data} | |||||
obj = {} | |||||
if dst in ['content', 'directory', 'revision']: | |||||
obj['ts'] = None | |||||
if dst == 'content': | |||||
obj['revision'] = {} | |||||
obj['directory'] = {} | |||||
if dst == 'directory': | |||||
obj['revision'] = {} | |||||
if dst == 'revision': | |||||
obj['prefered'] = None | |||||
obj['origin'] = [] | |||||
obj['revision'] = [] | |||||
existing = {x['sha1'] for x in self.db.get_collection(dst).find({"sha1": {'$in': list(dst_sha1s)}}, {'_id': 0, 'sha1': 1})} | |||||
for sha1 in dst_sha1s: | |||||
if sha1 not in existing: | |||||
self.db.get_collection(dst).save(dict(obj, **{'sha1': sha1})) | |||||
if dst == 'origin': | |||||
# TODO, check origins are already in the DB | |||||
# if not, algo has something wrong (algo inserts it initially) | |||||
pass | |||||
qry = {"sha1": {'$in': list(dst_sha1s)}} | |||||
dst_objs = {x['sha1']: x['_id'] for x in self.db.get_collection(dst).find(qry, {'_id': 1, 'sha1': 1})} | |||||
denorm = {} | |||||
for each in set_data: | |||||
if src != 'revision': | |||||
denorm.setdefault(each.src, {}).setdefault(str(dst_objs[each.dst]), []).append(each.path) | |||||
else: | |||||
denorm.setdefault(each.src, []).append(dst_objs[each.dst]) | |||||
qry = {"sha1": {'$in': list(denorm)}} | |||||
src_objs = {x['sha1']: x for x in self.db.get_collection(src).find(qry)} | |||||
for sha1, tuples in denorm.items(): | |||||
if sha1 in src_objs: | |||||
#update | |||||
if src != 'revision': | |||||
k = {obj_id: list(set(paths + denorm[sha1][obj_id])) for obj_id, paths in src_objs[sha1][dst].items()} | |||||
self.db.get_collection(src).update_one({'_id': src_objs[sha1]['_id']}, {'$set': {dst: dict(denorm[sha1], **k)}}) | |||||
else: | |||||
self.db.get_collection(src).update_one({'_id': src_objs[sha1]['_id']}, {'$set': {dst: list(set(src_objs[sha1][dst] + denorm[sha1]))}}) | |||||
else: | |||||
#add new rev | |||||
self.db.get_collection(src).save(dict(obj, **{'sha1': sha1, dst: denorm[sha1]})) | |||||
return True | |||||
def relation_get( | |||||
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | |||||
) -> Set[RelationData]: | |||||
from bson import ObjectId | |||||
src, *_, dst = relation.value.split('_') | |||||
sha1s = set(ids) | |||||
if not reverse: | |||||
qry = {"sha1": {'$in': list(sha1s)}} | |||||
src_objs = {x['sha1']: x[dst] for x in self.db.get_collection(src).find(qry, {'_id': 0, 'sha1': 1, dst: 1})} | |||||
if src != 'revision': | |||||
qry = {"_id": {'$in': list({ObjectId(obj_id) for key, value in src_objs.items() for obj_id in value})}} | |||||
dst_objs = {x['_id']: x['sha1'] for x in self.db.get_collection(dst).find(qry, {'_id': 1, 'sha1': 1})} | |||||
return { | |||||
RelationData(src=src_sha1, dst=dst_sha1, path=path) | |||||
for src_sha1, denorm in src_objs.items() | |||||
for dst_obj_id, dst_sha1 in dst_objs.items() | |||||
for dst_obj_str, paths in denorm.items() | |||||
for path in paths | |||||
if dst_obj_id == ObjectId(dst_obj_str) | |||||
} | |||||
else: | |||||
qry = {"_id": {'$in': list({obj_id for key, value in src_objs.items() for obj_id in value})}} | |||||
dst_objs = {x['_id']: x['sha1'] for x in self.db.get_collection(dst).find(qry, {'_id': 1, 'sha1': 1})} | |||||
return { | |||||
RelationData(src=src_sha1, dst=dst_sha1, path=None) | |||||
for src_sha1, denorm in src_objs.items() | |||||
for dst_obj_id, dst_sha1 in dst_objs.items() | |||||
for dst_obj_ref in denorm | |||||
if dst_obj_id == dst_obj_ref | |||||
} | |||||
else: | |||||
# TODO (reverse=True) | |||||
pass | |||||
def relation_get_all(self, relation: RelationType) -> Set[RelationData]: | |||||
from bson import ObjectId | |||||
src, *_, dst = relation.value.split('_') | |||||
src_objs = {x['sha1']: x[dst] for x in self.db.get_collection(src).find({}, {'_id': 0, 'sha1': 1, dst: 1})} | |||||
if src != 'revision': | |||||
qry = {"_id": {'$in': list({ObjectId(obj_id) for key, value in src_objs.items() for obj_id in value})}} | |||||
dst_objs = {x['_id']: x['sha1'] for x in self.db.get_collection(dst).find(qry, {'_id': 1, 'sha1': 1})} | |||||
return { | |||||
RelationData(src=src_sha1, dst=dst_sha1, path=path) | |||||
for src_sha1, denorm in src_objs.items() | |||||
for dst_obj_id, dst_sha1 in dst_objs.items() | |||||
for dst_obj_str, paths in denorm.items() | |||||
for path in paths | |||||
if dst_obj_id == ObjectId(dst_obj_str) | |||||
} | |||||
else: | |||||
qry = {"_id": {'$in': list({obj_id for key, value in src_objs.items() for obj_id in value})}} | |||||
dst_objs = {x['_id']: x['sha1'] for x in self.db.get_collection(dst).find(qry, {'_id': 1, 'sha1': 1})} | |||||
return { | |||||
RelationData(src=src_sha1, dst=dst_sha1, path=None) | |||||
for src_sha1, denorm in src_objs.items() | |||||
for dst_obj_id, dst_sha1 in dst_objs.items() | |||||
for dst_obj_ref in denorm | |||||
if dst_obj_id == dst_obj_ref | |||||
} | |||||
def with_path(self) -> bool: | |||||
return True | |||||
# from swh.model.hashutil import hash_to_bytes | |||||
# if __name__ == '__main__': | |||||
# # content = hash_to_bytes('6dc7e44ead5c0e300fe94448c3e046dfe33ad4d1') | |||||
# backend = MongoBackEnd() | |||||
# result = MongoBackEnd() | |||||
# print(result) |