diff --git a/site-modules/profile/templates/letsencrypt/letsencrypt_gandi_livedns.erb b/site-modules/profile/templates/letsencrypt/letsencrypt_gandi_livedns.erb index 25b38334..2a2ba3db 100755 --- a/site-modules/profile/templates/letsencrypt/letsencrypt_gandi_livedns.erb +++ b/site-modules/profile/templates/letsencrypt/letsencrypt_gandi_livedns.erb @@ -1,211 +1,215 @@ #!/usr/bin/python3 # # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import sys import urllib.parse import yaml import requests logger = logging.getLogger(__name__) CONFIG_FILE = os.environ.get( 'CERTBOT_GANDI_CONFIG', '<%= @hook_configfile %>', ) CONFIG = None DEFAULT_CONFIG = { 'gandi_api': 'https://dns.api.gandi.net/api/v5/', 'zones': {}, } def load_config(): """Load the hook configuration from CONFIG_FILE""" global CONFIG if CONFIG is not None: return try: with open(CONFIG_FILE, 'r') as f: CONFIG = yaml.safe_load(f) return True except Exception as e: logger.warning( 'Could not open configuration file %s: %s', CONFIG_FILE, e ) CONFIG = DEFAULT_CONFIG return False def get_domain_config(domain): """Retrieve the configuration for the zone containing `domain`.""" labels = domain.split('.') for i in range(len(labels)): zone = '.'.join(labels[i:]) if zone in CONFIG['zones']: zone_config = CONFIG['zones'][zone] if labels[0] == '*': relative = '.'.join(labels[1:i]) else: relative = '.'.join(labels[:i]) acme_subdomain = '_acme-challenge%s' % ( ('.%s' % relative) if relative else '' ) return { 'domain': domain, 'zone': zone, 'relative_subdomain': relative, 'acme_subdomain': acme_subdomain, 'api_key': zone_config['api_key'], 'sharing_id': zone_config.get('sharing_id') } else: logger.error( 'Could not find zone for domain %s, available zones: %s', domain, ', '.join(CONFIG['zones'].keys()), ) + def gandi_request(url, domain_config, method='GET', data=None): """Perform a request to the Gandi website, with the given data""" if url.startswith('https://'): parsed_url = urllib.parse.urlparse(url) else: parsed_url = urllib.parse.urlparse(CONFIG['gandi_api']) parsed_url = parsed_url._replace( path='/'.join([parsed_url.path.rstrip('/'), url]) ) # Add sharing_id to the query string if needed if domain_config.get('sharing_id'): qs = urllib.parse.parse_qs(parsed_url.query) qs['sharing_id'] = domain_config['sharing_id'] parsed_url = parsed_url._replace( query=urllib.parse.urlencode(qs, doseq=True) ) headers = { 'X-Api-Key': domain_config['api_key'], } url = urllib.parse.urlunparse(parsed_url) method = method.lower() response = getattr(requests, method)(url, headers=headers, json=data) if response.status_code < 400 or response.status_code == 404: return response logger.warn('Got unexpected error %s from the Gandi API: %s', response.status_code, response.text) response.raise_for_status() def get_zone_info(domain_config): """Retrieve the zone information from Gandi's website""" response = gandi_request('domains', domain_config) for domain in response.json(): if domain['fqdn'] == domain_config['zone']: return domain else: return {} def get_acme_url(domain_config): """Get the URL for the acme records for the given domain config""" zone_info = get_zone_info(domain_config) acme_records_url = '%s/%s/TXT' % ( zone_info['domain_records_href'], domain_config['acme_subdomain'] ) return acme_records_url def get_acme_records(domain_config): """Retrieve existing ACME TXT records from the Gandi API""" acme_records_url = get_acme_url(domain_config) response = gandi_request(acme_records_url, domain_config) if response.status_code == 404: return set() rrset = response.json() return {value.strip('"') for value in rrset['rrset_values']} def set_acme_records(domain_config, acme_records): """Set the ACME TXT records on the given domain to the given""" acme_records_url = get_acme_url(domain_config) if not acme_records: response = gandi_request(acme_records_url, domain_config, method='delete') return True new_record = { "rrset_ttl": 300, "rrset_values": list(set(acme_records)), } response = gandi_request(acme_records_url, domain_config, method='put', data=new_record) if response.status_code == 404: response.raise_for_status() return True def usage(): print(""" Usage: %s {auth, cleanup, purge} Set the CERTBOT_DOMAIN environment variable to set the domain used by the hook. Set the CERTBOT_VALIDATION environment variable to set the ACME challenge (only for auth/cleanup). """.strip() % sys.argv[0], file=sys.stderr) if __name__ == '__main__': logging.basicConfig(level=logging.INFO) certbot_domain = os.environ.get('CERTBOT_DOMAIN') if len(sys.argv) != 2 or not certbot_domain: usage() sys.exit(1) certbot_validation = os.environ.get('CERTBOT_VALIDATION') load_config() domain_config = get_domain_config(certbot_domain) if not domain_config: sys.exit(2) if sys.argv[1] == 'auth': if not certbot_validation: usage() sys.exit(1) acme_records = get_acme_records(domain_config) acme_records.add(certbot_validation) set_acme_records(domain_config, acme_records) elif sys.argv[1] == 'cleanup': if not certbot_validation: usage() sys.exit(1) acme_records = get_acme_records(domain_config) - acme_records.add(certbot_validation) + acme_records.remove(certbot_validation) set_acme_records(domain_config, acme_records) + elif sys.argv[1] == 'show': + for record in get_acme_records(domain_config): + print(record) elif sys.argv[1] == 'purge': set_acme_records(domain_config, set()) else: usage() sys.exit(1)