🚀 8. API 开发与微服务
在现代Web开发中,API(应用程序编程接口)已成为系统间通信的核心。本章将深入探讨如何使用Flask构建高质量的API,并逐步向微服务架构演进。
📋 8.1 RESTful API 设计规范
什么是RESTful API?
REST(Representational State Transfer)是一种架构风格,它定义了一组约束条件和原则。RESTful API遵循这些原则,使得API更加直观、可预测和易于维护。
REST核心原则
HTTP方法与资源操作映射
| HTTP方法 | 操作 | 示例URL | 描述 |
|---|---|---|---|
| GET | 读取 | /api/users | 获取用户列表 |
| GET | 读取 | /api/users/123 | 获取特定用户 |
| POST | 创建 | /api/users | 创建新用户 |
| PUT | 更新 | /api/users/123 | 完整更新用户 |
| PATCH | 部分更新 | /api/users/123 | 部分更新用户 |
| DELETE | 删除 | /api/users/123 | 删除用户 |
Flask RESTful API 基础示例
python
# app.py
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///api.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
# 获取所有用户
@app.route('/api/users', methods=['GET'])
def get_users():
users = User.query.all()
return jsonify([
{'id': user.id, 'name': user.name, 'email': user.email}
for user in users
])
# 创建用户
@app.route('/api/users', methods=['POST'])
def create_user():
data = request.get_json()
user = User(name=data['name'], email=data['email'])
db.session.add(user)
db.session.commit()
return jsonify({'id': user.id, 'name': user.name, 'email': user.email}), 201
# 获取特定用户
@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
user = User.query.get_or_404(user_id)
return jsonify({'id': user.id, 'name': user.name, 'email': user.email})1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
API响应格式标准化
python
#utils/response.py
from flask import jsonify
def success_response(data=None, message="Success", status_code=200):
"""标准化成功响应"""
response = {
'success': True,
'message': message,
'data': data
}
return jsonify(response), status_code
def error_response(message="Error", status_code=400, errors=None):
"""标准化错误响应"""
response = {
'success': False,
'message': message,
'errors': errors
}
return jsonify(response), status_code1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
🔄 8.2 API 版本控制策略
为什么需要版本控制?
API版本控制确保向后兼容性,允许客户端在自己的节奏下迁移到新版本,同时让开发者能够持续改进API。
版本控制策略对比
Flask版本控制实现
python
# app.py
from flask import Flask, Blueprint
app = Flask(__name__)
# 方式1: URL路径版本控制
api_v1 = Blueprint('api_v1', __name__, url_prefix='/api/v1')
api_v2 = Blueprint('api_v2', __name__, url_prefix='/api/v2')
@api_v1.route('/users')
def get_users_v1():
return {'version': '1.0', 'users': []}
@api_v2.route('/users')
def get_users_v2():
return {'version': '2.0', 'users': [], 'pagination': {}}
app.register_blueprint(api_v1)
app.register_blueprint(api_v2)
# 方式2: 请求头版本控制
@app.route('/api/users')
def get_users():
version = request.headers.get('API-Version', '1.0')
if version == '2.0':
return get_users_v2_logic()
return get_users_v1_logic()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
🛠️ 8.3 Flask-RESTful 与 Flask-RESTX
Flask-RESTful:简洁的REST API框架
python
# api/resources.py
from flask import Flask
from flask_restful import Api, Resource, reqparse
app = Flask(__name__)
api = Api(app)
class UserListAPI(Resource):
def __init__(self):
self.reqparse = reqparse.RequestParser()
self.reqparse.add_argument('name', type=str, required=True, location='json')
self.reqparse.add_argument('email', type=str, required=True, location='json')
super(UserListAPI, self).__init__()
def get(self):
"""获取用户列表"""
return {'users': []}
def post(self):
"""创建新用户"""
args = self.reqparse.parse_args()
# 创建用户逻辑
return {'message': 'User created', 'user': args}, 201
api.add_resource(UserListAPI, '/api/users')1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Flask-RESTX:带文档生成的增强版
python
# api/namespaces.py
from flask import Flask
from flask_restx import Api, Resource, fields, Namespace
app = Flask(__name__)
api = Api(app, doc='/docs/', title='用户管理API', description='用户管理系统API文档')
# 创建命名空间
user_ns = Namespace('users', description='用户相关操作')
# 定义数据模型
user_model = user_ns.model('User', {
'id': fields.Integer(required=True, description='用户ID'),
'name': fields.String(required=True, description='用户姓名'),
'email': fields.String(required=True, description='用户邮箱')
})
user_input_model = user_ns.model('UserInput', {
'name': fields.String(required=True, description='用户姓名'),
'email': fields.String(required=True, description='用户邮箱')
})
@user_ns.route('/')
class UserList(Resource):
@user_ns.marshal_list_with(user_model)
@user_ns.doc('获取用户列表')
def get(self):
"""获取所有用户"""
return []
@user_ns.expect(user_input_model)
@user_ns.marshal_with(user_model, code=201)
@user_ns.doc('创建用户')
def post(self):
"""创建新用户"""
return {}, 201
api.add_namespace(user_ns, path='/api/users')1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
🔗 8.4 GraphQL API 开发
GraphQL vs REST对比
Flask + Graphene 实现
python
# graphql_api.py
from flask import Flask
from flask_graphql import GraphQLView
import graphene
from graphene_sqlalchemy import SQLAlchemyObjectType
app = Flask(__name__)
# GraphQL类型定义
class UserType(SQLAlchemyObjectType):
class Meta:
model = User
interfaces = (graphene.relay.Node,)
# 查询类
class Query(graphene.ObjectType):
users = graphene.List(UserType)
user = graphene.Field(UserType, id=graphene.Int())
def resolve_users(self, info):
return User.query.all()
def resolve_user(self, info, id):
return User.query.get(id)
# 变更类
class CreateUser(graphene.Mutation):
class Arguments:
name = graphene.String(required=True)
email = graphene.String(required=True)
user = graphene.Field(UserType)
def mutate(self, info, name, email):
user = User(name=name, email=email)
db.session.add(user)
db.session.commit()
return CreateUser(user=user)
class Mutation(graphene.ObjectType):
create_user = CreateUser.Field()
# 创建Schema
schema = graphene.Schema(query=Query, mutation=Mutation)
# 添加GraphQL端点
app.add_url_rule('/graphql', view_func=GraphQLView.as_view(
'graphql',
schema=schema,
graphiql=True # 启用GraphiQL界面
))1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
📚 8.5 API 文档自动生成(Swagger/OpenAPI)
OpenAPI规范的重要性
OpenAPI(原Swagger)规范提供了描述REST API的标准方式,使得API文档化、测试和客户端代码生成变得自动化。
Flask-RESTX自动文档生成
python
# api/swagger_config.py
from flask import Flask
from flask_restx import Api, Resource, fields
app = Flask(__name__)
# 配置Swagger文档
api = Api(
app,
version='1.0',
title='博客API',
description='一个简单的博客管理API',
doc='/docs/',
authorizations={
'Bearer': {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization',
'description': 'JWT Token: Bearer <token>'
}
},
security='Bearer'
)
# 定义错误响应模型
error_model = api.model('Error', {
'message': fields.String(description='错误信息'),
'code': fields.Integer(description='错误代码')
})
# 定义分页模型
pagination_model = api.model('Pagination', {
'page': fields.Integer(description='当前页码'),
'per_page': fields.Integer(description='每页数量'),
'total': fields.Integer(description='总数量'),
'pages': fields.Integer(description='总页数')
})
@api.route('/users')
class UserListResource(Resource):
@api.doc('获取用户列表')
@api.param('page', '页码', type='integer', default=1)
@api.param('per_page', '每页数量', type='integer', default=10)
@api.response(200, '成功', user_list_model)
@api.response(400, '请求错误', error_model)
def get(self):
"""获取用户列表,支持分页"""
pass1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
🛡️ 8.6 API 限流与熔断机制
限流策略
Flask-Limiter实现
python
# api/rate_limiting.py
from flask import Flask, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import redis
app = Flask(__name__)
# 配置Redis作为存储后端
limiter = Limiter(
app,
key_func=get_remote_address,
storage_uri="redis://localhost:6379",
default_limits=["1000 per hour"]
)
@app.route('/api/users')
@limiter.limit("10 per minute") # 每分钟最多10次请求
def get_users():
return jsonify({'users': []})
@app.route('/api/upload')
@limiter.limit("5 per hour") # 上传接口更严格的限制
def upload_file():
return jsonify({'message': 'File uploaded'})
# 自定义限流键函数
def get_user_id():
# 基于用户ID进行限流
return request.headers.get('User-ID', get_remote_address())
@app.route('/api/premium')
@limiter.limit("100 per hour", key_func=get_user_id)
def premium_api():
return jsonify({'data': 'premium content'})
# 限流异常处理
@app.errorhandler(429)
def ratelimit_handler(e):
return jsonify({
'error': 'Rate limit exceeded',
'message': str(e.description),
'retry_after': e.retry_after
}), 4291
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
熔断器模式实现
python
# utils/circuit_breaker.py
import time
from enum import Enum
from functools import wraps
class CircuitState(Enum):
CLOSED = "closed" # 正常状态
OPEN = "open" # 熔断状态
HALF_OPEN = "half_open" # 半开状态
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise e
def on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# 装饰器使用
def circuit_breaker(failure_threshold=5, timeout=60):
cb = CircuitBreaker(failure_threshold, timeout)
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
return cb.call(func, *args, **kwargs)
return wrapper
return decorator
# 使用示例
@circuit_breaker(failure_threshold=3, timeout=30)
def external_api_call():
# 调用外部API
pass1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
🏗️ 8.7 微服务架构设计
单体架构 vs 微服务架构
Flask微服务基础架构
python
# services/user_service.py
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
import os
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///users.db')
db = SQLAlchemy(app)
migrate = Migrate(app, db)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
@app.route('/health')
def health_check():
"""健康检查端点"""
return jsonify({'status': 'healthy', 'service': 'user-service'})
@app.route('/users', methods=['GET'])
def get_users():
users = User.query.all()
return jsonify([
{'id': u.id, 'username': u.username, 'email': u.email}
for u in users
])
@app.route('/users', methods=['POST'])
def create_user():
data = request.get_json()
user = User(username=data['username'], email=data['email'])
db.session.add(user)
db.session.commit()
return jsonify({'id': user.id, 'username': user.username}), 201
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
服务间通信
python
# utils/service_client.py
import requests
import json
from typing import Optional, Dict, Any
class ServiceClient:
def __init__(self, base_url: str, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.session = requests.Session()
def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""GET请求"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
response = self.session.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
return response.json()
def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""POST请求"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
response = self.session.post(
url,
json=data,
timeout=self.timeout,
headers={'Content-Type': 'application/json'}
)
response.raise_for_status()
return response.json()
# 使用示例
user_service = ServiceClient('http://user-service:5001')
order_service = ServiceClient('http://order-service:5002')
@app.route('/user-orders/<int:user_id>')
def get_user_orders(user_id):
try:
# 调用用户服务获取用户信息
user = user_service.get(f'/users/{user_id}')
# 调用订单服务获取用户订单
orders = order_service.get('/orders', params={'user_id': user_id})
return jsonify({
'user': user,
'orders': orders
})
except requests.RequestException as e:
return jsonify({'error': 'Service unavailable'}), 5031
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Docker容器化部署
dockerfile
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 5000
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:5000/health || exit 1
# 启动应用
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "app:app"]1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
yaml
# docker-compose.yml
version: '3.8'
services:
user-service:
build: ./user-service
ports:
- "5001:5000"
environment:
- DATABASE_URL=postgresql://user:pass@user-db:5432/users
depends_on:
- user-db
networks:
- microservices
order-service:
build: ./order-service
ports:
- "5002:5000"
environment:
- DATABASE_URL=postgresql://user:pass@order-db:5432/orders
depends_on:
- order-db
networks:
- microservices
api-gateway:
build: ./api-gateway
ports:
- "8080:5000"
environment:
- USER_SERVICE_URL=http://user-service:5000
- ORDER_SERVICE_URL=http://order-service:5000
depends_on:
- user-service
- order-service
networks:
- microservices
user-db:
image: postgres:13
environment:
- POSTGRES_DB=users
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
networks:
- microservices
order-db:
image: postgres:13
environment:
- POSTGRES_DB=orders
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
networks:
- microservices
networks:
microservices:
driver: bridge1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
📝 总结
本章深入探讨了Flask API开发的各个方面,从基础的RESTful设计到复杂的微服务架构。关键要点包括:
- RESTful设计:遵循REST原则,使用标准HTTP方法和状态码
- 版本控制:为API演进提供向后兼容性保障
- 工具选择:Flask-RESTful适合简单API,Flask-RESTX提供文档生成
- GraphQL:为复杂数据查询提供更灵活的解决方案
- 文档化:自动生成API文档提高开发效率
- 保护机制:限流和熔断确保系统稳定性
- 微服务:合理拆分服务,注意服务间通信和数据一致性
在下一章中,我们将探讨应用架构与模块化设计,学习如何构建可维护和可扩展的Flask应用。
