From 36604080f142692921da3f9c3fae50031614baff Mon Sep 17 00:00:00 2001 From: Fabian van Koppen Date: Sun, 12 Oct 2025 12:41:52 +0200 Subject: [PATCH] Initial commit --- .gitignore | 1 + prolo-tlsa.py | 296 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 297 insertions(+) create mode 100644 .gitignore create mode 100755 prolo-tlsa.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..16d3c4d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.cache diff --git a/prolo-tlsa.py b/prolo-tlsa.py new file mode 100755 index 0000000..3ed2cdb --- /dev/null +++ b/prolo-tlsa.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 + +""" +prolo-tlsa.py + +This script generates TLSA records for DANE (DNS-based Authentication of Named Entities) +from X.509 certificates obtained via Let's Encrypt. It uses OpenSSL to compute the SHA-256 +hash of the Subject Public Key Info (SPKI) in DER format. + +TLSA records are used to associate a certificate's public key with a domain name, providing +additional security for TLS connections. + +Optionally, it can publish the generated TLSA records to the Prolocation DNS API. + +Usage: + python prolo-tlsa.py [--publish] + +The script assumes certificates are located in /etc/letsencrypt/live//cert.pem +and /etc/letsencrypt/live//chain.pem. + +It generates TLSA records with usage values: +- 3: For the end-entity certificate (cert.pem) +- 2: For the issuer certificate (chain.pem) + +To publish, set the PROLOCATION_API_KEY environment variable and use --publish. +The script will detect the appropriate DNS zone, including sub-zones. +""" + +import argparse +import logging +import os +import subprocess +import sys +from pathlib import Path + + +try: + import requests +except ImportError: + print( + "requests library required. Install with: pip install requests", file=sys.stderr + ) + sys.exit(1) + + +def setup_logging(): + """Configure logging to output to stderr with INFO level.""" + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + + +def validate_hostname(hostname: str) -> bool: + """ + Validate the hostname for basic sanity checks. + + Args: + hostname (str): The hostname to validate. + + Returns: + bool: True if valid, raises ValueError otherwise. + """ + if not hostname: + raise ValueError("Hostname cannot be empty.") + if any(c in hostname for c in "/\\:"): + raise ValueError("Hostname contains invalid characters.") + # Additional validations can be added here if needed + return True + + +def generate_tlsa_for_cert(cert_path: Path, usage: int) -> str: + """ + Generate a TLSA record for the given certificate. + + The process involves: + 1. Extracting the public key from the certificate. + 2. Converting it to DER format. + 3. Computing the SHA-256 hash of the SPKI. + + Args: + cert_path (Path): Path to the certificate file. + usage (int): TLSA usage field (0-3). + + Returns: + str: The TLSA record string. + + Raises: + subprocess.CalledProcessError: If an OpenSSL command fails. + RuntimeError: If unable to process the certificate. + """ + if not cert_path.exists(): + raise FileNotFoundError(f"Certificate file not found: {cert_path}") + + if not (0 <= usage <= 3): + raise ValueError(f"Invalid TLSA usage value: {usage}. Must be between 0 and 3.") + + try: + # Extract the public key in PEM format + spki_pem_output = subprocess.check_output( + ["openssl", "x509", "-in", str(cert_path), "-pubkey", "-noout"], + stderr=subprocess.PIPE, + ) + + # Convert PEM to DER format + spki_der_output = subprocess.check_output( + ["openssl", "pkey", "-pubin", "-pubout", "-outform", "der"], + input=spki_pem_output, + stderr=subprocess.PIPE, + ) + + # Compute SHA-256 hash in hex format + hash_output = subprocess.check_output( + ["openssl", "dgst", "-sha256", "-r"], + input=spki_der_output, + stderr=subprocess.PIPE, + ) + hex_hash = hash_output.decode().split()[0].strip() + + # Form the TLSA value + tlsa_value = f"{usage} 1 1 {hex_hash}" + + logging.info(f"Generated TLSA for {cert_path}: {tlsa_value}") + return tlsa_value + + except subprocess.CalledProcessError as e: + raise RuntimeError( + f"OpenSSL command failed for {cert_path}: {e.stderr.decode().strip()}" + ) + except Exception as e: + raise RuntimeError(f"Unexpected error processing {cert_path}: {e}") + + +def get_api_key() -> str: + """Retrieve the Prolocation API key from environment variable.""" + key = os.environ.get("PROLOCATION_API_KEY") + if not key: + raise ValueError("PROLOCATION_API_KEY environment variable not set") + return key + + +def get_domains(api_key: str) -> list[dict[str, str]]: + """Fetch the list of active domains from Prolocation API.""" + headers = {"Authorization": f"Bearer {api_key}"} + resp = requests.get( + "https://service.prolocation.net/api/v1/domains", headers=headers + ) + resp.raise_for_status() + return resp.json()["data"] + + +def get_zone(hostname: str, domains: list[dict[str, str]]) -> str: + """ + Determine the appropriate DNS zone for the hostname, preferring the deepest sub-zone. + + Args: + hostname (str): The hostname, e.g., 'test.api.fabianvk.nl' + domains (list): List of domain resources from API. + + Returns: + str: The zone name to use. + """ + domain_names = [d["name"] for d in domains] + possible_zones = [ + d for d in domain_names if hostname.endswith("." + d) or hostname == d + ] + if not possible_zones: + raise ValueError( + f"No zone found for hostname {hostname}. Available domains: {domain_names}" + ) + # Use the longest (deepest) zone + zone = max(possible_zones, key=len) + logging.info(f"Using zone '{zone}' for hostname '{hostname}'") + return zone + + +def publish_tlsa_records( + zone: str, + hostname: str, + tlsa_list: list[str], + api_key: str, + port: int, + protocol: str, +): + """ + Publish TLSA records to the DNS zone via Prolocation API. + Removes any existing TLSA records for the hostname before adding new ones. + + Args: + zone (str): The DNS zone name. + hostname (str): The full hostname. + tlsa_list (list): List of TLSA record strings. + api_key (str): The API key. + """ + if hostname.endswith("." + zone): + relative_hostname = hostname[: -len("." + zone)] + else: + relative_hostname = hostname + name = ( + f"_{port}._{protocol}.{relative_hostname}" + if relative_hostname + else f"_{port}._{protocol}" + ) + records_url = f"https://service.prolocation.net/api/v1/zones/{zone}/dns" + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + + # Get existing records + resp = requests.get(records_url, headers=headers) + resp.raise_for_status() + records = resp.json()["data"] + + # Find existing TLSA for this name + existing = [r for r in records if r["name"] == name and r["type"] == "TLSA"] + + # Delete existing + for rec in existing: + params = {"type": "TLSA", "name": name, "value": rec["value"]} + del_resp = requests.delete(records_url, headers=headers, params=params) + if del_resp.status_code == 404: + logging.warning(f"Record to delete not found: {params}") + else: + del_resp.raise_for_status() + logging.info(f"Deleted existing TLSA record: {rec['value']}") + + # Add new + for tlsa_value in tlsa_list: + data = {"type": "TLSA", "name": name, "value": tlsa_value} + add_resp = requests.post(records_url, headers=headers, json=data) + add_resp.raise_for_status() + logging.info(f"Added TLSA record: {tlsa_value}") + + +def main(): + """Main function to parse arguments, generate TLSA records, and optionally publish.""" + setup_logging() + + parser = argparse.ArgumentParser( + description="Generate TLSA records from Let's Encrypt certificates, optionally publish to DNS." + ) + parser.add_argument( + "hostname", help="The hostname for which to generate TLSA records." + ) + parser.add_argument( + "--publish", + action="store_true", + help="Publish TLSA records to Prolocation DNS API.", + ) + parser.add_argument( + "--port", + type=int, + default=25, + help="Port number for the TLSA record (default: 25).", + ) + parser.add_argument( + "--protocol", + default="tcp", + help="Protocol for the TLSA record (default: tcp).", + ) + args = parser.parse_args() + + try: + validate_hostname(args.hostname) + except ValueError as e: + logging.error(e) + sys.exit(1) + + base_path = Path("/etc/letsencrypt/live") / args.hostname + cert_configs = [ + (base_path / "cert.pem", 3), # End-entity certificate + (base_path / "chain.pem", 2), # Intermediate certificate + ] + + tlsa_records = [] + for cert_file, usage in cert_configs: + try: + tlsa_record = generate_tlsa_for_cert(cert_file, usage) + tlsa_records.append(tlsa_record) + print(tlsa_record) + except FileNotFoundError as e: + logging.warning(e) + except (ValueError, RuntimeError) as e: + logging.error(e) + sys.exit(1) + + if args.publish: + try: + api_key = get_api_key() + domains = get_domains(api_key) + zone = get_zone(args.hostname, domains) + publish_tlsa_records( + zone, args.hostname, tlsa_records, api_key, args.port, args.protocol + ) + except Exception as e: + logging.error(f"Failed to publish TLSA records: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main()