Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show All 11 Lines | |||||
from swh.core.config import SWHConfig | from swh.core.config import SWHConfig | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.scheduler.utils import get_task | from swh.scheduler.utils import get_task | ||||
class DiskIndexer:
    """Mixin intended to be used with other *Indexer classes.

    Indexers inheriting from this class are a category of indexers
    which need the disk for their computations.

    Expects:
        self.working_directory variable defined at runtime.

    """

    def __init__(self):
        super().__init__()

    def write_to_temp(self, filename, data):
        """Write the content's data in a temporary file.

        A fresh temporary directory is created under
        self.working_directory (which is created if missing) and the
        data is written in a file directly inside it.

        Args:
            filename (str): one of the content's many filenames. Only
                its final path component is used, so the file always
                lands directly inside the new temporary directory.
            data (bytes): the content to write in the temporary file

        Returns:
            The path to the temporary file created. That file is
            filled in with the raw content's data.

        """
        os.makedirs(self.working_directory, exist_ok=True)
        temp_dir = tempfile.mkdtemp(dir=self.working_directory)
        # Keep only the last path component: an absolute filename or one
        # holding '..' would otherwise escape temp_dir, and cleanup()
        # relies on the file sitting directly inside temp_dir.
        content_path = os.path.join(temp_dir, os.path.basename(filename))
        try:
            with open(content_path, 'wb') as f:
                f.write(data)
        except Exception:
            # Do not leave an orphan temporary directory behind when the
            # write fails; the original error is still propagated.
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise

        return content_path

    def cleanup(self, content_path):
        """Remove the temporary directory holding content_path.

        The whole parent directory of content_path is deleted, not only
        the file itself.

        Args:
            content_path (str): the file to remove, as returned by
                write_to_temp

        """
        temp_dir = os.path.dirname(content_path)
        shutil.rmtree(temp_dir)
class BaseIndexer(SWHConfig, | class BaseIndexer(SWHConfig, | ||||
metaclass=abc.ABCMeta): | metaclass=abc.ABCMeta): | ||||
"""Base class for indexers to inherit from. | """Base class for indexers to inherit from. | ||||
The main entry point is the `run` functions which is in charge to | The main entry point is the `run` functions which is in charge to | ||||
trigger the computations on the sha1s batch received. | trigger the computations on the sha1s batch received. | ||||
Indexers can: | Indexers can: | ||||
▲ Show 20 Lines • Show All 224 Lines • ▼ Show 20 Lines | def run(self, sha1s, policy_update): | ||||
self.persist_index_computations(results, policy_update) | self.persist_index_computations(results, policy_update) | ||||
self.next_step(results) | self.next_step(results) | ||||
except Exception: | except Exception: | ||||
self.log.exception( | self.log.exception( | ||||
'Problem when reading contents metadata.') | 'Problem when reading contents metadata.') | ||||
if self.rescheduling_task: | if self.rescheduling_task: | ||||
self.log.warn('Rescheduling batch') | self.log.warn('Rescheduling batch') | ||||
self.rescheduling_task.delay(sha1s, policy_update) | self.rescheduling_task.delay(sha1s, policy_update) | ||||
class DiskIndexer:
    """Mixin intended to be used with other *Indexer classes.

    Indexers inheriting from this class are a category of indexers
    which need the disk for their computations.

    Expects:
        self.working_directory variable defined at runtime.

    """

    def __init__(self):
        super().__init__()

    def write_to_temp(self, filename, data):
        """Write the content's data in a temporary file.

        A fresh temporary directory is created under
        self.working_directory (which is created if missing) and the
        data is written in a file directly inside it.

        Args:
            filename (str): one of the content's many filenames. Only
                its final path component is used, so the file always
                lands directly inside the new temporary directory.
            data (bytes): the content to write in the temporary file

        Returns:
            The path to the temporary file created. That file is
            filled in with the raw content's data.

        """
        os.makedirs(self.working_directory, exist_ok=True)
        temp_dir = tempfile.mkdtemp(dir=self.working_directory)
        # Keep only the last path component: an absolute filename or one
        # holding '..' would otherwise escape temp_dir, and cleanup()
        # relies on the file sitting directly inside temp_dir.
        content_path = os.path.join(temp_dir, os.path.basename(filename))
        try:
            with open(content_path, 'wb') as f:
                f.write(data)
        except Exception:
            # Do not leave an orphan temporary directory behind when the
            # write fails; the original error is still propagated.
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise

        return content_path

    def cleanup(self, content_path):
        """Remove the temporary directory holding content_path.

        The whole parent directory of content_path is deleted, not only
        the file itself.

        Args:
            content_path (str): the file to remove, as returned by
                write_to_temp

        """
        temp_dir = os.path.dirname(content_path)
        shutil.rmtree(temp_dir)