Handler Protocol System#
Note
✨ NEW in v0.4.1: Extensible plugin architecture for custom audit handlers
Write custom audit handlers without modifying HFortix code. Integrate with any system using a simple protocol.
Overview#
The Handler Protocol System enables you to extend HFortix’s audit logging capabilities by writing custom handlers that integrate with any external system. No inheritance required - any class implementing log_operation(operation: dict) works as an audit handler.
Key Features:
✅ No Inheritance Required: Protocol-based (duck typing)
✅ Priority Ordering: Execute handlers in priority order
✅ Conditional Routing: Filter which operations go where
✅ Error Aggregation: Track handler reliability
✅ Dynamic Management: Add/remove handlers at runtime
✅ Production Ready: Kafka, Database, Webhook examples included
Why Protocol-Based?#
- Zero Coupling: Handlers don’t depend on HFortix internals
- Type Safety: @runtime_checkable Protocol provides IDE autocomplete
- Testability: Mock handlers without inheritance hierarchies
- Flexibility: Use any class that implements log_operation()
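The duck-typing idea can be illustrated with Python's `typing.Protocol`. This is a minimal sketch: `AuditHandlerLike` is a hypothetical name chosen for illustration and is not claimed to be the protocol class HFortix actually ships.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AuditHandlerLike(Protocol):
    """Hypothetical stand-in for a handler protocol (the name is an assumption)."""

    def log_operation(self, operation: dict[str, Any]) -> None: ...


class PrintHandler:
    """Plain class: no base class, only a method with the matching name."""

    def log_operation(self, operation: dict[str, Any]) -> None:
        print(operation.get("action"))


class NotAHandler:
    """Has no log_operation method, so it does not satisfy the protocol."""


# isinstance() against a runtime_checkable Protocol only checks that the
# method exists; it does not validate the method's signature.
is_handler = isinstance(PrintHandler(), AuditHandlerLike)
is_not_handler = isinstance(NotAHandler(), AuditHandlerLike)
```

Because the runtime check is shallow, a static type checker is still the better tool for catching signature mismatches.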
Quick Start#
Basic Custom Handler#
Any class with a log_operation(operation: dict) method can be an audit handler:
from typing import Any

import requests
from hfortix import FortiOS


class SlackNotifier:
    """Send notifications to Slack when firewall rules change"""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def log_operation(self, operation: dict[str, Any]) -> None:
        """Called after every API operation"""
        if 'firewall.policy' in operation.get('object_type', ''):
            action = operation.get('action', 'unknown')
            user = operation.get('user_context', {}).get('username', 'Unknown')
            object_name = operation.get('object_name', 'N/A')
            message = f"🔥 Firewall changed by {user}: {action} {object_name}"
            self._send_to_slack(message)

    def _send_to_slack(self, message: str):
        # A timeout keeps a slow webhook from stalling the API call
        requests.post(self.webhook_url, json={"text": message}, timeout=10)


# Use it
handler = SlackNotifier("https://hooks.slack.com/services/YOUR/WEBHOOK")
fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)

# Now all firewall policy changes trigger Slack notifications
fgt.api.cmdb.firewall.policy.post(name="Block-Malware", ...)
# → Slack: "🔥 Firewall changed by admin: create Block-Malware"
Operation Dictionary Format#
Every log_operation() call receives a dictionary with:
{
    "timestamp": "2025-01-08T10:30:45.123456",
    "action": "create",  # create, read, update, delete
    "object_type": "cmdb.firewall.policy",
    "object_name": "Block-Malware",
    "success": True,
    "duration": 0.342,  # seconds
    "request_id": "uuid-...",
    "trace_id": "trace-uuid-...",  # for distributed tracing
    "user_context": {
        "username": "admin",
        "ticket_id": "TICKET-123",
        "application": "automation-script"
    },
    "request": {
        "method": "POST",
        "path": "/api/v2/cmdb/firewall/policy",
        "data": {...}
    },
    "response": {
        "status": 200,
        "data": {...}
    },
    "error": None  # or error details if success=False
}
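As an illustration of consuming this dictionary, the sketch below builds a one-line summary from it. `summarize_operation` is a hypothetical helper, not part of HFortix, and it assumes any key might be absent, so every lookup falls back to a default.

```python
from typing import Any


def summarize_operation(op: dict[str, Any]) -> str:
    """Build a one-line summary, tolerating missing keys."""
    user = (op.get("user_context") or {}).get("username", "unknown")
    status = "ok" if op.get("success") else "FAILED"
    duration_ms = round(op.get("duration", 0.0) * 1000)
    return (
        f"{op.get('timestamp', '?')} {user} {op.get('action', '?')} "
        f"{op.get('object_type', '?')}/{op.get('object_name', '?')} "
        f"{status} {duration_ms}ms"
    )


sample = {
    "timestamp": "2025-01-08T10:30:45.123456",
    "action": "create",
    "object_type": "cmdb.firewall.policy",
    "object_name": "Block-Malware",
    "success": True,
    "duration": 0.342,
    "user_context": {"username": "admin"},
}
summary = summarize_operation(sample)
```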
Built-in Handler Examples#
Kafka Handler#
Stream audit events to Kafka for distributed systems:
from examples.custom_handlers.kafka_handler import KafkaHandler

handler = KafkaHandler(
    bootstrap_servers=["kafka1:9092", "kafka2:9092"],
    topic="fortinet-audit",
    compression_type="gzip"  # or snappy, lz4
)
fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)
Features:
Async publishing with callbacks
Partition key routing by request_id
Compression support (gzip, snappy, lz4)
Automatic cleanup with context manager
Error handling and logging
Database Handler#
Store audit logs in SQL databases:
from examples.custom_handlers.database_handler import DatabaseHandler

# SQLite for development
handler = DatabaseHandler("sqlite:///audit.db")
# PostgreSQL for production
handler = DatabaseHandler("postgresql://user:pass@localhost/audit_db")
# MySQL for production
handler = DatabaseHandler("mysql://user:pass@localhost/audit_db")

fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)

# Query audit logs
recent = handler.query_operations(
    start_time="2025-01-01",
    end_time="2025-01-08",
    action="delete"
)
Features:
Multi-database support (SQLite, PostgreSQL, MySQL)
Automatic table creation with indexes
JSON/JSONB column support
Query API for compliance reporting
Handles large JSON payloads
Webhook Handler#
Send notifications to Slack, Teams, or Discord:
from examples.custom_handlers.webhook_handler import (
    SlackNotifier,
    TeamsNotifier,
    DiscordNotifier
)

# Slack notifications (only for policy changes)
slack = SlackNotifier(
    "https://hooks.slack.com/services/YOUR/WEBHOOK",
    filter_fn=lambda op: 'policy' in op.get('object_type', '')
)

# Teams alerts (only for failures)
teams = TeamsNotifier(
    "https://outlook.office.com/webhook/...",
    filter_fn=lambda op: not op['success']
)

# Discord notifications
discord = DiscordNotifier("https://discord.com/api/webhooks/...")

fgt = FortiOS("192.168.1.99", token="token", audit_handler=slack)
Features:
Rich message formatting with colors
Platform-specific formatters (Slack, Teams, Discord)
Automatic retry with backoff
Filter support for conditional notifications
Includes error details and stack traces
Enhanced CompositeHandler#
Route operations to different handlers based on priority and filters.
Priority-Based Routing#
from hfortix_core.audit import CompositeHandler, FileHandler, SyslogHandler

handler = CompositeHandler([
    # Critical operations to dedicated file (highest priority)
    (FileHandler('/var/log/critical.jsonl'), 100),
    # Everything to SIEM (medium priority)
    (SyslogHandler('siem.company.com:514'), 50),
    # Everything to general log (lowest priority)
    (FileHandler('/var/log/all.jsonl'), 1),
])
fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)
Execution Order:
Handlers are sorted by priority (highest first)
Each handler receives the operation in order
Errors in one handler don’t affect others (when aggregate_errors=True)
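To make the execution order concrete, here is a toy dispatcher that sorts by priority, applies optional filters, and isolates handler failures. `MiniComposite` is a hypothetical illustration written for this page; it makes no claim about how CompositeHandler is implemented internally.

```python
from typing import Any, Callable, Optional

Operation = dict[str, Any]
FilterFn = Optional[Callable[[Operation], bool]]


class MiniComposite:
    """Toy dispatcher (an assumption-based sketch, not HFortix code)."""

    def __init__(self, entries: list[tuple]):
        # Each entry is (handler, priority) or (handler, priority, filter_fn).
        normalized = [(e[0], e[1], e[2] if len(e) > 2 else None) for e in entries]
        # Highest priority first; sorted() is stable, so ties keep input order.
        self._entries = sorted(normalized, key=lambda e: e[1], reverse=True)
        self.errors: list[dict[str, str]] = []

    def log_operation(self, operation: Operation) -> None:
        for handler, _priority, filter_fn in self._entries:
            if filter_fn is not None and not filter_fn(operation):
                continue  # filter rejected this operation
            try:
                handler.log_operation(operation)
            except Exception as exc:
                # Record the failure and keep going so later handlers still run.
                self.errors.append(
                    {"handler": type(handler).__name__, "error": str(exc)}
                )


calls: list[str] = []


class Recorder:
    def __init__(self, name: str):
        self.name = name

    def log_operation(self, operation: Operation) -> None:
        calls.append(self.name)


class Broken:
    def log_operation(self, operation: Operation) -> None:
        raise RuntimeError("boom")


composite = MiniComposite([
    (Recorder("low"), 1),
    (Broken(), 75),
    (Recorder("high"), 100),
    (Recorder("deletes-only"), 50, lambda op: op.get("action") == "delete"),
])
composite.log_operation({"action": "create"})
```

In this run the failing handler sits between the two recorders, yet the lowest-priority recorder is still called and the failure is only recorded.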
Conditional Routing with Filters#
from hfortix_core.audit import CompositeHandler, FileHandler, SyslogHandler

# Define filters
def is_critical(op):
    """Critical: deletes or failures"""
    return op['action'] == 'delete' or not op['success']

def is_policy_change(op):
    """Policy changes only"""
    return 'policy' in op.get('object_type', '')

def is_slow(op):
    """Slow operations (>1 second)"""
    return op['duration'] > 1.0

# Priority + filter routing
handler = CompositeHandler([
    # Critical operations → dedicated file
    (FileHandler('/var/log/critical.jsonl'), 100, is_critical),
    # Policy changes → SIEM
    (SyslogHandler('siem.company.com:514'), 50, is_policy_change),
    # Slow operations → performance log
    (FileHandler('/var/log/slow.jsonl'), 25, is_slow),
    # Everything → general log
    (FileHandler('/var/log/all.jsonl'), 1, None),
])
fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)
Filter Functions:
Receive the operation dictionary
Return True to log, False to skip
None filter = log everything
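Since a filter is just a function from an operation dictionary to a boolean, small filters can be combined into larger ones. The helpers below (`all_of`, `any_of`, `negate`) are hypothetical names for this sketch and are not part of HFortix.

```python
from typing import Any, Callable

Operation = dict[str, Any]
FilterFn = Callable[[Operation], bool]


def all_of(*filters: FilterFn) -> FilterFn:
    """Pass only when every filter passes."""
    return lambda op: all(f(op) for f in filters)


def any_of(*filters: FilterFn) -> FilterFn:
    """Pass when at least one filter passes."""
    return lambda op: any(f(op) for f in filters)


def negate(filter_fn: FilterFn) -> FilterFn:
    """Invert a filter."""
    return lambda op: not filter_fn(op)


def is_delete(op: Operation) -> bool:
    return op.get("action") == "delete"


def is_policy(op: Operation) -> bool:
    return "policy" in op.get("object_type", "")


def is_failure(op: Operation) -> bool:
    return not op.get("success", True)


policy_delete = all_of(is_policy, is_delete)
needs_attention = any_of(policy_delete, is_failure)
routine = negate(needs_attention)
```

A combined filter such as `needs_attention` can then be passed wherever a single filter function is expected.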
Error Aggregation#
Track handler reliability and troubleshoot issues:
handler = CompositeHandler([...], aggregate_errors=True)
fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)

# Perform operations...
fgt.api.cmdb.firewall.policy.post(...)

# Check error summary
summary = handler.error_summary
print(f"Total errors: {summary['total_errors']}")
print(f"Failed handlers: {summary['failed_handlers']}")
for error in summary['errors']:
    print(f"Handler: {error['handler']}")
    print(f"Error: {error['error']}")
    print(f"Timestamp: {error['timestamp']}")
Error Summary Format:
{
    "total_errors": 3,
    "failed_handlers": ["KafkaHandler", "SyslogHandler"],
    "errors": [
        {
            "handler": "KafkaHandler",
            "error": "Connection refused",
            "timestamp": "2025-01-08T10:30:45.123456",
            "operation": {...}
        },
        ...
    ]
}
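A summary in this shape can be reduced to a per-handler failure count for dashboards or alerts. The sketch below uses a hypothetical helper, `failures_per_handler`, and made-up sample entries that mirror the format above.

```python
from collections import Counter
from typing import Any


def failures_per_handler(summary: dict[str, Any]) -> dict[str, int]:
    """Count error entries per handler name."""
    counts = Counter(entry["handler"] for entry in summary.get("errors", []))
    return dict(counts)


# Illustrative data only; real summaries come from handler.error_summary.
example_summary = {
    "total_errors": 3,
    "failed_handlers": ["KafkaHandler", "SyslogHandler"],
    "errors": [
        {"handler": "KafkaHandler", "error": "Connection refused"},
        {"handler": "KafkaHandler", "error": "Connection refused"},
        {"handler": "SyslogHandler", "error": "Timeout"},
    ],
}
per_handler = failures_per_handler(example_summary)
```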
Dynamic Handler Management#
Add or remove handlers at runtime:
handler = CompositeHandler([
    FileHandler('/var/log/all.jsonl')
])
fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)

# Add Slack notifications for critical events
slack = SlackNotifier("https://...")
handler.add_handler(
    slack,
    priority=100,
    filter_fn=lambda op: not op['success']
)

# Later: remove Slack handler
handler.remove_handler(slack)
Use Cases:
Enable/disable notifications based on time of day
Add temporary debugging handlers
Dynamically configure based on environment
A/B testing different handler configurations
Advanced Patterns#
Multi-Destination Routing#
Route different operation types to different systems:
from examples.custom_handlers import KafkaHandler, DatabaseHandler, SlackNotifier
from hfortix_core.audit import CompositeHandler, FileHandler

# Define routing filters
def is_audit_event(op):
    """Security-relevant events"""
    return op['action'] in ('create', 'update', 'delete')

def is_monitoring(op):
    """Read-only monitoring"""
    return op['action'] == 'read'

# Route based on operation type
handler = CompositeHandler([
    # Audit events → Kafka (for real-time processing)
    (KafkaHandler(["kafka:9092"], "audit"), 100, is_audit_event),
    # Audit events → Database (for compliance queries)
    (DatabaseHandler("postgresql://..."), 90, is_audit_event),
    # Failures → Slack (for immediate alerts)
    (SlackNotifier("https://..."), 80, lambda op: not op['success']),
    # Monitoring → separate log file
    (FileHandler('/var/log/monitoring.jsonl'), 50, is_monitoring),
    # Everything → archive
    (FileHandler('/var/log/archive.jsonl'), 1, None),
])
Environment-Based Configuration#
Different handlers for dev/staging/production:
import os

from examples.custom_handlers import KafkaHandler, DatabaseHandler, SlackNotifier
from hfortix import FortiOS
from hfortix_core.audit import CompositeHandler, FileHandler

env = os.getenv("ENVIRONMENT", "dev")

if env == "production":
    handler = CompositeHandler([
        KafkaHandler(["kafka-prod:9092"], "audit"),
        DatabaseHandler("postgresql://prod-db/audit"),
        SlackNotifier("https://hooks.slack.com/production"),
    ])
elif env == "staging":
    handler = CompositeHandler([
        DatabaseHandler("postgresql://staging-db/audit"),
        FileHandler('/var/log/staging-audit.jsonl'),
    ])
else:  # development
    handler = FileHandler('/var/log/dev-audit.jsonl')

fgt = FortiOS("192.168.1.99", token="token", audit_handler=handler)
Time-Based Filtering#
Different behavior during business hours vs off-hours:
from datetime import datetime

def is_business_hours(op):
    """9 AM - 5 PM weekdays"""
    now = datetime.fromisoformat(op['timestamp'])
    return (
        now.weekday() < 5 and  # Monday-Friday
        9 <= now.hour < 17     # 9 AM - 5 PM
    )

def is_after_hours(op):
    """Nights and weekends"""
    return not is_business_hours(op)

handler = CompositeHandler([
    # Business hours: quiet logging
    (FileHandler('/var/log/business.jsonl'), 50, is_business_hours),
    # After hours: loud alerts (shouldn't be changes!)
    (SlackNotifier("https://..."), 100, is_after_hours),
    (FileHandler('/var/log/after-hours.jsonl'), 90, is_after_hours),
])
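The hour boundaries can be made configurable and checked against known dates. This is a sketch under one stated assumption: the timestamp is interpreted exactly as written, so if the operation timestamps are recorded in UTC, the hours below are UTC hours too. `make_hours_filter` is a hypothetical helper name.

```python
from datetime import datetime
from typing import Any, Callable

Operation = dict[str, Any]


def make_hours_filter(start_hour: int = 9, end_hour: int = 17) -> Callable[[Operation], bool]:
    """Return a filter that passes on weekdays within [start_hour, end_hour)."""

    def in_hours(op: Operation) -> bool:
        ts = datetime.fromisoformat(op["timestamp"])
        return ts.weekday() < 5 and start_hour <= ts.hour < end_hour

    return in_hours


business_hours = make_hours_filter(9, 17)

wednesday_morning = {"timestamp": "2025-01-08T10:30:45.123456"}
wednesday_night = {"timestamp": "2025-01-08T22:05:00"}
saturday_noon = {"timestamp": "2025-01-11T12:00:00"}
```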
Compliance-Focused Handler#
Track changes for SOC 2, HIPAA, PCI-DSS compliance:
from typing import Any


class ComplianceHandler:
    """Enforce compliance requirements"""

    def __init__(self, db_connection_string: str):
        self.db = DatabaseHandler(db_connection_string)

    def log_operation(self, operation: dict[str, Any]) -> None:
        """Validate and log with compliance metadata"""
        # Handlers run after the API call, so raising here flags a
        # violation; it does not prevent the change itself
        # Ensure user context is present
        user_ctx = operation.get('user_context')
        if not user_ctx:
            raise ValueError("Compliance requires user_context")

        # SOC 2: Require ticket_id for changes
        if operation['action'] in ('create', 'update', 'delete'):
            if not user_ctx.get('ticket_id'):
                raise ValueError("Change requires ticket_id for SOC 2")

        # Add compliance metadata to a copy so other handlers
        # still see the unmodified operation
        record = {
            **operation,
            'compliance': {
                'soc2_compliant': True,
                'approved_change': user_ctx.get('ticket_id') is not None,
                'authorized_user': user_ctx.get('username'),
                'audit_timestamp': operation['timestamp'],
            },
        }

        # Store in the compliance database
        self.db.log_operation(record)


handler = ComplianceHandler("postgresql://audit-db/compliance")
fgt = FortiOS(
    "192.168.1.99",
    token="token",
    audit_handler=handler,
    user_context={"username": "admin", "ticket_id": "TICKET-123"}
)
Testing Custom Handlers#
Unit Testing#
import pytest
from typing import Any


class TestSlackNotifier:
    def test_filters_non_policy_changes(self):
        """Should only notify for policy changes"""
        messages = []

        class MockSlack(SlackNotifier):
            def _send_to_slack(self, message: str):
                messages.append(message)

        handler = MockSlack("https://fake")

        # Non-policy change (should be ignored)
        handler.log_operation({
            'action': 'create',
            'object_type': 'cmdb.system.interface',
            'object_name': 'port1',
        })
        assert len(messages) == 0

        # Policy change (should notify)
        handler.log_operation({
            'action': 'create',
            'object_type': 'cmdb.firewall.policy',
            'object_name': 'Block-Malware',
            'user_context': {'username': 'admin'},
        })
        assert len(messages) == 1
        assert 'Block-Malware' in messages[0]
Integration Testing#
def test_composite_handler_priority():
    """Handlers execute in priority order"""
    execution_order = []

    class OrderedHandler:
        def __init__(self, name: str):
            self.name = name

        def log_operation(self, operation: dict[str, Any]) -> None:
            execution_order.append(self.name)

    handler = CompositeHandler([
        (OrderedHandler("low"), 1),
        (OrderedHandler("high"), 100),
        (OrderedHandler("medium"), 50),
    ])
    handler.log_operation({'action': 'test'})
    assert execution_order == ["high", "medium", "low"]
Mock Handlers for Testing#
class MockHandler:
    """Collect operations without side effects"""

    def __init__(self):
        self.operations = []

    def log_operation(self, operation: dict[str, Any]) -> None:
        self.operations.append(operation.copy())

    def get_operations(self, **filters):
        """Query collected operations"""
        results = self.operations
        if 'action' in filters:
            results = [op for op in results if op['action'] == filters['action']]
        if 'success' in filters:
            results = [op for op in results if op['success'] == filters['success']]
        return results


# Use in tests
mock = MockHandler()
fgt = FortiOS("192.168.1.99", token="token", audit_handler=mock)
fgt.api.cmdb.firewall.policy.post(...)

# Verify behavior
creates = mock.get_operations(action='create')
assert len(creates) == 1
assert creates[0]['object_type'] == 'cmdb.firewall.policy'
Best Practices#
Error Handling#
Always handle errors gracefully in handlers:
import logging


class RobustHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def log_operation(self, operation: dict[str, Any]) -> None:
        try:
            self._do_logging(operation)
        except Exception as e:
            # Log the error but don't crash the application
            self.logger.error(f"Handler failed: {e}", exc_info=True)
            # Optionally re-raise if you want CompositeHandler to track it
            # raise
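For transient failures such as a brief network outage, retrying before giving up is a common complement to logging the error. The wrapper below is a hypothetical sketch (`RetryingHandler` is not an HFortix class); the `sleep` parameter is injectable so the behaviour can be exercised without real delays.

```python
import time
from typing import Any, Callable

Operation = dict[str, Any]


class RetryingHandler:
    """Wrap another handler and retry failures with exponential backoff."""

    def __init__(
        self,
        inner: Any,
        attempts: int = 3,
        base_delay: float = 0.5,
        sleep: Callable[[float], Any] = time.sleep,
    ):
        self.inner = inner
        self.attempts = attempts
        self.base_delay = base_delay
        self.sleep = sleep

    def log_operation(self, operation: Operation) -> None:
        for attempt in range(self.attempts):
            try:
                self.inner.log_operation(operation)
                return
            except Exception:
                if attempt == self.attempts - 1:
                    raise  # out of attempts: let the caller see the failure
                # Delay doubles each time: base, 2*base, 4*base, ...
                self.sleep(self.base_delay * (2 ** attempt))


class FlakyHandler:
    """Test double that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int):
        self.failures_left = failures
        self.delivered: list[Operation] = []

    def log_operation(self, operation: Operation) -> None:
        if self.failures_left > 0:
            self.failures_left -= 1
            raise ConnectionError("temporary outage")
        self.delivered.append(operation)


delays: list[float] = []
flaky = FlakyHandler(failures=2)
retrying = RetryingHandler(flaky, attempts=3, base_delay=0.5, sleep=delays.append)
retrying.log_operation({"action": "create"})
```

Retries run on the calling thread, so the total delay adds to the latency of the API call unless the handler is also made asynchronous.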
Performance Considerations#
Async Operations: Use async publishing for I/O-bound handlers:
from concurrent.futures import ThreadPoolExecutor


class AsyncHandler:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=5)

    def log_operation(self, operation: dict[str, Any]) -> None:
        # Don't block the main thread
        self.executor.submit(self._async_log, operation)

    def _async_log(self, operation: dict[str, Any]) -> None:
        # Slow I/O operation here
        ...
Batching: Batch operations for efficiency:
import time


class BatchHandler:
    def __init__(self, batch_size=100, flush_interval=60):
        self.batch = []
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.last_flush = time.time()

    def log_operation(self, operation: dict[str, Any]) -> None:
        self.batch.append(operation)
        if (len(self.batch) >= self.batch_size or
                time.time() - self.last_flush > self.flush_interval):
            self._flush_batch()

    def _flush_batch(self):
        if self.batch:
            # Send batch to external system
            self._send_batch(self.batch)
            self.batch = []
        self.last_flush = time.time()
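A batching handler that flushes only from inside log_operation() can leave a partial batch unsent when the program exits. One way to handle that is an explicit close() plus context-manager support, sketched below. `ClosableBatchHandler` and its `send_batch` callback are hypothetical names for this illustration.

```python
import threading
from typing import Any, Callable

Operation = dict[str, Any]


class ClosableBatchHandler:
    """Size-based batching with a final flush on close()."""

    def __init__(self, send_batch: Callable[[list], Any], batch_size: int = 100):
        self._send_batch = send_batch
        self._batch_size = batch_size
        self._batch: list[Operation] = []
        self._lock = threading.Lock()  # log_operation may be called from several threads

    def log_operation(self, operation: Operation) -> None:
        with self._lock:
            self._batch.append(operation)
            if len(self._batch) < self._batch_size:
                return
            # Swap the full batch out while holding the lock
            pending, self._batch = self._batch, []
        # Send outside the lock so slow I/O does not block other callers
        self._send_batch(pending)

    def close(self) -> None:
        """Flush whatever is left, e.g. at shutdown."""
        with self._lock:
            pending, self._batch = self._batch, []
        if pending:
            self._send_batch(pending)

    def __enter__(self) -> "ClosableBatchHandler":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()


sent: list[list[Operation]] = []
with ClosableBatchHandler(sent.append, batch_size=2) as batcher:
    for n in range(5):
        batcher.log_operation({"n": n})
batch_sizes = [len(batch) for batch in sent]
```

The fifth operation never fills a batch on its own; it is delivered only because leaving the `with` block calls close().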
Security Considerations#
Credentials: Never log sensitive data:
import copy
from typing import Any


class SecureHandler:
    def log_operation(self, operation: dict[str, Any]) -> None:
        # Deep-copy so redaction never touches the caller's nested dicts
        # (dict.copy() is shallow and would share operation['request']['data'])
        safe_op = copy.deepcopy(operation)
        # Redact sensitive fields
        data = (safe_op.get('request') or {}).get('data')
        if isinstance(data, dict):
            for sensitive_field in ('password', 'token', 'secret', 'api_key'):
                if sensitive_field in data:
                    data[sensitive_field] = '***REDACTED***'
        self._log_safely(safe_op)
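Sensitive values can also sit deeper in the payload than the top level of the request data. The sketch below redacts recursively and builds new containers as it goes, so the original operation is left untouched. `redact` and `SENSITIVE_KEYS` are hypothetical names, and the key list is an assumption to adapt to your own payloads.

```python
from typing import Any

SENSITIVE_KEYS = {"password", "token", "secret", "api_key"}  # assumed list
REDACTED = "***REDACTED***"


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive dict keys masked at any depth."""
    if isinstance(value, dict):
        return {
            key: REDACTED
            if isinstance(key, str) and key.lower() in SENSITIVE_KEYS
            else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value  # scalars are returned unchanged


original = {
    "action": "update",
    "request": {
        "data": {
            "name": "vpn-user",
            "password": "hunter2",
            "peers": [{"name": "site-b", "secret": "s3cr3t"}],
        }
    },
}
safe = redact(original)
```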
Data Retention: Implement retention policies:
import random
from datetime import datetime, timedelta


class RetentionHandler:
    def __init__(self, db: DatabaseHandler, retention_days=90):
        self.db = db
        self.retention_days = retention_days

    def log_operation(self, operation: dict[str, Any]) -> None:
        # Log current operation
        self.db.log_operation(operation)
        # Periodically clean old data
        if random.random() < 0.01:  # 1% of operations
            self._cleanup_old_data()

    def _cleanup_old_data(self):
        cutoff = datetime.now() - timedelta(days=self.retention_days)
        # Delete operations older than retention period
        ...
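The cleanup step itself depends on the storage backend. As one possible shape, the sketch below deletes old rows from a SQLite table using the standard `sqlite3` module. The table name `audit_log`, its columns, and the `delete_older_than` helper are all assumptions for illustration and do not reflect DatabaseHandler's actual schema.

```python
import sqlite3
from datetime import datetime, timedelta
from typing import Optional


def delete_older_than(
    conn: sqlite3.Connection,
    retention_days: int,
    now: Optional[datetime] = None,
) -> int:
    """Delete rows older than the retention window; return how many were removed."""
    now = now or datetime.now()
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    # ISO-8601 strings in a consistent format sort chronologically,
    # so a plain string comparison is enough here.
    cursor = conn.execute("DELETE FROM audit_log WHERE timestamp < ?", (cutoff,))
    conn.commit()
    return cursor.rowcount


# In-memory database with a hypothetical two-column schema
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE audit_log (timestamp TEXT, action TEXT)")
conn.executemany(
    "INSERT INTO audit_log VALUES (?, ?)",
    [
        ("2024-01-01T00:00:00", "create"),
        ("2025-01-07T12:00:00", "delete"),
    ],
)
removed = delete_older_than(conn, retention_days=90, now=datetime(2025, 1, 8))
remaining = conn.execute("SELECT COUNT(*) FROM audit_log").fetchone()[0]
```

Passing `now` explicitly keeps the function deterministic in tests; in production it defaults to the current time.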
See Also#
Enterprise Audit Logging - Built-in audit logging features
Logging & Observability - Monitoring and telemetry
Debugging Guide - Debug logging and troubleshooting
Example handlers: examples/custom_handlers/
Demo script: examples/composite_handler_demo.py
Full documentation: HANDLER_PROTOCOL_SYSTEM.md