|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
引言
FastAPI是近年来Python生态系统中备受关注的一个现代、快速(高性能)的Web框架,用于构建API。它基于Starlette和Pydantic,具有出色的性能、易用性和直观的API设计。本教程将带你从零开始,逐步掌握FastAPI的核心概念和实际应用,通过项目案例和最佳实践,帮助你提升开发技能和就业竞争力。
1. FastAPI简介
1.1 什么是FastAPI
FastAPI是一个现代、快速(高性能)的Web框架,用于构建API,具有Python 3.6+类型提示。它的主要特点包括:
• 高性能:与NodeJS和Go相当的性能,是最快的Python框架之一
• 快速编码:将开发速度提高约200%至300%
• 更少的bug:减少约40%的人为(开发者)错误
• 直观:强大的编辑器支持,自动补全处处可见
• 简单:易于使用和学习,减少阅读文档的时间
• 简短:最小化代码重复,每个参数声明的多个功能
• 健壮:获取可用于生产的代码,具有自动交互式文档
• 基于标准:基于并完全兼容API的开放标准:OpenAPI(以前称为Swagger)和JSON Schema
1.2 FastAPI与其他框架的比较
与Flask和Django等传统Python Web框架相比,FastAPI具有以下优势:
• 性能:FastAPI的性能远超Flask和Django,接近Go和NodeJS的性能水平
• 异步支持:原生支持异步编程,适合高并发场景
• 自动文档:自动生成交互式API文档(Swagger UI)
• 类型提示:利用Python类型提示进行数据验证和序列化
• 现代Python特性:充分利用Python 3.6+的新特性
2. 环境搭建与基础使用
2.1 安装FastAPI
首先,我们需要安装FastAPI和ASGI服务器(如uvicorn):
- pip install fastapi
- pip install "uvicorn[standard]"
复制代码
2.2 创建第一个FastAPI应用
让我们创建一个简单的FastAPI应用:
- # main.py
- from fastapi import FastAPI
- app = FastAPI()
- @app.get("/")
- def read_root():
- return {"Hello": "World"}
- @app.get("/items/{item_id}")
- def read_item(item_id: int, q: str | None = None):
- return {"item_id": item_id, "q": q}
复制代码
运行应用:
- uvicorn main:app --reload
复制代码
访问http://127.0.0.1:8000你将看到{"Hello": "World"}。
访问http://127.0.0.1:8000/items/5?q=somequery,你将看到{"item_id": 5, "q": "somequery"}。
访问http://127.0.0.1:8000/docs,你将看到自动生成的交互式API文档(SwaggerUI)。
2.3 路由参数和查询参数
FastAPI支持路径参数和查询参数的声明:
- from fastapi import FastAPI
- app = FastAPI()
- @app.get("/users/{user_id}/items/{item_id}")
- def read_user_item(
- user_id: int,
- item_id: str,
- q: str | None = None,
- short: bool = False
- ):
- item = {"item_id": item_id, "owner_id": user_id}
- if q:
- item.update({"q": q})
- if not short:
- item.update(
- {"description": "This is an amazing item that has a long description"}
- )
- return item
复制代码
在这个例子中:
• user_id和item_id是路径参数
• q是可选的查询参数
• short是带有默认值的查询参数
3. 请求体和响应模型
3.1 使用Pydantic模型定义请求体
FastAPI使用Pydantic模型来定义请求体的结构:
- from fastapi import FastAPI
- from pydantic import BaseModel
- app = FastAPI()
- class Item(BaseModel):
- name: str
- description: str | None = None
- price: float
- tax: float | None = None
- @app.post("/items/")
- def create_item(item: Item):
- return item
复制代码
3.2 响应模型
你可以使用相同的Pydantic模型来定义响应:
- from fastapi import FastAPI
- from pydantic import BaseModel
- app = FastAPI()
- class Item(BaseModel):
- name: str
- description: str | None = None
- price: float
- tax: float | None = None
- tags: list[str] = []
- @app.post("/items/", response_model=Item)
- def create_item(item: Item):
- return item
复制代码
3.3 响应状态码
FastAPI允许你直接在路径操作中声明响应状态码:
- from fastapi import FastAPI, status
- app = FastAPI()
- @app.post("/items/", status_code=status.HTTP_201_CREATED)
- def create_item(name: str):
- return {"name": name}
复制代码
4. 数据验证与序列化
4.1 数据验证
FastAPI使用Pydantic进行数据验证:
- from fastapi import FastAPI
- from pydantic import BaseModel, Field
- app = FastAPI()
- class Item(BaseModel):
- name: str = Field(..., min_length=1, max_length=50)
- description: str | None = Field(
- None,
- title="The description of the item",
- max_length=300
- )
- price: float = Field(..., gt=0, description="The price must be greater than zero")
- tax: float | None = None
- @app.put("/items/{item_id}")
- def update_item(item_id: int, item: Item):
- return {"item_id": item_id, **item.dict()}
复制代码
4.2 数据序列化
FastAPI自动处理数据的序列化和反序列化:
- from datetime import datetime
- from typing import List
- from fastapi import FastAPI
- from pydantic import BaseModel
- app = FastAPI()
- class User(BaseModel):
- id: int
- name: str = "John Doe"
- signup_ts: datetime | None = None
- friends: List[int] = []
- @app.put("/users/{user_id}")
- def update_user(user_id: int, user: User):
- results = {"user_id": user_id, "user": user}
- return results
复制代码
5. 依赖注入系统
FastAPI有一个强大的依赖注入系统,使你能够轻松地集成组件:
- from fastapi import FastAPI, Depends
- from typing import Annotated
- app = FastAPI()
- async def common_parameters(q: str | None = None, skip: int = 0, limit: int = 100):
- return {"q": q, "skip": skip, "limit": limit}
- @app.get("/users/")
- def read_users(commons: Annotated[dict, Depends(common_parameters)]):
- return commons
- @app.get("/items/")
- def read_items(commons: Annotated[dict, Depends(common_parameters)]):
- return commons
复制代码
5.1 类作为依赖
你也可以使用类作为依赖:
- from fastapi import FastAPI, Depends
- from typing import Annotated
- app = FastAPI()
- class CommonQueryParams:
- def __init__(self, q: str | None = None, skip: int = 0, limit: int = 100):
- self.q = q
- self.skip = skip
- self.limit = limit
- @app.get("/users/")
- def read_users(commons: Annotated[CommonQueryParams, Depends()]):
- response = {}
- if commons.q:
- response.update({"q": commons.q})
- items = [{"item_id": "Foo"}, {"item_id": "Bar"}]
- response.update({"items": items[commons.skip : commons.skip + commons.limit]})
- return response
复制代码
6. 安全性与认证
6.1 OAuth2密码流
FastAPI内置了OAuth2密码流的支持:
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from pydantic import BaseModel
- app = FastAPI()
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
- class User(BaseModel):
- username: str
- email: str | None = None
- full_name: str | None = None
- disabled: bool | None = None
- class UserInDB(User):
- hashed_password: str
- def fake_hash_password(password: str):
- return "fakehashed" + password
- users_db = {
- "johndoe": {
- "username": "johndoe",
- "full_name": "John Doe",
- "email": "johndoe@example.com",
- "hashed_password": fake_hash_password("secret"),
- "disabled": False,
- }
- }
- def get_user(db, username: str):
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
- def fake_decode_token(token):
- user = get_user(users_db, token)
- return user
- async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
- user = fake_decode_token(token)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Invalid authentication credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- return user
- @app.post("/token")
- async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
- user_dict = users_db.get(form_data.username)
- if not user_dict:
- raise HTTPException(status_code=400, detail="Incorrect username or password")
- user = UserInDB(**user_dict)
- hashed_password = fake_hash_password(form_data.password)
- if not hashed_password == user.hashed_password:
- raise HTTPException(status_code=400, detail="Incorrect username or password")
- return {"access_token": user.username, "token_type": "bearer"}
- @app.get("/users/me")
- async def read_users_me(current_user: Annotated[User, Depends(get_current_user)]):
- return current_user
复制代码
6.2 JWT令牌认证
使用JWT进行认证:
- from datetime import datetime, timedelta
- from typing import Annotated
- from fastapi import FastAPI, Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from jose import JWTError, jwt
- from passlib.context import CryptContext
- from pydantic import BaseModel
- app = FastAPI()
- # to get a string like this run:
- # openssl rand -hex 32
- SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
- fake_users_db = {
- "johndoe": {
- "username": "johndoe",
- "full_name": "John Doe",
- "email": "johndoe@example.com",
- "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
- "disabled": False,
- }
- }
- class Token(BaseModel):
- access_token: str
- token_type: str
- class TokenData(BaseModel):
- username: str | None = None
- class User(BaseModel):
- username: str
- email: str | None = None
- full_name: str | None = None
- disabled: bool | None = None
- class UserInDB(User):
- hashed_password: str
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
- def verify_password(plain_password, hashed_password):
- return pwd_context.verify(plain_password, hashed_password)
- def get_password_hash(password):
- return pwd_context.hash(password)
- def get_user(db, username: str):
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
- def authenticate_user(fake_db, username: str, password: str):
- user = get_user(fake_db, username)
- if not user:
- return False
- if not verify_password(password, user.hashed_password):
- return False
- return user
- def create_access_token(data: dict, expires_delta: timedelta | None = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
- async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- token_data = TokenData(username=username)
- except JWTError:
- raise credentials_exception
- user = get_user(fake_users_db, username=token_data.username)
- if user is None:
- raise credentials_exception
- return user
- async def get_current_active_user(
- current_user: Annotated[User, Depends(get_current_user)]
- ):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
- @app.post("/token", response_model=Token)
- async def login_for_access_token(
- form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
- ):
- user = authenticate_user(fake_users_db, form_data.username, form_data.password)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Incorrect username or password",
- headers={"WWW-Authenticate": "Bearer"},
- )
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {"access_token": access_token, "token_type": "bearer"}
- @app.get("/users/me/", response_model=User)
- async def read_users_me(
- current_user: Annotated[User, Depends(get_current_active_user)]
- ):
- return current_user
- @app.get("/users/me/items/")
- async def read_own_items(
- current_user: Annotated[User, Depends(get_current_active_user)]
- ):
- return [{"item_id": "Foo", "owner": current_user.username}]
复制代码
7. 中间件和CORS
7.1 中间件
FastAPI支持中间件,可以在请求处理前后执行代码:
- from fastapi import FastAPI, Request
- import time
- app = FastAPI()
- @app.middleware("http")
- async def add_process_time_header(request: Request, call_next):
- start_time = time.time()
- response = await call_next(request)
- process_time = time.time() - start_time
- response.headers["X-Process-Time"] = str(process_time)
- return response
- @app.get("/")
- async def main():
- return {"message": "Hello World"}
复制代码
7.2 CORS(跨域资源共享)
处理跨域请求:
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- app = FastAPI()
- origins = [
- "http://localhost.tiangolo.com",
- "https://localhost.tiangolo.com",
- "http://localhost",
- "http://localhost:8080",
- ]
- app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- @app.get("/")
- async def main():
- return {"message": "Hello World"}
复制代码
8. 数据库集成
8.1 SQLAlchemy集成
使用SQLAlchemy与数据库交互:
- from fastapi import FastAPI, Depends, HTTPException
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import sessionmaker, Session
- from pydantic import BaseModel
- SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
- engine = create_engine(
- SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
- )
- SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
- Base = declarative_base()
- app = FastAPI()
- # Dependency
- def get_db():
- db = SessionLocal()
- try:
- yield db
- finally:
- db.close()
- # Pydantic models
- class ItemBase(BaseModel):
- title: str
- description: str | None = None
- class ItemCreate(ItemBase):
- pass
- class Item(ItemBase):
- id: int
- owner_id: int
- class Config:
- orm_mode = True
- # SQLAlchemy models
- class ItemModel(Base):
- __tablename__ = "items"
- id = Column(Integer, primary_key=True, index=True)
- title = Column(String, index=True)
- description = Column(String, index=True)
- owner_id = Column(Integer, ForeignKey("users.id"))
- # Create tables
- Base.metadata.create_all(bind=engine)
- @app.post("/items/", response_model=Item)
- def create_item(item: ItemCreate, db: Session = Depends(get_db)):
- db_item = ItemModel(**item.dict())
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- return db_item
- @app.get("/items/{item_id}", response_model=Item)
- def read_item(item_id: int, db: Session = Depends(get_db)):
- db_item = db.query(ItemModel).filter(ItemModel.id == item_id).first()
- if db_item is None:
- raise HTTPException(status_code=404, detail="Item not found")
- return db_item
复制代码
8.2 异步数据库操作
使用asyncpg和SQLAlchemy Core进行异步数据库操作:
- from fastapi import FastAPI
- from databases import Database
- import sqlalchemy
- app = FastAPI()
- DATABASE_URL = "postgresql://user:password@postgresserver/db"
- database = Database(DATABASE_URL)
- metadata = sqlalchemy.MetaData()
- notes = sqlalchemy.Table(
- "notes",
- metadata,
- sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
- sqlalchemy.Column("text", sqlalchemy.String),
- sqlalchemy.Column("completed", sqlalchemy.Boolean),
- )
- engine = sqlalchemy.create_engine(DATABASE_URL)
- metadata.create_all(engine)
- @app.on_event("startup")
- async def startup():
- await database.connect()
- @app.on_event("shutdown")
- async def shutdown():
- await database.disconnect()
- @app.get("/notes/")
- async def read_notes():
- query = notes.select()
- return await database.fetch_all(query)
- @app.post("/notes/")
- async def create_note(note: dict):
- query = notes.insert().values(text=note["text"], completed=note["completed"])
- last_record_id = await database.execute(query)
- return {**note, "id": last_record_id}
复制代码
9. 测试FastAPI应用
9.1 使用TestClient进行测试
FastAPI提供了TestClient用于测试:
- from fastapi import FastAPI
- from fastapi.testclient import TestClient
- app = FastAPI()
- @app.get("/")
- async def read_main():
- return {"msg": "Hello World"}
- client = TestClient(app)
- def test_read_main():
- response = client.get("/")
- assert response.status_code == 200
- assert response.json() == {"msg": "Hello World"}
复制代码
9.2 测试带依赖的应用
测试带有依赖的应用:
- from fastapi import FastAPI, Depends
- from fastapi.testclient import TestClient
- app = FastAPI()
- async def common_parameters(q: str | None = None, skip: int = 0, limit: int = 100):
- return {"q": q, "skip": skip, "limit": limit}
- @app.get("/users/")
- def read_users(commons: dict = Depends(common_parameters)):
- return commons
- client = TestClient(app)
- def test_read_users():
- response = client.get("/users/")
- assert response.status_code == 200
- assert response.json() == {"q": None, "skip": 0, "limit": 100}
- def test_read_users_with_params():
- response = client.get("/users/?q=foo&skip=5&limit=10")
- assert response.status_code == 200
- assert response.json() == {"q": "foo", "skip": 5, "limit": 10}
复制代码
10. 项目案例:构建一个完整的博客API
让我们通过一个完整的博客API项目来应用我们学到的知识。
10.1 项目结构
- blog_api/
- ├── app/
- │ ├── __init__.py
- │ ├── main.py
- │ ├── database.py
- │ ├── models.py
- │ ├── schemas.py
- │ ├── crud.py
- │ └── api/
- │ ├── __init__.py
- │ ├── deps.py
- │ └── endpoints/
- │ ├── __init__.py
- │ ├── users.py
- │ ├── posts.py
- │ └── comments.py
- ├── alembic.ini
- ├── requirements.txt
- └── tests/
- ├── __init__.py
- ├── conftest.py
- └── test_api.py
复制代码
10.2 数据库模型
- # app/models.py
- from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text, DateTime
- from sqlalchemy.orm import relationship
- from sqlalchemy.sql import func
- from .database import Base
- class User(Base):
- __tablename__ = "users"
- id = Column(Integer, primary_key=True, index=True)
- email = Column(String, unique=True, index=True)
- username = Column(String, unique=True, index=True)
- hashed_password = Column(String)
- is_active = Column(Boolean, default=True)
- created_at = Column(DateTime(timezone=True), server_default=func.now())
- posts = relationship("Post", back_populates="author")
- comments = relationship("Comment", back_populates="author")
- class Post(Base):
- __tablename__ = "posts"
- id = Column(Integer, primary_key=True, index=True)
- title = Column(String, index=True)
- content = Column(Text)
- author_id = Column(Integer, ForeignKey("users.id"))
- created_at = Column(DateTime(timezone=True), server_default=func.now())
- updated_at = Column(DateTime(timezone=True), onupdate=func.now())
- author = relationship("User", back_populates="posts")
- comments = relationship("Comment", back_populates="post")
- class Comment(Base):
- __tablename__ = "comments"
- id = Column(Integer, primary_key=True, index=True)
- content = Column(Text)
- post_id = Column(Integer, ForeignKey("posts.id"))
- author_id = Column(Integer, ForeignKey("users.id"))
- created_at = Column(DateTime(timezone=True), server_default=func.now())
- post = relationship("Post", back_populates="comments")
- author = relationship("User", back_populates="comments")
复制代码
10.3 Pydantic模型
- # app/schemas.py
- from pydantic import BaseModel, EmailStr
- from typing import List, Optional
- from datetime import datetime
- # User schemas
- class UserBase(BaseModel):
- email: EmailStr
- username: str
- class UserCreate(UserBase):
- password: str
- class UserUpdate(UserBase):
- password: Optional[str] = None
- class User(UserBase):
- id: int
- is_active: bool
- created_at: datetime
- class Config:
- orm_mode = True
- # Post schemas
- class PostBase(BaseModel):
- title: str
- content: str
- class PostCreate(PostBase):
- pass
- class PostUpdate(PostBase):
- pass
- class Post(PostBase):
- id: int
- author_id: int
- created_at: datetime
- updated_at: Optional[datetime] = None
- author: User
- class Config:
- orm_mode = True
- # Comment schemas
- class CommentBase(BaseModel):
- content: str
- class CommentCreate(CommentBase):
- post_id: int
- class CommentUpdate(CommentBase):
- pass
- class Comment(CommentBase):
- id: int
- post_id: int
- author_id: int
- created_at: datetime
- author: User
- class Config:
- orm_mode = True
复制代码
10.4 CRUD操作
- # app/crud.py
- from sqlalchemy.orm import Session
- from . import models, schemas
- from passlib.context import CryptContext
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- def get_password_hash(password: str):
- return pwd_context.hash(password)
- def verify_password(plain_password, hashed_password):
- return pwd_context.verify(plain_password, hashed_password)
- # User CRUD
- def get_user(db: Session, user_id: int):
- return db.query(models.User).filter(models.User.id == user_id).first()
- def get_user_by_email(db: Session, email: str):
- return db.query(models.User).filter(models.User.email == email).first()
- def get_user_by_username(db: Session, username: str):
- return db.query(models.User).filter(models.User.username == username).first()
- def get_users(db: Session, skip: int = 0, limit: int = 100):
- return db.query(models.User).offset(skip).limit(limit).all()
- def create_user(db: Session, user: schemas.UserCreate):
- hashed_password = get_password_hash(user.password)
- db_user = models.User(
- email=user.email,
- username=user.username,
- hashed_password=hashed_password
- )
- db.add(db_user)
- db.commit()
- db.refresh(db_user)
- return db_user
- def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
- db_user = get_user(db, user_id)
- if not db_user:
- return None
-
- update_data = user.dict(exclude_unset=True)
- if "password" in update_data:
- update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
-
- for field, value in update_data.items():
- setattr(db_user, field, value)
-
- db.commit()
- db.refresh(db_user)
- return db_user
- def delete_user(db: Session, user_id: int):
- db_user = get_user(db, user_id)
- if not db_user:
- return None
-
- db.delete(db_user)
- db.commit()
- return db_user
- # Post CRUD
- def get_post(db: Session, post_id: int):
- return db.query(models.Post).filter(models.Post.id == post_id).first()
- def get_posts(db: Session, skip: int = 0, limit: int = 100):
- return db.query(models.Post).offset(skip).limit(limit).all()
- def get_posts_by_author(db: Session, author_id: int, skip: int = 0, limit: int = 100):
- return db.query(models.Post).filter(models.Post.author_id == author_id).offset(skip).limit(limit).all()
- def create_post(db: Session, post: schemas.PostCreate, author_id: int):
- db_post = models.Post(**post.dict(), author_id=author_id)
- db.add(db_post)
- db.commit()
- db.refresh(db_post)
- return db_post
- def update_post(db: Session, post_id: int, post: schemas.PostUpdate):
- db_post = get_post(db, post_id)
- if not db_post:
- return None
-
- update_data = post.dict(exclude_unset=True)
- for field, value in update_data.items():
- setattr(db_post, field, value)
-
- db.commit()
- db.refresh(db_post)
- return db_post
- def delete_post(db: Session, post_id: int):
- db_post = get_post(db, post_id)
- if not db_post:
- return None
-
- db.delete(db_post)
- db.commit()
- return db_post
- # Comment CRUD
- def get_comment(db: Session, comment_id: int):
- return db.query(models.Comment).filter(models.Comment.id == comment_id).first()
- def get_comments(db: Session, skip: int = 0, limit: int = 100):
- return db.query(models.Comment).offset(skip).limit(limit).all()
- def get_comments_by_post(db: Session, post_id: int, skip: int = 0, limit: int = 100):
- return db.query(models.Comment).filter(models.Comment.post_id == post_id).offset(skip).limit(limit).all()
- def get_comments_by_author(db: Session, author_id: int, skip: int = 0, limit: int = 100):
- return db.query(models.Comment).filter(models.Comment.author_id == author_id).offset(skip).limit(limit).all()
- def create_comment(db: Session, comment: schemas.CommentCreate, author_id: int):
- db_comment = models.Comment(**comment.dict(), author_id=author_id)
- db.add(db_comment)
- db.commit()
- db.refresh(db_comment)
- return db_comment
- def update_comment(db: Session, comment_id: int, comment: schemas.CommentUpdate):
- db_comment = get_comment(db, comment_id)
- if not db_comment:
- return None
-
- update_data = comment.dict(exclude_unset=True)
- for field, value in update_data.items():
- setattr(db_comment, field, value)
-
- db.commit()
- db.refresh(db_comment)
- return db_comment
- def delete_comment(db: Session, comment_id: int):
- db_comment = get_comment(db, comment_id)
- if not db_comment:
- return None
-
- db.delete(db_comment)
- db.commit()
- return db_comment
复制代码
10.5 API端点
- # app/api/endpoints/users.py
- from fastapi import APIRouter, Depends, HTTPException, status
- from sqlalchemy.orm import Session
- from typing import List
- from .. import crud, schemas, models
- from ..database import get_db
- from .deps import get_current_user
- router = APIRouter()
- @router.post("/", response_model=schemas.User)
- def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
- db_user = crud.get_user_by_email(db, email=user.email)
- if db_user:
- raise HTTPException(
- status_code=400,
- detail="Email already registered"
- )
- db_user = crud.get_user_by_username(db, username=user.username)
- if db_user:
- raise HTTPException(
- status_code=400,
- detail="Username already taken"
- )
- return crud.create_user(db=db, user=user)
- @router.get("/", response_model=List[schemas.User])
- def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
- users = crud.get_users(db, skip=skip, limit=limit)
- return users
- @router.get("/me", response_model=schemas.User)
- def read_users_me(current_user: models.User = Depends(get_current_user)):
- return current_user
- @router.get("/{user_id}", response_model=schemas.User)
- def read_user(user_id: int, db: Session = Depends(get_db)):
- db_user = crud.get_user(db, user_id=user_id)
- if db_user is None:
- raise HTTPException(status_code=404, detail="User not found")
- return db_user
- @router.put("/me", response_model=schemas.User)
- def update_user_me(
- user: schemas.UserUpdate,
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- updated_user = crud.update_user(db, user_id=current_user.id, user=user)
- if not updated_user:
- raise HTTPException(status_code=404, detail="User not found")
- return updated_user
- @router.delete("/me", response_model=schemas.User)
- def delete_user_me(
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- deleted_user = crud.delete_user(db, user_id=current_user.id)
- if not deleted_user:
- raise HTTPException(status_code=404, detail="User not found")
- return deleted_user
复制代码- # app/api/endpoints/posts.py
- from fastapi import APIRouter, Depends, HTTPException, status
- from sqlalchemy.orm import Session
- from typing import List
- from .. import crud, schemas, models
- from ..database import get_db
- from .deps import get_current_user
- router = APIRouter()
- @router.post("/", response_model=schemas.Post)
- def create_post(
- post: schemas.PostCreate,
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- return crud.create_post(db=db, post=post, author_id=current_user.id)
- @router.get("/", response_model=List[schemas.Post])
- def read_posts(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
- posts = crud.get_posts(db, skip=skip, limit=limit)
- return posts
- @router.get("/{post_id}", response_model=schemas.Post)
- def read_post(post_id: int, db: Session = Depends(get_db)):
- db_post = crud.get_post(db, post_id=post_id)
- if db_post is None:
- raise HTTPException(status_code=404, detail="Post not found")
- return db_post
- @router.put("/{post_id}", response_model=schemas.Post)
- def update_post(
- post_id: int,
- post: schemas.PostUpdate,
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- db_post = crud.get_post(db, post_id=post_id)
- if db_post is None:
- raise HTTPException(status_code=404, detail="Post not found")
- if db_post.author_id != current_user.id:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Not authorized to update this post"
- )
- updated_post = crud.update_post(db, post_id=post_id, post=post)
- if not updated_post:
- raise HTTPException(status_code=404, detail="Post not found")
- return updated_post
- @router.delete("/{post_id}", response_model=schemas.Post)
- def delete_post(
- post_id: int,
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- db_post = crud.get_post(db, post_id=post_id)
- if db_post is None:
- raise HTTPException(status_code=404, detail="Post not found")
- if db_post.author_id != current_user.id:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Not authorized to delete this post"
- )
- deleted_post = crud.delete_post(db, post_id=post_id)
- if not deleted_post:
- raise HTTPException(status_code=404, detail="Post not found")
- return deleted_post
复制代码- # app/api/endpoints/comments.py
- from fastapi import APIRouter, Depends, HTTPException, status
- from sqlalchemy.orm import Session
- from typing import List
- from .. import crud, schemas, models
- from ..database import get_db
- from .deps import get_current_user
- router = APIRouter()
- @router.post("/", response_model=schemas.Comment)
- def create_comment(
- comment: schemas.CommentCreate,
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- # Check if post exists
- db_post = crud.get_post(db, post_id=comment.post_id)
- if db_post is None:
- raise HTTPException(status_code=404, detail="Post not found")
-
- return crud.create_comment(db=db, comment=comment, author_id=current_user.id)
- @router.get("/", response_model=List[schemas.Comment])
- def read_comments(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
- comments = crud.get_comments(db, skip=skip, limit=limit)
- return comments
- @router.get("/post/{post_id}", response_model=List[schemas.Comment])
- def read_comments_by_post(
- post_id: int,
- skip: int = 0,
- limit: int = 100,
- db: Session = Depends(get_db)
- ):
- # Check if post exists
- db_post = crud.get_post(db, post_id=post_id)
- if db_post is None:
- raise HTTPException(status_code=404, detail="Post not found")
-
- comments = crud.get_comments_by_post(db, post_id=post_id, skip=skip, limit=limit)
- return comments
- @router.get("/{comment_id}", response_model=schemas.Comment)
- def read_comment(comment_id: int, db: Session = Depends(get_db)):
- db_comment = crud.get_comment(db, comment_id=comment_id)
- if db_comment is None:
- raise HTTPException(status_code=404, detail="Comment not found")
- return db_comment
- @router.put("/{comment_id}", response_model=schemas.Comment)
- def update_comment(
- comment_id: int,
- comment: schemas.CommentUpdate,
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- db_comment = crud.get_comment(db, comment_id=comment_id)
- if db_comment is None:
- raise HTTPException(status_code=404, detail="Comment not found")
- if db_comment.author_id != current_user.id:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Not authorized to update this comment"
- )
- updated_comment = crud.update_comment(db, comment_id=comment_id, comment=comment)
- if not updated_comment:
- raise HTTPException(status_code=404, detail="Comment not found")
- return updated_comment
- @router.delete("/{comment_id}", response_model=schemas.Comment)
- def delete_comment(
- comment_id: int,
- db: Session = Depends(get_db),
- current_user: models.User = Depends(get_current_user)
- ):
- db_comment = crud.get_comment(db, comment_id=comment_id)
- if db_comment is None:
- raise HTTPException(status_code=404, detail="Comment not found")
- if db_comment.author_id != current_user.id:
- raise HTTPException(
- status_code=status.HTTP_403_FORBIDDEN,
- detail="Not authorized to delete this comment"
- )
- deleted_comment = crud.delete_comment(db, comment_id=comment_id)
- if not deleted_comment:
- raise HTTPException(status_code=404, detail="Comment not found")
- return deleted_comment
复制代码
10.6 主应用
- # app/main.py
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from . import models
- from .database import engine
- from .api.api_v1.api import api_router
- # Create tables
- models.Base.metadata.create_all(bind=engine)
- app = FastAPI(title="Blog API", openapi_url="/api/v1/openapi.json")
- # Set up CORS middleware
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # Include API router
- app.include_router(api_router, prefix="/api/v1")
- @app.get("/")
- async def root():
- return {"message": "Welcome to Blog API. See /api/v1/docs for API documentation."}
复制代码
10.7 依赖项
- # app/api/deps.py
- from fastapi import Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- from sqlalchemy.orm import Session
- from jose import JWTError, jwt
- from ..database import get_db
- from .. import crud, models, schemas
- from ..config import settings
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
- def get_current_user(
- db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
- ) -> models.User:
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(
- token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
- )
- user_id: int = payload.get("sub")
- if user_id is None:
- raise credentials_exception
- token_data = schemas.TokenPayload(sub=user_id)
- except JWTError:
- raise credentials_exception
- user = crud.get_user(db, user_id=token_data.sub)
- if user is None:
- raise credentials_exception
- return user
- def get_current_active_user(
- current_user: models.User = Depends(get_current_user),
- ) -> models.User:
- if not crud.is_active(current_user):
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
复制代码
11. 最佳实践
11.1 项目结构组织
• 使用分层架构,将应用分为不同的层(API、业务逻辑、数据访问)
• 使用模块化设计,将相关功能组织在一起
• 使用依赖注入来管理组件之间的依赖关系
11.2 错误处理
- from fastapi import FastAPI, Request, HTTPException
- from fastapi.responses import JSONResponse
- from fastapi.exceptions import RequestValidationError
- from starlette.exceptions import HTTPException as StarletteHTTPException
- app = FastAPI()
- @app.exception_handler(StarletteHTTPException)
- async def http_exception_handler(request: Request, exc: StarletteHTTPException):
- return JSONResponse(
- status_code=exc.status_code,
- content={"message": exc.detail},
- )
- @app.exception_handler(RequestValidationError)
- async def validation_exception_handler(request: Request, exc: RequestValidationError):
- return JSONResponse(
- status_code=422,
- content={"message": "Validation error", "details": exc.errors()},
- )
- @app.exception_handler(Exception)
- async def general_exception_handler(request: Request, exc: Exception):
- return JSONResponse(
- status_code=500,
- content={"message": "Internal server error"},
- )
复制代码
11.3 配置管理
- # app/config.py
- from pydantic import BaseSettings, AnyHttpUrl, EmailStr, validator
- from typing import List, Optional
- class Settings(BaseSettings):
- API_V1_STR: str = "/api/v1"
- PROJECT_NAME: str = "Blog API"
-
- # CORS
- BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = [
- "http://localhost:8000",
- "http://localhost:3000",
- ]
- @validator("BACKEND_CORS_ORIGINS", pre=True)
- def assemble_cors_origins(cls, v):
- if isinstance(v, str) and not v.startswith("["):
- return [i.strip() for i in v.split(",")]
- elif isinstance(v, (list, str)):
- return v
- raise ValueError(v)
- # JWT
- SECRET_KEY: str = "your-secret-key-here"
- ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 8 days
- ALGORITHM: str = "HS256"
-
- # Database
- DATABASE_URL: str = "sqlite:///./blog.db"
-
- # Email
- SMTP_TLS: bool = True
- SMTP_PORT: Optional[int] = None
- SMTP_HOST: Optional[str] = None
- SMTP_USER: Optional[str] = None
- SMTP_PASSWORD: Optional[str] = None
- EMAILS_FROM_EMAIL: Optional[EmailStr] = None
- EMAILS_FROM_NAME: Optional[str] = None
-
- @validator("EMAILS_FROM_NAME")
- def get_project_name(cls, v):
- if not v:
- return settings.PROJECT_NAME
- return v
- EMAIL_RESET_TOKEN_EXPIRE_HOURS: int = 48
- EMAIL_TEMPLATES_DIR: str = "/app/app/email-templates/build"
- EMAILS_ENABLED: bool = False
- @validator("EMAILS_ENABLED")
- def get_emails_enabled(cls, v, values):
- return bool(
- values.get("SMTP_HOST")
- and values.get("SMTP_PORT")
- and values.get("EMAILS_FROM_EMAIL")
- )
- # Test user
- FIRST_SUPERUSER: EmailStr = "admin@example.com"
- FIRST_SUPERUSER_PASSWORD: str = "changethis"
- class Config:
- case_sensitive = True
- settings = Settings()
复制代码
11.4 日志记录
- import logging
- from fastapi import FastAPI
- from fastapi.logger import logger
- app = FastAPI()
- # Configure logging
- gunicorn_logger = logging.getLogger('gunicorn.error')
- logger.handlers = gunicorn_logger.handlers
- if __name__ != "main__":
- logger.setLevel(gunicorn_logger.level)
- else:
- logger.setLevel(logging.DEBUG)
- @app.get("/")
- async def root():
- logger.info("Processing request to root endpoint")
- return {"message": "Hello World"}
复制代码
11.5 性能优化
• 使用异步数据库驱动(如asyncpg、aiomysql)
• 使用缓存(如Redis)
• 使用CDN加速静态资源
• 使用连接池管理数据库连接
• 使用异步任务队列(如Celery)处理耗时操作
12. 部署FastAPI应用
12.1 使用Docker部署
- # Dockerfile
- FROM python:3.9
- WORKDIR /app
- COPY requirements.txt .
- RUN pip install --no-cache-dir -r requirements.txt
- COPY . .
- CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
复制代码- # docker-compose.yml
- version: '3.8'
- services:
- web:
- build: .
- ports:
- - "8000:8000"
- depends_on:
- - db
- environment:
- - DATABASE_URL=postgresql://fastapi:fastapi@db:5432/blog_db
- db:
- image: postgres:13
- environment:
- - POSTGRES_USER=fastapi
- - POSTGRES_PASSWORD=fastapi
- - POSTGRES_DB=blog_db
- volumes:
- - postgres_data:/var/lib/postgresql/data/
- volumes:
- postgres_data:
复制代码
12.2 使用Gunicorn部署
- pip install gunicorn uvicorn
- gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker
复制代码
12.3 使用Nginx作为反向代理
- # nginx.conf
- server {
- listen 80;
- server_name your_domain.com;
- location / {
- proxy_pass http://127.0.0.1:8000;
- proxy_set_header Host $host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- }
- }
复制代码
13. 提升开发技能和就业竞争力
13.1 深入学习FastAPI
• 阅读FastAPI官方文档
• 参与FastAPI开源项目
• 关注FastAPI的更新和新特性
• 尝试实现FastAPI的扩展和插件
13.2 构建项目组合
• 使用FastAPI构建不同类型的项目(博客API、电商API、社交网络API等)
• 将项目部署到云平台(如AWS、Google Cloud、Azure)
• 为项目编写完整的文档和测试
• 将项目开源并分享到GitHub
13.3 学习相关技术
• 深入学习Python异步编程
• 学习数据库设计和优化
• 学习容器化技术(Docker、Kubernetes)
• 学习CI/CD流程和工具
• 学习云原生应用开发
13.4 参与社区和交流
• 加入FastAPI社区(Discord、Reddit、Stack Overflow)
• 参加技术会议和meetup
• 写博客分享你的学习经验
• 在GitHub上贡献代码
结论
FastAPI是一个强大、现代且高性能的Web框架,特别适合构建API。通过本教程,你学习了FastAPI的核心概念、特性和最佳实践,并通过一个完整的博客API项目案例,了解了如何使用FastAPI构建实际应用。
FastAPI的优秀性能、自动文档生成、类型提示和依赖注入系统等特性,使其成为Python开发者的理想选择。通过深入学习和实践FastAPI,你将能够构建高效、可维护的Web应用,并提升你的开发技能和就业竞争力。
继续探索FastAPI的更多可能性,构建出色的应用,并在社区中分享你的经验。祝你在FastAPI的学习之路上取得成功! |
|