295 lines
9.1 KiB
Python
295 lines
9.1 KiB
Python
"""
|
|
prolo_tlsa
|
|
|
|
This package generates TLSA records for DANE (DNS-based Authentication of Named Entities)
|
|
from X.509 certificates obtained via Let's Encrypt. It uses OpenSSL to compute the SHA-256
|
|
hash of the Subject Public Key Info (SPKI) in DER format.
|
|
|
|
TLSA records are used to associate a certificate's public key with a domain name, providing
|
|
additional security for TLS connections.
|
|
|
|
Optionally, it can publish the generated TLSA records to the Prolocation DNS API.
|
|
|
|
Usage:
|
|
python -m prolo_tlsa <hostname> [--publish] [--port PORT] [--protocol PROTOCOL]
|
|
|
|
The package assumes certificates are located in /etc/letsencrypt/live/<hostname>/cert.pem
|
|
and /etc/letsencrypt/live/<hostname>/chain.pem.
|
|
|
|
It generates TLSA records with usage values:
|
|
- 3: For the end-entity certificate (cert.pem)
|
|
- 2: For the issuer certificate (chain.pem)
|
|
|
|
To publish, set the PROLOCATION_API_KEY environment variable and use --publish.
|
|
The package will detect the appropriate DNS zone, including sub-zones.
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
print(
|
|
"requests library required. Install with: pip install requests", file=sys.stderr
|
|
)
|
|
sys.exit(1)
|
|
|
|
|
|
def setup_logging():
    """Set up root logging at INFO level with a ``LEVEL: message`` line format.

    No handler or stream is passed, so ``logging.basicConfig`` installs its
    default stream handler (stderr).
    """
    line_format = "%(levelname)s: %(message)s"
    logging.basicConfig(format=line_format, level=logging.INFO)
|
|
|
|
|
|
def validate_hostname(hostname: str) -> bool:
    """
    Validate the hostname before it is used as a path component and DNS name.

    The hostname is joined onto the certificate directory by the caller, so
    besides path separators it must not be a relative path component such as
    "." or "..". Those are rejected here because they consist only of empty
    dot-separated labels.

    Args:
        hostname (str): The hostname to validate.

    Returns:
        bool: Always True when the hostname passes every check.

    Raises:
        ValueError: If the hostname is empty, contains a path separator,
            colon, whitespace or non-printable character, or contains an
            empty label (leading, trailing or repeated dot).
    """
    if not hostname:
        raise ValueError("Hostname cannot be empty.")

    # Path separators and drive/port delimiters would change which file is read.
    if any(c in hostname for c in "/\\:"):
        raise ValueError("Hostname contains invalid characters.")

    # Whitespace and control characters (e.g. a trailing newline or NUL) are
    # never part of a hostname and only lead to confusing failures later on.
    if any(c.isspace() or not c.isprintable() for c in hostname):
        raise ValueError("Hostname contains invalid characters.")

    # An empty label means a leading, trailing or doubled dot; this also
    # covers the "." and ".." directory names.
    if any(not label for label in hostname.split(".")):
        raise ValueError(
            "Hostname contains an empty label (leading, trailing or repeated dot)."
        )

    return True
|
|
|
|
|
|
def generate_tlsa_for_cert(cert_path: Path, usage: int) -> str:
    """
    Generate a TLSA record value for the given certificate.

    The record is built with selector 1 and matching type 1, i.e. the SHA-256
    hash of the certificate's Subject Public Key Info. The process involves:
    1. Extracting the public key from the certificate (PEM).
    2. Converting it to DER format.
    3. Computing the SHA-256 hash of the DER-encoded SPKI.

    Args:
        cert_path (Path): Path to the certificate file.
        usage (int): TLSA usage field (0-3).

    Returns:
        str: The TLSA record value, formatted as "<usage> 1 1 <hex digest>".

    Raises:
        FileNotFoundError: If cert_path does not exist.
        ValueError: If usage is outside the range 0-3.
        RuntimeError: If an OpenSSL command fails, the openssl executable
            cannot be started, or the digest output is not a SHA-256 hash.
            When another exception triggered the failure it is chained as
            the cause.
    """
    if not cert_path.exists():
        raise FileNotFoundError(f"Certificate file not found: {cert_path}")

    if not (0 <= usage <= 3):
        raise ValueError(f"Invalid TLSA usage value: {usage}. Must be between 0 and 3.")

    try:
        # Extract the public key in PEM format
        spki_pem = subprocess.check_output(
            ["openssl", "x509", "-in", str(cert_path), "-pubkey", "-noout"],
            stderr=subprocess.PIPE,
        )

        # Convert PEM to DER format
        spki_der = subprocess.check_output(
            ["openssl", "pkey", "-pubin", "-pubout", "-outform", "der"],
            input=spki_pem,
            stderr=subprocess.PIPE,
        )

        # Compute SHA-256 hash in hex format ("-r" prints "<hex> *stdin")
        digest_output = subprocess.check_output(
            ["openssl", "dgst", "-sha256", "-r"],
            input=spki_der,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        # errors="replace" so an undecodable byte in OpenSSL's message cannot
        # raise a second exception while this one is being reported.
        stderr_text = (e.stderr or b"").decode(errors="replace").strip()
        raise RuntimeError(
            f"OpenSSL command failed for {cert_path}: {stderr_text}"
        ) from e
    except OSError as e:
        # Typically: the openssl executable is missing or not executable.
        # Wrapped so callers do not mistake it for a missing certificate.
        raise RuntimeError(f"Unexpected error processing {cert_path}: {e}") from e

    # Validate the digest instead of trusting the first token blindly: a
    # SHA-256 hash is exactly 32 bytes (64 hex characters).
    fields = digest_output.decode("ascii", errors="replace").split()
    try:
        digest = bytes.fromhex(fields[0]) if fields else b""
    except ValueError:
        digest = b""
    if len(digest) != 32:
        raise RuntimeError(
            f"Unexpected error processing {cert_path}: "
            f"unrecognised openssl dgst output {digest_output!r}"
        )

    # Form the TLSA value: <usage> <selector=1> <matching type=1> <digest>
    tlsa_value = f"{usage} 1 1 {digest.hex()}"

    logging.info("Generated TLSA for %s: %s", cert_path, tlsa_value)
    return tlsa_value
|
|
|
|
|
|
def get_api_key() -> str:
    """
    Retrieve the Prolocation API key from the PROLOCATION_API_KEY variable.

    Surrounding whitespace (for example a trailing newline picked up when the
    key was read from a file) is stripped, because the key is sent verbatim
    in an HTTP Authorization header.

    Returns:
        str: The API key without leading or trailing whitespace.

    Raises:
        ValueError: If the variable is unset, empty, or only whitespace.
    """
    key = os.environ.get("PROLOCATION_API_KEY", "").strip()
    if not key:
        raise ValueError("PROLOCATION_API_KEY environment variable not set")
    return key
|
|
|
|
|
|
def get_domains(api_key: str, timeout: float = 30.0) -> list[dict[str, str]]:
    """
    Fetch the list of domains from the Prolocation API.

    Args:
        api_key (str): API key, sent as a Bearer token.
        timeout (float): Seconds to wait for the server before giving up.
            Without a timeout the request could block indefinitely.

    Returns:
        list: The value of the "data" field of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    headers = {"Authorization": f"Bearer {api_key}"}
    resp = requests.get(
        "https://service.prolocation.net/api/v1/domains",
        headers=headers,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()["data"]
|
|
|
|
|
|
def get_zone(hostname: str, domains: list[dict[str, str]]) -> str:
    """
    Pick the DNS zone that should hold records for the hostname.

    A domain qualifies when the hostname equals it or ends with "." followed
    by it. Among the qualifying domains the longest name, i.e. the deepest
    sub-zone, wins.

    Args:
        hostname (str): The hostname, e.g., 'test.api.fabianvk.nl'
        domains (list): Domain resources from the API; each needs a "name" key.

    Returns:
        str: The zone name to use.

    Raises:
        ValueError: If no domain matches the hostname.
    """
    domain_names = [entry["name"] for entry in domains]

    # Single pass, remembering the longest matching domain seen so far.
    best_match = None
    for candidate in domain_names:
        matches = hostname == candidate or hostname.endswith("." + candidate)
        if not matches:
            continue
        if best_match is None or len(candidate) > len(best_match):
            best_match = candidate

    if best_match is None:
        raise ValueError(
            f"No zone found for hostname {hostname}. Available domains: {domain_names}"
        )

    logging.info(f"Using zone '{best_match}' for hostname '{hostname}'")
    return best_match
|
|
|
|
|
|
def publish_tlsa_records(
    zone: str,
    hostname: str,
    tlsa_list: list[str],
    api_key: str,
    port: int,
    protocol: str,
    timeout: float = 30.0,
):
    """
    Publish TLSA records to the DNS zone via Prolocation API.

    The record name is "_<port>._<protocol>" followed by the hostname relative
    to the zone (nothing is appended when the hostname is the zone itself).
    Existing TLSA records at that name which are not in tlsa_list are removed,
    records that already match are left in place, and the missing ones are
    added.

    Args:
        zone (str): The DNS zone name.
        hostname (str): The full hostname.
        tlsa_list (list): List of TLSA record strings; must not be empty.
        api_key (str): The API key.
        port (int): Port number used in the record name.
        protocol (str): Transport protocol used in the record name.
        timeout (float): Seconds to wait for each API request.

    Raises:
        ValueError: If tlsa_list is empty. Publishing nothing would only
            delete the records that are currently live.
        requests.HTTPError: If the API answers with an error status.
    """
    if not tlsa_list:
        raise ValueError(
            "No TLSA records to publish; refusing to remove the existing records."
        )

    if hostname == zone:
        # Zone apex: the record lives directly at _port._proto within the zone.
        relative_hostname = ""
    elif hostname.endswith("." + zone):
        relative_hostname = hostname[: -len("." + zone)]
    else:
        relative_hostname = hostname
    name = (
        f"_{port}._{protocol}.{relative_hostname}"
        if relative_hostname
        else f"_{port}._{protocol}"
    )
    records_url = f"https://service.prolocation.net/api/v1/zones/{zone}/dns"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

    def canonical(value) -> str:
        # Compare TLSA values ignoring spacing and hex-digit case.
        return " ".join(str(value).split()).lower()

    # Get existing records
    resp = requests.get(records_url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    records = resp.json()["data"]

    # Find existing TLSA for this name
    existing = [r for r in records if r["name"] == name and r["type"] == "TLSA"]

    wanted = {canonical(value) for value in tlsa_list}
    already_published = set()

    # Delete stale records; keep the ones that are still wanted so there is
    # no window in which a valid record is missing from DNS.
    for rec in existing:
        rec_key = canonical(rec["value"])
        if rec_key in wanted:
            already_published.add(rec_key)
            logging.info("TLSA record already present, keeping: %s", rec["value"])
            continue
        params = {"type": "TLSA", "name": name, "value": rec["value"]}
        del_resp = requests.delete(
            records_url, headers=headers, params=params, timeout=timeout
        )
        if del_resp.status_code == 404:
            # Someone else removed it in the meantime; the goal is reached.
            logging.warning(f"Record to delete not found: {params}")
        else:
            del_resp.raise_for_status()
            logging.info(f"Deleted existing TLSA record: {rec['value']}")

    # Add new
    for tlsa_value in tlsa_list:
        if canonical(tlsa_value) in already_published:
            continue
        data = {"type": "TLSA", "name": name, "value": tlsa_value}
        add_resp = requests.post(
            records_url, headers=headers, json=data, timeout=timeout
        )
        add_resp.raise_for_status()
        logging.info(f"Added TLSA record: {tlsa_value}")
|
|
|
|
|
|
def main(argv=None):
    """
    Parse arguments, generate TLSA records, and optionally publish them.

    Each generated record is printed to stdout. A missing certificate file is
    only a warning, so one of the two files may be absent. Any other
    generation error ends the program with exit status 1.

    Args:
        argv: Argument list to parse. None (the default) makes argparse read
            the process command line.

    Exits with status 1 on invalid hostname, certificate processing errors,
    or publish failures (including having no records to publish), and with
    argparse's usage error for an out-of-range port or unknown protocol.
    """
    setup_logging()

    parser = argparse.ArgumentParser(
        description="Generate TLSA records from Let's Encrypt certificates, optionally publish to DNS."
    )
    parser.add_argument(
        "hostname", help="The hostname for which to generate TLSA records."
    )
    parser.add_argument(
        "--publish",
        action="store_true",
        help="Publish TLSA records to Prolocation DNS API.",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=25,
        help="Port number for the TLSA record (default: 25).",
    )
    parser.add_argument(
        "--protocol",
        type=str.lower,  # accept "TCP" as well; the label is written lowercase
        choices=("tcp", "udp", "sctp"),
        default="tcp",
        help="Protocol for the TLSA record (default: tcp).",
    )
    args = parser.parse_args(argv)

    # A port outside this range would produce a record name no client looks up.
    if not 1 <= args.port <= 65535:
        parser.error(f"--port must be between 1 and 65535, got {args.port}")

    try:
        validate_hostname(args.hostname)
    except ValueError as e:
        logging.error(e)
        sys.exit(1)

    base_path = Path("/etc/letsencrypt/live") / args.hostname
    cert_configs = [
        (base_path / "cert.pem", 3),  # End-entity certificate
        (base_path / "chain.pem", 2),  # Intermediate certificate
    ]

    tlsa_records = []
    for cert_file, usage in cert_configs:
        try:
            tlsa_record = generate_tlsa_for_cert(cert_file, usage)
        except FileNotFoundError as e:
            # Best effort: a missing file is skipped, the other one still counts.
            logging.warning(e)
        except (ValueError, RuntimeError) as e:
            logging.error(e)
            sys.exit(1)
        else:
            tlsa_records.append(tlsa_record)
            print(tlsa_record)

    if args.publish:
        # Publishing replaces what is live in DNS; with nothing generated that
        # would amount to deleting the current records, so stop here instead.
        if not tlsa_records:
            logging.error(
                "Failed to publish TLSA records: no records were generated for %s",
                args.hostname,
            )
            sys.exit(1)

        try:
            api_key = get_api_key()
            domains = get_domains(api_key)
            zone = get_zone(args.hostname, domains)
            publish_tlsa_records(
                zone, args.hostname, tlsa_records, api_key, args.port, args.protocol
            )
        except Exception as e:
            # Top-level boundary: report any API/configuration failure and exit.
            logging.error(f"Failed to publish TLSA records: {e}")
            sys.exit(1)
|
|
|
|
|
|
# Run the command-line interface only when executed as a script or via
# ``python -m``; importing the module has no side effects from this guard.
if __name__ == "__main__":
    sys.exit(main())
|