Middlewares¶
fastsio provides a flexible middleware system that allows intercepting and modifying Socket.IO events at various stages of processing. Middlewares can be used for:
Authentication and authorization
Logging and monitoring
Rate limiting
Data validation and transformation
Error handling
Performance monitoring
Basic Concepts¶
Middlewares in fastsio follow a chain-of-responsibility pattern where each middleware can process the request before and after the event handler execution. There are two ways to implement middlewares:
Method-based: Override before_event and after_event methods
Call-based: Override the __call__ method for complete control
Middleware Types¶
BaseMiddleware: Base class for all middlewares, with async support.
SyncMiddleware: Base class for synchronous middlewares.
MiddlewareChain: Manages the execution order of multiple middlewares.
Creating Middlewares¶
Method-based Middleware¶
from typing import Any

from fastsio import BaseMiddleware


class LoggingMiddleware(BaseMiddleware):
    """Print a line before and after each handled event.

    With no arguments the middleware is limited to the ``message`` and
    ``join_room`` events. Keyword arguments (for example ``events``,
    ``namespace`` or ``global_middleware``) are forwarded unchanged to
    ``BaseMiddleware.__init__``; an explicit ``events`` replaces the default.
    """

    def __init__(self, **kwargs: Any) -> None:
        # Keep the default event filter, but let callers override it so that
        # LoggingMiddleware(events=...) and similar calls do not raise TypeError.
        kwargs.setdefault("events", ["message", "join_room"])
        super().__init__(**kwargs)

    async def before_event(self, event: str, sid: str, data: Any, **kwargs: Any) -> Any:
        """Called before event handler execution.

        Returns ``data`` unchanged.
        """
        print(f"Processing {event} from {sid}")
        return data

    async def after_event(self, event: str, sid: str, response: Any, **kwargs: Any) -> Any:
        """Called after event handler execution.

        Returns ``response`` unchanged.
        """
        print(f"Completed {event} from {sid}")
        return response
Call-based Middleware¶
import inspect
from typing import Any

from fastsio import BaseMiddleware


class CustomMiddleware(BaseMiddleware):
    """Call-based middleware that wraps the handler invocation itself.

    Override ``preprocess`` and/or ``postprocess`` to transform the payload
    and the response; both default to returning their argument unchanged.
    """

    def preprocess(self, data: Any) -> Any:
        """Transform the incoming payload before the handler sees it."""
        return data

    def postprocess(self, response: Any) -> Any:
        """Transform the handler's response before it is returned."""
        return response

    async def __call__(self, event: str, sid: str, data: Any, handler, **kwargs: Any) -> Any:
        """Complete control over middleware execution.

        Calls ``handler(modified_data, **kwargs)`` and returns the
        post-processed response.
        """
        # Pre-processing
        modified_data = self.preprocess(data)
        # Execute handler. Checking the *result* for awaitability also covers
        # functools.partial objects and callables with an async __call__,
        # which a coroutine-function check on the callable would miss.
        response = handler(modified_data, **kwargs)
        if inspect.isawaitable(response):
            response = await response
        # Post-processing
        return self.postprocess(response)
Synchronous Middleware¶
from typing import Any

from fastsio import SyncMiddleware


class SyncLoggingMiddleware(SyncMiddleware):
    """Logging middleware whose hooks are plain (non-async) methods."""

    def before_event(self, event: str, sid: str, data: Any, **kwargs: Any) -> Any:
        """Print a line before the handler runs and return ``data`` unchanged."""
        print(f"Sync: Processing {event} from {sid}")
        return data

    def after_event(self, event: str, sid: str, response: Any, **kwargs: Any) -> Any:
        """Print a line after the handler runs and return ``response`` unchanged."""
        print(f"Sync: Completed {event} from {sid}")
        return response
Middleware Configuration¶
Event Filtering¶
# The three assignments below are independent alternatives, not a sequence.
# Apply to specific events: pass a list of event names
middleware = LoggingMiddleware(events=["message", "join_room"])
# Apply to single event: a bare string is passed instead of a list
middleware = LoggingMiddleware(events="message")
# Apply to all events (default): no events filter is passed
middleware = LoggingMiddleware()
Namespace Filtering¶
# The two assignments below are independent alternatives, not a sequence.
# Apply to specific namespace
middleware = AuthMiddleware(namespace="/admin")
# Apply to all namespaces (default): no namespace filter is passed
middleware = AuthMiddleware()
Global Middlewares¶
# Apply to all events regardless of namespace
# (the same flag can also be given at registration time via add_middleware)
middleware = LoggingMiddleware(global_middleware=True)
Registering Middlewares¶
Adding to Server¶
from fastsio import AsyncServer
sio = AsyncServer()
# Add middleware with default settings
sio.add_middleware(LoggingMiddleware())
# Override middleware settings: events/namespace are given at registration
# NOTE(review): presumably these take precedence over the filters the
# instance was constructed with — confirm against add_middleware
sio.add_middleware(
AuthMiddleware(),
events=["join_room", "send_message"],
namespace="/chat"
)
# Global middleware
sio.add_middleware(LoggingMiddleware(), global_middleware=True)
Managing Middlewares¶
# Get list of registered middlewares
middlewares = sio.get_middlewares()
# Remove specific middleware
# (middleware_instance is a placeholder for an object passed to add_middleware
# earlier; presumably matched by identity — TODO confirm)
sio.remove_middleware(middleware_instance)
Built-in Middlewares¶
Authentication Middleware¶
from fastsio import auth_middleware


def check_auth(sid: str, environ: dict) -> bool:
    """Check if user is authenticated.

    Returns True when the connection's environ has an ``HTTP_AUTHORIZATION``
    entry (any value other than ``None``).
    """
    return environ.get("HTTP_AUTHORIZATION") is not None


# Bind the created middleware to its own name. Assigning it back to
# ``auth_middleware`` would shadow the imported factory, so any later
# ``auth_middleware(...)`` call would hit the instance instead.
auth = auth_middleware(check_auth)
sio.add_middleware(auth)
Logging Middleware¶
from fastsio import logging_middleware
import logging
# Named logger handed to the middleware factory
logger = logging.getLogger("app")
# Build the middleware from the logger, then register it on the server
log_middleware = logging_middleware(logger)
sio.add_middleware(log_middleware)
Rate Limiting Middleware¶
from fastsio import rate_limit_middleware
# 10 requests per minute: max_requests=10 within window_seconds=60
rate_limit = rate_limit_middleware(max_requests=10, window_seconds=60)
sio.add_middleware(rate_limit)
Advanced Usage¶
Custom Exception Handling¶
class ErrorHandlingMiddleware(BaseMiddleware):
    """Turn selected exception types into error payloads."""

    async def handle_exception(self, exc: Exception, event: str, sid: str, data: Any, **kwargs):
        """Map a raised exception to an error response.

        ``PermissionError`` yields a 403 payload and ``ValueError`` a 400
        payload; any other exception is raised again.
        """
        if isinstance(exc, PermissionError):
            return {"error": "Access denied", "code": 403}
        if isinstance(exc, ValueError):
            return {"error": "Invalid data", "code": 400}
        # Not a type handled here: let it propagate
        raise exc
Data Transformation¶
import time
from typing import Any

from fastsio import BaseMiddleware


class DataTransformMiddleware(BaseMiddleware):
    """Stamp dict payloads and dict responses with timing metadata.

    Non-dict values pass through untouched. Dict values are copied rather
    than modified in place, so the objects owned by the caller and by the
    handler are left as they were.
    """

    async def before_event(self, event: str, sid: str, data: Any, **kwargs: Any) -> Any:
        """Transform incoming data.

        For a dict, returns a copy with ``timestamp`` (``time.time()``) and
        ``source`` (the ``sid``) set.
        """
        if isinstance(data, dict):
            # New dict: do not mutate the payload object the caller passed in
            return {**data, "timestamp": time.time(), "source": sid}
        return data

    async def after_event(self, event: str, sid: str, response: Any, **kwargs: Any) -> Any:
        """Transform outgoing response.

        For a dict, returns a copy with ``processed_at`` (``time.time()``) set.
        """
        if isinstance(response, dict):
            # New dict: the handler may still hold a reference to its own
            return {**response, "processed_at": time.time()}
        return response
Middleware Chaining¶
# Middlewares execute in the order they are added
sio.add_middleware(LoggingMiddleware()) # First
sio.add_middleware(AuthMiddleware()) # Second
sio.add_middleware(RateLimitMiddleware()) # Third
# Execution flow:
# 1. LoggingMiddleware.before_event
# 2. AuthMiddleware.before_event
# 3. RateLimitMiddleware.before_event
# 4. Event handler
# 5. RateLimitMiddleware.after_event
# 6. AuthMiddleware.after_event
# 7. LoggingMiddleware.after_event
Performance Considerations¶
Middlewares execute for every event, so keep them lightweight
Use event and namespace filtering to limit middleware execution
Consider caching expensive operations
Use SyncMiddleware for CPU-bound operations
Best Practices¶
Keep middlewares focused: Each middleware should have a single responsibility
Use appropriate base class: Use SyncMiddleware for sync operations
Filter appropriately: Use events and namespace filters to limit execution
Handle exceptions gracefully: Override handle_exception for custom error handling
Test thoroughly: Middlewares can affect all events, so test with various scenarios
Example: Complete Chat Application¶
import logging
import time
from typing import Any, Optional

from fastsio import AsyncServer, BaseMiddleware, SocketID, Data
# Configure logging: INFO level on the root logger, plus a module-level
# logger that the chat logging middleware below writes to
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create server
sio = AsyncServer(async_mode="asgi")
# Authentication middleware
class AuthMiddleware(BaseMiddleware):
    """Reject events from connections that carry no Authorization header.

    By default only ``join_room`` and ``send_message`` are guarded. Keyword
    arguments (for example ``namespace``) are forwarded unchanged to
    ``BaseMiddleware.__init__``; an explicit ``events`` replaces the default.
    """

    def __init__(self, **kwargs: Any) -> None:
        # Keep the default event filter while still accepting overrides such
        # as AuthMiddleware(namespace="/admin").
        kwargs.setdefault("events", ["join_room", "send_message"])
        super().__init__(**kwargs)

    async def before_event(
        self,
        event: str,
        sid: str,
        data: Any,
        environ: Optional[dict] = None,
        **kwargs: Any,
    ) -> Any:
        """Check the connection's environ before the handler runs.

        Returns ``data`` unchanged when ``environ`` has a non-empty
        ``HTTP_AUTHORIZATION`` entry.

        Raises:
            PermissionError: if ``environ`` is missing/empty or the header is
                absent or empty.
        """
        if not environ or not environ.get("HTTP_AUTHORIZATION"):
            raise PermissionError("Authentication required")
        return data
# Logging middleware
class ChatLoggingMiddleware(BaseMiddleware):
    """Log every chat event at INFO level before and after its handler."""

    async def before_event(self, event: str, sid: str, data: Any, **kwargs: Any) -> Any:
        """Log the incoming event and return ``data`` unchanged."""
        # Lazy %-style arguments: the message is only formatted if the record
        # is actually emitted.
        logger.info("Chat event: %s from %s", event, sid)
        return data

    async def after_event(self, event: str, sid: str, response: Any, **kwargs: Any) -> Any:
        """Log the completed event and return ``response`` unchanged."""
        logger.info("Chat event completed: %s from %s", event, sid)
        return response
# Rate limiting middleware
class ChatRateLimitMiddleware(BaseMiddleware):
    """Limit each client to ``max_messages`` ``send_message`` events per window.

    Uses a fixed window per client: the window opens at the client's first
    counted message and the count resets once ``window_seconds`` have passed.
    Events other than ``send_message`` are never counted.

    Args:
        max_messages: Messages allowed per client per window (default 10).
        window_seconds: Window length in seconds (default 60).
    """

    def __init__(self, max_messages: int = 10, window_seconds: float = 60.0) -> None:
        super().__init__()
        self.max_messages = max_messages
        self.window_seconds = window_seconds
        # sid -> number of messages counted in that client's current window
        self.message_counts = {}
        # sid -> time.monotonic() value at which the current window opened
        self._window_started = {}
        # NOTE(review): entries are never removed for disconnected clients —
        # consider clearing them from a disconnect handler.

    async def before_event(self, event: str, sid: str, data: Any, **kwargs: Any) -> Any:
        """Count a ``send_message`` event and reject it when over the limit.

        Returns ``data`` unchanged.

        Raises:
            RuntimeError: if the client already sent ``max_messages`` messages
                in the current window.
        """
        if event == "send_message":
            # Monotonic clock: unaffected by wall-clock adjustments
            now = time.monotonic()
            started = self._window_started.get(sid)
            if started is None or now - started >= self.window_seconds:
                # Open a fresh window; without this reset a client would stay
                # blocked forever after its first max_messages messages.
                self._window_started[sid] = now
                self.message_counts[sid] = 0
            current_count = self.message_counts.get(sid, 0)
            if current_count >= self.max_messages:
                raise RuntimeError("Rate limit exceeded")
            self.message_counts[sid] = current_count + 1
        return data
# Add middlewares
# Registration order matters: middlewares execute in the order they are added,
# so authentication is checked before the event is logged or rate-counted
sio.add_middleware(AuthMiddleware())
sio.add_middleware(ChatLoggingMiddleware())
sio.add_middleware(ChatRateLimitMiddleware())
# Event handlers
@sio.event
async def join_room(sid: SocketID, data: Data):
    """Put the client into the requested room and acknowledge it.

    The room name is read from ``data["room"]`` and falls back to
    ``"general"`` when the key is absent.
    """
    target_room = data.get("room", "general")
    await sio.enter_room(sid, target_room)
    acknowledgement = {"status": "joined", "room": target_room}
    return acknowledgement
@sio.event
async def send_message(sid: SocketID, data: Data):
room = data.get("room", "general")
message = data.get("message", "")
await sio.emit("new_message", {"from": sid, "message": message}, room=room)
return {"status": "sent"}
This example demonstrates a complete chat application with authentication, logging, and rate limiting middlewares.