Flask

一、介绍和安装

Flask是一个基于WSGI协议的轻量级web框架,它使用起来非常简单且快捷,并且有能力扩展到开发大型项目。它基于 Werkzeug and Jinja 开发,已经成为目前流行的Python web应用程序框架之一。Flask为开发者提供了一些建议,但是并不会强制依赖某些布局,由开发人员自己来选择他们想要使用的工具和库。在社区提供了许多Flask的扩展包,这使添加新功能变得容易。

参考文档:官方文档中文文档

参考文章:https://www.cnblogs.com/wupeiqi/articles/7552008.html

源码:GitHub

安装:

1
pip install flask

二、基本使用

1 flask初识

先创建一个最简单的Flask应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 导入Flask类
from flask import Flask
# 创建一个该类的对象。第一个参数是应用模块或者包的名称。
# __name__ 是一个适用于大多数情况的快捷方式。有了这个参数, Flask 才能知道在哪里可以找到模板和静态文件等东西。
app = Flask(__name__)

# 然后我们使用route()装饰器来告诉 Flask 触发函数的URL,这里是根路径
@app.route("/")
def hello_world():
# 函数返回值就是浏览器中显示的信息。
# 默认的内容类型是HTML,因此字符串中的HTML代码会被浏览器渲染。
return "<p>Hello, World!</p>"
# 这里写在文件里启动,(还可以命令行启动)
app.run()

这样就启动了一个非常简单的服务器,默认flask会监听本地的5000端口,打开浏览器输入http://127.0.0.1:5000/ 即可看到hello world。接下来对比django,看看flask中如何使用。

2 三种类型的响应

在django中,有render,HttpResponse,redirect三种类型的响应,在flask中同样也有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask import Flask,render_template,redirect
app = Flask(__name__)
# 1.返回字符串就,对应django的HTTPResponse
@app.route("/")
def hello_world():
return "<p>Hello, World!</p>"

# 2.返回模版文件render_template,对应django的render
@app.route("/")
def hello_world():
# 这里的index.html模版文件默认路径是根路径下的templates目录下(是Flask类的init方法中默认值,可以另行指定)
return render_template("index.html")

# 3.redirect,对应于django的redirect
@app.route("/")
def hello_world():
return redirect("https://www.baidu.com")

3 模版语法

flask中同样有模版语法,django的模版语法是自己写的,flask使用的是jinja模块,它更好用一些。在HTML中双大括号中的变量可以被动态替换,在点.调用的同时,jinja还可以直接调用方法,并且可以括号取值、get取值。使用和django类似,

3.1 举例

demo.py

1
2
3
4
5
6
7
8
9
10
11
from flask import Flask,render_template,request,redirect
app = Flask(__name__)
USERS = {
1:{'name':'令狐冲','age':20,'gender':'男','text':"独孤九剑"},
2:{'name':'小龙女','age':18,'gender':'女','text':"玉女心经"},
3:{'name':'段誉','age':23,'gender':'男','text':"凌波微步"},
}

@app.route('/index',methods=['GET'])
def index():
return render_template('index.html',user_dict=USERS) # 可以直接在这里加自定义的键值

index.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<body>
<h1>列表</h1>
<table>
<!--循环结构,与django不同的是,可以直接加括号调用方法-->
{% for k,v in user_dict.items() %}
<tr>
<!--对于字典内的数据,可以`.`取值,也可以中括号`[]`,还可以使用`get`方法-->
<td>{{k}}</td>
<td>{{v.name}}</td>
<td>{{v['age']}}</td>
<td>{{v.get('gender')}}</td>
<!--if判断-->
{% if k==2 %}
<td>秘笈!{{ secret }}</td>
{% endif %}
</tr>
{% endfor %}
</table>
</body>

3.2 总结:

变量

1
{{ 变量名 }}

循环

1
2
3
4
5
{% for i in var %}

...

{% endfor %}

条件

1
2
3
4
5
{% if ... %}

..

{% endif %}

4 请求方法、request对象、反向解析

4.1 举例

demo.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask import Flask,render_template,request
app = Flask(__name__)
@app.route('/login',methods=['GET','POST'],endpoint='log') # endpoint路由的别名,用作反向解析
def login():
# request.method判断请求类型
if request.method == "GET":
params = request.query_string # get请求携带的数据从query_string取出,默认是字节byte类型
print(params)
return render_template("login.html")
else:
print(request.form) # form表单提交的post请求数据从form取出
user = request.form.get('user') # 可以使用get取值
pwd = request.form.get('pwd')
if user == 'abc' and pwd == '123':
return "post"

login.html

1
2
3
4
5
6
7
8
<body>
<h1>用户登录</h1>
<form method="post">
<input type="text" name="user">
<input type="text" name="pwd">
<input type="submit" value="登录">
</form>
</body>

4.2 总结

使用methods参数,传递一个列表,指定接收什么类型的请求,比如GET,POST

endpoint用作反向解析的关键字(别名)。使用url_for作后端反向解析,根据endpoint设置的值获取url。

request对象导入即可使用,能够自动区别每次的请求对象,通过request.method判断请求类型,

request.query_string获取get请求携带参数,request.form.get获取form表单提交的post请求数据。

5 路由的另一种形式

flask通过app.route装饰器来实现路由分发,这是它的源码

1
2
3
4
5
6
7
def route(self, rule: str, **options: t.Any) -> t.Callable:
def decorator(f: t.Callable) -> t.Callable:
endpoint = options.pop("endpoint", None)
self.add_url_rule(rule, endpoint, f, **options)
return f

return decorator

举例:@app.route('/login',methods=['GET','POST'],endpoint='log'),首先调用route函数返回装饰器,然后将视图函数作为参数传到decorator中,然后调用self.add_url_rule方法,此时的self是Flask类的对象(app对象),即Flask.add_url_rule方法。所以,可以不使用装饰器,而改为类似django的路由分发形式:

1
2
3
4
5
6
7
8
9
10
from flask import Flask,render_template,redirect

app = Flask(__name__)

def hello_world():
return "hello!"
# 手动调用add_url_rule方法,把url地址、视图函数、反向解析别名传入
app.add_url_rule("/",view_func=hello_world,endpoint="root")

app.run()

访问 http://127.0.0.1:5000/同样生效。这和django中的`url('/',xxxx.view,name="root")`的形式很相似。

6 配置文件与配置方法

flask中的配置文件是一个flask.config.Config,它继承字典,可以像字典一样操作。默认配置为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
'DEBUG': get_debug_flag(default=False), # 是否开启Debug模式
'TESTING': False, # 是否开启测试模式
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None, # 密钥,使用session的时候需要配置
'PERMANENT_SESSION_LIFETIME': timedelta(days=31), # session过期时间
'USE_X_SENDFILE': False,
'LOGGER_NAME': None,
'LOGGER_HANDLER_POLICY': 'always',
'SERVER_NAME': None,
'APPLICATION_ROOT': None,
'SESSION_COOKIE_NAME': 'session', # session的名字
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),
'TRAP_BAD_REQUEST_ERRORS': False,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': True,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
}

6.1 配置方法1

直接写在项目里,如果是小项目,可以使用;对于大型项目,不建议采用这种方式。

1
2
3
app.config['DEBUG'] = True
# 或者使用update(config是字典的子类)
app.config.update(...)

6.2 配置方法2

通过py文件配置

1
2
# 第一个参数是py文件路径,可以是绝对路径,也可以是相对路径
app.config.from_pyfile("settings.py") # 这里是相对路径

settings.py

1
DEBUG = True

6.3 配置方法3

通过对象来配置

1
2
3
4
# 第一个参数可以是
# - 字符串:在这种情况下,字符串对应的对象会被导入
# - 对象:直接传入一个对象
app.config.from_object('settings.TestingConfig')

settings.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Config(object):
DEBUG = False
TESTING = False
DATABASE_URI = 'sqlite://:memory:'

class ProductionConfig(Config):
# 生产环境
DATABASE_URI = 'mysql://user@localhost/foo'

class DevelopmentConfig(Config):
# 开发环境
DEBUG = True

class TestingConfig(Config):
# 测试环境
TESTING = True

这种方式的好处在于,事先定义好每个环境的配置文件,然后只需要在app.config.from_object()配置就能切换。

6.4 其他配置方法

通过文件、json、环境变量配置,详见这里

7 路由层转换器和参数

7.1 转换器

flask有内置的转换器,和django2.x+版本的转换器效果相同

1
2
3
4
5
6
7
8
9
DEFAULT_CONVERTERS = {
'default': UnicodeConverter, # 如果省略,默认是string
'string': UnicodeConverter,
'any': AnyConverter,
'path': PathConverter,
'int': IntegerConverter,
'float': FloatConverter,
'uuid': UUIDConverter,
}

使用方法:

1
2
@app.route('/post/<int:post_id>') # 访问127.0.0.1/post/1
@app.route('/post/<post_id>') # 省略转换器,默认使用string

7.2 参数

app.route中的参数,和app.add_url_rule中参数一样

1
2
3
4
5
6
7
8
rule   # 字符串,匹配的url,支持转换器
view_func=None # 视图函数
endpoint # 视图函数别名,用于反向生成URL(使用url_for),如果不写默认为当前视图函数的名称,注意,不要与其它视图函数重名
methods # 允许哪些请求方式,默认为GET;
defaults # 使用defaults={'k1':'v1'}传值,视图函数加参数k1可以通过k1获取到v1
strict_slashes # 对URL最后的/符号是否严格要求,如果为False则不严格。
redirect_to # 重定向到指定地址,可以是字符串url地址,或者是可调用的函数(函数返回值是url地址)
subdomain # 允许哪些子域访问

flask本质调用了werkzeug的Rule的init方法,所以更多详情详见这里

8 flask的CBV

原理和django的CBV基本相同

1
2
3
4
5
6
7
8
9
10
11
12
from flask import Flask
from flask import views # 导入基类
app=Flask(__name__)

class LoginView(views.View):
# 需要重写dispatch_request方法
methods = ['GET']

def dispatch_request(self):
return 'xxx'
# 这里和django不同的是必须要传一个视图函数的别名,也就是endpoint
app.add_url_rule('/login',view_func=LoginView.as_view('login'))

也可以直接继承MethodView,就不用手动重写dispatch_request方法了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def outer(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result
return inner

class LoginView(views.MethodView):
methods = ['GET'] # 指定允许的请求方式
decorators = [outer, ] # 可以增加多个装饰器,内部是for循环执行的,和装饰器从上到下一样

def get(self):
print('xxxxx')
return "get"
def post(self):
return 'post'

app.add_url_rule('/login',view_func=LoginView.as_view('login'))

9 pycharm设置jinja模版自动补全

第一步,右键模版文件夹--->Mark Directory as--->Template Folder,此时可能会提示你还未选模板语言,是否要选择一个?,点击确定,

image-20211122140547506

如果没有提示也没关系,打开设置,找到下图位置,选择jinja2即可

image-20211122140842859

10 request请求对象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
request.method   # 请求方法
request.args # get请求携带的数据
request.form # post请求提交的数据
request.values # post和get提交的所有数据
request.cookies # 浏览器携带的cookie
request.headers # 请求头
request.path # 不带域名,请求地址,不带参数
request.full_path # 不带域名,请求地址,带参数
request.url # 带域名带参数的请求地址
request.base_url # 带域名不带参数请求地址
request.url_root # 域名
request.host_url # 域名
request.host # 127.0.0.1:5000
request.files # 文件

11 response响应对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
return "字符串"  # 直接返回字符串
return render_template('html模板路径',**{}) # 渲染模版,可以传自定义键值
return redirect('/index.html') # 重定向
return jsonify({'k1':'v1'}) # json格式响应,相当于django的Jsonresponse
# make_response用于响应头里添加自定义信息,比如session、cookies,使用这个方法包装一个response对象,比如下面的例子:
# response是flask.wrappers.Response对象
# def index():
# response = make_response(render_template('index.html', foo=42))
# response.headers['X-Parachutes'] = 'parachutes are cool'
# return response


response.delete_cookie('key')
response.set_cookie('key', 'value')
response.headers['X-Something'] = 'A value'
return response

12 session

1
from flask import Flask,sessions

在使用session之前需要在配置文件中设置SECRET_KEY

然后在视图函数中直接使用

1
2
3
session['key']=value  # 设置session
session.pop('key') # 删除session
session['key'] # 获取session

13 消息闪现

闪现系统的基本工作方式是:在且只在下一个请求中访问上一个请求结束时记录的消息。

1
2
flash("xxxx") # 用于闪现一个消息
get_flashed_message() # 用于获取消息

在a页面设置flash,然后在b页面获取,并且只能获取一次。一般结合布局模板来使用闪现系统。此外还支持分类,更多内容详见这里

14 请求扩展

相当于django的中间件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from flask import Flask, Request

app = Flask(__name__, template_folder='templates')
app.debug = True

# 仅第一次请求来的时候会执行,后面再来的请求不会执行
@app.before_first_request
def before_first_request1():
print('before_first_request1')


@app.before_first_request
def before_first_request2():
print('before_first_request2')

# 类似于django的process_request,请求来之前做一些操作(比如登录认证)
@app.before_request
def before_request1():
Request.nnn = 123
print('before_request1')


@app.before_request
def before_request2():
print('before_request2')

# 请求走之前执行,可以在这里做一些操作(比如设置session等)
@app.after_request
def after_request1(response):
print('before_request1', response)
return response


@app.after_request
def after_request2(response):
print('before_request2', response)
return response

# 只要出现404状态码,就交由该视图处理,可以render一个页面
@app.errorhandler(404)
def page_not_found(error):
return 'This page does not exist', 404

# 全局的模版,{{sb(1,2)}}
@app.template_global()
def sb(a1, a2):
return a1 + a2
# 全局过滤器,{{ 1|db(2,3)}}
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3

@app.route('/')
def hello_world():
return "helloworld"


if __name__ == '__main__':
app.run()

15 蓝图

为了在一个或多个应用中,使应用模块化并且支持常用方案,Flask 引入了蓝图概念,主要是为应用提供目录划分,类似于django的多个app注册。

多个应用注册时,可以不使用蓝图,但是这样会导致每个app都使用自己独立的配置,且只能在 WSGI 层中管理应用。而如果使用蓝图,那么应用会在Flask层中进行管理,共享配置,通过注册按需改变应用对象。蓝图的缺点是一旦应用被创建后,只有销毁整个应用对象才能注销蓝图。

要使用蓝图,首先要注册蓝图

1
2
3
4
5
from flask import Flask
from yourapplication.simple_page import simple_page
# 正常创建app,在app里注册蓝图
app = Flask(__name__)
app.register_blueprint(simple_page)

然后在应用下使用蓝图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from flask import Blueprint, render_template, abort
from jinja2 import TemplateNotFound

# 使用时,用Blueprint代替Flask创建应用
simple_page = Blueprint('simple_page', __name__,
template_folder='templates',static_folder='statics',static_url_path='/static')

@simple_page.route('/', defaults={'page': 'index'})
@simple_page.route('/<page>')
def show(page):
try:
return render_template(f'pages/{page}.html')
except TemplateNotFound:
abort(404)

一般来说把项目划分成几个部分,比如模版、视图、静态文件、中间件等等,使用蓝图注册视图层下的应用,统一管理。

蓝图的应用示例,参考这里

大型项目的目录结构:点击下载

简单项目的目录结构:点击下载

16 信号

Flask 自 0.6 版本开始在内部支持信号。信号功能由优秀的 blinker 库提供支持, 如果没有安装该库就无法使用信号功能,但不影响其他功能。

什么是信号?当flask的运行过程中或者是在flask扩展中发生动作时,会设置一些信号,默认是不会执行的。当你需要使用信号的时候,订阅某个信号,为它绑定你的函数,它就会执行,并且所有的信号处理器是乱序执行的。

flask内置了如下信号

1
2
3
4
5
6
7
8
9
10
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
appcontext_tearing_down = _signals.signal('appcontext-tearing-down') # 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flash在其中添加数据时,自动触发

信号的使用:

首先需要安装blinker

1
pip install blinker

然后只需要定义函数,订阅信号即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask import Flask,signals # 导入信号类

app = Flask(__name__)
app.config['DEBUG'] = True

# 创建函数
def func(*args, **kwargs):
print('request_started信号触发了', args, kwargs) # args[0]是当前的app对象

# 订阅信号request_started,connect里传入定义的函数
signals.request_started.connect(func)

@app.route("/",endpoint="root")
def hello_world():
print("视图函数执行")
return "hello!"

if __name__ == '__main__':
app.run()

支持自定义信号(需要自己触发信号,其余操作和内置信号使用相同):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from flask import Flask
from flask.signals import _signals

app = Flask(__name__)
app.config['DEBUG'] = True

# 自定义信号
sig = _signals.signal('xxxx')

def func( *args, **kwargs):
print(args)
print(kwargs)

# 自定义信号中注册函数
sig.connect(func)

@app.route("/",endpoint="root")
def hello_world():
# 在想要触发的位置send触发信号
sig.send("abc",k="v")
print("视图函数执行")
return "hello!"

if __name__ == '__main__':
app.run()

三、Flask第三方插件

1 flask-session

flask有内置的session,但是我们如果想要保存在数据库或者文件中,需要使用插件。安装:

1
pip install flask-session

使用方法很简单,在配置中替换一下即可

1
2
3
4
5
6
7
8
9
10
11
12
13
from flask import Flask,session
from flask_session import RedisSessionInterface
import redis
conn = redis.Redis("127.0.0.1",6379)
app = Flask(__name__)
app.session_interface=RedisSessionInterface(conn,'lqz',permanent=False)
@app.route("/",endpoint="root")
def hello_world():
session["k1"]="v1"
return "hello!"

if __name__ == '__main__':
app.run()

过期时间配置PERMANENT_SESSION_LIFETIME属性就可以,默认是31天。如果想要关闭浏览器使得session失效,配置一下permanent参数:

1
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False) # 设置为False

2 DBUtils

源码:GitHub

参考文章:https://www.cnblogs.com/liuqingzheng/articles/9006055.html

用于创建数据库连接池,而不是直接用pymysql。安装

1
pip install DBUtils

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import pymysql

from dbutils.pooled_db import PooledDB
import time
from threading import Thread
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='flask',
charset='utf8'
)


def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()

# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\r\n')

cursor = conn.cursor()
cursor.execute('select * from boy')
result = cursor.fetchall()
time.sleep(2)
print(result)
conn.close()

if __name__ == '__main__':
for i in range(10):
t=Thread(target=func)
t.start()

用类封装的dbutils:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import pymysql
from settings import Config
class SQLHelper(object):

@staticmethod
def open(cursor):
POOL = Config.PYMYSQL_POOL
conn = POOL.connection()
cursor = conn.cursor(cursor=cursor)
return conn,cursor

@staticmethod
def close(conn,cursor):
conn.commit()
cursor.close()
conn.close()

@classmethod
def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
conn,cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchone()
cls.close(conn,cursor)
return obj

@classmethod
def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
conn, cursor = cls.open(cursor)
cursor.execute(sql, args)
obj = cursor.fetchall()
cls.close(conn, cursor)
return obj
@classmethod
def execute(cls,sql, args,cursor =pymysql.cursors.DictCursor):
conn, cursor = cls.open(cursor)
cursor.execute(sql, args)
cls.close(conn, cursor)

3 wtforms

类似于django的forms组件。

官方文档

源码:GiuHub

安装:

1
pip install wtforms

使用方法:

下载代码wtforms使用

注意,如果需要使用EmailField的校验功能,需要安装另一个第三方模块pip install email_validator

4 SQLAlchemy

源码:github

安装:

1
pip install sqlalchemy

注意:sqlalchemy本身不能对数据库直接操作,底层需要借助于pymysql等库支持。

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from sqlalchemy import create_engine

# 1.生成一个engine对象
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8", # 连接远程数据库
# //用户名:密码@地址:端口/数据库名?参数
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 2.创建连接
conn = engine.raw_connection()
# 3.获取游标对象
cursor = conn.cursor()

# 4.具体操作
cursor.execute('select * from user_user')
data = cursor.fetchall()
print(data)

除了生成engine对象之外,其他操作和pymysql相同。二者的区别是,sqlalchemy自动创建了连接池,不需要我们来维护。

orm创建表

接下来,要在flask中使用。比如和django一样,我们在flask项目下也创建models.py(文件名随意)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 导入字段、字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# orm的表操作其实就是
# - 创建字段
# - 创建表
# - 修改表:sqlalchemy本身不支持
# - 删除表

# 创建一个类,作为所有模型类的基类
Model = declarative_base() # 名字可以随便取

class User(Model):
# 你的orm类继承这个基类
__tablename__ = 'users' # 数据库表名称,如果不写,默认以类名小写作为表名
# 不同于django,所有的字段都使用Column创建,具体的数据类型在Column内写
# 比如Column(Integer),Column(String(32))
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # index索引,nullable不可为空
email = Column(String(32), unique=True) # unique 唯一
# datetime.datetime.now不能加括号,加了括号就执行,以后时间就固定了
ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
extra = Column(Text, nullable=True) # Text长文本字段

#类似于djagno的 Meta
__table_args__ = (
UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一,起名为uix_id_name
Index('ix_id_name', 'name', 'email'), # 索引
)


# 创建表:定义一个函数
def create_table():
# 创建engine对象
engine = create_engine(
"mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 通过engine对象创建表,只要是继承了Model的类都会创建表
Model.metadata.create_all(engine)


# 删除表,创建一个函数
def drop_table():
# 创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 通过engine对象删除所有表,只要继承了Model的类都会删除
Model.metadata.drop_all(engine)

if __name__ == '__main__':
create_table() # 执行函数创建表
drop_table() # 执行函数删除表

以上是orm表的基本操作。

orm表关系创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 一对多关系

class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')


class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
# 一对多关系
hobby_id = Column(Integer, ForeignKey("hobby.id")) # 默认可以为空

# relationship跟数据库无关,不会新增字段,只用于快速链表操作
# 第一个参数 Hobby 是类名,backref用于反向查询
hobby = relationship('Hobby', backref='pers')


# 多对多关系
# 这个关联表需要自己建立
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True) # autoincrement自增,默认是True
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)

class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)

# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
# secondary:通过哪个表建关联,跟django中的through一模一样
girls = relationship('Girl', secondary='boy2girl', backref='boys')

数据的简单插入

接下来是数据的curd部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User


# 1 创建engine
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
# 2 制造 session 会话
Session = sessionmaker(bind=engine) # 得到一个类
# 3 得到一个session对象
session=Session() # 得到一个session对象
# 4 创建一个对象
obj1 = User(name="xxx")
# 5 把session对象通过add放入
session.add(obj1)
# 6 提交
session.commit()

这个创建过程存在线程安全的问题,scoped_session提供了另一种方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User
from sqlalchemy.orm import scoped_session

# 1 创建engine
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
# 2 制造 session 会话
Session = sessionmaker(bind=engine) # 得到一个类

# 3 使用scoped_session(线程安全的session,内部使用的是local)
session = scoped_session(Session)

# 4 创建一个对象
obj1 = User(name="xxx")
# 5 把对象通过add放入
session.add(obj1)
# session.aaa()
# 6 提交
session.commit()
session.close()

数据的curd

数据插入的更多用法(curd、分组查询、排序、筛选、连表查询等等):下载源码查看。

5 Flask-SQLAlchemy

github

配合flask-migrate插件做数据库迁移,搭配flask-script插件制作成命令行命令

1
2
3
pip install Flask-SQLAlchemy
pip install Flask-Migrate
pip install flask-script

使用方法简单概述(配合源码:下载示例源码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 1 from flask_sqlalchemy import SQLAlchemy
# 2 db = SQLAlchemy()
# 3 db.init_app(app)
# 4 初始化之后,使用db.session
# 就相当于原来SQLAlchemy的session,用它来操作数据库


# flask-migrate的使用(表创建,字段修改)
# 1 from flask_migrate import Migrate,MigrateCommand
# 2 Migrate(app,db)
# 3 manager.add_command('db', MigrateCommand)

# 然后就可以在命令行输入命令
# python3 manage.py db init 初始化:只执行一次,创建migrations文件夹
# python3 manage.py db migrate 等同于 makemigartions
# python3 manage.py db upgrade 等同于 migrate

四、源码分析

准备工作

另外,在粘贴源码时,删除了无关部分,只挑出与执行流程相关的主干部分。

启动流程分析

从helloworld读起,这是flask官方文档上提供的代码:

1
2
3
4
5
6
7
8
9
from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello_world():
return "<p>Hello, World!</p>"
if __name__ == '__main__':
app.run()

执行这段代码,执行app.run(),项目就启动了,也就是说,启动的代码一定在run方法里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def run(self, host=None, port=None, debug=None,
load_dotenv=True, **options):
if not host:
if sn_host:
host = sn_host
else:
host = "127.0.0.1"

if port or port == 0:
port = int(port)
elif sn_port:
port = int(sn_port)
else:
port = 5000

from werkzeug.serving import run_simple

try:
run_simple(host, port, self, **options) # 这里的self就是app对象
finally:
self._got_first_request = False

run主要做了这些事:设置主机和端口号,然后处理一些其他参数(此处未截取),最后调用werkzeug.serving里的run_simple方法。这个方法封装了一些参数,底层使用socket,无限循环等待请求到达。当HTTP请求到来的时候,它将其解析为WSGI格式,然后werkzeug.serving:WSGIRequestHandler调用了app去执行相应的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
try:
execute(self.server.app)
# 这里的self.server.app就是我们的app
def execute(app: "WSGIApplication") -> None:
application_iter = app(environ, start_response)
try:
for data in application_iter:
write(data)
if not headers_sent:
write(b"")
finally:
if hasattr(application_iter, "close"):
application_iter.close() # type: ignore

可以看到这里调用了 application_iter = app(environ, start_response),相当于app(),也就是说会调用Flask.__call__方法。

1
2
3
4
5
6
def __call__(self, environ: dict, start_response: t.Callable) -> t.Any:
"""The WSGI server calls the Flask application object as the
WSGI application. This calls :meth:`wsgi_app`, which can be
wrapped to apply middleware.
"""
return self.wsgi_app(environ, start_response)

调用了wsgi_app,直接点过来看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def wsgi_app(self, environ: dict, start_response: t.Callable) -> t.Any:
# 1.调用了request_context创建request对象
ctx = self.request_context(environ)
error: t.Optional[BaseException] = None
try:
try:
ctx.push() # 2.将ctx压栈
# 3.调用full_dispatch_request方法
response = self.full_dispatch_request()
except Exception as e:
# 调用错误处理
error = e
response = self.handle_exception(e)
except:
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
# 4.最后将ctx出栈
ctx.auto_pop(error)

1 request_context

这里的内容比较多,分开来研究,首先是request_context,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def request_context(self, environ: dict) -> RequestContext:
# 这里的self是app对象;将app对象当做第一个参数,实例化一个RequestContext对象
return RequestContext(self, environ)

class RequestContext:
# RequestContext类,用于包装一个request对象
def __init__(self,app,environ,request=None,session=None):
self.app = app
if request is None:
request = app.request_class(environ) # 这里的request如果为None,默认会创建一个
self.request = request
self.url_adapter = None
try:
self.url_adapter = app.create_url_adapter(self.request)
except HTTPException as e:
self.request.routing_exception = e
self.flashes = None
self.session = session # 注意这里的session最开始为None
self._implicit_app_ctx_stack: t.List[t.Optional["AppContext"]] = []
self.preserved = False
self._preserved_exc = None
self._after_request_functions: t.List[AfterRequestCallable] = []

2 ctx.push

然后是ctx.push(),也就是RequestContext类的push方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def push(self):
# 由于是ctx.push,所以此时的self是我们刚刚RequestContext类的对象

app_ctx = _app_ctx_stack.top
if app_ctx is None or app_ctx.app != self.app:
# 创建app_context对象,里面存放了app对象和g
app_ctx = self.app.app_context()
# app_context入栈
app_ctx.push()
self._implicit_app_ctx_stack.append(app_ctx)
else:
self._implicit_app_ctx_stack.append(None)

# 这里_request_ctx_stack是flask项目的全局变量,它是LocalStack的对象
_request_ctx_stack.push(self)

# 只有第一次push时,session为None
if self.session is None:
# self.app就是我们创建的app
session_interface = self.app.session_interface
# 将app对象、request对象放入session
self.session = session_interface.open_session(self.app, self.request)

if self.session is None:
# 如果session仍然为空,则创建一个空session
self.session = session_interface.make_null_session(self.app)
if self.url_adapter is not None:
self.match_request()

稍微研究一下 _request_ctx_stack.push(self)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class LocalStack:
def __init__(self) -> None:
# _local是Local类的对象
self._local = Local()
def push(self, obj: t.Any) -> t.List[t.Any]:
# 这里的obj就是前面传过来的self,也就是RequestContext类的对象
rv = getattr(self._local, "stack", []).copy() # 反射取值,第一次调用的时候,由于没有"stack",返回一个列表
# rv=[]
rv.append(obj) # 将obj添加进列表中
# rv=[obj]
# _local是这种格式:{当次请求的线程id:{},请求的线程id:{},...}
# 多个请求到达时,创建多个线程local的特点是:
# 每个线程操作数据时,根据线程id从local里操作数据(同时支持gevent),所以保证了每个请求之间相互独立地操作自己的数据
self._local.stack = rv # 把rv放进stack
# _local:{当次请求的线程id:{"stack":[obj]},请求的线程id:{"stack":[obj]},...}
return rv

3 full_dispatch_request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def full_dispatch_request(self) -> Response:
# 检查app是不是已经处理了一个请求,如果是第一次处理请求,并且定义了@app.before_first_request装饰器,那么就会先执行一遍那些请求扩展函数
self.try_trigger_before_first_request_functions()
try:
request_started.send(self) # 发送信号
# 同理,执行@app.before_request注册的函数
rv = self.preprocess_request()
if rv is None:
# 如果在before_request函数中返回None,则进行路由分发;如果不是None,则直接跳到最后
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
# 最后执行@app.after_request,并且处理session
return self.finalize_request(rv)

除了请求扩展的这些函数之外,最关键的就是self.dispatch_request,关于路由分发和异常处理,暂时先不深入,先把整个请求流程走完。

4 ctx.auto_pop

不管前面的处理是否发生异常,最终执行ctx.auto_pop(error),把这次请求的ctx对象出栈

1
2
3
4
5
6
7
8
9
def auto_pop(self, exc: t.Optional[BaseException]) -> None:
if self.request.environ.get("flask._preserve_context") or (
exc is not None and self.app.preserve_context_on_exception
): # 异常相关,大概意思是如果发生了错误,并且在配置文件中配置了保存错误信息,那么就把错误保存
self.preserved = True
self._preserved_exc = exc # type: ignore
else:
# 最终,出栈,如果没发生错误,这里的exc=None
self.pop(exc)

调用ctx.pop出栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None:  # type: ignore
# 从另一个全局变量栈中弹出app_ctx
app_ctx = self._implicit_app_ctx_stack.pop()
clear_request = False

try:
# 如果此时_implicit_app_ctx_stack栈中已经没有元素了,就执行下面代码
if not self._implicit_app_ctx_stack:
self.preserved = False
self._preserved_exc = None
if exc is _sentinel:
exc = sys.exc_info()[1]
self.app.do_teardown_request(exc)

request_close = getattr(self.request, "close", None)
if request_close is not None:
request_close()
clear_request = True
finally:
# 和入栈同理,最终将ctx出栈
rv = _request_ctx_stack.pop()

整个请求流程结束。

路由分发

上一节梳理了大致流程,这里详细看一看full_dispatch_request里面的self.dispatch_request,也就是路由分发的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def dispatch_request(self) -> ResponseReturnValue:
# 从_request_ctx_stack栈中获取到当次请求的request
req = _request_ctx_stack.top.request
if req.routing_exception is not None:
# 判断是否异常,如果异常就抛出错误
self.raise_routing_exception(req)
# 如果request对象没有异常,就获取url_rule路径,比如访问127.0.0.1:5000,那么url_rule就匹配为/
rule = req.url_rule

if (
getattr(rule, "provide_automatic_options", False)
and req.method == "OPTIONS"
):# 如果我们提供了provide_automatic_options,并且请求的方法是OPTIONS那么就会执行这里
return self.make_default_options_response()

# 从self.view_functions字典中寻找endpoint对应的视图函数,把参数传过去,然后调用它
return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)

这里又出现了_request_ctx_stack,前面提到过,它就是一个全局变量,每次请求到达的时候就会把当次的请求保存到栈里,使得我们可以在整个请求处理过程中使用它。上一节也简单介绍了这个栈的特点,这里暂时先不深入研究,我们在后面单独讲。总之,dispatch_request就做了一件事,那就是获取当前请求的request对象,然后把它交给某个合适的函数(我们写的视图函数)处理。那么,这里的路由url_rule是什么时候匹配的?

在上一节request_context里,创建ctx,也就是ctx = self.request_context(environ)这行,最终会调用RequestContext.__init__初始化方法。在这个初始化方法里,会执行这一句app.create_url_adapter(self.request),我们来看看这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def create_url_adapter(self, request: t.Optional[Request]) -> t.Optional[MapAdapter]:
# 由于调用的时候传入了当前的request,所以不为空
if request is not None:
# 把app的url_map绑定到WSGI environ变量上
return self.url_map.bind_to_environ(
request.environ,
server_name=self.config["SERVER_NAME"],
subdomain=subdomain,
)

# 如果request为空,但是SERVER_NAME不为空那么也会绑定
if self.config["SERVER_NAME"] is not None:
return self.url_map.bind(
self.config["SERVER_NAME"],
script_name=self.config["APPLICATION_ROOT"],
url_scheme=self.config["PREFERRED_URL_SCHEME"],
)
# 如果连名字也没有,返回None
return None

无论是bind_to_environ还是bind,它们的效果相同,最终都会返回一个werkzeug.routing.MapAdapter对象,这个对象主要用于url的匹配,这涉及到werkzeug的源码,这里就不深入研究了,想了解的话建议查阅文档。总之记住,此时ctx.url_adapter不为空,这个很重要。

还是在上一节ctx.push里,我们阅读了RequestContext类的push源码,其实留下了一些代码没有讲,我把它留到这里是因为,除了将ctx压栈之外,同时还进行了路由匹配,这就是最后两句所做的:

1
2
3
4
def push(self): # self是当前请求的ctx对象,实际也就是RequestContext类的对象
....
if self.url_adapter is not None:
self.match_request()

还记得吗,此时ctx.url_adapter不为空,所以会调用self.match_request()

1
2
3
4
5
6
7
8
def match_request(self) -> None:
try:
# 进行路由匹配
result = self.url_adapter.match(return_rule=True)
# 获得匹配结果,将路由url_rule和参数view_args保存到request对象里
self.request.url_rule, self.request.view_args = result
except HTTPException as e:
self.request.routing_exception = e

这里调用的是ctx.url_adaptermatch方法,底层是由werkzeug实现的。匹配到的路径保存在了ctx.request对象里,交给dispatch_request,最后匹配到我们写的视图函数。

上下文之Local

在前面我们多次提到flask的全局变量,比如ctx.push介绍的源码里_request_ctx_stack或者是_local。它们是什么?这就是flask中的上下文机制。从一个简单的例子开始:

1
2
3
4
5
6
7
8
9
from flask import Flask,request

app = Flask(__name__)

@app.route("/",endpoint="root")
def hello_world():
if request.method =="GET":
print(request)
return "hello!"

是否注意到,当你在视图函数里使用request对象,无论是request.method,还是print(request),每当你调用的时候,request都是当前请求的request,换句话说,假设此时另一个人也发送了同样的请求,flask是如何保证你获取到的request来自于你?答案就是全局变量。但是会有一个问题,我们先来看下面的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
from threading import Thread
import time
s = 0
def add(num):
global s
s=num
time.sleep(1)
print(s)

if __name__ == '__main__':
for i in range(5):
trd = Thread(target=add,args=[i])
trd.start()

这里创建了5个线程,第一个线程修改loacl为1,第二个修改local为2,以此类推,在阻塞了1秒之后,最终输出的结果全都是4。这就是数据的不安全问题,这也是进程(和线程)中的经典问题。要解决这个问题有很多种办法,比如皮特森算法、PV原语、加锁等等,这已经是操作系统的范畴,就不展开了。为数据加锁是可行的,但是加锁适合多个线程共用一个数据的情况,而request对象每个请求都应该不一样,也就是说想要实现该线程对于请求对象的修改并不影响其他线程,就是threading.local

1
2
3
4
5
6
7
8
9
10
11
12
from threading import Thread,local
import time
s = local()
def add(num):
s.val=num
time.sleep(1)
print(s.val)

if __name__ == '__main__':
for i in range(5):
trd = Thread(target=add,args=[i])
trd.start()

local的原理是,为每个线程复制一份数据。其实我们也可以使用字典来实现这一功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from threading import get_ident,Thread,current_thread

Local = {}
def set(k,v):
curr=get_ident()
if curr in Local: # 如果字典里有当前线程id就直接存
Local[curr][k]=v
else: # 如果没有线程id就创建一个
Local[curr]={k:v}
def get(k):
curr=get_ident()
return Local[curr][k]

def add(num):
set("val",num)

if __name__ == '__main__':
for i in range(5):
trd = Thread(target=add,args=[i])
trd.start()
print(Local)

最终效果就是这个样子:Local={13324: {'val': 0}, 4164: {'val': 1}, 12764: {'val': 2}, 3120: {'val': 3}, 2004: {'val': 4}},每个线程都各自用自己的id区分开,取值时互不干扰。于是进一步,把它封装在类中以供调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from threading import get_ident,Thread,current_thread
import time

class Local(object):
storage = {}
get_ident = get_ident

def __setattr__(self, k, v):
ident =self.get_ident()
origin = self.storage.get(ident)
if not origin:
origin={}
origin[k] = v
self.storage[ident] = origin
def __getattr__(self, k):
ident = self.get_ident()
v= self.storage[ident].get(k)
return v

locals_values = Local()
def func(num):

locals_values.KEY=num
time.sleep(2)
print(locals_values.KEY,current_thread().name)

for i in range(10):
t = Thread(target=func,args=(i,),name='线程%s'%i)
t.start()

这样实现了需求,但是有一个小问题,那就是如果创建多个Local对象,它们共用同一个字典,我们想做的是把这个字典放在对象中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 这样可以支持协程
try:
from greenlet import getcurrent as get_ident
except Exception as e:
from threading import get_ident
# 每个对象都有自己的storage
from threading import get_ident,Thread
import time
class Local(object):
def __init__(self):
object.__setattr__(self,'storage',{}) # 用父类设置对象的属性,避免循环调用
# self.storage={} # 注意不要写成这样,因为self.storage={}会调用`__setattr__(self,"storage",{})`
def __setattr__(self, k, v):
ident = get_ident()
if ident in self.storage: # 当执行到这句的时候,self.storage会调用`__getattr__(self,"storage")`
self.storage[ident][k] = v
else:
self.storage[ident] = {k: v}
def __getattr__(self, k):
ident = get_ident()
return self.storage[ident][k] # 当执行到这句的时候,会调用`__getattr__(self,"storage")`,会一直在这里无限循环
obj = Local()
def task(arg):
obj.val = arg
time.sleep(1)
print(obj.val)
for i in range(10):
t = Thread(target=task,args=(i,))
t.start()

werkzeug就使用了上述的代码的思想,并且更强大:

  • 会在协程可用的情况下优先使用协程
  • 自定义了__release_local__释放资源
  • 自定义LocalStack:栈,可以像栈一样操作Local,包括入栈、出栈、获取栈顶元素
  • 自定义LocalProxy:即Local代理,把对自己的操作转发给内部的__local对象

flask基于werkzeug实现,所以它理所当然地继承了这些特性。

上下文之Context

flask 中有两种上下文:application contextrequest context,前者用于存储app相关的信息,后者用于存储请求相关的信息。有关它们的定义在globals.py文件中,这个文件代码不多,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def _lookup_req_object(name):
top = _request_ctx_stack.top # top就是ctx
if top is None:
raise RuntimeError(_request_ctx_err_msg)
return getattr(top, name)


def _lookup_app_object(name):
top = _app_ctx_stack.top
if top is None:
raise RuntimeError(_app_ctx_err_msg)
return getattr(top, name)


def _find_app():
top = _app_ctx_stack.top
if top is None:
raise RuntimeError(_app_ctx_err_msg)
return top.app


# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app: "Flask" = LocalProxy(_find_app) # type: ignore
request: "Request" = LocalProxy(partial(_lookup_req_object, "request")) # type: ignore
session: "SessionMixin" = LocalProxy( # type: ignore
partial(_lookup_req_object, "session")
)
g: "_AppCtxGlobals" = LocalProxy(partial(_lookup_app_object, "g")) # type: ignore

application context 包括current_app grequest context包括requestsession。这里也会发现两个单例的LocalStack对象,它们提供了数据隔离的栈访问。来看看它是怎么写的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class LocalStack:

def __init__(self) -> None:
self._local = Local() # 这里的__local就是Local对象

def __release_local__(self) -> None:
self._local.__release_local__() # 用于清空当前线程或者协程的栈数据

@property
def __ident_func__(self) -> t.Callable[[], int]:
return self._local.__ident_func__

@__ident_func__.setter
def __ident_func__(self, value: t.Callable[[], int]) -> None:
object.__setattr__(self._local, "__ident_func__", value)

def __call__(self) -> "LocalProxy":
def _lookup() -> t.Any:
rv = self.top
if rv is None:
raise RuntimeError("object unbound")
return rv

return LocalProxy(_lookup)

def push(self, obj: t.Any) -> t.List[t.Any]:
"""Pushes a new item to the stack"""
rv = getattr(self._local, "stack", []).copy()
rv.append(obj)
self._local.stack = rv
return rv # type: ignore

def pop(self) -> t.Any:
"""Removes the topmost item from the stack, will return the
old value or `None` if the stack was already empty.
"""
stack = getattr(self._local, "stack", None)
if stack is None:
return None
elif len(stack) == 1:
release_local(self._local)
return stack[-1]
else:
return stack.pop()

@property
def top(self) -> t.Any:
"""The topmost item on the stack. If the stack is empty,
`None` is returned.
"""
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None

它主要有pushpoptop 方法,在上一节ctx.push简单介绍了push方法,结合请求流程的分析,也验证了request context存储了requestsession。这里的__call__方法返回当前线程或协程栈顶元素的代理对象,暂时不理解?继续往下看。

我们说LocalProxyLocal对象的代理,源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class LocalProxy:
__slots__ = ("__local", "__name", "__wrapped__")

def __init__(
self,
local: t.Union["Local", t.Callable[[], t.Any]],
name: t.Optional[str] = None,
) -> None:
object.__setattr__(self, "_LocalProxy__local", local) # 设置self.__local
object.__setattr__(self, "_LocalProxy__name", name) # 设置self.__name

if callable(local) and not hasattr(local, "__release_local__"):
# "local" is a callable that is not an instance of Local or
# LocalManager: mark it as a wrapped function.
object.__setattr__(self, "__wrapped__", local)

def _get_current_object(self) -> t.Any:
if not hasattr(self.__local, "__release_local__"): # type: ignore
return self.__local() # type: ignore

try:
return getattr(self.__local, self.__name) # type: ignore
except AttributeError:
name = self.__name # type: ignore
raise RuntimeError(f"no object bound to {name}") from None

__doc__ = _ProxyLookup( # type: ignore
class_value=__doc__, fallback=lambda self: type(self).__doc__
)
# __del__ should only delete the proxy
__repr__ = _ProxyLookup( # type: ignore
repr, fallback=lambda self: f"<{type(self).__name__} unbound>"
)
__str__ = _ProxyLookup(str) # type: ignore
__bytes__ = _ProxyLookup(bytes)

self.__local是在init方法中存储的,由于是双下划线的私有方法,所以才使用__LocalProxy__local这种格式存值。并且它重写了所有的魔法方法(这里只截取一小部分),_get_current_object就是获取当前请求的对象。

回到这里request=LocalProxy(partial(_lookup_req_object, "request")),这里的partial是偏函数,可以在调用之前为函数提前赋值(不执行),举个例子:

1
2
3
4
5
6
from functools import partial
def add(a,b,c):
print(a+b+c)

func = partial(add,1,2) # 为add函数提前传两个值,返回一个func
func(3) # 再调用只需要传剩下的值

关于partial的原理,以后有机会新开一篇讲讲。理解了偏函数,这里就相当于:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _lookup_req_object(name):
top = _request_ctx_stack.top
if top is None:
raise RuntimeError(_request_ctx_err_msg)
return getattr(top, name)


request= LocalProxy(partial(_lookup_req_object, "request"))
# func = _lookup_req_object("request")
# 相当于request= LocalProxy(func),调用init方法:
class LocalProxy:
def __init__(self,local,name=None):
# local:func
# name:None

因此_get_current_object返回的self.__local(),实际上就是func(),最终从ctx里找到并返回request对象。当我们在视图函数里print(request)时,就会调用LocalProxy里的__str__,同理当你request.meathod的时候,就会调用对应的__getattr__方法。这就是一个典型的代理模式使用。

理解了request,那么session也是同理,让我们结合所有知识重新梳理一下请求流程:

  • 当请求到达的时候,首先创建request context,里面保存了当前线程的requestsession
  • 然后ctx.push进栈操作:将request context保存到_request_ctx_stack;将app_context保存到_app_ctx_stack
  • 进行路由分发,交给视图处理
  • 视图中如果需要使用当前请求对象(比如print(request)),就会触发代理LocalProxy对应的魔法方法(比如__str__
  • 请求结束,将ctx出栈并进行清理工作

补充:application context 包括current_app g,前者是app相关的参数,而g是什么?其实它就是一个供我们使用的全局变量,在整个请求的生命周期内,都可以使用g来赋值和取值。你可能会问,已经有这么多全局变量,我难道不可以在current_app或者request中放吗,何必要用g

答案是确实可以,这些全局变量都存储了一些参数,比如request里存储了method字段,当我们赋值的时候很可能会覆盖掉原有字段。为了防止可能出现的错误,使用专门为我们打造的g就是不错的选择。但是使用时要注意g只在请求周期中存在,当一次请求结束,g也就销毁了,这和session不同,session在过期时间内,不同的请求使用的是同一个session

session

ctx.push这一节里提到了session的操作过程,如下

1
2
3
4
5
6
7
8
9
10
# 只有第一次push时,session为None
if self.session is None:
# self.app就是我们创建的app
session_interface = self.app.session_interface
# 将app对象、request对象放入session
self.session = session_interface.open_session(self.app, self.request)

if self.session is None:
# 如果session仍然为空,则创建一个空session
self.session = session_interface.make_null_session(self.app)

我们来深入研究一下,首先是session_interface,它默认(可以自定义,比如使用flask-session第三方插件)是一个位于sessions.py中的SecureCookieSessionInterface类。后面调用的open_sessionmake_null_session都是它的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class SecureCookieSessionInterface(SessionInterface):
# 加密盐,与secret_key配合使用
salt = "cookie-session"
# 加密算法默认是sha1
digest_method = staticmethod(hashlib.sha1)
# 签名算法,默认是hmac
key_derivation = "hmac"
# 序列化器,支持一些python的数据结构
serializer = session_json_serializer
session_class = SecureCookieSession

def get_signing_serializer(
self, app: "Flask"
) -> t.Optional[URLSafeTimedSerializer]:
# 如果我们没有定义secret_key那么就返回None
if not app.secret_key:
return None
signer_kwargs = dict(
key_derivation=self.key_derivation, digest_method=self.digest_method
)
# 传入参数进行序列化,生成URL安全的字符串
return URLSafeTimedSerializer(
app.secret_key,
salt=self.salt,
serializer=self.serializer,
signer_kwargs=signer_kwargs,
)

def open_session(
self, app: "Flask", request: "Request"
) -> t.Optional[SecureCookieSession]:
# 调用get_signing_serializer,获取
s = self.get_signing_serializer(app)
if s is None:
# 没有定义secret_key那么就返回None
return None
# 通过请求的cookie获取session对象
val = request.cookies.get(self.get_cookie_name(app))
if not val:
# 调用session_class类返回一个session对象
return self.session_class()
# 过期时间,默认31天,这些参数都是从app对象中获取的
max_age = int(app.permanent_session_lifetime.total_seconds())
try:
# 验证session数据是否被篡改
data = s.loads(val, max_age=max_age)
return self.session_class(data)
except BadSignature:
return self.session_class()

URLSafeTimedSerializer是借助于itsdangerous实现的,它可以进行数据验证,生成URL安全的字符串。

默认的session_classSecureCookieSession,它本质是一个字典,在字典的基础上封装了一些参数,重写了魔法方法,内部的实现不是很复杂,感兴趣可以看看。

接下来我们看看session是怎么保存的,在请求来的时候flask会获取session,并保存在上下文中让视图函数可以获取并修改它,在响应返回时,也会自动把session写回到cookie中。

full_dispatch_request小节中,请求结束会调用 self.finalize_request(),其中就会调用process_response方法返回响应,其中的这部分就是session处理:

1
2
3
4
5
6
def process_response(self, response: Response) -> Response:
# 如果需要返回session就调用save_sessoin
if not self.session_interface.is_null_session(ctx.session):
self.session_interface.save_session(self, ctx.session, response)

return response

调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def save_session(
self, app: "Flask", session: SessionMixin, response: "Response"
) -> None:
name = self.get_cookie_name(app)
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
secure = self.get_cookie_secure(app)
samesite = self.get_cookie_samesite(app)

# 如果session被设置为空,那么直接不设置cookie
if not session:
if session.modified:
response.delete_cookie(
name, domain=domain, path=path, secure=secure, samesite=samesite
)

return
# 是否需要设置cookie
if not self.should_set_cookie(app, session):
return
# 配置参数,设置cookie
httponly = self.get_cookie_httponly(app)
expires = self.get_expiration_time(app, session)
val = self.get_signing_serializer(app).dumps(dict(session)) # type: ignore
response.set_cookie(
name,
val, # type: ignore
expires=expires,
httponly=httponly,
domain=domain,
path=path,
secure=secure,
samesite=samesite,
)

信号

在基本使用中,简单介绍了信号的使用。现在我们来看看信号的实现。在上文信号中,介绍了flask内置的10种信号,并提到这些信号预先被放置在了某些位置,但是不会执行,直到我们订阅才执行。那就从源码里分别来找找这十个信号的位置:

1 template_rendered和before_render_template

模板渲染前、后触发。位于templates.py中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def render_template(template_name_or_list, **context):
# 获取上下文
ctx = _app_ctx_stack.top
# 将模板更新到上下文
ctx.app.update_template_context(context)
# 调用_render
return _render(
ctx.app.jinja_env.get_or_select_template(template_name_or_list),
context,
ctx.app,
)

def _render(template: Template, context: dict, app: "Flask") -> str:
"""Renders the template and fires the signal"""
#--------------------渲染模板之前信号触发--------------------------#
before_render_template.send(app, template=template, context=context)
# 渲染模板
rv = template.render(context)
#--------------------渲染模板之后信号触发--------------------------#
template_rendered.send(app, template=template, context=context)
return rv

2 appcontext_pushed和appcontext_popped

上下文入栈、出栈时触发,位于ctx.py中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class AppContext:
def push(self) -> None:
"""Binds the app context to the current context."""
self._refcnt += 1
_app_ctx_stack.push(self)
#--------------------入栈信号触发--------------------------#
appcontext_pushed.send(self.app)

def pop(self, exc: t.Optional[BaseException] = _sentinel) -> None: # type: ignore
"""Pops the app context."""
try:
self._refcnt -= 1
if self._refcnt <= 0:
if exc is _sentinel:
exc = sys.exc_info()[1]
self.app.do_teardown_appcontext(exc)
finally:
rv = _app_ctx_stack.pop()
assert rv is self, f"Popped wrong app context. ({rv!r} instead of {self!r})"
#--------------------出栈信号触发--------------------------#
appcontext_popped.send(self.app)

3 request_started和request_finished

位于app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def full_dispatch_request(self) -> Response:
self.try_trigger_before_first_request_functions()
try:
#--------------------请求到达前信号触发--------------------------#
request_started.send(self)
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
return self.finalize_request(rv)

def finalize_request(
self,
rv: t.Union[ResponseReturnValue, HTTPException],
from_error_handler: bool = False,
) -> Response:

response = self.make_response(rv)
try:
response = self.process_response(response)
#--------------------请求结束后信号触发--------------------------#
request_finished.send(self, response=response)
except Exception:
if not from_error_handler:
raise
self.logger.exception(
"Request finalizing failed with an error while handling an error"
)
return response

4 request_tearing_down

位于app.py

1
2
3
4
5
6
7
8
9
10
11
12
def do_teardown_request(
self, exc: t.Optional[BaseException] = _sentinel # type: ignore
) -> None:
if exc is _sentinel:
exc = sys.exc_info()[1]

for name in chain(request.blueprints, (None,)):
if name in self.teardown_request_funcs:
for func in reversed(self.teardown_request_funcs[name]):
self.ensure_sync(func)(exc)
#--------------------请求执行完毕信号触发--------------------------#
request_tearing_down.send(self, exc=exc)

5 appcontext_tearing_down

位于app.py

1
2
3
4
5
6
7
8
9
10
def do_teardown_appcontext(
self, exc: t.Optional[BaseException] = _sentinel # type: ignore
) -> None:
if exc is _sentinel:
exc = sys.exc_info()[1]

for func in reversed(self.teardown_appcontext_funcs):
self.ensure_sync(func)(exc)
#--------------------上下文完毕信号触发--------------------------#
appcontext_tearing_down.send(self, exc=exc)

6 got_request_exception

位于app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
def handle_exception(self, e: Exception) -> Response:

exc_info = sys.exc_info()
#--------------------出现异常时信号触发--------------------------#
got_request_exception.send(self, exception=e)

if self.propagate_exceptions:
# Re-raise if called with an active exception, otherwise
# raise the passed in exception.
if exc_info[1] is e:
raise

raise e

7 message_flashed

位于helpers.py

1
2
3
4
5
6
7
8
9
10
11
def flash(message: str, category: str = "message") -> None:

flashes = session.get("_flashes", [])
flashes.append((category, message))
session["_flashes"] = flashes
#--------------------调用flash添加数据自动触发--------------------------#
message_flashed.send(
current_app._get_current_object(), # type: ignore
message=message,
category=category,
)

找到源码里信号触发的位置,再加上启动流程的分析,我们就可以驾轻就熟地使用这些信号了。

接下来,我们研究一下信号机制的源码,其定义是在signals.py中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import typing as t

try:
from blinker import Namespace
# Namespace如果导入成功说明我们安装了blinker,于是signals_available设置为True表示信号机制启动
signals_available = True
except ImportError:
# 如果没有安装blinker
signals_available = False
# flask自己定义了一个Namespace
class Namespace: # type: ignore
def signal(self, name: str, doc: t.Optional[str] = None) -> "_FakeSignal":
return _FakeSignal(name, doc)
# 创建一个虚假的信号系统,它允许触发信号,但是什么都不做
class _FakeSignal:
def __init__(self, name: str, doc: t.Optional[str] = None) -> None:
self.name = name
self.__doc__ = doc

def send(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
# send方法直接pass掉
pass

def _fail(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
raise RuntimeError(
"Signalling support is unavailable because the blinker"
" library is not installed."
) from None

connect = connect_via = connected_to = temporarily_connected_to = _fail
disconnect = _fail
has_receivers_for = receivers_for = _fail
del _fail


# 信号的命名空间
_signals = Namespace()


# 定义的十个信号
template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_pushed = _signals.signal("appcontext-pushed")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")

flask调用了blinker提供信号,接下来的内容不是flask的源码。

我们以其中一个信号创建为例,读一读内部是怎么执行的。首先调用signal方法:

1
2
3
4
5
6
7
8
9
10
class Namespace(dict):
"""A mapping of signal names to signals."""
# 信号名称与信号的映射,是一个字典
def signal(self, name, doc=None):
try:
# 第一次创建时,字典里没有字符串对应的值
return self[name]
except KeyError:
# setdefault:如果键不存在于字典中,将会添加键并将值设为默认值
return self.setdefault(name, NamedSignal(name, doc))

第一次调用,被异常捕获,调用NamedSignal的初始化方法:

1
2
3
4
5
6
7
8
class NamedSignal(Signal):
"""A named generic notification emitter."""

def __init__(self, name, doc=None):
Signal.__init__(self, doc)

#: The name of this signal.
self.name = name

调用Signal的初始化方法

1
2
3
4
5
6
7
8
class Signal(object):
def __init__(self, doc=None):
if doc:
self.__doc__ = doc
self.receivers = {}
self._by_receiver = defaultdict(set)
self._by_sender = defaultdict(set)
self._weak_senders = {}

接下来的流程是,我们需要订阅信号比如signals.request_started.connect(func),会调用connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def connect(self, receiver, sender=ANY, weak=True):
# 生成receiver_id
receiver_id = hashable_identity(receiver)
if weak:
# 将receiver_id和self传入创建引用
receiver_ref = reference(receiver, self._cleanup_receiver)
receiver_ref.receiver_id = receiver_id
else:
receiver_ref = receiver
if sender is ANY:
sender_id = ANY_ID
else:
sender_id = hashable_identity(sender)
# 在字典中存储这个引用
self.receivers.setdefault(receiver_id, receiver_ref)
self._by_sender[sender_id].add(receiver_id)
self._by_receiver[receiver_id].add(sender_id)
del receiver_ref
return receiver

当我们订阅某个信号时,信号内的receivers字典保存了我们绑定的函数。当flask内部调用该信号的send方法时,比如appcontext_pushed.send(self.app)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Signal(object):
def send(self, *sender, **kwargs):
if len(sender) == 0:
sender = None
elif len(sender) > 1:
raise TypeError('send() accepts only one positional argument, '
'%s given' % len(sender))
else:
# 进行长度判断,最终获取到self.app
sender = sender[0]
# 如果我们没订阅信号,字典为空,执行到这里结束
if not self.receivers:
return []
# 如果是我们订阅的信号,receivers字典不为空
else:
return [(receiver, receiver(sender, **kwargs)) # 在这里执行我们的函数
for receiver in self.receivers_for(sender)]

调用self.receivers_for(sender)内部实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def receivers_for(self, sender):
# 再次判断self.receivers是否为空
if self.receivers:
# 生成sender_id
sender_id = hashable_identity(sender)
if sender_id in self._by_sender:
ids = (self._by_sender[ANY_ID] |
self._by_sender[sender_id])
else:
ids = self._by_sender[ANY_ID].copy()
# ids里存的就是receiver_id的集合
for receiver_id in ids:
# 通过receiver_id获取到receiver,内部包含了我们的函数
receiver = self.receivers.get(receiver_id)
if receiver is None:
continue
if isinstance(receiver, WeakTypes):
# receiver()执行返回我们自定义的函数
strong = receiver()
if strong is None:
self._disconnect(receiver_id, ANY_ID)
continue
receiver = strong
# 返回生成器
yield receiver

TODO:flask源码还有一些没读,以后会在这里继续更新