diff --git a/PKG-INFO b/PKG-INFO
index c56a078..99b583e 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,28 +1,28 @@
 Metadata-Version: 2.1
 Name: swh.core
-Version: 0.0.53
+Version: 0.0.54
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
-Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
+Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-core
 Description: swh-core
         ========
         
         core library for swh's modules:
         - config parser
         - hash computations
         - serialization
         - logging mechanism
         
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.core.egg-info/PKG-INFO b/swh.core.egg-info/PKG-INFO
index c56a078..99b583e 100644
--- a/swh.core.egg-info/PKG-INFO
+++ b/swh.core.egg-info/PKG-INFO
@@ -1,28 +1,28 @@
 Metadata-Version: 2.1
 Name: swh.core
-Version: 0.0.53
+Version: 0.0.54
 Summary: Software Heritage core utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
-Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
+Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-core
 Description: swh-core
         ========
         
         core library for swh's modules:
         - config parser
         - hash computations
         - serialization
         - logging mechanism
         
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.core.egg-info/SOURCES.txt b/swh.core.egg-info/SOURCES.txt
index 284d9da..a635254 100644
--- a/swh.core.egg-info/SOURCES.txt
+++ b/swh.core.egg-info/SOURCES.txt
@@ -1,39 +1,40 @@
 MANIFEST.in
 Makefile
 README.md
 requirements-swh.txt
 requirements.txt
 setup.py
 version.txt
 swh/__init__.py
 swh.core.egg-info/PKG-INFO
 swh.core.egg-info/SOURCES.txt
 swh.core.egg-info/dependency_links.txt
 swh.core.egg-info/entry_points.txt
 swh.core.egg-info/requires.txt
 swh.core.egg-info/top_level.txt
 swh/core/__init__.py
 swh/core/api_async.py
 swh/core/cli.py
 swh/core/config.py
 swh/core/logger.py
 swh/core/statsd.py
 swh/core/tarball.py
 swh/core/utils.py
 swh/core/api/__init__.py
 swh/core/api/asynchronous.py
 swh/core/api/negotiate.py
 swh/core/api/serializers.py
 swh/core/db/__init__.py
 swh/core/db/common.py
 swh/core/db/db_utils.py
 swh/core/sql/log-schema.sql
 swh/core/tests/__init__.py
 swh/core/tests/db_testing.py
 swh/core/tests/server_testing.py
 swh/core/tests/test_api.py
 swh/core/tests/test_config.py
+swh/core/tests/test_db.py
 swh/core/tests/test_logger.py
 swh/core/tests/test_serializers.py
 swh/core/tests/test_statsd.py
 swh/core/tests/test_utils.py
\ No newline at end of file
diff --git a/swh.core.egg-info/requires.txt b/swh.core.egg-info/requires.txt
index ca675a9..4ef9ee7 100644
--- a/swh.core.egg-info/requires.txt
+++ b/swh.core.egg-info/requires.txt
@@ -1,16 +1,17 @@
 arrow
 aiohttp
 msgpack-python
 psycopg2
 python-dateutil
 vcversioner
 PyYAML
 requests
 Flask
 systemd-python
 decorator
 
 [testing]
 pytest<4
 pytest-postgresql
 requests-mock
+hypothesis>=3.11.0
diff --git a/swh/core/db/__init__.py b/swh/core/db/__init__.py
index cab7ddb..57506e7 100644
--- a/swh/core/db/__init__.py
+++ b/swh/core/db/__init__.py
@@ -1,193 +1,192 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import binascii
 import datetime
 import enum
 import json
 import os
 import threading
 
 from contextlib import contextmanager
 
 import psycopg2
 import psycopg2.extras
 
 
 psycopg2.extras.register_uuid()
 
 
 def escape(data):
     if data is None:
         return ''
     if isinstance(data, bytes):
         return '\\x%s' % binascii.hexlify(data).decode('ascii')
     elif isinstance(data, str):
         return '"%s"' % data.replace('"', '""')
     elif isinstance(data, datetime.datetime):
         # We escape twice to make sure the string generated by
         # isoformat gets escaped
         return escape(data.isoformat())
     elif isinstance(data, dict):
         return escape(json.dumps(data))
     elif isinstance(data, list):
         return escape("{%s}" % ','.join(escape(d) for d in data))
     elif isinstance(data, psycopg2.extras.Range):
         # We escape twice here too, so that we make sure
         # everything gets passed to copy properly
         return escape(
             '%s%s,%s%s' % (
                 '[' if data.lower_inc else '(',
                 '-infinity' if data.lower_inf else escape(data.lower),
                 'infinity' if data.upper_inf else escape(data.upper),
                 ']' if data.upper_inc else ')',
             )
         )
     elif isinstance(data, enum.IntEnum):
         return escape(int(data))
     else:
         # We don't escape here to make sure we pass literals properly
         return str(data)
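
The escape() helper above renders each Python value as a single CSV field for
the COPY stream built by copy_to() further down. The expectations below are a
sketch derived from the code above, not part of the patch:

    # A sketch of escape()'s behavior (derived from the code, not the patch):
    from swh.core.db import escape

    assert escape(None) == ''                # NULL -> empty CSV field
    assert escape(b'\xde\xad') == '\\xdead'  # bytes -> hex bytea form
    assert escape('he"llo') == '"he""llo"'   # strings are CSV-quoted
    assert escape([1, 2]) == '"{1,2}"'       # lists -> escaped pg arrays
    assert escape(42) == '42'                # other literals pass through
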
 
 
 def typecast_bytea(value, cur):
     if value is not None:
         data = psycopg2.BINARY(value, cur)
         return data.tobytes()
 
 
 class BaseDb:
     """Base class for swh.*.*Db.
 
     cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb
 
     """
 
     @classmethod
     def adapt_conn(cls, conn):
         """Makes psycopg2 use 'bytes' to decode bytea instead of
         'memoryview', for this connection."""
         cur = conn.cursor()
         cur.execute("SELECT null::bytea, null::bytea[]")
         bytea_oid = cur.description[0][1]
         bytea_array_oid = cur.description[1][1]
 
         t_bytes = psycopg2.extensions.new_type(
             (bytea_oid,), "bytea", typecast_bytea)
         psycopg2.extensions.register_type(t_bytes, conn)
 
         t_bytes_array = psycopg2.extensions.new_array_type(
             (bytea_array_oid,), "bytea[]", t_bytes)
         psycopg2.extensions.register_type(t_bytes_array, conn)
 
     @classmethod
     def connect(cls, *args, **kwargs):
         """factory method to create a DB proxy
 
         Accepts all arguments of psycopg2.connect; only some specific
         possibilities are reported below.
 
         Args:
             connstring: libpq2 connection string
 
         """
         conn = psycopg2.connect(*args, **kwargs)
-        cls.adapt_conn(conn)
         return cls(conn)
 
     @classmethod
     def from_pool(cls, pool):
         conn = pool.getconn()
-        cls.adapt_conn(conn)
         return cls(conn, pool=pool)
 
     def __init__(self, conn, pool=None):
         """create a DB proxy
 
         Args:
             conn: psycopg2 connection to the SWH DB
             pool: psycopg2 pool of connections
 
         """
+        self.adapt_conn(conn)
         self.conn = conn
         self.pool = pool
 
     def __del__(self):
         if self.pool:
             self.pool.putconn(self.conn)
 
     def cursor(self, cur_arg=None):
         """get a cursor: from cur_arg if given, or a fresh one otherwise
 
         meant to avoid boilerplate if/then/else in methods that proxy
         stored procedures
 
         """
         if cur_arg is not None:
             return cur_arg
         else:
             return self.conn.cursor()
     _cursor = cursor  # for bw compat
 
     @contextmanager
     def transaction(self):
         """context manager to execute within a DB transaction
 
         Yields:
             a psycopg2 cursor
 
         """
         with self.conn.cursor() as cur:
             try:
                 yield cur
                 self.conn.commit()
             except Exception:
                 if not self.conn.closed:
                     self.conn.rollback()
                 raise
 
     def copy_to(self, items, tblname, columns, cur=None, item_cb=None,
                 default_values={}):
         """Copy items' entries to table tblname with columns information.
 
         Args:
-            items (dict): dictionary of data to copy over tblname.
+            items (List[dict]): dictionaries of data to copy over tblname.
             tblname (str): destination table's name.
             columns ([str]): keys to access data in items and also the
               column names in the destination table.
             default_values (dict): dictionary of default values to use when
               inserting entries in the tblname table.
             cur: a db cursor; if not given, a new cursor will be created.
             item_cb (fn): optional function to apply to items's entry.
 
         """
         read_file, write_file = os.pipe()
 
         def writer():
             cursor = self.cursor(cur)
             with open(read_file, 'r') as f:
                 cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
                     tblname, ', '.join(columns)), f)
 
         write_thread = threading.Thread(target=writer)
         write_thread.start()
 
         try:
             with open(write_file, 'w') as f:
                 for d in items:
                     if item_cb is not None:
                         item_cb(d)
                     line = [escape(d.get(k, default_values.get(k)))
                             for k in columns]
                     f.write(','.join(line))
                     f.write('\n')
         finally:
             # No problem bubbling up exceptions, but we still need to make
             # sure we finish copying, even though we're probably going to
             # cancel the transaction.
             write_thread.join()
 
     def mktemp(self, tblname, cur=None):
         self.cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,))
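
With this change, adapt_conn() is applied in BaseDb.__init__ rather than in
the two factory methods, so bytea-as-bytes decoding now also covers
connections handed to BaseDb directly. A minimal usage sketch (the connection
string and the 'objects' table are hypothetical):

    # Hypothetical usage of BaseDb after this patch.
    from swh.core.db import BaseDb

    db = BaseDb.connect('dbname=test')  # adapt_conn now runs in __init__
    with db.transaction() as cur:
        cur.execute('create table objects (i int, data bytea)')

    # copy_to streams rows through COPY ... FROM STDIN CSV, rendering each
    # value with escape(); keys missing from an item fall back to
    # default_values.
    db.copy_to([{'i': 1, 'data': b'\x00'}, {'i': 2}],
               'objects', ['i', 'data'], default_values={'data': b''})
    db.conn.commit()
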
diff --git a/swh/core/statsd.py b/swh/core/statsd.py
index a297781..c9f0691 100644
--- a/swh/core/statsd.py
+++ b/swh/core/statsd.py
@@ -1,408 +1,408 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # Initially imported from https://github.com/DataDog/datadogpy/
 # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436
 #
 # Copyright (c) 2015, Datadog
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions are met:
 #     * Redistributions of source code must retain the above copyright
 #       notice, this list of conditions and the following disclaimer.
 #     * Redistributions in binary form must reproduce the above copyright
 #       notice, this list of conditions and the following disclaimer in the
 #       documentation and/or other materials provided with the distribution.
 #     * Neither the name of Datadog nor the names of its contributors may be
 #       used to endorse or promote products derived from this software without
 #       specific prior written permission.
 #
 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY
 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #
 #
 # Vastly adapted for integration in swh.core:
 #
 # - Removed python < 3.5 compat code
 # - trimmed the imports down to be a single module
 # - adjust some options:
 #   - drop unix socket connection option
 #   - add environment variable support for setting the statsd host and
 #     port (pulled the idea from the main python statsd module)
 #   - only send timer metrics in milliseconds (that's what
 #     prometheus-statsd-exporter expects)
 #   - drop DataDog-specific metric types (that are unsupported in
 #     prometheus-statsd-exporter)
 #   - made the tags a dict instead of a list (prometheus-statsd-exporter only
 #     supports tags with a value, mirroring prometheus)
 #   - switch from time.time to time.monotonic
 #   - improve unit test coverage
 #   - documentation cleanup
 
 from asyncio import iscoroutinefunction
 from functools import wraps
 from random import random
 from time import monotonic
 import itertools
 import logging
 import os
 import socket
 import warnings
 
 log = logging.getLogger('swh.core.statsd')
 
 
 class TimedContextManagerDecorator(object):
     """
     A context manager and a decorator which will report the elapsed time in
     the context OR in a function call.
 
     Attributes:
         elapsed (float): the elapsed time at the point of completion
     """
     def __init__(self, statsd, metric=None, tags=None, sample_rate=1):
         self.statsd = statsd
         self.metric = metric
         self.tags = tags
         self.sample_rate = sample_rate
         self.elapsed = None
 
     def __call__(self, func):
         """
         Decorator which returns the elapsed time of the function call.
 
         Default to the function name if metric was not provided.
         """
         if not self.metric:
             self.metric = '%s.%s' % (func.__module__, func.__name__)
 
         # Coroutines
         if iscoroutinefunction(func):
             @wraps(func)
             async def wrapped_co(*args, **kwargs):
                 start = monotonic()
                 try:
                     result = await func(*args, **kwargs)
                     return result
                 finally:
                     self._send(start)
             return wrapped_co
 
         # Others
         @wraps(func)
         def wrapped(*args, **kwargs):
             start = monotonic()
             try:
                 return func(*args, **kwargs)
             finally:
                 self._send(start)
         return wrapped
 
     def __enter__(self):
         if not self.metric:
             raise TypeError("Cannot use timed without a metric!")
         self._start = monotonic()
         return self
 
     def __exit__(self, type, value, traceback):
         # Report the elapsed time of the context manager.
         self._send(self._start)
 
     def _send(self, start):
         elapsed = (monotonic() - start) * 1000
         self.statsd.timing(self.metric, elapsed, self.tags, self.sample_rate)
         self.elapsed = elapsed
 
     def start(self):
         """Start the timer"""
         self.__enter__()
 
     def stop(self):
         """Stop the timer, send the metric value"""
         self.__exit__(None, None, None)
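
Besides decorator and context-manager use, the start()/stop() pair above
allows explicit timer control when neither form fits. A sketch (the metric
name and handle_request() are made up):

    # Explicit timer control; 'request.duration' and handle_request() are
    # hypothetical.
    from swh.core.statsd import statsd

    timer = statsd.timed('request.duration')
    timer.start()
    handle_request()  # the work being measured
    timer.stop()      # sends the timing; timer.elapsed is in milliseconds
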
 
 
 class Statsd(object):
     """Initialize a client to send metrics to a StatsD server.
 
     Arguments:
       host (str): the host of the StatsD server. Defaults to localhost.
       port (int): the port of the StatsD server. Defaults to 8125.
       max_buffer_size (int): Maximum number of metrics to buffer before
         sending to the server if sending metrics in batch
       namespace (str): Namespace to prefix all metric names
       constant_tags (Dict[str, str]): Tags to attach to all metrics
 
     Note:
       This class also supports the following environment variables:
 
       STATSD_HOST
         Override the default host of the statsd server
       STATSD_PORT
         Override the default port of the statsd server
       STATSD_TAGS
         Tags to attach to every metric reported. Example value:
         "label:value,other_label:other_value"
 
     """
     def __init__(self, host=None, port=None, max_buffer_size=50,
                  namespace=None, constant_tags=None):
         # Connection
         if host is None:
             host = os.environ.get('STATSD_HOST') or 'localhost'
         self.host = host
 
         if port is None:
             port = os.environ.get('STATSD_PORT') or 8125
         self.port = int(port)
 
         # Socket
         self.socket = None
         self.max_buffer_size = max_buffer_size
         self._send = self._send_to_server
         self.encoding = 'utf-8'
 
         # Tags
         self.constant_tags = {}
         tags_envvar = os.environ.get('STATSD_TAGS', '')
         for tag in tags_envvar.split(','):
             if not tag:
                 continue
             if ':' not in tag:
                 warnings.warn(
                     'STATSD_TAGS needs to be in key:value format, '
                     '%s invalid' % tag,
                     UserWarning,
                 )
                 continue
             print(tag)
             k, v = tag.split(':', 1)
             self.constant_tags[k] = v
         if constant_tags:
             self.constant_tags.update({
                 str(k): str(v)
                 for k, v in constant_tags.items()
             })
 
         # Namespace
         if namespace is not None:
             namespace = str(namespace)
         self.namespace = namespace
 
     def __enter__(self):
         self.open_buffer(self.max_buffer_size)
         return self
 
     def __exit__(self, type, value, traceback):
         self.close_buffer()
 
     def gauge(self, metric, value, tags=None, sample_rate=1):
         """
         Record the value of a gauge, optionally setting a list of tags and a
         sample rate.
 
         >>> statsd.gauge('users.online', 123)
         >>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"})
         """
         return self._report(metric, 'g', value, tags, sample_rate)
 
     def increment(self, metric, value=1, tags=None, sample_rate=1):
         """
         Increment a counter, optionally setting a value, tags and a sample
         rate.
 
         >>> statsd.increment('page.views')
         >>> statsd.increment('files.transferred', 124)
         """
         self._report(metric, 'c', value, tags, sample_rate)
 
     def decrement(self, metric, value=1, tags=None, sample_rate=1):
         """
         Decrement a counter, optionally setting a value, tags and a sample
         rate.
 
         >>> statsd.decrement('files.remaining')
         >>> statsd.decrement('active.connections', 2)
         """
         metric_value = -value if value else value
         self._report(metric, 'c', metric_value, tags, sample_rate)
 
     def histogram(self, metric, value, tags=None, sample_rate=1):
         """
         Sample a histogram value, optionally setting tags and a sample rate.
 
         >>> statsd.histogram('uploaded.file.size', 1445)
         >>> statsd.histogram('file.count', 26, tags={"filetype": "python"})
         """
         self._report(metric, 'h', value, tags, sample_rate)
 
     def timing(self, metric, value, tags=None, sample_rate=1):
         """
         Record a timing, optionally setting tags and a sample rate.
 
         >>> statsd.timing("query.response.time", 1234)
         """
         self._report(metric, 'ms', value, tags, sample_rate)
 
     def timed(self, metric=None, tags=None, sample_rate=1):
         """
         A decorator or context manager that will measure the distribution of
         a function's/context's run time. Optionally specify a list of tags or
         a sample rate. If the metric is not defined as a decorator, the module
         name and function name will be used. The metric is required as a
         context manager.
         ::
 
             @statsd.timed('user.query.time', sample_rate=0.5)
             def get_user(user_id):
                 # Do what you need to ...
                 pass
 
             # Is equivalent to ...
             with statsd.timed('user.query.time', sample_rate=0.5):
                 # Do what you need to ...
                 pass
 
             # Is equivalent to ...
             start = time.monotonic()
             try:
                 get_user(user_id)
             finally:
                 statsd.timing('user.query.time', time.monotonic() - start)
         """
         return TimedContextManagerDecorator(self, metric, tags, sample_rate)
 
     def set(self, metric, value, tags=None, sample_rate=1):
         """
         Sample a set value.
 
         >>> statsd.set('visitors.uniques', 999)
         """
         self._report(metric, 's', value, tags, sample_rate)
 
     def get_socket(self):
         """
         Return a connected socket.
 
         Note: connect the socket before assigning it to the class instance
         to avoid bad thread race conditions.
         """
         if not self.socket:
             sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
             sock.connect((self.host, self.port))
             self.socket = sock
 
         return self.socket
 
     def open_buffer(self, max_buffer_size=50):
         """
         Open a buffer to send a batch of metrics in one packet.
 
         You can also use this as a context manager.
 
         >>> with Statsd() as batch:
-        >>>     batch.gauge('users.online', 123)
-        >>>     batch.gauge('active.connections', 1001)
+        ...     batch.gauge('users.online', 123)
+        ...     batch.gauge('active.connections', 1001)
         """
         self.max_buffer_size = max_buffer_size
         self.buffer = []
         self._send = self._send_to_buffer
 
     def close_buffer(self):
         """
         Flush the buffer and switch back to single metric packets.
         """
         self._send = self._send_to_server
 
         if self.buffer:
             # Only send packets if there are packets to send
             self._flush_buffer()
 
     def close_socket(self):
         """
         Closes connected socket if connected.
         """
         if self.socket:
             self.socket.close()
             self.socket = None
 
     def _report(self, metric, metric_type, value, tags, sample_rate):
         """
         Create a metric packet and send it.
         """
         if value is None:
             return
 
         if sample_rate != 1 and random() > sample_rate:
             return
 
         # Resolve the full tag list
         tags = self._add_constant_tags(tags)
 
         # Create/format the metric packet
         payload = "%s%s:%s|%s%s%s" % (
             (self.namespace + ".") if self.namespace else "",
             metric,
             value,
             metric_type,
             ("|@" + str(sample_rate)) if sample_rate != 1 else "",
             ("|#" + ",".join(
                 "%s:%s" % (k, v)
                 for (k, v) in sorted(tags.items())
             )) if tags else "",
         )
 
         # Send it
         self._send(payload)
 
     def _send_to_server(self, packet):
         try:
             # If set, use socket directly
             (self.socket or self.get_socket()).send(packet.encode('utf-8'))
         except socket.timeout:
             return
         except socket.error:
             log.debug(
                 "Error submitting statsd packet."
                 " Dropping the packet and closing the socket."
             )
             self.close_socket()
 
     def _send_to_buffer(self, packet):
         self.buffer.append(packet)
         if len(self.buffer) >= self.max_buffer_size:
             self._flush_buffer()
 
     def _flush_buffer(self):
         self._send_to_server("\n".join(self.buffer))
         self.buffer = []
 
     def _add_constant_tags(self, tags):
         return {
             str(k): str(v)
             for k, v in itertools.chain(
                 self.constant_tags.items(),
                 (tags if tags else {}).items(),
             )
         }
 
 
 statsd = Statsd()
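
For reference, _report() above emits the plain statsd line format
(metric:value|type, plus an optional |@rate and |#tags suffix) that
prometheus-statsd-exporter consumes. The packets below are a sketch with
made-up metric names and values:

    # Example packets produced by _report(); names and values are made up.
    from swh.core.statsd import Statsd

    s = Statsd(namespace='swh', constant_tags={'env': 'test'})
    s.increment('objects.loaded')  # sends "swh.objects.loaded:1|c|#env:test"
    s.timing('query.time', 12.5)   # sends "swh.query.time:12.5|ms|#env:test"
    s.gauge('workers', 4, tags={'pool': 'a'})
    # sends "swh.workers:4|g|#env:test,pool:a" (tags are merged and sorted)
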
diff --git a/swh/core/tests/test_db.py b/swh/core/tests/test_db.py
new file mode 100644
index 0000000..a9c70e8
--- /dev/null
+++ b/swh/core/tests/test_db.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2019  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os.path
+import tempfile
+import unittest
+
+from hypothesis import strategies, given
+import pytest
+
+from swh.core.db import BaseDb
+from swh.core.tests.db_testing import (
+    SingleDbTestFixture, db_create, db_destroy, db_close,
+)
+
+
+INIT_SQL = '''
+create table test_table
+(
+  i      int,
+  txt    text,
+  bytes  bytea
+);
+'''
+
+db_rows = strategies.lists(strategies.tuples(
+    strategies.integers(-2147483648, +2147483647),
+    strategies.text(
+        alphabet=strategies.characters(
+            blacklist_categories=['Cs'],  # surrogates
+            blacklist_characters=[
+                '\x00',  # pgsql does not support the null codepoint
+                '\r',  # pgsql normalizes those
+            ]
+        ),
+    ),
+    strategies.binary(),
+))
+
+
+@pytest.mark.db
+def test_connect():
+    db_name = db_create('test-db2', dumps=[])
+    try:
+        db = BaseDb.connect('dbname=%s' % db_name)
+        with db.cursor() as cur:
+            cur.execute(INIT_SQL)
+            cur.execute("insert into test_table values (1, %s, %s);",
+                        ('foo', b'bar'))
+            cur.execute("select * from test_table;")
+            assert list(cur) == [(1, 'foo', b'bar')]
+    finally:
+        db_close(db.conn)
+        db_destroy(db_name)
+
+
+@pytest.mark.db
+class TestDb(SingleDbTestFixture, unittest.TestCase):
+    TEST_DB_NAME = 'test-db'
+
+    @classmethod
+    def setUpClass(cls):
+        with tempfile.TemporaryDirectory() as td:
+            with open(os.path.join(td, 'init.sql'), 'a') as fd:
+                fd.write(INIT_SQL)
+
+            cls.TEST_DB_DUMP = os.path.join(td, '*.sql')
+
+            super().setUpClass()
+
+    def setUp(self):
+        super().setUp()
+        self.db = BaseDb(self.conn)
+
+    def test_initialized(self):
+        cur = self.db.cursor()
+        cur.execute("insert into test_table values (1, %s, %s);",
+                    ('foo', b'bar'))
+        cur.execute("select * from test_table;")
+        self.assertEqual(list(cur), [(1, 'foo', b'bar')])
+
+    def test_reset_tables(self):
+        cur = self.db.cursor()
+        cur.execute("insert into test_table values (1, %s, %s);",
+                    ('foo', b'bar'))
+        self.reset_db_tables('test-db')
+        cur.execute("select * from test_table;")
+        self.assertEqual(list(cur), [])
+
+    @given(db_rows)
+    def test_copy_to(self, data):
+        # the table is not reset between runs by hypothesis
+        self.reset_db_tables('test-db')
+
+        items = [dict(zip(['i', 'txt', 'bytes'], item)) for item in data]
+        self.db.copy_to(items, 'test_table', ['i', 'txt', 'bytes'])
+
+        cur = self.db.cursor()
+        cur.execute('select * from test_table;')
+        self.assertCountEqual(list(cur), data)
diff --git a/version.txt b/version.txt
index 1c348f9..dd9ee8d 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.53-0-g993625f
\ No newline at end of file
+v0.0.54-0-gc80a4af
\ No newline at end of file