#!/usr/bin/env python3
"""
pfSense to OPNsense configuration converter.
Reads a pfSense config.xml and converts it to an OPNsense-compatible config.xml.
Usage:
python pfsense2opnsense.py config-pfsense.xml > config-opnsense.xml
Sections converted:
- System (hostname, domain, DNS, timezone, webgui, SSH, users)
- Interfaces (WAN, LAN, OPTx) and VLANs
- DHCP server config and static mappings
- Firewall rules (filter)
- NAT rules (port forwarding + outbound NAT)
- Aliases
- Certificates and CAs
- OpenVPN servers/clients
- WireGuard (core since 24.1, no plugin needed on 26.x)
- IPsec
- Static routes
- DNS (dnsmasq/unbound) with domain overrides and host overrides
- SNMP
- Schedules
- Interface groups
- Virtual IPs
- CRLs
- System tunables (sysctl)
"""
import xml.etree.ElementTree as ET
import sys
import uuid
import ipaddress
import re
# Interface name mapping (e.g. tun_wg0 -> wg0 for OPNsense).
# Keys are the pfSense names to look for, values are their replacements;
# convert() merges its interface_map argument into this dict.
# Can be overridden via --interface-map CLI arg
IFACE_MAP = {}
# OPNsense name validation: alphanumeric, dot, underscore, dash, 1-64 chars
OPN_NAME_RE = re.compile(r'^[0-9a-zA-Z._\-]{1,64}$')
def _sanitize_name(name):
    """Return *name* with every character OPNsense rejects turned into '_'.

    Only ASCII letters, digits, dot, underscore and dash are kept; the
    length of the name is not changed.
    """
    disallowed = r'[^0-9a-zA-Z._\-]'
    cleaned = re.sub(disallowed, '_', name)
    return cleaned
def indent(elem, level=0):
    """Re-indent an element tree in place, one tab per nesting level.

    Only whitespace-only (or missing) ``text``/``tail`` values are replaced,
    so real character data is never touched.

    Args:
        elem: Element whose subtree is re-indented.
        level: Nesting depth of *elem*; 0 for the document root.
    """
    pad = "\n" + "\t" * level

    def is_blank(value):
        return not value or not value.strip()

    if not len(elem):
        # Leaf node: only its tail matters, and the root's tail is left alone.
        if level and is_blank(elem.tail):
            elem.tail = pad
        return

    if is_blank(elem.text):
        # Whitespace before the first child sits one level deeper.
        elem.text = pad + "\t"
    if is_blank(elem.tail):
        elem.tail = pad

    last_child = None
    for last_child in elem:
        indent(last_child, level + 1)
    # The last child's tail precedes this element's closing tag, so it is
    # pulled back to this element's own depth.
    if is_blank(last_child.tail):
        last_child.tail = pad
def safe_text(elem, path, default=""):
    """Return the text of the first sub-element matching *path*.

    *default* is returned when *elem* is None, when nothing matches, or
    when the matched element has no (or empty) text.
    """
    node = elem.find(path) if elem is not None else None
    if node is None or not node.text:
        return default
    return node.text
def set_text(parent, tag, value, cdata=False):
    """Store *value* as the text of ``parent/tag``, creating the child if needed.

    Falsy values other than 0 are ignored, so nothing is created or changed
    for them.  An existing child with that tag is reused, which means
    calling this twice for the same tag overwrites the earlier value.

    Args:
        parent: Element that owns (or will own) the child.
        tag: Tag name of the child element.
        value: Value to store; converted with ``str()`` unless *cdata* is set.
        cdata: When true, *value* is assigned as-is without ``str()``.
    """
    if not (value or value == 0):
        return
    target = parent.find(tag)
    if target is None:
        target = ET.SubElement(parent, tag)
    target.text = value if cdata else str(value)
def copy_element_into(src_elem, dst_parent, tag_map=None, skip_tags=None):
    """Deep-copy *src_elem* as a new child of *dst_parent*.

    Args:
        src_elem: Element to copy (its whole subtree is copied).
        dst_parent: Element that receives the copy.
        tag_map: Optional mapping of source tag -> destination tag, applied
            at every depth.
        skip_tags: Optional set of (already renamed) tags to leave out,
            together with everything below them.

    Returns:
        The newly created element, or None when its tag is in *skip_tags*.
    """
    renames = {} if tag_map is None else tag_map
    excluded = set() if skip_tags is None else skip_tags

    out_tag = renames.get(src_elem.tag, src_elem.tag)
    if out_tag in excluded:
        return None

    clone = ET.SubElement(dst_parent, out_tag)
    # Whitespace-only text is layout, not data, so it is not carried over.
    source_text = src_elem.text
    if source_text and source_text.strip():
        clone.text = source_text
    clone.tail = src_elem.tail
    for key in src_elem.attrib:
        clone.set(key, src_elem.attrib[key])
    for sub in src_elem:
        copy_element_into(sub, clone, renames, excluded)
    return clone
def convert_system(pf_root, op_root):
    """Convert the pfSense ``system`` section into an OPNsense ``system`` section.

    Does nothing when the source has no ``system`` element.  Scalar settings
    are copied only when they carry a non-empty value.  Multi-value tags
    (``dnsserver``, ``earlyshellcmd``) produce one output element per
    non-empty source entry.

    Args:
        pf_root: Root element of the parsed pfSense configuration.
        op_root: Root element of the OPNsense configuration being built; a
            new ``system`` child is appended to it.
    """
    pf_sys = pf_root.find('system')
    if pf_sys is None:
        return
    op_sys = ET.SubElement(op_root, 'system')

    def copy_scalars(*tags):
        # set_text() ignores empty values, so absent settings are skipped.
        for tag in tags:
            set_text(op_sys, tag, safe_text(pf_sys, tag))

    def copy_repeated(tag):
        # Append one element per source entry.  set_text() must not be used
        # for repeated tags: it reuses an existing child, so every entry
        # except the last one would be overwritten.
        for entry in pf_sys.findall(tag):
            if entry.text:
                ET.SubElement(op_sys, tag).text = entry.text

    # Hostname and domain
    copy_scalars('hostname', 'domain', 'timezone', 'language')
    # NTP
    copy_scalars('timeservers', 'time-update-interval')
    # DNS servers (multi-value) and their gateway settings
    copy_repeated('dnsserver')
    copy_scalars('dns1gw', 'dns2gw', 'dns3gw', 'dns4gw')

    # WebGUI — skip pfSense-specific elements that break OPNsense (e.g. pfSense.css)
    pf_gui = pf_sys.find('webgui')
    if pf_gui is not None:
        op_gui = ET.SubElement(op_sys, 'webgui')
        pf_skip = {'loginautocomplete', 'loginshowhost', 'webguicss', 'logincss',
                   'dashboardenabled', 'msscheck', 'ipv6', 'sha256'}
        for child in pf_gui:
            if child.tag not in pf_skip:
                copy_element_into(child, op_gui)

    # SSH: only the enable flag is carried over
    pf_ssh = pf_sys.find('ssh')
    if pf_ssh is not None:
        op_ssh = ET.SubElement(op_sys, 'ssh')
        pf_ssh_enable = pf_ssh.find('enable')
        if pf_ssh_enable is not None:
            copy_element_into(pf_ssh_enable, op_ssh)

    # Optimization and table/state limits
    copy_scalars('optimization')
    copy_scalars('maximumtableentries', 'maximumstates', 'maximumfrags',
                 'maximumtables')
    # Disable offloading
    copy_scalars('disablesegmentationoffloading', 'disablelargereceiveoffloading')
    # Proxy
    copy_scalars('proxyuser', 'proxypass')
    # Early shellcmd (multi-value: every command is kept, in source order)
    copy_repeated('earlyshellcmd')

    # Bogons: only the update interval is carried over
    pf_bogons = pf_sys.find('bogons')
    if pf_bogons is not None:
        op_bogons = ET.SubElement(op_sys, 'bogons')
        pf_interval = pf_bogons.find('interval')
        if pf_interval is not None:
            copy_element_into(pf_interval, op_bogons)

    # Serial console, NAT reflection, crypto hardware, config-upgrade marker
    copy_scalars('enableserial', 'disablenatreflection', 'crypto_hardware',
                 'already_run_config_upgrade')

    # Users and groups are copied as complete subtrees
    for group in pf_sys.findall('group'):
        copy_element_into(group, op_sys)
    for user in pf_sys.findall('user'):
        copy_element_into(user, op_sys)

    # nextuid/nextgid
    copy_scalars('nextuid', 'nextgid')
def convert_interfaces(pf_root, op_root):
    """Convert the ``interfaces`` section (wan, lan and every optN present).

    Interfaces are emitted as wan, lan, then optN in ascending numeric
    order.  The opt interfaces are discovered from the source instead of
    being limited to a fixed opt1..opt15 list, so configurations with more
    optional interfaces are converted completely.

    Args:
        pf_root: Root element of the parsed pfSense configuration.
        op_root: Root element of the OPNsense configuration being built.
    """
    pf_ifaces = pf_root.find('interfaces')
    if pf_ifaces is None:
        return
    op_ifaces = ET.SubElement(op_root, 'interfaces')

    # Every distinct "opt<number>" tag in the source, ordered by its number.
    opt_tags = sorted(
        {child.tag for child in pf_ifaces
         if child.tag.startswith('opt') and child.tag[3:].isdecimal()},
        key=lambda tag: (int(tag[3:]), tag),
    )
    copied_fields = ['enable', 'if', 'descr', 'spoofmac', 'ipaddr', 'subnet',
                     'blockpriv', 'blockbogons', 'gateway', 'mtu', 'media',
                     'mediaopt', 'adv']

    for iface_tag in ['wan', 'lan'] + opt_tags:
        pf_iface = pf_ifaces.find(iface_tag)
        if pf_iface is None:
            continue
        op_iface = ET.SubElement(op_ifaces, iface_tag)

        # Copy interface configuration elements
        for field_tag in copied_fields:
            field = pf_iface.find(field_tag)
            if field is not None:
                copy_element_into(field, op_iface)

        # OPNsense expects <enable>1</enable> for enabled interfaces,
        # not the pfSense-style empty self-closing tag.
        enable_elem = op_iface.find('enable')
        if enable_elem is not None and not enable_elem.text:
            enable_elem.text = '1'

        # Apply interface name mapping to the device name (e.g. tun_wg0 -> wg0)
        device_elem = op_iface.find('if')
        if device_elem is not None and device_elem.text:
            device_elem.text = _map_iface_in_text(device_elem.text)

        # Lock interfaces to prevent OPNsense from auto-reassigning them
        # during boot (mismatch detection).  'lock' is never among the copied
        # fields, so it is always added here.
        ET.SubElement(op_iface, 'lock').text = '1'
def convert_vlans(pf_root, op_root):
    """Copy every ``vlan`` entry of the pfSense ``vlans`` section unchanged.

    An (possibly empty) ``vlans`` element is created whenever the source
    has one.
    """
    source = pf_root.find('vlans')
    if source is None:
        return
    target = ET.SubElement(op_root, 'vlans')
    for entry in source:
        if entry.tag == 'vlan':
            copy_element_into(entry, target)
def convert_dhcpd(pf_root, op_root):
    """Convert the ISC-style ``dhcpd`` section, one scope per interface tag.

    For every scope (lan, wan, opt1..opt15) found in the source, the listed
    scope settings, the static mappings and the address pools are copied.
    A ``backend`` element is written last, taken from the top-level
    ``dhcpbackend`` value and defaulting to ``kea``.

    Args:
        pf_root: Root element of the parsed pfSense configuration.
        op_root: Root element of the OPNsense configuration being built.
    """
    pf_dhcpd = pf_root.find('dhcpd')
    if pf_dhcpd is None:
        return
    op_dhcpd = ET.SubElement(op_root, 'dhcpd')

    scope_tags = ['lan', 'wan'] + ['opt%d' % n for n in range(1, 16)]
    # 'tftp' matches the spelling used for static mappings below; the
    # misspelling 'tfpt' never matched anything, so the setting was lost.
    scope_fields = ['enable', 'range', 'defaultleasetime', 'maxleasetime',
                    'netmask', 'gateway', 'domain', 'domainsearchlist',
                    'dnsserver', 'ntpserver', 'tftp', 'ldap', 'filename',
                    'rootpath', 'nextserver', 'failover_peerip',
                    'mac_allow', 'mac_deny', 'ddnsdomain', 'ddnsdomainprimary',
                    'ddnsdomainkeyname', 'ddnsdomainkey', 'ddnsdomainkeyalgorithm',
                    'ddnsclientupdates', 'ddnsdomainsecondary',
                    'filename32', 'filename64', 'filename32arm', 'filename64arm',
                    'uefihttpboot', 'numberoptions']
    staticmap_fields = ['mac', 'cid', 'ipaddr', 'hostname', 'descr',
                        'filename', 'rootpath', 'defaultleasetime',
                        'maxleasetime', 'gateway', 'domain',
                        'domainsearchlist', 'ddnsdomain',
                        'ddnsdomainprimary', 'ddnsdomainsecondary',
                        'ddnsdomainkeyname', 'ddnsdomainkeyalgorithm',
                        'ddnsdomainkey', 'tftp', 'ldap', 'nextserver',
                        'filename32', 'filename64', 'filename32arm',
                        'filename64arm', 'uefihttpboot', 'numberoptions',
                        'earlydnsregpolicy']

    for scope_tag in scope_tags:
        pf_scope = pf_dhcpd.find(scope_tag)
        if pf_scope is None:
            continue
        op_scope = ET.SubElement(op_dhcpd, scope_tag)

        # Copy simple config elements (repeated tags such as dnsserver are
        # all kept because findall() is used).
        for field_tag in scope_fields:
            for field in pf_scope.findall(field_tag):
                copy_element_into(field, op_scope)

        # Static mappings: only fields that actually carry a value are copied.
        for pf_map in pf_scope.findall('staticmap'):
            op_map = ET.SubElement(op_scope, 'staticmap')
            for field_tag in staticmap_fields:
                field = pf_map.find(field_tag)
                if field is not None and field.text:
                    copy_element_into(field, op_map)

        # Pools: copy every <pool>, not just the first one, so additional
        # address pools are not dropped.
        for pf_pool in pf_scope.findall('pool'):
            copy_element_into(pf_pool, op_scope)

        # Normalize an empty <enable/> to <enable>1</enable>
        enable_elem = op_scope.find('enable')
        if enable_elem is not None and not enable_elem.text:
            enable_elem.text = '1'

    # DHCP backend (preserve pfSense value: kea or isc)
    set_text(op_dhcpd, 'backend', safe_text(pf_root, 'dhcpbackend', 'kea'))
def convert_kea_dhcp(pf_root, op_root):
    """Build the OPNsense Kea DHCPv4 model from the pfSense ``dhcpd`` section.

    The result is written under ``OPNsense/Kea/dhcp4`` with three children:
    ``general``, ``subnets`` (one ``subnet4`` per usable scope) and
    ``reservations`` (one ``reservation`` per static mapping).

    A scope becomes a subnet when it has an ``enable`` element, a range or
    static mappings, and when the interface of the same name carries an
    address/subnet pair from which an IPv4 network can be derived.  Scopes
    that fail the second condition (e.g. PPPoE or DHCP-addressed
    interfaces) are skipped entirely.

    Args:
        pf_root: Root element of the parsed pfSense configuration.
        op_root: Root element of the OPNsense configuration being built.
    """
    pf_dhcpd = pf_root.find('dhcpd')
    if pf_dhcpd is None:
        return

    scope_tags = ['lan', 'wan'] + ['opt%d' % n for n in range(1, 16)]

    # Interface info per scope tag: address, subnet value and description.
    iface_by_scope = {}
    pf_ifaces = pf_root.find('interfaces')
    if pf_ifaces is not None:
        for tag in scope_tags:
            iface = pf_ifaces.find(tag)
            if iface is None:
                continue
            iface_by_scope[tag] = {
                'ip': safe_text(iface, 'ipaddr'),
                'subnet': safe_text(iface, 'subnet'),
                'descr': safe_text(iface, 'descr', tag.upper()),
            }

    # Find or create <OPNsense>, the parent of the MVC model sections.
    op_opnsense = op_root.find('OPNsense')
    if op_opnsense is None:
        op_opnsense = ET.SubElement(op_root, 'OPNsense')
    op_kea = ET.SubElement(op_opnsense, 'Kea')
    op_dhcp4 = ET.SubElement(op_kea, 'dhcp4')

    # General settings
    op_general = ET.SubElement(op_dhcp4, 'general')
    set_text(op_general, 'enabled', '1')
    set_text(op_general, 'fwrules', '1')
    set_text(op_general, 'valid_lifetime', '7200')
    set_text(op_general, 'dhcp_socket_type', 'raw')

    op_subnets = ET.SubElement(op_dhcp4, 'subnets')
    dhcp_ifaces = []      # scope tags Kea should listen on
    subnet_scopes = []    # (scope tag, subnet uuid, pfSense scope element)

    for tag in scope_tags:
        pf_scope = pf_dhcpd.find(tag)
        if pf_scope is None:
            continue

        pf_range = pf_scope.find('range')
        has_range = pf_range is not None and (
            pf_range.find('from') is not None or pf_range.text is not None)
        # Only consider scopes that are enabled or have static maps/ranges.
        if (pf_scope.find('enable') is None
                and pf_scope.find('staticmap') is None
                and not has_range):
            continue

        iface_info = iface_by_scope.get(tag, {})
        ipaddr = iface_info.get('ip', '')
        subnet_mask = iface_info.get('subnet', '')

        # Derive the network in CIDR form from the interface address.
        subnet_cidr = ''
        if ipaddr and subnet_mask:
            try:
                subnet_cidr = str(ipaddress.IPv4Interface(
                    '%s/%s' % (ipaddr, subnet_mask)).network)
            except ValueError:
                # Non-literal addresses such as "dhcp" or "pppoe", or a
                # malformed mask: there is no static network to serve.
                subnet_cidr = ''

        # Skip WAN-type interfaces that aren't real subnets
        if ipaddr == 'pppoe' or not subnet_cidr:
            continue

        # Kea's interfaces field takes the interface tags (lan, opt2, ...),
        # not device names.  Registered only after the skip above so Kea is
        # not told to listen on an interface that has no subnet.
        dhcp_ifaces.append(tag)

        suuid = _gen_uuid('dhcp-subnet-' + tag)
        subnet_scopes.append((tag, suuid, pf_scope))

        op_subnet = ET.SubElement(op_subnets, 'subnet4')
        op_subnet.set('uuid', suuid)
        set_text(op_subnet, 'subnet', subnet_cidr)
        set_text(op_subnet, 'option_data_autocollect', '0')
        set_text(op_subnet, 'description', iface_info.get('descr', tag.upper()))

        op_od = ET.SubElement(op_subnet, 'option_data')

        # Gateway -> routers.  Without an explicit gateway in the scope the
        # interface address is used (it acts as the subnet's default gateway).
        set_text(op_od, 'routers', safe_text(pf_scope, 'gateway') or ipaddr)

        # DNS servers (all entries, comma-separated)
        dns_servers = [d.text for d in pf_scope.findall('dnsserver') if d.text]
        if dns_servers:
            set_text(op_od, 'domain_name_servers', ','.join(dns_servers))

        set_text(op_od, 'domain_name', safe_text(pf_scope, 'domain'))
        set_text(op_od, 'domain_search', safe_text(pf_scope, 'domainsearchlist'))
        # NOTE(review): only the first <ntpserver> is used, unlike dnsserver
        # above — confirm whether further entries should be joined as well.
        set_text(op_od, 'ntp_servers', safe_text(pf_scope, 'ntpserver'))

        # Next server (PXE) lives on the subnet, not in option_data
        set_text(op_subnet, 'next_server', safe_text(pf_scope, 'nextserver'))

        # Pools: pfSense range from/to -> "from-to"
        if pf_range is not None:
            from_ip = safe_text(pf_range, 'from')
            to_ip = safe_text(pf_range, 'to')
            if from_ip and to_ip:
                set_text(op_subnet, 'pools', '%s-%s' % (from_ip, to_ip))

    if dhcp_ifaces:
        set_text(op_general, 'interfaces', ','.join(dhcp_ifaces))

    # Reservations from the static mappings of every converted scope
    op_reservations = ET.SubElement(op_dhcp4, 'reservations')
    for tag, suuid, pf_scope in subnet_scopes:
        seen_ips = set()
        for pf_sm in pf_scope.findall('staticmap'):
            mac = pf_sm.findtext('mac', '')
            ipaddr = pf_sm.findtext('ipaddr', '')
            hostname = pf_sm.findtext('hostname', '')
            descr = pf_sm.findtext('descr', '')
            if not mac and not ipaddr:
                continue

            # Deduplicate: skip an IP already reserved in this subnet
            if ipaddr:
                if ipaddr in seen_ips:
                    continue
                seen_ips.add(ipaddr)

            # Deterministic UUID.  The scope tag is part of the seed so the
            # same MAC (or IP) mapped in two different scopes does not yield
            # two reservations sharing one uuid.
            ruuid = _gen_uuid('dhcp-reservation-%s-%s' % (tag, mac or ipaddr))

            op_res = ET.SubElement(op_reservations, 'reservation')
            op_res.set('uuid', ruuid)
            # The reservation points at its subnet through the subnet's uuid.
            set_text(op_res, 'subnet', suuid)
            set_text(op_res, 'ip_address', ipaddr)
            set_text(op_res, 'hw_address', mac)
            set_text(op_res, 'hostname', hostname)
            set_text(op_res, 'description', descr)
            # Empty option_data for reservation
            ET.SubElement(op_res, 'option_data')
def convert_aliases(pf_root, op_root):
    """Copy all firewall ``alias`` entries into a new ``aliases`` section.

    Nothing is created when the source has no ``aliases`` section or the
    section contains no ``alias`` entry.
    """
    pf_aliases = pf_root.find('aliases')
    if pf_aliases is None:
        return
    entries = pf_aliases.findall('alias')
    if not entries:
        return
    op_aliases = ET.SubElement(op_root, 'aliases')
    for entry in entries:
        copy_element_into(entry, op_aliases)
def convert_nat(pf_root, op_root):
    """Convert NAT rules: port forwards, outbound NAT, 1:1 NAT and separators.

    For port-forward and outbound rules only the listed fields are copied,
    the rule's interface goes through the interface name mapping, and the
    ``created``/``updated`` metadata is appended when present.

    Args:
        pf_root: Root element of the parsed pfSense configuration.
        op_root: Root element of the OPNsense configuration being built.
    """
    pf_nat = pf_root.find('nat')
    if pf_nat is None:
        return
    op_nat = ET.SubElement(op_root, 'nat')

    def copy_rule(pf_rule, op_parent, field_tags, meta_tags):
        # Copy one rule: whitelisted fields, mapped interface, then metadata.
        op_rule = ET.SubElement(op_parent, 'rule')
        for field_tag in field_tags:
            for field in pf_rule.findall(field_tag):
                copy_element_into(field, op_rule)
        iface_elem = op_rule.find('interface')
        if iface_elem is not None and iface_elem.text:
            iface_elem.text = _map_iface_in_text(iface_elem.text)
        for meta_tag in meta_tags:
            pf_meta = pf_rule.find(meta_tag)
            if pf_meta is not None:
                copy_element_into(pf_meta, op_rule)

    # Port forwarding rules
    forward_fields = ['source', 'destination', 'ipprotocol', 'protocol',
                      'target', 'local-port', 'interface', 'descr',
                      'associated-rule-id', 'disabled', 'log', 'natreflection',
                      'nordr', 'poolopts', 'source_hash_key',
                      'allow', 'max', 'max-src-nodes', 'max-src-conn',
                      'max-src-states', 'statetimeout', 'statetype']
    for pf_rule in pf_nat.findall('rule'):
        copy_rule(pf_rule, op_nat, forward_fields, ['updated', 'created'])

    # Outbound NAT
    pf_outbound = pf_nat.find('outbound')
    if pf_outbound is not None:
        op_outbound = ET.SubElement(op_nat, 'outbound')
        # <mode> may be absent; copy_element_into() cannot take None.
        pf_mode = pf_outbound.find('mode')
        if pf_mode is not None:
            copy_element_into(pf_mode, op_outbound)
        outbound_fields = ['source', 'sourceport', 'destination', 'descr',
                           'target', 'interface', 'poolopts',
                           'source_hash_key', 'staticnatport', 'ipprotocol',
                           'protocol', 'disabled', 'target_subnet',
                           'nonat', 'log', 'mirror']
        for pf_orule in pf_outbound.findall('rule'):
            copy_rule(pf_orule, op_outbound, outbound_fields,
                      ['created', 'updated'])

    # 1:1 NAT
    # NOTE(review): this expects <rule> children inside a single <onetoone>
    # element — confirm against a real pfSense export, where each 1:1 entry
    # may be its own <onetoone> element directly under <nat>.
    pf_onetoone = pf_nat.find('onetoone')
    if pf_onetoone is not None:
        op_onetoone = ET.SubElement(op_nat, 'onetoone')
        for pf_rule in pf_onetoone.findall('rule'):
            copy_element_into(pf_rule, op_onetoone)

    # NAT separators
    pf_sep = pf_nat.find('separator')
    if pf_sep is not None:
        copy_element_into(pf_sep, op_nat)
def convert_filter(pf_root, op_root):
    """Convert the firewall ``filter`` section.

    Each rule keeps only the listed fields (all occurrences of each), has
    its interface passed through the interface name mapping, and gets the
    ``updated``/``created`` metadata appended when present.  The first
    ``separator`` element and a few section-level settings follow the rules.
    """
    source = pf_root.find('filter')
    if source is None:
        return
    target = ET.SubElement(op_root, 'filter')

    rule_fields = ('id', 'tracker', 'type', 'interface', 'ipprotocol',
                   'protocol', 'icmptype', 'source', 'destination',
                   'descr', 'tag', 'tagged', 'max', 'max-src-nodes',
                   'max-src-conn', 'max-src-states', 'statetimeout',
                   'statetype', 'os', 'direction', 'log',
                   'disabled', 'gateway', 'bridgeto', 'srcmac',
                   'dstmac', 'allowopts', 'pflow')

    for src_rule in source.findall('rule'):
        dst_rule = ET.SubElement(target, 'rule')
        for field_tag in rule_fields:
            for field in src_rule.findall(field_tag):
                copy_element_into(field, dst_rule)

        # Apply interface name mapping to the copied interface element
        mapped = dst_rule.find('interface')
        if mapped is not None and mapped.text:
            mapped.text = _map_iface_in_text(mapped.text)

        for meta_tag in ('updated', 'created'):
            stamp = src_rule.find(meta_tag)
            if stamp is not None:
                copy_element_into(stamp, dst_rule)

    # Filter separators (if present)
    separators = source.find('separator')
    if separators is not None:
        copy_element_into(separators, target)

    # Section-level settings; set_text() skips the ones without a value.
    for setting in ('defaultallow', 'disablestatenormalization',
                    'defaultdenyimg'):
        set_text(target, setting, safe_text(source, setting))
def convert_certificates(pf_root, op_root):
    """Convert Certificate Authorities, certificates and CRLs.

    Every top-level ``ca``, ``cert`` and ``crl`` element of the source is
    recreated under *op_root* with the same tag, and all of its children
    are copied unchanged.  Output order is all CAs, then all certificates,
    then all CRLs.

    Args:
        pf_root: Root element of the parsed pfSense configuration.
        op_root: Root element of the OPNsense configuration being built.
    """
    for section_tag in ('ca', 'cert', 'crl'):
        # findall(), not find(): these tags can occur more than once at the
        # top level and each occurrence has to be carried over.
        # NOTE(review): pfSense appears to store one top-level element per
        # CA / certificate / CRL — confirm against a real export.
        for pf_entry in pf_root.findall(section_tag):
            op_entry = ET.SubElement(op_root, section_tag)
            for pf_field in pf_entry:
                copy_element_into(pf_field, op_entry)
def convert_openvpn(pf_root, op_root):
    """Convert the ``openvpn`` section (servers, clients, client overrides).

    Each entry is recreated under the same tag and all of its children are
    copied unchanged.  Entries are emitted grouped by tag in the order
    servers, clients, then client-specific overrides.

    Args:
        pf_root: Root element of the parsed pfSense configuration.
        op_root: Root element of the OPNsense configuration being built.
    """
    pf_ovpn = pf_root.find('openvpn')
    if pf_ovpn is None:
        return
    op_ovpn = ET.SubElement(op_root, 'openvpn')

    entry_tags = (
        'openvpn-server',   # OpenVPN servers
        'openvpn-client',   # OpenVPN clients
        'openvpn-cso',      # client-specific overrides (tag handled so far)
        # NOTE(review): pfSense is believed to store client-specific
        # overrides as <openvpn-csc>; handled as well so they are not
        # dropped — confirm against a real export.
        'openvpn-csc',
    )
    for entry_tag in entry_tags:
        for pf_entry in pf_ovpn.findall(entry_tag):
            op_entry = ET.SubElement(op_ovpn, entry_tag)
            for child in pf_entry:
                copy_element_into(child, op_entry)
def convert_ipsec(pf_root, op_root):
    """Copy the IPsec ``phase1``, ``phase2`` and ``mobile`` entries.

    Entries are emitted grouped by kind, in that order, under a new
    ``ipsec`` section.
    """
    source = pf_root.find('ipsec')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ipsec')
    for kind in ('phase1', 'phase2', 'mobile'):
        for entry in source.findall(kind):
            copy_element_into(entry, target)
def _map_iface_in_text(text):
    """Apply every IFACE_MAP substitution to *text* and return the result.

    Each key of IFACE_MAP is replaced by its value wherever it occurs as a
    substring (e.g. tun_wg0 -> wg0), in the dict's iteration order.
    """
    mapped = text
    for pf_name in IFACE_MAP:
        mapped = mapped.replace(pf_name, IFACE_MAP[pf_name])
    return mapped
def _find_wireguard(pf_root):
    """Locate the WireGuard configuration element in the pfSense tree.

    Two locations are tried, in this order:
    - a top-level ``wireguard`` element, used only when it has children;
    - ``installedpackages/wireguard``.

    Returns the element found, or None when neither location qualifies.
    """
    top_level = pf_root.find('wireguard')
    if top_level is not None and len(top_level) > 0:
        return top_level
    # find() already yields None when the package section is absent.
    return pf_root.find('installedpackages/wireguard')
def _gen_uuid(seed_str):
    """Return a deterministic UUID string for *seed_str* that starts with a letter.

    The UUID is a version-5 UUID in the DNS namespace.  A UUID starting
    with a digit is not usable as an XML element name, so in that case the
    seed is extended with "_retry" and hashed again until a letter comes first.
    """
    candidate = str(uuid.uuid5(uuid.NAMESPACE_DNS, seed_str))
    while not candidate[0].isalpha():
        seed_str = f'{seed_str}_retry'
        candidate = str(uuid.uuid5(uuid.NAMESPACE_DNS, seed_str))
    return candidate
def convert_wireguard(pf_root, op_root):
    """Convert the pfSense WireGuard package config to the OPNsense layout.

    Fields read from pfSense (element located by _find_wireguard()):
    - ``tunnels/item``: name, enabled, publickey, privatekey, listenport,
      mtu, and ``addresses/row`` entries (address + mask).
    - ``peers/item``: enabled, tun, descr, publickey, presharedkey,
      endpoint, port, persistentkeepalive, and ``allowedips/row`` entries
      (address + mask).

    Elements written under ``OPNsense/wireguard``:
    - ``general/enabled`` (always 1).
    - ``server/servers/server`` per tunnel: enabled, name, instance, pubkey,
      privkey, port, mtu, disableroutes, gateway, tunneladdress
      (comma-separated CIDRs) and peers (comma-separated peer UUIDs).
    - ``client/clients/<uuid>`` per peer, the peer's UUID being the element
      name: enabled, name, pubkey, psk, tunneladdress (allowed IPs),
      serveraddress, serverport, keepalive.

    Values that are empty in the source are not written.
    """
    pf_wg = _find_wireguard(pf_root)
    if pf_wg is None:
        return

    def cidr_list(item, container_tag):
        # Collect "address/mask" for every complete row below container_tag.
        container = item.find(container_tag)
        if container is None:
            return []
        cidrs = []
        for row in container.findall('row'):
            addr = safe_text(row, 'address')
            mask = safe_text(row, 'mask')
            if addr and mask:
                cidrs.append(f'{addr}/{mask}')
        return cidrs

    def flag(item):
        # pfSense stores yes/other; a missing value counts as enabled.
        return '1' if safe_text(item, 'enabled', 'yes') == 'yes' else '0'

    # OPNsense MVC mount point: OPNsense/wireguard (nested elements)
    mvc_root = op_root.find('OPNsense')
    if mvc_root is None:
        mvc_root = ET.SubElement(op_root, 'OPNsense')
    op_wg = ET.SubElement(mvc_root, 'wireguard')

    # General section
    set_text(ET.SubElement(op_wg, 'general'), 'enabled', '1')

    # First pass: give every peer a UUID so tunnels can reference them.
    peers = []  # (peer element, uuid) in source order
    peers_section = pf_wg.find('peers')
    if peers_section is not None:
        for peer in peers_section.findall('item'):
            seed = (safe_text(peer, 'publickey', '')
                    or safe_text(peer, 'descr', '')
                    or str(len(peers)))
            peers.append((peer, _gen_uuid(f'wg-peer-{seed}')))

    # Servers (tunnels) section
    op_servers = ET.SubElement(ET.SubElement(op_wg, 'server'), 'servers')
    tunnels_section = pf_wg.find('tunnels')
    tunnels = [] if tunnels_section is None else tunnels_section.findall('item')
    for position, tunnel in enumerate(tunnels):
        op_server = ET.SubElement(op_servers, 'server')
        tunnel_name = safe_text(tunnel, 'name', '')
        mapped_name = _map_iface_in_text(tunnel_name) if tunnel_name else ''

        set_text(op_server, 'enabled', flag(tunnel))
        set_text(op_server, 'name', _sanitize_name(mapped_name))
        set_text(op_server, 'instance', str(position))
        set_text(op_server, 'pubkey', safe_text(tunnel, 'publickey'))
        set_text(op_server, 'privkey', safe_text(tunnel, 'privatekey'))
        set_text(op_server, 'port', safe_text(tunnel, 'listenport'))
        set_text(op_server, 'mtu', safe_text(tunnel, 'mtu'))
        # Defaults
        set_text(op_server, 'disableroutes', '0')
        ET.SubElement(op_server, 'gateway')

        # Tunnel addresses as comma-separated CIDR
        tunnel_cidrs = cidr_list(tunnel, 'addresses')
        if tunnel_cidrs:
            set_text(op_server, 'tunneladdress', ','.join(tunnel_cidrs))

        # Peers reference: a peer belongs to this tunnel when its <tun>
        # matches the tunnel name, either raw or after interface mapping.
        member_uuids = []
        for peer, peer_uuid in peers:
            peer_tun = safe_text(peer, 'tun', '')
            if (peer_tun == tunnel_name
                    or _map_iface_in_text(peer_tun) == mapped_name):
                member_uuids.append(peer_uuid)
        if member_uuids:
            set_text(op_server, 'peers', ','.join(member_uuids))

    # Clients (peers) section: each entry's UUID is used as its element name.
    op_clients = ET.SubElement(ET.SubElement(op_wg, 'client'), 'clients')
    for peer, peer_uuid in peers:
        op_client = ET.SubElement(op_clients, peer_uuid)
        set_text(op_client, 'enabled', flag(peer))

        # Sanitize name to match OPNsense validation (no slashes/spaces);
        # peers without a description get a name derived from their UUID.
        description = safe_text(peer, 'descr', '')
        if description:
            client_name = _sanitize_name(description)
        else:
            client_name = f'peer_{peer_uuid[:8]}'
        set_text(op_client, 'name', client_name)
        set_text(op_client, 'pubkey', safe_text(peer, 'publickey'))
        set_text(op_client, 'psk', safe_text(peer, 'presharedkey'))

        # Allowed IPs (comma-separated CIDRs) are stored as tunneladdress
        allowed = cidr_list(peer, 'allowedips')
        if allowed:
            set_text(op_client, 'tunneladdress', ','.join(allowed))

        # Endpoint
        endpoint = safe_text(peer, 'endpoint', '')
        port = safe_text(peer, 'port', '')
        if endpoint:
            set_text(op_client, 'serveraddress', endpoint)
        if port:
            set_text(op_client, 'serverport', port)
        set_text(op_client, 'keepalive', safe_text(peer, 'persistentkeepalive'))
def convert_static_routes(pf_root, op_root):
    """Copy every child of ``staticroutes`` into a new ``staticroutes`` section."""
    source = pf_root.find('staticroutes')
    if source is None:
        return
    target = ET.SubElement(op_root, 'staticroutes')
    for route in list(source):
        copy_element_into(route, target)
def convert_gateways(pf_root, op_root):
    """Copy the default gateway, gateway items and gateway groups.

    Output order is: ``defaultgw`` (first occurrence only), all
    ``gateway_item`` entries, then all ``gateway_group`` entries.
    """
    source = pf_root.find('gateways')
    if source is None:
        return
    target = ET.SubElement(op_root, 'gateways')

    default_gw = source.find('defaultgw')
    if default_gw is not None:
        copy_element_into(default_gw, target)

    for entry_tag in ('gateway_item', 'gateway_group'):
        for entry in source.findall(entry_tag):
            copy_element_into(entry, target)
def convert_dns(pf_root, op_root):
    """Copy the DNS forwarder (dnsmasq) and resolver (unbound) sections.

    Each section that exists in the source is recreated with all of its
    children; dnsmasq is emitted before unbound.
    """
    for service in ('dnsmasq', 'unbound'):
        source = pf_root.find(service)
        if source is None:
            continue
        target = ET.SubElement(op_root, service)
        for setting in source:
            copy_element_into(setting, target)
def convert_snmp(pf_root, op_root):
    """Copy all children of the ``snmpd`` section into a new ``snmpd`` section."""
    source = pf_root.find('snmpd')
    if source is None:
        return
    target = ET.SubElement(op_root, 'snmpd')
    for setting in source:
        copy_element_into(setting, target)
def convert_schedules(pf_root, op_root):
    """Copy every ``schedule`` entry of the ``schedules`` section."""
    source = pf_root.find('schedules')
    if source is None:
        return
    target = ET.SubElement(op_root, 'schedules')
    for entry in source:
        if entry.tag == 'schedule':
            copy_element_into(entry, target)
def convert_virtual_ips(pf_root, op_root):
    """Copy every ``vip`` entry of the ``virtualip`` section."""
    source = pf_root.find('virtualip')
    if source is None:
        return
    target = ET.SubElement(op_root, 'virtualip')
    for entry in source:
        if entry.tag == 'vip':
            copy_element_into(entry, target)
def convert_ifgroups(pf_root, op_root):
    """Copy every ``ifgroupentry`` of the ``ifgroups`` section."""
    source = pf_root.find('ifgroups')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ifgroups')
    for entry in source:
        if entry.tag == 'ifgroupentry':
            copy_element_into(entry, target)
def convert_sysctl(pf_root, op_root):
    """Copy every sysctl tunable (``item``) of the ``sysctl`` section."""
    source = pf_root.find('sysctl')
    if source is not None:
        target = ET.SubElement(op_root, 'sysctl')
        tunables = [child for child in source if child.tag == 'item']
        for tunable in tunables:
            copy_element_into(tunable, target)
def convert_syslog(pf_root, op_root):
    """Copy all children of the ``syslog`` section into a new ``syslog`` section."""
    source = pf_root.find('syslog')
    if source is None:
        return
    target = ET.SubElement(op_root, 'syslog')
    for setting in source:
        copy_element_into(setting, target)
def convert_rrd(pf_root, op_root):
    """Copy all children of the RRD graphing section (``rrd``)."""
    source = pf_root.find('rrd')
    if source is None:
        return
    target = ET.SubElement(op_root, 'rrd')
    for setting in source:
        copy_element_into(setting, target)
def convert_dyndns(pf_root, op_root):
    """Copy every ``dyndns`` entry of the ``dyndnses`` section."""
    source = pf_root.find('dyndnses')
    if source is None:
        return
    target = ET.SubElement(op_root, 'dyndnses')
    for entry in source:
        if entry.tag == 'dyndns':
            copy_element_into(entry, target)
def convert_captive_portal(pf_root, op_root):
    """Copy every ``zone`` entry of the ``captiveportal`` section."""
    source = pf_root.find('captiveportal')
    if source is None:
        return
    target = ET.SubElement(op_root, 'captiveportal')
    for entry in source:
        if entry.tag == 'zone':
            copy_element_into(entry, target)
def convert_load_balancer(pf_root, op_root):
    """Copy the load balancer sections as complete subtrees.

    The first ``lbpool``, ``lbvirtual``, ``lbserver`` and ``haproxy``
    element found at the top level is copied, in that order.
    """
    section_names = ('lbpool', 'lbvirtual', 'lbserver', 'haproxy')
    found = (pf_root.find(name) for name in section_names)
    for section in found:
        if section is None:
            continue
        copy_element_into(section, op_root)
def convert_cron(pf_root, op_root):
    """Copy every cron job (``item``) of the ``cron`` section."""
    source = pf_root.find('cron')
    if source is None:
        return
    target = ET.SubElement(op_root, 'cron')
    for job in source:
        if job.tag == 'item':
            copy_element_into(job, target)
def convert_dhcrelay(pf_root, op_root):
    """Copy the DHCP relay sections (``dhcrelay``, ``dhcrelay6``) as subtrees."""
    relay_sections = (pf_root.find(name) for name in ('dhcrelay', 'dhcrelay6'))
    for section in relay_sections:
        if section is None:
            continue
        copy_element_into(section, op_root)
def convert_ppps(pf_root, op_root):
    """Copy every ``ppp`` entry (PPPoE/PPTP/L2TP) of the ``ppps`` section."""
    source = pf_root.find('ppps')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ppps')
    for entry in source:
        if entry.tag == 'ppp':
            copy_element_into(entry, target)
def convert_igmpproxy(pf_root, op_root):
    """Copy all children of the ``igmpproxy`` section."""
    source = pf_root.find('igmpproxy')
    if source is None:
        return
    target = ET.SubElement(op_root, 'igmpproxy')
    for setting in source:
        copy_element_into(setting, target)
def convert_ntpd(pf_root, op_root):
    """Copy all children of the ``ntpd`` section."""
    source = pf_root.find('ntpd')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ntpd')
    for setting in source:
        copy_element_into(setting, target)
def convert_sshdata(pf_root, op_root):
    """Copy every SSH host key file (``sshkeyfile``) of the ``sshdata`` section."""
    source = pf_root.find('sshdata')
    if source is None:
        return
    target = ET.SubElement(op_root, 'sshdata')
    for key_file in source:
        if key_file.tag == 'sshkeyfile':
            copy_element_into(key_file, target)
def convert_bridge(pf_root, op_root):
    """Copy every ``bridge`` entry found inside the top-level ``bridge`` section."""
    source = pf_root.find('bridge')
    if source is None:
        return
    target = ET.SubElement(op_root, 'bridge')
    for entry in source:
        if entry.tag == 'bridge':
            copy_element_into(entry, target)
def convert(pf_config_path, output_path=None, interface_map=None):
    """Convert a pfSense config.xml into an OPNsense config.xml.

    Args:
        pf_config_path: Path of the pfSense configuration file to parse.
        output_path: Destination file. When falsy, the converted document
            is printed to stdout instead.
        interface_map: Optional ``{old_name: new_name}`` dict. It is merged
            into the module-level ``IFACE_MAP`` (so it persists for later
            calls in the same process).

    Raises:
        xml.etree.ElementTree.ParseError: If the input is not well-formed XML.
        OSError: If the input cannot be read or the output cannot be written.
    """
    if interface_map:
        # In-place update of the shared dict; no rebinding, so no `global`.
        IFACE_MAP.update(interface_map)

    pf_root = ET.parse(pf_config_path).getroot()
    op_root = ET.Element('opnsense')

    # Version
    ET.SubElement(op_root, 'version').text = '26.1'

    # Convert sections. The order is preserved from the original
    # implementation because it determines element order in the output.
    section_converters = (
        convert_system,
        convert_interfaces,
        convert_vlans,
        convert_dhcpd,
        convert_kea_dhcp,
        convert_aliases,
        convert_nat,
        convert_filter,
        convert_certificates,
        convert_openvpn,
        convert_ipsec,
        convert_wireguard,
        convert_static_routes,
        convert_gateways,
        convert_dns,
        convert_snmp,
        convert_schedules,
        convert_virtual_ips,
        convert_ifgroups,
        convert_sysctl,
        convert_syslog,
        convert_rrd,
        convert_dyndns,
        convert_captive_portal,
        convert_load_balancer,
        convert_cron,
        convert_dhcrelay,
        convert_ppps,
        convert_igmpproxy,
        convert_ntpd,
        convert_sshdata,
        convert_bridge,
    )
    for run_converter in section_converters:
        run_converter(pf_root, op_root)

    # Copy unknown/unhandled top-level sections as-is for maximum compatibility.
    # List all sections that our converter functions create.
    handled_sections = {
        'pfsense', 'system', 'interfaces', 'vlans', 'dhcpd', 'dnsmasq',
        'snmpd', 'diag', 'bridge', 'syslog', 'nat', 'filter',
        'staticroutes', 'gateways', 'aliases', 'ca', 'cert', 'certca',
        'crl', 'openvpn', 'ipsec', 'wireguard', 'unbound', 'ifgroups',
        'virtualip', 'dhcpbackend', 'qinqs', 'schedules', 'sshdata',
        'rrd', 'dyndnses', 'captiveportal', 'cron', 'dhcrelay',
        'dhcrelay6', 'ppps', 'igmpproxy', 'ntpd', 'OPNsense',
    }
    # 'version' is skipped too: we set our own OPNsense version above.
    skipped_tags = handled_sections | {'version'}
    for pf_child in pf_root:
        tag = pf_child.tag
        if tag in skipped_tags:
            continue
        # Also skip if a converter already produced this element.
        if op_root.find(tag) is not None:
            continue
        op_copy = copy_element_into(pf_child, op_root)
        # installedpackages is copied, minus its wireguard child, which the
        # WireGuard converter has already handled.
        if tag == 'installedpackages' and op_copy is not None:
            wg_copy = op_copy.find('wireguard')
            if wg_copy is not None:
                op_copy.remove(wg_copy)

    # Apply interface name mapping to any remaining references in the output tree
    _post_process_iface_map(op_root)
    # Set default root password
    _set_root_password(op_root)
    # Add OPNsense-specific defaults
    _add_opnsense_defaults(op_root)

    # Pretty-print
    indent(op_root)

    # Add the XML declaration. ET.tostring(encoding='unicode') never emits
    # one, and the previous code prepended only a newline, which left the
    # document without a prolog and with a leading blank line.
    result = '<?xml version="1.0"?>\n' + ET.tostring(op_root, encoding='unicode')

    if output_path:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(result)
        print(f"Converted config written to: {output_path}", file=sys.stderr)
    else:
        print(result)
def _set_root_password(op_root):
    """Reset the administrator account to the converter's default password.

    A user under ``system`` counts as the administrator when its ``name``
    is ``admin`` or its ``uid`` is ``0``. For each such user:

    * ``admin`` is renamed to ``root`` (OPNsense expects the admin user to
      be named 'root', not 'admin' as in pfSense);
    * ``password`` is set to the default hash, and created if the user had
      no ``password`` element (previously such a user was left without
      one, which defeated the purpose of this function);
    * ``bcrypt-hash`` is set to the same hash via ``set_text``;
    * any ``md5-hash`` child is removed, as OPNsense uses bcrypt.

    Does nothing when the tree has no ``system`` element.
    """
    # Hard-coded bcrypt hash. The original author documents it as the hash
    # of the string 'opnsense'; that is not verified by this code.
    default_hash = '$2b$10$ebdSKP5HpJ1eOmgnTrfmy.pKBjDIrrzSTIRPNZd8EGmm/EU6hIwAy'

    op_sys = op_root.find('system')
    if op_sys is None:
        return

    for user in op_sys.findall('user'):
        name = user.find('name')
        uid = user.find('uid')
        named_admin = name is not None and name.text == 'admin'
        has_uid_zero = uid is not None and uid.text == '0'
        if not (named_admin or has_uid_zero):
            continue

        # Rename admin -> root for OPNsense compatibility
        if named_admin:
            name.text = 'root'

        # Replace the password, creating the element when it is missing.
        pw = user.find('password')
        if pw is None:
            pw = ET.SubElement(user, 'password')
        pw.text = default_hash

        # Also set bcrypt-hash field that OPNsense may use
        set_text(user, 'bcrypt-hash', default_hash)

        # Remove md5-hash as OPNsense uses bcrypt
        md5 = user.find('md5-hash')
        if md5 is not None:
            user.remove(md5)
def _post_process_iface_map(op_root):
    """Rewrite interface names across the whole output tree (e.g. tun_wg0 -> wg0).

    This is a final sweep that catches references in sections that were
    copied through unchanged. It is a no-op when ``IFACE_MAP`` is empty.

    Elements whose tag can hold an interface name always have their text
    passed through ``_map_iface_in_text``. ``description`` elements are
    only rewritten when their text contains one of the old names.
    """
    if not IFACE_MAP:
        return

    # Tags that can contain interface device names or references
    iface_tags = {'if', 'interface', 'tun', 'name', 'bridgeif', 'member', 'tunnel'}

    for node in op_root.iter():
        text = node.text
        if not text:
            continue
        if node.tag in iface_tags:
            node.text = _map_iface_in_text(text)
        elif node.tag == 'description':
            # Free-text field: only touch it when an old name actually occurs.
            mentions_old_name = any(old in text for old in IFACE_MAP)
            if mentions_old_name:
                node.text = _map_iface_in_text(text)
def _add_opnsense_defaults(op_root):
    """Add OPNsense-specific default elements that may be needed.

    * Replaces every top-level ``theme`` element with a single one whose
      text is ``opnsense``.
    * Adds an empty ``lastchange`` element when none exists.
    * Under ``system`` (when present): adds ``webgui`` with protocol
      ``https`` if missing, and adds empty ``hn_altq_enable`` and
      ``hn_mac_filter`` elements if missing.

    The function is idempotent: running it again on its own output does
    not add duplicate elements.
    """
    # The theme element must hold plain text with no child elements; a
    # copied pfSense theme may carry children. Per the original author's
    # note, ControllerBase.php casts the value to string, so child elements
    # would yield "" and broken paths. Remove all copies, not only the first.
    for stale_theme in op_root.findall('theme'):
        op_root.remove(stale_theme)
    ET.SubElement(op_root, 'theme').text = 'opnsense'

    # The version element is already set in convert().

    # Set lastchange if missing
    if op_root.find('lastchange') is None:
        ET.SubElement(op_root, 'lastchange')

    # Add some OPNsense-specific system defaults
    op_sys = op_root.find('system')
    if op_sys is None:
        return

    # Ensure webgui defaults
    if op_sys.find('webgui') is None:
        webgui = ET.SubElement(op_sys, 'webgui')
        ET.SubElement(webgui, 'protocol').text = 'https'

    # Add OPNsense-specific hn_* defaults. Test for the element itself, not
    # its text: an existing empty element has no text, and checking the text
    # used to append a duplicate sibling.
    for hn_opt in ('hn_altq_enable', 'hn_mac_filter'):
        if op_sys.find(hn_opt) is None:
            ET.SubElement(op_sys, hn_opt)
def _parse_cli_args(argv):
    """Parse the command-line arguments that follow the program name.

    Args:
        argv: List of arguments, i.e. ``sys.argv[1:]``. The first item is
            the input file. The rest may hold one optional output path and
            any number of ``--interface-map`` / ``-m`` options.

    Returns:
        A ``(input_path, output_path, interface_map)`` tuple. ``output_path``
        is ``None`` when omitted. ``interface_map`` always contains the
        default ``tun_wg0..tun_wg5 -> wg0..wg5`` entries, which explicit
        mappings may override.

    Raises:
        ValueError: On a missing input file, a missing or malformed
            mapping, or an unknown argument. The message is the text to
            show after ``Error: ``.
    """
    if not argv:
        raise ValueError("missing input file")

    input_path = argv[0]
    output_path = None
    # Default mapping: tun_wgX -> wgX
    interface_map = {f'tun_wg{n}': f'wg{n}' for n in range(6)}

    i = 1
    while i < len(argv):
        arg = argv[i]
        if arg in ('--interface-map', '-m'):
            i += 1
            if i >= len(argv):
                raise ValueError("--interface-map requires a mapping")
            # The option takes one or more old=new pairs, as the usage text
            # documents. The first pair is mandatory; following arguments
            # are consumed while they look like pairs. Previously only one
            # pair was read and the next became the output path.
            while True:
                mapping = argv[i]
                old, sep, new = mapping.partition('=')
                if not sep or not old or not new:
                    raise ValueError(
                        f"invalid mapping '{mapping}'. Use old=new format."
                    )
                interface_map[old] = new
                has_next_pair = (
                    i + 1 < len(argv)
                    and '=' in argv[i + 1]
                    and not argv[i + 1].startswith('-')
                )
                if not has_next_pair:
                    break
                i += 1
        elif output_path is None and not arg.startswith('-'):
            output_path = arg
        else:
            raise ValueError(f"unknown argument '{arg}'")
        i += 1

    return input_path, output_path, interface_map


if __name__ == '__main__':
    if len(sys.argv) < 2:
        prog = sys.argv[0]
        # The usage line now names the required input file.
        print(f"Usage: {prog} config.xml [output.xml] [--interface-map old=new ...]", file=sys.stderr)
        print("", file=sys.stderr)
        print("Examples:", file=sys.stderr)
        print(f" {prog} config-pfsense.xml config-opnsense.xml", file=sys.stderr)
        print(f" {prog} config-pfsense.xml --interface-map tun_wg0=wg0 tun_wg1=wg1", file=sys.stderr)
        print("", file=sys.stderr)
        print("If output.xml is omitted, the converted config is printed to stdout.", file=sys.stderr)
        sys.exit(1)

    try:
        input_path, output_path, interface_map = _parse_cli_args(sys.argv[1:])
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    convert(input_path, output_path, interface_map=interface_map)