Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9338619
letsencrypt_gandi_livedns.erb
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
letsencrypt_gandi_livedns.erb
View Options
#!/usr/bin/python3
#
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import os
import sys
import urllib.parse
import yaml
import requests
logger = logging.getLogger(__name__)
CONFIG_FILE = os.environ.get(
'CERTBOT_GANDI_CONFIG',
'
<%=
@hook_configfile
%>
',
)
CONFIG = None
DEFAULT_CONFIG = {
'gandi_api': 'https://dns.api.gandi.net/api/v5/',
'zones': {},
}
def load_config():
"""Load the hook configuration from CONFIG_FILE"""
global CONFIG
if CONFIG is not None:
return
try:
with open(CONFIG_FILE, 'r') as f:
CONFIG = yaml.safe_load(f)
return True
except Exception as e:
logger.warning(
'Could not open configuration file %s: %s', CONFIG_FILE, e
)
CONFIG = DEFAULT_CONFIG
return False
def get_domain_config(domain):
"""Retrieve the configuration for the zone containing `domain`."""
labels = domain.split('.')
for i in range(len(labels)):
zone = '.'.join(labels[i:])
if zone in CONFIG['zones']:
zone_config = CONFIG['zones'][zone]
if labels[0] == '*':
relative = '.'.join(labels[1:i])
else:
relative = '.'.join(labels[:i])
acme_subdomain = '_acme-challenge%s' % (
('.%s' % relative) if relative else ''
)
return {
'domain': domain,
'zone': zone,
'relative_subdomain': relative,
'acme_subdomain': acme_subdomain,
'api_key': zone_config['api_key'],
'sharing_id': zone_config.get('sharing_id')
}
else:
logger.error(
'Could not find zone for domain %s, available zones: %s',
domain, ', '.join(CONFIG['zones'].keys()),
)
def gandi_request(url, domain_config, method='GET', data=None):
"""Perform a request to the Gandi website, with the given data"""
if url.startswith('https://'):
parsed_url = urllib.parse.urlparse(url)
else:
parsed_url = urllib.parse.urlparse(CONFIG['gandi_api'])
parsed_url = parsed_url._replace(
path='/'.join([parsed_url.path.rstrip('/'), url])
)
# Add sharing_id to the query string if needed
if domain_config.get('sharing_id'):
qs = urllib.parse.parse_qs(parsed_url.query)
qs['sharing_id'] = domain_config['sharing_id']
parsed_url = parsed_url._replace(
query=urllib.parse.urlencode(qs, doseq=True)
)
headers = {
'X-Api-Key': domain_config['api_key'],
}
url = urllib.parse.urlunparse(parsed_url)
method = method.lower()
response = getattr(requests, method)(url, headers=headers, json=data)
if response.status_code < 400 or response.status_code == 404:
return response
logger.warn('Got unexpected error %s from the Gandi API: %s',
response.status_code, response.text)
response.raise_for_status()
def get_zone_info(domain_config):
"""Retrieve the zone information from Gandi's website"""
response = gandi_request('domains', domain_config)
for domain in response.json():
if domain['fqdn'] == domain_config['zone']:
return domain
else:
return {}
def get_acme_url(domain_config):
"""Get the URL for the acme records for the given domain config"""
zone_info = get_zone_info(domain_config)
acme_records_url = '%s/%s/TXT' % (
zone_info['domain_records_href'], domain_config['acme_subdomain']
)
return acme_records_url
def get_acme_records(domain_config):
"""Retrieve existing ACME TXT records from the Gandi API"""
acme_records_url = get_acme_url(domain_config)
response = gandi_request(acme_records_url, domain_config)
if response.status_code == 404:
return set()
rrset = response.json()
return {value.strip('"') for value in rrset['rrset_values']}
def set_acme_records(domain_config, acme_records):
"""Set the ACME TXT records on the given domain to the given"""
acme_records_url = get_acme_url(domain_config)
if not acme_records:
response = gandi_request(acme_records_url, domain_config,
method='delete')
return True
new_record = {
"rrset_ttl": 300,
"rrset_values": list(set(acme_records)),
}
response = gandi_request(acme_records_url, domain_config,
method='put', data=new_record)
if response.status_code == 404:
response.raise_for_status()
return True
def usage():
print("""
Usage: %s {auth, cleanup, purge}
Set the CERTBOT_DOMAIN environment variable to set the domain used by the hook.
Set the CERTBOT_VALIDATION environment variable to set the ACME challenge (only
for auth/cleanup).
""".strip() % sys.argv[0], file=sys.stderr)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
certbot_domain = os.environ.get('CERTBOT_DOMAIN')
if len(sys.argv) != 2 or not certbot_domain:
usage()
sys.exit(1)
certbot_validation = os.environ.get('CERTBOT_VALIDATION')
load_config()
domain_config = get_domain_config(certbot_domain)
if not domain_config:
sys.exit(2)
if sys.argv[1] == 'auth':
if not certbot_validation:
usage()
sys.exit(1)
acme_records = get_acme_records(domain_config)
acme_records.add(certbot_validation)
set_acme_records(domain_config, acme_records)
elif sys.argv[1] == 'cleanup':
if not certbot_validation:
usage()
sys.exit(1)
acme_records = get_acme_records(domain_config)
acme_records.remove(certbot_validation)
set_acme_records(domain_config, acme_records)
elif sys.argv[1] == 'show':
for record in get_acme_records(domain_config):
print(record)
elif sys.argv[1] == 'purge':
set_acme_records(domain_config, set())
else:
usage()
sys.exit(1)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jul 4 2025, 8:59 AM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3370764
Attached To
rSPSITE puppet-swh-site
Event Timeline
Log In to Comment