Managing App Connectivity with Python Across Windows, macOS, and Linux

Every operating system provides mechanisms to control network access at the application level, but the APIs, command-line tools, and permission models differ radically. This guide presents a unified Python abstraction layer that works across Windows, macOS, and Linux—enabling you to build cross-platform network policy tools without rewriting core logic for each OS.

The OS Landscape: Three Different Philosophies

OS Firewall Tool Granularity Privilege Model
Linux iptables / nftables Process UID, cgroup, path root / CAP_NET_ADMIN
macOS pfctl / SocketFilter Application bundle ID root / entitlements
Windows WFAS / netsh Executable path Administrator

The Unified Abstraction

Instead of littering your code with if platform.system() == ... blocks, define a clean interface and implement backends:

# firewall_abc.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional

class NetworkPolicy(Enum):
    ALLOW = auto()
    DENY = auto()
    LOCAL_ONLY = auto()  # RFC 1918 + loopback

@dataclass
class AppRule:
    app_name: str
    executable_path: str
    policy: NetworkPolicy
    description: Optional[str] = None

class FirewallBackend(ABC):
    """Abstract base for OS-specific firewall implementations."""
    
    @property
    @abstractmethod
    def os_name(self) -> str:
        pass
    
    @abstractmethod
    def is_available(self) -> bool:
        """Check if the backend can operate on this system."""
        pass
    
    @abstractmethod
    def apply_rule(self, rule: AppRule) -> bool:
        """Apply a network policy rule. Returns success/failure."""
        pass
    
    @abstractmethod
    def remove_rule(self, rule: AppRule) -> bool:
        """Remove a previously applied rule."""
        pass
    
    @abstractmethod
    def list_rules(self) -> list[AppRule]:
        """List all currently applied rules."""
        pass
    
    @abstractmethod
    def enable_logging(self) -> bool:
        """Enable per-rule traffic logging."""
        pass
    
    @abstractmethod
    def get_violations(self, since_seconds: int = 300) -> list[dict]:
        """Get policy violations in the given timeframe."""
        pass


class FirewallManager:
    """Cross-platform firewall orchestrator."""
    
    def __init__(self):
        self._backend = self._detect_backend()
        self._rules: dict[str, AppRule] = {}
    
    def _detect_backend(self) -> FirewallBackend:
        import platform
        
        os_name = platform.system()
        
        if os_name == "Linux":
            from .linux_backend import LinuxBackend
            backend = LinuxBackend()
            if backend.is_available():
                return backend
            
            # Fallback: try nftables
            from .nftables_backend import NftablesBackend
            backend = NftablesBackend()
            if backend.is_available():
                return backend
        
        elif os_name == "Darwin":
            from .macos_backend import MacOSBackend
            backend = MacOSBackend()
            if backend.is_available():
                return backend
        
        elif os_name == "Windows":
            from .windows_backend import WindowsBackend
            backend = WindowsBackend()
            if backend.is_available():
                return backend
        
        raise RuntimeError(f"No firewall backend available for {os_name}")
    
    @property
    def backend_name(self) -> str:
        return self._backend.os_name
    
    def apply(self, rule: AppRule) -> bool:
        if self._backend.apply_rule(rule):
            self._rules[rule.app_name] = rule
            return True
        return False
    
    def remove(self, app_name: str) -> bool:
        if app_name in self._rules:
            rule = self._rules[app_name]
            if self._backend.remove_rule(rule):
                del self._rules[app_name]
                return True
        return False
    
    def get_rules(self) -> list[AppRule]:
        return list(self._rules.values())
    
    def sync(self) -> list[AppRule]:
        """Reconcile local rule state with actual firewall state.
        Returns list of discrepancies found."""
        active = self._backend.list_rules()
        active_names = {r.app_name for r in active}
        local_names = set(self._rules.keys())
        
        discrepancies = []
        
        # Rules we think exist but don't
        for name in local_names - active_names:
            discrepancies.append(self._rules[name])
        
        # Rules that exist but we don't know about
        for rule in active:
            if rule.app_name not in local_names:
                discrepancies.append(rule)
        
        return discrepancies

Linux: iptables Implementation

The Linux backend uses the owner module to match by process name or UID. This avoids PID tracking complexity:

# linux_backend.py
import subprocess
import re
from pathlib import Path

class LinuxBackend(FirewallBackend):
    os_name = "Linux (iptables)"
    
    def __init__(self, chain: str = "NETWORK_GUARD"):
        self.chain = chain
        self._table = "filter"
        self._ensure_chain()
    
    def is_available(self) -> bool:
        try:
            result = subprocess.run(
                ["iptables", "-L", "-n"],
                capture_output=True, timeout=5
            )
            return result.returncode == 0
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False
    
    def _run(self, args: list, use_sudo: bool = True) -> tuple[bool, str]:
        cmd = ["sudo", "iptables"] if use_sudo else ["iptables"]
        cmd.extend(args)
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            return result.returncode == 0, result.stderr
        except subprocess.TimeoutExpired:
            return False, "Command timed out"
    
    def _ensure_chain(self):
        """Create custom chain and link it to OUTPUT."""
        exists, _ = self._run(["-L", self.chain, "-n"])
        if not exists:
            self._run(["-N", self.chain])
            self._run(["-I", "OUTPUT", "1", "-j", self.chain])
    
    def _clear_app_rules(self, app_name: str):
        """Remove all existing rules for this app from our chain."""
        # List rules with numbers
        success, output = self._run(["-L", self.chain, "--line-numbers", "-n"])
        if not success:
            return
        
        # Find and delete matching rules (process in reverse to maintain numbering)
        to_delete = []
        for line in output.split("\n"):
            match = re.match(r"^\s*(\d+)\s+", line)
            if match and app_name in line:
                to_delete.append(int(match.group(1)))
        
        for num in reversed(to_delete):
            self._run(["-D", self.chain, str(num)])
    
    def apply_rule(self, rule: AppRule) -> bool:
        self._clear_app_rules(Path(rule.executable_path).name)
        
        app_name = Path(rule.executable_path).name
        
        if rule.policy == NetworkPolicy.ALLOW:
            success, _ = self._run([
                "-A", self.chain,
                "-m", "owner", "--cmd-owner", app_name,
                "-j", "ACCEPT"
            ])
            return success
        
        elif rule.policy == NetworkPolicy.DENY:
            success, _ = self._run([
                "-A", self.chain,
                "-m", "owner", "--cmd-owner", app_name,
                "-j", "DROP"
            ])
            return success
        
        elif rule.policy == NetworkPolicy.LOCAL_ONLY:
            # Allow loopback
            self._run([
                "-A", self.chain,
                "-m", "owner", "--cmd-owner", app_name,
                "-d", "127.0.0.0/8",
                "-j", "ACCEPT"
            ])
            # Allow RFC 1918
            for subnet in ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]:
                self._run([
                    "-A", self.chain,
                    "-m", "owner", "--cmd-owner", app_name,
                    "-d", subnet,
                    "-j", "ACCEPT"
                ])
            # Block everything else
            success, _ = self._run([
                "-A", self.chain,
                "-m", "owner", "--cmd-owner", app_name,
                "-j", "DROP"
            ])
            return success
        
        return False
    
    def remove_rule(self, rule: AppRule) -> bool:
        self._clear_app_rules(Path(rule.executable_path).name)
        return True
    
    def list_rules(self) -> list[AppRule]:
        success, output = self._run(["-L", self.chain, "-n", "-v"])
        if not success:
            return []
        
        rules = []
        for line in output.split("\n"):
            # Parse iptables output format
            if "owner CMD match" in line:
                parts = line.split()
                # Extract app name and target
                app_match = re.search(r"owner CMD match (\S+)", line)
                target_match = re.search(r"(\w+)\s*$", line)
                
                if app_match and target_match:
                    app_name = app_match.group(1)
                    target = target_match.group(1)
                    
                    policy = NetworkPolicy.DENY if target == "DROP" else NetworkPolicy.ALLOW
                    
                    rules.append(AppRule(
                        app_name=app_name,
                        executable_path=f"/usr/bin/{app_name}",
                        policy=policy
                    ))
        
        return rules
    
    def enable_logging(self) -> bool:
        # Add LOG target before DROP rules
        success, _ = self._run([
            "-A", self.chain,
            "-m", "limit", "--limit", "10/min",
            "-j", "LOG", "--log-prefix", "NETWORK_GUARD: "
        ])
        return success
    
    def get_violations(self, since_seconds: int = 300) -> list[dict]:
        # Parse syslog/kernel log for NETWORK_GUARD entries
        try:
            result = subprocess.run(
                ["journalctl", "-k", "--since", f"{since_seconds}s ago", "-q"],
                capture_output=True, text=True, timeout=10
            )
            
            violations = []
            for line in result.stdout.split("\n"):
                if "NETWORK_GUARD:" in line:
                    # Extract details from log line
                    match = re.search(r"SRC=(\S+).*DST=(\S+).*DPT=(\d+)", line)
                    if match:
                        violations.append({
                            "timestamp": line[:15],  # Simplified
                            "source": match.group(1),
                            "destination": match.group(2),
                            "port": match.group(3),
                            "raw": line
                        })
            
            return violations
        except Exception:
            return []

macOS: Application Firewall

macOS provides two paths: the Application Firewall (binary allow/block) or the packet filter (pfctl) for fine-grained control. For application-level granularity, we use a hybrid approach:

# macos_backend.py
import subprocess
import plistlib
from pathlib import Path

class MacOSBackend(FirewallBackend):
    os_name = "macOS"
    
    def __init__(self):
        self._socket_filter_available = self._check_socket_filter()
    
    def _check_socket_filter(self) -> bool:
        """Check if NEFilterSocketProvider is available (macOS 10.15+)."""
        try:
            result = subprocess.run(
                ["system_profiler", "SPApplicationsDataType"],
                capture_output=True, timeout=5
            )
            return result.returncode == 0
        except:
            return False
    
    def is_available(self) -> bool:
        # Check for pfctl or socket filter capability
        try:
            subprocess.run(["pfctl", "-s", "all"], capture_output=True, timeout=5)
            return True
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False
    
    def apply_rule(self, rule: AppRule) -> bool:
        """Use Application Firewall for binary-level control."""
        app_path = rule.executable_path
        app_name = Path(app_path).name
        
        if rule.policy == NetworkPolicy.DENY:
            # Add to Application Firewall block list
            result = subprocess.run([
                "/usr/libexec/ApplicationFirewall/socketfilterfw",
                "--block", app_path
            ], capture_output=True, text=True, timeout=10)
            return result.returncode == 0
        
        elif rule.policy == NetworkPolicy.ALLOW:
            result = subprocess.run([
                "/usr/libexec/ApplicationFirewall/socketfilterfw",
                "--unblockapp", app_path
            ], capture_output=True, text=True, timeout=10)
            return result.returncode == 0
        
        elif rule.policy == NetworkPolicy.LOCAL_ONLY:
            # For LOCAL_ONLY, we need pfctl with anchors
            return self._apply_local_only_pf(rule)
        
        return False
    
    def _apply_local_only_pf(self, rule: AppRule) -> bool:
        """Use pfctl anchor for local-only policy."""
        anchor_name = f"network_guard_{rule.app_name}"
        
        # Build pf rules
        pf_rules = f"""
        # Allow loopback
        pass out quick on lo0 from any to 127.0.0.0/8
        pass out quick on lo0 from any to ::1/128
        
        # Allow RFC 1918
        pass out quick from any to 10.0.0.0/8
        pass out quick from any to 172.16.0.0/12
        pass out quick from any to 192.168.0.0/16
        
        # Block everything else for this app's UID
        block drop out from any to any \
            user = {self._get_uid(rule.executable_path)}
        """
        
        # Write to anchor file
        anchor_path = Path(f"/etc/pf.anchors/{anchor_name}")
        anchor_path.write_text(pf_rules)
        
        # Load anchor
        result = subprocess.run(
            ["sudo", "pfctl", "-a", anchor_name, "-f", str(anchor_path)],
            capture_output=True, text=True, timeout=10
        )
        
        # Enable pf if not already
        subprocess.run(["sudo", "pfctl", "-e"], capture_output=True)
        
        return result.returncode == 0
    
    def _get_uid(self, path: str) -> int:
        """Get UID that runs this application."""
        import os
        stat = os.stat(path)
        return stat.st_uid
    
    def remove_rule(self, rule: AppRule) -> bool:
        app_path = rule.executable_path
        
        # Remove from Application Firewall
        subprocess.run([
            "/usr/libexec/ApplicationFirewall/socketfilterfw",
            "--unblockapp", app_path
        ], capture_output=True)
        
        # Remove pf anchor if exists
        anchor_name = f"network_guard_{rule.app_name}"
        anchor_path = Path(f"/etc/pf.anchors/{anchor_name}")
        if anchor_path.exists():
            subprocess.run(
                ["sudo", "pfctl", "-a", anchor_name, "-F", "all"],
                capture_output=True
            )
            anchor_path.unlink()
        
        return True
    
    def list_rules(self) -> list[AppRule]:
        # Get Application Firewall block list
        result = subprocess.run(
            ["/usr/libexec/ApplicationFirewall/socketfilterfw", "--listapps"],
            capture_output=True, text=True, timeout=10
        )
        
        rules = []
        for line in result.stdout.split("\n"):
            # Parse application firewall output
            if "ALLOW" in line or "BLOCK" in line:
                parts = line.split()
                if len(parts) >= 2:
                    app_path = parts[0]
                    policy = NetworkPolicy.DENY if "BLOCK" in line else NetworkPolicy.ALLOW
                    rules.append(AppRule(
                        app_name=Path(app_path).name,
                        executable_path=app_path,
                        policy=policy
                    ))
        
        return rules
    
    def enable_logging(self) -> bool:
        # Enable pf logging
        result = subprocess.run(
            ["sudo", "pfctl", "-l", "pflog0"],
            capture_output=True, timeout=10
        )
        return result.returncode == 0
    
    def get_violations(self, since_seconds: int = 300) -> list[dict]:
        # Read pflog0 interface or system log
        try:
            result = subprocess.run(
                ["tcpdump", "-ni", "pflog0", "-c", "100"],
                capture_output=True, text=True, timeout=30
            )
            
            violations = []
            for line in result.stdout.split("\n"):
                if "block" in line:
                    violations.append({"raw": line, "timestamp": "recent"})
            
            return violations
        except:
            return []

Windows: Windows Firewall with Advanced Security

Windows provides the richest programmatic API through COM, but we can achieve most goals through netsh:

# windows_backend.py
import subprocess
import re
from pathlib import Path

class WindowsBackend(FirewallBackend):
    os_name = "Windows"
    
    def __init__(self):
        self._profile = "all"  # domain, private, public, all
    
    def is_available(self) -> bool:
        try:
            result = subprocess.run(
                ["netsh", "advfirewall", "show", "currentprofile"],
                capture_output=True, timeout=10
            )
            return result.returncode == 0
        except:
            return False
    
    def _run_netsh(self, args: list) -> tuple[bool, str]:
        cmd = ["netsh", "advfirewall", "firewall"] + args
        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=15
            )
            return result.returncode == 0, result.stdout + result.stderr
        except subprocess.TimeoutExpired:
            return False, "Command timed out"
    
    def apply_rule(self, rule: AppRule) -> bool:
        app_path = rule.executable_path
        app_name = rule.app_name
        rule_name = f"NetworkGuard_{app_name}"
        
        # Remove existing rules for this app
        self.remove_rule(rule)
        
        if rule.policy == NetworkPolicy.DENY:
            result = self._run_netsh([
                "add", "rule",
                f"name={rule_name}",
                "dir=out",
                "action=block",
                f"program={app_path}",
                "enable=yes",
                f"profile={self._profile}"
            ])
            return result[0]
        
        elif rule.policy == NetworkPolicy.ALLOW:
            result = self._run_netsh([
                "add", "rule",
                f"name={rule_name}",
                "dir=out",
                "action=allow",
                f"program={app_path}",
                "enable=yes",
                f"profile={self._profile}"
            ])
            return result[0]
        
        elif rule.policy == NetworkPolicy.LOCAL_ONLY:
            # Windows firewall doesn't directly support "local only"
            # We implement as: block all + allow local subnets
            
            # First: block all outbound for this app
            self._run_netsh([
                "add", "rule",
                f"name={rule_name}_block",
                "dir=out",
                "action=block",
                f"program={app_path}",
                "enable=yes"
            ])
            
            # Then: allow local subnets (higher priority due to ordering)
            for subnet in ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]:
                self._run_netsh([
                    "add", "rule",
                    f"name={rule_name}_allow_{subnet.replace('/', '_')}",
                    "dir=out",
                    "action=allow",
                    f"program={app_path}",
                    f"remoteip={subnet}",
                    "enable=yes"
                ])
            
            return True
        
        return False
    
    def remove_rule(self, rule: AppRule) -> bool:
        app_name = rule.app_name
        
        # Remove all rules starting with NetworkGuard_{app_name}
        success1, _ = self._run_netsh(["delete", "rule", f"name=NetworkGuard_{app_name}"])
        success2, _ = self._run_netsh(["delete", "rule", f"name=NetworkGuard_{app_name}_block"])
        
        # Also remove subnet-specific rules
        for subnet in ["10.0.0.0_8", "172.16.0.0_12", "192.168.0.0_16"]:
            self._run_netsh(["delete", "rule", f"name=NetworkGuard_{app_name}_allow_{subnet}"])
        
        return True
    
    def list_rules(self) -> list[AppRule]:
        success, output = self._run_netsh(["show", "rule", "name=all"])
        if not success:
            return []
        
        rules = []
        current_rule = {}
        
        for line in output.split("\n"):
            if line.startswith("Rule Name:"):
                if current_rule.get("name", "").startswith("NetworkGuard_"):
                    app_name = current_rule["name"].replace("NetworkGuard_", "")
                    policy = NetworkPolicy.DENY if current_rule.get("action") == "Block" else NetworkPolicy.ALLOW
                    
                    rules.append(AppRule(
                        app_name=app_name,
                        executable_path=current_rule.get("program", ""),
                        policy=policy
                    ))
                
                current_rule = {"name": line.split(":", 1)[1].strip()}
            elif ":" in line:
                key, value = line.split(":", 1)
                current_rule[key.strip().lower()] = value.strip()
        
        return rules
    
    def enable_logging(self) -> bool:
        result = self._run_netsh([
            "set", "allprofiles", "logging",
            "allowedconnections=enable",
            "droppedconnections=enable"
        ])
        return result[0]
    
    def get_violations(self, since_seconds: int = 300) -> list[dict]:
        # Windows Firewall logs to %SystemRoot%\System32\LogFiles\Firewall\pfirewall.log
        log_path = Path("C:/Windows/System32/LogFiles/Firewall/pfirewall.log")
        
        if not log_path.exists():
            return []
        
        # Parse firewall log (CSV-like format)
        violations = []
        with open(log_path, "r") as f:
            for line in f:
                if "DROP" in line and "TCP" in line:
                    parts = line.strip().split(",")
                    if len(parts) >= 15:
                        violations.append({
                            "timestamp": parts[0],
                            "action": parts[3],
                            "protocol": parts[4],
                            "source_ip": parts[5],
                            "dest_ip": parts[6],
                            "source_port": parts[7],
                            "dest_port": parts[8],
                            "raw": line.strip()
                        })
        
        return violations[-100:]  # Return last 100

Testing Across Platforms

A unified test suite ensures consistent behavior regardless of OS:

# test_firewall.py
import pytest
from firewall_abc import AppRule, NetworkPolicy, FirewallManager

class TestCrossPlatformFirewall:
    @pytest.fixture
    def manager(self):
        return FirewallManager()
    
    def test_backend_detection(self, manager):
        assert manager.backend_name in [
            "Linux (iptables)", "Linux (nftables)",
            "macOS", "Windows"
        ]
    
    def test_apply_and_list(self, manager):
        rule = AppRule(
            app_name="test_app",
            executable_path="/usr/bin/test_app",
            policy=NetworkPolicy.DENY
        )
        
        assert manager.apply(rule)
        
        rules = manager.get_rules()
        assert any(r.app_name == "test_app" for r in rules)
        
        # Cleanup
        assert manager.remove("test_app")
    
    def test_local_only_policy(self, manager):
        rule = AppRule(
            app_name="local_app",
            executable_path="/usr/bin/local_app",
            policy=NetworkPolicy.LOCAL_ONLY
        )
        
        assert manager.apply(rule)
        
        # Verify rule exists
        rules = manager.get_rules()
        found = next((r for r in rules if r.app_name == "local_app"), None)
        assert found is not None
        assert found.policy == NetworkPolicy.LOCAL_ONLY
        
        manager.remove("local_app")
    
    def test_violation_detection(self, manager):
        # This test requires a real blocked app to attempt connection
        # In CI, we use a mock or skip
        if not manager._backend.is_available():
            pytest.skip("Backend not available")
        
        manager._backend.enable_logging()
        
        # After blocking an app and triggering connection attempt:
        violations = manager._backend.get_violations(since_seconds=60)
        assert isinstance(violations, list)

The Bottom Line

Cross-platform network control is achievable with a clean abstraction. The key insights: use iptables on Linux (widest deployment), Application Firewall + pf anchors on macOS, and netsh on Windows. Each has quirks—Linux requires CAP_NET_ADMIN, macOS needs root for pf, Windows needs Administrator—but the Python abstraction layer shields your application logic from these details.

For production use, add rule persistence (save/restore across reboots), audit logging, and tamper detection. But the core pattern—abstract interface, concrete backends—scales from prototype to production without architectural changes.