Files
prolo-tlsa/prolo-tlsa.py
2025-10-12 12:41:52 +02:00

297 lines
9.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
prolo-tlsa.py
This script generates TLSA records for DANE (DNS-based Authentication of Named Entities)
from X.509 certificates obtained via Let's Encrypt. It uses OpenSSL to compute the SHA-256
hash of the Subject Public Key Info (SPKI) in DER format.
TLSA records are used to associate a certificate's public key with a domain name, providing
additional security for TLS connections.
Optionally, it can publish the generated TLSA records to the Prolocation DNS API.
Usage:
python prolo-tlsa.py <hostname> [--publish]
The script assumes certificates are located in /etc/letsencrypt/live/<hostname>/cert.pem
and /etc/letsencrypt/live/<hostname>/chain.pem.
It generates TLSA records with usage values:
- 3: For the end-entity certificate (cert.pem)
- 2: For the issuer certificate (chain.pem)
To publish, set the PROLOCATION_API_KEY environment variable and use --publish.
The script will detect the appropriate DNS zone, including sub-zones.
"""
import argparse
import logging
import os
import subprocess
import sys
from pathlib import Path
try:
import requests
except ImportError:
print(
"requests library required. Install with: pip install requests", file=sys.stderr
)
sys.exit(1)
def setup_logging():
"""Configure logging to output to stderr with INFO level."""
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
def validate_hostname(hostname: str) -> bool:
"""
Validate the hostname for basic sanity checks.
Args:
hostname (str): The hostname to validate.
Returns:
bool: True if valid, raises ValueError otherwise.
"""
if not hostname:
raise ValueError("Hostname cannot be empty.")
if any(c in hostname for c in "/\\:"):
raise ValueError("Hostname contains invalid characters.")
# Additional validations can be added here if needed
return True
def generate_tlsa_for_cert(cert_path: Path, usage: int) -> str:
"""
Generate a TLSA record for the given certificate.
The process involves:
1. Extracting the public key from the certificate.
2. Converting it to DER format.
3. Computing the SHA-256 hash of the SPKI.
Args:
cert_path (Path): Path to the certificate file.
usage (int): TLSA usage field (0-3).
Returns:
str: The TLSA record string.
Raises:
subprocess.CalledProcessError: If an OpenSSL command fails.
RuntimeError: If unable to process the certificate.
"""
if not cert_path.exists():
raise FileNotFoundError(f"Certificate file not found: {cert_path}")
if not (0 <= usage <= 3):
raise ValueError(f"Invalid TLSA usage value: {usage}. Must be between 0 and 3.")
try:
# Extract the public key in PEM format
spki_pem_output = subprocess.check_output(
["openssl", "x509", "-in", str(cert_path), "-pubkey", "-noout"],
stderr=subprocess.PIPE,
)
# Convert PEM to DER format
spki_der_output = subprocess.check_output(
["openssl", "pkey", "-pubin", "-pubout", "-outform", "der"],
input=spki_pem_output,
stderr=subprocess.PIPE,
)
# Compute SHA-256 hash in hex format
hash_output = subprocess.check_output(
["openssl", "dgst", "-sha256", "-r"],
input=spki_der_output,
stderr=subprocess.PIPE,
)
hex_hash = hash_output.decode().split()[0].strip()
# Form the TLSA value
tlsa_value = f"{usage} 1 1 {hex_hash}"
logging.info(f"Generated TLSA for {cert_path}: {tlsa_value}")
return tlsa_value
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"OpenSSL command failed for {cert_path}: {e.stderr.decode().strip()}"
)
except Exception as e:
raise RuntimeError(f"Unexpected error processing {cert_path}: {e}")
def get_api_key() -> str:
"""Retrieve the Prolocation API key from environment variable."""
key = os.environ.get("PROLOCATION_API_KEY")
if not key:
raise ValueError("PROLOCATION_API_KEY environment variable not set")
return key
def get_domains(api_key: str) -> list[dict[str, str]]:
"""Fetch the list of active domains from Prolocation API."""
headers = {"Authorization": f"Bearer {api_key}"}
resp = requests.get(
"https://service.prolocation.net/api/v1/domains", headers=headers
)
resp.raise_for_status()
return resp.json()["data"]
def get_zone(hostname: str, domains: list[dict[str, str]]) -> str:
"""
Determine the appropriate DNS zone for the hostname, preferring the deepest sub-zone.
Args:
hostname (str): The hostname, e.g., 'test.api.fabianvk.nl'
domains (list): List of domain resources from API.
Returns:
str: The zone name to use.
"""
domain_names = [d["name"] for d in domains]
possible_zones = [
d for d in domain_names if hostname.endswith("." + d) or hostname == d
]
if not possible_zones:
raise ValueError(
f"No zone found for hostname {hostname}. Available domains: {domain_names}"
)
# Use the longest (deepest) zone
zone = max(possible_zones, key=len)
logging.info(f"Using zone '{zone}' for hostname '{hostname}'")
return zone
def publish_tlsa_records(
zone: str,
hostname: str,
tlsa_list: list[str],
api_key: str,
port: int,
protocol: str,
):
"""
Publish TLSA records to the DNS zone via Prolocation API.
Removes any existing TLSA records for the hostname before adding new ones.
Args:
zone (str): The DNS zone name.
hostname (str): The full hostname.
tlsa_list (list): List of TLSA record strings.
api_key (str): The API key.
"""
if hostname.endswith("." + zone):
relative_hostname = hostname[: -len("." + zone)]
else:
relative_hostname = hostname
name = (
f"_{port}._{protocol}.{relative_hostname}"
if relative_hostname
else f"_{port}._{protocol}"
)
records_url = f"https://service.prolocation.net/api/v1/zones/{zone}/dns"
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
# Get existing records
resp = requests.get(records_url, headers=headers)
resp.raise_for_status()
records = resp.json()["data"]
# Find existing TLSA for this name
existing = [r for r in records if r["name"] == name and r["type"] == "TLSA"]
# Delete existing
for rec in existing:
params = {"type": "TLSA", "name": name, "value": rec["value"]}
del_resp = requests.delete(records_url, headers=headers, params=params)
if del_resp.status_code == 404:
logging.warning(f"Record to delete not found: {params}")
else:
del_resp.raise_for_status()
logging.info(f"Deleted existing TLSA record: {rec['value']}")
# Add new
for tlsa_value in tlsa_list:
data = {"type": "TLSA", "name": name, "value": tlsa_value}
add_resp = requests.post(records_url, headers=headers, json=data)
add_resp.raise_for_status()
logging.info(f"Added TLSA record: {tlsa_value}")
def main():
"""Main function to parse arguments, generate TLSA records, and optionally publish."""
setup_logging()
parser = argparse.ArgumentParser(
description="Generate TLSA records from Let's Encrypt certificates, optionally publish to DNS."
)
parser.add_argument(
"hostname", help="The hostname for which to generate TLSA records."
)
parser.add_argument(
"--publish",
action="store_true",
help="Publish TLSA records to Prolocation DNS API.",
)
parser.add_argument(
"--port",
type=int,
default=25,
help="Port number for the TLSA record (default: 25).",
)
parser.add_argument(
"--protocol",
default="tcp",
help="Protocol for the TLSA record (default: tcp).",
)
args = parser.parse_args()
try:
validate_hostname(args.hostname)
except ValueError as e:
logging.error(e)
sys.exit(1)
base_path = Path("/etc/letsencrypt/live") / args.hostname
cert_configs = [
(base_path / "cert.pem", 3), # End-entity certificate
(base_path / "chain.pem", 2), # Intermediate certificate
]
tlsa_records = []
for cert_file, usage in cert_configs:
try:
tlsa_record = generate_tlsa_for_cert(cert_file, usage)
tlsa_records.append(tlsa_record)
print(tlsa_record)
except FileNotFoundError as e:
logging.warning(e)
except (ValueError, RuntimeError) as e:
logging.error(e)
sys.exit(1)
if args.publish:
try:
api_key = get_api_key()
domains = get_domains(api_key)
zone = get_zone(args.hostname, domains)
publish_tlsa_records(
zone, args.hostname, tlsa_records, api_key, args.port, args.protocol
)
except Exception as e:
logging.error(f"Failed to publish TLSA records: {e}")
sys.exit(1)
if __name__ == "__main__":
main()