diff --git a/debian/control b/debian/control index 82c3ef3..5e4d9bf 100644 --- a/debian/control +++ b/debian/control @@ -1,24 +1,25 @@ Source: swh-scheduler Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-arrow, python3-celery, python3-click, python3-elasticsearch (>= 5.4.0), python3-flask, + python3-kombu, python3-nose, python3-psycopg2, python3-setuptools, python3-swh.core (>= 0.0.38~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSCH/ Package: python3-swh.scheduler Architecture: all Depends: python3-swh.core (>= 0.0.38~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Scheduler diff --git a/requirements.txt b/requirements.txt index df47e53..1af0cad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,12 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. 
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random
import socket
import string

from kombu import Connection, Exchange, Queue


events = {
    # ghtorrent events related to github events (that's the one we are
    # interested in)
    'evt': [
        'commitcomment', 'create', 'delete', 'deployment',
        'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
        'gist', 'gollum', 'issuecomment', 'issues', 'member',
        'membership', 'pagebuild', 'public', 'pullrequest',
        'pullrequestreviewcomment', 'push', 'release', 'repository',
        'status', 'teamadd', 'watch'
    ],
    # ghtorrent events related to mongodb insert
    'ent': [
        'commit_comments', 'commits', 'followers', 'forks',
        'geo_cache', 'issue_comments', 'issue_events', 'issues',
        'org_members', 'pull_request_comments', 'pull_requests',
        'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers'
    ]
}


class FakeRandomOriginGenerator:
    """Generator of random, fake origin urls.

    """
    def _random_string(self, length):
        """Build a random string of `length` ascii letters and digits.

        Args:
            length (int): number of characters to generate

        Returns:
            The random string (empty when length is 0).

        """
        # The alphabet is loop-invariant, build it once.
        alphabet = string.ascii_letters + string.digits
        return ''.join(random.choice(alphabet) for _ in range(length))

    def generate(self, user_range=range(5, 10), repo_range=range(10, 15)):
        """Build a fake url of the form
        https://fake.github.com/<user>/<repo>

        Args:
            user_range: sequence of candidate lengths for the user part
            repo_range: sequence of candidate lengths for the repo part

        Returns:
            The fake url as a string.

        """
        length_username = random.choice(user_range)
        user = self._random_string(length_username)
        length_repo = random.choice(repo_range)
        repo = self._random_string(length_repo)
        return 'https://fake.github.com/%s/%s' % (user, repo)


class RabbitMQConn:
    """Hold the configuration, exchange and queue shared by the
    publisher and the consumer below.

    Any key missing from the configuration passed to the constructor
    falls back to its DEFAULT_CONFIG value.

    """
    DEFAULT_CONFIG = {
        'user': 'guest',
        'pass': 'guest',
        'port': 5672,
        'server': 'localhost',
        'exchange_name': 'ght-streams',
        'routing_key': 'something',
        'queue_name': 'fake-events'
    }

    def _conn_queue(self, config):
        """Build the connection queue string.

        Args:
            config (dict): must hold the 'user', 'pass', 'server' and
                'port' keys

        Returns:
            The amqp url, 'amqp://<user>:<pass>@<server>:<port>'

        """
        # Read from the argument, not from self.config, so the result
        # only depends on what the caller passed.
        return 'amqp://%s:%s@%s:%s' % (
            config['user'],
            config['pass'],
            config['server'],
            config['port']
        )

    def __init__(self, **config):
        # Merge over a fresh copy of the defaults: a partial
        # configuration no longer fails on the missing keys, and the
        # class-level DEFAULT_CONFIG is never shared between instances.
        self.config = dict(self.DEFAULT_CONFIG, **config)

        self.conn_queue = self._conn_queue(self.config)
        self.exchange = Exchange(self.config['exchange_name'],
                                 'topic', durable=True)
        self.queue = Queue(self.config['queue_name'],
                           exchange=self.exchange,
                           routing_key=self.config['routing_key'],
                           auto_delete=True)


class FakeGHTorrentPublisher(RabbitMQConn):
    """Fake GHTorrent that randomly publishes fake events. Those events
    are published in similar manner as described by ghtorrent's
    documentation [2].

    context: stuck with raw ghtorrent so far [1]

    [1] https://github.com/ghtorrent/ghtorrent.org/issues/397#issuecomment-387052462 # noqa
    [2] http://ghtorrent.org/streaming.html

    """
    def __init__(self, **config):
        super().__init__(**config)
        self.fake_origin_generator = FakeRandomOriginGenerator()

    def _random_event(self):
        """Create a fake and random event

        Returns:
            dict with keys 'event' (a name picked from `events`) and
            'url' (a fake origin url)

        """
        event_type = random.choice(['evt', 'ent'])
        sub_event = random.choice(events[event_type])
        return {
            'event': sub_event,
            'url': self.fake_origin_generator.generate(),
        }

    def publish(self, nb_messages=10):
        """Publish nb_messages random events, serialized as json, on
        the configured exchange with the configured routing key.

        Args:
            nb_messages (int): number of events to publish

        """
        with Connection(self.conn_queue) as conn:
            with conn.Producer(serializer='json') as producer:
                for _ in range(nb_messages):
                    event = self._random_event()
                    producer.publish(event,
                                     exchange=self.exchange,
                                     routing_key=self.config['routing_key'],
                                     declare=[self.queue])


def process_message(body, message):
    """Consumer callback: print the received message then acknowledge
    it.

    """
    print('####', body, message)
    message.ack()


class GHTorrentConsumer(RabbitMQConn):
    """GHTorrent consumer

    """
    def consume(self, timeout=1):
        """Read the messages from the queue and hand each of them to
        process_message.

        Args:
            timeout: number of seconds to wait for a new message
                before considering the queue drained and returning.
                None waits indefinitely.

        """
        with Connection(self.conn_queue) as conn:
            # Entering the Consumer context starts the consumption, no
            # explicit consumer.consume() call is needed.
            with conn.Consumer(self.queue, callbacks=[process_message],
                               auto_declare=True):
                # Callbacks are only triggered while draining events on
                # the connection; without this loop nothing is ever
                # delivered to process_message.
                while True:
                    try:
                        conn.drain_events(timeout=timeout)
                    except socket.timeout:
                        break