Every operating system provides mechanisms to control network access at the application level, but the APIs, command-line tools, and permission models differ radically. This guide presents a unified Python abstraction layer that works across Windows, macOS, and Linux—enabling you to build cross-platform network policy tools without rewriting core logic for each OS.
The OS Landscape: Three Different Philosophies
| OS | Firewall Tool | Granularity | Privilege Model |
|---|---|---|---|
| Linux | iptables / nftables | Process UID, cgroup, path | root / CAP_NET_ADMIN |
| macOS | pfctl / SocketFilter | Application bundle ID | root / entitlements |
| Windows | WFAS / netsh | Executable path | Administrator |
The Unified Abstraction
Instead of littering your code with if platform.system() == ... blocks, define a clean interface and implement backends:
# firewall_abc.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional
class NetworkPolicy(Enum):
ALLOW = auto()
DENY = auto()
LOCAL_ONLY = auto() # RFC 1918 + loopback
@dataclass
class AppRule:
app_name: str
executable_path: str
policy: NetworkPolicy
description: Optional[str] = None
class FirewallBackend(ABC):
"""Abstract base for OS-specific firewall implementations."""
@property
@abstractmethod
def os_name(self) -> str:
pass
@abstractmethod
def is_available(self) -> bool:
"""Check if the backend can operate on this system."""
pass
@abstractmethod
def apply_rule(self, rule: AppRule) -> bool:
"""Apply a network policy rule. Returns success/failure."""
pass
@abstractmethod
def remove_rule(self, rule: AppRule) -> bool:
"""Remove a previously applied rule."""
pass
@abstractmethod
def list_rules(self) -> list[AppRule]:
"""List all currently applied rules."""
pass
@abstractmethod
def enable_logging(self) -> bool:
"""Enable per-rule traffic logging."""
pass
@abstractmethod
def get_violations(self, since_seconds: int = 300) -> list[dict]:
"""Get policy violations in the given timeframe."""
pass
class FirewallManager:
"""Cross-platform firewall orchestrator."""
def __init__(self):
self._backend = self._detect_backend()
self._rules: dict[str, AppRule] = {}
def _detect_backend(self) -> FirewallBackend:
import platform
os_name = platform.system()
if os_name == "Linux":
from .linux_backend import LinuxBackend
backend = LinuxBackend()
if backend.is_available():
return backend
# Fallback: try nftables
from .nftables_backend import NftablesBackend
backend = NftablesBackend()
if backend.is_available():
return backend
elif os_name == "Darwin":
from .macos_backend import MacOSBackend
backend = MacOSBackend()
if backend.is_available():
return backend
elif os_name == "Windows":
from .windows_backend import WindowsBackend
backend = WindowsBackend()
if backend.is_available():
return backend
raise RuntimeError(f"No firewall backend available for {os_name}")
@property
def backend_name(self) -> str:
return self._backend.os_name
def apply(self, rule: AppRule) -> bool:
if self._backend.apply_rule(rule):
self._rules[rule.app_name] = rule
return True
return False
def remove(self, app_name: str) -> bool:
if app_name in self._rules:
rule = self._rules[app_name]
if self._backend.remove_rule(rule):
del self._rules[app_name]
return True
return False
def get_rules(self) -> list[AppRule]:
return list(self._rules.values())
def sync(self) -> list[AppRule]:
"""Reconcile local rule state with actual firewall state.
Returns list of discrepancies found."""
active = self._backend.list_rules()
active_names = {r.app_name for r in active}
local_names = set(self._rules.keys())
discrepancies = []
# Rules we think exist but don't
for name in local_names - active_names:
discrepancies.append(self._rules[name])
# Rules that exist but we don't know about
for rule in active:
if rule.app_name not in local_names:
discrepancies.append(rule)
return discrepancies
Linux: iptables Implementation
The Linux backend uses the owner module to match by process name or UID. This avoids PID tracking complexity:
# linux_backend.py
import subprocess
import re
from pathlib import Path
class LinuxBackend(FirewallBackend):
os_name = "Linux (iptables)"
def __init__(self, chain: str = "NETWORK_GUARD"):
self.chain = chain
self._table = "filter"
self._ensure_chain()
def is_available(self) -> bool:
try:
result = subprocess.run(
["iptables", "-L", "-n"],
capture_output=True, timeout=5
)
return result.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
def _run(self, args: list, use_sudo: bool = True) -> tuple[bool, str]:
cmd = ["sudo", "iptables"] if use_sudo else ["iptables"]
cmd.extend(args)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
return result.returncode == 0, result.stderr
except subprocess.TimeoutExpired:
return False, "Command timed out"
def _ensure_chain(self):
"""Create custom chain and link it to OUTPUT."""
exists, _ = self._run(["-L", self.chain, "-n"])
if not exists:
self._run(["-N", self.chain])
self._run(["-I", "OUTPUT", "1", "-j", self.chain])
def _clear_app_rules(self, app_name: str):
"""Remove all existing rules for this app from our chain."""
# List rules with numbers
success, output = self._run(["-L", self.chain, "--line-numbers", "-n"])
if not success:
return
# Find and delete matching rules (process in reverse to maintain numbering)
to_delete = []
for line in output.split("\n"):
match = re.match(r"^\s*(\d+)\s+", line)
if match and app_name in line:
to_delete.append(int(match.group(1)))
for num in reversed(to_delete):
self._run(["-D", self.chain, str(num)])
def apply_rule(self, rule: AppRule) -> bool:
self._clear_app_rules(Path(rule.executable_path).name)
app_name = Path(rule.executable_path).name
if rule.policy == NetworkPolicy.ALLOW:
success, _ = self._run([
"-A", self.chain,
"-m", "owner", "--cmd-owner", app_name,
"-j", "ACCEPT"
])
return success
elif rule.policy == NetworkPolicy.DENY:
success, _ = self._run([
"-A", self.chain,
"-m", "owner", "--cmd-owner", app_name,
"-j", "DROP"
])
return success
elif rule.policy == NetworkPolicy.LOCAL_ONLY:
# Allow loopback
self._run([
"-A", self.chain,
"-m", "owner", "--cmd-owner", app_name,
"-d", "127.0.0.0/8",
"-j", "ACCEPT"
])
# Allow RFC 1918
for subnet in ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]:
self._run([
"-A", self.chain,
"-m", "owner", "--cmd-owner", app_name,
"-d", subnet,
"-j", "ACCEPT"
])
# Block everything else
success, _ = self._run([
"-A", self.chain,
"-m", "owner", "--cmd-owner", app_name,
"-j", "DROP"
])
return success
return False
def remove_rule(self, rule: AppRule) -> bool:
self._clear_app_rules(Path(rule.executable_path).name)
return True
def list_rules(self) -> list[AppRule]:
success, output = self._run(["-L", self.chain, "-n", "-v"])
if not success:
return []
rules = []
for line in output.split("\n"):
# Parse iptables output format
if "owner CMD match" in line:
parts = line.split()
# Extract app name and target
app_match = re.search(r"owner CMD match (\S+)", line)
target_match = re.search(r"(\w+)\s*$", line)
if app_match and target_match:
app_name = app_match.group(1)
target = target_match.group(1)
policy = NetworkPolicy.DENY if target == "DROP" else NetworkPolicy.ALLOW
rules.append(AppRule(
app_name=app_name,
executable_path=f"/usr/bin/{app_name}",
policy=policy
))
return rules
def enable_logging(self) -> bool:
# Add LOG target before DROP rules
success, _ = self._run([
"-A", self.chain,
"-m", "limit", "--limit", "10/min",
"-j", "LOG", "--log-prefix", "NETWORK_GUARD: "
])
return success
def get_violations(self, since_seconds: int = 300) -> list[dict]:
# Parse syslog/kernel log for NETWORK_GUARD entries
try:
result = subprocess.run(
["journalctl", "-k", "--since", f"{since_seconds}s ago", "-q"],
capture_output=True, text=True, timeout=10
)
violations = []
for line in result.stdout.split("\n"):
if "NETWORK_GUARD:" in line:
# Extract details from log line
match = re.search(r"SRC=(\S+).*DST=(\S+).*DPT=(\d+)", line)
if match:
violations.append({
"timestamp": line[:15], # Simplified
"source": match.group(1),
"destination": match.group(2),
"port": match.group(3),
"raw": line
})
return violations
except Exception:
return []
macOS: Application Firewall
macOS provides two paths: the Application Firewall (binary allow/block) or the packet filter (pfctl) for fine-grained control. For application-level granularity, we use a hybrid approach:
# macos_backend.py
import subprocess
import plistlib
from pathlib import Path
class MacOSBackend(FirewallBackend):
os_name = "macOS"
def __init__(self):
self._socket_filter_available = self._check_socket_filter()
def _check_socket_filter(self) -> bool:
"""Check if NEFilterSocketProvider is available (macOS 10.15+)."""
try:
result = subprocess.run(
["system_profiler", "SPApplicationsDataType"],
capture_output=True, timeout=5
)
return result.returncode == 0
except:
return False
def is_available(self) -> bool:
# Check for pfctl or socket filter capability
try:
subprocess.run(["pfctl", "-s", "all"], capture_output=True, timeout=5)
return True
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
def apply_rule(self, rule: AppRule) -> bool:
"""Use Application Firewall for binary-level control."""
app_path = rule.executable_path
app_name = Path(app_path).name
if rule.policy == NetworkPolicy.DENY:
# Add to Application Firewall block list
result = subprocess.run([
"/usr/libexec/ApplicationFirewall/socketfilterfw",
"--block", app_path
], capture_output=True, text=True, timeout=10)
return result.returncode == 0
elif rule.policy == NetworkPolicy.ALLOW:
result = subprocess.run([
"/usr/libexec/ApplicationFirewall/socketfilterfw",
"--unblockapp", app_path
], capture_output=True, text=True, timeout=10)
return result.returncode == 0
elif rule.policy == NetworkPolicy.LOCAL_ONLY:
# For LOCAL_ONLY, we need pfctl with anchors
return self._apply_local_only_pf(rule)
return False
def _apply_local_only_pf(self, rule: AppRule) -> bool:
"""Use pfctl anchor for local-only policy."""
anchor_name = f"network_guard_{rule.app_name}"
# Build pf rules
pf_rules = f"""
# Allow loopback
pass out quick on lo0 from any to 127.0.0.0/8
pass out quick on lo0 from any to ::1/128
# Allow RFC 1918
pass out quick from any to 10.0.0.0/8
pass out quick from any to 172.16.0.0/12
pass out quick from any to 192.168.0.0/16
# Block everything else for this app's UID
block drop out from any to any \
user = {self._get_uid(rule.executable_path)}
"""
# Write to anchor file
anchor_path = Path(f"/etc/pf.anchors/{anchor_name}")
anchor_path.write_text(pf_rules)
# Load anchor
result = subprocess.run(
["sudo", "pfctl", "-a", anchor_name, "-f", str(anchor_path)],
capture_output=True, text=True, timeout=10
)
# Enable pf if not already
subprocess.run(["sudo", "pfctl", "-e"], capture_output=True)
return result.returncode == 0
def _get_uid(self, path: str) -> int:
"""Get UID that runs this application."""
import os
stat = os.stat(path)
return stat.st_uid
def remove_rule(self, rule: AppRule) -> bool:
app_path = rule.executable_path
# Remove from Application Firewall
subprocess.run([
"/usr/libexec/ApplicationFirewall/socketfilterfw",
"--unblockapp", app_path
], capture_output=True)
# Remove pf anchor if exists
anchor_name = f"network_guard_{rule.app_name}"
anchor_path = Path(f"/etc/pf.anchors/{anchor_name}")
if anchor_path.exists():
subprocess.run(
["sudo", "pfctl", "-a", anchor_name, "-F", "all"],
capture_output=True
)
anchor_path.unlink()
return True
def list_rules(self) -> list[AppRule]:
# Get Application Firewall block list
result = subprocess.run(
["/usr/libexec/ApplicationFirewall/socketfilterfw", "--listapps"],
capture_output=True, text=True, timeout=10
)
rules = []
for line in result.stdout.split("\n"):
# Parse application firewall output
if "ALLOW" in line or "BLOCK" in line:
parts = line.split()
if len(parts) >= 2:
app_path = parts[0]
policy = NetworkPolicy.DENY if "BLOCK" in line else NetworkPolicy.ALLOW
rules.append(AppRule(
app_name=Path(app_path).name,
executable_path=app_path,
policy=policy
))
return rules
def enable_logging(self) -> bool:
# Enable pf logging
result = subprocess.run(
["sudo", "pfctl", "-l", "pflog0"],
capture_output=True, timeout=10
)
return result.returncode == 0
def get_violations(self, since_seconds: int = 300) -> list[dict]:
# Read pflog0 interface or system log
try:
result = subprocess.run(
["tcpdump", "-ni", "pflog0", "-c", "100"],
capture_output=True, text=True, timeout=30
)
violations = []
for line in result.stdout.split("\n"):
if "block" in line:
violations.append({"raw": line, "timestamp": "recent"})
return violations
except:
return []
Windows: Windows Firewall with Advanced Security
Windows provides the richest programmatic API through COM, but we can achieve most goals through netsh:
# windows_backend.py
import subprocess
import re
from pathlib import Path
class WindowsBackend(FirewallBackend):
os_name = "Windows"
def __init__(self):
self._profile = "all" # domain, private, public, all
def is_available(self) -> bool:
try:
result = subprocess.run(
["netsh", "advfirewall", "show", "currentprofile"],
capture_output=True, timeout=10
)
return result.returncode == 0
except:
return False
def _run_netsh(self, args: list) -> tuple[bool, str]:
cmd = ["netsh", "advfirewall", "firewall"] + args
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=15
)
return result.returncode == 0, result.stdout + result.stderr
except subprocess.TimeoutExpired:
return False, "Command timed out"
def apply_rule(self, rule: AppRule) -> bool:
app_path = rule.executable_path
app_name = rule.app_name
rule_name = f"NetworkGuard_{app_name}"
# Remove existing rules for this app
self.remove_rule(rule)
if rule.policy == NetworkPolicy.DENY:
result = self._run_netsh([
"add", "rule",
f"name={rule_name}",
"dir=out",
"action=block",
f"program={app_path}",
"enable=yes",
f"profile={self._profile}"
])
return result[0]
elif rule.policy == NetworkPolicy.ALLOW:
result = self._run_netsh([
"add", "rule",
f"name={rule_name}",
"dir=out",
"action=allow",
f"program={app_path}",
"enable=yes",
f"profile={self._profile}"
])
return result[0]
elif rule.policy == NetworkPolicy.LOCAL_ONLY:
# Windows firewall doesn't directly support "local only"
# We implement as: block all + allow local subnets
# First: block all outbound for this app
self._run_netsh([
"add", "rule",
f"name={rule_name}_block",
"dir=out",
"action=block",
f"program={app_path}",
"enable=yes"
])
# Then: allow local subnets (higher priority due to ordering)
for subnet in ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]:
self._run_netsh([
"add", "rule",
f"name={rule_name}_allow_{subnet.replace('/', '_')}",
"dir=out",
"action=allow",
f"program={app_path}",
f"remoteip={subnet}",
"enable=yes"
])
return True
return False
def remove_rule(self, rule: AppRule) -> bool:
app_name = rule.app_name
# Remove all rules starting with NetworkGuard_{app_name}
success1, _ = self._run_netsh(["delete", "rule", f"name=NetworkGuard_{app_name}"])
success2, _ = self._run_netsh(["delete", "rule", f"name=NetworkGuard_{app_name}_block"])
# Also remove subnet-specific rules
for subnet in ["10.0.0.0_8", "172.16.0.0_12", "192.168.0.0_16"]:
self._run_netsh(["delete", "rule", f"name=NetworkGuard_{app_name}_allow_{subnet}"])
return True
def list_rules(self) -> list[AppRule]:
success, output = self._run_netsh(["show", "rule", "name=all"])
if not success:
return []
rules = []
current_rule = {}
for line in output.split("\n"):
if line.startswith("Rule Name:"):
if current_rule.get("name", "").startswith("NetworkGuard_"):
app_name = current_rule["name"].replace("NetworkGuard_", "")
policy = NetworkPolicy.DENY if current_rule.get("action") == "Block" else NetworkPolicy.ALLOW
rules.append(AppRule(
app_name=app_name,
executable_path=current_rule.get("program", ""),
policy=policy
))
current_rule = {"name": line.split(":", 1)[1].strip()}
elif ":" in line:
key, value = line.split(":", 1)
current_rule[key.strip().lower()] = value.strip()
return rules
def enable_logging(self) -> bool:
result = self._run_netsh([
"set", "allprofiles", "logging",
"allowedconnections=enable",
"droppedconnections=enable"
])
return result[0]
def get_violations(self, since_seconds: int = 300) -> list[dict]:
# Windows Firewall logs to %SystemRoot%\System32\LogFiles\Firewall\pfirewall.log
log_path = Path("C:/Windows/System32/LogFiles/Firewall/pfirewall.log")
if not log_path.exists():
return []
# Parse firewall log (CSV-like format)
violations = []
with open(log_path, "r") as f:
for line in f:
if "DROP" in line and "TCP" in line:
parts = line.strip().split(",")
if len(parts) >= 15:
violations.append({
"timestamp": parts[0],
"action": parts[3],
"protocol": parts[4],
"source_ip": parts[5],
"dest_ip": parts[6],
"source_port": parts[7],
"dest_port": parts[8],
"raw": line.strip()
})
return violations[-100:] # Return last 100
Testing Across Platforms
A unified test suite ensures consistent behavior regardless of OS:
# test_firewall.py
import pytest
from firewall_abc import AppRule, NetworkPolicy, FirewallManager
class TestCrossPlatformFirewall:
@pytest.fixture
def manager(self):
return FirewallManager()
def test_backend_detection(self, manager):
assert manager.backend_name in [
"Linux (iptables)", "Linux (nftables)",
"macOS", "Windows"
]
def test_apply_and_list(self, manager):
rule = AppRule(
app_name="test_app",
executable_path="/usr/bin/test_app",
policy=NetworkPolicy.DENY
)
assert manager.apply(rule)
rules = manager.get_rules()
assert any(r.app_name == "test_app" for r in rules)
# Cleanup
assert manager.remove("test_app")
def test_local_only_policy(self, manager):
rule = AppRule(
app_name="local_app",
executable_path="/usr/bin/local_app",
policy=NetworkPolicy.LOCAL_ONLY
)
assert manager.apply(rule)
# Verify rule exists
rules = manager.get_rules()
found = next((r for r in rules if r.app_name == "local_app"), None)
assert found is not None
assert found.policy == NetworkPolicy.LOCAL_ONLY
manager.remove("local_app")
def test_violation_detection(self, manager):
# This test requires a real blocked app to attempt connection
# In CI, we use a mock or skip
if not manager._backend.is_available():
pytest.skip("Backend not available")
manager._backend.enable_logging()
# After blocking an app and triggering connection attempt:
violations = manager._backend.get_violations(since_seconds=60)
assert isinstance(violations, list)
The Bottom Line
Cross-platform network control is achievable with a clean abstraction. The key insights: use iptables on Linux (widest deployment), Application Firewall + pf anchors on macOS, and netsh on Windows. Each has quirks—Linux requires CAP_NET_ADMIN, macOS needs root for pf, Windows needs Administrator—but the Python abstraction layer shields your application logic from these details.
For production use, add rule persistence (save/restore across reboots), audit logging, and tamper detection. But the core pattern—abstract interface, concrete backends—scales from prototype to production without architectural changes.