diff --git a/docs/README.rst b/docs/README.rst
index bc3ab62..01a5b56 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -1,39 +1,43 @@
 Software Heritage - Datastore Scrubber
 ======================================

 Tools to periodically check data integrity in swh-storage and swh-objstorage,
 report errors, and (try to) fix them.

 This is a work in progress; some of the components described below do not
 exist yet (cassandra storage checker, objstorage checker, recovery, and
 reinjection).

 The Scrubber package is made of the following parts:


 Checking
 --------

 Highly parallel processes continuously read objects from a data store,
 compute checksums, and write any failure to a database, along with the data
 of the corrupt object.

 There is one "checker" for each datastore package: storage (postgresql
 and cassandra), journal (kafka), and objstorage.

+The journal is "crawled" using its native streaming; the other datastores are
+crawled by range, reusing swh-storage's backfiller utilities, and progress is
+checkpointed from time to time to the scrubber's database (in the
+``checked_range`` table).
+

 Recovery
 --------

 Then, from time to time, jobs go through the list of known corrupt objects,
 and try to recover the original objects, through various means:

 * Brute-forcing variations until they match their checksum
 * Recovering from another data store
 * As a last resort, recovering from known origins, if any


 Reinjection
 -----------

 Finally, when an original object is recovered, it is reinjected in the
 original data store, replacing the corrupt one.
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
index 2305fa7..7abf35f 100644
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -1,280 +1,331 @@
 # Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Reads all objects in a swh-storage instance and recomputes their checksums."""

 import collections
 import contextlib
 import dataclasses
+import datetime
 import logging
-from typing import Iterable, Union
+from typing import Iterable, Optional, Tuple, Union

 from swh.core.statsd import Statsd
 from swh.journal.serializers import value_to_kafka
 from swh.model import swhids
 from swh.model.model import (
     Content,
     Directory,
     ObjectType,
     Release,
     Revision,
     Snapshot,
     TargetType,
 )
 from swh.storage import backfill
 from swh.storage.interface import StorageInterface
 from swh.storage.postgresql.storage import Storage as PostgresqlStorage

 from .db import Datastore, ScrubberDb

 logger = logging.getLogger(__name__)

 ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content]


 @contextlib.contextmanager
 def storage_db(storage):
     db = storage.get_db()
     try:
         yield db
     finally:
         storage.put_db(db)


+def _get_inclusive_range_swhids(
+    inclusive_range_start: Optional[bytes],
+    exclusive_range_end: Optional[bytes],
+    object_type: swhids.ObjectType,
+) -> Tuple[swhids.CoreSWHID, swhids.CoreSWHID]:
+    r"""
+    Given a ``[range_start, range_end)`` right-open interval of id prefixes
+    and an object type (as returned by :const:`swh.storage.backfill.RANGE_GENERATORS`),
+    returns a ``[range_start_swhid, range_end_swhid]`` closed interval of SWHIDs
+    suitable for the scrubber database.
+ + >>> _get_inclusive_range_swhids(b"\x42", None, swhids.ObjectType.SNAPSHOT) + (CoreSWHID.from_string('swh:1:snp:4200000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:snp:ffffffffffffffffffffffffffffffffffffffff')) + + >>> _get_inclusive_range_swhids(b"\x00", b"\x12\x34", swhids.ObjectType.REVISION) + (CoreSWHID.from_string('swh:1:rev:0000000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:rev:1233ffffffffffffffffffffffffffffffffffff')) + + """ # noqa + range_start_swhid = swhids.CoreSWHID( + object_type=object_type, + object_id=(inclusive_range_start or b"").ljust(20, b"\00"), + ) + if exclusive_range_end is None: + inclusive_range_end = b"\xff" * 20 + else: + # convert "1230000000..." to "122fffffff..." + inclusive_range_end = ( + int.from_bytes(exclusive_range_end.ljust(20, b"\x00"), "big") - 1 + ).to_bytes(20, "big") + range_end_swhid = swhids.CoreSWHID( + object_type=object_type, + object_id=inclusive_range_end, + ) + + return (range_start_swhid, range_end_swhid) + + @dataclasses.dataclass class StorageChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" db: ScrubberDb storage: StorageInterface object_type: str """``directory``/``revision``/``release``/``snapshot``""" start_object: str """minimum value of the hexdigest of the object's sha1.""" end_object: str """maximum value of the hexdigest of the object's sha1.""" _datastore = None _statsd = None def datastore_info(self) -> Datastore: """Returns a :class:`Datastore` instance representing the swh-storage instance being checked.""" if self._datastore is None: if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: self._datastore = Datastore( package="storage", cls="postgresql", instance=db.conn.dsn, ) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).datastore()" ) return self._datastore def statsd(self) -> Statsd: if self._statsd is None: self._statsd = Statsd( namespace="swh_scrubber", constant_tags={"object_type": self.object_type}, ) return self._statsd def run(self): """Runs on all objects of ``object_type`` and with id between ``start_object`` and ``end_object``. 
""" if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: return self._check_postgresql(db) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).check_storage()" ) def _check_postgresql(self, db): + object_type = getattr(swhids.ObjectType, self.object_type.upper()) for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( self.start_object, self.end_object ): + (range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids( + range_start, range_end, object_type + ) + + start_time = datetime.datetime.now(tz=datetime.timezone.utc) logger.debug( "Processing %s range %s to %s", self.object_type, backfill._format_range_bound(range_start), backfill._format_range_bound(range_end), ) objects = backfill.fetch( db, self.object_type, start=range_start, end=range_end ) objects = list(objects) with self.statsd().timed( "batch_duration_seconds", tags={"operation": "check_hashes"} ): self.check_object_hashes(objects) with self.statsd().timed( "batch_duration_seconds", tags={"operation": "check_references"} ): self.check_object_references(objects) + self.db.checked_range_upsert( + self.datastore_info(), + range_start_swhid, + range_end_swhid, + start_time, + ) + def check_object_hashes(self, objects: Iterable[ScrubbableObject]): """Recomputes hashes, and reports mismatches.""" count = 0 for object_ in objects: if isinstance(object_, Content): # TODO continue real_id = object_.compute_hash() count += 1 if object_.id != real_id: self.statsd().increment("hash_mismatch_total") self.db.corrupt_object_add( object_.swhid(), self.datastore_info(), value_to_kafka(object_.to_dict()), ) if count: self.statsd().increment("objects_hashed_total", count) def check_object_references(self, objects: Iterable[ScrubbableObject]): """Check all objects references by these objects exist.""" cnt_references = collections.defaultdict(set) dir_references = collections.defaultdict(set) rev_references = collections.defaultdict(set) rel_references = collections.defaultdict(set) snp_references = collections.defaultdict(set) for object_ in objects: swhid = object_.swhid() if isinstance(object_, Content): pass elif isinstance(object_, Directory): for entry in object_.entries: if entry.type == "file": cnt_references[entry.target].add(swhid) elif entry.type == "dir": dir_references[entry.target].add(swhid) elif entry.type == "rev": # dir->rev holes are not considered a problem because they # happen whenever git submodules point to repositories that # were not loaded yet; ignore them pass else: assert False, entry elif isinstance(object_, Revision): dir_references[object_.directory].add(swhid) for parent in object_.parents: rev_references[parent].add(swhid) elif isinstance(object_, Release): if object_.target is None: pass elif object_.target_type == ObjectType.CONTENT: cnt_references[object_.target].add(swhid) elif object_.target_type == ObjectType.DIRECTORY: dir_references[object_.target].add(swhid) elif object_.target_type == ObjectType.REVISION: rev_references[object_.target].add(swhid) elif object_.target_type == ObjectType.RELEASE: rel_references[object_.target].add(swhid) else: assert False, object_ elif isinstance(object_, Snapshot): for branch in object_.branches.values(): if branch is None: pass elif branch.target_type == TargetType.ALIAS: pass elif branch.target_type == TargetType.CONTENT: cnt_references[branch.target].add(swhid) elif branch.target_type == TargetType.DIRECTORY: dir_references[branch.target].add(swhid) elif branch.target_type == 
TargetType.REVISION: rev_references[branch.target].add(swhid) elif branch.target_type == TargetType.RELEASE: rel_references[branch.target].add(swhid) elif branch.target_type == TargetType.SNAPSHOT: snp_references[branch.target].add(swhid) else: assert False, (str(object_.swhid()), branch) else: assert False, object_.swhid() missing_cnts = set( self.storage.content_missing_per_sha1_git(list(cnt_references)) ) missing_dirs = set(self.storage.directory_missing(list(dir_references))) missing_revs = set(self.storage.revision_missing(list(rev_references))) missing_rels = set(self.storage.release_missing(list(rel_references))) missing_snps = set(self.storage.snapshot_missing(list(snp_references))) self.statsd().increment( "missing_object_total", len(missing_cnts), tags={"target_object_type": "content"}, ) self.statsd().increment( "missing_object_total", len(missing_dirs), tags={"target_object_type": "directory"}, ) self.statsd().increment( "missing_object_total", len(missing_revs), tags={"target_object_type": "revision"}, ) self.statsd().increment( "missing_object_total", len(missing_rels), tags={"target_object_type": "release"}, ) self.statsd().increment( "missing_object_total", len(missing_snps), tags={"target_object_type": "snapshot"}, ) for missing_id in missing_cnts: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.CONTENT, object_id=missing_id ) self.db.missing_object_add( missing_swhid, cnt_references[missing_id], self.datastore_info() ) for missing_id in missing_dirs: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id ) self.db.missing_object_add( missing_swhid, dir_references[missing_id], self.datastore_info() ) for missing_id in missing_revs: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.REVISION, object_id=missing_id ) self.db.missing_object_add( missing_swhid, rev_references[missing_id], self.datastore_info() ) for missing_id in missing_rels: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.RELEASE, object_id=missing_id ) self.db.missing_object_add( missing_swhid, rel_references[missing_id], self.datastore_info() ) for missing_id in missing_snps: missing_swhid = swhids.CoreSWHID( object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id ) self.db.missing_object_add( missing_swhid, snp_references[missing_id], self.datastore_info() ) diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py index efd38d7..047cf52 100644 --- a/swh/scrubber/tests/test_storage_postgresql.py +++ b/swh/scrubber/tests/test_storage_postgresql.py @@ -1,291 +1,406 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest.mock import attr import pytest from swh.journal.serializers import kafka_to_value from swh.model import model, swhids from swh.model.tests import swh_model_data -from swh.scrubber.storage_checker import StorageChecker +from swh.scrubber.db import Datastore +from swh.scrubber.storage_checker import StorageChecker, storage_db from swh.storage.backfill import byte_ranges CONTENT1 = model.Content.from_data(b"foo") DIRECTORY1 = model.Directory( entries=( model.DirectoryEntry( target=CONTENT1.sha1_git, type="file", name=b"file1", perms=0o1 ), ) ) DIRECTORY2 = model.Directory( entries=( model.DirectoryEntry( target=CONTENT1.sha1_git, 
type="file", name=b"file2", perms=0o1 ), model.DirectoryEntry(target=DIRECTORY1.id, type="dir", name=b"dir1", perms=0o1), model.DirectoryEntry(target=b"\x00" * 20, type="rev", name=b"rev1", perms=0o1), ) ) REVISION1 = model.Revision( message=b"blah", directory=DIRECTORY2.id, author=None, committer=None, date=None, committer_date=None, type=model.RevisionType.GIT, synthetic=True, ) RELEASE1 = model.Release( message=b"blih", name=b"bluh", target_type=model.ObjectType.REVISION, target=REVISION1.id, synthetic=True, ) SNAPSHOT1 = model.Snapshot( branches={ b"rel1": model.SnapshotBranch( target_type=model.TargetType.RELEASE, target=RELEASE1.id ), } ) +@pytest.fixture +def datastore(swh_storage): + with storage_db(swh_storage) as db: + return Datastore( + package="storage", + cls="postgresql", + instance=db.conn.dsn, + ) + + # decorator to make swh.storage.backfill use fewer ranges, so tests run faster patch_byte_ranges = unittest.mock.patch( "swh.storage.backfill.byte_ranges", lambda numbits, start, end: byte_ranges(numbits // 8, start, end), ) +def _short_ranges(type_): + return [ + ( + f"swh:1:{type_}:0000000000000000000000000000000000000000", + f"swh:1:{type_}:1fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:2000000000000000000000000000000000000000", + f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:4000000000000000000000000000000000000000", + f"swh:1:{type_}:5fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:6000000000000000000000000000000000000000", + f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:8000000000000000000000000000000000000000", + f"swh:1:{type_}:9fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:a000000000000000000000000000000000000000", + f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:c000000000000000000000000000000000000000", + f"swh:1:{type_}:dfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:e000000000000000000000000000000000000000", + f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff", + ), + ] + + +def _long_ranges(type_): + return [ + ( + f"swh:1:{type_}:0000000000000000000000000000000000000000", + f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:4000000000000000000000000000000000000000", + f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:8000000000000000000000000000000000000000", + f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:c000000000000000000000000000000000000000", + f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff", + ), + ] + + +EXPECTED_RANGES = [ + *_short_ranges("dir"), + *_long_ranges("rel"), + *_short_ranges("rev"), + *_long_ranges("snp"), +] + + +def assert_checked_ranges( + scrubber_db, datastore, expected_ranges, before_date=None, after_date=None +): + if before_date is not None: + assert all( + before_date < date < after_date + for (_, _, date) in scrubber_db.checked_range_iter(datastore) + ) + + checked_ranges = [ + (str(start), str(end)) + for (start, end, date) in scrubber_db.checked_range_iter(datastore) + ] + checked_ranges.sort(key=str) + + assert checked_ranges == expected_ranges + + @patch_byte_ranges -def test_no_corruption(scrubber_db, swh_storage): +def test_no_corruption(scrubber_db, datastore, swh_storage): swh_storage.directory_add(swh_model_data.DIRECTORIES) swh_storage.revision_add(swh_model_data.REVISIONS) 
swh_storage.release_add(swh_model_data.RELEASES) swh_storage.snapshot_add(swh_model_data.SNAPSHOTS) + before_date = datetime.datetime.now(tz=datetime.timezone.utc) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() + after_date = datetime.datetime.now(tz=datetime.timezone.utc) assert list(scrubber_db.corrupt_object_iter()) == [] + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) @patch_byte_ranges -def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx): +def test_corrupt_snapshot(scrubber_db, datastore, swh_storage, corrupt_idx): storage_dsn = swh_storage.get_db().conn.dsn snapshots = list(swh_model_data.SNAPSHOTS) snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) swh_storage.snapshot_add(snapshots) before_date = datetime.datetime.now(tz=datetime.timezone.utc) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 1 assert corrupt_objects[0].id == swhids.CoreSWHID.from_string( "swh:1:snp:0000000000000000000000000000000000000000" ) assert corrupt_objects[0].datastore.package == "storage" assert corrupt_objects[0].datastore.cls == "postgresql" assert corrupt_objects[0].datastore.instance.startswith(storage_dsn) assert ( before_date - datetime.timedelta(seconds=5) <= corrupt_objects[0].first_occurrence <= after_date + datetime.timedelta(seconds=5) ) assert ( kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict() ) + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @patch_byte_ranges -def test_corrupt_snapshots_same_batch(scrubber_db, swh_storage): +def test_corrupt_snapshots_same_batch(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) swh_storage.snapshot_add(snapshots) StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="00" * 20, end_object="ff" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:snp:0000000000000000000000000000000000000000", "swh:1:snp:0101010101010101010101010101010101010101", ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_corrupt_snapshots_different_batches(scrubber_db, swh_storage): +def test_corrupt_snapshots_different_batches(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i * 255]) * 20) swh_storage.snapshot_add(snapshots) StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="00" * 20, end_object="87" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 1 # Simulates resuming from a different process, with an empty lru_cache 
scrubber_db.datastore_get_or_add.cache_clear() StorageChecker( db=scrubber_db, storage=swh_storage, object_type="snapshot", start_object="88" * 20, end_object="ff" * 20, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 2 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) for swhid in [ "swh:1:snp:0000000000000000000000000000000000000000", "swh:1:snp:ffffffffffffffffffffffffffffffffffffffff", ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_no_hole(scrubber_db, swh_storage): +def test_no_hole(scrubber_db, datastore, swh_storage): swh_storage.content_add([CONTENT1]) swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) swh_storage.revision_add([REVISION1]) swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() assert list(scrubber_db.missing_object_iter()) == [] + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES) + @pytest.mark.parametrize( "missing_object", ["content1", "directory1", "directory2", "revision1", "release1"], ) @patch_byte_ranges -def test_one_hole(scrubber_db, swh_storage, missing_object): +def test_one_hole(scrubber_db, datastore, swh_storage, missing_object): if missing_object == "content1": missing_swhid = CONTENT1.swhid() reference_swhids = [DIRECTORY1.swhid(), DIRECTORY2.swhid()] else: swh_storage.content_add([CONTENT1]) if missing_object == "directory1": missing_swhid = DIRECTORY1.swhid() reference_swhids = [DIRECTORY2.swhid()] else: swh_storage.directory_add([DIRECTORY1]) if missing_object == "directory2": missing_swhid = DIRECTORY2.swhid() reference_swhids = [REVISION1.swhid()] else: swh_storage.directory_add([DIRECTORY2]) if missing_object == "revision1": missing_swhid = REVISION1.swhid() reference_swhids = [RELEASE1.swhid()] else: swh_storage.revision_add([REVISION1]) if missing_object == "release1": missing_swhid = RELEASE1.swhid() reference_swhids = [SNAPSHOT1.swhid()] else: swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() assert [mo.id for mo in scrubber_db.missing_object_iter()] == [missing_swhid] assert { (mor.missing_id, mor.reference_id) for mor in scrubber_db.missing_object_reference_iter(missing_swhid) } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids} + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES) + @patch_byte_ranges -def test_two_holes(scrubber_db, swh_storage): +def test_two_holes(scrubber_db, datastore, swh_storage): # missing content and revision swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, storage=swh_storage, object_type=object_type, start_object="00" * 20, end_object="ff" * 20, ).run() assert {mo.id for mo in scrubber_db.missing_object_iter()} == { CONTENT1.swhid(), REVISION1.swhid(), } assert { mor.reference_id for mor in scrubber_db.missing_object_reference_iter(CONTENT1.swhid()) } == {DIRECTORY1.swhid(), DIRECTORY2.swhid()} assert { 
         mor.reference_id
         for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid())
     } == {RELEASE1.swhid()}
+
+    assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES)
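
For reference, the checkpointing behaviour exercised by these tests can be driven
end to end. The following sketch is illustrative only, written in the same pytest
style as the tests above; it assumes the ``scrubber_db``, ``datastore`` and
``swh_storage`` fixtures introduced by this patch, and uses only calls that appear
in it (``StorageChecker.run`` and ``ScrubberDb.checked_range_iter``)::

    import datetime

    from swh.scrubber.storage_checker import StorageChecker


    def checkpointing_sketch(scrubber_db, datastore, swh_storage):
        """Illustrative only: run one checker pass, then list its checkpoints."""
        before = datetime.datetime.now(tz=datetime.timezone.utc)

        # Check every snapshot whose id falls in [00..00, ff..ff]; each backfill
        # range is upserted into the checked_range table once it has been processed.
        StorageChecker(
            db=scrubber_db,
            storage=swh_storage,
            object_type="snapshot",
            start_object="00" * 20,
            end_object="ff" * 20,
        ).run()

        # Checkpoints come back as (start SWHID, end SWHID, datetime) tuples.
        for range_start, range_end, date in scrubber_db.checked_range_iter(datastore):
            assert before <= date
            print(range_start, range_end, date)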
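
The patch itself only records checkpoints; nothing consumes them yet. As a sketch of
what the ``checked_range`` data enables, a hypothetical helper (not part of this
patch) could decide whether a candidate range is already fully covered by a single
previously checked range of the same object type, using only the ``(start, end,
date)`` tuples that ``ScrubberDb.checked_range_iter`` yields::

    import datetime
    from typing import Iterable, Tuple

    from swh.model import swhids

    CheckedRange = Tuple[swhids.CoreSWHID, swhids.CoreSWHID, datetime.datetime]


    def range_already_checked(
        checked_ranges: Iterable[CheckedRange],
        range_start_swhid: swhids.CoreSWHID,
        range_end_swhid: swhids.CoreSWHID,
    ) -> bool:
        """Hypothetical helper: True if [range_start_swhid, range_end_swhid] lies
        entirely inside one previously checked range (no merging of adjacent
        ranges is attempted)."""
        return any(
            start.object_type == range_start_swhid.object_type
            and start.object_id <= range_start_swhid.object_id
            and range_end_swhid.object_id <= end.object_id
            for (start, end, _date) in checked_ranges
        )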