简体中文 繁體中文 English Deutsch 한국 사람 بالعربية TÜRKÇE português คนไทย Français Japanese

站内搜索

搜索

活动公告

通知:为庆祝网站一周年,将在5.1日与5.2日开放注册,具体信息请见后续详细公告
04-22 00:04
通知:本站资源由网友上传分享,如有违规等问题请到版务模块进行投诉,资源失效请在帖子内回复要求补档,会尽快处理!
10-23 09:31

FastAPI Python开发实战教程快速构建现代Web应用的完整指南包含项目案例和最佳实践助你提升开发技能和就业竞争力

SunJu_FaceMall

3万

主题

1116

科技点

3万

积分

白金月票

碾压王

积分
32766

立华奏

发表于 2025-8-23 00:50:47 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
引言

FastAPI是近年来Python生态系统中备受关注的一个现代、快速(高性能)的Web框架,用于构建API。它基于Starlette和Pydantic,具有出色的性能、易用性和直观的API设计。本教程将带你从零开始,逐步掌握FastAPI的核心概念和实际应用,通过项目案例和最佳实践,帮助你提升开发技能和就业竞争力。

1. FastAPI简介

1.1 什么是FastAPI

FastAPI是一个现代、快速(高性能)的Web框架,用于构建API,具有Python 3.6+类型提示。它的主要特点包括:

• 高性能:与NodeJS和Go相当的性能,是最快的Python框架之一
• 快速编码:将开发速度提高约200%至300%
• 更少的bug:减少约40%的人为(开发者)错误
• 直观:强大的编辑器支持,自动补全处处可见
• 简单:易于使用和学习,减少阅读文档的时间
• 简短:最小化代码重复,每个参数声明的多个功能
• 健壮:获取可用于生产的代码,具有自动交互式文档
• 基于标准:基于并完全兼容API的开放标准:OpenAPI(以前称为Swagger)和JSON Schema

1.2 FastAPI与其他框架的比较

与Flask和Django等传统Python Web框架相比,FastAPI具有以下优势:

• 性能:FastAPI的性能远超Flask和Django,接近Go和NodeJS的性能水平
• 异步支持:原生支持异步编程,适合高并发场景
• 自动文档:自动生成交互式API文档(Swagger UI)
• 类型提示:利用Python类型提示进行数据验证和序列化
• 现代Python特性:充分利用Python 3.6+的新特性

2. 环境搭建与基础使用

2.1 安装FastAPI

首先,我们需要安装FastAPI和ASGI服务器(如uvicorn):
  1. pip install fastapi
  2. pip install "uvicorn[standard]"
复制代码

2.2 创建第一个FastAPI应用

让我们创建一个简单的FastAPI应用:
  1. # main.py
  2. from fastapi import FastAPI
  3. app = FastAPI()
  4. @app.get("/")
  5. def read_root():
  6.     return {"Hello": "World"}
  7. @app.get("/items/{item_id}")
  8. def read_item(item_id: int, q: str | None = None):
  9.     return {"item_id": item_id, "q": q}
复制代码

运行应用:
  1. uvicorn main:app --reload
复制代码

访问http://127.0.0.1:8000你将看到{"Hello": "World"}。

访问http://127.0.0.1:8000/items/5?q=somequery,你将看到{"item_id": 5, "q": "somequery"}。

访问http://127.0.0.1:8000/docs,你将看到自动生成的交互式API文档(SwaggerUI)。

2.3 路由参数和查询参数

FastAPI支持路径参数和查询参数的声明:
  1. from fastapi import FastAPI
  2. app = FastAPI()
  3. @app.get("/users/{user_id}/items/{item_id}")
  4. def read_user_item(
  5.     user_id: int,
  6.     item_id: str,
  7.     q: str | None = None,
  8.     short: bool = False
  9. ):
  10.     item = {"item_id": item_id, "owner_id": user_id}
  11.     if q:
  12.         item.update({"q": q})
  13.     if not short:
  14.         item.update(
  15.             {"description": "This is an amazing item that has a long description"}
  16.         )
  17.     return item
复制代码

在这个例子中:

• user_id和item_id是路径参数
• q是可选的查询参数
• short是带有默认值的查询参数

3. 请求体和响应模型

3.1 使用Pydantic模型定义请求体

FastAPI使用Pydantic模型来定义请求体的结构:
  1. from fastapi import FastAPI
  2. from pydantic import BaseModel
  3. app = FastAPI()
  4. class Item(BaseModel):
  5.     name: str
  6.     description: str | None = None
  7.     price: float
  8.     tax: float | None = None
  9. @app.post("/items/")
  10. def create_item(item: Item):
  11.     return item
复制代码

3.2 响应模型

你可以使用相同的Pydantic模型来定义响应:
  1. from fastapi import FastAPI
  2. from pydantic import BaseModel
  3. app = FastAPI()
  4. class Item(BaseModel):
  5.     name: str
  6.     description: str | None = None
  7.     price: float
  8.     tax: float | None = None
  9.     tags: list[str] = []
  10. @app.post("/items/", response_model=Item)
  11. def create_item(item: Item):
  12.     return item
复制代码

3.3 响应状态码

FastAPI允许你直接在路径操作中声明响应状态码:
  1. from fastapi import FastAPI, status
  2. app = FastAPI()
  3. @app.post("/items/", status_code=status.HTTP_201_CREATED)
  4. def create_item(name: str):
  5.     return {"name": name}
复制代码

4. 数据验证与序列化

4.1 数据验证

FastAPI使用Pydantic进行数据验证:
  1. from fastapi import FastAPI
  2. from pydantic import BaseModel, Field
  3. app = FastAPI()
  4. class Item(BaseModel):
  5.     name: str = Field(..., min_length=1, max_length=50)
  6.     description: str | None = Field(
  7.         None,
  8.         title="The description of the item",
  9.         max_length=300
  10.     )
  11.     price: float = Field(..., gt=0, description="The price must be greater than zero")
  12.     tax: float | None = None
  13. @app.put("/items/{item_id}")
  14. def update_item(item_id: int, item: Item):
  15.     return {"item_id": item_id, **item.dict()}
复制代码

4.2 数据序列化

FastAPI自动处理数据的序列化和反序列化:
  1. from datetime import datetime
  2. from typing import List
  3. from fastapi import FastAPI
  4. from pydantic import BaseModel
  5. app = FastAPI()
  6. class User(BaseModel):
  7.     id: int
  8.     name: str = "John Doe"
  9.     signup_ts: datetime | None = None
  10.     friends: List[int] = []
  11. @app.put("/users/{user_id}")
  12. def update_user(user_id: int, user: User):
  13.     results = {"user_id": user_id, "user": user}
  14.     return results
复制代码

5. 依赖注入系统

FastAPI有一个强大的依赖注入系统,使你能够轻松地集成组件:
  1. from fastapi import FastAPI, Depends
  2. from typing import Annotated
  3. app = FastAPI()
  4. async def common_parameters(q: str | None = None, skip: int = 0, limit: int = 100):
  5.     return {"q": q, "skip": skip, "limit": limit}
  6. @app.get("/users/")
  7. def read_users(commons: Annotated[dict, Depends(common_parameters)]):
  8.     return commons
  9. @app.get("/items/")
  10. def read_items(commons: Annotated[dict, Depends(common_parameters)]):
  11.     return commons
复制代码

5.1 类作为依赖

你也可以使用类作为依赖:
  1. from fastapi import FastAPI, Depends
  2. from typing import Annotated
  3. app = FastAPI()
  4. class CommonQueryParams:
  5.     def __init__(self, q: str | None = None, skip: int = 0, limit: int = 100):
  6.         self.q = q
  7.         self.skip = skip
  8.         self.limit = limit
  9. @app.get("/users/")
  10. def read_users(commons: Annotated[CommonQueryParams, Depends()]):
  11.     response = {}
  12.     if commons.q:
  13.         response.update({"q": commons.q})
  14.     items = [{"item_id": "Foo"}, {"item_id": "Bar"}]
  15.     response.update({"items": items[commons.skip : commons.skip + commons.limit]})
  16.     return response
复制代码

6. 安全性与认证

6.1 OAuth2密码流

FastAPI内置了OAuth2密码流的支持:
  1. from fastapi import FastAPI, Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  3. from pydantic import BaseModel
  4. app = FastAPI()
  5. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  6. class User(BaseModel):
  7.     username: str
  8.     email: str | None = None
  9.     full_name: str | None = None
  10.     disabled: bool | None = None
  11. class UserInDB(User):
  12.     hashed_password: str
  13. def fake_hash_password(password: str):
  14.     return "fakehashed" + password
  15. users_db = {
  16.     "johndoe": {
  17.         "username": "johndoe",
  18.         "full_name": "John Doe",
  19.         "email": "johndoe@example.com",
  20.         "hashed_password": fake_hash_password("secret"),
  21.         "disabled": False,
  22.     }
  23. }
  24. def get_user(db, username: str):
  25.     if username in db:
  26.         user_dict = db[username]
  27.         return UserInDB(**user_dict)
  28. def fake_decode_token(token):
  29.     user = get_user(users_db, token)
  30.     return user
  31. async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
  32.     user = fake_decode_token(token)
  33.     if not user:
  34.         raise HTTPException(
  35.             status_code=status.HTTP_401_UNAUTHORIZED,
  36.             detail="Invalid authentication credentials",
  37.             headers={"WWW-Authenticate": "Bearer"},
  38.         )
  39.     return user
  40. @app.post("/token")
  41. async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
  42.     user_dict = users_db.get(form_data.username)
  43.     if not user_dict:
  44.         raise HTTPException(status_code=400, detail="Incorrect username or password")
  45.     user = UserInDB(**user_dict)
  46.     hashed_password = fake_hash_password(form_data.password)
  47.     if not hashed_password == user.hashed_password:
  48.         raise HTTPException(status_code=400, detail="Incorrect username or password")
  49.     return {"access_token": user.username, "token_type": "bearer"}
  50. @app.get("/users/me")
  51. async def read_users_me(current_user: Annotated[User, Depends(get_current_user)]):
  52.     return current_user
复制代码

6.2 JWT令牌认证

使用JWT进行认证:
  1. from datetime import datetime, timedelta
  2. from typing import Annotated
  3. from fastapi import FastAPI, Depends, HTTPException, status
  4. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  5. from jose import JWTError, jwt
  6. from passlib.context import CryptContext
  7. from pydantic import BaseModel
  8. app = FastAPI()
  9. # to get a string like this run:
  10. # openssl rand -hex 32
  11. SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
  12. ALGORITHM = "HS256"
  13. ACCESS_TOKEN_EXPIRE_MINUTES = 30
  14. fake_users_db = {
  15.     "johndoe": {
  16.         "username": "johndoe",
  17.         "full_name": "John Doe",
  18.         "email": "johndoe@example.com",
  19.         "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
  20.         "disabled": False,
  21.     }
  22. }
  23. class Token(BaseModel):
  24.     access_token: str
  25.     token_type: str
  26. class TokenData(BaseModel):
  27.     username: str | None = None
  28. class User(BaseModel):
  29.     username: str
  30.     email: str | None = None
  31.     full_name: str | None = None
  32.     disabled: bool | None = None
  33. class UserInDB(User):
  34.     hashed_password: str
  35. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  36. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  37. def verify_password(plain_password, hashed_password):
  38.     return pwd_context.verify(plain_password, hashed_password)
  39. def get_password_hash(password):
  40.     return pwd_context.hash(password)
  41. def get_user(db, username: str):
  42.     if username in db:
  43.         user_dict = db[username]
  44.         return UserInDB(**user_dict)
  45. def authenticate_user(fake_db, username: str, password: str):
  46.     user = get_user(fake_db, username)
  47.     if not user:
  48.         return False
  49.     if not verify_password(password, user.hashed_password):
  50.         return False
  51.     return user
  52. def create_access_token(data: dict, expires_delta: timedelta | None = None):
  53.     to_encode = data.copy()
  54.     if expires_delta:
  55.         expire = datetime.utcnow() + expires_delta
  56.     else:
  57.         expire = datetime.utcnow() + timedelta(minutes=15)
  58.     to_encode.update({"exp": expire})
  59.     encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  60.     return encoded_jwt
  61. async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
  62.     credentials_exception = HTTPException(
  63.         status_code=status.HTTP_401_UNAUTHORIZED,
  64.         detail="Could not validate credentials",
  65.         headers={"WWW-Authenticate": "Bearer"},
  66.     )
  67.     try:
  68.         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  69.         username: str = payload.get("sub")
  70.         if username is None:
  71.             raise credentials_exception
  72.         token_data = TokenData(username=username)
  73.     except JWTError:
  74.         raise credentials_exception
  75.     user = get_user(fake_users_db, username=token_data.username)
  76.     if user is None:
  77.         raise credentials_exception
  78.     return user
  79. async def get_current_active_user(
  80.     current_user: Annotated[User, Depends(get_current_user)]
  81. ):
  82.     if current_user.disabled:
  83.         raise HTTPException(status_code=400, detail="Inactive user")
  84.     return current_user
  85. @app.post("/token", response_model=Token)
  86. async def login_for_access_token(
  87.     form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
  88. ):
  89.     user = authenticate_user(fake_users_db, form_data.username, form_data.password)
  90.     if not user:
  91.         raise HTTPException(
  92.             status_code=status.HTTP_401_UNAUTHORIZED,
  93.             detail="Incorrect username or password",
  94.             headers={"WWW-Authenticate": "Bearer"},
  95.         )
  96.     access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  97.     access_token = create_access_token(
  98.         data={"sub": user.username}, expires_delta=access_token_expires
  99.     )
  100.     return {"access_token": access_token, "token_type": "bearer"}
  101. @app.get("/users/me/", response_model=User)
  102. async def read_users_me(
  103.     current_user: Annotated[User, Depends(get_current_active_user)]
  104. ):
  105.     return current_user
  106. @app.get("/users/me/items/")
  107. async def read_own_items(
  108.     current_user: Annotated[User, Depends(get_current_active_user)]
  109. ):
  110.     return [{"item_id": "Foo", "owner": current_user.username}]
复制代码

7. 中间件和CORS

7.1 中间件

FastAPI支持中间件,可以在请求处理前后执行代码:
  1. from fastapi import FastAPI, Request
  2. import time
  3. app = FastAPI()
  4. @app.middleware("http")
  5. async def add_process_time_header(request: Request, call_next):
  6.     start_time = time.time()
  7.     response = await call_next(request)
  8.     process_time = time.time() - start_time
  9.     response.headers["X-Process-Time"] = str(process_time)
  10.     return response
  11. @app.get("/")
  12. async def main():
  13.     return {"message": "Hello World"}
复制代码

7.2 CORS(跨域资源共享)

处理跨域请求:
  1. from fastapi import FastAPI
  2. from fastapi.middleware.cors import CORSMiddleware
  3. app = FastAPI()
  4. origins = [
  5.     "http://localhost.tiangolo.com",
  6.     "https://localhost.tiangolo.com",
  7.     "http://localhost",
  8.     "http://localhost:8080",
  9. ]
  10. app.add_middleware(
  11.     CORSMiddleware,
  12.     allow_origins=origins,
  13.     allow_credentials=True,
  14.     allow_methods=["*"],
  15.     allow_headers=["*"],
  16. )
  17. @app.get("/")
  18. async def main():
  19.     return {"message": "Hello World"}
复制代码

8. 数据库集成

8.1 SQLAlchemy集成

使用SQLAlchemy与数据库交互:
  1. from fastapi import FastAPI, Depends, HTTPException
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy.orm import sessionmaker, Session
  5. from pydantic import BaseModel
  6. SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
  7. engine = create_engine(
  8.     SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
  9. )
  10. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  11. Base = declarative_base()
  12. app = FastAPI()
  13. # Dependency
  14. def get_db():
  15.     db = SessionLocal()
  16.     try:
  17.         yield db
  18.     finally:
  19.         db.close()
  20. # Pydantic models
  21. class ItemBase(BaseModel):
  22.     title: str
  23.     description: str | None = None
  24. class ItemCreate(ItemBase):
  25.     pass
  26. class Item(ItemBase):
  27.     id: int
  28.     owner_id: int
  29.     class Config:
  30.         orm_mode = True
  31. # SQLAlchemy models
  32. class ItemModel(Base):
  33.     __tablename__ = "items"
  34.     id = Column(Integer, primary_key=True, index=True)
  35.     title = Column(String, index=True)
  36.     description = Column(String, index=True)
  37.     owner_id = Column(Integer, ForeignKey("users.id"))
  38. # Create tables
  39. Base.metadata.create_all(bind=engine)
  40. @app.post("/items/", response_model=Item)
  41. def create_item(item: ItemCreate, db: Session = Depends(get_db)):
  42.     db_item = ItemModel(**item.dict())
  43.     db.add(db_item)
  44.     db.commit()
  45.     db.refresh(db_item)
  46.     return db_item
  47. @app.get("/items/{item_id}", response_model=Item)
  48. def read_item(item_id: int, db: Session = Depends(get_db)):
  49.     db_item = db.query(ItemModel).filter(ItemModel.id == item_id).first()
  50.     if db_item is None:
  51.         raise HTTPException(status_code=404, detail="Item not found")
  52.     return db_item
复制代码

8.2 异步数据库操作

使用asyncpg和SQLAlchemy Core进行异步数据库操作:
  1. from fastapi import FastAPI
  2. from databases import Database
  3. import sqlalchemy
  4. app = FastAPI()
  5. DATABASE_URL = "postgresql://user:password@postgresserver/db"
  6. database = Database(DATABASE_URL)
  7. metadata = sqlalchemy.MetaData()
  8. notes = sqlalchemy.Table(
  9.     "notes",
  10.     metadata,
  11.     sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
  12.     sqlalchemy.Column("text", sqlalchemy.String),
  13.     sqlalchemy.Column("completed", sqlalchemy.Boolean),
  14. )
  15. engine = sqlalchemy.create_engine(DATABASE_URL)
  16. metadata.create_all(engine)
  17. @app.on_event("startup")
  18. async def startup():
  19.     await database.connect()
  20. @app.on_event("shutdown")
  21. async def shutdown():
  22.     await database.disconnect()
  23. @app.get("/notes/")
  24. async def read_notes():
  25.     query = notes.select()
  26.     return await database.fetch_all(query)
  27. @app.post("/notes/")
  28. async def create_note(note: dict):
  29.     query = notes.insert().values(text=note["text"], completed=note["completed"])
  30.     last_record_id = await database.execute(query)
  31.     return {**note, "id": last_record_id}
复制代码

9. 测试FastAPI应用

9.1 使用TestClient进行测试

FastAPI提供了TestClient用于测试:
  1. from fastapi import FastAPI
  2. from fastapi.testclient import TestClient
  3. app = FastAPI()
  4. @app.get("/")
  5. async def read_main():
  6.     return {"msg": "Hello World"}
  7. client = TestClient(app)
  8. def test_read_main():
  9.     response = client.get("/")
  10.     assert response.status_code == 200
  11.     assert response.json() == {"msg": "Hello World"}
复制代码

9.2 测试带依赖的应用

测试带有依赖的应用:
  1. from fastapi import FastAPI, Depends
  2. from fastapi.testclient import TestClient
  3. app = FastAPI()
  4. async def common_parameters(q: str | None = None, skip: int = 0, limit: int = 100):
  5.     return {"q": q, "skip": skip, "limit": limit}
  6. @app.get("/users/")
  7. def read_users(commons: dict = Depends(common_parameters)):
  8.     return commons
  9. client = TestClient(app)
  10. def test_read_users():
  11.     response = client.get("/users/")
  12.     assert response.status_code == 200
  13.     assert response.json() == {"q": None, "skip": 0, "limit": 100}
  14. def test_read_users_with_params():
  15.     response = client.get("/users/?q=foo&skip=5&limit=10")
  16.     assert response.status_code == 200
  17.     assert response.json() == {"q": "foo", "skip": 5, "limit": 10}
复制代码

10. 项目案例:构建一个完整的博客API

让我们通过一个完整的博客API项目来应用我们学到的知识。

10.1 项目结构
  1. blog_api/
  2. ├── app/
  3. │   ├── __init__.py
  4. │   ├── main.py
  5. │   ├── database.py
  6. │   ├── models.py
  7. │   ├── schemas.py
  8. │   ├── crud.py
  9. │   └── api/
  10. │       ├── __init__.py
  11. │       ├── deps.py
  12. │       └── endpoints/
  13. │           ├── __init__.py
  14. │           ├── users.py
  15. │           ├── posts.py
  16. │           └── comments.py
  17. ├── alembic.ini
  18. ├── requirements.txt
  19. └── tests/
  20.     ├── __init__.py
  21.     ├── conftest.py
  22.     └── test_api.py
复制代码

10.2 数据库模型
  1. # app/models.py
  2. from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text, DateTime
  3. from sqlalchemy.orm import relationship
  4. from sqlalchemy.sql import func
  5. from .database import Base
  6. class User(Base):
  7.     __tablename__ = "users"
  8.     id = Column(Integer, primary_key=True, index=True)
  9.     email = Column(String, unique=True, index=True)
  10.     username = Column(String, unique=True, index=True)
  11.     hashed_password = Column(String)
  12.     is_active = Column(Boolean, default=True)
  13.     created_at = Column(DateTime(timezone=True), server_default=func.now())
  14.     posts = relationship("Post", back_populates="author")
  15.     comments = relationship("Comment", back_populates="author")
  16. class Post(Base):
  17.     __tablename__ = "posts"
  18.     id = Column(Integer, primary_key=True, index=True)
  19.     title = Column(String, index=True)
  20.     content = Column(Text)
  21.     author_id = Column(Integer, ForeignKey("users.id"))
  22.     created_at = Column(DateTime(timezone=True), server_default=func.now())
  23.     updated_at = Column(DateTime(timezone=True), onupdate=func.now())
  24.     author = relationship("User", back_populates="posts")
  25.     comments = relationship("Comment", back_populates="post")
  26. class Comment(Base):
  27.     __tablename__ = "comments"
  28.     id = Column(Integer, primary_key=True, index=True)
  29.     content = Column(Text)
  30.     post_id = Column(Integer, ForeignKey("posts.id"))
  31.     author_id = Column(Integer, ForeignKey("users.id"))
  32.     created_at = Column(DateTime(timezone=True), server_default=func.now())
  33.     post = relationship("Post", back_populates="comments")
  34.     author = relationship("User", back_populates="comments")
复制代码

10.3 Pydantic模型
  1. # app/schemas.py
  2. from pydantic import BaseModel, EmailStr
  3. from typing import List, Optional
  4. from datetime import datetime
  5. # User schemas
  6. class UserBase(BaseModel):
  7.     email: EmailStr
  8.     username: str
  9. class UserCreate(UserBase):
  10.     password: str
  11. class UserUpdate(UserBase):
  12.     password: Optional[str] = None
  13. class User(UserBase):
  14.     id: int
  15.     is_active: bool
  16.     created_at: datetime
  17.     class Config:
  18.         orm_mode = True
  19. # Post schemas
  20. class PostBase(BaseModel):
  21.     title: str
  22.     content: str
  23. class PostCreate(PostBase):
  24.     pass
  25. class PostUpdate(PostBase):
  26.     pass
  27. class Post(PostBase):
  28.     id: int
  29.     author_id: int
  30.     created_at: datetime
  31.     updated_at: Optional[datetime] = None
  32.     author: User
  33.     class Config:
  34.         orm_mode = True
  35. # Comment schemas
  36. class CommentBase(BaseModel):
  37.     content: str
  38. class CommentCreate(CommentBase):
  39.     post_id: int
  40. class CommentUpdate(CommentBase):
  41.     pass
  42. class Comment(CommentBase):
  43.     id: int
  44.     post_id: int
  45.     author_id: int
  46.     created_at: datetime
  47.     author: User
  48.     class Config:
  49.         orm_mode = True
复制代码

10.4 CRUD操作
  1. # app/crud.py
  2. from sqlalchemy.orm import Session
  3. from . import models, schemas
  4. from passlib.context import CryptContext
  5. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  6. def get_password_hash(password: str):
  7.     return pwd_context.hash(password)
  8. def verify_password(plain_password, hashed_password):
  9.     return pwd_context.verify(plain_password, hashed_password)
  10. # User CRUD
  11. def get_user(db: Session, user_id: int):
  12.     return db.query(models.User).filter(models.User.id == user_id).first()
  13. def get_user_by_email(db: Session, email: str):
  14.     return db.query(models.User).filter(models.User.email == email).first()
  15. def get_user_by_username(db: Session, username: str):
  16.     return db.query(models.User).filter(models.User.username == username).first()
  17. def get_users(db: Session, skip: int = 0, limit: int = 100):
  18.     return db.query(models.User).offset(skip).limit(limit).all()
  19. def create_user(db: Session, user: schemas.UserCreate):
  20.     hashed_password = get_password_hash(user.password)
  21.     db_user = models.User(
  22.         email=user.email,
  23.         username=user.username,
  24.         hashed_password=hashed_password
  25.     )
  26.     db.add(db_user)
  27.     db.commit()
  28.     db.refresh(db_user)
  29.     return db_user
  30. def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
  31.     db_user = get_user(db, user_id)
  32.     if not db_user:
  33.         return None
  34.    
  35.     update_data = user.dict(exclude_unset=True)
  36.     if "password" in update_data:
  37.         update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
  38.    
  39.     for field, value in update_data.items():
  40.         setattr(db_user, field, value)
  41.    
  42.     db.commit()
  43.     db.refresh(db_user)
  44.     return db_user
  45. def delete_user(db: Session, user_id: int):
  46.     db_user = get_user(db, user_id)
  47.     if not db_user:
  48.         return None
  49.    
  50.     db.delete(db_user)
  51.     db.commit()
  52.     return db_user
  53. # Post CRUD
  54. def get_post(db: Session, post_id: int):
  55.     return db.query(models.Post).filter(models.Post.id == post_id).first()
  56. def get_posts(db: Session, skip: int = 0, limit: int = 100):
  57.     return db.query(models.Post).offset(skip).limit(limit).all()
  58. def get_posts_by_author(db: Session, author_id: int, skip: int = 0, limit: int = 100):
  59.     return db.query(models.Post).filter(models.Post.author_id == author_id).offset(skip).limit(limit).all()
  60. def create_post(db: Session, post: schemas.PostCreate, author_id: int):
  61.     db_post = models.Post(**post.dict(), author_id=author_id)
  62.     db.add(db_post)
  63.     db.commit()
  64.     db.refresh(db_post)
  65.     return db_post
  66. def update_post(db: Session, post_id: int, post: schemas.PostUpdate):
  67.     db_post = get_post(db, post_id)
  68.     if not db_post:
  69.         return None
  70.    
  71.     update_data = post.dict(exclude_unset=True)
  72.     for field, value in update_data.items():
  73.         setattr(db_post, field, value)
  74.    
  75.     db.commit()
  76.     db.refresh(db_post)
  77.     return db_post
  78. def delete_post(db: Session, post_id: int):
  79.     db_post = get_post(db, post_id)
  80.     if not db_post:
  81.         return None
  82.    
  83.     db.delete(db_post)
  84.     db.commit()
  85.     return db_post
  86. # Comment CRUD
  87. def get_comment(db: Session, comment_id: int):
  88.     return db.query(models.Comment).filter(models.Comment.id == comment_id).first()
  89. def get_comments(db: Session, skip: int = 0, limit: int = 100):
  90.     return db.query(models.Comment).offset(skip).limit(limit).all()
  91. def get_comments_by_post(db: Session, post_id: int, skip: int = 0, limit: int = 100):
  92.     return db.query(models.Comment).filter(models.Comment.post_id == post_id).offset(skip).limit(limit).all()
  93. def get_comments_by_author(db: Session, author_id: int, skip: int = 0, limit: int = 100):
  94.     return db.query(models.Comment).filter(models.Comment.author_id == author_id).offset(skip).limit(limit).all()
  95. def create_comment(db: Session, comment: schemas.CommentCreate, author_id: int):
  96.     db_comment = models.Comment(**comment.dict(), author_id=author_id)
  97.     db.add(db_comment)
  98.     db.commit()
  99.     db.refresh(db_comment)
  100.     return db_comment
  101. def update_comment(db: Session, comment_id: int, comment: schemas.CommentUpdate):
  102.     db_comment = get_comment(db, comment_id)
  103.     if not db_comment:
  104.         return None
  105.    
  106.     update_data = comment.dict(exclude_unset=True)
  107.     for field, value in update_data.items():
  108.         setattr(db_comment, field, value)
  109.    
  110.     db.commit()
  111.     db.refresh(db_comment)
  112.     return db_comment
  113. def delete_comment(db: Session, comment_id: int):
  114.     db_comment = get_comment(db, comment_id)
  115.     if not db_comment:
  116.         return None
  117.    
  118.     db.delete(db_comment)
  119.     db.commit()
  120.     return db_comment
复制代码

10.5 API端点
  1. # app/api/endpoints/users.py
  2. from fastapi import APIRouter, Depends, HTTPException, status
  3. from sqlalchemy.orm import Session
  4. from typing import List
  5. from .. import crud, schemas, models
  6. from ..database import get_db
  7. from .deps import get_current_user
  8. router = APIRouter()
  9. @router.post("/", response_model=schemas.User)
  10. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  11.     db_user = crud.get_user_by_email(db, email=user.email)
  12.     if db_user:
  13.         raise HTTPException(
  14.             status_code=400,
  15.             detail="Email already registered"
  16.         )
  17.     db_user = crud.get_user_by_username(db, username=user.username)
  18.     if db_user:
  19.         raise HTTPException(
  20.             status_code=400,
  21.             detail="Username already taken"
  22.         )
  23.     return crud.create_user(db=db, user=user)
  24. @router.get("/", response_model=List[schemas.User])
  25. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  26.     users = crud.get_users(db, skip=skip, limit=limit)
  27.     return users
  28. @router.get("/me", response_model=schemas.User)
  29. def read_users_me(current_user: models.User = Depends(get_current_user)):
  30.     return current_user
  31. @router.get("/{user_id}", response_model=schemas.User)
  32. def read_user(user_id: int, db: Session = Depends(get_db)):
  33.     db_user = crud.get_user(db, user_id=user_id)
  34.     if db_user is None:
  35.         raise HTTPException(status_code=404, detail="User not found")
  36.     return db_user
  37. @router.put("/me", response_model=schemas.User)
  38. def update_user_me(
  39.     user: schemas.UserUpdate,
  40.     db: Session = Depends(get_db),
  41.     current_user: models.User = Depends(get_current_user)
  42. ):
  43.     updated_user = crud.update_user(db, user_id=current_user.id, user=user)
  44.     if not updated_user:
  45.         raise HTTPException(status_code=404, detail="User not found")
  46.     return updated_user
  47. @router.delete("/me", response_model=schemas.User)
  48. def delete_user_me(
  49.     db: Session = Depends(get_db),
  50.     current_user: models.User = Depends(get_current_user)
  51. ):
  52.     deleted_user = crud.delete_user(db, user_id=current_user.id)
  53.     if not deleted_user:
  54.         raise HTTPException(status_code=404, detail="User not found")
  55.     return deleted_user
复制代码
  1. # app/api/endpoints/posts.py
  2. from fastapi import APIRouter, Depends, HTTPException, status
  3. from sqlalchemy.orm import Session
  4. from typing import List
  5. from .. import crud, schemas, models
  6. from ..database import get_db
  7. from .deps import get_current_user
  8. router = APIRouter()
  9. @router.post("/", response_model=schemas.Post)
  10. def create_post(
  11.     post: schemas.PostCreate,
  12.     db: Session = Depends(get_db),
  13.     current_user: models.User = Depends(get_current_user)
  14. ):
  15.     return crud.create_post(db=db, post=post, author_id=current_user.id)
  16. @router.get("/", response_model=List[schemas.Post])
  17. def read_posts(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  18.     posts = crud.get_posts(db, skip=skip, limit=limit)
  19.     return posts
  20. @router.get("/{post_id}", response_model=schemas.Post)
  21. def read_post(post_id: int, db: Session = Depends(get_db)):
  22.     db_post = crud.get_post(db, post_id=post_id)
  23.     if db_post is None:
  24.         raise HTTPException(status_code=404, detail="Post not found")
  25.     return db_post
  26. @router.put("/{post_id}", response_model=schemas.Post)
  27. def update_post(
  28.     post_id: int,
  29.     post: schemas.PostUpdate,
  30.     db: Session = Depends(get_db),
  31.     current_user: models.User = Depends(get_current_user)
  32. ):
  33.     db_post = crud.get_post(db, post_id=post_id)
  34.     if db_post is None:
  35.         raise HTTPException(status_code=404, detail="Post not found")
  36.     if db_post.author_id != current_user.id:
  37.         raise HTTPException(
  38.             status_code=status.HTTP_403_FORBIDDEN,
  39.             detail="Not authorized to update this post"
  40.         )
  41.     updated_post = crud.update_post(db, post_id=post_id, post=post)
  42.     if not updated_post:
  43.         raise HTTPException(status_code=404, detail="Post not found")
  44.     return updated_post
  45. @router.delete("/{post_id}", response_model=schemas.Post)
  46. def delete_post(
  47.     post_id: int,
  48.     db: Session = Depends(get_db),
  49.     current_user: models.User = Depends(get_current_user)
  50. ):
  51.     db_post = crud.get_post(db, post_id=post_id)
  52.     if db_post is None:
  53.         raise HTTPException(status_code=404, detail="Post not found")
  54.     if db_post.author_id != current_user.id:
  55.         raise HTTPException(
  56.             status_code=status.HTTP_403_FORBIDDEN,
  57.             detail="Not authorized to delete this post"
  58.         )
  59.     deleted_post = crud.delete_post(db, post_id=post_id)
  60.     if not deleted_post:
  61.         raise HTTPException(status_code=404, detail="Post not found")
  62.     return deleted_post
复制代码
  1. # app/api/endpoints/comments.py
  2. from fastapi import APIRouter, Depends, HTTPException, status
  3. from sqlalchemy.orm import Session
  4. from typing import List
  5. from .. import crud, schemas, models
  6. from ..database import get_db
  7. from .deps import get_current_user
  8. router = APIRouter()
  9. @router.post("/", response_model=schemas.Comment)
  10. def create_comment(
  11.     comment: schemas.CommentCreate,
  12.     db: Session = Depends(get_db),
  13.     current_user: models.User = Depends(get_current_user)
  14. ):
  15.     # Check if post exists
  16.     db_post = crud.get_post(db, post_id=comment.post_id)
  17.     if db_post is None:
  18.         raise HTTPException(status_code=404, detail="Post not found")
  19.    
  20.     return crud.create_comment(db=db, comment=comment, author_id=current_user.id)
  21. @router.get("/", response_model=List[schemas.Comment])
  22. def read_comments(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  23.     comments = crud.get_comments(db, skip=skip, limit=limit)
  24.     return comments
  25. @router.get("/post/{post_id}", response_model=List[schemas.Comment])
  26. def read_comments_by_post(
  27.     post_id: int,
  28.     skip: int = 0,
  29.     limit: int = 100,
  30.     db: Session = Depends(get_db)
  31. ):
  32.     # Check if post exists
  33.     db_post = crud.get_post(db, post_id=post_id)
  34.     if db_post is None:
  35.         raise HTTPException(status_code=404, detail="Post not found")
  36.    
  37.     comments = crud.get_comments_by_post(db, post_id=post_id, skip=skip, limit=limit)
  38.     return comments
  39. @router.get("/{comment_id}", response_model=schemas.Comment)
  40. def read_comment(comment_id: int, db: Session = Depends(get_db)):
  41.     db_comment = crud.get_comment(db, comment_id=comment_id)
  42.     if db_comment is None:
  43.         raise HTTPException(status_code=404, detail="Comment not found")
  44.     return db_comment
  45. @router.put("/{comment_id}", response_model=schemas.Comment)
  46. def update_comment(
  47.     comment_id: int,
  48.     comment: schemas.CommentUpdate,
  49.     db: Session = Depends(get_db),
  50.     current_user: models.User = Depends(get_current_user)
  51. ):
  52.     db_comment = crud.get_comment(db, comment_id=comment_id)
  53.     if db_comment is None:
  54.         raise HTTPException(status_code=404, detail="Comment not found")
  55.     if db_comment.author_id != current_user.id:
  56.         raise HTTPException(
  57.             status_code=status.HTTP_403_FORBIDDEN,
  58.             detail="Not authorized to update this comment"
  59.         )
  60.     updated_comment = crud.update_comment(db, comment_id=comment_id, comment=comment)
  61.     if not updated_comment:
  62.         raise HTTPException(status_code=404, detail="Comment not found")
  63.     return updated_comment
  64. @router.delete("/{comment_id}", response_model=schemas.Comment)
  65. def delete_comment(
  66.     comment_id: int,
  67.     db: Session = Depends(get_db),
  68.     current_user: models.User = Depends(get_current_user)
  69. ):
  70.     db_comment = crud.get_comment(db, comment_id=comment_id)
  71.     if db_comment is None:
  72.         raise HTTPException(status_code=404, detail="Comment not found")
  73.     if db_comment.author_id != current_user.id:
  74.         raise HTTPException(
  75.             status_code=status.HTTP_403_FORBIDDEN,
  76.             detail="Not authorized to delete this comment"
  77.         )
  78.     deleted_comment = crud.delete_comment(db, comment_id=comment_id)
  79.     if not deleted_comment:
  80.         raise HTTPException(status_code=404, detail="Comment not found")
  81.     return deleted_comment
复制代码

10.6 主应用
  1. # app/main.py
  2. from fastapi import FastAPI
  3. from fastapi.middleware.cors import CORSMiddleware
  4. from . import models
  5. from .database import engine
  6. from .api.api_v1.api import api_router
  7. # Create tables
  8. models.Base.metadata.create_all(bind=engine)
  9. app = FastAPI(title="Blog API", openapi_url="/api/v1/openapi.json")
  10. # Set up CORS middleware
  11. app.add_middleware(
  12.     CORSMiddleware,
  13.     allow_origins=["*"],
  14.     allow_credentials=True,
  15.     allow_methods=["*"],
  16.     allow_headers=["*"],
  17. )
  18. # Include API router
  19. app.include_router(api_router, prefix="/api/v1")
  20. @app.get("/")
  21. async def root():
  22.     return {"message": "Welcome to Blog API. See /api/v1/docs for API documentation."}
复制代码

10.7 依赖项
  1. # app/api/deps.py
  2. from fastapi import Depends, HTTPException, status
  3. from fastapi.security import OAuth2PasswordBearer
  4. from sqlalchemy.orm import Session
  5. from jose import JWTError, jwt
  6. from ..database import get_db
  7. from .. import crud, models, schemas
  8. from ..config import settings
  9. oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
  10. def get_current_user(
  11.     db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)
  12. ) -> models.User:
  13.     credentials_exception = HTTPException(
  14.         status_code=status.HTTP_401_UNAUTHORIZED,
  15.         detail="Could not validate credentials",
  16.         headers={"WWW-Authenticate": "Bearer"},
  17.     )
  18.     try:
  19.         payload = jwt.decode(
  20.             token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
  21.         )
  22.         user_id: int = payload.get("sub")
  23.         if user_id is None:
  24.             raise credentials_exception
  25.         token_data = schemas.TokenPayload(sub=user_id)
  26.     except JWTError:
  27.         raise credentials_exception
  28.     user = crud.get_user(db, user_id=token_data.sub)
  29.     if user is None:
  30.         raise credentials_exception
  31.     return user
  32. def get_current_active_user(
  33.     current_user: models.User = Depends(get_current_user),
  34. ) -> models.User:
  35.     if not crud.is_active(current_user):
  36.         raise HTTPException(status_code=400, detail="Inactive user")
  37.     return current_user
复制代码

11. 最佳实践

11.1 项目结构组织

• 使用分层架构,将应用分为不同的层(API、业务逻辑、数据访问)
• 使用模块化设计,将相关功能组织在一起
• 使用依赖注入来管理组件之间的依赖关系

11.2 错误处理
  1. from fastapi import FastAPI, Request, HTTPException
  2. from fastapi.responses import JSONResponse
  3. from fastapi.exceptions import RequestValidationError
  4. from starlette.exceptions import HTTPException as StarletteHTTPException
  5. app = FastAPI()
  6. @app.exception_handler(StarletteHTTPException)
  7. async def http_exception_handler(request: Request, exc: StarletteHTTPException):
  8.     return JSONResponse(
  9.         status_code=exc.status_code,
  10.         content={"message": exc.detail},
  11.     )
  12. @app.exception_handler(RequestValidationError)
  13. async def validation_exception_handler(request: Request, exc: RequestValidationError):
  14.     return JSONResponse(
  15.         status_code=422,
  16.         content={"message": "Validation error", "details": exc.errors()},
  17.     )
  18. @app.exception_handler(Exception)
  19. async def general_exception_handler(request: Request, exc: Exception):
  20.     return JSONResponse(
  21.         status_code=500,
  22.         content={"message": "Internal server error"},
  23.     )
复制代码

11.3 配置管理
  1. # app/config.py
  2. from pydantic import BaseSettings, AnyHttpUrl, EmailStr, validator
  3. from typing import List, Optional
  4. class Settings(BaseSettings):
  5.     API_V1_STR: str = "/api/v1"
  6.     PROJECT_NAME: str = "Blog API"
  7.    
  8.     # CORS
  9.     BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = [
  10.         "http://localhost:8000",
  11.         "http://localhost:3000",
  12.     ]
  13.     @validator("BACKEND_CORS_ORIGINS", pre=True)
  14.     def assemble_cors_origins(cls, v):
  15.         if isinstance(v, str) and not v.startswith("["):
  16.             return [i.strip() for i in v.split(",")]
  17.         elif isinstance(v, (list, str)):
  18.             return v
  19.         raise ValueError(v)
  20.     # JWT
  21.     SECRET_KEY: str = "your-secret-key-here"
  22.     ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8  # 8 days
  23.     ALGORITHM: str = "HS256"
  24.    
  25.     # Database
  26.     DATABASE_URL: str = "sqlite:///./blog.db"
  27.    
  28.     # Email
  29.     SMTP_TLS: bool = True
  30.     SMTP_PORT: Optional[int] = None
  31.     SMTP_HOST: Optional[str] = None
  32.     SMTP_USER: Optional[str] = None
  33.     SMTP_PASSWORD: Optional[str] = None
  34.     EMAILS_FROM_EMAIL: Optional[EmailStr] = None
  35.     EMAILS_FROM_NAME: Optional[str] = None
  36.    
  37.     @validator("EMAILS_FROM_NAME")
  38.     def get_project_name(cls, v):
  39.         if not v:
  40.             return settings.PROJECT_NAME
  41.         return v
  42.     EMAIL_RESET_TOKEN_EXPIRE_HOURS: int = 48
  43.     EMAIL_TEMPLATES_DIR: str = "/app/app/email-templates/build"
  44.     EMAILS_ENABLED: bool = False
  45.     @validator("EMAILS_ENABLED")
  46.     def get_emails_enabled(cls, v, values):
  47.         return bool(
  48.             values.get("SMTP_HOST")
  49.             and values.get("SMTP_PORT")
  50.             and values.get("EMAILS_FROM_EMAIL")
  51.         )
  52.     # Test user
  53.     FIRST_SUPERUSER: EmailStr = "admin@example.com"
  54.     FIRST_SUPERUSER_PASSWORD: str = "changethis"
  55.     class Config:
  56.         case_sensitive = True
  57. settings = Settings()
复制代码

11.4 日志记录
  1. import logging
  2. from fastapi import FastAPI
  3. from fastapi.logger import logger
  4. app = FastAPI()
  5. # Configure logging
  6. gunicorn_logger = logging.getLogger('gunicorn.error')
  7. logger.handlers = gunicorn_logger.handlers
  8. if __name__ != "main__":
  9.     logger.setLevel(gunicorn_logger.level)
  10. else:
  11.     logger.setLevel(logging.DEBUG)
  12. @app.get("/")
  13. async def root():
  14.     logger.info("Processing request to root endpoint")
  15.     return {"message": "Hello World"}
复制代码

11.5 性能优化

• 使用异步数据库驱动(如asyncpg、aiomysql)
• 使用缓存(如Redis)
• 使用CDN加速静态资源
• 使用连接池管理数据库连接
• 使用异步任务队列(如Celery)处理耗时操作

12. 部署FastAPI应用

12.1 使用Docker部署
  1. # Dockerfile
  2. FROM python:3.9
  3. WORKDIR /app
  4. COPY requirements.txt .
  5. RUN pip install --no-cache-dir -r requirements.txt
  6. COPY . .
  7. CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
复制代码
  1. # docker-compose.yml
  2. version: '3.8'
  3. services:
  4.   web:
  5.     build: .
  6.     ports:
  7.       - "8000:8000"
  8.     depends_on:
  9.       - db
  10.     environment:
  11.       - DATABASE_URL=postgresql://fastapi:fastapi@db:5432/blog_db
  12.   db:
  13.     image: postgres:13
  14.     environment:
  15.       - POSTGRES_USER=fastapi
  16.       - POSTGRES_PASSWORD=fastapi
  17.       - POSTGRES_DB=blog_db
  18.     volumes:
  19.       - postgres_data:/var/lib/postgresql/data/
  20. volumes:
  21.   postgres_data:
复制代码

12.2 使用Gunicorn部署
  1. pip install gunicorn uvicorn
  2. gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker
复制代码

12.3 使用Nginx作为反向代理
  1. # nginx.conf
  2. server {
  3.     listen 80;
  4.     server_name your_domain.com;
  5.     location / {
  6.         proxy_pass http://127.0.0.1:8000;
  7.         proxy_set_header Host $host;
  8.         proxy_set_header X-Real-IP $remote_addr;
  9.         proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  10.     }
  11. }
复制代码

13. 提升开发技能和就业竞争力

13.1 深入学习FastAPI

• 阅读FastAPI官方文档
• 参与FastAPI开源项目
• 关注FastAPI的更新和新特性
• 尝试实现FastAPI的扩展和插件

13.2 构建项目组合

• 使用FastAPI构建不同类型的项目(博客API、电商API、社交网络API等)
• 将项目部署到云平台(如AWS、Google Cloud、Azure)
• 为项目编写完整的文档和测试
• 将项目开源并分享到GitHub

13.3 学习相关技术

• 深入学习Python异步编程
• 学习数据库设计和优化
• 学习容器化技术(Docker、Kubernetes)
• 学习CI/CD流程和工具
• 学习云原生应用开发

13.4 参与社区和交流

• 加入FastAPI社区(Discord、Reddit、Stack Overflow)
• 参加技术会议和meetup
• 写博客分享你的学习经验
• 在GitHub上贡献代码

结论

FastAPI是一个强大、现代且高性能的Web框架,特别适合构建API。通过本教程,你学习了FastAPI的核心概念、特性和最佳实践,并通过一个完整的博客API项目案例,了解了如何使用FastAPI构建实际应用。

FastAPI的优秀性能、自动文档生成、类型提示和依赖注入系统等特性,使其成为Python开发者的理想选择。通过深入学习和实践FastAPI,你将能够构建高效、可维护的Web应用,并提升你的开发技能和就业竞争力。

继续探索FastAPI的更多可能性,构建出色的应用,并在社区中分享你的经验。祝你在FastAPI的学习之路上取得成功!
「七転び八起き(ななころびやおき)」
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

手机版|联系我们|小黑屋|TG频道|RSS |网站地图

Powered by Pixtech

© 2025-2026 Pixtech Team.

>