|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
1. 引言
FastAPI是一个现代、高性能的Web框架,用于构建API,基于Python 3.6+的类型提示。它具有自动交互式文档、数据验证、依赖注入等强大功能。Pillow是Python图像处理库(PIL)的一个分支,提供了广泛的图像处理功能。结合这两个库,我们可以构建高性能的图像处理API,用于各种应用,如社交媒体、电子商务、内容管理系统等。
本指南将带你从零开始,逐步学习如何使用FastAPI与Pillow构建高性能图像处理API,并解决实际开发中可能遇到的各种挑战。
2. 环境设置
在开始之前,我们需要安装Python和必要的库。
2.1 安装Python
首先,确保你的系统上安装了Python 3.6或更高版本。你可以从Python官网下载并安装Python。
2.2 创建虚拟环境
创建一个虚拟环境来隔离项目依赖:
- # 创建项目目录
- mkdir fastapi-image-processing
- cd fastapi-image-processing
- # 创建虚拟环境
- python -m venv venv
- # 激活虚拟环境
- # Windows
- venv\Scripts\activate
- # macOS/Linux
- source venv/bin/activate
复制代码
2.3 安装必要的库
安装FastAPI、Pillow和其他必要的库:
- pip install fastapi "uvicorn[standard]" pillow python-multipart aiofiles
复制代码
这些库的作用如下:
• fastapi: Web框架
• uvicorn: ASGI服务器,用于运行FastAPI应用
• pillow: 图像处理库
• python-multipart: 处理表单数据,包括文件上传
• aiofiles: 异步文件操作
3. FastAPI基础
3.1 创建基本的FastAPI应用
创建一个名为main.py的文件,并添加以下代码:
- from fastapi import FastAPI
- app = FastAPI(title="图像处理API", description="使用FastAPI和Pillow构建的高性能图像处理API")
- @app.get("/")
- async def root():
- return {"message": "欢迎使用图像处理API"}
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
复制代码
3.2 运行FastAPI应用
在终端中运行以下命令启动应用:
或者使用uvicorn直接运行:
- uvicorn main:app --reload
复制代码
--reload选项使服务器在代码更改时自动重启。
3.3 访问API文档
打开浏览器,访问http://localhost:8000/docs,你将看到自动生成的交互式API文档。
3.4 添加路由
让我们添加一些基本的路由:
- from fastapi import FastAPI
- from pydantic import BaseModel
- app = FastAPI(title="图像处理API", description="使用FastAPI和Pillow构建的高性能图像处理API")
- class HealthCheck(BaseModel):
- status: str
- message: str
- @app.get("/", response_model=HealthCheck)
- async def root():
- return {"status": "ok", "message": "欢迎使用图像处理API"}
- @app.get("/health", response_model=HealthCheck)
- async def health_check():
- return {"status": "ok", "message": "API运行正常"}
复制代码
4. Pillow基础
4.1 基本图像操作
创建一个名为image_utils.py的文件,并添加以下代码:
- from PIL import Image, ImageFilter, ImageEnhance
- import io
- from typing import Tuple, Optional
- def open_image(image_bytes: bytes) -> Image.Image:
- """从字节流打开图像"""
- return Image.open(io.BytesIO(image_bytes))
- def save_image(image: Image.Image, format: str = "JPEG", quality: int = 85) -> bytes:
- """将图像保存为字节流"""
- img_byte_arr = io.BytesIO()
- image.save(img_byte_arr, format=format, quality=quality)
- img_byte_arr.seek(0)
- return img_byte_arr.getvalue()
- def resize_image(image: Image.Image, size: Tuple[int, int], resample: int = Image.LANCZOS) -> Image.Image:
- """调整图像大小"""
- return image.resize(size, resample)
- def crop_image(image: Image.Image, box: Tuple[int, int, int, int]) -> Image.Image:
- """裁剪图像"""
- return image.crop(box)
- def rotate_image(image: Image.Image, angle: float, expand: bool = True) -> Image.Image:
- """旋转图像"""
- return image.rotate(angle, expand=expand)
- def flip_image(image: Image.Image, mode: str = "horizontal") -> Image.Image:
- """翻转图像"""
- if mode == "horizontal":
- return image.transpose(Image.FLIP_LEFT_RIGHT)
- elif mode == "vertical":
- return image.transpose(Image.FLIP_TOP_BOTTOM)
- else:
- raise ValueError("模式必须是'horizontal'或'vertical'")
- def apply_filter(image: Image.Image, filter_type: str) -> Image.Image:
- """应用滤镜"""
- filters = {
- "blur": ImageFilter.BLUR,
- "sharpen": ImageFilter.SHARPEN,
- "edge_enhance": ImageFilter.EDGE_ENHANCE,
- "emboss": ImageFilter.EMBOSS,
- "smooth": ImageFilter.SMOOTH,
- }
- if filter_type not in filters:
- raise ValueError(f"不支持的滤镜类型: {filter_type}")
- return image.filter(filters[filter_type])
- def adjust_brightness(image: Image.Image, factor: float) -> Image.Image:
- """调整亮度"""
- enhancer = ImageEnhance.Brightness(image)
- return enhancer.enhance(factor)
- def adjust_contrast(image: Image.Image, factor: float) -> Image.Image:
- """调整对比度"""
- enhancer = ImageEnhance.Contrast(image)
- return enhancer.enhance(factor)
- def adjust_saturation(image: Image.Image, factor: float) -> Image.Image:
- """调整饱和度"""
- enhancer = ImageEnhance.Color(image)
- return enhancer.enhance(factor)
- def convert_format(image: Image.Image, format: str) -> Image.Image:
- """转换图像格式"""
- if format.upper() == "JPEG" and image.mode in ("RGBA", "LA", "P"):
- # JPEG不支持透明度,转换为RGB
- background = Image.new("RGB", image.size, (255, 255, 255))
- background.paste(image, mask=image.split()[-1] if image.mode == "RGBA" else None)
- image = background
- elif format.upper() == "PNG" and image.mode not in ("RGBA", "LA", "P"):
- # PNG支持透明度,转换为RGBA
- image = image.convert("RGBA")
- return image
复制代码
4.2 测试图像处理函数
创建一个名为test_image_utils.py的文件,并添加以下代码来测试我们的图像处理函数:
- from PIL import Image
- import requests
- from image_utils import (
- open_image, save_image, resize_image, crop_image, rotate_image,
- flip_image, apply_filter, adjust_brightness, adjust_contrast,
- adjust_saturation, convert_format
- )
- # 下载测试图像
- url = "https://picsum.photos/800/600.jpg"
- response = requests.get(url)
- image_bytes = response.content
- # 打开图像
- image = open_image(image_bytes)
- print(f"原始图像尺寸: {image.size}, 模式: {image.mode}")
- # 调整大小
- resized_image = resize_image(image, (400, 300))
- print(f"调整大小后的图像尺寸: {resized_image.size}")
- # 裁剪图像
- cropped_image = crop_image(image, (100, 100, 700, 500))
- print(f"裁剪后的图像尺寸: {cropped_image.size}")
- # 旋转图像
- rotated_image = rotate_image(image, 45)
- print(f"旋转后的图像尺寸: {rotated_image.size}")
- # 翻转图像
- flipped_image = flip_image(image, "horizontal")
- print(f"翻转后的图像尺寸: {flipped_image.size}")
- # 应用滤镜
- filtered_image = apply_filter(image, "blur")
- print(f"应用滤镜后的图像尺寸: {filtered_image.size}")
- # 调整亮度
- brightened_image = adjust_brightness(image, 1.5)
- print(f"调整亮度后的图像尺寸: {brightened_image.size}")
- # 调整对比度
- contrasted_image = adjust_contrast(image, 1.5)
- print(f"调整对比度后的图像尺寸: {contrasted_image.size}")
- # 调整饱和度
- saturated_image = adjust_saturation(image, 1.5)
- print(f"调整饱和度后的图像尺寸: {saturated_image.size}")
- # 转换格式
- png_image = convert_format(image, "PNG")
- print(f"转换格式后的图像模式: {png_image.mode}")
- # 保存处理后的图像
- save_image(resized_image, "resized.jpg")
- save_image(cropped_image, "cropped.jpg")
- save_image(rotated_image, "rotated.jpg")
- save_image(flipped_image, "flipped.jpg")
- save_image(filtered_image, "filtered.jpg")
- save_image(brightened_image, "brightened.jpg")
- save_image(contrasted_image, "contrasted.jpg")
- save_image(saturated_image, "saturated.jpg")
- save_image(png_image, "converted.png")
- print("测试完成!")
复制代码
运行测试文件:
- python test_image_utils.py
复制代码
5. 构建图像处理API
现在,我们将FastAPI和Pillow结合起来,构建一个图像处理API。
5.1 创建图像处理路由
在main.py中添加以下代码:
- from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Query
- from fastapi.responses import StreamingResponse
- from pydantic import BaseModel
- from typing import Optional, List
- import io
- from image_utils import (
- open_image, save_image, resize_image, crop_image, rotate_image,
- flip_image, apply_filter, adjust_brightness, adjust_contrast,
- adjust_saturation, convert_format
- )
- app = FastAPI(title="图像处理API", description="使用FastAPI和Pillow构建的高性能图像处理API")
- class HealthCheck(BaseModel):
- status: str
- message: str
- @app.get("/", response_model=HealthCheck)
- async def root():
- return {"status": "ok", "message": "欢迎使用图像处理API"}
- @app.get("/health", response_model=HealthCheck)
- async def health_check():
- return {"status": "ok", "message": "API运行正常"}
- @app.post("/process/")
- async def process_image(
- file: UploadFile = File(...),
- operation: str = Form(...),
- width: Optional[int] = Form(None),
- height: Optional[int] = Form(None),
- angle: Optional[float] = Form(None),
- flip_mode: Optional[str] = Form(None),
- filter_type: Optional[str] = Form(None),
- brightness: Optional[float] = Form(None),
- contrast: Optional[float] = Form(None),
- saturation: Optional[float] = Form(None),
- output_format: Optional[str] = Form("JPEG"),
- quality: Optional[int] = Form(85),
- left: Optional[int] = Form(None),
- top: Optional[int] = Form(None),
- right: Optional[int] = Form(None),
- bottom: Optional[int] = Form(None)
- ):
- """处理图像"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- try:
- # 读取图像
- image_bytes = await file.read()
- image = open_image(image_bytes)
-
- # 执行操作
- if operation == "resize":
- if width is None or height is None:
- raise HTTPException(status_code=400, detail="调整大小需要宽度和高度参数")
- image = resize_image(image, (width, height))
- elif operation == "crop":
- if left is None or top is None or right is None or bottom is None:
- raise HTTPException(status_code=400, detail="裁剪需要left、top、right和bottom参数")
- image = crop_image(image, (left, top, right, bottom))
- elif operation == "rotate":
- if angle is None:
- raise HTTPException(status_code=400, detail="旋转需要角度参数")
- image = rotate_image(image, angle)
- elif operation == "flip":
- if flip_mode is None:
- raise HTTPException(status_code=400, detail="翻转需要flip_mode参数")
- image = flip_image(image, flip_mode)
- elif operation == "filter":
- if filter_type is None:
- raise HTTPException(status_code=400, detail="滤镜需要filter_type参数")
- image = apply_filter(image, filter_type)
- elif operation == "brightness":
- if brightness is None:
- raise HTTPException(status_code=400, detail="亮度调整需要brightness参数")
- image = adjust_brightness(image, brightness)
- elif operation == "contrast":
- if contrast is None:
- raise HTTPException(status_code=400, detail="对比度调整需要contrast参数")
- image = adjust_contrast(image, contrast)
- elif operation == "saturation":
- if saturation is None:
- raise HTTPException(status_code=400, detail="饱和度调整需要saturation参数")
- image = adjust_saturation(image, saturation)
- elif operation == "convert":
- if output_format is None:
- raise HTTPException(status_code=400, detail="格式转换需要output_format参数")
- image = convert_format(image, output_format)
- else:
- raise HTTPException(status_code=400, detail=f"不支持的操作: {operation}")
-
- # 转换格式
- if output_format:
- image = convert_format(image, output_format)
-
- # 保存图像
- processed_image_bytes = save_image(image, format=output_format, quality=quality)
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"处理图像时出错: {str(e)}")
复制代码
5.2 批量处理端点
添加一个批量处理端点,可以一次对多个图像应用相同的操作:
- @app.post("/batch-process/")
- async def batch_process_images(
- files: List[UploadFile] = File(...),
- operation: str = Form(...),
- width: Optional[int] = Form(None),
- height: Optional[int] = Form(None),
- angle: Optional[float] = Form(None),
- flip_mode: Optional[str] = Form(None),
- filter_type: Optional[str] = Form(None),
- brightness: Optional[float] = Form(None),
- contrast: Optional[float] = Form(None),
- saturation: Optional[float] = Form(None),
- output_format: Optional[str] = Form("JPEG"),
- quality: Optional[int] = Form(85),
- left: Optional[int] = Form(None),
- top: Optional[int] = Form(None),
- right: Optional[int] = Form(None),
- bottom: Optional[int] = Form(None)
- ):
- """批量处理图像"""
- results = []
-
- for file in files:
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- results.append({"filename": file.filename, "error": "文件必须是图像"})
- continue
-
- try:
- # 读取图像
- image_bytes = await file.read()
- image = open_image(image_bytes)
-
- # 执行操作
- if operation == "resize":
- if width is None or height is None:
- results.append({"filename": file.filename, "error": "调整大小需要宽度和高度参数"})
- continue
- image = resize_image(image, (width, height))
- elif operation == "crop":
- if left is None or top is None or right is None or bottom is None:
- results.append({"filename": file.filename, "error": "裁剪需要left、top、right和bottom参数"})
- continue
- image = crop_image(image, (left, top, right, bottom))
- elif operation == "rotate":
- if angle is None:
- results.append({"filename": file.filename, "error": "旋转需要角度参数"})
- continue
- image = rotate_image(image, angle)
- elif operation == "flip":
- if flip_mode is None:
- results.append({"filename": file.filename, "error": "翻转需要flip_mode参数"})
- continue
- image = flip_image(image, flip_mode)
- elif operation == "filter":
- if filter_type is None:
- results.append({"filename": file.filename, "error": "滤镜需要filter_type参数"})
- continue
- image = apply_filter(image, filter_type)
- elif operation == "brightness":
- if brightness is None:
- results.append({"filename": file.filename, "error": "亮度调整需要brightness参数"})
- continue
- image = adjust_brightness(image, brightness)
- elif operation == "contrast":
- if contrast is None:
- results.append({"filename": file.filename, "error": "对比度调整需要contrast参数"})
- continue
- image = adjust_contrast(image, contrast)
- elif operation == "saturation":
- if saturation is None:
- results.append({"filename": file.filename, "error": "饱和度调整需要saturation参数"})
- continue
- image = adjust_saturation(image, saturation)
- elif operation == "convert":
- if output_format is None:
- results.append({"filename": file.filename, "error": "格式转换需要output_format参数"})
- continue
- image = convert_format(image, output_format)
- else:
- results.append({"filename": file.filename, "error": f"不支持的操作: {operation}"})
- continue
-
- # 转换格式
- if output_format:
- image = convert_format(image, output_format)
-
- # 保存图像
- processed_image_bytes = save_image(image, format=output_format, quality=quality)
-
- # 添加到结果列表
- results.append({
- "filename": file.filename,
- "status": "success",
- "size": image.size,
- "format": image.format,
- "mode": image.mode
- })
-
- except Exception as e:
- results.append({"filename": file.filename, "error": str(e)})
-
- return {"results": results}
复制代码
5.3 图像信息端点
添加一个端点,用于获取图像的基本信息:
- @app.post("/info/")
- async def get_image_info(file: UploadFile = File(...)):
- """获取图像信息"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- try:
- # 读取图像
- image_bytes = await file.read()
- image = open_image(image_bytes)
-
- # 获取图像信息
- info = {
- "filename": file.filename,
- "format": image.format,
- "mode": image.mode,
- "size": image.size,
- "width": image.width,
- "height": image.height,
- "content_type": file.content_type,
- }
-
- # 如果有EXIF数据,添加到信息中
- if hasattr(image, '_getexif') and image._getexif() is not None:
- info["exif"] = image._getexif()
-
- return info
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"获取图像信息时出错: {str(e)}")
复制代码
6. 高级功能
6.1 添加水印
在image_utils.py中添加添加水印的功能:
- def add_watermark(
- image: Image.Image,
- watermark_text: str,
- position: str = "bottom-right",
- opacity: float = 0.5,
- font_size: int = 36,
- color: Tuple[int, int, int] = (255, 255, 255)
- ) -> Image.Image:
- """添加水印"""
- from PIL import ImageDraw, ImageFont
-
- # 创建一个透明层
- watermark = Image.new("RGBA", image.size, (0, 0, 0, 0))
- draw = ImageDraw.Draw(watermark)
-
- # 尝试加载字体,如果失败则使用默认字体
- try:
- font = ImageFont.truetype("arial.ttf", font_size)
- except:
- font = ImageFont.load_default()
-
- # 获取文本大小
- text_width, text_height = draw.textsize(watermark_text, font=font)
-
- # 计算位置
- margin = 20
- if position == "top-left":
- position = (margin, margin)
- elif position == "top-right":
- position = (image.width - text_width - margin, margin)
- elif position == "bottom-left":
- position = (margin, image.height - text_height - margin)
- elif position == "bottom-right":
- position = (image.width - text_width - margin, image.height - text_height - margin)
- elif position == "center":
- position = ((image.width - text_width) // 2, (image.height - text_height) // 2)
- else:
- raise ValueError("位置必须是'top-left'、'top-right'、'bottom-left'、'bottom-right'或'center'")
-
- # 绘制文本
- draw.text(position, watermark_text, font=font, fill=(*color, int(255 * opacity)))
-
- # 合并图像
- if image.mode != "RGBA":
- image = image.convert("RGBA")
-
- return Image.alpha_composite(image, watermark)
复制代码
6.2 添加水印端点
在main.py中添加添加水印的端点:
- @app.post("/watermark/")
- async def add_watermark_to_image(
- file: UploadFile = File(...),
- text: str = Form(...),
- position: str = Form("bottom-right"),
- opacity: float = Form(0.5),
- font_size: int = Form(36),
- color_r: int = Form(255),
- color_g: int = Form(255),
- color_b: int = Form(255),
- output_format: Optional[str] = Form("JPEG"),
- quality: Optional[int] = Form(85)
- ):
- """添加水印"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- try:
- # 读取图像
- image_bytes = await file.read()
- image = open_image(image_bytes)
-
- # 添加水印
- watermarked_image = add_watermark(
- image,
- text,
- position=position,
- opacity=opacity,
- font_size=font_size,
- color=(color_r, color_g, color_b)
- )
-
- # 转换格式
- if output_format:
- watermarked_image = convert_format(watermarked_image, output_format)
-
- # 保存图像
- processed_image_bytes = save_image(watermarked_image, format=output_format, quality=quality)
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=watermarked.{output_format.lower()}"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"添加水印时出错: {str(e)}")
复制代码
6.3 图像合成
在image_utils.py中添加图像合成的功能:
- def composite_images(
- background: Image.Image,
- foreground: Image.Image,
- position: Tuple[int, int] = (0, 0),
- opacity: float = 1.0
- ) -> Image.Image:
- """合成图像"""
- # 确保前景图像有透明通道
- if foreground.mode != "RGBA":
- foreground = foreground.convert("RGBA")
-
- # 调整透明度
- if opacity < 1.0:
- foreground = foreground.copy()
- alpha = foreground.split()[3]
- alpha = Image.eval(alpha, lambda a: int(a * opacity))
- foreground.putalpha(alpha)
-
- # 确保背景图像足够大
- bg_width, bg_height = background.size
- fg_width, fg_height = foreground.size
-
- if position[0] + fg_width > bg_width or position[1] + fg_height > bg_height:
- raise ValueError("前景图像超出背景图像边界")
-
- # 确保背景图像有透明通道
- if background.mode != "RGBA":
- background = background.convert("RGBA")
-
- # 创建一个临时图像
- temp = Image.new("RGBA", background.size, (0, 0, 0, 0))
- temp.paste(foreground, position)
-
- # 合成图像
- return Image.alpha_composite(background, temp)
复制代码
6.4 图像合成端点
在main.py中添加图像合成的端点:
- @app.post("/composite/")
- async def composite_images_endpoint(
- background: UploadFile = File(...),
- foreground: UploadFile = File(...),
- position_x: int = Form(0),
- position_y: int = Form(0),
- opacity: float = Form(1.0),
- output_format: Optional[str] = Form("PNG"),
- quality: Optional[int] = Form(85)
- ):
- """合成图像"""
- # 检查文件类型
- if not background.content_type.startswith("image/") or not foreground.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail "文件必须是图像")
-
- try:
- # 读取图像
- background_bytes = await background.read()
- background_image = open_image(background_bytes)
-
- foreground_bytes = await foreground.read()
- foreground_image = open_image(foreground_bytes)
-
- # 合成图像
- composite_image = composite_images(
- background_image,
- foreground_image,
- position=(position_x, position_y),
- opacity=opacity
- )
-
- # 转换格式
- if output_format:
- composite_image = convert_format(composite_image, output_format)
-
- # 保存图像
- processed_image_bytes = save_image(composite_image, format=output_format, quality=quality)
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=composite.{output_format.lower()}"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"合成图像时出错: {str(e)}")
复制代码
7. 性能优化
7.1 异步处理
FastAPI支持异步请求处理,我们可以利用这一点来提高性能。让我们修改我们的图像处理函数,使其支持异步操作。
创建一个名为async_image_utils.py的文件:
- from PIL import Image, ImageFilter, ImageEnhance
- import io
- from typing import Tuple, Optional
- import asyncio
- import concurrent.futures
- # 创建一个线程池执行器
- executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
- async def open_image_async(image_bytes: bytes) -> Image.Image:
- """从字节流异步打开图像"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: Image.open(io.BytesIO(image_bytes)))
- async def save_image_async(image: Image.Image, format: str = "JPEG", quality: int = 85) -> bytes:
- """异步将图像保存为字节流"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: _save_image_sync(image, format, quality))
- def _save_image_sync(image: Image.Image, format: str, quality: int) -> bytes:
- """同步保存图像"""
- img_byte_arr = io.BytesIO()
- image.save(img_byte_arr, format=format, quality=quality)
- img_byte_arr.seek(0)
- return img_byte_arr.getvalue()
- async def resize_image_async(image: Image.Image, size: Tuple[int, int], resample: int = Image.LANCZOS) -> Image.Image:
- """异步调整图像大小"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: image.resize(size, resample))
- async def crop_image_async(image: Image.Image, box: Tuple[int, int, int, int]) -> Image.Image:
- """异步裁剪图像"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: image.crop(box))
- async def rotate_image_async(image: Image.Image, angle: float, expand: bool = True) -> Image.Image:
- """异步旋转图像"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: image.rotate(angle, expand=expand))
- async def flip_image_async(image: Image.Image, mode: str = "horizontal") -> Image.Image:
- """异步翻转图像"""
- loop = asyncio.get_event_loop()
- if mode == "horizontal":
- return await loop.run_in_executor(executor, lambda: image.transpose(Image.FLIP_LEFT_RIGHT))
- elif mode == "vertical":
- return await loop.run_in_executor(executor, lambda: image.transpose(Image.FLIP_TOP_BOTTOM))
- else:
- raise ValueError("模式必须是'horizontal'或'vertical'")
- async def apply_filter_async(image: Image.Image, filter_type: str) -> Image.Image:
- """异步应用滤镜"""
- filters = {
- "blur": ImageFilter.BLUR,
- "sharpen": ImageFilter.SHARPEN,
- "edge_enhance": ImageFilter.EDGE_ENHANCE,
- "emboss": ImageFilter.EMBOSS,
- "smooth": ImageFilter.SMOOTH,
- }
- if filter_type not in filters:
- raise ValueError(f"不支持的滤镜类型: {filter_type}")
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: image.filter(filters[filter_type]))
- async def adjust_brightness_async(image: Image.Image, factor: float) -> Image.Image:
- """异步调整亮度"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: ImageEnhance.Brightness(image).enhance(factor))
- async def adjust_contrast_async(image: Image.Image, factor: float) -> Image.Image:
- """异步调整对比度"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: ImageEnhance.Contrast(image).enhance(factor))
- async def adjust_saturation_async(image: Image.Image, factor: float) -> Image.Image:
- """异步调整饱和度"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: ImageEnhance.Color(image).enhance(factor))
- async def convert_format_async(image: Image.Image, format: str) -> Image.Image:
- """异步转换图像格式"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: _convert_format_sync(image, format))
- def _convert_format_sync(image: Image.Image, format: str) -> Image.Image:
- """同步转换图像格式"""
- if format.upper() == "JPEG" and image.mode in ("RGBA", "LA", "P"):
- # JPEG不支持透明度,转换为RGB
- background = Image.new("RGB", image.size, (255, 255, 255))
- background.paste(image, mask=image.split()[-1] if image.mode == "RGBA" else None)
- image = background
- elif format.upper() == "PNG" and image.mode not in ("RGBA", "LA", "P"):
- # PNG支持透明度,转换为RGBA
- image = image.convert("RGBA")
- return image
- async def add_watermark_async(
- image: Image.Image,
- watermark_text: str,
- position: str = "bottom-right",
- opacity: float = 0.5,
- font_size: int = 36,
- color: Tuple[int, int, int] = (255, 255, 255)
- ) -> Image.Image:
- """异步添加水印"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: _add_watermark_sync(image, watermark_text, position, opacity, font_size, color))
- def _add_watermark_sync(
- image: Image.Image,
- watermark_text: str,
- position: str,
- opacity: float,
- font_size: int,
- color: Tuple[int, int, int]
- ) -> Image.Image:
- """同步添加水印"""
- from PIL import ImageDraw, ImageFont
-
- # 创建一个透明层
- watermark = Image.new("RGBA", image.size, (0, 0, 0, 0))
- draw = ImageDraw.Draw(watermark)
-
- # 尝试加载字体,如果失败则使用默认字体
- try:
- font = ImageFont.truetype("arial.ttf", font_size)
- except:
- font = ImageFont.load_default()
-
- # 获取文本大小
- text_width, text_height = draw.textsize(watermark_text, font=font)
-
- # 计算位置
- margin = 20
- if position == "top-left":
- pos = (margin, margin)
- elif position == "top-right":
- pos = (image.width - text_width - margin, margin)
- elif position == "bottom-left":
- pos = (margin, image.height - text_height - margin)
- elif position == "bottom-right":
- pos = (image.width - text_width - margin, image.height - text_height - margin)
- elif position == "center":
- pos = ((image.width - text_width) // 2, (image.height - text_height) // 2)
- else:
- raise ValueError("位置必须是'top-left'、'top-right'、'bottom-left'、'bottom-right'或'center'")
-
- # 绘制文本
- draw.text(pos, watermark_text, font=font, fill=(*color, int(255 * opacity)))
-
- # 合并图像
- if image.mode != "RGBA":
- image = image.convert("RGBA")
-
- return Image.alpha_composite(image, watermark)
- async def composite_images_async(
- background: Image.Image,
- foreground: Image.Image,
- position: Tuple[int, int] = (0, 0),
- opacity: float = 1.0
- ) -> Image.Image:
- """异步合成图像"""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, lambda: _composite_images_sync(background, foreground, position, opacity))
- def _composite_images_sync(
- background: Image.Image,
- foreground: Image.Image,
- position: Tuple[int, int],
- opacity: float
- ) -> Image.Image:
- """同步合成图像"""
- # 确保前景图像有透明通道
- if foreground.mode != "RGBA":
- foreground = foreground.convert("RGBA")
-
- # 调整透明度
- if opacity < 1.0:
- foreground = foreground.copy()
- alpha = foreground.split()[3]
- alpha = Image.eval(alpha, lambda a: int(a * opacity))
- foreground.putalpha(alpha)
-
- # 确保背景图像足够大
- bg_width, bg_height = background.size
- fg_width, fg_height = foreground.size
-
- if position[0] + fg_width > bg_width or position[1] + fg_height > bg_height:
- raise ValueError("前景图像超出背景图像边界")
-
- # 确保背景图像有透明通道
- if background.mode != "RGBA":
- background = background.convert("RGBA")
-
- # 创建一个临时图像
- temp = Image.new("RGBA", background.size, (0, 0, 0, 0))
- temp.paste(foreground, position)
-
- # 合成图像
- return Image.alpha_composite(background, temp)
复制代码
7.2 创建异步处理端点
在main.py中添加使用异步图像处理函数的端点:
- from fastapi import FastAPI, File, UploadFile, HTTPException, Form, BackgroundTasks
- from fastapi.responses import StreamingResponse
- from pydantic import BaseModel
- from typing import Optional, List, Dict
- import io
- import uuid
- import os
- from async_image_utils import (
- open_image_async, save_image_async, resize_image_async, crop_image_async, rotate_image_async,
- flip_image_async, apply_filter_async, adjust_brightness_async, adjust_contrast_async,
- adjust_saturation_async, convert_format_async, add_watermark_async, composite_images_async
- )
- app = FastAPI(title="图像处理API", description="使用FastAPI和Pillow构建的高性能图像处理API")
- # 创建临时目录
- TEMP_DIR = "temp_images"
- os.makedirs(TEMP_DIR, exist_ok=True)
- # 存储处理任务的状态
- processing_tasks: Dict[str, Dict] = {}
- class HealthCheck(BaseModel):
- status: str
- message: str
- class ProcessingStatus(BaseModel):
- task_id: str
- status: str
- message: str
- result_url: Optional[str] = None
- @app.get("/", response_model=HealthCheck)
- async def root():
- return {"status": "ok", "message": "欢迎使用图像处理API"}
- @app.get("/health", response_model=HealthCheck)
- async def health_check():
- return {"status": "ok", "message": "API运行正常"}
- @app.post("/process-async/")
- async def process_image_async(
- background_tasks: BackgroundTasks,
- file: UploadFile = File(...),
- operation: str = Form(...),
- width: Optional[int] = Form(None),
- height: Optional[int] = Form(None),
- angle: Optional[float] = Form(None),
- flip_mode: Optional[str] = Form(None),
- filter_type: Optional[str] = Form(None),
- brightness: Optional[float] = Form(None),
- contrast: Optional[float] = Form(None),
- saturation: Optional[float] = Form(None),
- output_format: Optional[str] = Form("JPEG"),
- quality: Optional[int] = Form(85),
- left: Optional[int] = Form(None),
- top: Optional[int] = Form(None),
- right: Optional[int] = Form(None),
- bottom: Optional[int] = Form(None)
- ):
- """异步处理图像"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- # 创建任务ID
- task_id = str(uuid.uuid4())
-
- # 初始化任务状态
- processing_tasks[task_id] = {
- "status": "processing",
- "message": "图像处理中",
- "result_url": None
- }
-
- # 添加后台任务
- background_tasks.add_task(
- process_image_task,
- task_id,
- file,
- operation,
- width,
- height,
- angle,
- flip_mode,
- filter_type,
- brightness,
- contrast,
- saturation,
- output_format,
- quality,
- left,
- top,
- right,
- bottom
- )
-
- return {"task_id": task_id, "status": "processing", "message": "图像处理已开始"}
- async def process_image_task(
- task_id: str,
- file: UploadFile,
- operation: str,
- width: Optional[int],
- height: Optional[int],
- angle: Optional[float],
- flip_mode: Optional[str],
- filter_type: Optional[str],
- brightness: Optional[float],
- contrast: Optional[float],
- saturation: Optional[float],
- output_format: Optional[str],
- quality: Optional[int],
- left: Optional[int],
- top: Optional[int],
- right: Optional[int],
- bottom: Optional[int]
- ):
- """处理图像任务"""
- try:
- # 读取图像
- image_bytes = await file.read()
- image = await open_image_async(image_bytes)
-
- # 执行操作
- if operation == "resize":
- if width is None or height is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "调整大小需要宽度和高度参数",
- "result_url": None
- }
- return
- image = await resize_image_async(image, (width, height))
- elif operation == "crop":
- if left is None or top is None or right is None or bottom is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "裁剪需要left、top、right和bottom参数",
- "result_url": None
- }
- return
- image = await crop_image_async(image, (left, top, right, bottom))
- elif operation == "rotate":
- if angle is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "旋转需要角度参数",
- "result_url": None
- }
- return
- image = await rotate_image_async(image, angle)
- elif operation == "flip":
- if flip_mode is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "翻转需要flip_mode参数",
- "result_url": None
- }
- return
- image = await flip_image_async(image, flip_mode)
- elif operation == "filter":
- if filter_type is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "滤镜需要filter_type参数",
- "result_url": None
- }
- return
- image = await apply_filter_async(image, filter_type)
- elif operation == "brightness":
- if brightness is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "亮度调整需要brightness参数",
- "result_url": None
- }
- return
- image = await adjust_brightness_async(image, brightness)
- elif operation == "contrast":
- if contrast is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "对比度调整需要contrast参数",
- "result_url": None
- }
- return
- image = await adjust_contrast_async(image, contrast)
- elif operation == "saturation":
- if saturation is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "饱和度调整需要saturation参数",
- "result_url": None
- }
- return
- image = await adjust_saturation_async(image, saturation)
- elif operation == "convert":
- if output_format is None:
- processing_tasks[task_id] = {
- "status": "error",
- "message": "格式转换需要output_format参数",
- "result_url": None
- }
- return
- image = await convert_format_async(image, output_format)
- else:
- processing_tasks[task_id] = {
- "status": "error",
- "message": f"不支持的操作: {operation}",
- "result_url": None
- }
- return
-
- # 转换格式
- if output_format:
- image = await convert_format_async(image, output_format)
-
- # 保存图像
- processed_image_bytes = await save_image_async(image, format=output_format, quality=quality)
-
- # 保存到临时文件
- result_filename = f"{task_id}.{output_format.lower()}"
- result_path = os.path.join(TEMP_DIR, result_filename)
-
- with open(result_path, "wb") as f:
- f.write(processed_image_bytes)
-
- # 更新任务状态
- processing_tasks[task_id] = {
- "status": "completed",
- "message": "图像处理完成",
- "result_url": f"/results/{result_filename}"
- }
-
- except Exception as e:
- processing_tasks[task_id] = {
- "status": "error",
- "message": f"处理图像时出错: {str(e)}",
- "result_url": None
- }
- @app.get("/status/{task_id}", response_model=ProcessingStatus)
- async def get_processing_status(task_id: str):
- """获取处理状态"""
- if task_id not in processing_tasks:
- raise HTTPException(status_code=404, detail="任务不存在")
-
- task = processing_tasks[task_id]
- return {
- "task_id": task_id,
- "status": task["status"],
- "message": task["message"],
- "result_url": task.get("result_url")
- }
- @app.get("/results/{filename}")
- async def get_result_image(filename: str):
- """获取处理后的图像"""
- file_path = os.path.join(TEMP_DIR, filename)
-
- if not os.path.exists(file_path):
- raise HTTPException(status_code=404, detail="文件不存在")
-
- return FileResponse(file_path)
复制代码
7.3 缓存处理结果
添加缓存机制,避免重复处理相同的图像:
- from fastapi import FastAPI, Request, Response
- from fastapi.responses import StreamingResponse
- import hashlib
- import time
- # 创建缓存目录
- CACHE_DIR = "cache"
- os.makedirs(CACHE_DIR, exist_ok=True)
- # 缓存处理结果
- def get_cache_key(image_bytes: bytes, operation: str, params: dict) -> str:
- """生成缓存键"""
- # 创建包含图像数据和参数的字符串
- data_str = f"{image_bytes}{operation}{str(sorted(params.items()))}"
- # 计算MD5哈希作为缓存键
- return hashlib.md5(data_str.encode()).hexdigest()
- def get_cached_result(cache_key: str) -> Optional[bytes]:
- """获取缓存结果"""
- cache_file = os.path.join(CACHE_DIR, f"{cache_key}")
- if os.path.exists(cache_file):
- with open(cache_file, "rb") as f:
- return f.read()
- return None
- def cache_result(cache_key: str, result: bytes) -> None:
- """缓存结果"""
- cache_file = os.path.join(CACHE_DIR, f"{cache_key}")
- with open(cache_file, "wb") as f:
- f.write(result)
- @app.post("/process-cached/")
- async def process_image_with_cache(
- file: UploadFile = File(...),
- operation: str = Form(...),
- width: Optional[int] = Form(None),
- height: Optional[int] = Form(None),
- angle: Optional[float] = Form(None),
- flip_mode: Optional[str] = Form(None),
- filter_type: Optional[str] = Form(None),
- brightness: Optional[float] = Form(None),
- contrast: Optional[float] = Form(None),
- saturation: Optional[float] = Form(None),
- output_format: Optional[str] = Form("JPEG"),
- quality: Optional[int] = Form(85),
- left: Optional[int] = Form(None),
- top: Optional[int] = Form(None),
- right: Optional[int] = Form(None),
- bottom: Optional[int] = Form(None)
- ):
- """带缓存的图像处理"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- try:
- # 读取图像
- image_bytes = await file.read()
-
- # 准备参数
- params = {
- "width": width,
- "height": height,
- "angle": angle,
- "flip_mode": flip_mode,
- "filter_type": filter_type,
- "brightness": brightness,
- "contrast": contrast,
- "saturation": saturation,
- "output_format": output_format,
- "quality": quality,
- "left": left,
- "top": top,
- "right": right,
- "bottom": bottom
- }
-
- # 生成缓存键
- cache_key = get_cache_key(image_bytes, operation, params)
-
- # 检查缓存
- cached_result = get_cached_result(cache_key)
- if cached_result:
- return StreamingResponse(
- io.BytesIO(cached_result),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}", "X-Cache": "HIT"}
- )
-
- # 打开图像
- image = await open_image_async(image_bytes)
-
- # 执行操作
- if operation == "resize":
- if width is None or height is None:
- raise HTTPException(status_code=400, detail="调整大小需要宽度和高度参数")
- image = await resize_image_async(image, (width, height))
- elif operation == "crop":
- if left is None or top is None or right is None or bottom is None:
- raise HTTPException(status_code=400, detail="裁剪需要left、top、right和bottom参数")
- image = await crop_image_async(image, (left, top, right, bottom))
- elif operation == "rotate":
- if angle is None:
- raise HTTPException(status_code=400, detail="旋转需要角度参数")
- image = await rotate_image_async(image, angle)
- elif operation == "flip":
- if flip_mode is None:
- raise HTTPException(status_code=400, detail="翻转需要flip_mode参数")
- image = await flip_image_async(image, flip_mode)
- elif operation == "filter":
- if filter_type is None:
- raise HTTPException(status_code=400, detail="滤镜需要filter_type参数")
- image = await apply_filter_async(image, filter_type)
- elif operation == "brightness":
- if brightness is None:
- raise HTTPException(status_code=400, detail="亮度调整需要brightness参数")
- image = await adjust_brightness_async(image, brightness)
- elif operation == "contrast":
- if contrast is None:
- raise HTTPException(status_code=400, detail="对比度调整需要contrast参数")
- image = await adjust_contrast_async(image, contrast)
- elif operation == "saturation":
- if saturation is None:
- raise HTTPException(status_code=400, detail="饱和度调整需要saturation参数")
- image = await adjust_saturation_async(image, saturation)
- elif operation == "convert":
- if output_format is None:
- raise HTTPException(status_code=400, detail="格式转换需要output_format参数")
- image = await convert_format_async(image, output_format)
- else:
- raise HTTPException(status_code=400, detail=f"不支持的操作: {operation}")
-
- # 转换格式
- if output_format:
- image = await convert_format_async(image, output_format)
-
- # 保存图像
- processed_image_bytes = await save_image_async(image, format=output_format, quality=quality)
-
- # 缓存结果
- cache_result(cache_key, processed_image_bytes)
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}", "X-Cache": "MISS"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"处理图像时出错: {str(e)}")
复制代码
8. 部署
8.1 使用Docker部署
创建一个Dockerfile:
- # 使用Python 3.9作为基础镜像
- FROM python:3.9-slim
- # 设置工作目录
- WORKDIR /app
- # 安装系统依赖
- RUN apt-get update && apt-get install -y \
- gcc \
- && rm -rf /var/lib/apt/lists/*
- # 复制requirements文件
- COPY requirements.txt .
- # 安装Python依赖
- RUN pip install --no-cache-dir -r requirements.txt
- # 复制应用代码
- COPY . .
- # 创建临时目录
- RUN mkdir -p temp_images cache
- # 暴露端口
- EXPOSE 8000
- # 运行应用
- CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
复制代码
创建一个requirements.txt文件:
- fastapi==0.68.0
- uvicorn[standard]==0.15.0
- pillow==8.3.1
- python-multipart==0.0.5
- aiofiles==0.7.0
- python-jose[cryptography]==3.3.0
- passlib[bcrypt]==1.7.4
复制代码
构建和运行Docker容器:
- # 构建镜像
- docker build -t fastapi-image-processing .
- # 运行容器
- docker run -d -p 8000:8000 -v $(pwd)/temp_images:/app/temp_images -v $(pwd)/cache:/app/cache --name image-api fastapi-image-processing
复制代码
8.2 使用Docker Compose部署
创建一个docker-compose.yml文件:
- version: '3.8'
- services:
- api:
- build: .
- ports:
- - "8000:8000"
- volumes:
- - ./temp_images:/app/temp_images
- - ./cache:/app/cache
- environment:
- - ENVIRONMENT=production
- restart: unless-stopped
- healthcheck:
- test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
- interval: 30s
- timeout: 10s
- retries: 3
- start_period: 40s
- nginx:
- image: nginx:alpine
- ports:
- - "80:80"
- volumes:
- - ./nginx.conf:/etc/nginx/nginx.conf:ro
- depends_on:
- - api
- restart: unless-stopped
复制代码
创建一个nginx.conf文件:
- events {
- worker_connections 1024;
- }
- http {
- upstream api {
- server api:8000;
- }
- server {
- listen 80;
- server_name localhost;
- client_max_body_size 100M;
- location / {
- proxy_pass http://api;
- proxy_set_header Host $host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto $scheme;
- proxy_read_timeout 300s;
- proxy_connect_timeout 75s;
- }
- location /docs {
- proxy_pass http://api;
- proxy_set_header Host $host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto $scheme;
- }
- location /redoc {
- proxy_pass http://api;
- proxy_set_header Host $host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto $scheme;
- }
- location /openapi.json {
- proxy_pass http://api;
- proxy_set_header Host $host;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto $scheme;
- }
- }
- }
复制代码
使用Docker Compose启动服务:
8.3 使用Kubernetes部署
创建一个deployment.yaml文件:
- apiVersion: apps/v1
- kind: Deployment
- metadata:
- name: image-processing-api
- spec:
- replicas: 3
- selector:
- matchLabels:
- app: image-processing-api
- template:
- metadata:
- labels:
- app: image-processing-api
- spec:
- containers:
- - name: api
- image: fastapi-image-processing:latest
- ports:
- - containerPort: 8000
- env:
- - name: ENVIRONMENT
- value: "production"
- volumeMounts:
- - name: temp-images
- mountPath: /app/temp_images
- - name: cache
- mountPath: /app/cache
- resources:
- requests:
- memory: "256Mi"
- cpu: "250m"
- limits:
- memory: "512Mi"
- cpu: "500m"
- livenessProbe:
- httpGet:
- path: /health
- port: 8000
- initialDelaySeconds: 30
- periodSeconds: 10
- readinessProbe:
- httpGet:
- path: /health
- port: 8000
- initialDelaySeconds: 5
- periodSeconds: 5
- volumes:
- - name: temp-images
- emptyDir: {}
- - name: cache
- emptyDir: {}
- ---
- apiVersion: v1
- kind: Service
- metadata:
- name: image-processing-api-service
- spec:
- selector:
- app: image-processing-api
- ports:
- - protocol: TCP
- port: 80
- targetPort: 8000
- type: LoadBalancer
- ---
- apiVersion: networking.k8s.io/v1
- kind: Ingress
- metadata:
- name: image-processing-api-ingress
- annotations:
- nginx.ingress.kubernetes.io/rewrite-target: /
- nginx.ingress.kubernetes.io/proxy-body-size: "100m"
- spec:
- rules:
- - http:
- paths:
- - path: /
- pathType: Prefix
- backend:
- service:
- name: image-processing-api-service
- port:
- number: 80
复制代码
部署到Kubernetes集群:
- kubectl apply -f deployment.yaml
复制代码
9. 常见问题和解决方案
9.1 内存管理问题
问题:处理大图像时,可能会遇到内存不足的问题。
解决方案:
1. 限制上传文件大小:
- from fastapi import Request, HTTPException
- from fastapi.middleware import Middleware
- from fastapi.middleware.base import BaseHTTPMiddleware
- class MaxFileSizeMiddleware(BaseHTTPMiddleware):
- def __init__(self, app, max_file_size: int = 10 * 1024 * 1024):
- super().__init__(app)
- self.max_file_size = max_file_size
- async def dispatch(self, request: Request, call_next):
- content_type = request.headers.get("content-type", "")
- if "multipart/form-data" in content_type:
- content_length = request.headers.get("content-length")
- if content_length and int(content_length) > self.max_file_size:
- raise HTTPException(status_code=413, detail="文件太大")
- response = await call_next(request)
- return response
- # 添加中间件
- app = FastAPI(
- title="图像处理API",
- description="使用FastAPI和Pillow构建的高性能图像处理API",
- middleware=[Middleware(MaxFileSizeMiddleware, max_file_size=10 * 1024 * 1024)] # 10MB
- )
复制代码
1. 使用流式处理:
- @app.post("/process-stream/")
- async def process_image_stream(
- file: UploadFile = File(...),
- operation: str = Form(...),
- # 其他参数...
- ):
- """流式处理图像"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- try:
- # 使用流式读取
- contents = await file.read()
-
- # 处理图像...
-
- # 返回流式响应
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"处理图像时出错: {str(e)}")
复制代码
1. 使用临时文件处理大图像:
- import tempfile
- import os
- @app.post("/process-large/")
- async def process_large_image(
- file: UploadFile = File(...),
- operation: str = Form(...),
- # 其他参数...
- ):
- """处理大图像"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- try:
- # 创建临时文件
- with tempfile.NamedTemporaryFile(delete=False) as temp_file:
- # 流式写入临时文件
- for chunk in file.file:
- temp_file.write(chunk)
- temp_file_path = temp_file.name
-
- try:
- # 使用临时文件处理图像
- image = Image.open(temp_file_path)
-
- # 处理图像...
-
- # 保存到另一个临时文件
- with tempfile.NamedTemporaryFile(suffix=f".{output_format.lower()}", delete=False) as output_temp_file:
- image.save(output_temp_file.name, format=output_format, quality=quality)
- output_temp_file_path = output_temp_file.name
-
- # 返回文件响应
- return FileResponse(
- output_temp_file_path,
- media_type=f"image/{output_format.lower()}",
- filename=f"processed.{output_format.lower()}"
- )
-
- finally:
- # 清理临时文件
- if os.path.exists(temp_file_path):
- os.unlink(temp_file_path)
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"处理图像时出错: {str(e)}")
复制代码
9.2 并发处理问题
问题:当多个用户同时上传大图像时,可能会导致服务器负载过高。
解决方案:
1. 使用队列系统处理任务:
- from fastapi import BackgroundTasks
- import redis
- import json
- import uuid
- # Redis配置
- REDIS_HOST = "localhost"
- REDIS_PORT = 6379
- REDIS_DB = 0
- REDIS_QUEUE = "image_processing_tasks"
- # 创建Redis连接
- redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
- @app.post("/process-queued/")
- async def process_image_queued(
- file: UploadFile = File(...),
- operation: str = Form(...),
- # 其他参数...
- ):
- """队列处理图像"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- # 创建任务ID
- task_id = str(uuid.uuid4())
-
- # 读取文件
- image_bytes = await file.read()
-
- # 准备任务数据
- task_data = {
- "task_id": task_id,
- "image_bytes": image_bytes.hex(), # 转换为十六进制字符串
- "operation": operation,
- "width": width,
- "height": height,
- # 其他参数...
- }
-
- # 将任务添加到队列
- redis_client.rpush(REDIS_QUEUE, json.dumps(task_data))
-
- # 返回任务ID
- return {"task_id": task_id, "status": "queued", "message": "图像处理已加入队列"}
- # 创建一个单独的工作进程来处理队列中的任务
- def process_queue():
- """处理队列中的任务"""
- while True:
- # 从队列中获取任务
- task_json = redis_client.blpop(REDIS_QUEUE, timeout=1)
- if not task_json:
- continue
-
- try:
- # 解析任务数据
- task_data = json.loads(task_json[1])
- task_id = task_data["task_id"]
-
- # 更新任务状态
- redis_client.hset(f"task:{task_id}", "status", "processing")
-
- # 获取图像数据
- image_bytes = bytes.fromhex(task_data["image_bytes"])
-
- # 处理图像...
-
- # 更新任务状态
- redis_client.hset(f"task:{task_id}", "status", "completed")
- redis_client.hset(f"task:{task_id}", "result_url", f"/results/{task_id}.{output_format.lower()}")
-
- except Exception as e:
- # 更新任务状态为错误
- redis_client.hset(f"task:{task_id}", "status", "error")
- redis_client.hset(f"task:{task_id}", "message", str(e))
- # 启动工作进程
- import threading
- worker_thread = threading.Thread(target=process_queue, daemon=True)
- worker_thread.start()
复制代码
1. 使用Celery处理异步任务:
首先,安装Celery:
- pip install celery[redis]
复制代码
创建一个celery_config.py文件:
- from celery import Celery
- import os
- # Celery配置
- CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
- CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
- # 创建Celery应用
- celery_app = Celery(
- "image_processing",
- broker=CELERY_BROKER_URL,
- backend=CELERY_RESULT_BACKEND,
- include=["tasks"]
- )
- # Celery配置
- celery_app.conf.update(
- task_serializer="json",
- result_serializer="json",
- accept_content=["json"],
- timezone="UTC",
- enable_utc=True,
- task_track_started=True,
- task_time_limit=30 * 60, # 30分钟超时
- worker_prefetch_multiplier=1,
- worker_max_tasks_per_child=1000,
- )
复制代码
创建一个tasks.py文件:
- from celery_config import celery_app
- from PIL import Image, ImageFilter, ImageEnhance
- import io
- import os
- from typing import Tuple, Optional
- @celery_app.task(bind=True)
- def process_image_task(self, image_bytes: bytes, operation: str, params: dict):
- """处理图像任务"""
- try:
- # 更新任务状态
- self.update_state(state="PROGRESS", meta={"step": "opening_image"})
-
- # 打开图像
- image = Image.open(io.BytesIO(image_bytes))
-
- # 执行操作
- if operation == "resize":
- self.update_state(state="PROGRESS", meta={"step": "resizing_image"})
- width = params.get("width")
- height = params.get("height")
- if width is None or height is None:
- raise ValueError("调整大小需要宽度和高度参数")
- image = image.resize((width, height), Image.LANCZOS)
- elif operation == "crop":
- self.update_state(state="PROGRESS", meta={"step": "cropping_image"})
- left = params.get("left")
- top = params.get("top")
- right = params.get("right")
- bottom = params.get("bottom")
- if left is None or top is None or right is None or bottom is None:
- raise ValueError("裁剪需要left、top、right和bottom参数")
- image = image.crop((left, top, right, bottom))
- # 其他操作...
-
- # 转换格式
- output_format = params.get("output_format", "JPEG")
- if output_format:
- self.update_state(state="PROGRESS", meta={"step": "converting_format"})
- if output_format.upper() == "JPEG" and image.mode in ("RGBA", "LA", "P"):
- # JPEG不支持透明度,转换为RGB
- background = Image.new("RGB", image.size, (255, 255, 255))
- background.paste(image, mask=image.split()[-1] if image.mode == "RGBA" else None)
- image = background
- elif output_format.upper() == "PNG" and image.mode not in ("RGBA", "LA", "P"):
- # PNG支持透明度,转换为RGBA
- image = image.convert("RGBA")
-
- # 保存图像
- self.update_state(state="PROGRESS", meta={"step": "saving_image"})
- quality = params.get("quality", 85)
- img_byte_arr = io.BytesIO()
- image.save(img_byte_arr, format=output_format, quality=quality)
- img_byte_arr.seek(0)
- processed_image_bytes = img_byte_arr.getvalue()
-
- # 返回结果
- return {
- "status": "completed",
- "result": processed_image_bytes.hex(),
- "format": output_format,
- "size": image.size
- }
-
- except Exception as e:
- # 更新任务状态为错误
- self.update_state(
- state="FAILURE",
- meta={
- "error": str(e),
- "status": "error"
- }
- )
- raise
复制代码
在main.py中添加使用Celery的端点:
- from fastapi import FastAPI, File, UploadFile, HTTPException, Form
- from fastapi.responses import StreamingResponse
- from tasks import process_image_task
- import io
- @app.post("/process-celery/")
- async def process_image_with_celery(
- file: UploadFile = File(...),
- operation: str = Form(...),
- # 其他参数...
- ):
- """使用Celery处理图像"""
- # 检查文件类型
- if not file.content_type.startswith("image/"):
- raise HTTPException(status_code=400, detail="文件必须是图像")
-
- try:
- # 读取图像
- image_bytes = await file.read()
-
- # 准备参数
- params = {
- "width": width,
- "height": height,
- "angle": angle,
- "flip_mode": flip_mode,
- "filter_type": filter_type,
- "brightness": brightness,
- "contrast": contrast,
- "saturation": saturation,
- "output_format": output_format,
- "quality": quality,
- "left": left,
- "top": top,
- "right": right,
- "bottom": bottom
- }
-
- # 提交任务
- task = process_image_task.delay(image_bytes, operation, params)
-
- # 返回任务ID
- return {"task_id": task.id, "status": "queued", "message": "图像处理已开始"}
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"提交任务时出错: {str(e)}")
- @app.get("/celery-status/{task_id}")
- async def get_celery_task_status(task_id: str):
- """获取Celery任务状态"""
- task = process_image_task.AsyncResult(task_id)
-
- if task.state == "PENDING":
- response = {
- "task_id": task_id,
- "status": "pending",
- "message": "任务等待中"
- }
- elif task.state == "PROGRESS":
- response = {
- "task_id": task_id,
- "status": "progress",
- "message": "任务处理中",
- "step": task.info.get("step", "")
- }
- elif task.state == "SUCCESS":
- response = {
- "task_id": task_id,
- "status": "completed",
- "message": "任务完成",
- "result_url": f"/celery-results/{task_id}"
- }
- elif task.state == "FAILURE":
- response = {
- "task_id": task_id,
- "status": "error",
- "message": "任务失败",
- "error": str(task.info)
- }
- else:
- response = {
- "task_id": task_id,
- "status": "unknown",
- "message": "未知状态"
- }
-
- return response
- @app.get("/celery-results/{task_id}")
- async def get_celery_task_result(task_id: str):
- """获取Celery任务结果"""
- task = process_image_task.AsyncResult(task_id)
-
- if task.state != "SUCCESS":
- raise HTTPException(status_code=404, detail="任务未完成或不存在")
-
- # 获取结果
- result = task.result
- processed_image_bytes = bytes.fromhex(result["result"])
- output_format = result["format"]
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}"}
- )
复制代码
9.3 安全问题
问题:API可能会被恶意使用,如上传恶意文件、DDoS攻击等。
解决方案:
1. 文件类型验证:
- from fastapi import UploadFile, HTTPException
- import imghdr
- def validate_image_file(file: UploadFile) -> bool:
- """验证图像文件"""
- # 检查MIME类型
- if not file.content_type.startswith("image/"):
- return False
-
- # 检查文件扩展名
- allowed_extensions = [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"]
- file_extension = os.path.splitext(file.filename)[1].lower()
- if file_extension not in allowed_extensions:
- return False
-
- # 检查文件内容
- file_content = file.file.read()
- file.file.seek(0) # 重置文件指针
-
- # 使用imghdr检查文件类型
- image_type = imghdr.what(None, h=file_content)
- if image_type not in ["jpeg", "jpg", "png", "gif", "bmp", "webp"]:
- return False
-
- return True
- @app.post("/process-secure/")
- async def process_image_secure(
- file: UploadFile = File(...),
- operation: str = Form(...),
- # 其他参数...
- ):
- """安全处理图像"""
- # 验证文件
- if not validate_image_file(file):
- raise HTTPException(status_code=400, detail="无效的图像文件")
-
- try:
- # 处理图像...
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"处理图像时出错: {str(e)}")
复制代码
1. 速率限制:
- pip install fastapi-limiter
复制代码- from fastapi import FastAPI, Request
- from fastapi_limiter import FastAPILimiter
- from fastapi_limiter.depends import RateLimiter
- import redis
- app = FastAPI()
- @app.on_event("startup")
- async def startup():
- redis = await aioredis.create_redis_pool("redis://localhost")
- FastAPILimiter.init(redis)
- @app.post("/process-rate-limited/", dependencies=[Depends(RateLimiter(times=10, seconds=60))])
- async def process_image_rate_limited(
- file: UploadFile = File(...),
- operation: str = Form(...),
- # 其他参数...
- ):
- """速率限制的图像处理"""
- # 验证文件
- if not validate_image_file(file):
- raise HTTPException(status_code=400, detail="无效的图像文件")
-
- try:
- # 处理图像...
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"处理图像时出错: {str(e)}")
复制代码
1. 身份验证和授权:
- from fastapi import Depends, HTTPException, status
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from jose import JWTError, jwt
- from passlib.context import CryptContext
- from datetime import datetime, timedelta
- from pydantic import BaseModel
- # 安全配置
- SECRET_KEY = "your-secret-key-here"
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_MINUTES = 30
- # 密码上下文
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- # Bearer token
- security = HTTPBearer()
- # 用户模型
- class User(BaseModel):
- username: str
- email: str = None
- full_name: str = None
- disabled: bool = None
- class UserInDB(User):
- hashed_password: str
- # 模拟数据库
- fake_users_db = {
- "johndoe": {
- "username": "johndoe",
- "full_name": "John Doe",
- "email": "johndoe@example.com",
- "hashed_password": pwd_context.hash("secret"),
- "disabled": False,
- }
- }
- # Token模型
- class Token(BaseModel):
- access_token: str
- token_type: str
- class TokenData(BaseModel):
- username: str = None
- def verify_password(plain_password, hashed_password):
- """验证密码"""
- return pwd_context.verify(plain_password, hashed_password)
- def get_password_hash(password):
- """获取密码哈希"""
- return pwd_context.hash(password)
- def get_user(db, username: str):
- """获取用户"""
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
- def authenticate_user(fake_db, username: str, password: str):
- """验证用户"""
- user = get_user(fake_db, username)
- if not user:
- return False
- if not verify_password(password, user.hashed_password):
- return False
- return user
- def create_access_token(data: dict, expires_delta: timedelta = None):
- """创建访问令牌"""
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- return encoded_jwt
- async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
- """获取当前用户"""
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="无法验证凭据",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- token_data = TokenData(username=username)
- except JWTError:
- raise credentials_exception
- user = get_user(fake_users_db, username=token_data.username)
- if user is None:
- raise credentials_exception
- return user
- async def get_current_active_user(current_user: UserInDB = Depends(get_current_user)):
- """获取当前活跃用户"""
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="禁用的用户")
- return current_user
- @app.post("/token", response_model=Token)
- async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
- """登录获取访问令牌"""
- user = authenticate_user(fake_users_db, form_data.username, form_data.password)
- if not user:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="用户名或密码不正确",
- headers={"WWW-Authenticate": "Bearer"},
- )
- access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- access_token = create_access_token(
- data={"sub": user.username}, expires_delta=access_token_expires
- )
- return {"access_token": access_token, "token_type": "bearer"}
- @app.post("/process-auth/")
- async def process_image_auth(
- current_user: UserInDB = Depends(get_current_active_user),
- file: UploadFile = File(...),
- operation: str = Form(...),
- # 其他参数...
- ):
- """需要身份验证的图像处理"""
- # 验证文件
- if not validate_image_file(file):
- raise HTTPException(status_code=400, detail="无效的图像文件")
-
- try:
- # 处理图像...
-
- # 返回处理后的图像
- return StreamingResponse(
- io.BytesIO(processed_image_bytes),
- media_type=f"image/{output_format.lower()}",
- headers={"Content-Disposition": f"attachment; filename=processed.{output_format.lower()}"}
- )
-
- except Exception as e:
- raise HTTPException(status_code=500, detail=f"处理图像时出错: {str(e)}")
复制代码
10. 结论
在本指南中,我们学习了如何使用FastAPI和Pillow构建高性能的图像处理API。我们从基础开始,逐步添加了更多功能,并解决了实际开发中可能遇到的各种挑战。
10.1 主要收获
1. FastAPI基础:我们学习了如何创建基本的FastAPI应用,添加路由,处理请求和响应。
2. Pillow图像处理:我们掌握了使用Pillow进行各种图像处理操作,如调整大小、裁剪、旋转、应用滤镜等。
3. API设计:我们设计了RESTful API,支持多种图像处理操作,并提供了批量处理功能。
4. 性能优化:我们实现了异步处理、缓存机制和队列系统,以提高API的性能和可扩展性。
5. 部署:我们学习了如何使用Docker、Docker Compose和Kubernetes部署API。
6. 问题解决:我们解决了实际开发中可能遇到的各种问题,如内存管理、并发处理和安全问题。
FastAPI基础:我们学习了如何创建基本的FastAPI应用,添加路由,处理请求和响应。
Pillow图像处理:我们掌握了使用Pillow进行各种图像处理操作,如调整大小、裁剪、旋转、应用滤镜等。
API设计:我们设计了RESTful API,支持多种图像处理操作,并提供了批量处理功能。
性能优化:我们实现了异步处理、缓存机制和队列系统,以提高API的性能和可扩展性。
部署:我们学习了如何使用Docker、Docker Compose和Kubernetes部署API。
问题解决:我们解决了实际开发中可能遇到的各种问题,如内存管理、并发处理和安全问题。
10.2 进一步学习
如果你想进一步学习,可以探索以下主题:
1. 更多图像处理功能:学习更高级的图像处理技术,如人脸识别、对象检测、图像分割等。
2. 机器学习集成:将机器学习模型集成到API中,以实现更智能的图像处理。
3. 微服务架构:将API拆分为多个微服务,以提高可维护性和可扩展性。
4. 监控和日志:添加监控和日志系统,以跟踪API的性能和使用情况。
5. CI/CD:实现持续集成和持续部署,以自动化测试和部署过程。
更多图像处理功能:学习更高级的图像处理技术,如人脸识别、对象检测、图像分割等。
机器学习集成:将机器学习模型集成到API中,以实现更智能的图像处理。
微服务架构:将API拆分为多个微服务,以提高可维护性和可扩展性。
监控和日志:添加监控和日志系统,以跟踪API的性能和使用情况。
CI/CD:实现持续集成和持续部署,以自动化测试和部署过程。
10.3 资源
以下是一些有用的资源,可以帮助你进一步学习:
1. FastAPI官方文档
2. Pillow官方文档
3. Docker官方文档
4. Kubernetes官方文档
5. Celery官方文档
希望本指南能帮助你构建高性能的图像处理API,并在实际开发中解决各种挑战。祝你编码愉快! |
|