D608.diff
D608: Use swh.scheduler instead of celery in the OriginHeadIndexer.
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -8,6 +8,8 @@
import logging
import shutil
import tempfile
+import datetime
+from copy import deepcopy
from swh.storage import get_storage
from swh.core.config import SWHConfig
@@ -15,6 +17,7 @@
from swh.objstorage.exc import ObjNotFoundError
from swh.model import hashutil
from swh.scheduler.utils import get_task
+from swh.scheduler import get_scheduler
from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
@@ -302,7 +305,7 @@
"""
pass
- def next_step(self, results):
+ def next_step(self, results, task):
"""Do something else with computations results (e.g. send to another
queue, ...).
@@ -311,15 +314,28 @@
Args:
results ([result]): List of results (dict) as returned
by index function.
+ task (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
Returns:
None
"""
- pass
+ if task:
+ if getattr(self, 'scheduler', None):
+ scheduler = self.scheduler
+ else:
+ scheduler = get_scheduler(**self.config['scheduler'])
+ task = deepcopy(task)
+ result_name = task.pop('result_name')
+ task['next_run'] = datetime.datetime.now()
+ task['arguments']['kwargs'][result_name] = self.results
+ scheduler.create_tasks([task])
@abc.abstractmethod
- def run(self, ids, policy_update, **kwargs):
+ def run(self, ids, policy_update,
+ next_step=None, **kwargs):
"""Given a list of ids:
- retrieves the data from the storage
@@ -328,8 +344,11 @@
Args:
ids ([bytes]): id's identifier list
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
+ next_step (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -347,7 +366,8 @@
"""
- def run(self, ids, policy_update, **kwargs):
+ def run(self, ids, policy_update,
+ next_step=None, **kwargs):
"""Given a list of ids:
- retrieve the content from the storage
@@ -356,9 +376,12 @@
Args:
ids ([bytes]): sha1's identifier list
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
- respectively update duplicates or ignore
- them
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
+ respectively update duplicates or ignore
+ them
+ next_step (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -377,7 +400,7 @@
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results)
+ return self.next_step(results, task=next_step)
except Exception:
self.log.exception(
'Problem when reading contents metadata.')
@@ -396,7 +419,8 @@
class.
"""
- def run(self, ids, policy_update, parse_ids=False, **kwargs):
+ def run(self, ids, policy_update,
+ parse_ids=False, next_step=None, **kwargs):
"""Given a list of origin ids:
- retrieve origins from storage
@@ -406,11 +430,14 @@
Args:
ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or
(type, url) tuples.
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore
them
- parse_ids ([bool]: If `True`, will try to convert `ids`
+ parse_ids (bool): If `True`, will try to convert `ids`
from a human input to the valid type.
+ next_step (dict): a dict in the form expected by
+ `scheduler.backend.SchedulerBackend.create_tasks`
+ without `next_run`, plus a `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -445,7 +472,7 @@
'Problem when processing origin %s' % id_)
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results)
+ return self.next_step(results, task=next_step)
class RevisionIndexer(BaseIndexer):
@@ -458,7 +485,7 @@
class.
"""
- def run(self, ids, policy_update):
+ def run(self, ids, policy_update, next_step=None):
"""Given a list of sha1_gits:
- retrieve revisions from storage
@@ -466,13 +493,15 @@
- store the results (according to policy_update)
Args:
- ids ([bytes]): sha1_git's identifier list
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
- respectively update duplicates or ignore
- them
+ ids ([bytes or str]): sha1_git's identifier list
+ policy_update (str): either 'update-dups' or 'ignore-dups' to
+ respectively update duplicates or ignore
+ them
"""
results = []
+ ids = [id_.encode() if isinstance(id_, str) else id_
+ for id_ in ids]
revs = self.storage.revision_get(ids)
for rev in revs:
@@ -489,4 +518,4 @@
'Problem when processing revision')
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results)
+ return self.next_step(results, task=next_step)
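
A minimal sketch of how a caller could drive the new next_step hook, for orientation. The task type 'follow_up_indexer' and the indexer instance 'my_indexer' are illustrative assumptions; only create_task_dict, the 'result_name' key and the run(..., next_step=...) signature come from this diff.

# Illustrative sketch only: 'follow_up_indexer' is a made-up task type and
# 'my_indexer' stands for any configured ContentIndexer subclass.
from swh.scheduler.utils import create_task_dict

# Task dict in the form expected by SchedulerBackend.create_tasks, minus
# 'next_run' (next_step() sets it), plus a 'result_name' key naming the
# kwarg under which the indexer's results will be injected.
follow_up = create_task_dict('follow_up_indexer', 'oneshot',
                             policy_update='update-dups')
del follow_up['next_run']
follow_up['result_name'] = 'results'

ids = [bytes.fromhex('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f')]

# next_step() deep-copies the dict, pops 'result_name', sets 'next_run',
# stores self.results under arguments['kwargs']['results'] and schedules
# the task via scheduler.create_tasks([task]).
my_indexer.run(ids, policy_update='update-dups', next_step=follow_up)
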
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -158,14 +158,14 @@
Returns:
dict: dictionary representing a revision_metadata, with keys:
- - id (bytes): rev's identifier (sha1_git)
+ - id (str): rev's identifier (sha1_git)
- indexer_configuration_id (bytes): tool used
- translated_metadata (bytes): dict of retrieved metadata
"""
try:
result = {
- 'id': rev['id'],
+ 'id': rev['id'].decode(),
'indexer_configuration_id': self.tool['id'],
'translated_metadata': None
}
@@ -176,9 +176,9 @@
detected_files = detect_metadata(files)
result['translated_metadata'] = self.translate_revision_metadata(
detected_files)
- except Exception:
+ except Exception as e:
self.log.exception(
- 'Problem when indexing rev')
+ 'Problem when indexing rev: %r', e)
return result
def persist_index_computations(self, results, policy_update):
@@ -271,7 +271,7 @@
def filter(self, ids):
return ids
- def run(self, revisions_metadata, policy_update, *, origin_head_pairs):
+ def run(self, revisions_metadata, policy_update, *, origin_head):
"""Expected to be called with the result of RevisionMetadataIndexer
as first argument; ie. not a list of ids as other indexers would.
@@ -282,12 +282,12 @@
passed by RevisionMetadataIndexer via a Celery chain
triggered by OriginIndexer.next_step.
* `policy_update`: `'ignore-dups'` or `'update-dups'`
- * `origin_head_pairs` (List[dict]): list of dictionaries with
+ * `origin_head` (dict): {str(origin_id): rev_id.encode()}
keys `origin_id` and `revision_id`, which is the result
of OriginHeadIndexer.
"""
- origin_head_map = {pair['origin_id']: pair['revision_id']
- for pair in origin_head_pairs}
+ origin_head_map = {int(origin_id): rev_id
+ for (origin_id, rev_id) in origin_head.items()}
# Fix up the argument order. revisions_metadata has to be the
# first argument because of celery.chain; the next line calls
@@ -312,9 +312,6 @@
revision_metadata['indexer_configuration_id'],
}
- # If you get this KeyError with a message like this:
- # 'foo' not in [b'foo']
- # you should check you're not using JSON as task serializer
raise KeyError('%r not in %r' %
(revision_id, [r['id'] for r in revisions_metadata]))
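
The origin_head argument of OriginMetadataIndexer.run is now a JSON-friendly mapping instead of a list of pairs. A short sketch of the conversion done at the top of run(); the concrete values are borrowed from the test data further down in this diff:

# Shape of the new 'origin_head' argument as built by
# OriginHeadIndexer.next_step (string origin id -> hex revision id):
origin_head = {'54974445': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'}

# run() turns the JSON string keys back into integer origin ids:
origin_head_map = {int(origin_id): rev_id
                   for (origin_id, rev_id) in origin_head.items()}
# -> {54974445: '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'}
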
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -7,10 +7,9 @@
import click
import logging
-from celery import chain
-
+from swh.scheduler import get_scheduler
+from swh.scheduler.utils import create_task_dict
from swh.indexer.indexer import OriginIndexer
-from swh.indexer.tasks import OriginMetadata, RevisionMetadata
class OriginHeadIndexer(OriginIndexer):
@@ -31,8 +30,8 @@
CONFIG_BASE_FILENAME = 'indexer/origin_head'
- revision_metadata_task = RevisionMetadata()
- origin_intrinsic_metadata_task = OriginMetadata()
+ revision_metadata_task = 'revision_metadata'
+ origin_intrinsic_metadata_task = 'origin_metadata'
def filter(self, ids):
yield from ids
@@ -42,7 +41,7 @@
should only be piped to another indexer via the orchestrator."""
pass
- def next_step(self, results):
+ def next_step(self, results, task):
"""Once the head is found, call the RevisionMetadataIndexer
on these revisions, then call the OriginMetadataIndexer with
both the origin_id and the revision metadata, so it can copy the
@@ -52,19 +51,42 @@
results (Iterable[dict]): Iterable of return values from `index`.
"""
+ super().next_step(results, task)
if self.revision_metadata_task is None and \
self.origin_intrinsic_metadata_task is None:
return
assert self.revision_metadata_task is not None
assert self.origin_intrinsic_metadata_task is not None
- return chain(
- self.revision_metadata_task.s(
- ids=[res['revision_id'] for res in results],
- policy_update='update-dups'),
- self.origin_intrinsic_metadata_task.s(
- origin_head_pairs=results,
- policy_update='update-dups'),
- )()
+
+ # Second task to run after this one: copy the revision's metadata
+ # to the origin
+ sub_task = create_task_dict(
+ self.origin_intrinsic_metadata_task,
+ 'oneshot',
+ origin_head={
+ str(result['origin_id']):
+ result['revision_id'].decode()
+ for result in results},
+ policy_update='update-dups',
+ )
+ del sub_task['next_run'] # Not json-serializable
+
+ # First task to run after this one: index the metadata of the
+ # revision
+ task = create_task_dict(
+ self.revision_metadata_task,
+ 'oneshot',
+ ids=[res['revision_id'].decode() for res in results],
+ policy_update='update-dups',
+ next_step={
+ **sub_task,
+ 'result_name': 'revisions_metadata'},
+ )
+ if getattr(self, 'scheduler', None):
+ scheduler = self.scheduler
+ else:
+ scheduler = get_scheduler(**self.config['scheduler'])
+ scheduler.create_tasks([task])
# Dispatch
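
For orientation, the task scheduled by next_step above looks roughly as follows. The field layout ('type', 'policy', 'next_run', 'arguments' with 'args'/'kwargs') is inferred from how the dict is used elsewhere in this diff, and the concrete values are borrowed from the test data; treat this as an approximation, not actual scheduler output.

# Approximate shape of the task passed to scheduler.create_tasks([task]);
# values are illustrative, taken from the tests below.
task = {
    'type': 'revision_metadata',
    'policy': 'oneshot',
    'next_run': '<datetime, filled in by create_task_dict>',
    'arguments': {
        'args': [],
        'kwargs': {
            'ids': ['8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'],
            'policy_update': 'update-dups',
            'next_step': {
                'type': 'origin_metadata',
                'policy': 'oneshot',
                # 'next_run' deleted above: not JSON-serializable
                'arguments': {
                    'args': [],
                    'kwargs': {
                        'origin_head': {
                            '54974445':
                            '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'},
                        'policy_update': 'update-dups',
                    },
                },
                'result_name': 'revisions_metadata',
            },
        },
    },
}
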
diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py
--- a/swh/indexer/tests/test_metadata.py
+++ b/swh/indexer/tests/test_metadata.py
@@ -346,7 +346,7 @@
results = metadata_indexer.idx_storage.added_data
expected_results = [('revision_metadata', True, [{
- 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+ 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
'translated_metadata': {
'identifier': None,
'maintainer': None,
diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py
--- a/swh/indexer/tests/test_origin_metadata.py
+++ b/swh/indexer/tests/test_origin_metadata.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import time
import logging
import unittest
from celery import task
@@ -13,8 +14,7 @@
from swh.indexer.tests.test_origin_head import TestOriginHeadIndexer
from swh.indexer.tests.test_metadata import TestRevisionMetadataIndexer
-from swh.scheduler.tests.celery_testing import CeleryTestFixture
-from swh.indexer.tests import start_worker_thread
+from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture
class TestOriginMetadataIndexer(OriginMetadataIndexer):
@@ -57,24 +57,40 @@
class TestOriginHeadIndexer(TestOriginHeadIndexer):
- revision_metadata_task = revision_metadata_test_task
- origin_intrinsic_metadata_task = origin_intrinsic_metadata_test_task
+ revision_metadata_task = 'revision_metadata_test_task'
+ origin_intrinsic_metadata_task = 'origin_intrinsic_metadata_test_task'
-class TestOriginMetadata(CeleryTestFixture, unittest.TestCase):
+class TestOriginMetadata(SchedulerTestFixture, unittest.TestCase):
def setUp(self):
super().setUp()
self.maxDiff = None
MockIndexerStorage.added_data = []
+ self.add_scheduler_task_type(
+ 'revision_metadata_test_task',
+ 'swh.indexer.tests.test_origin_metadata.'
+ 'revision_metadata_test_task')
+ self.add_scheduler_task_type(
+ 'origin_intrinsic_metadata_test_task',
+ 'swh.indexer.tests.test_origin_metadata.'
+ 'origin_intrinsic_metadata_test_task')
+ TestRevisionMetadataIndexer.scheduler = self.scheduler
+
+ def tearDown(self):
+ del TestRevisionMetadataIndexer.scheduler
+ super().tearDown()
def test_pipeline(self):
indexer = TestOriginHeadIndexer()
- with start_worker_thread():
- promise = indexer.run(
- ["git+https://github.com/librariesio/yarn-parser"],
- policy_update='update-dups',
- parse_ids=True)
- promise.get()
+ indexer.scheduler = self.scheduler
+ indexer.run(
+ ["git+https://github.com/librariesio/yarn-parser"],
+ policy_update='update-dups',
+ parse_ids=True)
+
+ self.run_ready_tasks() # Run the first task
+ time.sleep(0.1) # Give it time to complete and schedule the 2nd one
+ self.run_ready_tasks() # Run the second task
metadata = {
'identifier': None,
@@ -108,13 +124,13 @@
'email': None
}
rev_metadata = {
- 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+ 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
'translated_metadata': metadata,
'indexer_configuration_id': 7,
}
origin_metadata = {
'origin_id': 54974445,
- 'from_revision': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+ 'from_revision': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
'metadata': metadata,
'indexer_configuration_id': 7,
}