|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
引言
FastAPI 是一个现代、高性能的 Web 框架,用于构建基于 Python 3.6+ 的 API,它具有出色的性能、易于使用和快速开发的特点。然而,随着网络攻击手段的不断演变,确保 FastAPI 应用的安全性变得至关重要。本文将深入探讨 FastAPI 中的网络安全关键策略,帮助你保护应用免受常见网络攻击,提升系统防御能力,并确保数据安全传输。
FastAPI 中的常见安全威胁
在深入安全策略之前,我们需要了解 FastAPI 应用可能面临的一些常见安全威胁:
1. 身份验证问题:弱密码、不安全的令牌管理和身份验证绕过
2. 授权漏洞:访问控制不当,允许用户访问未授权的资源
3. 数据泄露:敏感数据未经加密传输或存储
4. 注入攻击:SQL 注入、NoSQL 注入、命令注入等
5. 跨站脚本攻击 (XSS):恶意脚本注入到网页中
6. 跨站请求伪造 (CSRF):欺骗用户在已认证的网站上执行非预期操作
7. 跨域资源共享 (CORS) 配置错误:不当的 CORS 策略可能导致安全风险
8. 依赖项漏洞:使用存在已知漏洞的第三方库
FastAPI 安全基础设置
FastAPI 提供了一些基础的安全功能,可以通过简单的配置来增强应用的安全性。首先,让我们看一个基本的 FastAPI 应用,并逐步添加安全功能。
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
- from fastapi.middleware.trustedhost import TrustedHostMiddleware
- app = FastAPI()
- # 强制使用 HTTPS
- app.add_middleware(HTTPSRedirectMiddleware)
- # 限制可访问的主机
- app.add_middleware(
- TrustedHostMiddleware,
- allowed_hosts=["example.com", "*.example.com"]
- )
- # 配置 CORS
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["https://example.com"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- @app.get("/")
- def read_root():
- return {"message": "Hello World"}
复制代码
在上面的例子中,我们添加了三个中间件来增强安全性:
1. HTTPSRedirectMiddleware:强制所有请求使用 HTTPS
2. TrustedHostMiddleware:限制可访问的主机,防止 DNS 重新绑定攻击
3. CORSMiddleware:配置跨域资源共享策略
认证与授权策略
认证是验证用户身份的过程,而授权是确定已认证用户是否有权访问特定资源的过程。FastAPI 提供了灵活的安全工具来处理认证和授权。
OAuth2 密码流
OAuth2 是一个广泛使用的授权框架。以下是如何在 FastAPI 中实现 OAuth2 密码流的示例:
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from pydantic import BaseModel
- from passlib.context import CryptContext
- from datetime import datetime, timedelta
- from typing import Optional
- import jwt
- app = FastAPI()
- # 密码加密上下文
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- # OAuth2 密码流
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
- # 密钥和算法
- SECRET_KEY = "your-secret-key"
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
- # 用户模型
- class User(BaseModel):
- username: str
- email: Optional[str] = None
- full_name: Optional[str] = None
- disabled: Optional[bool] = None
- class UserInDB(User):
- hashed_password: str
- # 模拟数据库
- fake_users_db = {
- "johndoe": {
- "username": "johndoe",
- "full_name": "John Doe",
- "email": "johndoe@example.com",
- "hashed_password": pwd_context.hash("secret"),
- "disabled": False,
- }
- }
- # 验证密码
- def verify_password(plain_password, hashed_password):
- return pwd_context.verify(plain_password, hashed_password)
- # 获取用户
- def get_user(db, username: str):
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
- # 认证用户
- def authenticate_user(fake_db, username: str, password: str):
- user = get_user(fake_db, username)
- if not user:
- return False
- if not verify_password(password, user.hashed_password):
- return False
- return user
- # 创建访问令牌
- def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
- # 获取当前用户
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- except jwt.PyJWTError:
- raise credentials_exception
- user = get_user(fake_users_db, username=username)
- if user is None:
- raise credentials_exception
- return user
- # 获取当前活跃用户
- async def get_current_active_user(current_user: UserInDB = Depends(get_current_user)):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
- # 令牌端点
- @app.post("/token")
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
- user = authenticate_user(fake_users_db, form_data.username, form_data.password)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"WWW-Authenticate": "Bearer"},
- )
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {"access_token": access_token, "token_type": "bearer"}
- # 受保护的路由
- @app.get("/users/me")
- async def read_users_me(current_user: User = Depends(get_current_active_user)):
- return current_user
复制代码
这个示例实现了完整的 OAuth2 密码流,包括:
1. 密码哈希和验证
2. JWT 令牌的创建和验证
3. 用户认证和授权
4. 受保护的路由
API 密钥认证
对于简单的 API 认证,可以使用 API 密钥:
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import APIKeyHeader
- app = FastAPI()
- api_key_header = APIKeyHeader(name="X-API-KEY")
- async def get_api_key(api_key: str = Depends(api_key_header)):
- if api_key != "your-secret-api-key":
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid API Key",
- )
- return api_key
- @app.get("/protected-route")
- async def protected_route(api_key: str = Depends(get_api_key)):
- return {"message": "This is a protected route"}
复制代码
数据加密与安全传输
HTTPS 配置
确保数据在传输过程中加密是至关重要的。在生产环境中,应始终使用 HTTPS。以下是如何在 FastAPI 中配置 HTTPS:
- import uvicorn
- from fastapi import FastAPI
- app = FastAPI()
- @app.get("/")
- def read_root():
- return {"message": "Hello World"}
- if __name__ == "__main__":
- uvicorn.run(
- "main:app",
- host="0.0.0.0",
- port=8000,
- ssl_keyfile="path/to/key.pem",
- ssl_certfile="path/to/cert.pem",
- reload=True
- )
复制代码
敏感数据加密
对于敏感数据,应该在存储前进行加密:
- from cryptography.fernet import Fernet
- # 生成密钥
- key = Fernet.generate_key()
- cipher_suite = Fernet(key)
- # 加密数据
- def encrypt_data(data: str) -> bytes:
- return cipher_suite.encrypt(data.encode())
- # 解密数据
- def decrypt_data(encrypted_data: bytes) -> str:
- return cipher_suite.decrypt(encrypted_data).decode()
- # 使用示例
- sensitive_data = "This is a secret message"
- encrypted_data = encrypt_data(sensitive_data)
- decrypted_data = decrypt_data(encrypted_data)
- print(f"Original: {sensitive_data}")
- print(f"Encrypted: {encrypted_data}")
- print(f"Decrypted: {decrypted_data}")
复制代码
输入验证与 SQL 注入防护
FastAPI 使用 Pydantic 模型进行数据验证,这有助于防止许多类型的注入攻击。以下是如何安全地处理用户输入:
- from fastapi import FastAPI, HTTPException, Depends
- from pydantic import BaseModel, constr
- from typing import Optional
- import databases
- import sqlalchemy
- from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
- # 数据库配置
- DATABASE_URL = "sqlite:///./test.db"
- database = databases.Database(DATABASE_URL)
- metadata = MetaData()
- users = Table(
- "users",
- metadata,
- Column("id", Integer, primary_key=True),
- Column("name", String(50)),
- Column("email", String(50)),
- )
- engine = create_engine(DATABASE_URL)
- metadata.create_all(engine)
- app = FastAPI()
- # 用户模型
- class UserBase(BaseModel):
- name: constr(min_length=1, max_length=50)
- email: constr(min_length=1, max_length=50)
- class UserCreate(UserBase):
- pass
- class User(UserBase):
- id: int
-
- class Config:
- orm_mode = True
- # 创建用户
- @app.post("/users/", response_model=User)
- async def create_user(user: UserCreate):
- query = users.insert().values(name=user.name, email=user.email)
- last_record_id = await database.execute(query)
- return {**user.dict(), "id": last_record_id}
- # 获取用户
- @app.get("/users/{user_id}", response_model=User)
- async def read_user(user_id: int):
- query = users.select().where(users.c.id == user_id)
- user = await database.fetch_one(query)
- if user is None:
- raise HTTPException(status_code=404, detail="User not found")
- return user
- # 安全搜索用户
- @app.get("/users/", response_model=list[User])
- async def search_users(name: Optional[str] = None, email: Optional[str] = None):
- query = users.select()
- if name:
- query = query.where(users.c.name == name)
- if email:
- query = query.where(users.c.email == email)
- results = await database.fetch_all(query)
- return results
复制代码
在这个例子中,我们:
1. 使用 Pydantic 模型验证输入数据
2. 使用参数化查询防止 SQL 注入
3. 对输入数据进行长度限制
CORS 与 CSRF 防护
CORS 配置
跨域资源共享 (CORS) 是一种机制,允许 Web 应用服务器进行跨域访问控制。以下是如何在 FastAPI 中安全地配置 CORS:
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- app = FastAPI()
- # 安全的 CORS 配置
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["https://example.com", "https://www.example.com"], # 只允许特定域名
- allow_credentials=True,
- allow_methods=["GET", "POST", "PUT", "DELETE"], # 只允许特定方法
- allow_headers=["Authorization", "Content-Type"], # 只允许特定头部
- max_age=600, # 预检请求结果的缓存时间
- )
复制代码
CSRF 防护
跨站请求伪造 (CSRF) 是一种攻击,迫使最终用户在当前已认证的 Web 应用程序上执行非预期的操作。以下是如何在 FastAPI 中实现 CSRF 防护:
- from fastapi import FastAPI, Depends, HTTPException, Request, Response
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from secrets import token_hex
- import uuid
- app = FastAPI()
- security = HTTPBearer()
- # 存储 CSRF 令牌的简单字典
- csrf_tokens = {}
- # 生成 CSRF 令牌
- def generate_csrf_token():
- return token_hex(32)
- # 验证 CSRF 令牌
- def verify_csrf_token(request: Request, csrf_token: str):
- # 从请求头获取 CSRF 令牌
- header_token = request.headers.get("X-CSRF-Token")
-
- # 比较令牌
- if not header_token or header_token != csrf_token:
- raise HTTPException(status_code=403, detail="CSRF token validation failed")
- # 获取 CSRF 令牌
- @app.get("/csrf-token")
- async def get_csrf_token(request: Request):
- csrf_token = generate_csrf_token()
- session_id = str(uuid.uuid4())
- csrf_tokens[session_id] = csrf_token
-
- # 设置包含 CSRF 令牌的 cookie
- response = Response(content={"csrf_token": csrf_token})
- response.set_cookie(
- key="csrf_token",
- value=csrf_token,
- httponly=False, # 允许 JavaScript 访问
- secure=True, # 只通过 HTTPS 发送
- samesite="strict" # 严格的同源策略
- )
- return response
- # 受保护的路由
- @app.post("/protected-route")
- async def protected_route(
- request: Request,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- # 从 cookie 获取 CSRF 令牌
- csrf_token = request.cookies.get("csrf_token")
- if not csrf_token:
- raise HTTPException(status_code=403, detail="CSRF token not found")
-
- # 验证 CSRF 令牌
- verify_csrf_token(request, csrf_token)
-
- return {"message": "CSRF validation passed"}
复制代码
安全 Headers 设置
安全 HTTP 头可以帮助防止某些类型的攻击,如点击劫持、MIME 类型混淆等。以下是如何在 FastAPI 中设置安全头:
- from fastapi import FastAPI, Response
- from fastapi.middleware.gzip import GZipMiddleware
- app = FastAPI()
- # 添加 GZip 压缩中间件
- app.add_middleware(GZipMiddleware, minimum_size=1000)
- @app.middleware("http")
- async def add_security_headers(request, call_next):
- response = await call_next(request)
-
- # 添加安全头
- response.headers["Strict-Transport-Security"] = "max-age=63072000; includeSubDomains; preload"
- response.headers["X-Content-Type-Options"] = "nosniff"
- response.headers["X-Frame-Options"] = "DENY"
- response.headers["X-XSS-Protection"] = "1; mode=block"
- response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
- response.headers["Content-Security-Policy"] = "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; font-src 'self'; connect-src 'self'; frame-ancestors 'none';"
- response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=(), payment=()"
-
- return response
- @app.get("/")
- def read_root():
- return {"message": "Hello World"}
复制代码
这些安全头的含义:
1. Strict-Transport-Security:强制浏览器使用 HTTPS
2. X-Content-Type-Options:防止 MIME 类型混淆攻击
3. X-Frame-Options:防止点击劫持攻击
4. X-XSS-Protection:启用浏览器内置的 XSS 过滤器
5. Referrer-Policy:控制 referrer 信息
6. Content-Security-Policy:防止代码注入攻击
7. Permissions-Policy:控制浏览器功能和 API 的访问权限
依赖注入与中间件的安全应用
FastAPI 的依赖注入系统可以用于实现安全功能,如认证、授权和输入验证。以下是一些示例:
安全依赖项
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- from typing import List
- app = FastAPI()
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
- # 模拟用户数据库
- fake_users_db = {
- "alice": {"username": "alice", "role": "admin"},
- "bob": {"username": "bob", "role": "user"},
- }
- # 获取当前用户
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- # 这里应该验证 token 并获取用户信息
- # 简化示例,直接使用 token 作为用户名
- if token not in fake_users_db:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid authentication credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return fake_users_db[token]
- # 检查用户是否为管理员
- async def get_current_admin(current_user: dict = Depends(get_current_user)):
- if current_user["role"] != "admin":
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Not enough permissions"
- )
- return current_user
- # 普通用户路由
- @app.get("/user-profile")
- async def user_profile(current_user: dict = Depends(get_current_user)):
- return {"username": current_user["username"], "role": current_user["role"]}
- # 管理员路由
- @app.get("/admin-panel")
- async def admin_panel(current_admin: dict = Depends(get_current_admin)):
- return {"message": f"Welcome, {current_admin['username']}! You have admin access."}
复制代码
安全中间件
- from fastapi import FastAPI, Request, HTTPException
- from fastapi.middleware import Middleware
- from fastapi.middleware.base import BaseHTTPMiddleware
- import time
- app = FastAPI()
- # 速率限制中间件
- class RateLimitMiddleware(BaseHTTPMiddleware):
- def __init__(self, app, rate_limit: int = 100, time_window: int = 60):
- super().__init__(app)
- self.rate_limit = rate_limit
- self.time_window = time_window
- self.requests = {}
- async def dispatch(self, request: Request, call_next):
- client_ip = request.client.host
- current_time = time.time()
-
- # 清理过期的请求记录
- if client_ip in self.requests:
- self.requests[client_ip] = [
- req_time for req_time in self.requests[client_ip]
- if current_time - req_time < self.time_window
- ]
- else:
- self.requests[client_ip] = []
-
- # 检查是否超过速率限制
- if len(self.requests[client_ip]) >= self.rate_limit:
- raise HTTPException(status_code=429, detail="Too many requests")
-
- # 记录当前请求
- self.requests[client_ip].append(current_time)
-
- response = await call_next(request)
- return response
- # 添加速率限制中间件
- app.add_middleware(RateLimitMiddleware, rate_limit=10, time_window=60)
- @app.get("/")
- async def read_root():
- return {"message": "Hello World"}
复制代码
日志记录与监控
安全日志记录和监控对于检测和响应安全事件至关重要。以下是如何在 FastAPI 中实现安全日志记录:
- from fastapi import FastAPI, Request, HTTPException
- from fastapi.responses import JSONResponse
- import logging
- from logging.handlers import RotatingFileHandler
- import time
- import traceback
- app = FastAPI()
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[
- RotatingFileHandler('app.log', maxBytes=1024*1024, backupCount=5),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- # 请求日志中间件
- @app.middleware("http")
- async def log_requests(request: Request, call_next):
- start_time = time.time()
-
- # 记录请求信息
- logger.info(f"Request: {request.method} {request.url}")
-
- try:
- response = await call_next(request)
-
- # 记录响应信息
- process_time = time.time() - start_time
- logger.info(f"Response: {response.status_code} - Processed in {process_time:.4f}s")
-
- return response
- except Exception as e:
- # 记录异常信息
- logger.error(f"Exception: {str(e)}")
- logger.error(traceback.format_exc())
-
- # 返回错误响应
- return JSONResponse(
- status_code=500,
- content={"message": "Internal Server Error"}
- )
- # 安全事件日志
- @app.exception_handler(HTTPException)
- async def http_exception_handler(request: Request, exc: HTTPException):
- # 记录安全相关事件
- if exc.status_code in [401, 403, 404]:
- logger.warning(f"Security event: {exc.status_code} - {exc.detail} - IP: {request.client.host}")
-
- return JSONResponse(
- status_code=exc.status_code,
- content={"message": exc.detail}
- )
- @app.get("/")
- async def read_root():
- logger.info("Root endpoint accessed")
- return {"message": "Hello World"}
- @app.get("/secure")
- async def secure_endpoint():
- logger.info("Secure endpoint accessed")
- return {"message": "This is a secure endpoint"}
复制代码
安全测试方法
安全测试是确保 FastAPI 应用安全的重要环节。以下是一些常见的安全测试方法和工具:
单元测试
- import pytest
- from fastapi.testclient import TestClient
- from main import app
- client = TestClient(app)
- def test_root_endpoint():
- response = client.get("/")
- assert response.status_code == 200
- assert response.json() == {"message": "Hello World"}
- def test_secure_endpoint_without_auth():
- response = client.get("/secure")
- assert response.status_code == 401
- def test_secure_endpoint_with_auth():
- # 获取令牌
- response = client.post(
- "/token",
- data={"username": "johndoe", "password": "secret"}
- )
- assert response.status_code == 200
- token = response.json()["access_token"]
-
- # 使用令牌访问安全端点
- response = client.get(
- "/secure",
- headers={"Authorization": f"Bearer {token}"}
- )
- assert response.status_code == 200
- assert response.json() == {"message": "This is a secure endpoint"}
复制代码
集成测试
- import pytest
- from fastapi.testclient import TestClient
- from main import app
- client = TestClient(app)
- def test_sql_injection_protection():
- # 尝试 SQL 注入攻击
- malicious_input = "'; DROP TABLE users; --"
- response = client.post(
- "/users/",
- json={"name": malicious_input, "email": "test@example.com"}
- )
- # 应用应该正确处理输入,而不是执行 SQL 注入
- assert response.status_code == 422 # 验证错误
- def test_xss_protection():
- # 尝试 XSS 攻击
- malicious_input = "<script>alert('XSS')</script>"
- response = client.post(
- "/users/",
- json={"name": malicious_input, "email": "test@example.com"}
- )
- # 应用应该正确处理输入,而不是执行脚本
- assert response.status_code == 422 # 验证错误
复制代码
使用安全测试工具
1. OWASP ZAP:开源的 Web 应用安全扫描器
2. Burp Suite:用于测试 Web 应用安全的集成平台
3. SQLMap:自动化的 SQL 注入检测工具
4. Bandit:Python 代码安全漏洞扫描器
以下是使用 Bandit 扫描 FastAPI 应用的示例:
- # 安装 Bandit
- pip install bandit
- # 扫描 FastAPI 应用
- bandit -r ./
复制代码
最佳实践与安全清单
以下是一些 FastAPI 安全最佳实践和安全清单:
安全清单
1. 认证与授权[ ] 使用强身份验证机制(如 OAuth2)[ ] 实现适当的访问控制[ ] 定期轮换密钥和令牌[ ] 使用安全的密码存储(如 bcrypt)
2. [ ] 使用强身份验证机制(如 OAuth2)
3. [ ] 实现适当的访问控制
4. [ ] 定期轮换密钥和令牌
5. [ ] 使用安全的密码存储(如 bcrypt)
6. 数据保护[ ] 使用 HTTPS 加密所有通信[ ] 加密敏感数据存储[ ] 实现适当的日志记录和监控[ ] 定期备份数据
7. [ ] 使用 HTTPS 加密所有通信
8. [ ] 加密敏感数据存储
9. [ ] 实现适当的日志记录和监控
10. [ ] 定期备份数据
11. 输入验证[ ] 验证所有用户输入[ ] 使用参数化查询防止 SQL 注入[ ] 限制文件上传类型和大小[ ] 实现适当的错误处理
12. [ ] 验证所有用户输入
13. [ ] 使用参数化查询防止 SQL 注入
14. [ ] 限制文件上传类型和大小
15. [ ] 实现适当的错误处理
16. 安全配置[ ] 配置适当的安全头[ ] 实现速率限制[ ] 禁用不必要的服务和功能[ ] 保持依赖项更新
17. [ ] 配置适当的安全头
18. [ ] 实现速率限制
19. [ ] 禁用不必要的服务和功能
20. [ ] 保持依赖项更新
21. 测试与监控[ ] 定期进行安全测试[ ] 实施安全监控和警报[ ] 定期审查日志[ ] 制定安全事件响应计划
22. [ ] 定期进行安全测试
23. [ ] 实施安全监控和警报
24. [ ] 定期审查日志
25. [ ] 制定安全事件响应计划
认证与授权
• [ ] 使用强身份验证机制(如 OAuth2)
• [ ] 实现适当的访问控制
• [ ] 定期轮换密钥和令牌
• [ ] 使用安全的密码存储(如 bcrypt)
数据保护
• [ ] 使用 HTTPS 加密所有通信
• [ ] 加密敏感数据存储
• [ ] 实现适当的日志记录和监控
• [ ] 定期备份数据
输入验证
• [ ] 验证所有用户输入
• [ ] 使用参数化查询防止 SQL 注入
• [ ] 限制文件上传类型和大小
• [ ] 实现适当的错误处理
安全配置
• [ ] 配置适当的安全头
• [ ] 实现速率限制
• [ ] 禁用不必要的服务和功能
• [ ] 保持依赖项更新
测试与监控
• [ ] 定期进行安全测试
• [ ] 实施安全监控和警报
• [ ] 定期审查日志
• [ ] 制定安全事件响应计划
安全最佳实践示例
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
- from fastapi.middleware.trustedhost import TrustedHostMiddleware
- from fastapi.middleware.gzip import GZipMiddleware
- from pydantic import BaseModel, constr, EmailStr
- from passlib.context import CryptContext
- from datetime import datetime, timedelta
- from typing import Optional
- import jwt
- import secrets
- import logging
- from logging.handlers import RotatingFileHandler
- app = FastAPI()
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[
- RotatingFileHandler('app.log', maxBytes=1024*1024, backupCount=5),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- # 添加安全中间件
- app.add_middleware(HTTPSRedirectMiddleware)
- app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com"])
- app.add_middleware(GZipMiddleware, minimum_size=1000)
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["https://example.com"],
- allow_credentials=True,
- allow_methods=["GET", "POST", "PUT", "DELETE"],
- allow_headers=["Authorization", "Content-Type"],
- max_age=600,
- )
- # 安全配置
- SECRET_KEY = secrets.token_urlsafe(32)
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
- # 密码加密上下文
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- # OAuth2 密码流
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
- # 用户模型
- class UserBase(BaseModel):
- username: constr(min_length=3, max_length=20)
- email: EmailStr
- class UserCreate(UserBase):
- password: constr(min_length=8)
- class User(UserBase):
- id: int
- is_active: bool = True
-
- class Config:
- orm_mode = True
- # 模拟数据库
- fake_users_db = {}
- # 工具函数
- def verify_password(plain_password, hashed_password):
- return pwd_context.verify(plain_password, hashed_password)
- def get_password_hash(password):
- return pwd_context.hash(password)
- def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
- # 认证和授权
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- except jwt.PyJWTError:
- raise credentials_exception
-
- user = fake_users_db.get(username)
- if user is None:
- raise credentials_exception
-
- return user
- async def get_current_active_user(current_user: User = Depends(get_current_user)):
- if not current_user.get("is_active", True):
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
- # 端点
- @app.post("/token")
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
- user = fake_users_db.get(form_data.username)
- if not user or not verify_password(form_data.password, user["hashed_password"]):
- logger.warning(f"Failed login attempt for username: {form_data.username}")
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"WWW-Authenticate": "Bearer"},
- )
-
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = create_access_token(
- data={"sub": user["username"]}, expires_delta=access_token_expires
- )
-
- logger.info(f"User {form_data.username} logged in successfully")
- return {"access_token": access_token, "token_type": "bearer"}
- @app.post("/users/", response_model=User)
- async def create_user(user: UserCreate):
- if user.username in fake_users_db:
- raise HTTPException(
- status_code=400,
- detail="Username already registered"
- )
-
- hashed_password = get_password_hash(user.password)
- user_id = len(fake_users_db) + 1
-
- fake_users_db[user.username] = {
- "id": user_id,
- "username": user.username,
- "email": user.email,
- "hashed_password": hashed_password,
- "is_active": True,
- }
-
- logger.info(f"New user created: {user.username}")
- return fake_users_db[user.username]
- @app.get("/users/me", response_model=User)
- async def read_users_me(current_user: User = Depends(get_current_active_user)):
- return current_user
- # 安全中间件
- @app.middleware("http")
- async def add_security_headers(request, call_next):
- response = await call_next(request)
-
- # 添加安全头
- response.headers["Strict-Transport-Security"] = "max-age=63072000; includeSubDomains; preload"
- response.headers["X-Content-Type-Options"] = "nosniff"
- response.headers["X-Frame-Options"] = "DENY"
- response.headers["X-XSS-Protection"] = "1; mode=block"
- response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
- response.headers["Content-Security-Policy"] = "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; font-src 'self'; connect-src 'self'; frame-ancestors 'none';"
- response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=(), payment=()"
-
- return response
复制代码
结论
FastAPI 提供了丰富的工具和功能来构建安全的 Web 应用程序。通过实施适当的安全策略,如强身份验证和授权机制、数据加密、输入验证、安全配置和持续监控,你可以显著提高 FastAPI 应用的安全性,保护其免受常见网络攻击,并确保数据安全传输。
记住,安全是一个持续的过程,而不是一次性的任务。定期审查和更新你的安全策略,保持对最新安全威胁的了解,并采用最佳实践,是确保你的 FastAPI 应用长期安全的关键。
通过本文介绍的技术和策略,你可以构建一个既强大又安全的 FastAPI 应用,为用户提供安全可靠的服务。 |
|