Flask
一、介绍和安装
Flask是一个基于WSGI协议的轻量级web框架,它使用起来非常简单且快捷,并且有能力扩展到开发大型项目。它基于 Werkzeug and Jinja 开发,已经成为目前流行的Python web应用程序框架之一。Flask为开发者提供了一些建议,但是并不会强制依赖某些布局,由开发人员自己来选择他们想要使用的工具和库。在社区提供了许多Flask的扩展包,这使添加新功能变得容易。
参考文档:官方文档 ,中文文档
参考文章:https://www.cnblogs.com/wupeiqi/articles/7552008.html
源码:GitHub
安装:
二、基本使用
1 flask初识
先创建一个最简单的Flask应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from flask import Flaskapp = Flask(__name__) @app.route("/" ) def hello_world (): return "<p>Hello, World!</p>" app.run()
这样就启动了一个非常简单的服务器,默认flask会监听本地的5000端口,打开浏览器输入http://127.0.0.1:5000/ 即可看到hello world
。接下来对比django,看看flask中如何使用。
2 三种类型的响应
在django中,有render,HttpResponse,redirect
三种类型的响应,在flask中同样也有
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from flask import Flask,render_template,redirectapp = Flask(__name__) @app.route("/" ) def hello_world (): return "<p>Hello, World!</p>" @app.route("/" ) def hello_world (): return render_template("index.html" ) @app.route("/" ) def hello_world (): return redirect("https://www.baidu.com" )
3 模版语法
flask中同样有模版语法,django的模版语法是自己写的,flask使用的是jinja模块,它更好用一些。在HTML中双大括号中的变量可以被动态替换,在点.
调用的同时,jinja还可以直接调用方法,并且可以括号取值、get取值。使用和django类似,
3.1 举例
demo.py
1 2 3 4 5 6 7 8 9 10 11 from flask import Flask,render_template,request,redirectapp = Flask(__name__) USERS = { 1 :{'name' :'令狐冲' ,'age' :20 ,'gender' :'男' ,'text' :"独孤九剑" }, 2 :{'name' :'小龙女' ,'age' :18 ,'gender' :'女' ,'text' :"玉女心经" }, 3 :{'name' :'段誉' ,'age' :23 ,'gender' :'男' ,'text' :"凌波微步" }, } @app.route('/index' ,methods=['GET' ] ) def index (): return render_template('index.html' ,user_dict=USERS)
index.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <body > <h1 > 列表</h1 > <table > {% for k,v in user_dict.items() %} <tr > <td > {{k}}</td > <td > {{v.name}}</td > <td > {{v['age']}}</td > <td > {{v.get('gender')}}</td > {% if k==2 %} <td > 秘笈!{{ secret }}</td > {% endif %} </tr > {% endfor %} </table > </body >
3.2 总结:
变量
循环
1 2 3 4 5 {% for i in var %} ... {% endfor %}
条件
1 2 3 4 5 {% if ... %} .. {% endif %}
4 请求方法、request对象、反向解析
4.1 举例
demo.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from flask import Flask,render_template,requestapp = Flask(__name__) @app.route('/login' ,methods=['GET' ,'POST' ],endpoint='log' ) def login (): if request.method == "GET" : params = request.query_string print (params) return render_template("login.html" ) else : print (request.form) user = request.form.get('user' ) pwd = request.form.get('pwd' ) if user == 'abc' and pwd == '123' : return "post"
login.html
1 2 3 4 5 6 7 8 <body > <h1 > 用户登录</h1 > <form method ="post" > <input type ="text" name ="user" > <input type ="text" name ="pwd" > <input type ="submit" value ="登录" > </form > </body >
4.2 总结
使用methods
参数,传递一个列表,指定接收什么类型的请求,比如GET,POST
。
endpoint
用作反向解析的关键字(别名)。使用url_for
作后端反向解析,根据endpoint
设置的值获取url。
request
对象导入即可使用,能够自动区别每次的请求对象,通过request.method
判断请求类型,
request.query_string
获取get请求携带参数,request.form.get
获取form表单提交的post请求数据。
5 路由的另一种形式
flask通过app.route
装饰器来实现路由分发,这是它的源码
1 2 3 4 5 6 7 def route (self, rule: str , **options: t.Any ) -> t.Callable : def decorator (f: t.Callable ) -> t.Callable : endpoint = options.pop("endpoint" , None ) self.add_url_rule(rule, endpoint, f, **options) return f return decorator
举例:@app.route('/login',methods=['GET','POST'],endpoint='log')
,首先调用route函数返回装饰器,然后将视图函数作为参数传到decorator
中,然后调用self.add_url_rule
方法,此时的self是Flask类的对象(app对象),即Flask.add_url_rule
方法。所以,可以不使用装饰器,而改为类似django的路由分发形式:
1 2 3 4 5 6 7 8 9 10 from flask import Flask,render_template,redirectapp = Flask(__name__) def hello_world (): return "hello!" app.add_url_rule("/" ,view_func=hello_world,endpoint="root" ) app.run()
访问 http://127.0.0.1:5000/同样生效。这和django中的 url('/',xxxx.view,name="root")
的形式很相似。
6 配置文件与配置方法
flask中的配置文件是一个flask.config.Config
,它继承字典,可以像字典一样操作。默认配置为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 { 'DEBUG' : get_debug_flag(default=False ), 'TESTING' : False , 'PROPAGATE_EXCEPTIONS' : None , 'PRESERVE_CONTEXT_ON_EXCEPTION' : None , 'SECRET_KEY' : None , 'PERMANENT_SESSION_LIFETIME' : timedelta(days=31 ), 'USE_X_SENDFILE' : False , 'LOGGER_NAME' : None , 'LOGGER_HANDLER_POLICY' : 'always' , 'SERVER_NAME' : None , 'APPLICATION_ROOT' : None , 'SESSION_COOKIE_NAME' : 'session' , 'SESSION_COOKIE_DOMAIN' : None , 'SESSION_COOKIE_PATH' : None , 'SESSION_COOKIE_HTTPONLY' : True , 'SESSION_COOKIE_SECURE' : False , 'SESSION_REFRESH_EACH_REQUEST' : True , 'MAX_CONTENT_LENGTH' : None , 'SEND_FILE_MAX_AGE_DEFAULT' : timedelta(hours=12 ), 'TRAP_BAD_REQUEST_ERRORS' : False , 'TRAP_HTTP_EXCEPTIONS' : False , 'EXPLAIN_TEMPLATE_LOADING' : False , 'PREFERRED_URL_SCHEME' : 'http' , 'JSON_AS_ASCII' : True , 'JSON_SORT_KEYS' : True , 'JSONIFY_PRETTYPRINT_REGULAR' : True , 'JSONIFY_MIMETYPE' : 'application/json' , 'TEMPLATES_AUTO_RELOAD' : None , }
6.1 配置方法1
直接写在项目里,如果是小项目,可以使用;对于大型项目,不建议采用这种方式。
1 2 3 app.config['DEBUG' ] = True app.config.update(...)
6.2 配置方法2
通过py文件配置
1 2 app.config.from_pyfile("settings.py" )
settings.py
6.3 配置方法3
通过对象来配置
1 2 3 4 app.config.from_object('settings.TestingConfig' )
settings.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class Config (object ): DEBUG = False TESTING = False DATABASE_URI = 'sqlite://:memory:' class ProductionConfig (Config ): DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig (Config ): DEBUG = True class TestingConfig (Config ): TESTING = True
这种方式的好处在于,事先定义好每个环境的配置文件,然后只需要在app.config.from_object()
配置就能切换。
6.4 其他配置方法
通过文件、json、环境变量配置,详见这里
7 路由层转换器和参数
7.1 转换器
flask有内置的转换器,和django2.x+版本的转换器效果相同
1 2 3 4 5 6 7 8 9 DEFAULT_CONVERTERS = { 'default' : UnicodeConverter, 'string' : UnicodeConverter, 'any' : AnyConverter, 'path' : PathConverter, 'int' : IntegerConverter, 'float' : FloatConverter, 'uuid' : UUIDConverter, }
使用方法:
1 2 @app.route('/post/<int:post_id>' ) @app.route('/post/<post_id>' )
7.2 参数
app.route
中的参数,和app.add_url_rule
中参数一样
1 2 3 4 5 6 7 8 rule view_func=None endpoint methods defaults strict_slashes redirect_to subdomain
flask本质调用了werkzeug的Rule的init方法,所以更多详情详见这里 。
8 flask的CBV
原理和django的CBV基本相同
1 2 3 4 5 6 7 8 9 10 11 12 from flask import Flaskfrom flask import views app=Flask(__name__) class LoginView (views.View): methods = ['GET' ] def dispatch_request (self ): return 'xxx' app.add_url_rule('/login' ,view_func=LoginView.as_view('login' ))
也可以直接继承MethodView
,就不用手动重写dispatch_request
方法了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def outer (func ): def inner (*args, **kwargs ): print ('before' ) result = func(*args, **kwargs) print ('after' ) return result return inner class LoginView (views.MethodView): methods = ['GET' ] decorators = [outer, ] def get (self ): print ('xxxxx' ) return "get" def post (self ): return 'post' app.add_url_rule('/login' ,view_func=LoginView.as_view('login' ))
9 pycharm设置jinja模版自动补全
第一步,右键模版文件夹--->Mark Directory as--->Template Folder
,此时可能会提示你还未选模板语言,是否要选择一个?
,点击确定,
如果没有提示也没关系,打开设置,找到下图位置,选择jinja2即可
10 request请求对象方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 request.method request.args request.form request.values request.cookies request.headers request.path request.full_path request.url request.base_url request.url_root request.host_url request.host request.files
11 response响应对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 return "字符串" return render_template('html模板路径' ,**{}) return redirect('/index.html' ) return jsonify({'k1' :'v1' }) response.delete_cookie('key' ) response.set_cookie('key' , 'value' ) response.headers['X-Something' ] = 'A value' return response
12 session
1 from flask import Flask,sessions
在使用session之前需要在配置文件中设置SECRET_KEY
然后在视图函数中直接使用
1 2 3 session['key' ]=value session.pop('key' ) session['key' ]
13 消息闪现
闪现系统的基本工作方式是:在且只在下一个请求中访问上一个请求结束时记录的消息。
1 2 flash("xxxx" ) get_flashed_message()
在a页面设置flash,然后在b页面获取,并且只能获取一次。一般结合布局模板来使用闪现系统。此外还支持分类,更多内容详见这里 。
14 请求扩展
相当于django的中间件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 from flask import Flask, Requestapp = Flask(__name__, template_folder='templates' ) app.debug = True @app.before_first_request def before_first_request1 (): print ('before_first_request1' ) @app.before_first_request def before_first_request2 (): print ('before_first_request2' ) @app.before_request def before_request1 (): Request.nnn = 123 print ('before_request1' ) @app.before_request def before_request2 (): print ('before_request2' ) @app.after_request def after_request1 (response ): print ('before_request1' , response) return response @app.after_request def after_request2 (response ): print ('before_request2' , response) return response @app.errorhandler(404 ) def page_not_found (error ): return 'This page does not exist' , 404 @app.template_global() def sb (a1, a2 ): return a1 + a2 @app.template_filter() def db (a1, a2, a3 ): return a1 + a2 + a3 @app.route('/' ) def hello_world (): return "helloworld" if __name__ == '__main__' : app.run()
15 蓝图
为了在一个或多个应用中,使应用模块化并且支持常用方案,Flask 引入了蓝图概念,主要是为应用提供目录划分,类似于django的多个app注册。
多个应用注册时,可以不使用蓝图,但是这样会导致每个app都使用自己独立的配置,且只能在 WSGI 层中管理应用。而如果使用蓝图,那么应用会在Flask层中进行管理,共享配置,通过注册按需改变应用对象。蓝图的缺点是一旦应用被创建后,只有销毁整个应用对象才能注销蓝图。
要使用蓝图,首先要注册蓝图
1 2 3 4 5 from flask import Flaskfrom yourapplication.simple_page import simple_pageapp = Flask(__name__) app.register_blueprint(simple_page)
然后在应用下使用蓝图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from flask import Blueprint, render_template, abortfrom jinja2 import TemplateNotFoundsimple_page = Blueprint('simple_page' , __name__, template_folder='templates' ,static_folder='statics' ,static_url_path='/static' ) @simple_page.route('/' , defaults={'page' : 'index' } ) @simple_page.route('/<page>' ) def show (page ): try : return render_template(f'pages/{page} .html' ) except TemplateNotFound: abort(404 )
一般来说把项目划分成几个部分,比如模版、视图、静态文件、中间件等等,使用蓝图注册视图层下的应用,统一管理。
蓝图的应用示例,参考这里 。
大型项目的目录结构:点击下载
简单项目的目录结构:点击下载
16 信号
Flask 自 0.6 版本开始在内部支持信号。信号功能由优秀的 blinker 库提供支持, 如果没有安装该库就无法使用信号功能,但不影响其他功能。
什么是信号?当flask的运行过程中或者是在flask扩展中发生动作时,会设置一些信号,默认是不会执行的。当你需要使用信号的时候,订阅某个信号,为它绑定你的函数,它就会执行,并且所有的信号处理器是乱序执行的。
flask内置了如下信号
1 2 3 4 5 6 7 8 9 10 template_rendered = _signals.signal('template-rendered' ) before_render_template = _signals.signal('before-render-template' ) request_started = _signals.signal('request-started' ) request_finished = _signals.signal('request-finished' ) request_tearing_down = _signals.signal('request-tearing-down' ) got_request_exception = _signals.signal('got-request-exception' ) appcontext_tearing_down = _signals.signal('appcontext-tearing-down' ) appcontext_pushed = _signals.signal('appcontext-pushed' ) appcontext_popped = _signals.signal('appcontext-popped' ) message_flashed = _signals.signal('message-flashed' )
信号的使用:
首先需要安装blinker
然后只需要定义函数,订阅信号即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from flask import Flask,signals app = Flask(__name__) app.config['DEBUG' ] = True def func (*args, **kwargs ): print ('request_started信号触发了' , args, kwargs) signals.request_started.connect(func) @app.route("/" ,endpoint="root" ) def hello_world (): print ("视图函数执行" ) return "hello!" if __name__ == '__main__' : app.run()
支持自定义信号(需要自己触发信号,其余操作和内置信号使用相同):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 from flask import Flaskfrom flask.signals import _signalsapp = Flask(__name__) app.config['DEBUG' ] = True sig = _signals.signal('xxxx' ) def func ( *args, **kwargs ): print (args) print (kwargs) sig.connect(func) @app.route("/" ,endpoint="root" ) def hello_world (): sig.send("abc" ,k="v" ) print ("视图函数执行" ) return "hello!" if __name__ == '__main__' : app.run()
三、Flask第三方插件
flask有内置的session,但是我们如果想要保存在数据库或者文件中,需要使用插件。安装:
1 pip install flask-session
使用方法很简单,在配置中替换一下即可
1 2 3 4 5 6 7 8 9 10 11 12 13 from flask import Flask,sessionfrom flask_session import RedisSessionInterfaceimport redisconn = redis.Redis("127.0.0.1" ,6379 ) app = Flask(__name__) app.session_interface=RedisSessionInterface(conn,'lqz' ,permanent=False ) @app.route("/" ,endpoint="root" ) def hello_world (): session["k1" ]="v1" return "hello!" if __name__ == '__main__' : app.run()
过期时间配置PERMANENT_SESSION_LIFETIME
属性就可以,默认是31天。如果想要关闭浏览器使得session失效,配置一下permanent参数:
1 app.session_interface=RedisSessionInterface(conn,key_prefix='lqz' ,permanent=False )
2 DBUtils
源码:GitHub
参考文章:https://www.cnblogs.com/liuqingzheng/articles/9006055.html
用于创建数据库连接池,而不是直接用pymysql。安装
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import pymysqlfrom dbutils.pooled_db import PooledDBimport timefrom threading import ThreadPOOL = PooledDB( creator=pymysql, maxconnections=6 , mincached=2 , maxcached=5 , maxshared=3 , blocking=True , maxusage=None , setsession=[], ping=0 , host='127.0.0.1' , port=3306 , user='root' , password='123' , database='flask' , charset='utf8' ) def func (): conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from boy' ) result = cursor.fetchall() time.sleep(2 ) print (result) conn.close() if __name__ == '__main__' : for i in range (10 ): t=Thread(target=func) t.start()
用类封装的dbutils:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import pymysqlfrom settings import Configclass SQLHelper (object ): @staticmethod def open (cursor ): POOL = Config.PYMYSQL_POOL conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod def close (conn,cursor ): conn.commit() cursor.close() conn.close() @classmethod def fetch_one (cls,sql,args,cursor =pymysql.cursors.DictCursor ): conn,cursor = cls.open (cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod def fetch_all (cls,sql, args,cursor =pymysql.cursors.DictCursor ): conn, cursor = cls.open (cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj @classmethod def execute (cls,sql, args,cursor =pymysql.cursors.DictCursor ): conn, cursor = cls.open (cursor) cursor.execute(sql, args) cls.close(conn, cursor)
类似于django的forms组件。
官方文档
源码:GiuHub
安装:
使用方法:
下载代码wtforms使用
注意,如果需要使用EmailField
的校验功能,需要安装另一个第三方模块pip install email_validator
4 SQLAlchemy
源码:github
安装:
注意:sqlalchemy本身不能对数据库直接操作,底层需要借助于pymysql等库支持。
简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from sqlalchemy import create_engineengine = create_engine( "mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8" , max_overflow=0 , pool_size=5 , pool_timeout=30 , pool_recycle=-1 ) conn = engine.raw_connection() cursor = conn.cursor() cursor.execute('select * from user_user' ) data = cursor.fetchall() print (data)
除了生成engine对象之外,其他操作和pymysql相同。二者的区别是,sqlalchemy自动创建了连接池,不需要我们来维护。
orm创建表
接下来,要在flask中使用。比如和django一样,我们在flask项目下也创建models.py
(文件名随意)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import datetimefrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, IndexModel = declarative_base() class User (Model ): __tablename__ = 'users' id = Column(Integer, primary_key=True ) name = Column(String(32 ), index=True , nullable=False ) email = Column(String(32 ), unique=True ) ctime = Column(DateTime, default=datetime.datetime.now) extra = Column(Text, nullable=True ) __table_args__ = ( UniqueConstraint('id' , 'name' , name='uix_id_name' ), Index('ix_id_name' , 'name' , 'email' ), ) def create_table (): engine = create_engine( "mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8" , max_overflow=0 , pool_size=5 , pool_timeout=30 , pool_recycle=-1 ) Model.metadata.create_all(engine) def drop_table (): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8" , max_overflow=0 , pool_size=5 , pool_timeout=30 , pool_recycle=-1 ) Model.metadata.drop_all(engine) if __name__ == '__main__' : create_table() drop_table()
以上是orm表的基本操作。
orm表关系创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 class Hobby (Base ): __tablename__ = 'hobby' id = Column(Integer, primary_key=True ) caption = Column(String(50 ), default='篮球' ) class Person (Base ): __tablename__ = 'person' nid = Column(Integer, primary_key=True ) name = Column(String(32 ), index=True , nullable=True ) hobby_id = Column(Integer, ForeignKey("hobby.id" )) hobby = relationship('Hobby' , backref='pers' ) class Boy2Girl (Base ): __tablename__ = 'boy2girl' id = Column(Integer, primary_key=True , autoincrement=True ) girl_id = Column(Integer, ForeignKey('girl.id' )) boy_id = Column(Integer, ForeignKey('boy.id' )) class Girl (Base ): __tablename__ = 'girl' id = Column(Integer, primary_key=True ) name = Column(String(64 ), unique=True , nullable=False ) class Boy (Base ): __tablename__ = 'boy' id = Column(Integer, primary_key=True , autoincrement=True ) hostname = Column(String(64 ), unique=True , nullable=False ) girls = relationship('Girl' , secondary='boy2girl' , backref='boys' )
数据的简单插入
接下来是数据的curd部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom models import Userengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/test" , max_overflow=0 , pool_size=5 ) Session = sessionmaker(bind=engine) session=Session() obj1 = User(name="xxx" ) session.add(obj1) session.commit()
这个创建过程存在线程安全的问题,scoped_session提供了另一种方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom models import Userfrom sqlalchemy.orm import scoped_sessionengine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/test" , max_overflow=0 , pool_size=5 ) Session = sessionmaker(bind=engine) session = scoped_session(Session) obj1 = User(name="xxx" ) session.add(obj1) session.commit() session.close()
数据的curd
数据插入的更多用法(curd、分组查询、排序、筛选、连表查询等等):下载源码 查看。
5 Flask-SQLAlchemy
github
配合flask-migrate 插件做数据库迁移,搭配flask-script 插件制作成命令行命令
1 2 3 pip install Flask-SQLAlchemy pip install Flask-Migrate pip install flask-script
使用方法简单概述(配合源码:下载示例源码 ):
四、源码分析
准备工作
另外,在粘贴源码时,删除了无关部分,只挑出与执行流程相关的主干部分。
启动流程分析
从helloworld读起,这是flask官方文档上提供的代码:
1 2 3 4 5 6 7 8 9 from flask import Flaskapp = Flask(__name__) @app.route("/" ) def hello_world (): return "<p>Hello, World!</p>" if __name__ == '__main__' : app.run()
执行这段代码,执行app.run()
,项目就启动了,也就是说,启动的代码一定在run
方法里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def run (self, host=None , port=None , debug=None , load_dotenv=True , **options ): if not host: if sn_host: host = sn_host else : host = "127.0.0.1" if port or port == 0 : port = int (port) elif sn_port: port = int (sn_port) else : port = 5000 from werkzeug.serving import run_simple try : run_simple(host, port, self, **options) finally : self._got_first_request = False
run
主要做了这些事:设置主机和端口号,然后处理一些其他参数(此处未截取),最后调用werkzeug.serving
里的run_simple
方法。这个方法封装了一些参数,底层使用socket,无限循环等待请求到达。当HTTP请求到来的时候,它将其解析为WSGI格式,然后werkzeug.serving:WSGIRequestHandler
调用了app去执行相应的处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 try : execute(self.server.app) def execute (app: "WSGIApplication" ) -> None : application_iter = app(environ, start_response) try : for data in application_iter: write(data) if not headers_sent: write(b"" ) finally : if hasattr (application_iter, "close" ): application_iter.close()
可以看到这里调用了 application_iter = app(environ, start_response)
,相当于app()
,也就是说会调用Flask.__call__
方法。
1 2 3 4 5 6 def __call__ (self, environ: dict , start_response: t.Callable ) -> t.Any : """The WSGI server calls the Flask application object as the WSGI application. This calls :meth:`wsgi_app`, which can be wrapped to apply middleware. """ return self.wsgi_app(environ, start_response)
调用了wsgi_app
,直接点过来看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def wsgi_app (self, environ: dict , start_response: t.Callable ) -> t.Any : ctx = self.request_context(environ) error: t.Optional [BaseException] = None try : try : ctx.push() response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except : error = sys.exc_info()[1 ] raise return response(environ, start_response) finally : if self.should_ignore_error(error): error = None ctx.auto_pop(error)
1 request_context
这里的内容比较多,分开来研究,首先是request_context
,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def request_context (self, environ: dict ) -> RequestContext: return RequestContext(self, environ) class RequestContext : def __init__ (self,app,environ,request=None ,session=None ): self.app = app if request is None : request = app.request_class(environ) self.request = request self.url_adapter = None try : self.url_adapter = app.create_url_adapter(self.request) except HTTPException as e: self.request.routing_exception = e self.flashes = None self.session = session self._implicit_app_ctx_stack: t.List [t.Optional ["AppContext" ]] = [] self.preserved = False self._preserved_exc = None self._after_request_functions: t.List [AfterRequestCallable] = []
2 ctx.push
然后是ctx.push()
,也就是RequestContext
类的push
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def push (self ): app_ctx = _app_ctx_stack.top if app_ctx is None or app_ctx.app != self.app: app_ctx = self.app.app_context() app_ctx.push() self._implicit_app_ctx_stack.append(app_ctx) else : self._implicit_app_ctx_stack.append(None ) _request_ctx_stack.push(self) if self.session is None : session_interface = self.app.session_interface self.session = session_interface.open_session(self.app, self.request) if self.session is None : self.session = session_interface.make_null_session(self.app) if self.url_adapter is not None : self.match_request()
稍微研究一下 _request_ctx_stack.push(self)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class LocalStack : def __init__ (self ) -> None : self._local = Local() def push (self, obj: t.Any ) -> t.List [t.Any ]: rv = getattr (self._local, "stack" , []).copy() rv.append(obj) self._local.stack = rv return rv
3 full_dispatch_request
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def full_dispatch_request (self ) -> Response: self.try_trigger_before_first_request_functions() try : request_started.send(self) rv = self.preprocess_request() if rv is None : rv = self.dispatch_request() except Exception as e: rv = self.handle_user_exception(e) return self.finalize_request(rv)
除了请求扩展的这些函数之外,最关键的就是self.dispatch_request
,关于路由分发和异常处理,暂时先不深入,先把整个请求流程走完。
4 ctx.auto_pop
不管前面的处理是否发生异常,最终执行ctx.auto_pop(error)
,把这次请求的ctx对象出栈
1 2 3 4 5 6 7 8 9 def auto_pop (self, exc: t.Optional [BaseException] ) -> None : if self.request.environ.get("flask._preserve_context" ) or ( exc is not None and self.app.preserve_context_on_exception ): self.preserved = True self._preserved_exc = exc else : self.pop(exc)
调用ctx.pop
出栈
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def pop (self, exc: t.Optional [BaseException] = _sentinel ) -> None : app_ctx = self._implicit_app_ctx_stack.pop() clear_request = False try : if not self._implicit_app_ctx_stack: self.preserved = False self._preserved_exc = None if exc is _sentinel: exc = sys.exc_info()[1 ] self.app.do_teardown_request(exc) request_close = getattr (self.request, "close" , None ) if request_close is not None : request_close() clear_request = True finally : rv = _request_ctx_stack.pop()
整个请求流程结束。
路由分发
上一节梳理了大致流程,这里详细看一看full_dispatch_request
里面的self.dispatch_request
,也就是路由分发的过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def dispatch_request (self ) -> ResponseReturnValue: req = _request_ctx_stack.top.request if req.routing_exception is not None : self.raise_routing_exception(req) rule = req.url_rule if ( getattr (rule, "provide_automatic_options" , False ) and req.method == "OPTIONS" ): return self.make_default_options_response() return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
这里又出现了_request_ctx_stack
,前面提到过,它就是一个全局变量,每次请求到达的时候就会把当次的请求保存到栈里,使得我们可以在整个请求处理过程中使用它。上一节也简单介绍了这个栈的特点,这里暂时先不深入研究,我们在后面单独讲。总之,dispatch_request
就做了一件事,那就是获取当前请求的request对象,然后把它交给某个合适的函数(我们写的视图函数)处理。那么,这里的路由url_rule
是什么时候匹配的?
在上一节request_context 里,创建ctx,也就是ctx = self.request_context(environ)
这行,最终会调用RequestContext.__init__
初始化方法。在这个初始化方法里,会执行这一句app.create_url_adapter(self.request)
,我们来看看这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def create_url_adapter (self, request: t.Optional [Request] ) -> t.Optional [MapAdapter]: if request is not None : return self.url_map.bind_to_environ( request.environ, server_name=self.config["SERVER_NAME" ], subdomain=subdomain, ) if self.config["SERVER_NAME" ] is not None : return self.url_map.bind( self.config["SERVER_NAME" ], script_name=self.config["APPLICATION_ROOT" ], url_scheme=self.config["PREFERRED_URL_SCHEME" ], ) return None
无论是bind_to_environ
还是bind
,它们的效果相同,最终都会返回一个werkzeug.routing.MapAdapter
对象,这个对象主要用于url的匹配,这涉及到werkzeug的源码,这里就不深入研究了,想了解的话建议查阅文档 。总之记住,此时ctx.url_adapter
不为空,这个很重要。
还是在上一节ctx.push 里,我们阅读了RequestContext
类的push
源码,其实留下了一些代码没有讲,我把它留到这里是因为,除了将ctx
压栈之外,同时还进行了路由匹配,这就是最后两句所做的:
1 2 3 4 def push (self ): .... if self.url_adapter is not None : self.match_request()
还记得吗,此时ctx.url_adapter
不为空,所以会调用self.match_request()
,
1 2 3 4 5 6 7 8 def match_request (self ) -> None : try : result = self.url_adapter.match (return_rule=True ) self.request.url_rule, self.request.view_args = result except HTTPException as e: self.request.routing_exception = e
这里调用的是ctx.url_adapter
的match
方法,底层是由werkzeug实现的。匹配到的路径保存在了ctx.request
对象里,交给dispatch_request
,最后匹配到我们写的视图函数。
上下文之Local
在前面我们多次提到flask的全局变量,比如ctx.push 介绍的源码里_request_ctx_stack
或者是_local
。它们是什么?这就是flask中的上下文机制。从一个简单的例子开始:
1 2 3 4 5 6 7 8 9 from flask import Flask,requestapp = Flask(__name__) @app.route("/" ,endpoint="root" ) def hello_world (): if request.method =="GET" : print (request) return "hello!"
是否注意到,当你在视图函数里使用request对象,无论是request.method
,还是print(request)
,每当你调用的时候,request都是当前请求的request,换句话说,假设此时另一个人也发送了同样的请求,flask是如何保证你获取到的request来自于你?答案就是全局变量。但是会有一个问题,我们先来看下面的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 from threading import Threadimport times = 0 def add (num ): global s s=num time.sleep(1 ) print (s) if __name__ == '__main__' : for i in range (5 ): trd = Thread(target=add,args=[i]) trd.start()
这里创建了5个线程,第一个线程修改loacl为1,第二个修改local为2,以此类推,在阻塞了1秒之后,最终输出的结果全都是4。这就是数据的不安全问题,这也是进程(和线程)中的经典问题。要解决这个问题有很多种办法,比如皮特森算法、PV原语、加锁等等,这已经是操作系统的范畴,就不展开了。为数据加锁是可行的,但是加锁适合多个线程共用一个数据的情况,而request对象每个请求都应该不一样,也就是说想要实现该线程对于请求对象的修改并不影响其他线程,就是threading.local
。
1 2 3 4 5 6 7 8 9 10 11 12 from threading import Thread,localimport times = local() def add (num ): s.val=num time.sleep(1 ) print (s.val) if __name__ == '__main__' : for i in range (5 ): trd = Thread(target=add,args=[i]) trd.start()
local的原理是,为每个线程复制一份数据。其实我们也可以使用字典来实现这一功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from threading import get_ident,Thread,current_threadLocal = {} def set (k,v ): curr=get_ident() if curr in Local: Local[curr][k]=v else : Local[curr]={k:v} def get (k ): curr=get_ident() return Local[curr][k] def add (num ): set ("val" ,num) if __name__ == '__main__' : for i in range (5 ): trd = Thread(target=add,args=[i]) trd.start() print (Local)
最终效果就是这个样子:Local={13324: {'val': 0}, 4164: {'val': 1}, 12764: {'val': 2}, 3120: {'val': 3}, 2004: {'val': 4}}
,每个线程都各自用自己的id区分开,取值时互不干扰。于是进一步,把它封装在类中以供调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from threading import get_ident,Thread,current_threadimport timeclass Local (object ): storage = {} get_ident = get_ident def __setattr__ (self, k, v ): ident =self.get_ident() origin = self.storage.get(ident) if not origin: origin={} origin[k] = v self.storage[ident] = origin def __getattr__ (self, k ): ident = self.get_ident() v= self.storage[ident].get(k) return v locals_values = Local() def func (num ): locals_values.KEY=num time.sleep(2 ) print (locals_values.KEY,current_thread().name) for i in range (10 ): t = Thread(target=func,args=(i,),name='线程%s' %i) t.start()
这样实现了需求,但是有一个小问题,那就是如果创建多个Local
对象,它们共用同一个字典,我们想做的是把这个字典放在对象中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 try : from greenlet import getcurrent as get_ident except Exception as e: from threading import get_ident from threading import get_ident,Threadimport timeclass Local (object ): def __init__ (self ): object .__setattr__(self,'storage' ,{}) def __setattr__ (self, k, v ): ident = get_ident() if ident in self.storage: self.storage[ident][k] = v else : self.storage[ident] = {k: v} def __getattr__ (self, k ): ident = get_ident() return self.storage[ident][k] obj = Local() def task (arg ): obj.val = arg time.sleep(1 ) print (obj.val) for i in range (10 ): t = Thread(target=task,args=(i,)) t.start()
werkzeug就使用了上述的代码的思想,并且更强大:
会在协程可用的情况下优先使用协程
自定义了__release_local__
释放资源
自定义LocalStack:栈,可以像栈一样操作Local,包括入栈、出栈、获取栈顶元素
自定义LocalProxy:即Local代理,把对自己的操作转发给内部的__local
对象
flask基于werkzeug实现,所以它理所当然地继承了这些特性。
上下文之Context
flask 中有两种上下文:application context
和 request context
,前者用于存储app相关的信息,后者用于存储请求相关的信息。有关它们的定义在globals.py
文件中,这个文件代码不多,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def _lookup_req_object (name ): top = _request_ctx_stack.top if top is None : raise RuntimeError(_request_ctx_err_msg) return getattr (top, name) def _lookup_app_object (name ): top = _app_ctx_stack.top if top is None : raise RuntimeError(_app_ctx_err_msg) return getattr (top, name) def _find_app (): top = _app_ctx_stack.top if top is None : raise RuntimeError(_app_ctx_err_msg) return top.app _request_ctx_stack = LocalStack() _app_ctx_stack = LocalStack() current_app: "Flask" = LocalProxy(_find_app) request: "Request" = LocalProxy(partial(_lookup_req_object, "request" )) session: "SessionMixin" = LocalProxy( partial(_lookup_req_object, "session" ) ) g: "_AppCtxGlobals" = LocalProxy(partial(_lookup_app_object, "g" ))
application context
包括current_app
和 g
,request context
包括request
和session
。这里也会发现两个单例的LocalStack
对象,它们提供了数据隔离的栈访问。来看看它是怎么写的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class LocalStack : def __init__ (self ) -> None : self._local = Local() def __release_local__ (self ) -> None : self._local.__release_local__() @property def __ident_func__ (self ) -> t.Callable [[], int ]: return self._local.__ident_func__ @__ident_func__.setter def __ident_func__ (self, value: t.Callable [[], int ] ) -> None : object .__setattr__(self._local, "__ident_func__" , value) def __call__ (self ) -> "LocalProxy" : def _lookup () -> t.Any : rv = self.top if rv is None : raise RuntimeError("object unbound" ) return rv return LocalProxy(_lookup) def push (self, obj: t.Any ) -> t.List [t.Any ]: """Pushes a new item to the stack""" rv = getattr (self._local, "stack" , []).copy() rv.append(obj) self._local.stack = rv return rv def pop (self ) -> t.Any : """Removes the topmost item from the stack, will return the old value or `None` if the stack was already empty. """ stack = getattr (self._local, "stack" , None ) if stack is None : return None elif len (stack) == 1 : release_local(self._local) return stack[-1 ] else : return stack.pop() @property def top (self ) -> t.Any : """The topmost item on the stack. If the stack is empty, `None` is returned. """ try : return self._local.stack[-1 ] except (AttributeError, IndexError): return None
它主要有push
、pop
和 top
方法,在上一节ctx.push 简单介绍了push方法,结合请求流程的分析,也验证了request context
存储了request
和session
。这里的__call__
方法返回当前线程或协程栈顶元素的代理对象,暂时不理解?继续往下看。
我们说LocalProxy
是Local
对象的代理,源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class LocalProxy : __slots__ = ("__local" , "__name" , "__wrapped__" ) def __init__ ( self, local: t.Union ["Local" , t.Callable [[], t.Any ]], name: t.Optional [str ] = None , ) -> None : object .__setattr__(self, "_LocalProxy__local" , local) object .__setattr__(self, "_LocalProxy__name" , name) if callable (local) and not hasattr (local, "__release_local__" ): object .__setattr__(self, "__wrapped__" , local) def _get_current_object (self ) -> t.Any : if not hasattr (self.__local, "__release_local__" ): return self.__local() try : return getattr (self.__local, self.__name) except AttributeError: name = self.__name raise RuntimeError(f"no object bound to {name} " ) from None __doc__ = _ProxyLookup( class_value=__doc__, fallback=lambda self: type (self).__doc__ ) __repr__ = _ProxyLookup( repr , fallback=lambda self: f"<{type (self).__name__} unbound>" ) __str__ = _ProxyLookup(str ) __bytes__ = _ProxyLookup(bytes )
self.__local
是在init方法中存储的,由于是双下划线的私有方法,所以才使用__LocalProxy__local
这种格式存值。并且它重写了所有的魔法方法(这里只截取一小部分),_get_current_object
就是获取当前请求的对象。
回到这里request=LocalProxy(partial(_lookup_req_object, "request"))
,这里的partial
是偏函数,可以在调用之前为函数提前赋值(不执行),举个例子:
1 2 3 4 5 6 from functools import partialdef add (a,b,c ): print (a+b+c) func = partial(add,1 ,2 ) func(3 )
关于partial的原理,以后有机会新开一篇讲讲。理解了偏函数,这里就相当于:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def _lookup_req_object (name ): top = _request_ctx_stack.top if top is None : raise RuntimeError(_request_ctx_err_msg) return getattr (top, name) request= LocalProxy(partial(_lookup_req_object, "request" )) class LocalProxy : def __init__ (self,local,name=None ):
因此_get_current_object
返回的self.__local()
,实际上就是func()
,最终从ctx
里找到并返回request
对象。当我们在视图函数里print(request)
时,就会调用LocalProxy
里的__str__
,同理当你request.meathod
的时候,就会调用对应的__getattr__
方法。这就是一个典型的代理模式使用。
理解了request
,那么session
也是同理,让我们结合所有知识重新梳理一下请求流程:
当请求到达的时候,首先创建request context
,里面保存了当前线程的request
和session
然后ctx.push
进栈操作:将request context
保存到_request_ctx_stack
;将app_context
保存到_app_ctx_stack
进行路由分发,交给视图处理
视图中如果需要使用当前请求对象(比如print(request)
),就会触发代理LocalProxy
对应的魔法方法(比如__str__
)
请求结束,将ctx
出栈并进行清理工作
补充:application context
包括current_app
和 g
,前者是app
相关的参数,而g
是什么?其实它就是一个供我们使用的全局变量,在整个请求的生命周期内,都可以使用g
来赋值和取值。你可能会问,已经有这么多全局变量,我难道不可以在current_app
或者request
中放吗,何必要用g
?
答案是确实可以,这些全局变量都存储了一些参数,比如request
里存储了method
字段,当我们赋值的时候很可能会覆盖掉原有字段。为了防止可能出现的错误,使用专门为我们打造的g
就是不错的选择。但是使用时要注意g
只在请求周期中存在,当一次请求结束,g
也就销毁了,这和session
不同,session
在过期时间内,不同的请求使用的是同一个session
。
session
在ctx.push 这一节里提到了session的操作过程,如下
1 2 3 4 5 6 7 8 9 10 if self.session is None : session_interface = self.app.session_interface self.session = session_interface.open_session(self.app, self.request) if self.session is None : self.session = session_interface.make_null_session(self.app)
我们来深入研究一下,首先是session_interface
,它默认(可以自定义,比如使用flask-session第三方插件)是一个位于sessions.py
中的SecureCookieSessionInterface
类。后面调用的open_session
和make_null_session
都是它的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 class SecureCookieSessionInterface (SessionInterface ): salt = "cookie-session" digest_method = staticmethod (hashlib.sha1) key_derivation = "hmac" serializer = session_json_serializer session_class = SecureCookieSession def get_signing_serializer ( self, app: "Flask" ) -> t.Optional [URLSafeTimedSerializer]: if not app.secret_key: return None signer_kwargs = dict ( key_derivation=self.key_derivation, digest_method=self.digest_method ) return URLSafeTimedSerializer( app.secret_key, salt=self.salt, serializer=self.serializer, signer_kwargs=signer_kwargs, ) def open_session ( self, app: "Flask" , request: "Request" ) -> t.Optional [SecureCookieSession]: s = self.get_signing_serializer(app) if s is None : return None val = request.cookies.get(self.get_cookie_name(app)) if not val: return self.session_class() max_age = int (app.permanent_session_lifetime.total_seconds()) try : data = s.loads(val, max_age=max_age) return self.session_class(data) except BadSignature: return self.session_class()
URLSafeTimedSerializer
是借助于itsdangerous 实现的,它可以进行数据验证,生成URL安全的字符串。
默认的session_class
是SecureCookieSession
,它本质是一个字典,在字典的基础上封装了一些参数,重写了魔法方法,内部的实现不是很复杂,感兴趣可以看看。
接下来我们看看session是怎么保存的,在请求来的时候flask会获取session,并保存在上下文中让视图函数可以获取并修改它,在响应返回时,也会自动把session写回到cookie中。
在full_dispatch_request 小节中,请求结束会调用 self.finalize_request()
,其中就会调用process_response
方法返回响应,其中的这部分就是session处理:
1 2 3 4 5 6 def process_response (self, response: Response ) -> Response: if not self.session_interface.is_null_session(ctx.session): self.session_interface.save_session(self, ctx.session, response) return response
调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 def save_session ( self, app: "Flask" , session: SessionMixin, response: "Response" ) -> None : name = self.get_cookie_name(app) domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) secure = self.get_cookie_secure(app) samesite = self.get_cookie_samesite(app) if not session: if session.modified: response.delete_cookie( name, domain=domain, path=path, secure=secure, samesite=samesite ) return if not self.should_set_cookie(app, session): return httponly = self.get_cookie_httponly(app) expires = self.get_expiration_time(app, session) val = self.get_signing_serializer(app).dumps(dict (session)) response.set_cookie( name, val, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure, samesite=samesite, )
信号
在基本使用中,简单介绍了信号的使用。现在我们来看看信号的实现。在上文信号 中,介绍了flask内置的10种信号,并提到这些信号预先被放置在了某些位置,但是不会执行,直到我们订阅才执行。那就从源码里分别来找找这十个信号的位置:
1 template_rendered和before_render_template
模板渲染前、后触发。位于templates.py
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def render_template (template_name_or_list, **context ): ctx = _app_ctx_stack.top ctx.app.update_template_context(context) return _render( ctx.app.jinja_env.get_or_select_template(template_name_or_list), context, ctx.app, ) def _render (template: Template, context: dict , app: "Flask" ) -> str : """Renders the template and fires the signal""" before_render_template.send(app, template=template, context=context) rv = template.render(context) template_rendered.send(app, template=template, context=context) return rv
2 appcontext_pushed和appcontext_popped
上下文入栈、出栈时触发,位于ctx.py
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class AppContext : def push (self ) -> None : """Binds the app context to the current context.""" self._refcnt += 1 _app_ctx_stack.push(self) appcontext_pushed.send(self.app) def pop (self, exc: t.Optional [BaseException] = _sentinel ) -> None : """Pops the app context.""" try : self._refcnt -= 1 if self._refcnt <= 0 : if exc is _sentinel: exc = sys.exc_info()[1 ] self.app.do_teardown_appcontext(exc) finally : rv = _app_ctx_stack.pop() assert rv is self, f"Popped wrong app context. ({rv!r} instead of {self!r} )" appcontext_popped.send(self.app)
3 request_started和request_finished
位于app.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def full_dispatch_request (self ) -> Response: self.try_trigger_before_first_request_functions() try : request_started.send(self) rv = self.preprocess_request() if rv is None : rv = self.dispatch_request() except Exception as e: rv = self.handle_user_exception(e) return self.finalize_request(rv) def finalize_request ( self, rv: t.Union [ResponseReturnValue, HTTPException], from_error_handler: bool = False , ) -> Response: response = self.make_response(rv) try : response = self.process_response(response) request_finished.send(self, response=response) except Exception: if not from_error_handler: raise self.logger.exception( "Request finalizing failed with an error while handling an error" ) return response
4 request_tearing_down
位于app.py
1 2 3 4 5 6 7 8 9 10 11 12 def do_teardown_request ( self, exc: t.Optional [BaseException] = _sentinel ) -> None : if exc is _sentinel: exc = sys.exc_info()[1 ] for name in chain(request.blueprints, (None ,)): if name in self.teardown_request_funcs: for func in reversed (self.teardown_request_funcs[name]): self.ensure_sync(func)(exc) request_tearing_down.send(self, exc=exc)
5 appcontext_tearing_down
位于app.py
1 2 3 4 5 6 7 8 9 10 def do_teardown_appcontext ( self, exc: t.Optional [BaseException] = _sentinel ) -> None : if exc is _sentinel: exc = sys.exc_info()[1 ] for func in reversed (self.teardown_appcontext_funcs): self.ensure_sync(func)(exc) appcontext_tearing_down.send(self, exc=exc)
6 got_request_exception
位于app.py
1 2 3 4 5 6 7 8 9 10 11 12 13 def handle_exception (self, e: Exception ) -> Response: exc_info = sys.exc_info() got_request_exception.send(self, exception=e) if self.propagate_exceptions: if exc_info[1 ] is e: raise raise e
7 message_flashed
位于helpers.py
1 2 3 4 5 6 7 8 9 10 11 def flash (message: str , category: str = "message" ) -> None : flashes = session.get("_flashes" , []) flashes.append((category, message)) session["_flashes" ] = flashes message_flashed.send( current_app._get_current_object(), message=message, category=category, )
找到源码里信号触发的位置,再加上启动流程的分析,我们就可以驾轻就熟地使用这些信号了。
接下来,我们研究一下信号机制的源码,其定义是在signals.py
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import typing as ttry : from blinker import Namespace signals_available = True except ImportError: signals_available = False class Namespace : def signal (self, name: str , doc: t.Optional [str ] = None ) -> "_FakeSignal" : return _FakeSignal(name, doc) class _FakeSignal : def __init__ (self, name: str , doc: t.Optional [str ] = None ) -> None : self.name = name self.__doc__ = doc def send (self, *args: t.Any , **kwargs: t.Any ) -> t.Any : pass def _fail (self, *args: t.Any , **kwargs: t.Any ) -> t.Any : raise RuntimeError( "Signalling support is unavailable because the blinker" " library is not installed." ) from None connect = connect_via = connected_to = temporarily_connected_to = _fail disconnect = _fail has_receivers_for = receivers_for = _fail del _fail _signals = Namespace() template_rendered = _signals.signal("template-rendered" ) before_render_template = _signals.signal("before-render-template" ) request_started = _signals.signal("request-started" ) request_finished = _signals.signal("request-finished" ) request_tearing_down = _signals.signal("request-tearing-down" ) got_request_exception = _signals.signal("got-request-exception" ) appcontext_tearing_down = _signals.signal("appcontext-tearing-down" ) appcontext_pushed = _signals.signal("appcontext-pushed" ) appcontext_popped = _signals.signal("appcontext-popped" ) message_flashed = _signals.signal("message-flashed" )
flask调用了blinker 提供信号,接下来的内容不是flask的源码。
我们以其中一个信号创建为例,读一读内部是怎么执行的。首先调用signal
方法:
1 2 3 4 5 6 7 8 9 10 class Namespace (dict ): """A mapping of signal names to signals.""" def signal (self, name, doc=None ): try : return self[name] except KeyError: return self.setdefault(name, NamedSignal(name, doc))
第一次调用,被异常捕获,调用NamedSignal
的初始化方法:
1 2 3 4 5 6 7 8 class NamedSignal (Signal ): """A named generic notification emitter.""" def __init__ (self, name, doc=None ): Signal.__init__(self, doc) self.name = name
调用Signal
的初始化方法
1 2 3 4 5 6 7 8 class Signal (object ): def __init__ (self, doc=None ): if doc: self.__doc__ = doc self.receivers = {} self._by_receiver = defaultdict(set ) self._by_sender = defaultdict(set ) self._weak_senders = {}
接下来的流程是,我们需要订阅信号比如signals.request_started.connect(func)
,会调用connect
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def connect (self, receiver, sender=ANY, weak=True ): receiver_id = hashable_identity(receiver) if weak: receiver_ref = reference(receiver, self._cleanup_receiver) receiver_ref.receiver_id = receiver_id else : receiver_ref = receiver if sender is ANY: sender_id = ANY_ID else : sender_id = hashable_identity(sender) self.receivers.setdefault(receiver_id, receiver_ref) self._by_sender[sender_id].add(receiver_id) self._by_receiver[receiver_id].add(sender_id) del receiver_ref return receiver
当我们订阅某个信号时,信号内的receivers
字典保存了我们绑定的函数。当flask内部调用该信号的send方法时,比如appcontext_pushed.send(self.app)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class Signal (object ): def send (self, *sender, **kwargs ): if len (sender) == 0 : sender = None elif len (sender) > 1 : raise TypeError('send() accepts only one positional argument, ' '%s given' % len (sender)) else : sender = sender[0 ] if not self.receivers: return [] else : return [(receiver, receiver(sender, **kwargs)) for receiver in self.receivers_for(sender)]
调用self.receivers_for(sender)
内部实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def receivers_for (self, sender ): if self.receivers: sender_id = hashable_identity(sender) if sender_id in self._by_sender: ids = (self._by_sender[ANY_ID] | self._by_sender[sender_id]) else : ids = self._by_sender[ANY_ID].copy() for receiver_id in ids: receiver = self.receivers.get(receiver_id) if receiver is None : continue if isinstance (receiver, WeakTypes): strong = receiver() if strong is None : self._disconnect(receiver_id, ANY_ID) continue receiver = strong yield receiver
TODO:flask源码还有一些没读,以后会在这里继续更新