Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 11 Lines | |||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import random | import random | ||||
import warnings | import warnings | ||||
import attr | import attr | ||||
from swh.model.model import Content, Directory, Revision, Release, Snapshot | from swh.model.model import \ | ||||
Content, Directory, Revision, Release, Snapshot, OriginVisit, Origin | |||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from .journal_writer import get_journal_writer | from .journal_writer import get_journal_writer | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
▲ Show 20 Lines • Show All 798 Lines • ▼ Show 20 Lines | def snapshot_get_by_origin_visit(self, origin, visit): | ||||
""" | """ | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if not origin_url: | if not origin_url: | ||||
return | return | ||||
if origin_url not in self._origins or \ | if origin_url not in self._origins or \ | ||||
visit > len(self._origin_visits[origin_url]): | visit > len(self._origin_visits[origin_url]): | ||||
return None | return None | ||||
snapshot_id = self._origin_visits[origin_url][visit-1]['snapshot'] | snapshot_id = self._origin_visits[origin_url][visit-1].snapshot | ||||
if snapshot_id: | if snapshot_id: | ||||
return self.snapshot_get(snapshot_id) | return self.snapshot_get(snapshot_id) | ||||
else: | else: | ||||
return None | return None | ||||
def snapshot_get_latest(self, origin, allowed_statuses=None): | def snapshot_get_latest(self, origin, allowed_statuses=None): | ||||
"""Get the content, possibly partial, of the latest snapshot for the | """Get the content, possibly partial, of the latest snapshot for the | ||||
given origin, optionally only from visits that have one of the given | given origin, optionally only from visits that have one of the given | ||||
▲ Show 20 Lines • Show All 137 Lines • ▼ Show 20 Lines | def object_find_by_sha1_git(self, ids, db=None, cur=None): | ||||
ret[id_] = [{ | ret[id_] = [{ | ||||
'sha1_git': id_, | 'sha1_git': id_, | ||||
'type': obj[0], | 'type': obj[0], | ||||
'id': obj[1], | 'id': obj[1], | ||||
'object_id': id_, | 'object_id': id_, | ||||
} for obj in objs] | } for obj in objs] | ||||
return ret | return ret | ||||
def _convert_origin(self, t): | |||||
if t is None: | |||||
return None | |||||
(origin_id, origin) = t | |||||
origin = origin.to_dict() | |||||
if ENABLE_ORIGIN_IDS: | |||||
origin['id'] = origin_id | |||||
return origin | |||||
def origin_get(self, origins): | def origin_get(self, origins): | ||||
"""Return origins, either all identified by their ids or all | """Return origins, either all identified by their ids or all | ||||
identified by tuples (type, url). | identified by tuples (type, url). | ||||
If the url is given and the type is omitted, one of the origins with | If the url is given and the type is omitted, one of the origins with | ||||
that url is returned. | that url is returned. | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def origin_get(self, origins): | ||||
for origin in origins: | for origin in origins: | ||||
result = None | result = None | ||||
if 'id' in origin: | if 'id' in origin: | ||||
assert ENABLE_ORIGIN_IDS, 'origin ids are disabled' | assert ENABLE_ORIGIN_IDS, 'origin ids are disabled' | ||||
if origin['id'] <= len(self._origins_by_id): | if origin['id'] <= len(self._origins_by_id): | ||||
result = self._origins[self._origins_by_id[origin['id']-1]] | result = self._origins[self._origins_by_id[origin['id']-1]] | ||||
elif 'url' in origin: | elif 'url' in origin: | ||||
if origin['url'] in self._origins: | if origin['url'] in self._origins: | ||||
result = copy.deepcopy(self._origins[origin['url']]) | result = self._origins[origin['url']] | ||||
else: | else: | ||||
raise ValueError( | raise ValueError( | ||||
'Origin must have either id or url.') | 'Origin must have either id or url.') | ||||
results.append(result) | results.append(self._convert_origin(result)) | ||||
if return_single: | if return_single: | ||||
assert len(results) == 1 | assert len(results) == 1 | ||||
return results[0] | return results[0] | ||||
else: | else: | ||||
return results | return results | ||||
def origin_get_range(self, origin_from=1, origin_count=100): | def origin_get_range(self, origin_from=1, origin_count=100): | ||||
Show All 11 Lines | def origin_get_range(self, origin_from=1, origin_count=100): | ||||
by :meth:`swh.storage.in_memory.Storage.origin_get`. | by :meth:`swh.storage.in_memory.Storage.origin_get`. | ||||
""" | """ | ||||
origin_from = max(origin_from, 1) | origin_from = max(origin_from, 1) | ||||
if origin_from <= len(self._origins_by_id): | if origin_from <= len(self._origins_by_id): | ||||
max_idx = origin_from + origin_count - 1 | max_idx = origin_from + origin_count - 1 | ||||
if max_idx > len(self._origins_by_id): | if max_idx > len(self._origins_by_id): | ||||
max_idx = len(self._origins_by_id) | max_idx = len(self._origins_by_id) | ||||
for idx in range(origin_from-1, max_idx): | for idx in range(origin_from-1, max_idx): | ||||
yield copy.deepcopy(self._origins[self._origins_by_id[idx]]) | yield self._convert_origin( | ||||
self._origins[self._origins_by_id[idx]]) | |||||
def origin_search(self, url_pattern, offset=0, limit=50, | def origin_search(self, url_pattern, offset=0, limit=50, | ||||
regexp=False, with_visit=False, db=None, cur=None): | regexp=False, with_visit=False, db=None, cur=None): | ||||
"""Search for origins whose urls contain a provided string pattern | """Search for origins whose urls contain a provided string pattern | ||||
or match a provided regular expression. | or match a provided regular expression. | ||||
The search is performed in a case insensitive way. | The search is performed in a case insensitive way. | ||||
Args: | Args: | ||||
url_pattern (str): the string pattern to search for in origin urls | url_pattern (str): the string pattern to search for in origin urls | ||||
offset (int): number of found origins to skip before returning | offset (int): number of found origins to skip before returning | ||||
results | results | ||||
limit (int): the maximum number of found origins to return | limit (int): the maximum number of found origins to return | ||||
regexp (bool): if True, consider the provided pattern as a regular | regexp (bool): if True, consider the provided pattern as a regular | ||||
expression and return origins whose urls match it | expression and return origins whose urls match it | ||||
with_visit (bool): if True, filter out origins with no visit | with_visit (bool): if True, filter out origins with no visit | ||||
Returns: | Returns: | ||||
An iterable of dict containing origin information as returned | An iterable of dict containing origin information as returned | ||||
by :meth:`swh.storage.storage.Storage.origin_get`. | by :meth:`swh.storage.storage.Storage.origin_get`. | ||||
""" | """ | ||||
origins = self._origins.values() | origins = map(self._convert_origin, self._origins.values()) | ||||
if regexp: | if regexp: | ||||
pat = re.compile(url_pattern) | pat = re.compile(url_pattern) | ||||
origins = [orig for orig in origins if pat.search(orig['url'])] | origins = [orig for orig in origins if pat.search(orig['url'])] | ||||
else: | else: | ||||
origins = [orig for orig in origins if url_pattern in orig['url']] | origins = [orig for orig in origins if url_pattern in orig['url']] | ||||
if with_visit: | if with_visit: | ||||
origins = [orig for orig in origins | origins = [orig for orig in origins | ||||
if len(self._origin_visits[orig['url']]) > 0] | if len(self._origin_visits[orig['url']]) > 0] | ||||
if ENABLE_ORIGIN_IDS: | if ENABLE_ORIGIN_IDS: | ||||
origins.sort(key=lambda origin: origin['id']) | origins.sort(key=lambda origin: origin['id']) | ||||
origins = copy.deepcopy(origins[offset:offset+limit]) | return origins[offset:offset+limit] | ||||
return origins | |||||
def origin_count(self, url_pattern, regexp=False, with_visit=False, | def origin_count(self, url_pattern, regexp=False, with_visit=False, | ||||
db=None, cur=None): | db=None, cur=None): | ||||
"""Count origins whose urls contain a provided string pattern | """Count origins whose urls contain a provided string pattern | ||||
or match a provided regular expression. | or match a provided regular expression. | ||||
The pattern search in origin urls is performed in a case insensitive | The pattern search in origin urls is performed in a case insensitive | ||||
way. | way. | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def origin_add_one(self, origin): | ||||
- type (FIXME: enum TBD): the origin type ('git', 'wget', ...) | - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) | ||||
- url (bytes): the url the origin points to | - url (bytes): the url the origin points to | ||||
Returns: | Returns: | ||||
the id of the added origin, or of the identical one that already | the id of the added origin, or of the identical one that already | ||||
exists. | exists. | ||||
""" | """ | ||||
origin = copy.deepcopy(origin) | origin = Origin.from_dict(origin) | ||||
assert 'id' not in origin | if origin.url in self._origins: | ||||
if origin['url'] in self._origins: | |||||
if ENABLE_ORIGIN_IDS: | if ENABLE_ORIGIN_IDS: | ||||
origin_id = self._origins[origin['url']]['id'] | (origin_id, _) = self._origins[origin.url] | ||||
else: | else: | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('origin', origin) | self.journal_writer.write_addition('origin', origin) | ||||
if ENABLE_ORIGIN_IDS: | if ENABLE_ORIGIN_IDS: | ||||
# origin ids are in the range [1, +inf[ | # origin ids are in the range [1, +inf[ | ||||
origin_id = len(self._origins) + 1 | origin_id = len(self._origins) + 1 | ||||
origin['id'] = origin_id | self._origins_by_id.append(origin.url) | ||||
self._origins_by_id.append(origin['url']) | |||||
assert len(self._origins_by_id) == origin_id | assert len(self._origins_by_id) == origin_id | ||||
self._origins[origin['url']] = origin | else: | ||||
self._origin_visits[origin['url']] = [] | origin_id = None | ||||
self._objects[origin['url']].append(('origin', origin['url'])) | self._origins[origin.url] = (origin_id, origin) | ||||
self._origin_visits[origin.url] = [] | |||||
self._objects[origin.url].append(('origin', origin.url)) | |||||
if ENABLE_ORIGIN_IDS: | if ENABLE_ORIGIN_IDS: | ||||
return origin_id | return origin_id | ||||
else: | else: | ||||
return origin['url'] | return origin.url | ||||
def fetch_history_start(self, origin_id): | def fetch_history_start(self, origin_id): | ||||
"""Add an entry for origin origin_id in fetch_history. Returns the id | """Add an entry for origin origin_id in fetch_history. Returns the id | ||||
of the added fetch_history entry | of the added fetch_history entry | ||||
""" | """ | ||||
assert not ENABLE_ORIGIN_IDS, 'origin ids are disabled' | assert not ENABLE_ORIGIN_IDS, 'origin ids are disabled' | ||||
pass | pass | ||||
Show All 38 Lines | def origin_visit_add(self, origin, date=None, type=None, *, ts=None): | ||||
date = ts | date = ts | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url is None: | if origin_url is None: | ||||
raise ValueError('Unknown origin.') | raise ValueError('Unknown origin.') | ||||
if isinstance(date, str): | if isinstance(date, str): | ||||
date = dateutil.parser.parse(date) | date = dateutil.parser.parse(date) | ||||
elif not isinstance(date, datetime.datetime): | |||||
raise TypeError('date must be a datetime or a string.') | |||||
visit_ret = None | visit_ret = None | ||||
if origin_url in self._origins: | if origin_url in self._origins: | ||||
origin = self._origins[origin_url] | (origin_id, origin) = self._origins[origin_url] | ||||
# visit ids are in the range [1, +inf[ | # visit ids are in the range [1, +inf[ | ||||
visit_id = len(self._origin_visits[origin_url]) + 1 | visit_id = len(self._origin_visits[origin_url]) + 1 | ||||
status = 'ongoing' | status = 'ongoing' | ||||
visit = { | visit = OriginVisit( | ||||
'origin': {'url': origin['url']}, | origin=origin, | ||||
'date': date, | date=date, | ||||
'type': type or origin['type'], | type=type or origin.type, | ||||
'status': status, | status=status, | ||||
'snapshot': None, | snapshot=None, | ||||
'metadata': None, | metadata=None, | ||||
'visit': visit_id | visit=visit_id, | ||||
} | ) | ||||
self._origin_visits[origin_url].append(visit) | self._origin_visits[origin_url].append(visit) | ||||
visit_ret = { | visit_ret = { | ||||
'origin': origin['id'] if ENABLE_ORIGIN_IDS else origin['url'], | 'origin': origin_id if ENABLE_ORIGIN_IDS else origin.url, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
} | } | ||||
self._objects[(origin_url, visit_id)].append( | self._objects[(origin_url, visit_id)].append( | ||||
('origin_visit', None)) | ('origin_visit', None)) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
origin = self._origins[origin_url].copy() | self.journal_writer.write_addition('origin_visit', visit) | ||||
if 'id' in origin: | |||||
del origin['id'] | |||||
self.journal_writer.write_addition('origin_visit', { | |||||
**visit, 'origin': origin}) | |||||
return visit_ret | return visit_ret | ||||
def origin_visit_update(self, origin, visit_id, status=None, | def origin_visit_update(self, origin, visit_id, status=None, | ||||
metadata=None, snapshot=None): | metadata=None, snapshot=None): | ||||
"""Update an origin_visit's status. | """Update an origin_visit's status. | ||||
Args: | Args: | ||||
Show All 12 Lines | def origin_visit_update(self, origin, visit_id, status=None, | ||||
if origin_url is None: | if origin_url is None: | ||||
raise ValueError('Unknown origin.') | raise ValueError('Unknown origin.') | ||||
try: | try: | ||||
visit = self._origin_visits[origin_url][visit_id-1] | visit = self._origin_visits[origin_url][visit_id-1] | ||||
except IndexError: | except IndexError: | ||||
raise ValueError('Unknown visit_id for this origin') \ | raise ValueError('Unknown visit_id for this origin') \ | ||||
from None | from None | ||||
updates = {} | |||||
if status: | |||||
updates['status'] = status | |||||
if metadata: | |||||
updates['metadata'] = metadata | |||||
if snapshot: | |||||
updates['snapshot'] = snapshot | |||||
visit = attr.evolve(visit, **updates) | |||||
if self.journal_writer: | if self.journal_writer: | ||||
origin = self._origins[origin_url].copy() | (_, origin) = self._origins[origin_url] | ||||
if 'id' in origin: | self.journal_writer.write_update('origin_visit', visit) | ||||
del origin['id'] | |||||
self.journal_writer.write_update('origin_visit', { | self._origin_visits[origin_url][visit_id-1] = visit | ||||
'origin': origin, | |||||
'type': origin['type'], | |||||
'visit': visit_id, | |||||
'status': status or visit['status'], | |||||
'date': visit['date'], | |||||
'metadata': metadata or visit['metadata'], | |||||
'snapshot': snapshot or visit['snapshot']}) | |||||
if origin_url not in self._origin_visits or \ | if origin_url not in self._origin_visits or \ | ||||
visit_id > len(self._origin_visits[origin_url]): | visit_id > len(self._origin_visits[origin_url]): | ||||
return | return | ||||
if status: | |||||
visit['status'] = status | |||||
if metadata: | |||||
visit['metadata'] = metadata | |||||
if snapshot: | |||||
visit['snapshot'] = snapshot | |||||
def origin_visit_upsert(self, visits): | def origin_visit_upsert(self, visits): | ||||
"""Add a origin_visits with a specific id and with all its data. | """Add a origin_visits with a specific id and with all its data. | ||||
If there is already an origin_visit with the same | If there is already an origin_visit with the same | ||||
`(origin_url, visit_id)`, updates it instead of inserting a new one. | `(origin_url, visit_id)`, updates it instead of inserting a new one. | ||||
Args: | Args: | ||||
visits: iterable of dicts with keys: | visits: iterable of dicts with keys: | ||||
origin: dict with keys either `id` or `url` | origin: dict with keys either `id` or `url` | ||||
visit: origin visit id | visit: origin visit id | ||||
type: type of loader used for the visit | type: type of loader used for the visit | ||||
date: timestamp of such visit | date: timestamp of such visit | ||||
status: Visit's new status | status: Visit's new status | ||||
metadata: Data associated to the visit | metadata: Data associated to the visit | ||||
snapshot (sha1_git): identifier of the snapshot to add to | snapshot (sha1_git): identifier of the snapshot to add to | ||||
the visit | the visit | ||||
""" | """ | ||||
visits = copy.deepcopy(visits) | visits = [OriginVisit.from_dict(d) for d in visits] | ||||
for visit in visits: | |||||
if isinstance(visit['date'], str): | |||||
visit['date'] = dateutil.parser.parse(visit['date']) | |||||
if self.journal_writer: | if self.journal_writer: | ||||
for visit in visits: | for visit in visits: | ||||
visit = visit.copy() | (_, visit.origin) = self._origins[visit.origin.url] | ||||
visit['origin'] = self._origins[visit['origin']['url']].copy() | |||||
if 'id' in visit['origin']: | |||||
del visit['origin']['id'] | |||||
self.journal_writer.write_addition('origin_visit', visit) | self.journal_writer.write_addition('origin_visit', visit) | ||||
for visit in visits: | for visit in visits: | ||||
visit_id = visit['visit'] | visit_id = visit.visit | ||||
origin_url = visit['origin']['url'] | origin_url = visit.origin.url | ||||
self._objects[(origin_url, visit_id)].append( | self._objects[(origin_url, visit_id)].append( | ||||
('origin_visit', None)) | ('origin_visit', None)) | ||||
while len(self._origin_visits[origin_url]) <= visit_id: | while len(self._origin_visits[origin_url]) <= visit_id: | ||||
self._origin_visits[origin_url].append(None) | self._origin_visits[origin_url].append(None) | ||||
visit = visit.copy() | self._origin_visits[origin_url][visit_id-1] = visit | ||||
visit['origin'] = {'url': visit['origin']['url']} | |||||
visit = self._origin_visits[origin_url][visit_id-1] = visit | |||||
def _convert_visit(self, visit): | def _convert_visit(self, visit): | ||||
if visit is None: | if visit is None: | ||||
return | return | ||||
visit = visit.copy() | (origin_id, origin) = self._origins[visit.origin.url] | ||||
origin = self._origins[visit['origin']['url']] | visit = visit.to_dict() | ||||
if ENABLE_ORIGIN_IDS: | if ENABLE_ORIGIN_IDS: | ||||
visit['origin'] = origin['id'] | visit['origin'] = origin_id | ||||
else: | else: | ||||
visit['origin'] = origin['url'] | visit['origin'] = origin.url | ||||
return visit | return visit | ||||
def origin_visit_get(self, origin, last_visit=None, limit=None): | def origin_visit_get(self, origin, last_visit=None, limit=None): | ||||
"""Retrieve all the origin's visit's information. | """Retrieve all the origin's visit's information. | ||||
Args: | Args: | ||||
origin (int): the origin's identifier | origin (int): the origin's identifier | ||||
Show All 11 Lines | def origin_visit_get(self, origin, last_visit=None, limit=None): | ||||
visits = self._origin_visits[origin_url] | visits = self._origin_visits[origin_url] | ||||
if last_visit is not None: | if last_visit is not None: | ||||
visits = visits[last_visit:] | visits = visits[last_visit:] | ||||
if limit is not None: | if limit is not None: | ||||
visits = visits[:limit] | visits = visits[:limit] | ||||
for visit in visits: | for visit in visits: | ||||
if not visit: | if not visit: | ||||
continue | continue | ||||
visit_id = visit['visit'] | visit_id = visit.visit | ||||
yield self._convert_visit( | yield self._convert_visit( | ||||
self._origin_visits[origin_url][visit_id-1]) | self._origin_visits[origin_url][visit_id-1]) | ||||
def origin_visit_find_by_date(self, origin, visit_date): | def origin_visit_find_by_date(self, origin, visit_date): | ||||
"""Retrieves the origin visit whose date is closest to the provided | """Retrieves the origin visit whose date is closest to the provided | ||||
timestamp. | timestamp. | ||||
In case of a tie, the visit with largest id is selected. | In case of a tie, the visit with largest id is selected. | ||||
Args: | Args: | ||||
origin (str): The occurrence's origin (URL). | origin (str): The occurrence's origin (URL). | ||||
target (datetime): target timestamp | target (datetime): target timestamp | ||||
Returns: | Returns: | ||||
A visit. | A visit. | ||||
""" | """ | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url in self._origin_visits: | if origin_url in self._origin_visits: | ||||
visits = self._origin_visits[origin_url] | visits = self._origin_visits[origin_url] | ||||
visit = min( | visit = min( | ||||
visits, | visits, | ||||
key=lambda v: (abs(v['date'] - visit_date), -v['visit'])) | key=lambda v: (abs(v.date - visit_date), -v.visit)) | ||||
return self._convert_visit(visit) | return self._convert_visit(visit) | ||||
def origin_visit_get_by(self, origin, visit): | def origin_visit_get_by(self, origin, visit): | ||||
"""Retrieve origin visit's information. | """Retrieve origin visit's information. | ||||
Args: | Args: | ||||
origin (int): the origin's identifier | origin (int): the origin's identifier | ||||
Show All 29 Lines | def origin_visit_get_latest( | ||||
visit: origin visit id | visit: origin visit id | ||||
type: type of loader used for the visit | type: type of loader used for the visit | ||||
date: timestamp of such visit | date: timestamp of such visit | ||||
status: Visit's new status | status: Visit's new status | ||||
metadata: Data associated to the visit | metadata: Data associated to the visit | ||||
snapshot (Optional[sha1_git]): identifier of the snapshot | snapshot (Optional[sha1_git]): identifier of the snapshot | ||||
associated to the visit | associated to the visit | ||||
""" | """ | ||||
origin = self._origins.get(origin) | res = self._origins.get(origin) | ||||
if not origin: | if not res: | ||||
return | return | ||||
visits = self._origin_visits[origin['url']] | (_, origin) = res | ||||
visits = self._origin_visits[origin.url] | |||||
if allowed_statuses is not None: | if allowed_statuses is not None: | ||||
visits = [visit for visit in visits | visits = [visit for visit in visits | ||||
if visit['status'] in allowed_statuses] | if visit.status in allowed_statuses] | ||||
if require_snapshot: | if require_snapshot: | ||||
visits = [visit for visit in visits | visits = [visit for visit in visits | ||||
if visit['snapshot']] | if visit.snapshot] | ||||
visit = max( | visit = max( | ||||
visits, key=lambda v: (v['date'], v['visit']), default=None) | visits, key=lambda v: (v.date, v.visit), default=None) | ||||
return self._convert_visit(visit) | return self._convert_visit(visit) | ||||
def stat_counters(self): | def stat_counters(self): | ||||
"""compute statistics about the number of tuples in various tables | """compute statistics about the number of tuples in various tables | ||||
Returns: | Returns: | ||||
dict: a dictionary mapping textual labels (e.g., content) to | dict: a dictionary mapping textual labels (e.g., content) to | ||||
integer values (e.g., the number of tuples in table content) | integer values (e.g., the number of tuples in table content) | ||||
▲ Show 20 Lines • Show All 226 Lines • Show Last 20 Lines |