Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9342630
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py
index d6d0bef..0d638f8 100644
--- a/swh/scheduler/updater/consumer.py
+++ b/swh/scheduler/updater/consumer.py
@@ -1,110 +1,117 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from abc import ABCMeta, abstractmethod
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
class UpdaterConsumer(metaclass=ABCMeta):
    """Event consumer.

    Subclasses supply the transport (``open_connection``, ``has_events``,
    ``consume``, ``close_connection``) and the conversion of raw messages
    into SWH events (``convert_event``). This base class keeps one cached
    event per url and pushes the cache to the scheduler updater backend
    once ``batch`` distinct urls have been collected, and once more at the
    end of :meth:`run`.

    """
    # Default for the flag read by process_event; subclasses may override
    # it on the instance (e.g. from their configuration).
    debug = False

    def __init__(self, batch=1000):
        super().__init__()
        self._reset_cache()
        self.backend = SchedulerUpdaterBackend()
        self.batch = batch

    def _reset_cache(self):
        """Reset internal cache.

        """
        self.count = 0            # number of distinct urls cached
        self.seen_events = set()  # urls of the cached events
        self.events = []          # cached events, in arrival order
        # url -> cached event, so that a duplicate can update the event
        # already queued for that url
        self._events_by_url = {}

    def is_interesting(self, event):
        """Determine if an event is interesting or not.

        Args:
            event (SWHEvent): SWH event

        Returns:
            the result of ``event.is_interesting()``

        """
        return event.is_interesting()

    @abstractmethod
    def convert_event(self, event):
        """Parse an event into an SWHEvent.

        """
        pass

    def process_event(self, body):
        """Convert a raw message and cache it if it is interesting.

        The first occurrence of a url is cached; later occurrences of the
        same url (until the next flush) increase the cached event's
        ``rate``. Once ``batch`` distinct urls are cached, the cached
        events are pushed to the backend and the cache is reset.

        Args:
            body: raw message, as produced by :meth:`consume`

        """
        try:
            event = self.convert_event(body)
            if self.debug:
                print('#### body', body)
            if self.is_interesting(event):
                cached = self._events_by_url.get(event.url)
                if cached is not None:
                    # Bump the event already queued for this url; the new
                    # duplicate object itself is dropped.
                    cached.rate += 1
                else:
                    self.events.append(event)
                    self.seen_events.add(event.url)
                    self._events_by_url[event.url] = event
                    self.count += 1
        finally:
            if self.count >= self.batch:
                if self.events:
                    self.backend.cache_put(self.events)
                self._reset_cache()

    def _flush(self):
        """Flush remaining internal cache if any.

        """
        if self.events:
            self.backend.cache_put(self.events)
        self._reset_cache()

    @abstractmethod
    def has_events(self):
        """Determine whether events remain to be consumed.

        Returns:
            bool: True if there are remaining events, False otherwise

        """
        pass

    @abstractmethod
    def consume(self):
        """The main entry point to consume events.

        This should either yield or return an iterable of raw messages;
        :meth:`run` passes each of them to :meth:`process_event`.

        """
        pass

    @abstractmethod
    def open_connection(self):
        """Open a connection to the remote system we are supposed to consume
        from.

        """
        pass

    @abstractmethod
    def close_connection(self):
        """Close opened connection to the remote system.

        """
        pass

    def run(self):
        """Consume events until ``has_events`` returns False.

        The connection is closed even if consuming or processing raises.
        On normal termination, the events still cached are flushed to the
        backend.

        """
        self.open_connection()
        try:
            while self.has_events():
                for event in self.consume():
                    self.process_event(event)
        finally:
            self.close_connection()
        self._flush()
diff --git a/swh/scheduler/updater/ghtorrent.py b/swh/scheduler/updater/ghtorrent.py
index 92b6786..8d0c1ca 100644
--- a/swh/scheduler/updater/ghtorrent.py
+++ b/swh/scheduler/updater/ghtorrent.py
@@ -1,240 +1,233 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import json
import random
import string
from arrow import utcnow
from kombu import Connection, Exchange, Queue
from kombu.common import collect_replies
from swh.core.config import SWHConfig
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.consumer import UpdaterConsumer
# Sub-event names per ghtorrent category; FakeGHTorrentPublisher draws
# the type of its random events from these lists.
events = {
    # mirrors of github events (interesting)
    'evt': [
        'commitcomment',
        'create',
        'delete',
        'deployment',
        'deploymentstatus',
        'download',
        'follow',
        'fork',
        'forkapply',
        'gist',
        'gollum',
        'issuecomment',
        'issues',
        'member',
        'membership',
        'pagebuild',
        'public',
        'pullrequest',
        'pullrequestreviewcomment',
        'push',
        'release',
        'repository',
        'status',
        'teamadd',
        'watch',
    ],
    # mirrors of mongodb inserts (not interesting)
    'ent': [
        'commit_comments',
        'commits',
        'followers',
        'forks',
        'geo_cache',
        'issue_comments',
        'issue_events',
        'issues',
        'org_members',
        'pull_request_comments',
        'pull_requests',
        'repo_collaborators',
        'repo_labels',
        'repos',
        'users',
        'watchers',
    ],
}
class FakeRandomOriginGenerator:
    """Produce random 'user/repo' names to stand in for real origins.

    """
    def _random_string(self, length):
        """Return `length` characters drawn at random from ascii letters
        and digits.

        """
        alphabet = string.ascii_letters + string.digits
        picked = []
        for _ in range(length):
            picked.append(random.choice(alphabet))
        return ''.join(picked)

    def generate(self, user_range=range(5, 10), repo_range=range(10, 15)):
        """Build a fake 'user/repo' name.

        Args:
            user_range: candidate lengths for the user part
            repo_range: candidate lengths for the repository part

        Returns:
            str: '<user>/<repo>'

        """
        user = self._random_string(random.choice(user_range))
        repo = self._random_string(random.choice(repo_range))
        return '/'.join((user, repo))
class RabbitMQConn(SWHConfig):
    """RabbitMQ Connection class.

    Builds the amqp connection url, the topic exchange and the queue from
    the 'conn' configuration entry.

    """
    CONFIG_BASE_FILENAME = 'backend/ghtorrent'

    DEFAULT_CONFIG = {
        'conn': ('dict', {
            'user': 'guest',
            'pass': 'guest',
            'port': 5672,
            'server': 'localhost',
            'exchange_name': 'ght-streams',
            'routing_key': 'something',
            'queue_name': 'fake-events'
        })
    }

    ADDITIONAL_CONFIG = {}

    def _connection_string(self):
        """Build the connection queue string.

        User name and password are percent-encoded, so that characters
        such as '@', ':', '/' or '%' in the credentials cannot corrupt
        the url. They are converted to str first because configuration
        values are not guaranteed to be strings (e.g. a numeric password).

        """
        from urllib.parse import quote
        conn = self.config['conn']
        return 'amqp://%s:%s@%s:%s' % (
            quote(str(conn['user']), safe=''),
            quote(str(conn['pass']), safe=''),
            conn['server'],
            conn['port']
        )

    def __init__(self, **config):
        super().__init__()
        # Explicit keyword configuration replaces the configuration file
        # entirely (no merge with the defaults).
        if config:
            self.config = config
        else:
            self.config = self.parse_config_file(
                additional_configs=[self.ADDITIONAL_CONFIG])

        self.conn_string = self._connection_string()
        self.exchange = Exchange(self.config['conn']['exchange_name'],
                                 'topic', durable=True)
        self.routing_key = self.config['conn']['routing_key']
        self.queue = Queue(self.config['conn']['queue_name'],
                           exchange=self.exchange,
                           routing_key=self.routing_key,
                           auto_delete=True)
class FakeGHTorrentPublisher(RabbitMQConn):
    """Fake GHTorrent that randomly publishes fake events. Those events
    are published in a manner similar to the one described by
    ghtorrent's documentation [2].

    context: stuck with raw ghtorrent so far [1]

    [1] https://github.com/ghtorrent/ghtorrent.org/issues/397#issuecomment-387052462  # noqa
    [2] http://ghtorrent.org/streaming.html

    """
    ADDITIONAL_CONFIG = {
        'nb_messages': ('int', 100)
    }

    def __init__(self, **config):
        super().__init__(**config)
        self.fake_origin_generator = FakeRandomOriginGenerator()
        self.nb_messages = self.config['nb_messages']

    def _random_event(self):
        """Create a fake and random event.

        Returns:
            dict: with a 'type' drawn from the module-level ``events``
            table, a 'repo' dict holding a random 'name', and a
            'created_at' timestamp (current utc time, iso format)

        """
        event_type = random.choice(['evt', 'ent'])
        sub_event = random.choice(events[event_type])
        return {
            'type': sub_event,
            'repo': {
                'name': self.fake_origin_generator.generate(),
            },
            'created_at': utcnow().isoformat()
        }

    def publish(self, nb_messages=None):
        """Publish fake events, json-serialized, on the configured exchange.

        Args:
            nb_messages (int): number of events to publish. When None,
                the configured 'nb_messages' is used; 0 publishes nothing.

        """
        # Compare against None: 0 is a valid (falsy) request.
        if nb_messages is None:
            nb_messages = self.nb_messages

        with Connection(self.conn_string) as conn:
            with conn.Producer(serializer='json') as producer:
                for _ in range(nb_messages):
                    producer.publish(self._random_event(),
                                     exchange=self.exchange,
                                     routing_key=self.routing_key,
                                     declare=[self.queue])
def convert_event(event):
    """Given ghtorrent event, convert it to an swhevent instance.

    Args:
        event (dict or str): ghtorrent event, either decoded or as a JSON
            string. It must define the 'type', 'repo' (with a 'name'
            entry) and 'created_at' keys.

    Returns:
        SWHEvent: built from the lowercased event type stripped of its
        'event' suffix (e.g. 'PushEvent' -> 'push'), the github url of
        the repository and the 'created_at' value as 'last_seen'.

    Raises:
        ValueError: if one of the required keys is missing.

    """
    if isinstance(event, str):
        event = json.loads(event)

    for k in ['type', 'repo', 'created_at']:
        if k not in event:
            raise ValueError('Event should have the \'%s\' entry defined' % k)

    # Remove the 'event' suffix as a whole. str.rstrip('Event') would strip
    # any trailing run of the characters E/v/e/n/t, mangling for instance
    # 'CreateEvent' into 'crea' and 'DeleteEvent' into 'del'.
    suffix = 'event'
    _type = event['type'].lower()
    if _type.endswith(suffix):
        _type = _type[:-len(suffix)]
    _repo_name = 'https://github.com/%s' % event['repo']['name']
    return SWHEvent({
        'type': _type,
        'url': _repo_name,
        'last_seen': event['created_at']
    })
class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
    """GHTorrent consumer: reads ghtorrent events from the configured
    RabbitMQ queue and converts them with :func:`convert_event`.

    """
    ADDITIONAL_CONFIG = {
        'debug': ('bool', False),
        'batch': ('int', 1000),
    }

    def __init__(self, **config):
        super().__init__(**config)
        self.debug = self.config['debug']
        self.batch = self.config['batch']
        self.messages = []

    def has_events(self):
        """Always True: this consumer never reports exhaustion, so
        ``run`` keeps consuming until an error occurs.

        """
        return True

    def convert_event(self, event):
        """Given ghtorrent event, convert it to an swhevent instance.

        """
        return convert_event(event)

    def _on_message(self, message):
        """Store the utf-8 decoded body together with the message in
        ``self.messages``.

        """
        self.messages.append((message.body.decode('utf-8'), message))

    def open_connection(self):
        self.conn = Connection(self.conn_string)
        self.conn.connect()

    def close_connection(self):
        self.conn.release()

    def consume(self):
        """Consume events from rabbitmq connection, at most 100 per call.

        """
        channel = self.conn.channel()
        try:
            yield from collect_replies(
                self.conn, channel, self.queue, no_ack=False,
                limit=100)
        finally:
            # A fresh channel is opened on every call and run() calls
            # consume() in an endless loop: close it so channels do not
            # accumulate on the connection.
            channel.close()
@click.command()
def main():
    """Consume events from ghtorrent
    """
    # The docstring above doubles as the click --help text.
    consumer = GHTorrentConsumer()
    consumer.run()


if __name__ == '__main__':
    main()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 12:53 PM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3255246
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment