"""
Internal HTTP Client for FortiOS API
This module contains the HTTPClient class which handles all HTTP communication
with FortiGate devices. It is an internal implementation detail and not part
of the public API.
Now powered by httpx for better performance, HTTP/2 support, and modern async
capabilities.
"""
from __future__ import annotations
import logging
import time
import uuid
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeAlias, Union
if TYPE_CHECKING:
from collections.abc import Coroutine
from urllib.parse import quote, urlencode
import httpx
from hfortix_core.http.base import BaseHTTPClient
logger = logging.getLogger("hfortix.http")
# Type alias for API responses
HTTPResponse: TypeAlias = dict[str, Any]
__all__ = ["HTTPClient", "HTTPResponse", "encode_path_component"]
[docs]
def encode_path_component(component: str | int) -> str:
    """
    Percent-encode one URL path segment.

    Every reserved character is escaped, the forward slash included, so a
    FortiOS object name that contains "/" (for example an address object
    named after a CIDR range) remains a single path segment.

    Args:
        component: Object name or numeric ID; it is converted with
            ``str()`` before encoding.

    Returns:
        The percent-encoded text, safe to place between two "/" in a URL.

    Examples:
        >>> encode_path_component("Test_NET_192.0.2.0/24")
        'Test_NET_192.0.2.0%2F24'
        >>> encode_path_component("test@example.com")
        'test%40example.com'
        >>> encode_path_component(123)
        '123'
    """
    raw_segment = str(component)
    # An empty ``safe`` set means "/" is escaped too.
    return quote(raw_segment, safe="")
[docs]
class HTTPClient(BaseHTTPClient):
"""
Internal HTTP client for FortiOS API requests (Sync Implementation)
Implements the IHTTPClient protocol for synchronous HTTP operations.
Handles all HTTP communication with FortiGate devices including:
- Session management
- Authentication headers
- SSL verification
- Request/response handling
- Error handling
- Automatic retry with exponential backoff
- Context manager support (use with 'with' statement)
Query Parameter Encoding:
httpx automatically handles query parameter encoding:
- Lists: Encoded as repeated parameters (e.g., ['a', 'b'] →
?key=a&key=b)
- Booleans: Converted to lowercase strings ('true'/'false')
- None values: Should be filtered out before passing to params
- Special characters: URL-encoded automatically
Path Encoding:
Paths are URL-encoded with / and % as safe characters to prevent
double-encoding of already-encoded components.
Protocol Implementation:
This class implements the IHTTPClient protocol, allowing it to be used
interchangeably with other HTTP client implementations (e.g.,
AsyncHTTPClient,
custom user-provided clients).
This class is internal and not exposed to users directly, but users can
provide
their own IHTTPClient implementations to FortiOS.__init__().
"""
[docs]
def __init__(
    self,
    url: str,
    verify: bool = True,
    token: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    vdom: Optional[str] = None,
    max_retries: int = 3,
    connect_timeout: float = 10.0,
    read_timeout: float = 300.0,
    user_agent: Optional[str] = None,
    circuit_breaker_threshold: int = 10,
    circuit_breaker_timeout: float = 30.0,
    circuit_breaker_auto_retry: bool = False,
    circuit_breaker_max_retries: int = 3,
    circuit_breaker_retry_delay: float = 5.0,
    max_connections: int = 100,
    max_keepalive_connections: int = 20,
    session_idle_timeout: Union[int, float, None] = 300,
    read_only: bool = False,
    track_operations: bool = False,
    adaptive_retry: bool = False,
    retry_strategy: str = "exponential",
    retry_jitter: bool = False,
    audit_handler: Optional[Any] = None,
    audit_callback: Optional[Any] = None,
    user_context: Optional[dict[str, Any]] = None,
) -> None:
    """
    Initialize the synchronous HTTP client.

    Validates the authentication arguments, lets the base class set up the
    shared retry/circuit-breaker state, creates the underlying
    ``httpx.Client`` (HTTP/2 enabled) and, when username/password are
    given, logs in immediately.

    Args:
        url: Base URL for the API (e.g., "https://192.0.2.10").
        verify: Verify SSL certificates.
        token: API token. Sent as ``Authorization: Bearer <token>``.
            Mutually exclusive with username/password.
        username: Username for session authentication (requires
            ``password``).
        password: Password for session authentication (requires
            ``username``).
        vdom: Default virtual domain.
        max_retries: Maximum retry attempts on transient failures
            (default: 3).
        connect_timeout: Seconds allowed for establishing a connection
            (default: 10.0).
        read_timeout: Seconds allowed for reading a response
            (default: 300.0).
        user_agent: Custom User-Agent header. When None it defaults to
            ``hfortix/{version}``.
        circuit_breaker_threshold: Consecutive failures before the
            circuit opens (default: 10).
        circuit_breaker_timeout: Seconds before an open circuit may move
            to half-open (default: 30.0).
        circuit_breaker_auto_retry: When True, an open circuit makes the
            client wait ``circuit_breaker_retry_delay`` seconds instead
            of failing immediately (default: False). Not recommended for
            tests or interactive use.
        circuit_breaker_max_retries: Maximum waits when auto-retry is
            enabled (default: 3).
        circuit_breaker_retry_delay: Seconds to wait per auto-retry
            (default: 5.0). Independent of ``circuit_breaker_timeout``.
        max_connections: Connection pool size (default: 100).
        max_keepalive_connections: Keep-alive connections kept in the
            pool (default: 20).
        session_idle_timeout: Username/password auth only. Idle seconds
            after which the session is considered expired; the client
            re-authenticates proactively at 80% of this value
            (default: 300). Any falsy value (None, 0) disables proactive
            re-authentication. Should match the FortiGate
            'remoteauthtimeout' setting.
        read_only: Block write operations (default: False).
        track_operations: Keep an in-memory list of operations
            (default: False).
        adaptive_retry: Passed to the base class; enables adaptive
            retry with backpressure detection (default: False).
        retry_strategy: Passed to the base class; 'exponential'
            (default) or 'linear'.
        retry_jitter: Passed to the base class; adds random jitter to
            retry delays (default: False).
        audit_handler: Object with a ``log_operation(operation)`` method
            that receives every audit record.
        audit_callback: Callable receiving the audit record dict;
            alternative (or addition) to ``audit_handler``.
        user_context: Extra dict copied into every audit record, e.g.
            {"username": "admin", "ticket": "CHG-12345"}.

    Raises:
        ValueError: If both token and username/password are given, if
            only one of username/password is given, or if the automatic
            login fails. The base class may raise for other invalid
            parameters.
    """
    # Validate authentication parameters before any resource is created.
    if token and (username or password):
        raise ValueError(
            "Cannot specify both token and username/password authentication"  # noqa: E501
        )
    if (username and not password) or (password and not username):
        raise ValueError(
            "Both username and password must be provided together"
        )

    # Parent constructor handles validation and common initialization.
    super().__init__(
        url=url,
        verify=verify,
        vdom=vdom,
        max_retries=max_retries,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        circuit_breaker_threshold=circuit_breaker_threshold,
        circuit_breaker_timeout=circuit_breaker_timeout,
        max_connections=max_connections,
        max_keepalive_connections=max_keepalive_connections,
        adaptive_retry=adaptive_retry,
        retry_strategy=retry_strategy,
        retry_jitter=retry_jitter,
    )

    # Circuit breaker auto-retry settings
    self._circuit_breaker_auto_retry = circuit_breaker_auto_retry
    self._circuit_breaker_max_retries = circuit_breaker_max_retries
    self._circuit_breaker_retry_delay = circuit_breaker_retry_delay

    # Connection pool settings, kept for get_connection_stats()
    self._max_connections = max_connections
    self._max_keepalive_connections = max_keepalive_connections

    if user_agent is None:
        # Imported here to avoid a circular dependency
        from hfortix_core import __version__

        user_agent = f"hfortix/{__version__}"

    self._client = httpx.Client(
        headers={"User-Agent": user_agent},
        timeout=httpx.Timeout(
            connect=connect_timeout,
            read=read_timeout,
            write=30.0,  # Default write timeout
            pool=10.0,  # Default pool timeout
        ),
        verify=verify,
        http2=True,  # Enable HTTP/2 support
        limits=httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive_connections,
        ),
    )

    # Authentication credentials
    self._token = token
    self._username = username
    self._password = password
    # CSRF token obtained by login() (username/password auth only)
    self._session_token: Optional[str] = None
    self._session_created_at: Optional[float] = None
    self._session_last_activity: Optional[float] = None
    # Same truthiness test as the validation above and the header
    # assignment below, so an empty-string token is never treated as
    # token authentication.
    self._using_token_auth = bool(token)

    # Session timeout settings (seconds); a falsy value disables
    # proactive re-authentication.
    if session_idle_timeout:
        self._session_idle_timeout: float | None = float(
            session_idle_timeout
        )
        # Re-authenticate at 80% of the idle timeout so the session is
        # refreshed before the device expires it.
        self._session_proactive_refresh: float | None = (
            self._session_idle_timeout * 0.8
        )
    else:
        self._session_idle_timeout = None
        self._session_proactive_refresh = None

    # Read-only mode and operation tracking
    self._read_only = read_only
    self._track_operations = track_operations
    self._operations: list[dict[str, Any]] = []

    # Audit logging
    self._audit_handler = audit_handler
    self._audit_callback = audit_callback
    self._user_context = user_context or {}

    # Connection pool monitoring
    self._active_requests = 0
    self._total_requests = 0
    self._pool_exhaustion_count = 0
    self._pool_exhaustion_timestamps: list[float] = []

    # Request inspection for debugging
    self._last_request: Optional[dict[str, Any]] = None
    self._last_response: Optional[dict[str, Any]] = None
    self._last_response_time: Optional[float] = None

    # Transaction support (FortiOS 6.4.0+)
    self._active_transaction_id: Optional[int] = None

    if token:
        self._client.headers["Authorization"] = f"Bearer {token}"

    if username and password:
        try:
            self.login()
        except BaseException:
            # The constructor is failing, so the caller never receives an
            # object it could close; release the connection pool here.
            self._client.close()
            raise

    logger.debug(
        "HTTP client initialized for %s (max_retries=%d, connect_timeout=%.1fs, read_timeout=%.1fs, "  # noqa: E501
        "http2=enabled, user_agent='%s', circuit_breaker_threshold=%d, max_connections=%d, "  # noqa: E501
        "read_only=%s, track_operations=%s)",
        self._url,
        max_retries,
        connect_timeout,
        read_timeout,
        user_agent,
        circuit_breaker_threshold,
        max_connections,
        read_only,
        track_operations,
    )
[docs]
def login(self) -> None:
    """
    Authenticate with username/password and store the session CSRF token.

    Posts the credentials as form data to ``<base-url>/logincheck``
    (following redirects), then looks for a cookie whose name contains
    ``ccsrftoken`` - first on the response, then in the client's cookie
    jar. On success the token is stored, installed as the ``X-CSRFTOKEN``
    default header and the session timestamps are reset.

    Called automatically by ``__init__`` when credentials are supplied; it
    can also be called manually to re-authenticate.

    Raises:
        ValueError: If username/password are not configured, if the HTTP
            exchange fails (the ``httpx.HTTPError`` is chained as the
            cause), if the device answers with an HTML page and no token,
            or if no CSRF cookie is found at all.
    """
    if not self._username or not self._password:
        raise ValueError("Username and password required for login")

    logger.debug("Authenticating with username/password for %s", self._url)

    # FortiOS login endpoint - note: the form field is "secretkey", not
    # "password".
    login_url = f"{self._url}/logincheck"
    # FortiOS expects application/x-www-form-urlencoded
    login_data = urlencode(
        [("username", self._username), ("secretkey", self._password)]
    )

    try:
        response = self._client.post(
            login_url,
            content=login_data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            follow_redirects=True,  # FortiOS may redirect after login
        )
        # Diagnostics deliberately exclude headers, cookie values and the
        # body: they can carry the session/CSRF token.
        logger.debug(
            "Login response status: %d (redirects followed: %d)",
            response.status_code,
            len(response.history),
        )
        response.raise_for_status()
    except httpx.HTTPError as e:
        logger.error("Login failed: %s", str(e))
        raise ValueError(f"Login failed: {str(e)}") from e

    # Response cookies are checked first; after redirects the cookie may
    # only be present in the client's jar.
    csrf_token = None
    cookie_sources = (
        ("Response", response.cookies),
        ("Client", self._client.cookies),
    )
    for source, jar in cookie_sources:
        for cookie_name, cookie_value in jar.items():
            logger.debug("%s cookie present: %s", source, cookie_name)
            if "ccsrftoken" in cookie_name.lower():
                csrf_token = cookie_value
                break
        if csrf_token:
            break

    if not csrf_token:
        # An HTML body without a token most likely is the login page
        # again, i.e. the credentials were rejected.
        body = response.text.lower()
        if "<!doctype html>" in body or "<html" in body:
            raise ValueError(
                "Login failed: FortiGate returned login page. "
                "Please verify username and password are correct."
            )
        raise ValueError(
            "Login succeeded but no CSRF token found in response"
        )

    now = time.time()
    self._session_token = csrf_token
    self._client.headers["X-CSRFTOKEN"] = csrf_token
    self._session_created_at = now
    self._session_last_activity = now
    logger.info("Successfully authenticated via username/password")
def _should_refresh_session(self) -> bool:
"""
Check if the session should be proactively refreshed
Returns:
True if session needs refresh (approaching idle timeout), False
otherwise
"""
# Only applicable for username/password auth with idle timeout enabled
if (
self._using_token_auth
or not self._session_last_activity
or self._session_proactive_refresh is None
):
return False
# Check if we're approaching the idle timeout threshold
time_since_last_activity = time.time() - self._session_last_activity
return time_since_last_activity >= self._session_proactive_refresh
[docs]
def logout(self) -> None:
    """
    End the username/password session, if one exists.

    Posts to ``<base-url>/logout`` and then always forgets the local
    session state (token, timestamps and the ``X-CSRFTOKEN`` header),
    whether or not the device acknowledged the logout. HTTP errors are
    logged as warnings and not re-raised.

    Note:
        When no session token is stored (token authentication, or not
        logged in) the method returns without contacting the device.
    """
    if not self._session_token:
        logger.debug(
            "No active session to logout (using token auth or not logged in)"  # noqa: E501
        )
        return

    logger.debug("Logging out from %s", self._url)
    try:
        response = self._client.post(f"{self._url}/logout")
        if response.status_code != 200:
            logger.warning(
                "Logout returned status code %d", response.status_code
            )
        else:
            logger.info("Successfully logged out")
    except httpx.HTTPError as e:
        logger.warning("Logout failed: %s", str(e))
    finally:
        # Local state is dropped regardless of the logout result.
        self._session_token = None
        self._session_created_at = None
        self._session_last_activity = None
        self._client.headers.pop("X-CSRFTOKEN", None)
[docs]
def get_connection_stats(self) -> dict[str, Any]:
"""
Get HTTP connection pool statistics
Returns:
dict: Connection statistics including:
- http2_enabled: Whether HTTP/2 is enabled
- max_connections: Maximum number of connections allowed
- max_keepalive_connections: Maximum keepalive connections
- active_requests: Current number of active requests
- total_requests: Total requests made since initialization
- pool_exhaustion_count: Times pool reached capacity
- circuit_breaker_state: Current circuit breaker state
- consecutive_failures: Number of consecutive failures
Example:
>>> stats = client.get_connection_stats()
>>> print(f"Circuit breaker: {stats['circuit_breaker_state']}")
>>> print(f"Active requests: {stats['active_requests']}")
"""
return {
"http2_enabled": True,
"max_connections": self._max_connections,
"max_keepalive_connections": self._max_keepalive_connections,
"active_requests": self._active_requests,
"total_requests": self._total_requests,
"pool_exhaustion_count": self._pool_exhaustion_count,
"circuit_breaker_state": self._circuit_breaker["state"],
"consecutive_failures": self._circuit_breaker[
"consecutive_failures"
],
"last_failure_time": self._circuit_breaker["last_failure_time"],
}
[docs]
def set_transaction(self, transaction_id: Optional[int]) -> None:
    """
    Set or clear the active transaction ID.

    While an ID is set, requests made through this client add an
    'X-TRANSACTION-ID' header carrying it. Only ``None`` clears the
    transaction; ``0`` is a regular ID.

    Args:
        transaction_id: Transaction ID to use (None to clear)

    Examples:
        >>> # Start transaction
        >>> client.set_transaction(19)
        >>> # All requests now include X-TRANSACTION-ID: 19
        >>>
        >>> # Clear transaction
        >>> client.set_transaction(None)
    """
    self._active_transaction_id = transaction_id
    # Test against None (not truthiness) so the log message agrees with
    # the header logic, which also treats only None as "no transaction".
    logger.debug(
        "Transaction ID %s",
        "cleared" if transaction_id is None else f"set to {transaction_id}",
    )
[docs]
def inspect_last_request(self) -> dict[str, Any]:
"""
Get details of last API request for debugging
Returns:
dict: Request information including:
- method: HTTP method used
- endpoint: API endpoint path
- params: Query parameters
- response_time_ms: Response time in milliseconds
- status_code: HTTP status code
- error: Error message if no requests made
Example:
>>> client.get("/api/v2/cmdb/firewall/address")
>>> info = client.inspect_last_request()
>>> print(f"Last request took {info['response_time_ms']:.2f}ms")
"""
if not self._last_request:
return {"error": "No requests have been made yet"}
result: dict[str, Any] = {
"method": self._last_request.get("method"),
"endpoint": self._last_request.get("endpoint"),
"params": self._last_request.get("params"),
"response_time_ms": (
round(self._last_response_time * 1000, 2)
if self._last_response_time
else None
),
}
if self._last_response:
result["status_code"] = self._last_response.get("status_code")
return result
def _check_circuit_breaker(self, endpoint: str) -> None:
"""
Circuit breaker check with an optional wait-and-retry mode.
With auto-retry disabled, the call is forwarded unchanged to the base
class check. With auto-retry enabled, an open circuit makes this method
sleep for the configured retry delay and, once the circuit's timeout has
elapsed since the last failure, switch the state to "half_open".
Args:
endpoint: API endpoint being checked (passed to the base class check
and used in the error message)
Raises:
CircuitBreakerOpenError: Raised here when the retry loop ends without
returning; presumably also raised by the base class check when
auto-retry is disabled (not visible here - confirm).
"""
if not self._circuit_breaker_auto_retry:
# Use default fail-fast behavior
super()._check_circuit_breaker(endpoint)
return
# Auto-retry enabled - wait and retry when circuit breaker opens
# NOTE(review): this listing has lost its indentation, so the nesting of
# the `return` that follows the timeout check cannot be confirmed. If it
# belongs to the whole "open" branch, the loop body runs at most once and
# the final raise is only reached when circuit_breaker_max_retries <= 0;
# if it belongs to the timeout branch, the loop sleeps up to
# circuit_breaker_max_retries times before raising. Verify against the
# original file.
retry_count = 0
while retry_count < self._circuit_breaker_max_retries:
if self._circuit_breaker["state"] == "open":
retry_count += 1
logger.info(
"Circuit breaker OPEN - auto-retry %d/%d after "
"%.1fs delay",
retry_count,
self._circuit_breaker_max_retries,
self._circuit_breaker_retry_delay,
)
# Blocking wait; this is the synchronous client.
time.sleep(self._circuit_breaker_retry_delay)
# Check if enough time has elapsed for circuit to transition
# (a missing last_failure_time counts as 0, i.e. "long ago")
elapsed = time.time() - (
self._circuit_breaker["last_failure_time"] or 0
)
if elapsed >= self._circuit_breaker["timeout"]:
# Timeout elapsed, transition to half_open
self._circuit_breaker["state"] = "half_open"
logger.info(
"Circuit breaker transitioning to HALF_OPEN " "state"
)
# If timeout not elapsed, circuit stays open but we
# retry anyway (the request will fail-fast again if
# service still down)
return
else:
# Circuit breaker is closed or half_open, proceed
return
# Max retries exceeded, raise error
# (imported here rather than at module top)
from hfortix_core.exceptions import CircuitBreakerOpenError
raise CircuitBreakerOpenError(
f"Circuit breaker is OPEN for {endpoint}. "
f"Max retries ({self._circuit_breaker_max_retries}) exceeded. "
"Service appears to be down."
)
def _handle_response_errors(
self,
response: httpx.Response,
endpoint: Optional[str] = None,
method: Optional[str] = None,
params: Optional[dict[str, Any]] = None,
silent: bool = False,
) -> None:
"""
Turn an unsuccessful HTTP response into an exception.
Does nothing when ``response.is_success`` is true. Otherwise the body is
parsed as JSON, an ``error_description`` is added when the payload has an
``error`` code but no description, the failure is logged (unless
``silent``) and the payload is handed to hfortix_core's
``raise_for_status``. When parsing raises ``ValueError`` the failure is
logged by status code and size, and httpx's own
``response.raise_for_status()`` is used.
Args:
response: httpx.Response object
endpoint: API endpoint path for better error context
method: HTTP method (GET, POST, PUT, DELETE)
params: Request parameters (forwarded to ``raise_for_status``)
silent: If True, skip logging (for expected errors like 404 in exists())
Raises:
The exceptions below are the ones listed by the original author for
hfortix_core's ``raise_for_status``; that function is not visible
here, so the mapping is unverified.
DuplicateEntryError: If entry already exists (-5, -15, -100)
EntryInUseError: If entry is in use (-23, -94, -95, -96)
PermissionDeniedError: If permission denied (-14, -37)
InvalidValueError: If invalid value provided (-1, -50, -651)
ResourceNotFoundError: If resource not found (-3, HTTP 404)
BadRequestError: If bad request (HTTP 400)
AuthenticationError: If authentication failed (HTTP 401)
AuthorizationError: If authorization failed (HTTP 403)
MethodNotAllowedError: If method not allowed (HTTP 405)
RateLimitError: If rate limit exceeded (HTTP 429)
ServerError: If server error (HTTP 500)
APIError: For other API errors
"""
if not response.is_success:
try:
# Imported here rather than at module top
from hfortix_core.exceptions import (
get_error_description,
raise_for_status,
)
# Try to parse JSON response (most FortiOS errors are JSON)
json_response = response.json()
# Add error description if error code present
error_code = json_response.get("error")
if error_code and "error_description" not in json_response:
json_response["error_description"] = get_error_description(
error_code
)
# Log the error with details (unless silent mode)
if not silent:
status = json_response.get("status")
# Prefer the http_status reported in the payload, falling back to
# the transport-level status code
http_status = json_response.get(
"http_status", response.status_code
)
error_desc = json_response.get(
"error_description", "Unknown error"
)
logger.error(
"Request failed: HTTP %d, status=%s, error=%s, description='%s'", # noqa: E501
http_status,
status,
error_code,
error_desc,
)
# Use FortiOS-specific error handling with enhanced context
raise_for_status(
json_response,
endpoint=endpoint,
method=method,
params=params,
)
# NOTE(review): this handler catches any ValueError raised inside the try
# block, not only JSON parse failures - confirm that no hfortix_core
# exception derives from ValueError.
except ValueError:
# Response is not JSON (could be binary or HTML error page)
# This can happen with binary endpoints or proxy/firewall
# errors
if not silent:
logger.error(
"Request failed: HTTP %d (non-JSON response, %d bytes)",
response.status_code,
len(response.content),
)
# NOTE(review): indentation is lost in this listing; presumably this call
# sits inside the except branch (non-JSON fallback) - confirm that it is
# not a catch-all at the end of the `if not response.is_success` block.
response.raise_for_status()
def _log_audit(
self,
method: str,
endpoint: str,
api_type: str,
path: str,
data: Optional[dict[str, Any]],
params: Optional[dict[str, Any]],
status_code: int,
success: bool,
duration_ms: int,
request_id: str,
error: Optional[str] = None,
) -> None:
"""
Log API operation to audit handlers
Args:
method: HTTP method
endpoint: Full API endpoint
api_type: API type (cmdb, monitor, etc.)
path: Relative path
data: Request data (will be sanitized)
params: Request params (will be sanitized)
status_code: HTTP status code
success: Whether operation succeeded
duration_ms: Duration in milliseconds
request_id: Request ID
error: Error message if failed
"""
# Skip if no audit handler or callback configured
if not self._audit_handler and not self._audit_callback:
return
try:
from datetime import datetime, timezone
# Determine action from method and path
action = self._infer_action(method, path)
# Extract object type and name from path
object_type, object_name = self._extract_object_info(path, data)
# Build operation dict
operation: dict[str, Any] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"request_id": request_id,
"method": method.upper(),
"endpoint": endpoint,
"api_type": api_type,
"path": path,
"vdom": params.get("vdom") if params else None,
"action": action,
"object_type": object_type,
"object_name": object_name,
"data": self._sanitize_data(data) if data else None,
"params": self._sanitize_data(params) if params else None,
"status_code": status_code,
"success": success,
"duration_ms": duration_ms,
"host": self._url.replace("https://", "").replace(
"http://", ""
),
"read_only_mode": self._read_only
and method in ("POST", "PUT", "DELETE"),
}
# Add error if present
if error:
operation["error"] = error
# Add user context if provided
if self._user_context:
operation["user_context"] = self._user_context
# Call audit handler if configured
if self._audit_handler:
try:
self._audit_handler.log_operation(operation)
except Exception as e:
logger.error(
f"Audit handler failed: {e}",
extra={
"error": str(e),
"request_id": request_id,
},
exc_info=True,
)
# Call audit callback if configured
if self._audit_callback:
try:
self._audit_callback(operation)
except Exception as e:
logger.error(
f"Audit callback failed: {e}",
extra={
"error": str(e),
"request_id": request_id,
},
exc_info=True,
)
except Exception as e:
# Don't let audit failures break API operations
logger.error(
f"Audit logging failed: {e}",
extra={"error": str(e), "request_id": request_id},
exc_info=True,
)
@staticmethod
def _infer_action(method: str, path: str) -> str:
"""Infer high-level action from method and path"""
method = method.upper()
if method == "GET":
# Heuristic: if path ends with a specific name, it's a read,
# otherwise it's a list
parts = path.strip("/").split("/")
if len(parts) > 0 and parts[-1] and not parts[-1].startswith("?"):
# Has a trailing identifier
return "read"
return "list"
elif method == "POST":
return "create"
elif method == "PUT":
return "update"
elif method == "DELETE":
return "delete"
else:
return "unknown"
@staticmethod
def _extract_object_info(
path: str, data: Optional[dict[str, Any]]
) -> tuple[str, Optional[str]]:
"""
Extract object type and name from path and data
Returns:
Tuple of (object_type, object_name)
"""
# Clean path
path = path.strip("/")
# Object type is the full path with dots instead of slashes
# e.g., "firewall/address" -> "firewall.address"
object_type = path.replace("/", ".")
# Try to extract object name from:
# 1. Last path component (if it looks like a name)
# 2. 'name' field in data
# 3. 'mkey' field in data
object_name = None
parts = path.split("/")
if len(parts) > 0:
last_part = parts[-1]
# If last part doesn't look like an endpoint, use it as name
if last_part and not last_part.startswith("?"):
object_name = last_part
# Override with data if available
if data:
if "name" in data:
object_name = str(data["name"])
elif "mkey" in data:
object_name = str(data["mkey"])
return object_type, object_name
[docs]
def request(
self,
method: str,
api_type: str,
path: str,
data: Optional[dict[str, Any]] = None,
params: Optional[dict[str, Any]] = None,
vdom: Optional[Union[str, bool]] = None,
raw_json: bool = False,
request_id: Optional[str] = None,
silent: bool = False,
) -> dict[str, Any]:
"""
Generic request method for all API calls
Args:
method: HTTP method (GET, POST, PUT, DELETE)
api_type: API type (cmdb, monitor, log, service)
path: API endpoint path (e.g., 'firewall/address', 'system/status')
data: Request body data (for POST/PUT)
params: Query parameters dict
vdom: Virtual domain (None=use default, or specify vdom name)
raw_json: If False (default), return only 'results' field. If True,
return full response
request_id: Optional correlation ID for tracking requests across
logs
silent: If True, suppress error logging (for exists() checks)
Returns:
dict: If raw_json=False, returns response['results'] (or full
response if no 'results' key)
If raw_json=True, returns complete API response with status,
http_status, etc.
"""
# Generate request ID if not provided
if request_id is None:
request_id = str(uuid.uuid4())[:8] # Short UUID for readability
# Normalize path: remove any leading slash so callers may pass
# either 'firewall/acl' or '/firewall/acl' without causing a
# double-slash
# in the constructed URL. Keep internal separators intact.
path = self._normalize_path(path)
# URL encode the path, treating / as safe (path separator)
# Individual path components may already be encoded by endpoint files
# using
# encode_path_component(), so quote() with safe='/' won't double-encode
# already-encoded %XX sequences (e.g., %2F stays as %2F)
encoded_path = (
quote(str(path), safe="/%") if isinstance(path, str) else path
)
url = f"{self._url}/api/v2/{api_type}/{encoded_path}"
params = params or {}
# Handle vdom parameter:
# - vdom=False: Don't send vdom param (for global-only endpoints)
# - vdom=True: Use "global" scope
# - vdom=None: Use client default or "root" as fallback
# - vdom="string": Use specified vdom
if vdom is False:
# Global-only endpoint - don't add vdom parameter
pass
elif vdom is True:
params["vdom"] = "global"
elif vdom is not None:
params["vdom"] = vdom
elif self._vdom is not None and "vdom" not in params:
params["vdom"] = self._vdom
elif "vdom" not in params:
# Default to "root" if no vdom specified anywhere
params["vdom"] = "root"
# Build full API path for logging and circuit breaker
full_path = f"/api/v2/{api_type}/{path}"
endpoint_key = f"{api_type}/{path}"
# Check circuit breaker before making request
try:
self._check_circuit_breaker(endpoint_key)
except RuntimeError:
# Structured log for circuit breaker open
logger.error(
"Circuit breaker blocked request",
extra=self._log_context(
request_id=request_id,
method=method,
endpoint=full_path,
circuit_state=self._circuit_breaker["state"],
consecutive_failures=self._circuit_breaker[
"consecutive_failures"
],
),
)
raise
# Get endpoint-specific timeout if configured
endpoint_timeout = self._get_endpoint_timeout(endpoint_key)
if endpoint_timeout:
# Temporarily set custom timeout for this request
original_timeout = self._client.timeout
self._client.timeout = endpoint_timeout
# Structured log for request start
logger.debug(
"Request started",
extra=self._log_context(
request_id=request_id,
method=method.upper(),
endpoint=full_path,
has_data=bool(data),
has_params=bool(params),
),
)
if params:
logger.debug(
"Request parameters",
extra=self._log_context(
request_id=request_id,
params=self._sanitize_data(params),
),
)
if data:
logger.debug(
"Request data",
extra=self._log_context(
request_id=request_id,
data=self._sanitize_data(data),
),
)
# Track timing
start_time = time.time()
# Track total requests
self._retry_stats["total_requests"] += 1
# ========================================================================
# Read-Only Mode Check
# ========================================================================
# If in read-only mode, block write operations
if self._read_only and method in ("POST", "PUT", "DELETE"):
logger.error(
"READ-ONLY MODE: %s request blocked",
method,
extra={
"request_id": request_id,
"method": method.upper(),
"endpoint": full_path,
"data": self._sanitize_data(data) if data else None,
},
)
# Track blocked operation
if self._track_operations:
from datetime import datetime, timezone
self._operations.append(
{
"timestamp": datetime.now(timezone.utc).isoformat(),
"method": method.upper(),
"api_type": api_type,
"path": f"/{path}",
"data": data,
"status_code": 403, # Forbidden
"vdom": params.get("vdom") if params else None,
"blocked_by_read_only": True,
}
)
# Raise error - operation blocked
from hfortix_core.exceptions import ReadOnlyModeError
raise ReadOnlyModeError(
f"{method} operation blocked by read-only mode: {full_path}"
)
# Proactively check if session needs refresh (username/password auth
# only)
if self._should_refresh_session():
logger.info(
"Session approaching idle timeout, proactively re-authenticating", # noqa: E501
extra={
"request_id": request_id,
"time_since_last_activity": round(
time.time() - (self._session_last_activity or 0), 1
),
},
)
try:
self.login()
logger.info("Proactive re-authentication successful")
except Exception as e:
logger.warning(
"Proactive re-authentication failed, will retry on 401: %s", # noqa: E501
str(e),
)
# Retry loop with exponential backoff
last_error = None
session_retry_attempted = (
False # Track if we've tried re-authenticating
)
for attempt in range(self._max_retries + 1):
try:
# Update last activity time (for idle timeout tracking)
if (
not self._using_token_auth
and self._session_last_activity is not None
):
self._session_last_activity = time.time()
# Track active requests and total requests
self._active_requests += 1
self._total_requests += 1
# Store request details for debugging
self._last_request = {
"method": method.upper(),
"endpoint": full_path,
"url": url,
"params": params,
"data": data,
"timestamp": time.time(),
}
try:
# Build headers for this request
request_headers = {}
# Add transaction ID header if active
if self._active_transaction_id is not None:
request_headers['X-TRANSACTION-ID'] = str(self._active_transaction_id)
# Make request with httpx client
res = self._client.request(
method=method,
url=url,
json=data if data else None,
params=params if params else None,
headers=request_headers if request_headers else None,
)
# Store response details for debugging
self._last_response = {
"status_code": res.status_code,
"headers": dict(res.headers),
}
except httpx.PoolTimeout:
# Track pool exhaustion
self._pool_exhaustion_count += 1
self._pool_exhaustion_timestamps.append(time.time())
logger.warning(
"Connection pool exhausted",
extra={
"request_id": request_id,
"endpoint": full_path,
"active_requests": self._active_requests,
"max_connections": self._max_connections,
},
)
raise
finally:
# Always decrement active requests
self._active_requests -= 1
# Calculate duration
duration = time.time() - start_time
self._last_response_time = duration
# Record response time for adaptive backpressure (if enabled)
self._record_response_time(endpoint_key, duration)
# Handle errors (will raise exception if error response)
self._handle_response_errors(
res,
endpoint=full_path,
method=method.upper(),
params=params,
silent=silent,
)
# Record success in circuit breaker
self._record_circuit_breaker_success()
# Record successful request
self._retry_stats["successful_requests"] += 1
# Track operation if enabled
if self._track_operations:
from datetime import datetime, timezone
self._operations.append(
{
"timestamp": datetime.now(
timezone.utc
).isoformat(),
"method": method.upper(),
"api_type": api_type,
"path": f"/{path}",
"data": (
data if method in ("POST", "PUT") else None
),
"status_code": res.status_code,
"vdom": params.get("vdom") if params else None,
# Indicates read-only mode was NOT active
# (operation executed)
"read_only": False,
}
)
# Structured log for successful response
logger.info(
"Request completed successfully",
extra=self._log_context(
request_id=request_id,
method=method.upper(),
endpoint=full_path,
status_code=res.status_code,
duration_seconds=round(duration, 3),
attempts=attempt + 1,
),
)
# Audit logging for successful operations
self._log_audit(
method=method,
endpoint=full_path,
api_type=api_type,
path=path,
data=data,
params=params,
status_code=res.status_code,
success=True,
duration_ms=int(duration * 1000),
request_id=request_id,
)
# Warn about slow requests
if duration > 2.0:
logger.warning(
"Slow request detected",
extra=self._log_context(
request_id=request_id,
method=method.upper(),
endpoint=full_path,
duration_seconds=round(duration, 3),
),
)
# Check content type to determine if response is JSON
content_type = res.headers.get("content-type", "").lower()
is_json_response = "application/json" in content_type
# Parse JSON response or return raw content for non-JSON responses
if is_json_response:
try:
json_response = res.json()
# Inject http_status into response if not present
# FortiOS API doesn't always include http_status in JSON body
if "http_status" not in json_response:
json_response["http_status"] = res.status_code
except Exception as json_err:
# If JSON parsing fails, log warning and return text
logger.warning(
f"Failed to parse JSON response: {json_err}",
extra=self._log_context(
request_id=request_id,
endpoint=full_path,
content_type=content_type,
),
)
# Return text content as fallback
return {
"content": res.text,
"http_status": res.status_code,
}
else:
# Non-JSON response (e.g., file download, binary data)
# Return content with metadata
json_response = {
"content": res.content,
"content_type": content_type,
"http_status": res.status_code,
"status": "success",
}
# Restore original timeout if we changed it
if endpoint_timeout:
self._client.timeout = original_timeout
# Normalize keys: FortiOS returns hyphenated keys (tcp-portrange)
# but Python/TypedDict requires underscores (tcp_portrange)
from hfortix_core.utils import normalize_keys
json_response = normalize_keys(json_response)
# Return full response if raw_json=True, otherwise extract
# results
if raw_json:
return json_response
else:
# Return 'results' field if present, otherwise full
# response
return json_response.get("results", json_response)
except Exception as e:
last_error = e
# Special handling for 401 Unauthorized with username/password
# auth
# Session may have expired - try to re-authenticate once
is_401_error = False
if isinstance(e, httpx.HTTPStatusError):
is_401_error = e.response.status_code == 401
if (
not self._using_token_auth
and not session_retry_attempted
and is_401_error
and self._username
and self._password
):
logger.warning(
"Session expired (401), attempting to re-authenticate",
extra={
"request_id": request_id,
"method": method.upper(),
"endpoint": full_path,
},
)
session_retry_attempted = True
try:
# Try to login again
self.login()
logger.info(
"Re-authentication successful, retrying request"
)
# Continue to retry the request with new session
continue
except Exception as login_error:
logger.error(
"Re-authentication failed: %s",
str(login_error),
extra={"request_id": request_id},
)
# Fall through to normal retry logic
# Record failure in circuit breaker
self._record_circuit_breaker_failure(endpoint_key)
# Check if we should retry
if self._should_retry(e, attempt, endpoint_key):
# Calculate delay with adaptive backpressure
response_obj = (
getattr(e, "response", None)
if isinstance(e, httpx.HTTPStatusError)
else None
)
delay = self._get_retry_delay(
attempt, response_obj, endpoint_key
)
# Structured log for retry
logger.info(
"Retrying request after delay",
extra=self._log_context(
request_id=request_id,
method=method.upper(),
endpoint=full_path,
error_type=type(e).__name__,
attempt=attempt + 1,
max_attempts=self._max_retries + 1,
delay_seconds=delay,
adaptive_retry=self._adaptive_retry,
),
)
# Wait before retry
time.sleep(delay)
continue
else:
# Don't retry, restore timeout and raise the error
if endpoint_timeout:
self._client.timeout = original_timeout
raise
# If we've exhausted all retries, restore timeout and raise the last
# error
if endpoint_timeout:
self._client.timeout = original_timeout
if last_error:
# Record failed request
self._retry_stats["failed_requests"] += 1
logger.error(
"Request failed after all retries",
extra=self._log_context(
request_id=request_id,
method=method.upper(),
endpoint=full_path,
total_attempts=self._max_retries + 1,
error_type=type(last_error).__name__,
),
)
# Audit log the failure
duration = time.time() - start_time
error_message = str(last_error)
status_code = 0
# Try to extract status code from error
if hasattr(last_error, "response"):
response_obj = getattr(last_error, "response", None)
if response_obj and hasattr(response_obj, "status_code"):
status_code = response_obj.status_code
self._log_audit(
method=method,
endpoint=full_path,
api_type=api_type,
path=path,
data=data,
params=params,
status_code=status_code,
success=False,
duration_ms=int(duration * 1000),
request_id=request_id,
error=error_message,
)
raise last_error
# This should never be reached, but satisfies type checker
raise RuntimeError("Request loop completed without success or error")
[docs]
def get(
    self,
    api_type: str,
    path: str,
    params: Optional[dict[str, Any]] = None,
    vdom: Optional[Union[str, bool]] = None,
    raw_json: bool = False,
    silent: bool = False,
) -> Union[dict[str, Any], Coroutine[Any, Any, dict[str, Any]]]:
    """Send a GET request by delegating to ``request()``

    Args:
        api_type: API type (cmdb, monitor, etc.)
        path: Endpoint path
        params: Query parameters
        vdom: Virtual domain
        raw_json: Return raw JSON response
        silent: If True, suppress error logging (for exists() checks)

    Returns:
        Whatever ``request()`` returns for the call.
    """
    # Collect the pass-through options once so the delegation stays short
    options = {
        "params": params,
        "vdom": vdom,
        "raw_json": raw_json,
        "silent": silent,
    }
    return self.request("GET", api_type, path, **options)
[docs]
def get_binary(
    self,
    api_type: str,
    path: str,
    params: Optional[dict[str, Any]] = None,
    vdom: Optional[Union[str, bool]] = None,
) -> bytes:
    """
    GET request returning binary data (for file downloads)

    The request is sent directly with the underlying client (it does not
    go through ``request()``). The response is handed to
    ``_handle_response_errors`` before its raw body is returned.

    Args:
        api_type: API type
        path: Endpoint path (a leading "/" is stripped)
        params: Query parameters; the dict passed in is never modified
        vdom: Virtual domain. When None, the client's default vdom is
            used unless ``params`` already contains a "vdom" key.

    Returns:
        Raw binary response data
    """
    path = path.lstrip("/") if isinstance(path, str) else path

    # Full endpoint path, used both for the URL and for error context
    full_path = f"/api/v2/{api_type}/{path}"
    url = f"{self._url}{full_path}"

    # Work on a copy so the caller's dict never gains a "vdom" key
    query: dict[str, Any] = dict(params) if params else {}

    # Explicit vdom wins; otherwise fall back to the client default
    # NOTE(review): a boolean vdom is forwarded as-is here - confirm this
    # matches how request() treats vdom=True/False.
    if vdom is not None:
        query["vdom"] = vdom
    elif self._vdom is not None and "vdom" not in query:
        query["vdom"] = self._vdom

    # Make request
    res = self._client.get(url, params=query if query else None)

    # Handle errors
    self._handle_response_errors(
        res,
        endpoint=full_path,
        method="GET",
        params=query,
    )
    return res.content
[docs]
def post(
    self,
    api_type: str,
    path: str,
    data: dict[str, Any],
    params: Optional[dict[str, Any]] = None,
    vdom: Optional[Union[str, bool]] = None,
    scope: Optional[str] = None,
    raw_json: bool = False,
) -> Union[dict[str, Any], Coroutine[Any, Any, dict[str, Any]]]:
    """POST request - Create new object

    Delegates to ``request()`` with method "POST".

    Args:
        api_type: API type (cmdb, monitor, etc.)
        path: Endpoint path
        data: Request payload
        params: Query parameters; the dict passed in is never modified
        vdom: Virtual domain
        scope: Optional scope parameter for global objects
            ('global' or 'vdom'). Required when creating
            objects in global scope. When given, it is sent as the
            "scope" query parameter.
        raw_json: Return raw JSON response

    Returns:
        Whatever ``request()`` returns for the call.
    """
    if scope:
        # Build a new dict instead of writing "scope" into the caller's
        # params, which would leak the key into their later calls.
        params = {**(params or {}), "scope": scope}
    return self.request(
        "POST",
        api_type,
        path,
        data=data,
        params=params,
        vdom=vdom,
        raw_json=raw_json,
    )
[docs]
def put(
    self,
    api_type: str,
    path: str,
    data: dict[str, Any],
    params: Optional[dict[str, Any]] = None,
    vdom: Optional[Union[str, bool]] = None,
    scope: Optional[str] = None,
    raw_json: bool = False,
) -> Union[dict[str, Any], Coroutine[Any, Any, dict[str, Any]]]:
    """PUT request - Update existing object

    Delegates to ``request()`` with method "PUT".

    Args:
        api_type: API type (cmdb, monitor, etc.)
        path: Endpoint path
        data: Request payload
        params: Query parameters; the dict passed in is never modified
        vdom: Virtual domain
        scope: Optional scope parameter for global objects
            ('global' or 'vdom'). Required when updating
            objects in global scope. When given, it is sent as the
            "scope" query parameter.
        raw_json: Return raw JSON response

    Returns:
        Whatever ``request()`` returns for the call.
    """
    if scope:
        # Build a new dict instead of writing "scope" into the caller's
        # params, which would leak the key into their later calls.
        params = {**(params or {}), "scope": scope}
    return self.request(
        "PUT",
        api_type,
        path,
        data=data,
        params=params,
        vdom=vdom,
        raw_json=raw_json,
    )
[docs]
def delete(
    self,
    api_type: str,
    path: str,
    params: Optional[dict[str, Any]] = None,
    vdom: Optional[Union[str, bool]] = None,
    scope: Optional[str] = None,
    raw_json: bool = False,
) -> Union[dict[str, Any], Coroutine[Any, Any, dict[str, Any]]]:
    """DELETE request - Delete object

    Delegates to ``request()`` with method "DELETE".

    Args:
        api_type: API type (cmdb, monitor, etc.)
        path: Endpoint path
        params: Query parameters; the dict passed in is never modified
        vdom: Virtual domain
        scope: Optional scope parameter for global objects
            ('global' or 'vdom'). Required when deleting
            objects in global scope. When given, it is sent as the
            "scope" query parameter.
        raw_json: Return raw JSON response

    Returns:
        Whatever ``request()`` returns for the call.
    """
    if scope:
        # Build a new dict instead of writing "scope" into the caller's
        # params, which would leak the key into their later calls.
        params = {**(params or {}), "scope": scope}
    return self.request(
        "DELETE",
        api_type,
        path,
        params=params,
        vdom=vdom,
        raw_json=raw_json,
    )
# ========================================================================
# Validation Helper Methods
# ========================================================================
[docs]
@staticmethod
def validate_mkey(mkey: Any, parameter_name: str = "mkey") -> str:
    """
    Turn a management key into a non-empty, whitespace-trimmed string

    Args:
        mkey: The management key value to validate
        parameter_name: Name of the parameter (for error messages)

    Returns:
        ``str(mkey)`` with surrounding whitespace removed

    Raises:
        ValueError: If mkey is None, or is empty once converted and
            stripped

    Example:
        >>> mkey = HTTPClient.validate_mkey(user_id, 'user_id')
    """
    if mkey is None:
        raise ValueError(f"{parameter_name} is required and cannot be None")

    trimmed = str(mkey).strip()
    if trimmed:
        return trimmed

    # Only whitespace (or nothing) was left after stripping
    raise ValueError(f"{parameter_name} cannot be empty")
[docs]
@staticmethod
def validate_required_params(
    params: dict[str, Any], required: list[str]
) -> None:
    """
    Ensure every required name is present in ``params`` with a non-None value

    Args:
        params: Dictionary of parameters to validate
        required: List of required parameter names

    Raises:
        ValueError: If any required parameters are missing (absent from
            the dict or mapped to None); the message lists them

    Example:
        >>> HTTPClient.validate_required_params(data, ['name', 'type'])
    """
    if params:
        absent = [key for key in required if params.get(key) is None]
    else:
        # Nothing was supplied, so every required name is missing
        absent = list(required or ())

    if not absent:
        return

    raise ValueError(f"Missing required parameters: {', '.join(absent)}")
[docs]
@staticmethod
def validate_range(
    value: Union[int, float],
    min_val: Union[int, float],
    max_val: Union[int, float],
    parameter_name: str = "value",
) -> None:
    """
    Check that a number lies between two bounds, both inclusive

    Args:
        value: The value to validate
        min_val: Minimum allowed value (inclusive)
        max_val: Maximum allowed value (inclusive)
        parameter_name: Name of the parameter (for error messages)

    Raises:
        ValueError: If value is not an int/float, or is outside the
            specified range

    Example:
        >>> HTTPClient.validate_range(port, 1, 65535, 'port')
    """
    is_number = isinstance(value, (int, float))
    if not is_number:
        raise ValueError(f"{parameter_name} must be a number")

    # Keep the chained comparison: it is False for NaN, so NaN is rejected
    if min_val <= value <= max_val:
        return

    raise ValueError(
        f"{parameter_name} must be between {min_val} and {max_val}, "
        f"got {value}"
    )
[docs]
@staticmethod
def validate_choice(
    value: Any, choices: list[Any], parameter_name: str = "value"
) -> None:
    """
    Check that a value is a member of the allowed choices

    Args:
        value: The value to validate
        choices: List of allowed values
        parameter_name: Name of the parameter (for error messages)

    Raises:
        ValueError: If value is not in the allowed choices

    Example:
        >>> HTTPClient.validate_choice(protocol, ['tcp', 'udp'],
                                       'protocol')
    """
    is_allowed = value in choices
    if is_allowed:
        return

    raise ValueError(
        f"{parameter_name} must be one of {choices}, got '{value}'"
    )
[docs]
@staticmethod
def build_params(**kwargs: Any) -> dict[str, Any]:
    """
    Collect keyword arguments into a dict, skipping those set to None

    Only None is dropped; other falsy values (0, False, "", empty
    containers) are kept.

    Args:
        **kwargs: Keyword arguments to build params from

    Returns:
        Dictionary with None values removed

    Example:
        >>> params = HTTPClient.build_params(format=['name'],
                                             datasource=True, other=None)
        >>> # Returns: {'format': ['name'], 'datasource': True}
    """
    collected: dict[str, Any] = {}
    for key, val in kwargs.items():
        if val is None:
            continue
        collected[key] = val
    return collected
[docs]
def __enter__(self) -> "HTTPClient":
    """Context-manager entry: hand back this client for the 'with' body"""
    return self
[docs]
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[Any],
) -> None:
    """Context-manager exit: close the client

    The exception details are ignored and None is returned, so an
    exception raised inside the 'with' body is not suppressed.
    """
    self.close()
    return None
[docs]
def close(self) -> None:
    """
    Close the HTTP session and release resources

    If a session token is held (username/password authentication), this
    logs out first to clean up the session. The underlying client is
    closed even when that logout raises; the logout error still
    propagates to the caller afterwards.
    """
    try:
        # Logout if using username/password auth
        if self._session_token:
            self.logout()
    finally:
        # Always release the connection pool, even after a failed logout
        if self._client:
            self._client.close()
            logger.debug("HTTP client session closed")
[docs]
def get_operations(self) -> list[dict[str, Any]]:
    """
    Return a snapshot of every tracked API operation

    Entries appear in the order they were recorded. The result is a copy
    made with ``.copy()``, so adding or removing items from it does not
    touch the client's own log (the entry dicts themselves are shared).
    Entries are only recorded while operation tracking is enabled.

    Returns:
        List of operation dictionaries with keys:
            - timestamp: ISO 8601 UTC timestamp
            - method: Upper-cased HTTP method (GET/POST/PUT/DELETE)
            - api_type: API type (cmdb/monitor/log/service)
            - path: API endpoint path with a leading "/"
            - data: Request payload (for POST/PUT), None otherwise
            - status_code: HTTP response status code (403 for writes
              blocked by read-only mode)
            - vdom: Value of the "vdom" query parameter, or None
            - read_only: False, present on entries for requests that
              were actually executed
            - blocked_by_read_only: True, present on entries for write
              requests rejected by read-only mode

    Example:
        >>> client = HTTPClient(url="https://192.0.2.10", token="...",
                                track_operations=True)
        >>> client.post("cmdb", "/firewall/address", {"name": "test"})
        >>> ops = client.get_operations()
        >>> ops[0]["method"], ops[0]["path"], ops[0]["read_only"]
        ('POST', '/firewall/address', False)
    """
    snapshot = self._operations.copy()
    return snapshot
[docs]
def get_write_operations(self) -> list[dict[str, Any]]:
    """
    Return only the tracked operations that modify state (POST/PUT/DELETE)

    Walks the tracked operations in recorded order and keeps the entries
    whose "method" is POST, PUT or DELETE; GET entries are left out.

    Returns:
        List of write operation dictionaries (same format as
        get_operations())

    Example:
        >>> client = HTTPClient(url="https://192.0.2.10", token="...",
                                track_operations=True)
        >>> client.get("cmdb", "/firewall/address/test")  # excluded
        >>> client.post("cmdb", "/firewall/address", {"name": "test2"})
        >>> client.delete("cmdb", "/firewall/address/test")
        >>> len(client.get_write_operations())  # POST and DELETE only
        2
    """
    write_methods = ("POST", "PUT", "DELETE")
    selected: list[dict[str, Any]] = []
    for entry in self._operations:
        if entry["method"] in write_methods:
            selected.append(entry)
    return selected
[docs]
@staticmethod
def make_exists_method(
    get_method: Callable[..., Any],
) -> Callable[..., bool]:
    """
    Create an exists() helper that works with both sync and async modes.

    This utility wraps a get() method and returns a function that:
    - Returns True if the object exists
    - Returns False if ResourceNotFoundError is raised (whether the
      wrapped call raises it directly or while its coroutine is awaited)
    - Returns False if the response is a dict with status "error" or
      http_status 404
    - Works transparently with both sync and async clients

    Any other exception from the wrapped get() propagates unchanged.

    Args:
        get_method: The get() method to wrap (bound method from endpoint
            instance)

    Returns:
        A function that returns bool (sync) or a coroutine resolving to
        bool (async)

    Example:
        class Address:
            def __init__(self, client):
                self._client = client

            def get(self, name, **kwargs):
                return self._client.get("cmdb",
                                        f"/firewall/address/{name}", **kwargs)

            # Create exists method using the helper
            exists = HTTPClient.make_exists_method(get)
    """
    import inspect

    def _is_error_response(result: Any) -> bool:
        """Check if the result indicates an error (non-existent resource)."""
        if not isinstance(result, dict):
            return False
        # Either an explicit error status or a 404 marks "not there"
        return (
            result.get("status") == "error"
            or result.get("http_status") == 404
        )

    def exists_wrapper(*args: Any, **kwargs: Any) -> Union[bool, Any]:
        """Check if an object exists."""
        from hfortix_core.exceptions import ResourceNotFoundError

        # A sync get() raises right here when the object is missing, so
        # the call itself must be guarded for exists() to return False.
        try:
            result = get_method(*args, **kwargs)
        except ResourceNotFoundError:
            return False

        # Check if we got a coroutine (async mode)
        if inspect.iscoroutine(result):

            async def _exists_async() -> bool:
                # The runtime check above confirms result is awaitable
                # even though static typing cannot narrow it.
                try:
                    awaited_result = await result
                except ResourceNotFoundError:
                    return False
                return not _is_error_response(awaited_result)

            return _exists_async()

        # Sync mode - the response is already available
        return not _is_error_response(result)

    return exists_wrapper