diff --git a/README.md b/README.md index bd17cb9..8465976 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,8 @@ # pf2opnsense +This tool converts a pfsense 2 opnsense config, including VLAN's and Wireguard tunnels. +It allows mapping of interface names to take migrations easier. + +Warning: this has ben generated by a (local) AI, though it took almost a day of itterations and testing on an actual firewall to iron out the problems. + +It has only been tested against our own config diff --git a/pfsense2opnsense.py b/pfsense2opnsense.py new file mode 100644 index 0000000..5e65753 --- /dev/null +++ b/pfsense2opnsense.py @@ -0,0 +1,1364 @@ +#!/usr/bin/env python3 +""" +pfSense to OPNsense configuration converter. + +Reads a pfSense config.xml and converts it to an OPNsense-compatible config.xml. + +Usage: + python pfsense2opnsense.py config-pfsense.xml > config-opnsense.xml + +Sections converted: + - System (hostname, domain, DNS, timezone, webgui, SSH, users) + - Interfaces (WAN, LAN, OPTx) and VLANs + - DHCP server config and static mappings + - Firewall rules (filter) + - NAT rules (port forwarding + outbound NAT) + - Aliases + - Certificates and CAs + - OpenVPN servers/clients + - WireGuard (core since 24.1, no plugin needed on 26.x) + - IPsec + - Static routes + - DNS (dnsmasq/unbound) with domain overrides and host overrides + - SNMP + - Schedules + - Interface groups + - Virtual IPs + - CRLs + - System tunables (sysctl) +""" + +import xml.etree.ElementTree as ET +import sys +import uuid +import ipaddress +import re + +# Interface name mapping (e.g. tun_wg0 -> wg0 for OPNsense) +# Can be overridden via --interface-map CLI arg +IFACE_MAP = {} + +# OPNsense name validation: alphanumeric, dot, underscore, dash, 1-64 chars +OPN_NAME_RE = re.compile(r'^[0-9a-zA-Z._\-]{1,64}$') + +def _sanitize_name(name): + """Replace characters not allowed in OPNsense names with underscore.""" + return re.sub(r'[^0-9a-zA-Z._\-]', '_', name) + + +def indent(elem, level=0): + """Pretty-print XML with proper indentation.""" + i = "\n" + level * "\t" + if len(elem): + if not elem.text or not elem.text.strip(): + elem.text = i + "\t" + if not elem.tail or not elem.tail.strip(): + elem.tail = i + for child in elem: + indent(child, level + 1) + if not child.tail or not child.tail.strip(): + child.tail = i + else: + if level and (not elem.tail or not elem.tail.strip()): + elem.tail = i + + +def safe_text(elem, path, default=""): + """Safely get text from a sub-element.""" + if elem is None: + return default + found = elem.find(path) + if found is not None and found.text: + return found.text + return default + + +def set_text(parent, tag, value, cdata=False): + """Set text in a sub-element, creating it if needed.""" + if not value and value != 0: + return + child = parent.find(tag) + if child is None: + child = ET.SubElement(parent, tag) + if cdata: + child.text = value + else: + child.text = str(value) + + +def copy_element_into(src_elem, dst_parent, tag_map=None, skip_tags=None): + """ + Copy an element (with all children) from src into dst_parent with optional tag renaming. + Returns the new element. + """ + if tag_map is None: + tag_map = {} + if skip_tags is None: + skip_tags = set() + + tag = tag_map.get(src_elem.tag, src_elem.tag) + if tag in skip_tags: + return None + + new = ET.SubElement(dst_parent, tag) + if src_elem.text and src_elem.text.strip(): + new.text = src_elem.text + new.tail = src_elem.tail + + for attr_name, attr_val in src_elem.attrib.items(): + new.set(attr_name, attr_val) + + for child in src_elem: + copy_element_into(child, new, tag_map, skip_tags) + return new + + +def convert_system(pf_root, op_root): + """Convert system section.""" + pf_sys = pf_root.find('system') + if pf_sys is None: + return + + op_sys = ET.SubElement(op_root, 'system') + + # Hostname and domain + set_text(op_sys, 'hostname', safe_text(pf_sys, 'hostname')) + set_text(op_sys, 'domain', safe_text(pf_sys, 'domain')) + set_text(op_sys, 'timezone', safe_text(pf_sys, 'timezone')) + set_text(op_sys, 'language', safe_text(pf_sys, 'language')) + + # NTP + set_text(op_sys, 'timeservers', safe_text(pf_sys, 'timeservers')) + set_text(op_sys, 'time-update-interval', safe_text(pf_sys, 'time-update-interval')) + + # DNS servers (use append_element for multi-value tags) + for dns in pf_sys.findall('dnsserver'): + if dns.text: + child = ET.SubElement(op_sys, 'dnsserver') + child.text = dns.text + + # DNS gateway settings + for gw in ['dns1gw', 'dns2gw', 'dns3gw', 'dns4gw']: + val = safe_text(pf_sys, gw) + if val: + set_text(op_sys, gw, val) + + # WebGUI — skip pfSense-specific elements that break OPNsense (e.g. pfSense.css) + pf_gui = pf_sys.find('webgui') + if pf_gui is not None: + op_gui = ET.SubElement(op_sys, 'webgui') + pf_skip = {'loginautocomplete', 'loginshowhost', 'webguicss', 'logincss', + 'dashboardenabled', 'msscheck', 'ipv6', 'sha256'} + for child in pf_gui: + if child.tag not in pf_skip: + copy_element_into(child, op_gui) + + # SSH + pf_ssh = pf_sys.find('ssh') + if pf_ssh is not None: + op_ssh = ET.SubElement(op_sys, 'ssh') + pf_ssh_enable = pf_ssh.find('enable') + if pf_ssh_enable is not None: + copy_element_into(pf_ssh_enable, op_ssh) + + # Optimization + set_text(op_sys, 'optimization', safe_text(pf_sys, 'optimization')) + + # Maximum table entries + set_text(op_sys, 'maximumtableentries', safe_text(pf_sys, 'maximumtableentries')) + set_text(op_sys, 'maximumstates', safe_text(pf_sys, 'maximumstates')) + set_text(op_sys, 'maximumfrags', safe_text(pf_sys, 'maximumfrags')) + set_text(op_sys, 'maximumtables', safe_text(pf_sys, 'maximumtables')) + + # Disable offloading + for offload in ['disablesegmentationoffloading', 'disablelargereceiveoffloading']: + val = safe_text(pf_sys, offload) + if val: + set_text(op_sys, offload, val) + + # Proxy + set_text(op_sys, 'proxyuser', safe_text(pf_sys, 'proxyuser')) + set_text(op_sys, 'proxypass', safe_text(pf_sys, 'proxypass')) + + # Early shellcmd + for cmd in pf_sys.findall('earlyshellcmd'): + set_text(op_sys, 'earlyshellcmd', cmd.text) + + # Bogons + pf_bogons = pf_sys.find('bogons') + if pf_bogons is not None: + op_bogons = ET.SubElement(op_sys, 'bogons') + pf_interval = pf_bogons.find('interval') + if pf_interval is not None: + copy_element_into(pf_interval, op_bogons) + + # Enable serial + set_text(op_sys, 'enableserial', safe_text(pf_sys, 'enableserial')) + + # Disable NAT reflection + set_text(op_sys, 'disablenatreflection', safe_text(pf_sys, 'disablenatreflection')) + + # Cryptographic hardware + set_text(op_sys, 'crypto_hardware', safe_text(pf_sys, 'crypto_hardware')) + + # Already run config upgrade + set_text(op_sys, 'already_run_config_upgrade', safe_text(pf_sys, 'already_run_config_upgrade')) + + # Users and groups + for group in pf_sys.findall('group'): + copy_element_into(group, op_sys) + + for user in pf_sys.findall('user'): + copy_element_into(user, op_sys) + + # nextuid/nextgid + set_text(op_sys, 'nextuid', safe_text(pf_sys, 'nextuid')) + set_text(op_sys, 'nextgid', safe_text(pf_sys, 'nextgid')) + + +def convert_interfaces(pf_root, op_root): + """Convert interface configuration.""" + pf_ifaces = pf_root.find('interfaces') + if pf_ifaces is None: + return + + op_ifaces = ET.SubElement(op_root, 'interfaces') + + for iface_tag in ['wan', 'lan', 'opt1', 'opt2', 'opt3', 'opt4', 'opt5', + 'opt6', 'opt7', 'opt8', 'opt9', 'opt10', 'opt11', + 'opt12', 'opt13', 'opt14', 'opt15']: + pf_iface = pf_ifaces.find(iface_tag) + if pf_iface is None: + continue + + op_iface = ET.SubElement(op_ifaces, iface_tag) + + # Copy interface configuration elements + for tag in ['enable', 'if', 'descr', 'spoofmac', 'ipaddr', 'subnet', + 'blockpriv', 'blockbogons', 'gateway', 'mtu', 'media', + 'mediaopt', 'adv']: + elem = pf_iface.find(tag) + if elem is not None: + copy_element_into(elem, op_iface) + + # OPNsense expects 1 for enabled interfaces, + # not the pfSense-style (empty self-closing tag) + enable_elem = op_iface.find('enable') + if enable_elem is not None and not enable_elem.text: + enable_elem.text = '1' + + # Apply interface name mapping to (e.g. tun_wg0 -> wg0) + iface_elem = op_iface.find('if') + if iface_elem is not None and iface_elem.text: + iface_elem.text = _map_iface_in_text(iface_elem.text) + + # Lock interfaces to prevent OPNsense from auto-reassigning + # them during boot (mismatch detection). Without this, OPNsense + # may overwrite WAN/LAN interface assignments on every boot. + lock_elem = op_iface.find('lock') + if lock_elem is None: + ET.SubElement(op_iface, 'lock').text = '1' + + +def convert_vlans(pf_root, op_root): + """Convert VLAN configuration.""" + pf_vlans = pf_root.find('vlans') + if pf_vlans is None: + return + + op_vlans = ET.SubElement(op_root, 'vlans') + for pf_vlan in pf_vlans.findall('vlan'): + copy_element_into(pf_vlan, op_vlans) + + +def convert_dhcpd(pf_root, op_root): + """Convert DHCP server configuration.""" + pf_dhcpd = pf_root.find('dhcpd') + if pf_dhcpd is None: + return + + op_dhcpd = ET.SubElement(op_root, 'dhcpd') + + for tag in ['lan', 'wan', 'opt1', 'opt2', 'opt3', 'opt4', 'opt5', + 'opt6', 'opt7', 'opt8', 'opt9', 'opt10', 'opt11', + 'opt12', 'opt13', 'opt14', 'opt15']: + pf_scope = pf_dhcpd.find(tag) + if pf_scope is None: + continue + + op_scope = ET.SubElement(op_dhcpd, tag) + + # Copy simple config elements + for simple in ['enable', 'range', 'defaultleasetime', 'maxleasetime', + 'netmask', 'gateway', 'domain', 'domainsearchlist', + 'dnsserver', 'ntpserver', 'tfpt', 'ldap', 'filename', + 'rootpath', 'nextserver', 'failover_peerip', + 'mac_allow', 'mac_deny', 'ddnsdomain', 'ddnsdomainprimary', + 'ddnsdomainkeyname', 'ddnsdomainkey', 'ddnsdomainkeyalgorithm', + 'ddnsclientupdates', 'ddnsdomainsecondary', + 'filename32', 'filename64', 'filename32arm', 'filename64arm', + 'uefihttpboot', 'numberoptions']: + for elem in pf_scope.findall(simple): + copy_element_into(elem, op_scope) + + # Static mappings with enhanced field mapping + for pf_map in pf_scope.findall('staticmap'): + op_map = ET.SubElement(op_scope, 'staticmap') + for tag_name in ['mac', 'cid', 'ipaddr', 'hostname', 'descr', + 'filename', 'rootpath', 'defaultleasetime', + 'maxleasetime', 'gateway', 'domain', + 'domainsearchlist', 'ddnsdomain', + 'ddnsdomainprimary', 'ddnsdomainsecondary', + 'ddnsdomainkeyname', 'ddnsdomainkeyalgorithm', + 'ddnsdomainkey', 'tftp', 'ldap', 'nextserver', + 'filename32', 'filename64', 'filename32arm', + 'filename64arm', 'uefihttpboot', 'numberoptions', + 'earlydnsregpolicy']: + elem = pf_map.find(tag_name) + if elem is not None and elem.text: + copy_element_into(elem, op_map) + + # Pool (used in some OPNsense versions) + pf_pool = pf_scope.find('pool') + if pf_pool is not None: + copy_element_into(pf_pool, op_scope) + + # Normalize to 1 + enable_elem = op_scope.find('enable') + if enable_elem is not None and not enable_elem.text: + enable_elem.text = '1' + + # DHCP backend (preserve pfSense value: kea or isc) + set_text(op_dhcpd, 'backend', safe_text(pf_root, 'dhcpbackend', 'kea')) + + +def convert_kea_dhcp(pf_root, op_root): + """Convert Kea DHCP configuration. + + pfSense stores DHCP config in ISC-format section even when + using Kea backend. OPNsense expects Kea config under + with subnets and reservations in OPNsense MVC model format. + """ + pf_dhcpd = pf_root.find('dhcpd') + if pf_dhcpd is None: + return + + # Build interface info map: scope tag -> {if, ip, subnet, descr} + scope_to_iface = {} + pf_ifaces = pf_root.find('interfaces') + if pf_ifaces is not None: + for tag in ['wan', 'lan', 'opt1', 'opt2', 'opt3', 'opt4', 'opt5', + 'opt6', 'opt7', 'opt8', 'opt9', 'opt10', 'opt11', + 'opt12', 'opt13', 'opt14', 'opt15']: + iface = pf_ifaces.find(tag) + if iface is None: + continue + if_el = iface.find('if') + ip_el = iface.find('ipaddr') + sn_el = iface.find('subnet') + descr_el = iface.find('descr') + scope_to_iface[tag] = { + 'if': if_el.text if if_el is not None and if_el.text else '', + 'ip': ip_el.text if ip_el is not None and ip_el.text else '', + 'subnet': sn_el.text if sn_el is not None and sn_el.text else '', + 'descr': descr_el.text if descr_el is not None and descr_el.text else tag.upper(), + } + + # Find or create for OPNsense MVC mount elements + op_opnsense = op_root.find('OPNsense') + if op_opnsense is None: + op_opnsense = ET.SubElement(op_root, 'OPNsense') + + # Create structure + op_kea = ET.SubElement(op_opnsense, 'Kea') + op_dhcp4 = ET.SubElement(op_kea, 'dhcp4') + + # General settings + op_general = ET.SubElement(op_dhcp4, 'general') + set_text(op_general, 'enabled', '1') + set_text(op_general, 'fwrules', '1') + set_text(op_general, 'valid_lifetime', '7200') + set_text(op_general, 'dhcp_socket_type', 'raw') + + # Collect unique interfaces for the interfaces field + dhcp_ifaces = [] + + # Subnets container + op_subnets = ET.SubElement(op_dhcp4, 'subnets') + + # Build subnets from enabled DHCP scopes + scopes with static maps + subnet_uuids = {} # scope tag -> subnet UUID + for tag in ['lan', 'wan', 'opt1', 'opt2', 'opt3', 'opt4', 'opt5', + 'opt6', 'opt7', 'opt8', 'opt9', 'opt10', 'opt11', + 'opt12', 'opt13', 'opt14', 'opt15']: + pf_scope = pf_dhcpd.find(tag) + if pf_scope is None: + continue + + enable_el = pf_scope.find('enable') + has_staticmaps = pf_scope.find('staticmap') is not None + pf_range = pf_scope.find('range') + has_range = pf_range is not None and (pf_range.find('from') is not None or pf_range.text is not None) + + # Only create subnet if scope is enabled or has static maps/ranges + if enable_el is None and not has_staticmaps and not has_range: + continue + + # Get interface info + iface_info = scope_to_iface.get(tag, {}) + ifname = iface_info.get('if', '') + ipaddr = iface_info.get('ip', '') + subnet_mask = iface_info.get('subnet', '') + + # Kea's interfaces field expects OPNsense interface description + # names (lan, opt2, opt3, ...) not device names (igc1, igc1.10). + # OPNsense maps these to device names internally. + if tag not in dhcp_ifaces: + dhcp_ifaces.append(tag) + + # Compute subnet CIDR + subnet_cidr = '' + if ipaddr and subnet_mask: + try: + ip_intf = ipaddress.IPv4Interface('%s/%s' % (ipaddr, subnet_mask)) + subnet_cidr = str(ip_intf.network) + except Exception: + pass + + # Skip WAN-type interfaces that aren't real subnets + if ipaddr == 'pppoe' or not subnet_cidr: + continue + + # Generate subnet UUID + suuid = _gen_uuid('dhcp-subnet-' + tag) + subnet_uuids[tag] = suuid + + # Create subnet4 + op_subnet = ET.SubElement(op_subnets, 'subnet4') + op_subnet.set('uuid', suuid) + set_text(op_subnet, 'subnet', subnet_cidr) + set_text(op_subnet, 'option_data_autocollect', '0') + set_text(op_subnet, 'description', iface_info.get('descr', tag.upper())) + + # Option data + op_od = ET.SubElement(op_subnet, 'option_data') + # Gateway -> routers. When no explicit gateway in dhcpd scope, + # use the interface IP (it acts as the subnet's default gateway). + gw = pf_scope.find('gateway') + if gw is not None and gw.text: + set_text(op_od, 'routers', gw.text) + elif ipaddr: + set_text(op_od, 'routers', ipaddr) + # DNS servers + dns_servers = [d.text for d in pf_scope.findall('dnsserver') if d.text] + if dns_servers: + set_text(op_od, 'domain_name_servers', ','.join(dns_servers)) + # Domain + domain = pf_scope.find('domain') + if domain is not None and domain.text: + set_text(op_od, 'domain_name', domain.text) + # Domain search + dsl = pf_scope.find('domainsearchlist') + if dsl is not None and dsl.text: + set_text(op_od, 'domain_search', dsl.text) + # NTP + ntp = pf_scope.find('ntpserver') + if ntp is not None and ntp.text: + set_text(op_od, 'ntp_servers', ntp.text) + # Next server (PXE) + ns = pf_scope.find('nextserver') + if ns is not None and ns.text: + set_text(op_subnet, 'next_server', ns.text) + + # Pools: convert pfSense XY to "X-Y" + if pf_range is not None: + from_elem = pf_range.find('from') + to_elem = pf_range.find('to') + from_ip = from_elem.text if from_elem is not None else None + to_ip = to_elem.text if to_elem is not None else None + if from_ip and to_ip: + set_text(op_subnet, 'pools', '%s-%s' % (from_ip, to_ip)) + + # Set interfaces list on general + if dhcp_ifaces: + set_text(op_general, 'interfaces', ','.join(dhcp_ifaces)) + + # Reservations container + op_reservations = ET.SubElement(op_dhcp4, 'reservations') + + # Build reservations from static maps across all scopes + seen_ips_per_subnet = {} # suuid -> set of IPs already reserved + for tag, suuid in subnet_uuids.items(): + pf_scope = pf_dhcpd.find(tag) + if pf_scope is None: + continue + + if suuid not in seen_ips_per_subnet: + seen_ips_per_subnet[suuid] = set() + + for pf_sm in pf_scope.findall('staticmap'): + mac = pf_sm.findtext('mac', '') + ipaddr = pf_sm.findtext('ipaddr', '') + hostname = pf_sm.findtext('hostname', '') + descr = pf_sm.findtext('descr', '') + + if not mac and not ipaddr: + continue + + # Deduplicate: skip same IP already reserved in this subnet + if ipaddr: + if ipaddr in seen_ips_per_subnet[suuid]: + continue + seen_ips_per_subnet[suuid].add(ipaddr) + + # Deterministic UUID from MAC or IP + seed = mac or ipaddr + ruuid = _gen_uuid('dhcp-reservation-' + seed) + + op_res = ET.SubElement(op_reservations, 'reservation') + op_res.set('uuid', ruuid) + set_text(op_res, 'subnet', suuid) + if ipaddr: + set_text(op_res, 'ip_address', ipaddr) + if mac: + set_text(op_res, 'hw_address', mac) + if hostname: + set_text(op_res, 'hostname', hostname) + if descr: + set_text(op_res, 'description', descr) + # Empty option_data for reservation + ET.SubElement(op_res, 'option_data') + + +def convert_aliases(pf_root, op_root): + """Convert firewall aliases.""" + pf_aliases = pf_root.find('aliases') + if pf_aliases is None: + return + + # Check if there's an section or individual entries + if pf_aliases.find('alias') is not None: + op_aliases = ET.SubElement(op_root, 'aliases') + for pf_alias in pf_aliases.findall('alias'): + copy_element_into(pf_alias, op_aliases) + + +def convert_nat(pf_root, op_root): + """Convert NAT rules (port forwarding + outbound NAT).""" + pf_nat = pf_root.find('nat') + if pf_nat is None: + return + + op_nat = ET.SubElement(op_root, 'nat') + + # Port forwarding rules + for pf_rule in pf_nat.findall('rule'): + op_rule = ET.SubElement(op_nat, 'rule') + + for child_tag in ['source', 'destination', 'ipprotocol', 'protocol', + 'target', 'local-port', 'interface', 'descr', + 'associated-rule-id', 'disabled', 'log', 'natreflection', + 'nordr', 'poolopts', 'source_hash_key', + 'allow', 'max', 'max-src-nodes', 'max-src-conn', + 'max-src-states', 'statetimeout', 'statetype']: + for elem in pf_rule.findall(child_tag): + copy_element_into(elem, op_rule) + + # Apply interface name mapping + iface_elem = op_rule.find('interface') + if iface_elem is not None and iface_elem.text: + iface_elem.text = _map_iface_in_text(iface_elem.text) + + # updated/created timestamps if present + for meta in ['updated', 'created']: + pf_meta = pf_rule.find(meta) + if pf_meta is not None: + copy_element_into(pf_meta, op_rule) + + # Outbound NAT + pf_outbound = pf_nat.find('outbound') + if pf_outbound is not None: + op_outbound = ET.SubElement(op_nat, 'outbound') + copy_element_into(pf_outbound.find('mode'), op_outbound) + + for pf_orule in pf_outbound.findall('rule'): + op_orule = ET.SubElement(op_outbound, 'rule') + for child_tag in ['source', 'sourceport', 'destination', 'descr', + 'target', 'interface', 'poolopts', + 'source_hash_key', 'staticnatport', 'ipprotocol', + 'protocol', 'disabled', 'target_subnet', + 'nonat', 'log', 'mirror']: + for elem in pf_orule.findall(child_tag): + copy_element_into(elem, op_orule) + + # Apply interface name mapping + iface_elem = op_orule.find('interface') + if iface_elem is not None and iface_elem.text: + iface_elem.text = _map_iface_in_text(iface_elem.text) + + for meta in ['created', 'updated']: + pf_meta = pf_orule.find(meta) + if pf_meta is not None: + copy_element_into(pf_meta, op_orule) + + # 1:1 NAT + pf_onetoone = pf_nat.find('onetoone') + if pf_onetoone is not None: + op_onetoone = ET.SubElement(op_nat, 'onetoone') + for pf_rule in pf_onetoone.findall('rule'): + copy_element_into(pf_rule, op_onetoone) + + # NAT separators + pf_sep = pf_nat.find('separator') + if pf_sep is not None: + copy_element_into(pf_sep, op_nat) + + +def convert_filter(pf_root, op_root): + """Convert firewall rules.""" + pf_filter = pf_root.find('filter') + if pf_filter is None: + return + + op_filter = ET.SubElement(op_root, 'filter') + + for pf_rule in pf_filter.findall('rule'): + op_rule = ET.SubElement(op_filter, 'rule') + + for child_tag in ['id', 'tracker', 'type', 'interface', 'ipprotocol', + 'protocol', 'icmptype', 'source', 'destination', + 'descr', 'tag', 'tagged', 'max', 'max-src-nodes', + 'max-src-conn', 'max-src-states', 'statetimeout', + 'statetype', 'os', 'direction', 'log', + 'disabled', 'gateway', 'bridgeto', 'srcmac', + 'dstmac', 'allowopts', 'pflow']: + for elem in pf_rule.findall(child_tag): + copy_element_into(elem, op_rule) + + # Apply interface name mapping to the tag + iface_elem = op_rule.find('interface') + if iface_elem is not None and iface_elem.text: + iface_elem.text = _map_iface_in_text(iface_elem.text) + + for meta in ['updated', 'created']: + pf_meta = pf_rule.find(meta) + if pf_meta is not None: + copy_element_into(pf_meta, op_rule) + + # Filter separators (if present) + pf_sep = pf_filter.find('separator') + if pf_sep is not None: + copy_element_into(pf_sep, op_filter) + + # Default deny rules configuration + for tag in ['defaultallow', 'disablestatenormalization', + 'defaultdenyimg']: + val = safe_text(pf_filter, tag) + if val: + set_text(op_filter, tag, val) + + +def convert_certificates(pf_root, op_root): + """Convert Certificate Authorities and Certificates. + + pfSense structure: + ...... + ...... + ...... + + OPNsense uses the same structure. + """ + for section_tag in ['ca', 'cert', 'crl']: + pf_section = pf_root.find(section_tag) + if pf_section is None: + continue + op_section = ET.SubElement(op_root, section_tag) + # Each child of the section element is a single entry + for pf_entry in pf_section: + copy_element_into(pf_entry, op_section) + + +def convert_openvpn(pf_root, op_root): + """Convert OpenVPN configuration.""" + pf_ovpn = pf_root.find('openvpn') + if pf_ovpn is None: + return + + op_ovpn = ET.SubElement(op_root, 'openvpn') + + # OpenVPN servers + for pf_srv in pf_ovpn.findall('openvpn-server'): + op_srv = ET.SubElement(op_ovpn, 'openvpn-server') + for child in pf_srv: + copy_element_into(child, op_srv) + + # OpenVPN clients + for pf_cli in pf_ovpn.findall('openvpn-client'): + op_cli = ET.SubElement(op_ovpn, 'openvpn-client') + for child in pf_cli: + copy_element_into(child, op_cli) + + # OpenVPN CSO (client-specific overrides) + for pf_cso in pf_ovpn.findall('openvpn-cso'): + op_cso = ET.SubElement(op_ovpn, 'openvpn-cso') + for child in pf_cso: + copy_element_into(child, op_cso) + + +def convert_ipsec(pf_root, op_root): + """Convert IPsec configuration.""" + pf_ipsec = pf_root.find('ipsec') + if pf_ipsec is None: + return + + op_ipsec = ET.SubElement(op_root, 'ipsec') + + # IPsec phase 1 + for pf_p1 in pf_ipsec.findall('phase1'): + copy_element_into(pf_p1, op_ipsec) + + # IPsec phase 2 + for pf_p2 in pf_ipsec.findall('phase2'): + copy_element_into(pf_p2, op_ipsec) + + # IPsec mobile clients + for pf_m in pf_ipsec.findall('mobile'): + copy_element_into(pf_m, op_ipsec) + + +def _map_iface_in_text(text): + """Map interface names inside a text value (replacing tun_wgX with wgX etc).""" + result = text + for old, new in IFACE_MAP.items(): + result = result.replace(old, new) + return result + + +def _find_wireguard(pf_root): + """Find the WireGuard config in pfSense XML. + + pfSense stores WireGuard config either: + - At top level: + - Inside installedpackages: + """ + pf_wg = pf_root.find('wireguard') + if pf_wg is not None and len(pf_wg): + return pf_wg + pf_wg = pf_root.find('installedpackages/wireguard') + if pf_wg is not None: + return pf_wg + return None + + +def _gen_uuid(seed_str): + """Generate a UUID that is valid as an XML element name (starts with letter).""" + while True: + u = str(uuid.uuid5(uuid.NAMESPACE_DNS, seed_str)) + if u[0].isalpha(): + return u + # Salt and retry if UUID starts with a digit (invalid XML element name) + seed_str = f'{seed_str}_retry' + + +def convert_wireguard(pf_root, op_root): + """Convert WireGuard configuration. + + pfSense stores (in ): + + tun_wg0 + yes + ... + ... + 51820 + 1420 + ... +
10.0.9.254
24
+
+ + yes + tun_wg0 + ... + ... + host51820 + 60 + ... +
10.0.9.1
32
+
+ + OPNsense stores under (nested, mount: OPNsense/wireguard): + 1 + + 1 + wg0 + 0 + ... + ... + 51820 + 10.0.9.254/24 + uuid1,uuid2 + 1420 + 0 + + + + + 1 + ... + ... + + 10.0.9.1/32 + ... + 51820 + 60 + + + """ + pf_wg = _find_wireguard(pf_root) + if pf_wg is None: + return + + # OPNsense MVC mount: OPNsense/wireguard → (nested, not a single tag!) + op_opnsense = op_root.find('OPNsense') + if op_opnsense is None: + op_opnsense = ET.SubElement(op_root, 'OPNsense') + op_wg = ET.SubElement(op_opnsense, 'wireguard') + + # General section + op_general = ET.SubElement(op_wg, 'general') + set_text(op_general, 'enabled', '1') + + # First pass: collect all peers and assign UUIDs + peer_data = [] # list of (elem, uuid) + pf_peers = pf_wg.find('peers') + if pf_peers is not None: + for pf_item in pf_peers.findall('item'): + pubkey = safe_text(pf_item, 'publickey', '') + descr = safe_text(pf_item, 'descr', '') + seed = pubkey or descr or str(len(peer_data)) + puuid = _gen_uuid(f'wg-peer-{seed}') + peer_data.append((pf_item, puuid)) + + # Servers (tunnels) section + op_server_root = ET.SubElement(op_wg, 'server') + op_servers = ET.SubElement(op_server_root, 'servers') + pf_tunnels = pf_wg.find('tunnels') + if pf_tunnels is not None: + for instance_idx, pf_item in enumerate(pf_tunnels.findall('item')): + op_server = ET.SubElement(op_servers, 'server') + + enabled = safe_text(pf_item, 'enabled', 'yes') + set_text(op_server, 'enabled', '1' if enabled == 'yes' else '0') + + raw_name = safe_text(pf_item, 'name', '') + raw_name = _map_iface_in_text(raw_name) if raw_name else '' + set_text(op_server, 'name', _sanitize_name(raw_name)) + + set_text(op_server, 'instance', str(instance_idx)) + set_text(op_server, 'pubkey', safe_text(pf_item, 'publickey')) + set_text(op_server, 'privkey', safe_text(pf_item, 'privatekey')) + set_text(op_server, 'port', safe_text(pf_item, 'listenport')) + set_text(op_server, 'mtu', safe_text(pf_item, 'mtu')) + + # Defaults + set_text(op_server, 'disableroutes', '0') + ET.SubElement(op_server, 'gateway') + + # Tunnel addresses as comma-separated CIDR + addresses = [] + pf_addresses = pf_item.find('addresses') + if pf_addresses is not None: + for pf_row in pf_addresses.findall('row'): + addr = safe_text(pf_row, 'address') + mask = safe_text(pf_row, 'mask') + if addr and mask: + addresses.append(f'{addr}/{mask}') + if addresses: + set_text(op_server, 'tunneladdress', ','.join(addresses)) + + # Peers reference (comma-separated UUIDs) + tunnel_name = safe_text(pf_item, 'name', '') + mapped_tunnel_name = _map_iface_in_text(tunnel_name) if tunnel_name else '' + peer_uuids_for_server = [] + for pf_peer_item, puuid in peer_data: + peer_tun = safe_text(pf_peer_item, 'tun', '') + if peer_tun == tunnel_name or _map_iface_in_text(peer_tun) == mapped_tunnel_name: + peer_uuids_for_server.append(puuid) + if peer_uuids_for_server: + set_text(op_server, 'peers', ','.join(peer_uuids_for_server)) + + # Clients (peers) section + # In OPNsense, each client entry uses its UUID as the XML element name, + # placed directly under (no wrapper). + op_client_root = ET.SubElement(op_wg, 'client') + op_clients = ET.SubElement(op_client_root, 'clients') + for pf_item, puuid in peer_data: + + # The UUID is the element name for the client entry itself + op_client_uuid = ET.SubElement(op_clients, puuid) + + enabled = safe_text(pf_item, 'enabled', 'yes') + set_text(op_client_uuid, 'enabled', '1' if enabled == 'yes' else '0') + + # Sanitize name to match OPNsense validation (no slashes/spaces) + raw_name = safe_text(pf_item, 'descr', '') + sane_name = _sanitize_name(raw_name) if raw_name else f'peer_{puuid[:8]}' + set_text(op_client_uuid, 'name', sane_name) + set_text(op_client_uuid, 'pubkey', safe_text(pf_item, 'publickey')) + set_text(op_client_uuid, 'psk', safe_text(pf_item, 'presharedkey')) + + # Allowed IPs (comma-separated CIDRs) + allowed_ips = [] + pf_allowed = pf_item.find('allowedips') + if pf_allowed is not None: + for pf_row in pf_allowed.findall('row'): + addr = safe_text(pf_row, 'address') + mask = safe_text(pf_row, 'mask') + if addr and mask: + allowed_ips.append(f'{addr}/{mask}') + if allowed_ips: + set_text(op_client_uuid, 'tunneladdress', ','.join(allowed_ips)) + + # Endpoint + endpoint = safe_text(pf_item, 'endpoint', '') + port = safe_text(pf_item, 'port', '') + if endpoint: + set_text(op_client_uuid, 'serveraddress', endpoint) + if port: + set_text(op_client_uuid, 'serverport', port) + + set_text(op_client_uuid, 'keepalive', safe_text(pf_item, 'persistentkeepalive')) + + +def convert_static_routes(pf_root, op_root): + """Convert static routes.""" + pf_routes = pf_root.find('staticroutes') + if pf_routes is None: + return + + op_routes = ET.SubElement(op_root, 'staticroutes') + for pf_route in pf_routes: + copy_element_into(pf_route, op_routes) + + +def convert_gateways(pf_root, op_root): + """Convert gateway configuration.""" + pf_gws = pf_root.find('gateways') + if pf_gws is None: + return + + op_gws = ET.SubElement(op_root, 'gateways') + + # Default gateway + pf_dgw = pf_gws.find('defaultgw') + if pf_dgw is not None: + copy_element_into(pf_dgw, op_gws) + + # Gateway entries + for pf_gw in pf_gws.findall('gateway_item'): + copy_element_into(pf_gw, op_gws) + + # Gateway groups + for pf_gg in pf_gws.findall('gateway_group'): + copy_element_into(pf_gg, op_gws) + + +def convert_dns(pf_root, op_root): + """Convert DNS resolver/forwarder configuration.""" + # dnsmasq + pf_dnsmasq = pf_root.find('dnsmasq') + if pf_dnsmasq is not None: + op_dnsmasq = ET.SubElement(op_root, 'dnsmasq') + for child in pf_dnsmasq: + copy_element_into(child, op_dnsmasq) + + # unbound + pf_unbound = pf_root.find('unbound') + if pf_unbound is not None: + op_unbound = ET.SubElement(op_root, 'unbound') + for child in pf_unbound: + copy_element_into(child, op_unbound) + + +def convert_snmp(pf_root, op_root): + """Convert SNMP configuration.""" + pf_snmp = pf_root.find('snmpd') + if pf_snmp is not None: + op_snmp = ET.SubElement(op_root, 'snmpd') + for child in pf_snmp: + copy_element_into(child, op_snmp) + + +def convert_schedules(pf_root, op_root): + """Convert firewall schedules.""" + pf_scheds = pf_root.find('schedules') + if pf_scheds is not None: + op_scheds = ET.SubElement(op_root, 'schedules') + for pf_sched in pf_scheds.findall('schedule'): + copy_element_into(pf_sched, op_scheds) + + +def convert_virtual_ips(pf_root, op_root): + """Convert virtual IPs.""" + pf_vips = pf_root.find('virtualip') + if pf_vips is not None: + op_vips = ET.SubElement(op_root, 'virtualip') + for pf_vip in pf_vips.findall('vip'): + copy_element_into(pf_vip, op_vips) + + +def convert_ifgroups(pf_root, op_root): + """Convert interface groups.""" + pf_groups = pf_root.find('ifgroups') + if pf_groups is not None: + op_groups = ET.SubElement(op_root, 'ifgroups') + for pf_group in pf_groups.findall('ifgroupentry'): + copy_element_into(pf_group, op_groups) + + +def convert_sysctl(pf_root, op_root): + """Convert sysctl tunables.""" + pf_sysctl = pf_root.find('sysctl') + if pf_sysctl is None: + return + + op_sysctl = ET.SubElement(op_root, 'sysctl') + for pf_item in pf_sysctl.findall('item'): + copy_element_into(pf_item, op_sysctl) + + +def convert_syslog(pf_root, op_root): + """Convert syslog configuration.""" + pf_syslog = pf_root.find('syslog') + if pf_syslog is not None: + op_syslog = ET.SubElement(op_root, 'syslog') + for child in pf_syslog: + copy_element_into(child, op_syslog) + + +def convert_rrd(pf_root, op_root): + """Convert RRD graphing configuration.""" + pf_rrd = pf_root.find('rrd') + if pf_rrd is not None: + op_rrd = ET.SubElement(op_root, 'rrd') + for child in pf_rrd: + copy_element_into(child, op_rrd) + + +def convert_dyndns(pf_root, op_root): + """Convert Dynamic DNS configuration.""" + pf_ddns = pf_root.find('dyndnses') + if pf_ddns is not None: + op_ddns = ET.SubElement(op_root, 'dyndnses') + for pf_entry in pf_ddns.findall('dyndns'): + copy_element_into(pf_entry, op_ddns) + + +def convert_captive_portal(pf_root, op_root): + """Convert Captive Portal configuration.""" + pf_cp = pf_root.find('captiveportal') + if pf_cp is not None: + op_cp = ET.SubElement(op_root, 'captiveportal') + for pf_zone in pf_cp.findall('zone'): + copy_element_into(pf_zone, op_cp) + + +def convert_load_balancer(pf_root, op_root): + """Convert HAProxy/relayd load balancer config.""" + for section in ['lbpool', 'lbvirtual', 'lbserver', 'haproxy']: + pf_section = pf_root.find(section) + if pf_section is not None: + copy_element_into(pf_section, op_root) + + +def convert_cron(pf_root, op_root): + """Convert cron jobs.""" + pf_cron = pf_root.find('cron') + if pf_cron is not None: + op_cron = ET.SubElement(op_root, 'cron') + for pf_item in pf_cron.findall('item'): + copy_element_into(pf_item, op_cron) + + +def convert_dhcrelay(pf_root, op_root): + """Convert DHCP relay config.""" + for section in ['dhcrelay', 'dhcrelay6']: + pf_section = pf_root.find(section) + if pf_section is not None: + copy_element_into(pf_section, op_root) + + +def convert_ppps(pf_root, op_root): + """Convert PPPoE/PPTP/L2TP interface configs.""" + pf_ppps = pf_root.find('ppps') + if pf_ppps is not None: + op_ppps = ET.SubElement(op_root, 'ppps') + for pf_ppp in pf_ppps.findall('ppp'): + copy_element_into(pf_ppp, op_ppps) + + +def convert_igmpproxy(pf_root, op_root): + """Convert IGMP proxy config.""" + pf_igmp = pf_root.find('igmpproxy') + if pf_igmp is not None: + op_igmp = ET.SubElement(op_root, 'igmpproxy') + for child in pf_igmp: + copy_element_into(child, op_igmp) + + +def convert_ntpd(pf_root, op_root): + """Convert NTPd configuration.""" + pf_ntpd = pf_root.find('ntpd') + if pf_ntpd is not None: + op_ntpd = ET.SubElement(op_root, 'ntpd') + for child in pf_ntpd: + copy_element_into(child, op_ntpd) + + +def convert_sshdata(pf_root, op_root): + """Convert SSH host keys.""" + pf_sshdata = pf_root.find('sshdata') + if pf_sshdata is not None: + op_sshdata = ET.SubElement(op_root, 'sshdata') + for pf_keyfile in pf_sshdata.findall('sshkeyfile'): + copy_element_into(pf_keyfile, op_sshdata) + + +def convert_bridge(pf_root, op_root): + """Convert bridge configuration.""" + pf_bridge = pf_root.find('bridge') + if pf_bridge is not None: + op_bridge = ET.SubElement(op_root, 'bridge') + for pf_bridge_entry in pf_bridge.findall('bridge'): + copy_element_into(pf_bridge_entry, op_bridge) + + +def convert(pf_config_path, output_path=None, interface_map=None): + """Main conversion function.""" + global IFACE_MAP + if interface_map: + IFACE_MAP.update(interface_map) + + tree = ET.parse(pf_config_path) + pf_root = tree.getroot() + + op_root = ET.Element('opnsense') + + # Version + ET.SubElement(op_root, 'version').text = '26.1' + + # Convert sections + convert_system(pf_root, op_root) + convert_interfaces(pf_root, op_root) + convert_vlans(pf_root, op_root) + convert_dhcpd(pf_root, op_root) + convert_kea_dhcp(pf_root, op_root) + convert_aliases(pf_root, op_root) + convert_nat(pf_root, op_root) + convert_filter(pf_root, op_root) + convert_certificates(pf_root, op_root) + convert_openvpn(pf_root, op_root) + convert_ipsec(pf_root, op_root) + convert_wireguard(pf_root, op_root) + convert_static_routes(pf_root, op_root) + convert_gateways(pf_root, op_root) + convert_dns(pf_root, op_root) + convert_snmp(pf_root, op_root) + convert_schedules(pf_root, op_root) + convert_virtual_ips(pf_root, op_root) + convert_ifgroups(pf_root, op_root) + convert_sysctl(pf_root, op_root) + convert_syslog(pf_root, op_root) + convert_rrd(pf_root, op_root) + convert_dyndns(pf_root, op_root) + convert_captive_portal(pf_root, op_root) + convert_load_balancer(pf_root, op_root) + convert_cron(pf_root, op_root) + convert_dhcrelay(pf_root, op_root) + convert_ppps(pf_root, op_root) + convert_igmpproxy(pf_root, op_root) + convert_ntpd(pf_root, op_root) + convert_sshdata(pf_root, op_root) + convert_bridge(pf_root, op_root) + + # Copy unknown/unhandled top-level sections as-is for maximum compatibility + # List all sections that our converter functions create + handled_sections = { + 'pfsense', 'system', 'interfaces', 'vlans', 'dhcpd', 'dnsmasq', + 'snmpd', 'diag', 'bridge', 'syslog', 'nat', 'filter', + 'staticroutes', 'gateways', 'aliases', 'ca', 'cert', 'certca', + 'crl', 'openvpn', 'ipsec', 'wireguard', 'unbound', 'ifgroups', + 'virtualip', 'dhcpbackend', 'qinqs', 'schedules', 'sshdata', + 'rrd', 'dyndnses', 'captiveportal', 'cron', 'dhcrelay', + 'dhcrelay6', 'ppps', 'igmpproxy', 'ntpd', 'OPNsense', + } + + for pf_child in pf_root: + tag = pf_child.tag + # Skip the pfSense root element name + if tag == 'pfsense': + continue + # Skip version - we set our own OPNsense version + if tag == 'version': + continue + # Skip sections we already handled + if tag in handled_sections: + continue + # Also skip if the element already exists in output + if op_root.find(tag) is not None: + continue + # Handle installedpackages specially: copy but remove wireguard child since we handled it + if tag == 'installedpackages': + op_copy = copy_element_into(pf_child, op_root) + if op_copy is not None: + wg_copy = op_copy.find('wireguard') + if wg_copy is not None: + op_copy.remove(wg_copy) + continue + # Copy unhandled sections for maximum compatibility + copy_element_into(pf_child, op_root) + + # Apply interface name mapping to any remaining references in the output tree + _post_process_iface_map(op_root) + + # Set default root password + _set_root_password(op_root) + + # Add OPNsense-specific defaults + _add_opnsense_defaults(op_root) + + # Pretty-print + indent(op_root) + + # Add XML declaration + result = '\n' + ET.tostring(op_root, encoding='unicode') + + if output_path: + with open(output_path, 'w', encoding='utf-8') as f: + f.write(result) + print(f"Converted config written to: {output_path}", file=sys.stderr) + else: + print(result) + + +def _set_root_password(op_root): + """Set the admin/root password to 'opnsense' as a bcrypt hash.""" + op_sys = op_root.find('system') + if op_sys is None: + return + # OPNsense expects the admin user to be named 'root' (not 'admin' as in pfSense) + for user in op_sys.findall('user'): + name = user.find('name') + uid = user.find('uid') + is_admin = (name is not None and name.text == 'admin') or (uid is not None and uid.text == '0') + if is_admin: + # Rename admin -> root for OPNsense compatibility + if name is not None and name.text == 'admin': + name.text = 'root' + # Replace password with bcrypt hash of 'opnsense' + pw = user.find('password') + if pw is not None: + pw.text = '$2b$10$ebdSKP5HpJ1eOmgnTrfmy.pKBjDIrrzSTIRPNZd8EGmm/EU6hIwAy' + # Also set bcrypt-hash field that OPNsense may use + set_text(user, 'bcrypt-hash', '$2b$10$ebdSKP5HpJ1eOmgnTrfmy.pKBjDIrrzSTIRPNZd8EGmm/EU6hIwAy') + # Remove md5-hash as OPNsense uses bcrypt + md5 = user.find('md5-hash') + if md5 is not None: + user.remove(md5) + + +def _post_process_iface_map(op_root): + """Post-process the output tree to apply interface name mapping + to any tags that may reference interface names (e.g. tun_wg0 -> wg0). + This catches any references in sections that were copied passthrough. + """ + if not IFACE_MAP: + return + # Tags that can contain interface device names or references + iface_tags = {'if', 'interface', 'tun', 'name', 'bridgeif', 'member', 'tunnel'} + for elem in op_root.iter(): + if elem.tag in iface_tags and elem.text: + elem.text = _map_iface_in_text(elem.text) + # Also check CDATA-like description fields that might reference interface names + if elem.tag == 'description' and elem.text and any( + old in elem.text for old in IFACE_MAP + ): + elem.text = _map_iface_in_text(elem.text) + + +def _add_opnsense_defaults(op_root): + """Add OPNsense-specific default elements that may be needed.""" + # OPNsense uses opnsense (text content, no child elements). + # pfSense may use pfsense. + # ControllerBase.php casts to string, so child elements would yield "" → broken paths. + op_theme = op_root.find('theme') + if op_theme is not None: + op_root.remove(op_theme) + theme = ET.SubElement(op_root, 'theme') + theme.text = 'opnsense' + + # Already set in convert() + + # Set lastchange if missing + if op_root.find('lastchange') is None: + ET.SubElement(op_root, 'lastchange') + + # Add some OPNsense-specific system defaults + op_sys = op_root.find('system') + if op_sys is not None: + # Ensure webgui defaults + if op_sys.find('webgui') is None: + wg = ET.SubElement(op_sys, 'webgui') + set_text(wg, 'protocol', 'https') + # Add OPNsense-specific hn_* defaults + for hn_opt in ['hn_altq_enable', 'hn_mac_filter']: + val = safe_text(op_sys, hn_opt) + if not val: + ET.SubElement(op_sys, hn_opt) + + +if __name__ == '__main__': + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} [output.xml] [--interface-map old=new ...]", file=sys.stderr) + print(f"", file=sys.stderr) + print(f"Examples:", file=sys.stderr) + print(f" {sys.argv[0]} config-pfsense.xml config-opnsense.xml", file=sys.stderr) + print(f" {sys.argv[0]} config-pfsense.xml --interface-map tun_wg0=wg0 tun_wg1=wg1", file=sys.stderr) + print(f"", file=sys.stderr) + print(f"If output.xml is omitted, the converted config is printed to stdout.", file=sys.stderr) + sys.exit(1) + + input_path = sys.argv[1] + output_path = None + interface_map = {} + + # Default mapping: tun_wgX -> wgX + interface_map['tun_wg0'] = 'wg0' + interface_map['tun_wg1'] = 'wg1' + interface_map['tun_wg2'] = 'wg2' + interface_map['tun_wg3'] = 'wg3' + interface_map['tun_wg4'] = 'wg4' + interface_map['tun_wg5'] = 'wg5' + + i = 2 + while i < len(sys.argv): + arg = sys.argv[i] + if arg == '--interface-map' or arg == '-m': + i += 1 + if i >= len(sys.argv): + print("Error: --interface-map requires a mapping", file=sys.stderr) + sys.exit(1) + mapping = sys.argv[i] + if '=' in mapping: + old, new = mapping.split('=', 1) + interface_map[old] = new + else: + print(f"Error: invalid mapping '{mapping}'. Use old=new format.", file=sys.stderr) + sys.exit(1) + elif output_path is None and not arg.startswith('-'): + output_path = arg + else: + print(f"Error: unknown argument '{arg}'", file=sys.stderr) + sys.exit(1) + i += 1 + + convert(input_path, output_path, interface_map=interface_map)