简体中文 繁體中文 English Deutsch 한국 사람 بالعربية TÜRKÇE português คนไทย Français Japanese

站内搜索

搜索

活动公告

通知:为庆祝网站一周年,将在5.1日与5.2日开放注册,具体信息请见后续详细公告
04-22 00:04
通知:本站资源由网友上传分享,如有违规等问题请到版务模块进行投诉,资源失效请在帖子内回复要求补档,会尽快处理!
10-23 09:31

探索 FastAPI 网络安全性关键策略保护你的应用免受常见网络攻击提升系统防御能力确保数据安全传输

SunJu_FaceMall

3万

主题

1116

科技点

3万

积分

白金月票

碾压王

积分
32766

立华奏

发表于 2025-8-23 00:10:46 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
引言

FastAPI 是一个现代、高性能的 Web 框架,用于构建基于 Python 3.6+ 的 API,它具有出色的性能、易于使用和快速开发的特点。然而,随着网络攻击手段的不断演变,确保 FastAPI 应用的安全性变得至关重要。本文将深入探讨 FastAPI 中的网络安全关键策略,帮助你保护应用免受常见网络攻击,提升系统防御能力,并确保数据安全传输。

FastAPI 中的常见安全威胁

在深入安全策略之前,我们需要了解 FastAPI 应用可能面临的一些常见安全威胁:

1. 身份验证问题:弱密码、不安全的令牌管理和身份验证绕过
2. 授权漏洞:访问控制不当,允许用户访问未授权的资源
3. 数据泄露:敏感数据未经加密传输或存储
4. 注入攻击:SQL 注入、NoSQL 注入、命令注入等
5. 跨站脚本攻击 (XSS):恶意脚本注入到网页中
6. 跨站请求伪造 (CSRF):欺骗用户在已认证的网站上执行非预期操作
7. 跨域资源共享 (CORS) 配置错误:不当的 CORS 策略可能导致安全风险
8. 依赖项漏洞:使用存在已知漏洞的第三方库

FastAPI 安全基础设置

FastAPI 提供了一些基础的安全功能,可以通过简单的配置来增强应用的安全性。首先,让我们看一个基本的 FastAPI 应用,并逐步添加安全功能。
  1. from fastapi import FastAPI
  2. from fastapi.middleware.cors import CORSMiddleware
  3. from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
  4. from fastapi.middleware.trustedhost import TrustedHostMiddleware
  5. app = FastAPI()
  6. # 强制使用 HTTPS
  7. app.add_middleware(HTTPSRedirectMiddleware)
  8. # 限制可访问的主机
  9. app.add_middleware(
  10.     TrustedHostMiddleware,
  11.     allowed_hosts=["example.com", "*.example.com"]
  12. )
  13. # 配置 CORS
  14. app.add_middleware(
  15.     CORSMiddleware,
  16.     allow_origins=["https://example.com"],
  17.     allow_credentials=True,
  18.     allow_methods=["*"],
  19.     allow_headers=["*"],
  20. )
  21. @app.get("/")
  22. def read_root():
  23.     return {"message": "Hello World"}
复制代码

在上面的例子中,我们添加了三个中间件来增强安全性:

1. HTTPSRedirectMiddleware:强制所有请求使用 HTTPS
2. TrustedHostMiddleware:限制可访问的主机,防止 DNS 重新绑定攻击
3. CORSMiddleware:配置跨域资源共享策略

认证与授权策略

认证是验证用户身份的过程,而授权是确定已认证用户是否有权访问特定资源的过程。FastAPI 提供了灵活的安全工具来处理认证和授权。

OAuth2 密码流

OAuth2 是一个广泛使用的授权框架。以下是如何在 FastAPI 中实现 OAuth2 密码流的示例:
  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  3. from pydantic import BaseModel
  4. from passlib.context import CryptContext
  5. from datetime import datetime, timedelta
  6. from typing import Optional
  7. import jwt
  8. app = FastAPI()
  9. # 密码加密上下文
  10. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  11. # OAuth2 密码流
  12. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  13. # 密钥和算法
  14. SECRET_KEY = "your-secret-key"
  15. ALGORITHM = "HS256"
  16. ACCESS_TOKEN_EXPIRE_MINUTES = 30
  17. # 用户模型
  18. class User(BaseModel):
  19.     username: str
  20.     email: Optional[str] = None
  21.     full_name: Optional[str] = None
  22.     disabled: Optional[bool] = None
  23. class UserInDB(User):
  24.     hashed_password: str
  25. # 模拟数据库
  26. fake_users_db = {
  27.     "johndoe": {
  28.         "username": "johndoe",
  29.         "full_name": "John Doe",
  30.         "email": "johndoe@example.com",
  31.         "hashed_password": pwd_context.hash("secret"),
  32.         "disabled": False,
  33.     }
  34. }
  35. # 验证密码
  36. def verify_password(plain_password, hashed_password):
  37.     return pwd_context.verify(plain_password, hashed_password)
  38. # 获取用户
  39. def get_user(db, username: str):
  40.     if username in db:
  41.         user_dict = db[username]
  42.         return UserInDB(**user_dict)
  43. # 认证用户
  44. def authenticate_user(fake_db, username: str, password: str):
  45.     user = get_user(fake_db, username)
  46.     if not user:
  47.         return False
  48.     if not verify_password(password, user.hashed_password):
  49.         return False
  50.     return user
  51. # 创建访问令牌
  52. def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
  53.     to_encode = data.copy()
  54.     if expires_delta:
  55.         expire = datetime.utcnow() + expires_delta
  56.     else:
  57.         expire = datetime.utcnow() + timedelta(minutes=15)
  58.     to_encode.update({"exp": expire})
  59.     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  60.     return encoded_jwt
  61. # 获取当前用户
  62. async def get_current_user(token: str = Depends(oauth2_scheme)):
  63.     credentials_exception = HTTPException(
  64.         status_code=status.HTTP_401_UNAUTHORIZED,
  65.         detail="Could not validate credentials",
  66.         headers={"WWW-Authenticate": "Bearer"},
  67.     )
  68.     try:
  69.         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  70.         username: str = payload.get("sub")
  71.         if username is None:
  72.             raise credentials_exception
  73.     except jwt.PyJWTError:
  74.         raise credentials_exception
  75.     user = get_user(fake_users_db, username=username)
  76.     if user is None:
  77.         raise credentials_exception
  78.     return user
  79. # 获取当前活跃用户
  80. async def get_current_active_user(current_user: UserInDB = Depends(get_current_user)):
  81.     if current_user.disabled:
  82.         raise HTTPException(status_code=400, detail="Inactive user")
  83.     return current_user
  84. # 令牌端点
  85. @app.post("/token")
  86. async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
  87.     user = authenticate_user(fake_users_db, form_data.username, form_data.password)
  88.     if not user:
  89.         raise HTTPException(
  90.             status_code=status.HTTP_401_UNAUTHORIZED,
  91.             detail="Incorrect username or password",
  92.             headers={"WWW-Authenticate": "Bearer"},
  93.         )
  94.     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  95.     access_token = create_access_token(
  96.         data={"sub": user.username}, expires_delta=access_token_expires
  97.     )
  98.     return {"access_token": access_token, "token_type": "bearer"}
  99. # 受保护的路由
  100. @app.get("/users/me")
  101. async def read_users_me(current_user: User = Depends(get_current_active_user)):
  102.     return current_user
复制代码

这个示例实现了完整的 OAuth2 密码流,包括:

1. 密码哈希和验证
2. JWT 令牌的创建和验证
3. 用户认证和授权
4. 受保护的路由

API 密钥认证

对于简单的 API 认证,可以使用 API 密钥:
  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import APIKeyHeader
  3. app = FastAPI()
  4. api_key_header = APIKeyHeader(name="X-API-KEY")
  5. async def get_api_key(api_key: str = Depends(api_key_header)):
  6.     if api_key != "your-secret-api-key":
  7.         raise HTTPException(
  8.             status_code=status.HTTP_401_UNAUTHORIZED,
  9.             detail="Invalid API Key",
  10.         )
  11.     return api_key
  12. @app.get("/protected-route")
  13. async def protected_route(api_key: str = Depends(get_api_key)):
  14.     return {"message": "This is a protected route"}
复制代码

数据加密与安全传输

HTTPS 配置

确保数据在传输过程中加密是至关重要的。在生产环境中,应始终使用 HTTPS。以下是如何在 FastAPI 中配置 HTTPS:
  1. import uvicorn
  2. from fastapi import FastAPI
  3. app = FastAPI()
  4. @app.get("/")
  5. def read_root():
  6.     return {"message": "Hello World"}
  7. if __name__ == "__main__":
  8.     uvicorn.run(
  9.         "main:app",
  10.         host="0.0.0.0",
  11.         port=8000,
  12.         ssl_keyfile="path/to/key.pem",
  13.         ssl_certfile="path/to/cert.pem",
  14.         reload=True
  15.     )
复制代码

敏感数据加密

对于敏感数据,应该在存储前进行加密:
  1. from cryptography.fernet import Fernet
  2. # 生成密钥
  3. key = Fernet.generate_key()
  4. cipher_suite = Fernet(key)
  5. # 加密数据
  6. def encrypt_data(data: str) -> bytes:
  7.     return cipher_suite.encrypt(data.encode())
  8. # 解密数据
  9. def decrypt_data(encrypted_data: bytes) -> str:
  10.     return cipher_suite.decrypt(encrypted_data).decode()
  11. # 使用示例
  12. sensitive_data = "This is a secret message"
  13. encrypted_data = encrypt_data(sensitive_data)
  14. decrypted_data = decrypt_data(encrypted_data)
  15. print(f"Original: {sensitive_data}")
  16. print(f"Encrypted: {encrypted_data}")
  17. print(f"Decrypted: {decrypted_data}")
复制代码

输入验证与 SQL 注入防护

FastAPI 使用 Pydantic 模型进行数据验证,这有助于防止许多类型的注入攻击。以下是如何安全地处理用户输入:
  1. from fastapi import FastAPI, HTTPException, Depends
  2. from pydantic import BaseModel, constr
  3. from typing import Optional
  4. import databases
  5. import sqlalchemy
  6. from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
  7. # 数据库配置
  8. DATABASE_URL = "sqlite:///./test.db"
  9. database = databases.Database(DATABASE_URL)
  10. metadata = MetaData()
  11. users = Table(
  12.     "users",
  13.     metadata,
  14.     Column("id", Integer, primary_key=True),
  15.     Column("name", String(50)),
  16.     Column("email", String(50)),
  17. )
  18. engine = create_engine(DATABASE_URL)
  19. metadata.create_all(engine)
  20. app = FastAPI()
  21. # 用户模型
  22. class UserBase(BaseModel):
  23.     name: constr(min_length=1, max_length=50)
  24.     email: constr(min_length=1, max_length=50)
  25. class UserCreate(UserBase):
  26.     pass
  27. class User(UserBase):
  28.     id: int
  29.    
  30.     class Config:
  31.         orm_mode = True
  32. # 创建用户
  33. @app.post("/users/", response_model=User)
  34. async def create_user(user: UserCreate):
  35.     query = users.insert().values(name=user.name, email=user.email)
  36.     last_record_id = await database.execute(query)
  37.     return {**user.dict(), "id": last_record_id}
  38. # 获取用户
  39. @app.get("/users/{user_id}", response_model=User)
  40. async def read_user(user_id: int):
  41.     query = users.select().where(users.c.id == user_id)
  42.     user = await database.fetch_one(query)
  43.     if user is None:
  44.         raise HTTPException(status_code=404, detail="User not found")
  45.     return user
  46. # 安全搜索用户
  47. @app.get("/users/", response_model=list[User])
  48. async def search_users(name: Optional[str] = None, email: Optional[str] = None):
  49.     query = users.select()
  50.     if name:
  51.         query = query.where(users.c.name == name)
  52.     if email:
  53.         query = query.where(users.c.email == email)
  54.     results = await database.fetch_all(query)
  55.     return results
复制代码

在这个例子中,我们:

1. 使用 Pydantic 模型验证输入数据
2. 使用参数化查询防止 SQL 注入
3. 对输入数据进行长度限制

CORS 与 CSRF 防护

CORS 配置

跨域资源共享 (CORS) 是一种机制,允许 Web 应用服务器进行跨域访问控制。以下是如何在 FastAPI 中安全地配置 CORS:
  1. from fastapi import FastAPI
  2. from fastapi.middleware.cors import CORSMiddleware
  3. app = FastAPI()
  4. # 安全的 CORS 配置
  5. app.add_middleware(
  6.     CORSMiddleware,
  7.     allow_origins=["https://example.com", "https://www.example.com"],  # 只允许特定域名
  8.     allow_credentials=True,
  9.     allow_methods=["GET", "POST", "PUT", "DELETE"],  # 只允许特定方法
  10.     allow_headers=["Authorization", "Content-Type"],  # 只允许特定头部
  11.     max_age=600,  # 预检请求结果的缓存时间
  12. )
复制代码

CSRF 防护

跨站请求伪造 (CSRF) 是一种攻击,迫使最终用户在当前已认证的 Web 应用程序上执行非预期的操作。以下是如何在 FastAPI 中实现 CSRF 防护:
  1. from fastapi import FastAPI, Depends, HTTPException, Request, Response
  2. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  3. from secrets import token_hex
  4. import uuid
  5. app = FastAPI()
  6. security = HTTPBearer()
  7. # 存储 CSRF 令牌的简单字典
  8. csrf_tokens = {}
  9. # 生成 CSRF 令牌
  10. def generate_csrf_token():
  11.     return token_hex(32)
  12. # 验证 CSRF 令牌
  13. def verify_csrf_token(request: Request, csrf_token: str):
  14.     # 从请求头获取 CSRF 令牌
  15.     header_token = request.headers.get("X-CSRF-Token")
  16.    
  17.     # 比较令牌
  18.     if not header_token or header_token != csrf_token:
  19.         raise HTTPException(status_code=403, detail="CSRF token validation failed")
  20. # 获取 CSRF 令牌
  21. @app.get("/csrf-token")
  22. async def get_csrf_token(request: Request):
  23.     csrf_token = generate_csrf_token()
  24.     session_id = str(uuid.uuid4())
  25.     csrf_tokens[session_id] = csrf_token
  26.    
  27.     # 设置包含 CSRF 令牌的 cookie
  28.     response = Response(content={"csrf_token": csrf_token})
  29.     response.set_cookie(
  30.         key="csrf_token",
  31.         value=csrf_token,
  32.         httponly=False,  # 允许 JavaScript 访问
  33.         secure=True,     # 只通过 HTTPS 发送
  34.         samesite="strict"  # 严格的同源策略
  35.     )
  36.     return response
  37. # 受保护的路由
  38. @app.post("/protected-route")
  39. async def protected_route(
  40.     request: Request,
  41.     credentials: HTTPAuthorizationCredentials = Depends(security)
  42. ):
  43.     # 从 cookie 获取 CSRF 令牌
  44.     csrf_token = request.cookies.get("csrf_token")
  45.     if not csrf_token:
  46.         raise HTTPException(status_code=403, detail="CSRF token not found")
  47.    
  48.     # 验证 CSRF 令牌
  49.     verify_csrf_token(request, csrf_token)
  50.    
  51.     return {"message": "CSRF validation passed"}
复制代码

安全 Headers 设置

安全 HTTP 头可以帮助防止某些类型的攻击,如点击劫持、MIME 类型混淆等。以下是如何在 FastAPI 中设置安全头:
  1. from fastapi import FastAPI, Response
  2. from fastapi.middleware.gzip import GZipMiddleware
  3. app = FastAPI()
  4. # 添加 GZip 压缩中间件
  5. app.add_middleware(GZipMiddleware, minimum_size=1000)
  6. @app.middleware("http")
  7. async def add_security_headers(request, call_next):
  8.     response = await call_next(request)
  9.    
  10.     # 添加安全头
  11.     response.headers["Strict-Transport-Security"] = "max-age=63072000; includeSubDomains; preload"
  12.     response.headers["X-Content-Type-Options"] = "nosniff"
  13.     response.headers["X-Frame-Options"] = "DENY"
  14.     response.headers["X-XSS-Protection"] = "1; mode=block"
  15.     response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
  16.     response.headers["Content-Security-Policy"] = "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; font-src 'self'; connect-src 'self'; frame-ancestors 'none';"
  17.     response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=(), payment=()"
  18.    
  19.     return response
  20. @app.get("/")
  21. def read_root():
  22.     return {"message": "Hello World"}
复制代码

这些安全头的含义:

1. Strict-Transport-Security:强制浏览器使用 HTTPS
2. X-Content-Type-Options:防止 MIME 类型混淆攻击
3. X-Frame-Options:防止点击劫持攻击
4. X-XSS-Protection:启用浏览器内置的 XSS 过滤器
5. Referrer-Policy:控制 referrer 信息
6. Content-Security-Policy:防止代码注入攻击
7. Permissions-Policy:控制浏览器功能和 API 的访问权限

依赖注入与中间件的安全应用

FastAPI 的依赖注入系统可以用于实现安全功能,如认证、授权和输入验证。以下是一些示例:

安全依赖项
  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer
  3. from typing import List
  4. app = FastAPI()
  5. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  6. # 模拟用户数据库
  7. fake_users_db = {
  8.     "alice": {"username": "alice", "role": "admin"},
  9.     "bob": {"username": "bob", "role": "user"},
  10. }
  11. # 获取当前用户
  12. async def get_current_user(token: str = Depends(oauth2_scheme)):
  13.     # 这里应该验证 token 并获取用户信息
  14.     # 简化示例,直接使用 token 作为用户名
  15.     if token not in fake_users_db:
  16.         raise HTTPException(
  17.             status_code=status.HTTP_401_UNAUTHORIZED,
  18.             detail="Invalid authentication credentials",
  19.             headers={"WWW-Authenticate": "Bearer"},
  20.         )
  21.     return fake_users_db[token]
  22. # 检查用户是否为管理员
  23. async def get_current_admin(current_user: dict = Depends(get_current_user)):
  24.     if current_user["role"] != "admin":
  25.         raise HTTPException(
  26.             status_code=status.HTTP_403_FORBIDDEN,
  27.             detail="Not enough permissions"
  28.         )
  29.     return current_user
  30. # 普通用户路由
  31. @app.get("/user-profile")
  32. async def user_profile(current_user: dict = Depends(get_current_user)):
  33.     return {"username": current_user["username"], "role": current_user["role"]}
  34. # 管理员路由
  35. @app.get("/admin-panel")
  36. async def admin_panel(current_admin: dict = Depends(get_current_admin)):
  37.     return {"message": f"Welcome, {current_admin['username']}! You have admin access."}
复制代码

安全中间件
  1. from fastapi import FastAPI, Request, HTTPException
  2. from fastapi.middleware import Middleware
  3. from fastapi.middleware.base import BaseHTTPMiddleware
  4. import time
  5. app = FastAPI()
  6. # 速率限制中间件
  7. class RateLimitMiddleware(BaseHTTPMiddleware):
  8.     def __init__(self, app, rate_limit: int = 100, time_window: int = 60):
  9.         super().__init__(app)
  10.         self.rate_limit = rate_limit
  11.         self.time_window = time_window
  12.         self.requests = {}
  13.     async def dispatch(self, request: Request, call_next):
  14.         client_ip = request.client.host
  15.         current_time = time.time()
  16.         
  17.         # 清理过期的请求记录
  18.         if client_ip in self.requests:
  19.             self.requests[client_ip] = [
  20.                 req_time for req_time in self.requests[client_ip]
  21.                 if current_time - req_time < self.time_window
  22.             ]
  23.         else:
  24.             self.requests[client_ip] = []
  25.         
  26.         # 检查是否超过速率限制
  27.         if len(self.requests[client_ip]) >= self.rate_limit:
  28.             raise HTTPException(status_code=429, detail="Too many requests")
  29.         
  30.         # 记录当前请求
  31.         self.requests[client_ip].append(current_time)
  32.         
  33.         response = await call_next(request)
  34.         return response
  35. # 添加速率限制中间件
  36. app.add_middleware(RateLimitMiddleware, rate_limit=10, time_window=60)
  37. @app.get("/")
  38. async def read_root():
  39.     return {"message": "Hello World"}
复制代码

日志记录与监控

安全日志记录和监控对于检测和响应安全事件至关重要。以下是如何在 FastAPI 中实现安全日志记录:
  1. from fastapi import FastAPI, Request, HTTPException
  2. from fastapi.responses import JSONResponse
  3. import logging
  4. from logging.handlers import RotatingFileHandler
  5. import time
  6. import traceback
  7. app = FastAPI()
  8. # 配置日志
  9. logging.basicConfig(
  10.     level=logging.INFO,
  11.     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  12.     handlers=[
  13.         RotatingFileHandler('app.log', maxBytes=1024*1024, backupCount=5),
  14.         logging.StreamHandler()
  15.     ]
  16. )
  17. logger = logging.getLogger(__name__)
  18. # 请求日志中间件
  19. @app.middleware("http")
  20. async def log_requests(request: Request, call_next):
  21.     start_time = time.time()
  22.    
  23.     # 记录请求信息
  24.     logger.info(f"Request: {request.method} {request.url}")
  25.    
  26.     try:
  27.         response = await call_next(request)
  28.         
  29.         # 记录响应信息
  30.         process_time = time.time() - start_time
  31.         logger.info(f"Response: {response.status_code} - Processed in {process_time:.4f}s")
  32.         
  33.         return response
  34.     except Exception as e:
  35.         # 记录异常信息
  36.         logger.error(f"Exception: {str(e)}")
  37.         logger.error(traceback.format_exc())
  38.         
  39.         # 返回错误响应
  40.         return JSONResponse(
  41.             status_code=500,
  42.             content={"message": "Internal Server Error"}
  43.         )
  44. # 安全事件日志
  45. @app.exception_handler(HTTPException)
  46. async def http_exception_handler(request: Request, exc: HTTPException):
  47.     # 记录安全相关事件
  48.     if exc.status_code in [401, 403, 404]:
  49.         logger.warning(f"Security event: {exc.status_code} - {exc.detail} - IP: {request.client.host}")
  50.    
  51.     return JSONResponse(
  52.         status_code=exc.status_code,
  53.         content={"message": exc.detail}
  54.     )
  55. @app.get("/")
  56. async def read_root():
  57.     logger.info("Root endpoint accessed")
  58.     return {"message": "Hello World"}
  59. @app.get("/secure")
  60. async def secure_endpoint():
  61.     logger.info("Secure endpoint accessed")
  62.     return {"message": "This is a secure endpoint"}
复制代码

安全测试方法

安全测试是确保 FastAPI 应用安全的重要环节。以下是一些常见的安全测试方法和工具:

单元测试
  1. import pytest
  2. from fastapi.testclient import TestClient
  3. from main import app
  4. client = TestClient(app)
  5. def test_root_endpoint():
  6.     response = client.get("/")
  7.     assert response.status_code == 200
  8.     assert response.json() == {"message": "Hello World"}
  9. def test_secure_endpoint_without_auth():
  10.     response = client.get("/secure")
  11.     assert response.status_code == 401
  12. def test_secure_endpoint_with_auth():
  13.     # 获取令牌
  14.     response = client.post(
  15.         "/token",
  16.         data={"username": "johndoe", "password": "secret"}
  17.     )
  18.     assert response.status_code == 200
  19.     token = response.json()["access_token"]
  20.    
  21.     # 使用令牌访问安全端点
  22.     response = client.get(
  23.         "/secure",
  24.         headers={"Authorization": f"Bearer {token}"}
  25.     )
  26.     assert response.status_code == 200
  27.     assert response.json() == {"message": "This is a secure endpoint"}
复制代码

集成测试
  1. import pytest
  2. from fastapi.testclient import TestClient
  3. from main import app
  4. client = TestClient(app)
  5. def test_sql_injection_protection():
  6.     # 尝试 SQL 注入攻击
  7.     malicious_input = "'; DROP TABLE users; --"
  8.     response = client.post(
  9.         "/users/",
  10.         json={"name": malicious_input, "email": "test@example.com"}
  11.     )
  12.     # 应用应该正确处理输入,而不是执行 SQL 注入
  13.     assert response.status_code == 422  # 验证错误
  14. def test_xss_protection():
  15.     # 尝试 XSS 攻击
  16.     malicious_input = "<script>alert('XSS')</script>"
  17.     response = client.post(
  18.         "/users/",
  19.         json={"name": malicious_input, "email": "test@example.com"}
  20.     )
  21.     # 应用应该正确处理输入,而不是执行脚本
  22.     assert response.status_code == 422  # 验证错误
复制代码

使用安全测试工具

1. OWASP ZAP:开源的 Web 应用安全扫描器
2. Burp Suite:用于测试 Web 应用安全的集成平台
3. SQLMap:自动化的 SQL 注入检测工具
4. Bandit:Python 代码安全漏洞扫描器

以下是使用 Bandit 扫描 FastAPI 应用的示例:
  1. # 安装 Bandit
  2. pip install bandit
  3. # 扫描 FastAPI 应用
  4. bandit -r ./
复制代码

最佳实践与安全清单

以下是一些 FastAPI 安全最佳实践和安全清单:

安全清单

1. 认证与授权[ ] 使用强身份验证机制(如 OAuth2)[ ] 实现适当的访问控制[ ] 定期轮换密钥和令牌[ ] 使用安全的密码存储(如 bcrypt)
2. [ ] 使用强身份验证机制(如 OAuth2)
3. [ ] 实现适当的访问控制
4. [ ] 定期轮换密钥和令牌
5. [ ] 使用安全的密码存储(如 bcrypt)
6. 数据保护[ ] 使用 HTTPS 加密所有通信[ ] 加密敏感数据存储[ ] 实现适当的日志记录和监控[ ] 定期备份数据
7. [ ] 使用 HTTPS 加密所有通信
8. [ ] 加密敏感数据存储
9. [ ] 实现适当的日志记录和监控
10. [ ] 定期备份数据
11. 输入验证[ ] 验证所有用户输入[ ] 使用参数化查询防止 SQL 注入[ ] 限制文件上传类型和大小[ ] 实现适当的错误处理
12. [ ] 验证所有用户输入
13. [ ] 使用参数化查询防止 SQL 注入
14. [ ] 限制文件上传类型和大小
15. [ ] 实现适当的错误处理
16. 安全配置[ ] 配置适当的安全头[ ] 实现速率限制[ ] 禁用不必要的服务和功能[ ] 保持依赖项更新
17. [ ] 配置适当的安全头
18. [ ] 实现速率限制
19. [ ] 禁用不必要的服务和功能
20. [ ] 保持依赖项更新
21. 测试与监控[ ] 定期进行安全测试[ ] 实施安全监控和警报[ ] 定期审查日志[ ] 制定安全事件响应计划
22. [ ] 定期进行安全测试
23. [ ] 实施安全监控和警报
24. [ ] 定期审查日志
25. [ ] 制定安全事件响应计划

认证与授权

• [ ] 使用强身份验证机制(如 OAuth2)
• [ ] 实现适当的访问控制
• [ ] 定期轮换密钥和令牌
• [ ] 使用安全的密码存储(如 bcrypt)

数据保护

• [ ] 使用 HTTPS 加密所有通信
• [ ] 加密敏感数据存储
• [ ] 实现适当的日志记录和监控
• [ ] 定期备份数据

输入验证

• [ ] 验证所有用户输入
• [ ] 使用参数化查询防止 SQL 注入
• [ ] 限制文件上传类型和大小
• [ ] 实现适当的错误处理

安全配置

• [ ] 配置适当的安全头
• [ ] 实现速率限制
• [ ] 禁用不必要的服务和功能
• [ ] 保持依赖项更新

测试与监控

• [ ] 定期进行安全测试
• [ ] 实施安全监控和警报
• [ ] 定期审查日志
• [ ] 制定安全事件响应计划

安全最佳实践示例
  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  3. from fastapi.middleware.cors import CORSMiddleware
  4. from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
  5. from fastapi.middleware.trustedhost import TrustedHostMiddleware
  6. from fastapi.middleware.gzip import GZipMiddleware
  7. from pydantic import BaseModel, constr, EmailStr
  8. from passlib.context import CryptContext
  9. from datetime import datetime, timedelta
  10. from typing import Optional
  11. import jwt
  12. import secrets
  13. import logging
  14. from logging.handlers import RotatingFileHandler
  15. app = FastAPI()
  16. # 配置日志
  17. logging.basicConfig(
  18.     level=logging.INFO,
  19.     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  20.     handlers=[
  21.         RotatingFileHandler('app.log', maxBytes=1024*1024, backupCount=5),
  22.         logging.StreamHandler()
  23.     ]
  24. )
  25. logger = logging.getLogger(__name__)
  26. # 添加安全中间件
  27. app.add_middleware(HTTPSRedirectMiddleware)
  28. app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com"])
  29. app.add_middleware(GZipMiddleware, minimum_size=1000)
  30. app.add_middleware(
  31.     CORSMiddleware,
  32.     allow_origins=["https://example.com"],
  33.     allow_credentials=True,
  34.     allow_methods=["GET", "POST", "PUT", "DELETE"],
  35.     allow_headers=["Authorization", "Content-Type"],
  36.     max_age=600,
  37. )
  38. # 安全配置
  39. SECRET_KEY = secrets.token_urlsafe(32)
  40. ALGORITHM = "HS256"
  41. ACCESS_TOKEN_EXPIRE_MINUTES = 30
  42. # 密码加密上下文
  43. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  44. # OAuth2 密码流
  45. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  46. # 用户模型
  47. class UserBase(BaseModel):
  48.     username: constr(min_length=3, max_length=20)
  49.     email: EmailStr
  50. class UserCreate(UserBase):
  51.     password: constr(min_length=8)
  52. class User(UserBase):
  53.     id: int
  54.     is_active: bool = True
  55.    
  56.     class Config:
  57.         orm_mode = True
  58. # 模拟数据库
  59. fake_users_db = {}
  60. # 工具函数
  61. def verify_password(plain_password, hashed_password):
  62.     return pwd_context.verify(plain_password, hashed_password)
  63. def get_password_hash(password):
  64.     return pwd_context.hash(password)
  65. def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
  66.     to_encode = data.copy()
  67.     if expires_delta:
  68.         expire = datetime.utcnow() + expires_delta
  69.     else:
  70.         expire = datetime.utcnow() + timedelta(minutes=15)
  71.     to_encode.update({"exp": expire})
  72.     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  73.     return encoded_jwt
  74. # 认证和授权
  75. async def get_current_user(token: str = Depends(oauth2_scheme)):
  76.     credentials_exception = HTTPException(
  77.         status_code=status.HTTP_401_UNAUTHORIZED,
  78.         detail="Could not validate credentials",
  79.         headers={"WWW-Authenticate": "Bearer"},
  80.     )
  81.     try:
  82.         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  83.         username: str = payload.get("sub")
  84.         if username is None:
  85.             raise credentials_exception
  86.     except jwt.PyJWTError:
  87.         raise credentials_exception
  88.    
  89.     user = fake_users_db.get(username)
  90.     if user is None:
  91.         raise credentials_exception
  92.    
  93.     return user
  94. async def get_current_active_user(current_user: User = Depends(get_current_user)):
  95.     if not current_user.get("is_active", True):
  96.         raise HTTPException(status_code=400, detail="Inactive user")
  97.     return current_user
  98. # 端点
  99. @app.post("/token")
  100. async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
  101.     user = fake_users_db.get(form_data.username)
  102.     if not user or not verify_password(form_data.password, user["hashed_password"]):
  103.         logger.warning(f"Failed login attempt for username: {form_data.username}")
  104.         raise HTTPException(
  105.             status_code=status.HTTP_401_UNAUTHORIZED,
  106.             detail="Incorrect username or password",
  107.             headers={"WWW-Authenticate": "Bearer"},
  108.         )
  109.    
  110.     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  111.     access_token = create_access_token(
  112.         data={"sub": user["username"]}, expires_delta=access_token_expires
  113.     )
  114.    
  115.     logger.info(f"User {form_data.username} logged in successfully")
  116.     return {"access_token": access_token, "token_type": "bearer"}
  117. @app.post("/users/", response_model=User)
  118. async def create_user(user: UserCreate):
  119.     if user.username in fake_users_db:
  120.         raise HTTPException(
  121.             status_code=400,
  122.             detail="Username already registered"
  123.         )
  124.    
  125.     hashed_password = get_password_hash(user.password)
  126.     user_id = len(fake_users_db) + 1
  127.    
  128.     fake_users_db[user.username] = {
  129.         "id": user_id,
  130.         "username": user.username,
  131.         "email": user.email,
  132.         "hashed_password": hashed_password,
  133.         "is_active": True,
  134.     }
  135.    
  136.     logger.info(f"New user created: {user.username}")
  137.     return fake_users_db[user.username]
  138. @app.get("/users/me", response_model=User)
  139. async def read_users_me(current_user: User = Depends(get_current_active_user)):
  140.     return current_user
  141. # 安全中间件
  142. @app.middleware("http")
  143. async def add_security_headers(request, call_next):
  144.     response = await call_next(request)
  145.    
  146.     # 添加安全头
  147.     response.headers["Strict-Transport-Security"] = "max-age=63072000; includeSubDomains; preload"
  148.     response.headers["X-Content-Type-Options"] = "nosniff"
  149.     response.headers["X-Frame-Options"] = "DENY"
  150.     response.headers["X-XSS-Protection"] = "1; mode=block"
  151.     response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
  152.     response.headers["Content-Security-Policy"] = "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; font-src 'self'; connect-src 'self'; frame-ancestors 'none';"
  153.     response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=(), payment=()"
  154.    
  155.     return response
复制代码

结论

FastAPI 提供了丰富的工具和功能来构建安全的 Web 应用程序。通过实施适当的安全策略,如强身份验证和授权机制、数据加密、输入验证、安全配置和持续监控,你可以显著提高 FastAPI 应用的安全性,保护其免受常见网络攻击,并确保数据安全传输。

记住,安全是一个持续的过程,而不是一次性的任务。定期审查和更新你的安全策略,保持对最新安全威胁的了解,并采用最佳实践,是确保你的 FastAPI 应用长期安全的关键。

通过本文介绍的技术和策略,你可以构建一个既强大又安全的 FastAPI 应用,为用户提供安全可靠的服务。
「七転び八起き(ななころびやおき)」
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

手机版|联系我们|小黑屋|TG频道|RSS |网站地图

Powered by Pixtech

© 2025-2026 Pixtech Team.

>