D608.diff

diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -8,6 +8,8 @@
import logging
import shutil
import tempfile
+import datetime
+from copy import deepcopy
from swh.storage import get_storage
from swh.core.config import SWHConfig
@@ -15,6 +17,7 @@
from swh.objstorage.exc import ObjNotFoundError
from swh.model import hashutil
from swh.scheduler.utils import get_task
+from swh.scheduler import get_scheduler
from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
@@ -302,7 +305,7 @@
"""
pass
- def next_step(self, results):
+ def next_step(self, results, task):
"""Do something else with computations results (e.g. send to another
queue, ...).
@@ -311,15 +314,28 @@
Args:
results ([result]): List of results (dict) as returned
by index function.
+ task (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
Returns:
None
"""
- pass
+ if task:
+ if getattr(self, 'scheduler', None):
+ scheduler = self.scheduler
+ else:
+ scheduler = get_scheduler(**self.config['scheduler'])
+ task = deepcopy(task)
+ result_name = task.pop('result_name')
+ task['next_run'] = datetime.datetime.now()
+ task['arguments']['kwargs'][result_name] = self.results
+ scheduler.create_tasks([task])
@abc.abstractmethod
- def run(self, ids, policy_update, **kwargs):
+ def run(self, ids, policy_update,
+ next_step=None, **kwargs):
"""Given a list of ids:
- retrieves the data from the storage
@@ -328,8 +344,11 @@
Args:
ids ([bytes]): id's identifier list
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
+ next_step (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -347,7 +366,8 @@
"""
- def run(self, ids, policy_update, **kwargs):
+ def run(self, ids, policy_update,
+ next_step=None, **kwargs):
"""Given a list of ids:
- retrieve the content from the storage
@@ -356,9 +376,12 @@
Args:
ids ([bytes]): sha1's identifier list
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
- respectively update duplicates or ignore
- them
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
+ respectively update duplicates or ignore
+ them
+ next_step (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -377,7 +400,7 @@
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results)
+ return self.next_step(results, task=next_step)
except Exception:
self.log.exception(
'Problem when reading contents metadata.')
@@ -396,7 +419,8 @@
class.
"""
- def run(self, ids, policy_update, parse_ids=False, **kwargs):
+ def run(self, ids, policy_update,
+ parse_ids=False, next_step=None, **kwargs):
"""Given a list of origin ids:
- retrieve origins from storage
@@ -406,11 +430,14 @@
Args:
ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or
(type, url) tuples.
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore
them
- parse_ids ([bool]: If `True`, will try to convert `ids`
+ parse_ids (bool): If `True`, will try to convert `ids`
from a human input to the valid type.
+ next_step (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -445,7 +472,7 @@
'Problem when processing origin %s' % id_)
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results)
+ return self.next_step(results, task=next_step)
class RevisionIndexer(BaseIndexer):
@@ -458,7 +485,7 @@
class.
"""
- def run(self, ids, policy_update):
+ def run(self, ids, policy_update, next_step=None):
"""Given a list of sha1_gits:
- retrieve revisions from storage
@@ -466,13 +493,15 @@
- store the results (according to policy_update)
Args:
- ids ([bytes]): sha1_git's identifier list
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
- respectively update duplicates or ignore
- them
+ ids ([bytes or str]): sha1_git's identifier list
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
+ respectively update duplicates or ignore
+ them
"""
results = []
+ ids = [id_.encode() if isinstance(id_, str) else id_
+ for id_ in ids]
revs = self.storage.revision_get(ids)
for rev in revs:
@@ -489,4 +518,4 @@
'Problem when processing revision')
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results)
+ return self.next_step(results, task=next_step)
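For reference, a minimal sketch of the task dict the new `next_step()` consumes. The layout mirrors what `swh.scheduler.utils.create_task_dict` builds later in this diff; the `result_name` key is the addition this patch introduces, and the concrete task type and kwargs below are illustrative only:

    import datetime

    task = {
        'type': 'origin_metadata',       # a registered scheduler task type
        'policy': 'oneshot',
        'arguments': {
            'args': [],
            'kwargs': {'policy_update': 'update-dups'},
        },
        # 'next_run' is deliberately absent; next_step() fills it in.
        'result_name': 'revisions_metadata',
    }

    # What next_step() then does with it, in essence:
    result_name = task.pop('result_name')
    task['next_run'] = datetime.datetime.now()
    task['arguments']['kwargs'][result_name] = ['<the indexer results>']
    # scheduler.create_tasks([task])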
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -158,14 +158,14 @@
Returns:
dict: dictionary representing a revision_metadata, with keys:
- - id (bytes): rev's identifier (sha1_git)
+ - id (str): rev's identifier (sha1_git)
- indexer_configuration_id (int): tool used
- translated_metadata (dict): dict of retrieved metadata
"""
try:
result = {
- 'id': rev['id'],
+ 'id': rev['id'].decode(),
'indexer_configuration_id': self.tool['id'],
'translated_metadata': None
}
@@ -176,9 +176,9 @@
detected_files = detect_metadata(files)
result['translated_metadata'] = self.translate_revision_metadata(
detected_files)
- except Exception:
+ except Exception as e:
self.log.exception(
- 'Problem when indexing rev')
+ 'Problem when indexing rev: %r', e)
return result
def persist_index_computations(self, results, policy_update):
@@ -271,7 +271,7 @@
def filter(self, ids):
return ids
- def run(self, revisions_metadata, policy_update, *, origin_head_pairs):
+ def run(self, revisions_metadata, policy_update, *, origin_head):
"""Expected to be called with the result of RevisionMetadataIndexer
as first argument; ie. not a list of ids as other indexers would.
@@ -282,12 +282,12 @@
passed by RevisionMetadataIndexer via a Celery chain
triggered by OriginIndexer.next_step.
* `policy_update`: `'ignore-dups'` or `'update-dups'`
- * `origin_head_pairs` (List[dict]): list of dictionaries with
+ * `origin_head` (dict): {str(origin_id): rev_id.decode()}
keys `origin_id` and `revision_id`, which is the result
of OriginHeadIndexer.
"""
- origin_head_map = {pair['origin_id']: pair['revision_id']
- for pair in origin_head_pairs}
+ origin_head_map = {int(origin_id): rev_id
+ for (origin_id, rev_id) in origin_head.items()}
# Fix up the argument order. revisions_metadata has to be the
# first argument because of celery.chain; the next line calls
@@ -312,9 +312,6 @@
revision_metadata['indexer_configuration_id'],
}
- # If you get this KeyError with a message like this:
- # 'foo' not in [b'foo']
- # you should check you're not using JSON as task serializer
raise KeyError('%r not in %r' %
(revision_id, [r['id'] for r in revisions_metadata]))
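To make the new `origin_head` argument concrete: it travels through the JSON task serializer, so both keys and values arrive as strings, and `run()` restores the integer origin ids. A small sketch, with ids borrowed from the tests below:

    # origin_head as produced by OriginHeadIndexer.next_step() and
    # round-tripped through JSON (str keys, str values):
    origin_head = {'54974445': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'}

    origin_head_map = {int(origin_id): rev_id
                       for (origin_id, rev_id) in origin_head.items()}
    assert origin_head_map == {
        54974445: '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'}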
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -7,10 +7,9 @@
import click
import logging
-from celery import chain
-
+from swh.scheduler import get_scheduler
+from swh.scheduler.utils import create_task_dict
from swh.indexer.indexer import OriginIndexer
-from swh.indexer.tasks import OriginMetadata, RevisionMetadata
class OriginHeadIndexer(OriginIndexer):
@@ -31,8 +30,8 @@
CONFIG_BASE_FILENAME = 'indexer/origin_head'
- revision_metadata_task = RevisionMetadata()
- origin_intrinsic_metadata_task = OriginMetadata()
+ revision_metadata_task = 'revision_metadata'
+ origin_intrinsic_metadata_task = 'origin_metadata'
def filter(self, ids):
yield from ids
@@ -42,7 +41,7 @@
should only be piped to another indexer via the orchestrator."""
pass
- def next_step(self, results):
+ def next_step(self, results, task):
"""Once the head is found, call the RevisionMetadataIndexer
on these revisions, then call the OriginMetadataIndexer with
both the origin_id and the revision metadata, so it can copy the
@@ -52,19 +51,42 @@
results (Iterable[dict]): Iterable of return values from `index`.
"""
+ super().next_step(results, task)
if self.revision_metadata_task is None and \
self.origin_intrinsic_metadata_task is None:
return
assert self.revision_metadata_task is not None
assert self.origin_intrinsic_metadata_task is not None
- return chain(
- self.revision_metadata_task.s(
- ids=[res['revision_id'] for res in results],
- policy_update='update-dups'),
- self.origin_intrinsic_metadata_task.s(
- origin_head_pairs=results,
- policy_update='update-dups'),
- )()
+
+ # Second task to run after this one: copy the revision's metadata
+ # to the origin
+ sub_task = create_task_dict(
+ self.origin_intrinsic_metadata_task,
+ 'oneshot',
+ origin_head={
+ str(result['origin_id']):
+ result['revision_id'].decode()
+ for result in results},
+ policy_update='update-dups',
+ )
+ del sub_task['next_run'] # Not json-serializable
+
+ # First task to run after this one: index the metadata of the
+ # revision
+ task = create_task_dict(
+ self.revision_metadata_task,
+ 'oneshot',
+ ids=[res['revision_id'].decode() for res in results],
+ policy_update='update-dups',
+ next_step={
+ **sub_task,
+ 'result_name': 'revisions_metadata'},
+ )
+ if getattr(self, 'scheduler', None):
+ scheduler = self.scheduler
+ else:
+ scheduler = get_scheduler(**self.config['scheduler'])
+ scheduler.create_tasks([task])
# Dispatch
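The two `create_task_dict` calls above replace the former Celery chain: the follow-up task is nested inside the first task's kwargs, and `BaseIndexer.next_step()` later injects the first task's results under `result_name` before scheduling it. An outline of the resulting structure, with task types and arguments as in this diff and ids taken from the tests:

    task = {
        'type': 'revision_metadata',
        'policy': 'oneshot',
        'arguments': {'args': [], 'kwargs': {
            'ids': ['8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'],
            'policy_update': 'update-dups',
            # The follow-up task rides along as a plain dict; its
            # 'next_run' was deleted above because it is not
            # JSON-serializable.
            'next_step': {
                'type': 'origin_metadata',
                'policy': 'oneshot',
                'arguments': {'args': [], 'kwargs': {
                    'origin_head': {
                        '54974445':
                            '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'},
                    'policy_update': 'update-dups',
                }},
                # next_step() injects the first task's results here:
                'result_name': 'revisions_metadata',
            },
        }},
    }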
diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py
--- a/swh/indexer/tests/test_metadata.py
+++ b/swh/indexer/tests/test_metadata.py
@@ -346,7 +346,7 @@
results = metadata_indexer.idx_storage.added_data
expected_results = [('revision_metadata', True, [{
- 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+ 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
'translated_metadata': {
'identifier': None,
'maintainer': None,
diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py
--- a/swh/indexer/tests/test_origin_metadata.py
+++ b/swh/indexer/tests/test_origin_metadata.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import time
import logging
import unittest
from celery import task
@@ -13,8 +14,7 @@
from swh.indexer.tests.test_origin_head import TestOriginHeadIndexer
from swh.indexer.tests.test_metadata import TestRevisionMetadataIndexer
-from swh.scheduler.tests.celery_testing import CeleryTestFixture
-from swh.indexer.tests import start_worker_thread
+from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture
class TestOriginMetadataIndexer(OriginMetadataIndexer):
@@ -57,24 +57,40 @@
class TestOriginHeadIndexer(TestOriginHeadIndexer):
- revision_metadata_task = revision_metadata_test_task
- origin_intrinsic_metadata_task = origin_intrinsic_metadata_test_task
+ revision_metadata_task = 'revision_metadata_test_task'
+ origin_intrinsic_metadata_task = 'origin_intrinsic_metadata_test_task'
-class TestOriginMetadata(CeleryTestFixture, unittest.TestCase):
+class TestOriginMetadata(SchedulerTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
self.maxDiff = None
MockIndexerStorage.added_data = []
+ self.add_scheduler_task_type(
+ 'revision_metadata_test_task',
+ 'swh.indexer.tests.test_origin_metadata.'
+ 'revision_metadata_test_task')
+ self.add_scheduler_task_type(
+ 'origin_intrinsic_metadata_test_task',
+ 'swh.indexer.tests.test_origin_metadata.'
+ 'origin_intrinsic_metadata_test_task')
+ TestRevisionMetadataIndexer.scheduler = self.scheduler
+
+ def tearDown(self):
+ del TestRevisionMetadataIndexer.scheduler
+ super().tearDown()
def test_pipeline(self):
indexer = TestOriginHeadIndexer()
- with start_worker_thread():
- promise = indexer.run(
- ["git+https://github.com/librariesio/yarn-parser"],
- policy_update='update-dups',
- parse_ids=True)
- promise.get()
+ indexer.scheduler = self.scheduler
+ indexer.run(
+ ["git+https://github.com/librariesio/yarn-parser"],
+ policy_update='update-dups',
+ parse_ids=True)
+
+ self.run_ready_tasks() # Run the first task
+ time.sleep(0.1) # Give it time to complete and schedule the 2nd one
+ self.run_ready_tasks() # Run the second task
metadata = {
'identifier': None,
@@ -108,13 +124,13 @@
'email': None
}
rev_metadata = {
- 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+ 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
'translated_metadata': metadata,
'indexer_configuration_id': 7,
}
origin_metadata = {
'origin_id': 54974445,
- 'from_revision': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+ 'from_revision': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
'metadata': metadata,
'indexer_configuration_id': 7,
}
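A short trace of the scheduling hand-off that test_pipeline exercises, assuming `run_ready_tasks()` executes whatever tasks are due when called:

    indexer.run(...)         # creates the 'revision_metadata_test_task' task
    self.run_ready_tasks()   # executes it; its next_step() schedules
                             # 'origin_intrinsic_metadata_test_task'
    time.sleep(0.1)          # give the hand-off time to land
    self.run_ready_tasks()   # executes the second task, which persists
                             # the origin_metadata expected above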
