Files
pf2opnsense/pfsense2opnsense.py

2343 lines
87 KiB
Python
Executable File

#!/usr/bin/env python3
"""
pfSense to OPNsense configuration converter.
Reads a pfSense config.xml and converts it to an OPNsense-compatible config.xml.
Usage:
python pfsense2opnsense.py config-pfsense.xml > config-opnsense.xml
Sections converted:
- System (hostname, domain, DNS, timezone, webgui, SSH, users)
- Interfaces (WAN, LAN, OPTx) and VLANs
- DHCP server config and static mappings
- Firewall rules (filter)
- NAT rules (port forwarding + outbound NAT)
- Aliases
- Certificates and CAs
- OpenVPN servers/clients
- WireGuard (core since 24.1, no plugin needed on 26.x)
- IPsec
- Static routes
- DNS (dnsmasq/unbound) with domain overrides and host overrides
- SNMP
- Schedules
- Interface groups
- Virtual IPs
- CRLs
- System tunables (sysctl)
"""
import xml.etree.ElementTree as ET
import sys
import uuid
import ipaddress
import re
# Interface name mapping: pfSense device name -> OPNsense device name
# (e.g. tun_wg0 -> wg0). Starts empty; convert_vlans() adds an entry for
# every VLAN device it renames.
# Can be overridden via --interface-map CLI arg
IFACE_MAP = {}
# OPNsense name validation: 1-64 characters, each one an ASCII letter,
# digit, dot, underscore or dash.
OPN_NAME_RE = re.compile(r'^[0-9a-zA-Z._\-]{1,64}$')
def _sanitize_name(name):
"""Replace characters not allowed in OPNsense names with underscore."""
return re.sub(r'[^0-9a-zA-Z._\-]', '_', name)
def indent(elem, level=0):
    """Pretty-print an element tree in place using tab indentation.

    Whitespace-only (or missing) ``text``/``tail`` values are rewritten so
    that the serialised XML puts each element on its own line, indented by
    one tab per nesting level. Non-blank text and tails are left alone.
    """
    pad = "\n" + "\t" * level

    # Leaf element: only the tail matters, and the root's tail is untouched.
    if len(elem) == 0:
        if level and not (elem.tail and elem.tail.strip()):
            elem.tail = pad
        return

    if not (elem.text and elem.text.strip()):
        elem.text = pad + "\t"
    if not (elem.tail and elem.tail.strip()):
        elem.tail = pad

    last = None
    for last in elem:
        indent(last, level + 1)

    # The final child closes this element, so its tail drops back one level.
    if not (last.tail and last.tail.strip()):
        last.tail = pad
def safe_text(elem, path, default=""):
    """Return the text of the first sub-element matching *path*.

    *default* is returned when *elem* is None, when nothing matches
    *path*, or when the matched element has no (or empty) text.
    """
    if elem is None:
        return default
    text = elem.findtext(path)
    return text if text else default
def set_text(parent, tag, value, cdata=False):
    """Store *value* as the text of ``parent/tag``, creating the child if needed.

    Nothing happens for falsy values other than ones equal to 0 (so ``None``
    and ``""`` are skipped while ``0`` is written). With ``cdata`` true the
    value is assigned as-is; otherwise it is converted with ``str()``.
    """
    if not (value or value == 0):
        return
    node = parent.find(tag)
    if node is None:
        node = ET.SubElement(parent, tag)
    node.text = value if cdata else str(value)
def copy_element_into(src_elem, dst_parent, tag_map=None, skip_tags=None):
    """Deep-copy *src_elem* as a new child of *dst_parent*.

    Args:
        src_elem: Element to copy. ``None`` is accepted and copies nothing,
            so callers can pass the result of ``find()`` directly.
        dst_parent: Element that receives the copy as its last child.
        tag_map: Optional ``{old_tag: new_tag}`` renames, applied at every
            depth of the copied subtree.
        skip_tags: Optional set of tags to leave out. The check is made
            against the tag *after* renaming.

    Returns:
        The newly created element, or ``None`` when nothing was copied
        (missing source or skipped tag).
    """
    if src_elem is None:
        return None
    if tag_map is None:
        tag_map = {}
    if skip_tags is None:
        skip_tags = set()

    tag = tag_map.get(src_elem.tag, src_elem.tag)
    if tag in skip_tags:
        return None

    new = ET.SubElement(dst_parent, tag)
    # Whitespace-only text is dropped so indentation from the source file
    # does not leak into the copy.
    if src_elem.text and src_elem.text.strip():
        new.text = src_elem.text
    new.tail = src_elem.tail
    for attr_name, attr_val in src_elem.attrib.items():
        new.set(attr_name, attr_val)
    for child in src_elem:
        copy_element_into(child, new, tag_map, skip_tags)
    return new
def convert_system(pf_root, op_root):
    """Convert the pfSense ``<system>`` section.

    Creates ``<system>`` under *op_root* and fills it from the pfSense
    section: general settings, DNS servers, WebGUI and SSH options, tuning
    values, shell commands, bogons, users and groups. Does nothing when
    *pf_root* has no ``<system>`` element.

    Every ``<earlyshellcmd>`` entry is emitted as its own element; going
    through ``set_text()`` would reuse the first one and keep only the
    last command.
    """
    pf_sys = pf_root.find('system')
    if pf_sys is None:
        return
    op_sys = ET.SubElement(op_root, 'system')

    # Hostname, domain, locale and NTP settings map one-to-one.
    for tag in ('hostname', 'domain', 'timezone', 'language',
                'timeservers', 'time-update-interval'):
        set_text(op_sys, tag, safe_text(pf_sys, tag))

    # DNS servers: pfSense uses <dns1>..<dns4> tags, while OPNsense uses
    # repeated <dnsserver> elements. Direct <dnsserver> tags are kept too.
    dns_values = [safe_text(pf_sys, dns_tag)
                  for dns_tag in ('dns1', 'dns2', 'dns3', 'dns4')]
    dns_values.extend(dns.text for dns in pf_sys.findall('dnsserver'))
    for val in dns_values:
        if val:
            ET.SubElement(op_sys, 'dnsserver').text = val

    # DNS gateway settings
    for gw in ('dns1gw', 'dns2gw', 'dns3gw', 'dns4gw'):
        val = safe_text(pf_sys, gw)
        if val:
            set_text(op_sys, gw, val)

    # WebGUI — skip pfSense-specific elements that break OPNsense
    # (e.g. pfSense.css)
    pf_gui = pf_sys.find('webgui')
    if pf_gui is not None:
        op_gui = ET.SubElement(op_sys, 'webgui')
        pf_skip = {'loginautocomplete', 'loginshowhost', 'webguicss', 'logincss',
                   'dashboardenabled', 'msscheck', 'ipv6', 'sha256'}
        for child in pf_gui:
            if child.tag not in pf_skip:
                copy_element_into(child, op_gui)

    # SSH: only the <enable> flag is carried over.
    pf_ssh = pf_sys.find('ssh')
    if pf_ssh is not None:
        op_ssh = ET.SubElement(op_sys, 'ssh')
        pf_ssh_enable = pf_ssh.find('enable')
        if pf_ssh_enable is not None:
            copy_element_into(pf_ssh_enable, op_ssh)

    # Firewall optimization and table/state limits
    for tag in ('optimization', 'maximumtableentries', 'maximumstates',
                'maximumfrags', 'maximumtables'):
        set_text(op_sys, tag, safe_text(pf_sys, tag))

    # Disable offloading
    for offload in ('disablesegmentationoffloading',
                    'disablelargereceiveoffloading'):
        val = safe_text(pf_sys, offload)
        if val:
            set_text(op_sys, offload, val)

    # Proxy
    set_text(op_sys, 'proxyuser', safe_text(pf_sys, 'proxyuser'))
    set_text(op_sys, 'proxypass', safe_text(pf_sys, 'proxypass'))

    # Early shellcmd: one output element per non-empty source entry.
    for cmd in pf_sys.findall('earlyshellcmd'):
        if cmd.text:
            ET.SubElement(op_sys, 'earlyshellcmd').text = cmd.text

    # Bogons: only the update interval is carried over.
    pf_bogons = pf_sys.find('bogons')
    if pf_bogons is not None:
        op_bogons = ET.SubElement(op_sys, 'bogons')
        pf_interval = pf_bogons.find('interval')
        if pf_interval is not None:
            copy_element_into(pf_interval, op_bogons)

    # Serial console, NAT reflection, crypto hardware, upgrade marker
    for tag in ('enableserial', 'disablenatreflection', 'crypto_hardware',
                'already_run_config_upgrade'):
        set_text(op_sys, tag, safe_text(pf_sys, tag))

    # Users and groups
    for group in pf_sys.findall('group'):
        copy_element_into(group, op_sys)
    for user in pf_sys.findall('user'):
        copy_element_into(user, op_sys)

    # nextuid/nextgid
    set_text(op_sys, 'nextuid', safe_text(pf_sys, 'nextuid'))
    set_text(op_sys, 'nextgid', safe_text(pf_sys, 'nextgid'))
def _build_virtual_ifaces(pf_root):
    """Collect the device names of virtual pseudo-interfaces.

    The returned set holds every non-empty VLAN ``<vlanif>``, WireGuard
    tunnel ``<name>`` and OpenVPN server/client ``<dev>`` value found in
    the pfSense config. Callers use it to decide which interfaces must not
    be locked in OPNsense.
    """
    names = set()

    # VLANs: vlanif like vlan0.10, vlan1.20
    names.update(vlan.findtext('vlanif', '')
                 for vlan in pf_root.findall('.//vlans/vlan'))

    # WireGuard tunnels: tun_wgX
    wireguard = _find_wireguard(pf_root)
    tunnels = wireguard.find('tunnels') if wireguard is not None else None
    if tunnels is not None:
        names.update(item.findtext('name', '')
                     for item in tunnels.findall('item'))

    # OpenVPN tun/tap devices (servers first, then clients)
    openvpn = pf_root.find('openvpn')
    if openvpn is not None:
        for section in ('openvpn-server', 'openvpn-client'):
            names.update(entry.findtext('dev', '')
                         for entry in openvpn.findall(section))

    # Missing or empty values were collected as '' above; drop them.
    names.discard('')
    return names
def convert_interfaces(pf_root, op_root):
    """Convert the ``<interfaces>`` section (wan, lan, opt1..opt15).

    For each interface present in pfSense a same-named element is created
    under ``<interfaces>`` in *op_root* with a fixed list of settings
    copied over. The copy is then adjusted for OPNsense: empty ``<enable/>``
    becomes ``<enable>1</enable>``, the device name goes through the
    interface name mapping, interfaces with a device but no ``<enable>``
    are enabled, and non-virtual interfaces get ``<lock>1</lock>``.
    """
    pf_ifaces = pf_root.find('interfaces')
    if pf_ifaces is None:
        return
    op_ifaces = ET.SubElement(op_root, 'interfaces')

    # Device names of VLAN/WireGuard/OpenVPN pseudo-interfaces. These are
    # not locked: locks interfere with OPNsense assignment for devices
    # that don't exist at boot time.
    virtual_devs = _build_virtual_ifaces(pf_root)

    logical_names = ['wan', 'lan'] + [f'opt{n}' for n in range(1, 16)]
    copied_tags = ('enable', 'if', 'descr', 'spoofmac', 'ipaddr', 'subnet',
                   'blockpriv', 'blockbogons', 'gateway', 'mtu', 'media',
                   'mediaopt', 'adv')

    for logical in logical_names:
        src = pf_ifaces.find(logical)
        if src is None:
            continue
        dst = ET.SubElement(op_ifaces, logical)

        for tag in copied_tags:
            node = src.find(tag)
            if node is not None:
                copy_element_into(node, dst)

        # OPNsense expects <enable>1</enable> for enabled interfaces,
        # not the pfSense-style empty <enable /> tag.
        enable_node = dst.find('enable')
        if enable_node is not None and not enable_node.text:
            enable_node.text = '1'

        # Remember the pre-mapping device name, then apply the interface
        # name mapping to <if> (e.g. tun_wg0 -> wg0).
        dev_node = dst.find('if')
        if dev_node is None:
            original_dev = ''
        else:
            original_dev = dev_node.text
            if original_dev:
                dev_node.text = _map_iface_in_text(original_dev)

        # An interface with a device but no explicit <enable> is enabled:
        # OPNsense will not configure the IP without it.
        if original_dev and dst.find('enable') is None:
            ET.SubElement(dst, 'enable').text = '1'

        # Lock physical interfaces so OPNsense does not auto-reassign them
        # during boot. The original (pre-mapping) name is compared so it
        # matches the pfSense vlanif values.
        if original_dev not in virtual_devs and dst.find('lock') is None:
            ET.SubElement(dst, 'lock').text = '1'
def _vlan_uuid(pif, tag):
    """Return the UUID for the VLAN with parent *pif* and VLAN *tag*.

    The value comes from ``_gen_uuid`` seeded with both parts.
    """
    seed = f'vlan-{pif}-{tag}'
    return _gen_uuid(seed)
def convert_vlans(pf_root, op_root):
    """Convert the ``<vlans>`` section.

    Each pfSense ``<vlan>`` is copied and then reshaped for OPNsense:

    * a ``uuid`` attribute is added (the GUI edit form loads empty without
      it, see OPNsense issue #6086);
    * ``<pif>`` is replaced by ``<if>`` unless an ``<if>`` already exists;
    * ``<pcp>0</pcp>`` is added when no ``<pcp>`` is present;
    * ``<vlanif>`` is normalised to ``vlan0.<tag>`` and the old -> new name
      is recorded in ``IFACE_MAP`` so other references can be rewritten.
    """
    pf_vlans = pf_root.find('vlans')
    if pf_vlans is None:
        return
    op_vlans = ET.SubElement(op_root, 'vlans')

    for src in pf_vlans.findall('vlan'):
        dst = copy_element_into(src, op_vlans)
        dst.set('uuid',
                _vlan_uuid(src.findtext('pif', ''), src.findtext('tag', '')))

        # Promote <pif> to <if>: appended at the end, inheriting the text
        # and tail of the element it replaces.
        parent_node = dst.find('pif')
        if parent_node is not None and dst.find('if') is None:
            renamed = ET.SubElement(dst, 'if')
            renamed.text = parent_node.text
            renamed.tail = parent_node.tail
            dst.remove(parent_node)

        # Default priority code point when pfSense stored none.
        if dst.find('pcp') is None:
            ET.SubElement(dst, 'pcp').text = '0'

        dev_node = dst.find('vlanif')
        tag_node = dst.find('tag')
        if dev_node is None or not dev_node.text:
            continue
        if tag_node is None or not tag_node.text:
            continue

        # Normalize to vlan0.{tag} and remember the rename.
        normalized = f'vlan0.{tag_node.text}'
        if dev_node.text != normalized:
            IFACE_MAP[dev_node.text] = normalized
            dev_node.text = normalized
def convert_dhcpd(pf_root, op_root):
    """Convert the ISC-style ``<dhcpd>`` section.

    Creates ``<dhcpd>`` under *op_root* with one scope element per pfSense
    scope (lan, wan, opt1..opt15). For each scope a fixed list of settings
    is copied, static mappings are rebuilt from a fixed list of non-empty
    fields, an optional ``<pool>`` is copied, and an empty ``<enable/>`` is
    normalised to ``<enable>1</enable>``. Finally ``<backend>`` is written
    from the pfSense ``<dhcpbackend>`` value (``kea`` when absent).

    The scope-level TFTP setting is looked up as ``tftp``, the same
    spelling the static-mapping list uses.
    """
    pf_dhcpd = pf_root.find('dhcpd')
    if pf_dhcpd is None:
        return
    op_dhcpd = ET.SubElement(op_root, 'dhcpd')

    scope_tags = ['lan', 'wan', 'opt1', 'opt2', 'opt3', 'opt4', 'opt5',
                  'opt6', 'opt7', 'opt8', 'opt9', 'opt10', 'opt11',
                  'opt12', 'opt13', 'opt14', 'opt15']
    simple_tags = ['enable', 'range', 'defaultleasetime', 'maxleasetime',
                   'netmask', 'gateway', 'domain', 'domainsearchlist',
                   'dnsserver', 'ntpserver', 'tftp', 'ldap', 'filename',
                   'rootpath', 'nextserver', 'failover_peerip',
                   'mac_allow', 'mac_deny', 'ddnsdomain', 'ddnsdomainprimary',
                   'ddnsdomainkeyname', 'ddnsdomainkey', 'ddnsdomainkeyalgorithm',
                   'ddnsclientupdates', 'ddnsdomainsecondary',
                   'filename32', 'filename64', 'filename32arm', 'filename64arm',
                   'uefihttpboot', 'numberoptions']
    staticmap_tags = ['mac', 'cid', 'ipaddr', 'hostname', 'descr',
                      'filename', 'rootpath', 'defaultleasetime',
                      'maxleasetime', 'gateway', 'domain',
                      'domainsearchlist', 'ddnsdomain',
                      'ddnsdomainprimary', 'ddnsdomainsecondary',
                      'ddnsdomainkeyname', 'ddnsdomainkeyalgorithm',
                      'ddnsdomainkey', 'tftp', 'ldap', 'nextserver',
                      'filename32', 'filename64', 'filename32arm',
                      'filename64arm', 'uefihttpboot', 'numberoptions',
                      'earlydnsregpolicy']

    for tag in scope_tags:
        pf_scope = pf_dhcpd.find(tag)
        if pf_scope is None:
            continue
        op_scope = ET.SubElement(op_dhcpd, tag)

        # Copy simple config elements (repeated elements are all kept).
        for simple in simple_tags:
            for elem in pf_scope.findall(simple):
                copy_element_into(elem, op_scope)

        # Static mappings: only fields that carry text are copied.
        for pf_map in pf_scope.findall('staticmap'):
            op_map = ET.SubElement(op_scope, 'staticmap')
            for tag_name in staticmap_tags:
                elem = pf_map.find(tag_name)
                if elem is not None and elem.text:
                    copy_element_into(elem, op_map)

        # Pool (used in some OPNsense versions)
        pf_pool = pf_scope.find('pool')
        if pf_pool is not None:
            copy_element_into(pf_pool, op_scope)

        # Normalize <enable /> to <enable>1</enable>
        enable_elem = op_scope.find('enable')
        if enable_elem is not None and not enable_elem.text:
            enable_elem.text = '1'

    # DHCP backend (preserve pfSense value: kea or isc)
    set_text(op_dhcpd, 'backend', safe_text(pf_root, 'dhcpbackend', 'kea'))
def convert_kea_dhcp(pf_root, op_root):
    """Convert DHCP configuration to the OPNsense Kea model.

    pfSense stores DHCP config in the ISC-format ``<dhcpd>`` section even
    when using the Kea backend. This builds ``<OPNsense><Kea><dhcp4>`` with
    ``general``, ``subnets`` and ``reservations`` children from it.

    A ``subnet4`` is created for every scope (lan, wan, opt1..opt15) that
    is enabled or has static maps or a range, provided the matching
    interface has a static IPv4 address and subnet from which a network
    can be computed. Only interfaces that actually received a subnet are
    listed in ``general/interfaces``. Static maps become reservations;
    within a subnet a repeated IP address is kept only once.

    Reservation UUIDs are seeded from the MAC (or the IP when no MAC is
    set). If that UUID has already been used, e.g. the same MAC is mapped
    in two scopes, a seed that also includes the scope, MAC and IP is used
    instead so ``uuid`` attributes stay unique.
    """
    pf_dhcpd = pf_root.find('dhcpd')
    if pf_dhcpd is None:
        return

    opt_tags = ['opt1', 'opt2', 'opt3', 'opt4', 'opt5', 'opt6', 'opt7',
                'opt8', 'opt9', 'opt10', 'opt11', 'opt12', 'opt13',
                'opt14', 'opt15']

    # Build interface info map: scope tag -> {if, ip, subnet, descr}
    scope_to_iface = {}
    pf_ifaces = pf_root.find('interfaces')
    if pf_ifaces is not None:
        for tag in ['wan', 'lan'] + opt_tags:
            iface = pf_ifaces.find(tag)
            if iface is None:
                continue
            scope_to_iface[tag] = {
                'if': iface.findtext('if') or '',
                'ip': iface.findtext('ipaddr') or '',
                'subnet': iface.findtext('subnet') or '',
                'descr': iface.findtext('descr') or tag.upper(),
            }

    # Find or create <OPNsense> for OPNsense MVC mount elements
    op_opnsense = op_root.find('OPNsense')
    if op_opnsense is None:
        op_opnsense = ET.SubElement(op_root, 'OPNsense')

    # Create <Kea><dhcp4> structure
    op_kea = ET.SubElement(op_opnsense, 'Kea')
    op_dhcp4 = ET.SubElement(op_kea, 'dhcp4')

    # General settings
    op_general = ET.SubElement(op_dhcp4, 'general')
    set_text(op_general, 'enabled', '1')
    set_text(op_general, 'fwrules', '1')
    set_text(op_general, 'valid_lifetime', '7200')
    set_text(op_general, 'dhcp_socket_type', 'raw')

    # Logical interface names (lan, opt2, ...) that got a subnet. The Kea
    # model stores logical names here, not device names.
    dhcp_ifaces = []

    op_subnets = ET.SubElement(op_dhcp4, 'subnets')
    subnet_uuids = {}  # scope tag -> subnet UUID

    for tag in ['lan', 'wan'] + opt_tags:
        pf_scope = pf_dhcpd.find(tag)
        if pf_scope is None:
            continue

        enable_el = pf_scope.find('enable')
        has_staticmaps = pf_scope.find('staticmap') is not None
        pf_range = pf_scope.find('range')
        has_range = pf_range is not None and (
            pf_range.find('from') is not None or pf_range.text is not None)

        # Only create subnet if scope is enabled or has static maps/ranges
        if enable_el is None and not has_staticmaps and not has_range:
            continue

        iface_info = scope_to_iface.get(tag, {})
        ipaddr = iface_info.get('ip', '')
        subnet_mask = iface_info.get('subnet', '')

        # Compute subnet CIDR. Non-address values such as "dhcp" make
        # ipaddress raise a ValueError subclass; those scopes are skipped.
        subnet_cidr = ''
        if ipaddr and subnet_mask:
            try:
                ip_intf = ipaddress.IPv4Interface('%s/%s' % (ipaddr, subnet_mask))
            except ValueError:
                subnet_cidr = ''
            else:
                subnet_cidr = str(ip_intf.network)

        # Skip WAN-type interfaces that aren't real subnets
        if ipaddr == 'pppoe' or not subnet_cidr:
            continue

        # Register the interface only now that a subnet will exist for it.
        dhcp_ifaces.append(tag)

        suuid = _gen_uuid('dhcp-subnet-' + tag)
        subnet_uuids[tag] = suuid

        op_subnet = ET.SubElement(op_subnets, 'subnet4')
        op_subnet.set('uuid', suuid)
        set_text(op_subnet, 'subnet', subnet_cidr)
        set_text(op_subnet, 'option_data_autocollect', '0')
        set_text(op_subnet, 'description', iface_info.get('descr', tag.upper()))

        op_od = ET.SubElement(op_subnet, 'option_data')

        # Gateway -> routers. When no explicit gateway in dhcpd scope,
        # use the interface IP (it acts as the subnet's default gateway).
        gw = pf_scope.find('gateway')
        if gw is not None and gw.text:
            set_text(op_od, 'routers', gw.text)
        elif ipaddr:
            set_text(op_od, 'routers', ipaddr)

        # DNS servers
        dns_servers = [d.text for d in pf_scope.findall('dnsserver') if d.text]
        if dns_servers:
            set_text(op_od, 'domain_name_servers', ','.join(dns_servers))

        # Domain, domain search list and NTP server (first entry each)
        for src_tag, dst_tag in (('domain', 'domain_name'),
                                 ('domainsearchlist', 'domain_search'),
                                 ('ntpserver', 'ntp_servers')):
            src_el = pf_scope.find(src_tag)
            if src_el is not None and src_el.text:
                set_text(op_od, dst_tag, src_el.text)

        # Next server (PXE)
        ns = pf_scope.find('nextserver')
        if ns is not None and ns.text:
            set_text(op_subnet, 'next_server', ns.text)

        # Pools: convert pfSense <range><from>X</from><to>Y</to></range> to "X-Y"
        if pf_range is not None:
            from_ip = pf_range.findtext('from')
            to_ip = pf_range.findtext('to')
            if from_ip and to_ip:
                set_text(op_subnet, 'pools', '%s-%s' % (from_ip, to_ip))

    # Set interfaces list on general
    if dhcp_ifaces:
        set_text(op_general, 'interfaces', ','.join(dhcp_ifaces))

    op_reservations = ET.SubElement(op_dhcp4, 'reservations')

    seen_ips_per_subnet = {}  # suuid -> set of IPs already reserved
    used_reservation_uuids = set()
    for tag, suuid in subnet_uuids.items():
        pf_scope = pf_dhcpd.find(tag)
        if pf_scope is None:
            continue
        seen_ips = seen_ips_per_subnet.setdefault(suuid, set())

        for pf_sm in pf_scope.findall('staticmap'):
            mac = pf_sm.findtext('mac', '')
            res_ip = pf_sm.findtext('ipaddr', '')
            hostname = pf_sm.findtext('hostname', '')
            descr = pf_sm.findtext('descr', '')
            if not mac and not res_ip:
                continue

            # Deduplicate: skip same IP already reserved in this subnet
            if res_ip:
                if res_ip in seen_ips:
                    continue
                seen_ips.add(res_ip)

            # UUID from MAC or IP; fall back to a scope-qualified seed when
            # that UUID is already taken by another reservation.
            ruuid = _gen_uuid('dhcp-reservation-' + (mac or res_ip))
            if ruuid in used_reservation_uuids:
                ruuid = _gen_uuid(
                    'dhcp-reservation-%s-%s-%s' % (tag, mac, res_ip))
            used_reservation_uuids.add(ruuid)

            op_res = ET.SubElement(op_reservations, 'reservation')
            op_res.set('uuid', ruuid)
            set_text(op_res, 'subnet', suuid)
            if res_ip:
                set_text(op_res, 'ip_address', res_ip)
            if mac:
                set_text(op_res, 'hw_address', mac)
            if hostname:
                set_text(op_res, 'hostname', hostname)
            if descr:
                set_text(op_res, 'description', descr)
            # Empty option_data for reservation
            ET.SubElement(op_res, 'option_data')
def convert_aliases(pf_root, op_root):
    """Convert firewall aliases to OPNsense format.

    Every pfSense alias is written twice with the same uuid: once under a
    root-level ``<aliases>`` element (legacy parser compatibility) and once
    in the MVC location ``<OPNsense><Firewall><Alias><aliases>``.

    Field mappings (pfSense -> OPNsense); fields without text are omitted:
        <name>    -> <name>
        <type>    -> <type>
        <address> -> <content>
        <descr>   -> <description>
        <detail>  -> <detail>
    Each alias also gets ``<enabled>1</enabled>``.
    """
    pf_aliases = pf_root.find('aliases')
    if pf_aliases is None:
        return

    # Root-level <aliases> (legacy OPNsense parser compatibility)
    legacy_parent = ET.SubElement(op_root, 'aliases')

    # MVC format at <OPNsense><Firewall><Alias><aliases>
    op_opnsense = op_root.find('OPNsense')
    if op_opnsense is None:
        op_opnsense = ET.SubElement(op_root, 'OPNsense')
    op_firewall = op_opnsense.find('Firewall')
    if op_firewall is None:
        op_firewall = ET.SubElement(op_opnsense, 'Firewall')
    mvc_parent = ET.SubElement(ET.SubElement(op_firewall, 'Alias'), 'aliases')

    field_map = (
        ('name', 'name'),
        ('type', 'type'),
        ('address', 'content'),
        ('descr', 'description'),
        ('detail', 'detail'),
    )

    for pf_alias in pf_aliases.findall('alias'):
        name = pf_alias.findtext('name', '')
        auuid = _gen_uuid(f'alias-{name}')

        for parent in (legacy_parent, mvc_parent):
            op_alias = ET.SubElement(parent, 'alias')
            op_alias.set('uuid', auuid)
            for src_tag, dst_tag in field_map:
                src = pf_alias.find(src_tag)
                if src is not None and src.text:
                    ET.SubElement(op_alias, dst_tag).text = src.text
            ET.SubElement(op_alias, 'enabled').text = '1'
def convert_nat(pf_root, op_root):
    """Convert NAT rules (port forwarding, outbound NAT, 1:1 NAT).

    Creates ``<nat>`` under *op_root*. Port-forward and outbound rules are
    rebuilt from fixed lists of child tags, with the interface name mapping
    applied to ``<interface>`` and the created/updated metadata copied.
    1:1 rules and the separator block are copied as they are.

    An ``<outbound>`` section without a ``<mode>`` element is accepted; no
    ``<mode>`` is written in that case.
    """
    pf_nat = pf_root.find('nat')
    if pf_nat is None:
        return
    op_nat = ET.SubElement(op_root, 'nat')

    # Port forwarding rules
    for pf_rule in pf_nat.findall('rule'):
        op_rule = ET.SubElement(op_nat, 'rule')
        for child_tag in ['source', 'destination', 'ipprotocol', 'protocol',
                          'target', 'local-port', 'interface', 'descr',
                          'associated-rule-id', 'disabled', 'log', 'natreflection',
                          'nordr', 'poolopts', 'source_hash_key',
                          'allow', 'max', 'max-src-nodes', 'max-src-conn',
                          'max-src-states', 'statetimeout', 'statetype']:
            for elem in pf_rule.findall(child_tag):
                copy_element_into(elem, op_rule)

        # Apply interface name mapping
        iface_elem = op_rule.find('interface')
        if iface_elem is not None and iface_elem.text:
            iface_elem.text = _map_iface_in_text(iface_elem.text)

        # updated/created timestamps if present
        for meta in ['updated', 'created']:
            pf_meta = pf_rule.find(meta)
            if pf_meta is not None:
                copy_element_into(pf_meta, op_rule)

    # Outbound NAT
    pf_outbound = pf_nat.find('outbound')
    if pf_outbound is not None:
        op_outbound = ET.SubElement(op_nat, 'outbound')

        # pfSense "hybrid" mode is written as "automatic" so that
        # auto-generated rules are still created; other modes are copied.
        # NOTE(review): OPNsense's outbound NAT settings appear to offer a
        # hybrid mode as well — confirm whether this down-mapping is still
        # wanted, since manual rules are not applied in automatic mode.
        mode_el = pf_outbound.find('mode')
        if mode_el is not None:
            if mode_el.text == 'hybrid':
                set_text(op_outbound, 'mode', 'automatic')
            else:
                copy_element_into(mode_el, op_outbound)

        for pf_orule in pf_outbound.findall('rule'):
            op_orule = ET.SubElement(op_outbound, 'rule')
            for child_tag in ['source', 'sourceport', 'destination', 'descr',
                              'target', 'interface', 'poolopts',
                              'source_hash_key', 'staticnatport', 'ipprotocol',
                              'protocol', 'disabled', 'target_subnet',
                              'nonat', 'log', 'mirror']:
                for elem in pf_orule.findall(child_tag):
                    copy_element_into(elem, op_orule)

            # Apply interface name mapping
            iface_elem = op_orule.find('interface')
            if iface_elem is not None and iface_elem.text:
                iface_elem.text = _map_iface_in_text(iface_elem.text)

            for meta in ['created', 'updated']:
                pf_meta = pf_orule.find(meta)
                if pf_meta is not None:
                    copy_element_into(pf_meta, op_orule)

    # 1:1 NAT
    pf_onetoone = pf_nat.find('onetoone')
    if pf_onetoone is not None:
        op_onetoone = ET.SubElement(op_nat, 'onetoone')
        for pf_rule in pf_onetoone.findall('rule'):
            copy_element_into(pf_rule, op_onetoone)

    # NAT separators
    pf_sep = pf_nat.find('separator')
    if pf_sep is not None:
        copy_element_into(pf_sep, op_nat)
def _build_iface_networks(pf_root):
"""Build a map: interface tag (lan, opt2, etc.) -> subnet CIDR."""
nets = {}
pf_ifaces = pf_root.find('interfaces')
if pf_ifaces is None:
return nets
for tag in ['wan', 'lan', 'opt1', 'opt2', 'opt3', 'opt4', 'opt5',
'opt6', 'opt7', 'opt8', 'opt9', 'opt10']:
iface = pf_ifaces.find(tag)
if iface is None:
continue
ip = iface.findtext('ipaddr', '')
subnet = iface.findtext('subnet', '')
if ip and subnet and ip != 'pppoe' and ip != 'dhcp':
try:
intf = ipaddress.IPv4Interface(f'{ip}/{subnet}')
nets[tag] = str(intf.network)
except Exception:
pass
return nets
def convert_filter(pf_root, op_root):
    """Convert firewall rules.

    Creates ``<filter>`` under *op_root* and rebuilds every pfSense rule
    from a fixed list of child tags. The interface name mapping is applied
    to ``<interface>``, and ``<network>`` references in source/destination
    that name an interface with a known subnet are replaced by an explicit
    ``<address>`` holding the CIDR. The separator block and a few
    filter-level settings are carried over afterwards.
    """
    pf_filter = pf_root.find('filter')
    if pf_filter is None:
        return
    op_filter = ET.SubElement(op_root, 'filter')

    # Interface tag -> subnet CIDR, used to resolve dynamic references.
    networks_by_iface = _build_iface_networks(pf_root)

    rule_tags = ('id', 'tracker', 'type', 'interface', 'ipprotocol',
                 'protocol', 'icmptype', 'source', 'destination',
                 'descr', 'tag', 'tagged', 'max', 'max-src-nodes',
                 'max-src-conn', 'max-src-states', 'statetimeout',
                 'statetype', 'os', 'direction', 'log',
                 'disabled', 'gateway', 'bridgeto', 'srcmac',
                 'dstmac', 'allowopts', 'pflow')

    for src_rule in pf_filter.findall('rule'):
        dst_rule = ET.SubElement(op_filter, 'rule')
        for tag in rule_tags:
            for node in src_rule.findall(tag):
                copy_element_into(node, dst_rule)

        # Apply interface name mapping to the <interface> tag
        iface_node = dst_rule.find('interface')
        if iface_node is not None and iface_node.text:
            iface_node.text = _map_iface_in_text(iface_node.text)

        # Swap <network>iface</network> for <address>cidr</address> where
        # the interface subnet is known; other references stay untouched.
        for endpoint_tag in ('source', 'destination'):
            endpoint = dst_rule.find(endpoint_tag)
            if endpoint is None:
                continue
            net_node = endpoint.find('network')
            if net_node is None or not net_node.text:
                continue
            if net_node.text not in networks_by_iface:
                continue
            ET.SubElement(endpoint, 'address').text = networks_by_iface[net_node.text]
            endpoint.remove(net_node)

        for meta in ('updated', 'created'):
            meta_node = src_rule.find(meta)
            if meta_node is not None:
                copy_element_into(meta_node, dst_rule)

    # Filter separators (if present)
    separator = pf_filter.find('separator')
    if separator is not None:
        copy_element_into(separator, op_filter)

    # Filter-level settings, copied only when they carry a value
    for tag in ('defaultallow', 'disablestatenormalization', 'defaultdenyimg'):
        value = safe_text(pf_filter, tag)
        if value:
            set_text(op_filter, tag, value)
def convert_certificates(pf_root, op_root):
    """Convert Certificate Authorities, Certificates and CRLs.

    For each of ``ca``, ``cert`` and ``crl`` all root-level entries are
    gathered under a single wrapper element of the same name in *op_root*:

    * Nested input (``<ca><ca>...</ca></ca>``): the entry's children are
      copied straight into the wrapper.
    * Flat input with a non-empty ``<refid>`` child: a wrapped element is
      created with ``refid`` as an ATTRIBUTE and the remaining children
      copied into it.
    * Flat input without a usable ``<refid>``: the children are copied
      into the wrapper as-is.
    """
    for section in ('ca', 'cert', 'crl'):
        entries = pf_root.findall(section)
        if not entries:
            continue
        op_section = ET.SubElement(op_root, section)

        for entry in entries:
            refid_node = entry.find('refid')
            is_nested = entry.find(section) is not None
            has_refid = refid_node is not None and bool(refid_node.text)

            # Nested entries and entries lacking a refid share one path:
            # their children go directly under the section wrapper.
            if is_nested or not has_refid:
                for child in entry:
                    copy_element_into(child, op_section)
                continue

            # Flat entry: promote <refid> from child element to attribute.
            wrapped = ET.SubElement(op_section, section)
            wrapped.set('refid', refid_node.text)
            for child in entry:
                if child.tag != 'refid':
                    copy_element_into(child, wrapped)
def _map_openvpn_mode_role(mode):
"""Map pfSense OpenVPN mode to OPNsense role."""
if mode in ('server_tls', 'p2p_tls', 'server_user', 'server_user_tls',
'p2p_shared_key', 'server_tls_validate'):
return 'server'
elif mode in ('client_tls', 'client_user', 'client_user_tls',
'p2p_tls_client', 'client_tls_validate'):
return 'client'
return 'server'
def _map_openvpn_protocol(proto):
"""Map pfSense protocol to OPNsense proto."""
m = {
'UDP': 'udp', 'UDP4': 'udp4', 'UDP6': 'udp6',
'TCP': 'tcp', 'TCP4': 'tcp4', 'TCP6': 'tcp6',
}
return m.get(proto, 'udp')
def _map_openvpn_dev_mode(dev):
"""Map pfSense dev_mode to OPNsense dev_type."""
return dev if dev in ('tun', 'tap', 'ovpn') else 'tun'
def _map_openvpn_verbosity(level):
"""Map pfSense verbosity level string to OPNsense verb."""
try:
v = int(level)
return str(max(0, min(11, v)))
except (ValueError, TypeError):
return '3'
def _pf_openvpn_ip_list(pf_srv, tag, count=4):
    """Collect the values of the numbered tags ``<tag>1`` .. ``<tag>count``.

    Tags that are missing or empty are skipped; the remaining values are
    returned in numeric order.
    """
    candidates = (safe_text(pf_srv, f'{tag}{idx}')
                  for idx in range(1, count + 1))
    return [value for value in candidates if value]
def _pf_openvpn_network_list(pf_srv, tag):
    """Collect ``<network>`` values from every ``<tag>`` row of *pf_srv*.

    Rows without a non-empty ``<network>`` child are skipped; document
    order is preserved.
    """
    per_row = (safe_text(row, 'network') for row in pf_srv.findall(tag))
    return [network for network in per_row if network]
def _openvpn_mode_to_authmode(mode, pf_srv):
"""Infer OPNsense authmode from pfSense OpenVPN mode and config."""
if mode in ('server_user', 'server_user_tls', 'client_user', 'client_user_tls'):
return 'Local Database'
pf_authmode = pf_srv.find('authmode')
if pf_authmode is not None and pf_authmode.text:
return pf_authmode.text
return None
def _openvpn_flags_from_pf(pf_srv):
    """Collect the OpenVPN boolean flags enabled in a pfSense entry.

    A flag is included when its pfSense tag holds ``yes`` or ``1``. The
    result follows the order of the mapping below.
    """
    # pfSense tag -> OpenVPN flag name
    flag_map = {
        'passtos': 'passtos',
        'client_to_client': 'client-to-client',
        'duplicate_cn': 'duplicate-cn',
        'dynamic_ip': 'float',
    }
    return [
        ovpn_flag
        for pf_tag, ovpn_flag in flag_map.items()
        if safe_text(pf_srv, pf_tag) in ('yes', '1')
    ]
def _ovpn_flag_on(pf_item, tag):
    """Return True when pfSense tag ``tag`` holds 'yes' or '1'."""
    return safe_text(pf_item, tag) in ('yes', '1')


def _ovpn_enabled_value(pf_item):
    """Return '0' when the entry carries a pfSense <disable> tag, else '1'.

    An explicit falsy text ('0', 'no', 'false') inside <disable> is treated
    as "not disabled".
    """
    node = pf_item.find('disable')
    if node is None:
        return '1'
    text = (node.text or '').strip().lower()
    return '1' if text in ('0', 'no', 'false') else '0'


def _ovpn_copy_fields(pf_item, op_target, field_pairs):
    """Copy each non-empty pfSense tag to its OPNsense tag, in the given order."""
    for pf_tag, op_tag in field_pairs:
        value = safe_text(pf_item, pf_tag)
        if value:
            set_text(op_target, op_tag, value)


def _ovpn_set_joined(op_target, op_tag, values):
    """Write ``values`` as a comma-separated list when the list is non-empty."""
    if values:
        set_text(op_target, op_tag, ','.join(values))


def _ovpn_new_instance(pf_item, op_instances, default_mode):
    """Create an MVC <Instance> with the fields shared by servers and clients.

    Returns ``(op_instance, vpnid, descr, certref)``.
    """
    mode = safe_text(pf_item, 'mode', default_mode)
    vpnid = safe_text(pf_item, 'vpnid', '1')
    descr = safe_text(pf_item, 'description', '')
    certref = safe_text(pf_item, 'certref', '')

    op_instance = ET.SubElement(op_instances, 'Instance')
    # Deterministic UUID so repeated conversions of one config are stable.
    op_instance.set('uuid', _gen_uuid(f'ovpn-instance-{vpnid}-{certref}-{descr}'))

    set_text(op_instance, 'vpnid', vpnid)
    set_text(op_instance, 'enabled', _ovpn_enabled_value(pf_item))
    set_text(op_instance, 'role', _map_openvpn_mode_role(mode))
    set_text(op_instance, 'dev_type',
             _map_openvpn_dev_mode(safe_text(pf_item, 'dev_mode', 'tun')))
    set_text(op_instance, 'proto',
             _map_openvpn_protocol(safe_text(pf_item, 'protocol', 'UDP')))
    set_text(op_instance, 'port', safe_text(pf_item, 'local_port'))
    set_text(op_instance, 'description', descr)
    set_text(op_instance, 'verb',
             _map_openvpn_verbosity(safe_text(pf_item, 'verbosity_level', '3')))
    ET.SubElement(op_instance, 'compression')
    return op_instance, vpnid, descr, certref


def _ovpn_link_tls_key(pf_item, op_instance, op_static_keys, tls_key_map, label):
    """Reference (creating on first use) the StaticKey for an inline TLS key.

    ``tls_key_map`` maps ``(key_text, mode)`` to the StaticKey UUID, so
    identical key+mode pairs share one StaticKey object.
    """
    tls_text = safe_text(pf_item, 'tls')
    if not tls_text:
        return
    tls_type = safe_text(pf_item, 'tls_type', 'crypt')
    key_tuple = (tls_text, tls_type)
    sk_uuid = tls_key_map.get(key_tuple)
    if sk_uuid is None:
        # Seed with the mode and the FULL key text: key files share a common
        # header, so a short prefix cannot tell two keys apart, and the same
        # key used in two modes needs two distinct objects.
        sk_uuid = _gen_uuid(f'ovpn-tls-{tls_type}-{tls_text}')
        op_sk = ET.SubElement(op_static_keys, 'StaticKey')
        op_sk.set('uuid', sk_uuid)
        set_text(op_sk, 'mode', tls_type)
        set_text(op_sk, 'key', tls_text)
        set_text(op_sk, 'description', f'Converted from {label}')
        tls_key_map[key_tuple] = sk_uuid
    set_text(op_instance, 'tls_key', sk_uuid)


def _ovpn_copy_proxy(pf_item, op_instance):
    """Write ``http-proxy`` as 'addr:port' when both parts are configured."""
    proxy_addr = safe_text(pf_item, 'proxy_addr')
    proxy_port = safe_text(pf_item, 'proxy_port')
    if proxy_addr and proxy_port:
        set_text(op_instance, 'http-proxy', f'{proxy_addr}:{proxy_port}')


def convert_openvpn(pf_root, op_root):
    """Convert OpenVPN configuration to OPNsense MVC format.

    OPNsense 24.1+ uses MVC model at <OPNsense><OpenVPN> for OpenVPN.
    The legacy <openvpn> root-level format is still supported via the
    os-openvpn-legacy plugin.

    pfSense stores:
    <openvpn>
    <openvpn-server>
    <vpnid>1</vpnid>
    <mode>server_tls</mode>
    <protocol>UDP</protocol>
    <dev_mode>tun</dev_mode>
    <interface>wan</interface>
    <local_port>1194</local_port>
    ...
    </openvpn-server>
    <openvpn-client>...</openvpn-client>
    <openvpn-csc>...</openvpn-csc>
    </openvpn>

    Output written below ``op_root``:
    - a legacy <openvpn> node holding verbatim copies of every server,
      client and client-specific-override entry;
    - <OPNsense><OpenVPN> with <StaticKeys>, <Instances> and <Overwrites>.

    Returns None; does nothing when pfSense has no <openvpn> section.
    """
    pf_ovpn = pf_root.find('openvpn')
    if pf_ovpn is None:
        return

    # Legacy format (kept for os-openvpn-legacy plugin compatibility)
    op_ovpn = ET.SubElement(op_root, 'openvpn')

    # OPNsense MVC format
    op_opnsense = op_root.find('OPNsense')
    if op_opnsense is None:
        op_opnsense = ET.SubElement(op_root, 'OPNsense')
    op_mvc = ET.SubElement(op_opnsense, 'OpenVPN')

    # pfSense stores the TLS key inline in each server/client; the MVC model
    # wants separate StaticKey objects referenced by UUID.
    op_static_keys = ET.SubElement(op_mvc, 'StaticKeys')
    tls_key_map = {}  # (key_content, mode) -> uuid

    op_instances = ET.SubElement(op_mvc, 'Instances')

    # --- Servers ---
    for pf_srv in pf_ovpn.findall('openvpn-server'):
        op_srv = ET.SubElement(op_ovpn, 'openvpn-server')
        for child in pf_srv:
            copy_element_into(child, op_srv)

        op_instance, vpnid, descr, certref = _ovpn_new_instance(
            pf_srv, op_instances, 'server_tls')

        _ovpn_copy_fields(pf_srv, op_instance, (
            ('topology', 'topology'),
            ('tunnel_network', 'server'),
            ('tunnel_networkv6', 'server_ipv6'),
            ('server_bridge_gateway', 'bridge_gateway'),
        ))
        bridge_start = safe_text(pf_srv, 'server_bridge_dhcp_start')
        bridge_end = safe_text(pf_srv, 'server_bridge_dhcp_end')
        if bridge_start and bridge_end:
            set_text(op_instance, 'bridge_pool', f'{bridge_start}-{bridge_end}')

        # Certificate references
        if certref:
            set_text(op_instance, 'cert', certref)
        _ovpn_copy_fields(pf_srv, op_instance, (('caref', 'ca'),))

        _ovpn_link_tls_key(pf_srv, op_instance, op_static_keys, tls_key_map,
                           descr or f'VPN {vpnid}')

        # Routes (remote networks) and pushed routes (local networks)
        _ovpn_set_joined(op_instance, 'route',
                         _pf_openvpn_network_list(pf_srv, 'remote_network'))
        _ovpn_set_joined(op_instance, 'push_route',
                         _pf_openvpn_network_list(pf_srv, 'local_network'))

        # DNS / NTP
        _ovpn_set_joined(op_instance, 'dns_servers',
                         _pf_openvpn_ip_list(pf_srv, 'dns_server', 4))
        _ovpn_copy_fields(pf_srv, op_instance, (('dns_domain', 'dns_domain'),))
        _ovpn_set_joined(op_instance, 'ntp_servers',
                         _pf_openvpn_ip_list(pf_srv, 'ntp_server', 2))

        _ovpn_set_joined(op_instance, 'various_flags',
                         _openvpn_flags_from_pf(pf_srv))

        _ovpn_copy_fields(pf_srv, op_instance, (
            ('maxclients', 'maxclients'),
            ('cert_depth', 'cert_depth'),
        ))
        if _ovpn_flag_on(pf_srv, 'remote_cert_tls'):
            set_text(op_instance, 'remote_cert_tls', '1')
        if _ovpn_flag_on(pf_srv, 'use_ocsp'):
            set_text(op_instance, 'use_ocsp', '1')

        _ovpn_copy_fields(pf_srv, op_instance, (
            ('keepalive_interval', 'keepalive_interval'),
            ('keepalive_timeout', 'keepalive_timeout'),
        ))

        # Renegotiation: prefer <renegotiate_seconds>, fall back to <reneg-sec>
        reneg = (safe_text(pf_srv, 'renegotiate_seconds')
                 or safe_text(pf_srv, 'reneg-sec'))
        if reneg:
            set_text(op_instance, 'reneg-sec', reneg)

        _ovpn_copy_fields(pf_srv, op_instance, (
            ('data_ciphers', 'data-ciphers'),
            ('data_ciphers_fallback', 'data-ciphers-fallback'),
            ('auth', 'auth'),
        ))
        _ovpn_copy_proxy(pf_srv, op_instance)

        if _ovpn_flag_on(pf_srv, 'local_auth'):
            set_text(op_instance, 'username_as_common_name', '1')

        push_flags = []
        if _ovpn_flag_on(pf_srv, 'block_outside_dns'):
            push_flags.append('block-outside-dns')
        if _ovpn_flag_on(pf_srv, 'register_dns'):
            push_flags.append('register-dns')
        _ovpn_set_joined(op_instance, 'various_push_flags', push_flags)

        # <sndbuf> is a socket buffer size in bytes, not an MTU, so it is
        # deliberately NOT written to tun_mtu.

        comp = safe_text(pf_srv, 'compression')
        if comp:
            set_text(op_instance, 'compress_migrate',
                     '1' if comp in ('yes', '1') else '0')
        mssfix = safe_text(pf_srv, 'mssfix')
        if mssfix:
            set_text(op_instance, 'mssfix',
                     '1' if mssfix in ('yes', '1') else '0')

        _ovpn_copy_fields(pf_srv, op_instance, (
            ('route_metric', 'route_metric'),
            ('gw_redir', 'redirect_gateway'),
        ))

    # --- Clients ---
    for pf_cli in pf_ovpn.findall('openvpn-client'):
        op_cli = ET.SubElement(op_ovpn, 'openvpn-client')
        for child in pf_cli:
            copy_element_into(child, op_cli)

        op_instance, vpnid, descr, certref = _ovpn_new_instance(
            pf_cli, op_instances, 'client_tls')

        # Remote server address/port
        server_addr = safe_text(pf_cli, 'server_addr')
        if server_addr:
            op_remote = ET.SubElement(op_instance, 'remote')
            set_text(op_remote, 'host', server_addr)
            set_text(op_remote, 'port', safe_text(pf_cli, 'server_port', '1194'))

        # NOTE(review): the pfSense interface name is written verbatim into
        # 'local'; nothing here resolves it to an IP address.
        _ovpn_copy_fields(pf_cli, op_instance, (('interface', 'local'),))

        # Certificate references
        if certref:
            set_text(op_instance, 'cert', certref)
        _ovpn_copy_fields(pf_cli, op_instance, (('caref', 'ca'),))

        _ovpn_link_tls_key(pf_cli, op_instance, op_static_keys, tls_key_map,
                           descr or f'VPN {vpnid}')

        # The client's tunnel network is stored in the same 'server' field.
        _ovpn_copy_fields(pf_cli, op_instance, (('tunnel_network', 'server'),))

        _ovpn_set_joined(op_instance, 'dns_servers',
                         _pf_openvpn_ip_list(pf_cli, 'dns_server', 4))
        _ovpn_copy_fields(pf_cli, op_instance, (
            ('dns_domain', 'dns_domain'),
            ('keepalive_interval', 'keepalive_interval'),
            ('keepalive_timeout', 'keepalive_timeout'),
            ('auth', 'auth'),
            ('data_ciphers', 'data-ciphers'),
            ('data_ciphers_fallback', 'data-ciphers-fallback'),
            ('topology', 'topology'),
        ))
        _ovpn_copy_proxy(pf_cli, op_instance)

    # --- Client-Specific Overrides ---
    op_overwrites = ET.SubElement(op_mvc, 'Overwrites')
    for pf_cso in pf_ovpn.findall('openvpn-csc'):
        # NOTE(review): the legacy copy is emitted as <openvpn-cso> although
        # the pfSense source tag is <openvpn-csc>; confirm the tag name the
        # legacy plugin expects.
        op_cso = ET.SubElement(op_ovpn, 'openvpn-cso')
        for child in pf_cso:
            copy_element_into(child, op_cso)

        common_name = safe_text(pf_cso, 'common_name', '')
        server_list = safe_text(pf_cso, 'server_list', '')

        op_overwrite = ET.SubElement(op_overwrites, 'Overwrite')
        op_overwrite.set(
            'uuid', _gen_uuid(f'ovpn-overwrite-{common_name}-{server_list}'))
        set_text(op_overwrite, 'enabled', _ovpn_enabled_value(pf_cso))
        set_text(op_overwrite, 'common_name', common_name)
        if server_list:
            set_text(op_overwrite, 'servers', server_list)

        _ovpn_copy_fields(pf_cso, op_overwrite, (
            ('tunnel_network', 'tunnel_network'),
            ('tunnel_networkv6', 'tunnel_networkv6'),
        ))
        _ovpn_set_joined(op_overwrite, 'local_networks',
                         _pf_openvpn_network_list(pf_cso, 'local_network'))
        _ovpn_set_joined(op_overwrite, 'remote_networks',
                         _pf_openvpn_network_list(pf_cso, 'remote_network'))
        _ovpn_copy_fields(pf_cso, op_overwrite, (
            ('gw_redir', 'redirect_gateway'),
        ))
        _ovpn_set_joined(op_overwrite, 'dns_servers',
                         _pf_openvpn_ip_list(pf_cso, 'dns_server', 4))
        _ovpn_copy_fields(pf_cso, op_overwrite, (('dns_domain', 'dns_domain'),))
        set_text(op_overwrite, 'description', safe_text(pf_cso, 'description'))
def convert_ipsec(pf_root, op_root):
    """Copy the pfSense IPsec section into a new OPNsense <ipsec> node.

    All <phase1> entries are copied first, then all <phase2> entries, then
    all <mobile> entries. Nothing is written when pfSense has no <ipsec>.
    """
    source = pf_root.find('ipsec')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ipsec')
    # Grouped by tag so the output order is phase1, phase2, mobile.
    for tag in ('phase1', 'phase2', 'mobile'):
        for entry in source.findall(tag):
            copy_element_into(entry, target)
def _map_iface_in_text(text):
    """Map interface names inside a text value (replacing tun_wgX with wgX etc).

    Every occurrence of an ``IFACE_MAP`` key in ``text`` is replaced by its
    mapped value in a single pass:

    * longer keys win over shorter ones starting at the same position, so a
      mapping for ``opt10`` is not clobbered by one for ``opt1``;
    * replacement output is never re-scanned, so mappings cannot chain
      (``a -> b`` followed by ``b -> c``).

    Args:
        text: String to rewrite. Falsy values are returned unchanged.

    Returns:
        The rewritten string, or ``text`` itself when there is nothing to map.
    """
    # Empty keys are ignored; they would match between every character.
    keys = sorted((key for key in IFACE_MAP if key), key=len, reverse=True)
    if not text or not keys:
        return text
    pattern = re.compile('|'.join(re.escape(key) for key in keys))
    return pattern.sub(lambda match: IFACE_MAP[match.group(0)], text)
def _find_wireguard(pf_root):
"""Find the WireGuard config in pfSense XML.
pfSense stores WireGuard config either:
- At top level: <wireguard>
- Inside installedpackages: <installedpackages><wireguard>
"""
pf_wg = pf_root.find('wireguard')
if pf_wg is not None and len(pf_wg):
return pf_wg
pf_wg = pf_root.find('installedpackages/wireguard')
if pf_wg is not None:
return pf_wg
return None
def _gen_uuid(seed_str):
"""Generate a UUID that is valid as an XML element name (starts with letter)."""
while True:
u = str(uuid.uuid5(uuid.NAMESPACE_DNS, seed_str))
if u[0].isalpha():
return u
# Salt and retry if UUID starts with a digit (invalid XML element name)
seed_str = f'{seed_str}_retry'
def _wg_cidr_rows(pf_item, container_tag):
    """Return 'address/mask' strings for the <row> children of ``container_tag``.

    Rows missing either the address or the mask are skipped.
    """
    container = pf_item.find(container_tag)
    if container is None:
        return []
    cidrs = []
    for pf_row in container.findall('row'):
        addr = safe_text(pf_row, 'address')
        mask = safe_text(pf_row, 'mask')
        if addr and mask:
            cidrs.append(f'{addr}/{mask}')
    return cidrs


def convert_wireguard(pf_root, op_root):
    """Convert WireGuard configuration.

    pfSense stores (in <installedpackages><wireguard>):
    <tunnels><item>
    <name>tun_wg0</name>
    <enabled>yes</enabled>
    <publickey>...</publickey>
    <privatekey>...</privatekey>
    <listenport>51820</listenport>
    <mtu>1420</mtu>
    <descr>...</descr>
    <addresses><row><address>10.0.9.254</address><mask>24</mask></row></addresses>
    </item></tunnels>
    <peers><item>
    <enabled>yes</enabled>
    <tun>tun_wg0</tun>
    <publickey>...</publickey>
    <presharedkey>...</presharedkey>
    <endpoint>host</endpoint><port>51820</port>
    <persistentkeepalive>60</persistentkeepalive>
    <descr>...</descr>
    <allowedips><row><address>10.0.9.1</address><mask>32</mask></row></allowedips>
    </item></peers>

    This function writes under <OPNsense><wireguard> (nested elements):
    <general><enabled>1</enabled></general>
    <server><servers>
    <server uuid="...">
    <enabled>1</enabled>
    <name>wg0</name>
    <instance>0</instance>
    <pubkey>...</pubkey>
    <privkey>...</privkey>
    <port>51820</port>
    <mtu>1420</mtu>
    <disableroutes>0</disableroutes>
    <debug>0</debug>
    <gateway></gateway>
    <tunneladdress>10.0.9.254/24</tunneladdress>
    <peers>uuid1,uuid2</peers>
    </server>
    </servers></server>
    <client><clients>
    <client uuid="uuid1">
    <enabled>1</enabled>
    <name>...</name>
    <pubkey>...</pubkey>
    <psk></psk>
    <tunneladdress>10.0.9.1/32</tunneladdress>
    <serveraddress>...</serveraddress>
    <serverport>51820</serverport>
    <keepalive>60</keepalive>
    </client>
    </clients></client>

    Servers and clients both carry their UUID in a ``uuid`` attribute, the
    same convention used for the other MVC lists written by this file.

    Returns None; does nothing when no pfSense WireGuard section is found.
    """
    pf_wg = _find_wireguard(pf_root)
    if pf_wg is None:
        return

    # OPNsense MVC mount: OPNsense/wireguard -> <OPNsense><wireguard> (nested)
    op_opnsense = op_root.find('OPNsense')
    if op_opnsense is None:
        op_opnsense = ET.SubElement(op_root, 'OPNsense')
    op_wg = ET.SubElement(op_opnsense, 'wireguard')

    op_general = ET.SubElement(op_wg, 'general')
    set_text(op_general, 'enabled', '1')

    # First pass: give every peer a UUID so servers can reference them.
    peer_data = []  # list of (pf_item, uuid)
    pf_peers = pf_wg.find('peers')
    if pf_peers is not None:
        for pf_item in pf_peers.findall('item'):
            seed = (safe_text(pf_item, 'publickey', '')
                    or safe_text(pf_item, 'descr', '')
                    or str(len(peer_data)))
            peer_data.append((pf_item, _gen_uuid(f'wg-peer-{seed}')))

    # --- Servers (pfSense tunnels) ---
    op_server_root = ET.SubElement(op_wg, 'server')
    op_servers = ET.SubElement(op_server_root, 'servers')
    pf_tunnels = pf_wg.find('tunnels')
    if pf_tunnels is not None:
        for instance_idx, pf_item in enumerate(pf_tunnels.findall('item')):
            tunnel_name = safe_text(pf_item, 'name', '')
            mapped_name = _map_iface_in_text(tunnel_name) if tunnel_name else ''
            pubkey = safe_text(pf_item, 'publickey', '')
            suuid = _gen_uuid(f'wg-server-{pubkey or mapped_name or instance_idx}')

            op_server = ET.SubElement(op_servers, 'server')
            op_server.set('uuid', suuid)
            enabled = safe_text(pf_item, 'enabled', 'yes')
            set_text(op_server, 'enabled', '1' if enabled == 'yes' else '0')
            set_text(op_server, 'name', _sanitize_name(mapped_name))
            set_text(op_server, 'instance', str(instance_idx))
            set_text(op_server, 'pubkey', pubkey)
            set_text(op_server, 'privkey', safe_text(pf_item, 'privatekey'))
            set_text(op_server, 'port', safe_text(pf_item, 'listenport'))
            set_text(op_server, 'mtu', safe_text(pf_item, 'mtu'))
            # Defaults
            set_text(op_server, 'disableroutes', '0')
            set_text(op_server, 'debug', '0')
            ET.SubElement(op_server, 'gateway')

            # Tunnel addresses as comma-separated CIDR
            addresses = _wg_cidr_rows(pf_item, 'addresses')
            if addresses:
                set_text(op_server, 'tunneladdress', ','.join(addresses))

            # Peers whose <tun> names this tunnel (before or after mapping)
            peer_uuids_for_server = []
            for pf_peer_item, puuid in peer_data:
                peer_tun = safe_text(pf_peer_item, 'tun', '')
                if (peer_tun == tunnel_name
                        or _map_iface_in_text(peer_tun) == mapped_name):
                    peer_uuids_for_server.append(puuid)
            if peer_uuids_for_server:
                set_text(op_server, 'peers', ','.join(peer_uuids_for_server))

    # --- Clients (pfSense peers) ---
    op_client_root = ET.SubElement(op_wg, 'client')
    op_clients = ET.SubElement(op_client_root, 'clients')
    for pf_item, puuid in peer_data:
        # Same <tag uuid="..."> layout as the server entries above.
        op_client = ET.SubElement(op_clients, 'client')
        op_client.set('uuid', puuid)

        enabled = safe_text(pf_item, 'enabled', 'yes')
        set_text(op_client, 'enabled', '1' if enabled == 'yes' else '0')

        # Sanitize name to match OPNsense validation (no slashes/spaces)
        raw_name = safe_text(pf_item, 'descr', '')
        sane_name = _sanitize_name(raw_name) if raw_name else f'peer_{puuid[:8]}'
        set_text(op_client, 'name', sane_name)
        set_text(op_client, 'pubkey', safe_text(pf_item, 'publickey'))
        set_text(op_client, 'psk', safe_text(pf_item, 'presharedkey'))

        # Allowed IPs (comma-separated CIDRs)
        allowed_ips = _wg_cidr_rows(pf_item, 'allowedips')
        if allowed_ips:
            set_text(op_client, 'tunneladdress', ','.join(allowed_ips))

        # Endpoint
        endpoint = safe_text(pf_item, 'endpoint', '')
        port = safe_text(pf_item, 'port', '')
        if endpoint:
            set_text(op_client, 'serveraddress', endpoint)
        if port:
            set_text(op_client, 'serverport', port)
        set_text(op_client, 'keepalive', safe_text(pf_item, 'persistentkeepalive'))
def convert_static_routes(pf_root, op_root):
    """Copy every child of pfSense <staticroutes> into a new OPNsense <staticroutes>.

    Nothing is written when the pfSense section is missing.
    """
    source = pf_root.find('staticroutes')
    if source is not None:
        target = ET.SubElement(op_root, 'staticroutes')
        for route in source:
            copy_element_into(route, target)
def convert_gateways(pf_root, op_root):
"""Convert gateway configuration.

Reads pfSense <gateways> and creates a <gateways> element under op_root.
A non-empty pfSense <defaultgw> value is written as <defaultgw4>, and an
empty <defaultgw6> element is added. All <gateway_item> entries are copied
first, followed by all <gateway_group> entries.

Returns None; nothing is written when pfSense has no <gateways> section.

NOTE(review): the premise that OPNsense uses separate defaultgw4/defaultgw6
elements while pfSense uses a single defaultgw element is carried over from
the original comment and is not verified here -- confirm against the target
OPNsense release.
"""
pf_gws = pf_root.find('gateways')
if pf_gws is None:
return
op_gws = ET.SubElement(op_root, 'gateways')
# Default gateway: only a <defaultgw> element with non-empty text is used.
pf_dgw = pf_gws.find('defaultgw')
if pf_dgw is not None and pf_dgw.text:
set_text(op_gws, 'defaultgw4', pf_dgw.text)
ET.SubElement(op_gws, 'defaultgw6')
# Gateway entries (copied as-is)
for pf_gw in pf_gws.findall('gateway_item'):
copy_element_into(pf_gw, op_gws)
# Gateway groups (copied as-is, after all gateway entries)
for pf_gg in pf_gws.findall('gateway_group'):
copy_element_into(pf_gg, op_gws)
def _convert_dns_host_override(pf_host, op_hosts, host_uuids):
"""Convert one pfSense dnsmasq/unbound host override to OPNsense format.

Appends a <host uuid="..."> element to op_hosts, populated from the
<host>, <domain>, <ip> and <descr> children of pf_host. The UUID is
derived deterministically from the hostname, domain and IP.

Args:
pf_host: pfSense <host> override element.
op_hosts: Parent element that receives the new <host> entry.
host_uuids: Not used by this function.

Returns (uuid, alias_list) where alias_list is a list of (hostname, domain) tuples.
Returns (None, None), without adding anything, when both the hostname
and the domain are empty.
"""
hostname = safe_text(pf_host, 'host', '')
domain = safe_text(pf_host, 'domain', '')
ip = safe_text(pf_host, 'ip', '')
descr = safe_text(pf_host, 'descr', '')
# An override needs at least a hostname or a domain.
if not hostname and not domain:
return None, None
# Same hostname/domain/ip always yields the same UUID.
seed = f'unbound-host-{hostname}-{domain}-{ip}'
huuid = _gen_uuid(seed)
op_host = ET.SubElement(op_hosts, 'host')
op_host.set('uuid', huuid)
ET.SubElement(op_host, 'enabled').text = '1'
if hostname:
ET.SubElement(op_host, 'hostname').text = hostname
if domain:
ET.SubElement(op_host, 'domain').text = domain
if ip:
# Determine RR type from IP: a colon marks it as IPv6 (AAAA), else A
if ':' in ip:
ET.SubElement(op_host, 'rr').text = 'AAAA'
else:
ET.SubElement(op_host, 'rr').text = 'A'
ET.SubElement(op_host, 'server').text = ip
ET.SubElement(op_host, 'addptr').text = '0'
if descr:
ET.SubElement(op_host, 'description').text = descr
# Extract inline aliases from pfSense format (<aliases><item>...)
aliases = []
pf_aliases = pf_host.find('aliases')
if pf_aliases is not None:
for pf_item in pf_aliases.findall('item'):
a_host = safe_text(pf_item, 'host', '')
a_domain = safe_text(pf_item, 'domain', '')
# Keep an alias when it has a host or a domain part.
if a_host or a_domain:
aliases.append((a_host, a_domain))
return huuid, aliases
def convert_dns(pf_root, op_root):
    """Convert pfSense dnsmasq/unbound config to OPNsense unbound.

    pfSense stores DNS config in <dnsmasq> and <unbound> sections. This
    function writes both a root-level <unbound> section and an MVC section
    at <OPNsense><unboundplus>.

    General settings are read from one source section:
    - <unbound> when it is enabled and <dnsmasq> is missing or not enabled;
    - <dnsmasq> in every other case where it exists.

    Host overrides and domain overrides are merged from BOTH sections.
    Entries that repeat an already converted (host, domain, ip) or
    (domain, ip) combination are skipped so no UUID is emitted twice, and
    upstream DNS servers are de-duplicated for the same reason.

    Returns None; does nothing when pfSense has neither section.
    """
    pf_dnsmasq = pf_root.find('dnsmasq')
    pf_unbound = pf_root.find('unbound')
    if pf_dnsmasq is None and pf_unbound is None:
        return

    def _pf_bool(src, tag):
        """Check pfSense boolean: True if tag exists (self-closing or with truthy text)."""
        el = src.find(tag)
        if el is None:
            return False
        return el.text is None or el.text in ('1', 'yes')

    # Root-level <unbound> section
    op_unbound = ET.SubElement(op_root, 'unbound')

    # MVC format under <OPNsense><unboundplus>
    op_opnsense = op_root.find('OPNsense')
    if op_opnsense is None:
        op_opnsense = ET.SubElement(op_root, 'OPNsense')
    op_mvc = ET.SubElement(op_opnsense, 'unboundplus')
    op_mvc_general = ET.SubElement(op_mvc, 'general')

    # Pick the section that drives the general settings. A disabled <dnsmasq>
    # must not shadow an enabled <unbound>, otherwise the resolver would be
    # migrated as disabled.
    if pf_dnsmasq is None:
        pf_src = pf_unbound
    elif (pf_unbound is not None
            and _pf_bool(pf_unbound, 'enable')
            and not _pf_bool(pf_dnsmasq, 'enable')):
        pf_src = pf_unbound
    else:
        pf_src = pf_dnsmasq

    # --- General settings ---
    # Enable: pfSense <enable/> or <enable>1</enable> -> <enabled>1</enabled>
    enabled = '1' if _pf_bool(pf_src, 'enable') else '0'
    set_text(op_unbound, 'enabled', enabled)
    set_text(op_mvc_general, 'enabled', enabled)

    # Port
    port = safe_text(pf_src, 'port', '53')
    set_text(op_unbound, 'port', port)
    set_text(op_mvc_general, 'port', port)

    # Interface: <interface>lan</interface> -> <active_interface>lan</active_interface>
    iface = safe_text(pf_src, 'interface', '')
    if iface:
        set_text(op_unbound, 'active_interface', iface)
        set_text(op_mvc_general, 'active_interface', iface)

    # Register DHCP leases / static mappings
    for flag in ('regdhcp', 'regdhcpstatic'):
        if _pf_bool(pf_src, flag):
            set_text(op_unbound, flag, '1')
            set_text(op_mvc_general, flag, '1')

    # Domain
    domain = safe_text(pf_src, 'domain', '')
    if domain:
        set_text(op_unbound, 'domain', domain)
        set_text(op_mvc_general, 'regdhcpdomain', domain)

    # DNSSEC is always switched on in the output
    set_text(op_unbound, 'dnssec', '1')
    set_text(op_mvc_general, 'dnssec', '1')

    # Local zone type: domainneeded -> deny (approximate mapping)
    if _pf_bool(pf_src, 'domainneeded'):
        set_text(op_unbound, 'local_zone_type', 'deny')
        set_text(op_mvc_general, 'local_zone_type', 'deny')
    else:
        set_text(op_mvc_general, 'local_zone_type', 'transparent')

    # Hide version/identity
    set_text(op_unbound, 'hideidentity', '1')
    set_text(op_unbound, 'hideversion', '1')

    # Custom options
    custom = safe_text(pf_src, 'custom_options', '')
    if custom:
        ET.SubElement(op_unbound, 'custom_options').text = custom

    # --- Host overrides (merged from <dnsmasq><hosts> and <unbound><hosts>) ---
    op_hosts = ET.SubElement(op_unbound, 'hosts')
    op_mvc_hosts = ET.SubElement(op_mvc, 'hosts')
    op_mvc_aliases = ET.SubElement(op_mvc, 'aliases')
    all_aliases = []  # list of (parent_uuid, hostname, domain)
    seen_hosts = set()  # (host, domain, ip) triples already converted
    for pf_host_src in (pf_dnsmasq, pf_unbound):
        if pf_host_src is None:
            continue
        pf_hosts = pf_host_src.find('hosts')
        if pf_hosts is None:
            continue
        for pf_host in pf_hosts.findall('host'):
            # The host UUID is derived from exactly these three values, so a
            # repeated triple would yield a duplicate UUID.
            host_key = (safe_text(pf_host, 'host', ''),
                        safe_text(pf_host, 'domain', ''),
                        safe_text(pf_host, 'ip', ''))
            if host_key in seen_hosts:
                continue
            huuid, aliases = _convert_dns_host_override(pf_host, op_hosts, {})
            if huuid is None:
                continue
            seen_hosts.add(host_key)
            # Mirror the legacy entry into the MVC section
            mvc_host = ET.SubElement(op_mvc_hosts, 'host')
            mvc_host.set('uuid', huuid)
            for child in op_hosts.findall(f'host[@uuid="{huuid}"]')[0]:
                copy_element_into(child, mvc_host)
            for a_hostname, a_domain in (aliases or []):
                all_aliases.append((huuid, a_hostname, a_domain))

    # MVC host alias entries referencing parent host UUIDs
    for parent_uuid, a_hostname, a_domain in all_aliases:
        auuid = _gen_uuid(f'unbound-hostalias-{parent_uuid}-{a_hostname}-{a_domain}')
        op_alias = ET.SubElement(op_mvc_aliases, 'alias')
        op_alias.set('uuid', auuid)
        ET.SubElement(op_alias, 'enabled').text = '1'
        if parent_uuid:
            ET.SubElement(op_alias, 'host').text = parent_uuid
        if a_hostname:
            ET.SubElement(op_alias, 'hostname').text = a_hostname
        if a_domain:
            ET.SubElement(op_alias, 'domain').text = a_domain

    # --- Forwarding (upstream DNS) ---
    # Forwarding is always enabled and the system DNS servers are added as
    # forward entries for the root domain ".".
    op_mvc_forwarding = ET.SubElement(op_mvc, 'forwarding')
    ET.SubElement(op_mvc_forwarding, 'enabled').text = '1'

    upstream_dns = []
    pf_sys = pf_root.find('system')
    if pf_sys is not None:
        for dns_tag in ('dns1', 'dns2', 'dns3', 'dns4'):
            val = safe_text(pf_sys, dns_tag)
            if val:
                upstream_dns.append(val)
        for dns in pf_sys.findall('dnsserver'):
            if dns.text:
                upstream_dns.append(dns.text)
    # Order-preserving de-duplication: the entry UUID is derived from the
    # address alone, so a repeated server would repeat the UUID.
    upstream_dns = list(dict.fromkeys(upstream_dns))

    op_mvc_dots = ET.SubElement(op_mvc, 'dots')
    for idx, dns_ip in enumerate(upstream_dns):
        duuid = _gen_uuid(f'unbound-upstream-{dns_ip}')
        op_dot = ET.SubElement(op_mvc_dots, 'dot')
        op_dot.set('uuid', duuid)
        ET.SubElement(op_dot, 'enabled').text = '1'
        ET.SubElement(op_dot, 'type').text = 'forward'
        ET.SubElement(op_dot, 'domain').text = '.'
        ET.SubElement(op_dot, 'server').text = dns_ip
        ET.SubElement(op_dot, 'description').text = f'Upstream DNS {idx + 1}'

    # --- Domain overrides (legacy <domainoverrides> plus MVC dot entries) ---
    op_domain_overrides = ET.SubElement(op_unbound, 'domainoverrides')
    seen_overrides = set()  # (domain, ip) pairs already converted
    for pf_do_src in (pf_dnsmasq, pf_unbound):
        if pf_do_src is None:
            continue
        pf_dos = pf_do_src.find('domainoverrides')
        if pf_dos is None:
            continue
        for pf_entry in pf_dos.findall('entry'):
            do_domain = safe_text(pf_entry, 'domain', '')
            do_ip = safe_text(pf_entry, 'ip', '')
            do_descr = safe_text(pf_entry, 'descr', '')
            if not do_domain:
                continue
            if (do_domain, do_ip) in seen_overrides:
                continue
            seen_overrides.add((do_domain, do_ip))

            douuid = _gen_uuid(f'unbound-domainoverride-{do_domain}-{do_ip}')
            op_do = ET.SubElement(op_domain_overrides, 'domainoverride')
            op_do.set('uuid', douuid)
            ET.SubElement(op_do, 'enabled').text = '1'
            ET.SubElement(op_do, 'domain').text = do_domain
            if do_ip:
                ET.SubElement(op_do, 'server').text = do_ip
            if do_descr:
                ET.SubElement(op_do, 'description').text = do_descr

            # Matching MVC dot entry for the domain override
            douuid2 = _gen_uuid(f'unbound-domain-dot-{do_domain}-{do_ip}')
            op_dot = ET.SubElement(op_mvc_dots, 'dot')
            op_dot.set('uuid', douuid2)
            ET.SubElement(op_dot, 'enabled').text = '1'
            ET.SubElement(op_dot, 'type').text = 'forward'
            ET.SubElement(op_dot, 'domain').text = do_domain
            if do_ip:
                ET.SubElement(op_dot, 'server').text = do_ip
            if do_descr:
                ET.SubElement(op_dot, 'description').text = do_descr

    # --- Carry over remaining pfSense <unbound> settings ---
    # Tags handled above are skipped; any other tag is copied unless the
    # output already has an element of that name.
    if pf_unbound is not None:
        handled = ('enable', 'port', 'interface', 'regdhcp', 'regdhcpstatic',
                   'domain', 'hosts', 'domainoverrides', 'custom_options',
                   'dnssec', 'active_interface')
        for pf_child in pf_unbound:
            tag = pf_child.tag
            if tag in handled:
                continue
            if op_unbound.find(tag) is None:
                copy_element_into(pf_child, op_unbound)
def convert_snmp(pf_root, op_root):
    """Copy every child of pfSense <snmpd> into a new OPNsense <snmpd> node.

    Nothing is written when pfSense has no <snmpd> section.
    """
    source = pf_root.find('snmpd')
    if source is None:
        return
    target = ET.SubElement(op_root, 'snmpd')
    for setting in source:
        copy_element_into(setting, target)
def convert_schedules(pf_root, op_root):
    """Copy each <schedule> under pfSense <schedules> into a new OPNsense <schedules>.

    Nothing is written when pfSense has no <schedules> section.
    """
    source = pf_root.find('schedules')
    if source is None:
        return
    target = ET.SubElement(op_root, 'schedules')
    for schedule in source.findall('schedule'):
        copy_element_into(schedule, target)
def convert_virtual_ips(pf_root, op_root):
    """Copy each <vip> under pfSense <virtualip> into a new OPNsense <virtualip>.

    Nothing is written when pfSense has no <virtualip> section.
    """
    source = pf_root.find('virtualip')
    if source is None:
        return
    target = ET.SubElement(op_root, 'virtualip')
    for vip in source.findall('vip'):
        copy_element_into(vip, target)
def convert_ifgroups(pf_root, op_root):
    """Copy each <ifgroupentry> under pfSense <ifgroups> into a new OPNsense <ifgroups>.

    Nothing is written when pfSense has no <ifgroups> section.
    """
    source = pf_root.find('ifgroups')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ifgroups')
    for group in source.findall('ifgroupentry'):
        copy_element_into(group, target)
def convert_sysctl(pf_root, op_root):
    """Copy each <item> under pfSense <sysctl> into a new OPNsense <sysctl> node.

    Nothing is written when pfSense has no <sysctl> section.
    """
    source = pf_root.find('sysctl')
    if source is not None:
        target = ET.SubElement(op_root, 'sysctl')
        for tunable in source.findall('item'):
            copy_element_into(tunable, target)
def convert_syslog(pf_root, op_root):
    """Copy every child of pfSense <syslog> into a new OPNsense <syslog> node.

    Nothing is written when pfSense has no <syslog> section.
    """
    source = pf_root.find('syslog')
    if source is None:
        return
    target = ET.SubElement(op_root, 'syslog')
    for setting in source:
        copy_element_into(setting, target)
def convert_rrd(pf_root, op_root):
    """Copy every child of pfSense <rrd> into a new OPNsense <rrd> node.

    Nothing is written when pfSense has no <rrd> section.
    """
    source = pf_root.find('rrd')
    if source is None:
        return
    target = ET.SubElement(op_root, 'rrd')
    for setting in source:
        copy_element_into(setting, target)
def convert_dyndns(pf_root, op_root):
    """Copy each <dyndns> under pfSense <dyndnses> into a new OPNsense <dyndnses>.

    Nothing is written when pfSense has no <dyndnses> section.
    """
    source = pf_root.find('dyndnses')
    if source is None:
        return
    target = ET.SubElement(op_root, 'dyndnses')
    for entry in source.findall('dyndns'):
        copy_element_into(entry, target)
def convert_captive_portal(pf_root, op_root):
    """Carry Captive Portal zones over to the OPNsense tree.

    Creates ``<captiveportal>`` in ``op_root`` when the source has that section
    and copies each direct ``<zone>`` child via ``copy_element_into``.
    """
    source = pf_root.find('captiveportal')
    if source is None:
        return
    target = ET.SubElement(op_root, 'captiveportal')
    for zone in source.findall('zone'):
        copy_element_into(zone, target)
def convert_load_balancer(pf_root, op_root):
    """Carry HAProxy/relayd load balancer sections over to the OPNsense tree.

    Each of ``lbpool``, ``lbvirtual``, ``lbserver`` and ``haproxy`` that exists
    as a top-level section in the source is copied whole into ``op_root`` via
    ``copy_element_into``; missing sections are skipped.
    """
    for section_name in ('lbpool', 'lbvirtual', 'lbserver', 'haproxy'):
        found = pf_root.find(section_name)
        if found is None:
            continue
        copy_element_into(found, op_root)
def convert_cron(pf_root, op_root):
    """Carry cron jobs over to the OPNsense tree.

    Creates ``<cron>`` in ``op_root`` when the source has that section and
    copies each direct ``<item>`` child via ``copy_element_into``.
    """
    source = pf_root.find('cron')
    if source is None:
        return
    target = ET.SubElement(op_root, 'cron')
    for job in source.findall('item'):
        copy_element_into(job, target)
def convert_dhcrelay(pf_root, op_root):
    """Carry DHCP relay sections over to the OPNsense tree.

    ``dhcrelay`` and ``dhcrelay6`` are each copied whole into ``op_root`` via
    ``copy_element_into`` when present in the source; missing ones are skipped.
    """
    for section_name in ('dhcrelay', 'dhcrelay6'):
        found = pf_root.find(section_name)
        if found is None:
            continue
        copy_element_into(found, op_root)
def convert_ppps(pf_root, op_root):
    """Carry PPPoE/PPTP/L2TP interface definitions over to the OPNsense tree.

    Creates ``<ppps>`` in ``op_root`` when the source has that section and
    copies each direct ``<ppp>`` child via ``copy_element_into``.
    """
    source = pf_root.find('ppps')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ppps')
    for link in source.findall('ppp'):
        copy_element_into(link, target)
def convert_igmpproxy(pf_root, op_root):
    """Carry the ``<igmpproxy>`` section over to the OPNsense tree.

    Nothing is emitted when the source lacks ``<igmpproxy>``. Otherwise a new
    ``<igmpproxy>`` node is appended to ``op_root`` and every direct child of
    the source section is copied into it via ``copy_element_into``.
    """
    source = pf_root.find('igmpproxy')
    if source is None:
        return
    target = ET.SubElement(op_root, 'igmpproxy')
    for setting in source:
        copy_element_into(setting, target)
def convert_ntpd(pf_root, op_root):
    """Carry the ``<ntpd>`` section over to the OPNsense tree.

    Nothing is emitted when the source lacks ``<ntpd>``. Otherwise a new
    ``<ntpd>`` node is appended to ``op_root`` and every direct child of the
    source section is copied into it via ``copy_element_into``.
    """
    source = pf_root.find('ntpd')
    if source is None:
        return
    target = ET.SubElement(op_root, 'ntpd')
    for setting in source:
        copy_element_into(setting, target)
def convert_sshdata(pf_root, op_root):
    """Carry SSH host key files over to the OPNsense tree.

    Creates ``<sshdata>`` in ``op_root`` when the source has that section and
    copies each direct ``<sshkeyfile>`` child via ``copy_element_into``.
    """
    source = pf_root.find('sshdata')
    if source is None:
        return
    target = ET.SubElement(op_root, 'sshdata')
    for key_file in source.findall('sshkeyfile'):
        copy_element_into(key_file, target)
def convert_bridge(pf_root, op_root):
    """Carry bridge definitions over to the OPNsense tree.

    Creates a top-level ``<bridge>`` in ``op_root`` when the source has one and
    copies each direct ``<bridge>`` child of it via ``copy_element_into``.
    """
    # NOTE(review): this matches <bridge>/<bridge>; pfSense exports may instead
    # use <bridges>/<bridged> -- confirm the tag pair against a real config.
    source = pf_root.find('bridge')
    if source is None:
        return
    target = ET.SubElement(op_root, 'bridge')
    for bridge_entry in source.findall('bridge'):
        copy_element_into(bridge_entry, target)
def convert(pf_config_path, output_path=None, interface_map=None):
    """Convert a pfSense config file into an OPNsense config document.

    Args:
        pf_config_path: Path of the pfSense ``config.xml`` to parse.
        output_path: Destination file. When falsy, the resulting XML is
            printed to stdout instead.
        interface_map: Optional ``{old_name: new_name}`` dict merged into the
            module-level ``IFACE_MAP`` before conversion starts.

    Returns:
        None. The result is written to ``output_path`` or printed.
    """
    # IFACE_MAP is mutated in place, so no rebinding of the global is needed.
    if interface_map:
        IFACE_MAP.update(interface_map)

    pf_root = ET.parse(pf_config_path).getroot()
    op_root = ET.Element('opnsense')

    # The output always carries our own version marker.
    version_node = ET.SubElement(op_root, 'version')
    version_node.text = '26.1'

    # Section converters, executed strictly in this order.
    section_converters = (
        convert_system,
        convert_interfaces,
        convert_vlans,
        convert_dhcpd,
        convert_kea_dhcp,
        convert_aliases,
        convert_nat,
        convert_filter,
        convert_certificates,
        convert_openvpn,
        convert_ipsec,
        convert_wireguard,
        convert_static_routes,
        convert_gateways,
        convert_dns,
        convert_snmp,
        convert_schedules,
        convert_virtual_ips,
        convert_ifgroups,
        convert_sysctl,
        convert_syslog,
        convert_rrd,
        convert_dyndns,
        convert_captive_portal,
        convert_load_balancer,
        convert_cron,
        convert_dhcrelay,
        convert_ppps,
        convert_igmpproxy,
        convert_ntpd,
        convert_sshdata,
        convert_bridge,
    )
    for run_converter in section_converters:
        run_converter(pf_root, op_root)

    # Top-level tags that must never be passed through verbatim. 'pfsense'
    # (the source root name) is part of this set; 'version' is skipped
    # separately below because the output sets its own.
    handled_sections = {
        'pfsense', 'system', 'interfaces', 'vlans', 'dhcpd', 'dnsmasq',
        'snmpd', 'diag', 'bridge', 'syslog', 'nat', 'filter',
        'staticroutes', 'gateways', 'aliases', 'ca', 'cert', 'certca',
        'crl', 'openvpn', 'ipsec', 'wireguard', 'unbound', 'ifgroups',
        'virtualip', 'dhcpbackend', 'qinqs', 'schedules', 'sshdata',
        'rrd', 'dyndnses', 'captiveportal', 'cron', 'dhcrelay',
        'dhcrelay6', 'ppps', 'igmpproxy', 'ntpd', 'OPNsense',
    }

    # Pass every remaining top-level section through unchanged.
    for section in pf_root:
        section_tag = section.tag
        if section_tag == 'version' or section_tag in handled_sections:
            continue
        # A converter (or an earlier pass-through) already produced this tag.
        if op_root.find(section_tag) is not None:
            continue
        copied = copy_element_into(section, op_root)
        # WireGuard was converted separately, so drop its package-level copy.
        if section_tag == 'installedpackages' and copied is not None:
            stale_wireguard = copied.find('wireguard')
            if stale_wireguard is not None:
                copied.remove(stale_wireguard)

    # Post-processing: interface renames, default root password, defaults.
    _post_process_iface_map(op_root)
    _set_root_password(op_root)
    _add_opnsense_defaults(op_root)

    indent(op_root)
    result = '<?xml version="1.0"?>\n' + ET.tostring(op_root, encoding='unicode')

    if not output_path:
        print(result)
        return
    with open(output_path, 'w', encoding='utf-8') as out_file:
        out_file.write(result)
    print(f"Converted config written to: {output_path}", file=sys.stderr)
def _set_root_password(op_root):
    """Reset the administrator account to the default password 'opnsense'.

    Every ``<user>`` under ``<system>`` whose ``<name>`` is ``admin`` or whose
    ``<uid>`` is ``0`` is treated as the administrator:

    * a ``<name>`` of ``admin`` is renamed to ``root``;
    * ``<password>`` is set to the bcrypt hash of 'opnsense', and is created
      when the entry has none, so an entry that only carried ``<bcrypt-hash>``
      still ends up with the default password;
    * ``<bcrypt-hash>`` is set to the same hash via ``set_text``;
    * any ``<md5-hash>`` child is removed.

    Does nothing when ``op_root`` has no ``<system>`` element.
    """
    # bcrypt hash of the literal password 'opnsense'
    default_hash = '$2b$10$ebdSKP5HpJ1eOmgnTrfmy.pKBjDIrrzSTIRPNZd8EGmm/EU6hIwAy'

    op_sys = op_root.find('system')
    if op_sys is None:
        return

    for user in op_sys.findall('user'):
        name = user.find('name')
        uid = user.find('uid')
        named_admin = name is not None and name.text == 'admin'
        has_uid_zero = uid is not None and uid.text == '0'
        if not (named_admin or has_uid_zero):
            continue

        # OPNsense expects the administrator to be called 'root'.
        if named_admin:
            name.text = 'root'

        # Always leave a <password> element behind, creating it if needed.
        pw = user.find('password')
        if pw is None:
            pw = ET.SubElement(user, 'password')
        pw.text = default_hash

        # Also set the bcrypt-hash field that OPNsense may use.
        set_text(user, 'bcrypt-hash', default_hash)

        # Drop the legacy md5 hash; only the bcrypt hash is kept.
        md5 = user.find('md5-hash')
        if md5 is not None:
            user.remove(md5)
def _post_process_iface_map(op_root):
    """Apply ``IFACE_MAP`` renames (e.g. tun_wg0 -> wg0) across the output tree.

    This is a final sweep that catches references in sections that were
    copied through unchanged. It returns immediately when ``IFACE_MAP`` is
    empty. Elements without text are left alone.
    """
    if not IFACE_MAP:
        return

    # Tags whose text can hold interface device names or references.
    iface_tags = {'if', 'interface', 'interfaces', 'tun', 'name', 'bridgeif', 'member', 'tunnel', 'pif'}

    for node in op_root.iter():
        text = node.text
        if not text:
            continue
        if node.tag in iface_tags:
            node.text = _map_iface_in_text(text)
        elif node.tag == 'description' and any(old in text for old in IFACE_MAP):
            # Descriptions are rewritten only when they mention a mapped name.
            node.text = _map_iface_in_text(text)
def _add_opnsense_defaults(op_root):
    """Add OPNsense-specific default elements to the converted tree.

    * Replaces every top-level ``<theme>`` with a single text-only
      ``<theme>opnsense</theme>``.
    * Adds an empty ``<lastchange>`` when none exists.
    * When ``<system>`` exists: adds ``<webgui>`` with ``protocol`` set to
      ``https`` if it is missing, and adds empty ``<hn_altq_enable>`` and
      ``<hn_mac_filter>`` elements if they are missing.
    """
    # OPNsense uses <theme>opnsense</theme> (text content, no child elements);
    # pfSense may use <theme>pfsense</theme>. ControllerBase.php casts the
    # node to string, so child elements would yield "" and break paths.
    for stale_theme in op_root.findall('theme'):
        op_root.remove(stale_theme)
    theme = ET.SubElement(op_root, 'theme')
    theme.text = 'opnsense'

    if op_root.find('lastchange') is None:
        ET.SubElement(op_root, 'lastchange')

    op_sys = op_root.find('system')
    if op_sys is None:
        return

    if op_sys.find('webgui') is None:
        wg = ET.SubElement(op_sys, 'webgui')
        set_text(wg, 'protocol', 'https')

    # Test for the element itself rather than its text: an existing but empty
    # element has no text and must not be added a second time.
    for hn_opt in ('hn_altq_enable', 'hn_mac_filter'):
        if op_sys.find(hn_opt) is None:
            ET.SubElement(op_sys, hn_opt)
def _parse_cli_args(argv):
    """Parse the command line into ``(input_path, output_path, interface_map)``.

    Args:
        argv: Full argument vector (``sys.argv`` style); ``argv[1]`` must be
            the input path.

    Returns:
        A tuple of the input path, the output path (``None`` when omitted) and
        the interface mapping dict. The mapping is pre-seeded with
        ``tun_wg0..tun_wg5 -> wg0..wg5``.

    ``--interface-map`` / ``-m`` takes one or more ``old=new`` tokens, matching
    the usage text. Tokens are consumed for as long as they contain ``=`` and
    do not start with ``-``, so an output path that contains ``=`` must be
    given before the flag. Invalid input prints an error to stderr and exits
    with status 1.
    """
    input_path = argv[1]
    output_path = None
    # Default mapping: tun_wgX -> wgX
    interface_map = {f'tun_wg{n}': f'wg{n}' for n in range(6)}

    def add_mapping(mapping):
        old, sep, new = mapping.partition('=')
        # An empty side would either match everything or erase names.
        if not (sep and old and new):
            print(f"Error: invalid mapping '{mapping}'. Use old=new format.", file=sys.stderr)
            sys.exit(1)
        interface_map[old] = new

    i = 2
    while i < len(argv):
        arg = argv[i]
        if arg in ('--interface-map', '-m'):
            i += 1
            if i >= len(argv):
                print("Error: --interface-map requires a mapping", file=sys.stderr)
                sys.exit(1)
            add_mapping(argv[i])
            # Consume any further old=new tokens that follow the same flag.
            while (i + 1 < len(argv)
                   and '=' in argv[i + 1]
                   and not argv[i + 1].startswith('-')):
                i += 1
                add_mapping(argv[i])
        elif output_path is None and not arg.startswith('-'):
            output_path = arg
        else:
            print(f"Error: unknown argument '{arg}'", file=sys.stderr)
            sys.exit(1)
        i += 1
    return input_path, output_path, interface_map


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <pfsense-config.xml> [output.xml] [--interface-map old=new ...]", file=sys.stderr)
        print("", file=sys.stderr)
        print("Examples:", file=sys.stderr)
        print(f" {sys.argv[0]} config-pfsense.xml config-opnsense.xml", file=sys.stderr)
        print(f" {sys.argv[0]} config-pfsense.xml --interface-map tun_wg0=wg0 tun_wg1=wg1", file=sys.stderr)
        print("", file=sys.stderr)
        print("If output.xml is omitted, the converted config is printed to stdout.", file=sys.stderr)
        sys.exit(1)
    input_path, output_path, interface_map = _parse_cli_args(sys.argv)
    convert(input_path, output_path, interface_map=interface_map)