一、介绍

twisted是基于事件驱动的网络引擎框架,事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。

twisted支持许多常见的传输及应用层协议,包括TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP。

twisted的架构与单线程、多线程的对比如下:

threading_models

和asyncio一样,twisted的编程模型可以在单线程下实现并发编程,即事件驱动版本的程序中,多个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他耗时的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

二、反应堆reactor

1 hello world

1
2
3
4
5
6
7
8
9
10
from twisted.internet import reactor
# reactor叫做反应堆,实际上就是事件循环的死循环

def func(arg):
print(arg)
reactor.stop()

# 第一个参数是回调函数,后面是它的参数,若有多个参数,依次传入即可
reactor.callWhenRunning(func,"hello,twisted")
reactor.run()

调用callWhenRunning方法,传入回调函数,即可获得输出。

导入的reactor.py源码如下:

1
2
3
4
5
6
7
8
import sys

# 避免冲突,删掉全局字典中的reactor模块
del sys.modules["twisted.internet.reactor"]
# 导入default
from twisted.internet import default

default.install()

导入default.py源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
__all__ = ["install"]

from twisted.python.runtime import platform

def _getInstallFunction(platform):
# 选择一个最适合当前平台的reactor
try:
if platform.isLinux():
try:
# linux下首先尝试epoll
from twisted.internet.epolreactor import install
except ImportError:
# 其次尝试poll
from twisted.internet.pollreactor import install
elif platform.getType() == "posix" and not platform.isMacOSX():
# 在不是mac的POSIX平台首先尝试poll
from twisted.internet.pollreactor import install
else:
# 其他平台尝试select
from twisted.internet.selectreactor import install
except ImportError:
from twisted.internet.selectreactor import install
return install


install = _getInstallFunction(platform)

当然,这是源码层面自动选择的IO多路复用模型,也可以手动指定:

1
2
3
4
5
6
7
8
9
10
11
from twisted.internet import selectreactor
selectreactor.install()
# 注意,若使用其它的reactor,需要在引入twisted.internet.reactor前安装
from twisted.internet import reactor
# 在导入后安装,由于已经有一个reactor,又因为是单例模式,所以再安装其它reactor会报错
def func(arg):
print(arg)
reactor.stop()

reactor.callWhenRunning(func,"hello,twisted")
reactor.run()

2 定时调用

接下来介绍定时调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from twisted.internet import reactor

def stop(arg):
print(arg)
reactor.stop()

def func(arg):
print(arg)
# 定时调用,第一个参数为调用时间,以秒为单位,第二个参数为回调函数,之后是它的参数
reactor.callLater(5, stop, "stop被执行了")
print("它是异步的")

reactor.callWhenRunning(func, "hello,twisted")
reactor.run()

和asyncio一样,twisted是异步的,输出结果为:

1
2
3
hello,twisted
它是异步的
stop被执行了

还可以多次定时调用,实现循环:

1
2
3
4
5
6
7
8
9
10
11
12
from twisted.internet import reactor

def loop(arg):
print(arg)
reactor.callLater(5, func)

def func():
reactor.callLater(5, loop, "loop调用...")
print("它是异步的")

reactor.callWhenRunning(func)
reactor.run()

输出结果为:

1
2
3
4
5
6
它是异步的
loop调用...
它是异步的
loop调用...
它是异步的
loop调用...

twisted的task模块还提供了专门的循环调用,可以简化以上过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from twisted.internet import reactor
from twisted.internet import task

def foo(arg):
print(arg)

def main():
# 创建loop循环对象
loop = task.LoopingCall(foo, "hello")
# 通过start启动循环,interval=3 定时器3秒启动一次
# now=False表示等待时间间隔再调用,now默认为True表示执行到这句时,立即调用回调函数
loop.start(interval=3,now=False)
print("main执行完毕")

reactor.callWhenRunning(main)
reactor.run()

它会一直循环下去,可以显式使用loop.stop停止调用。

三、异步回调defer

由于整个调用是异步的,那么能否进行异步回调就成了关键,如果无法获取异步调用的结果,整个异步代码就会难以维护。twisted提供了Deferred实现异步回调管理,可以在异步任务的执行过程进行监控,防止出现因为等待某个任务而发生阻塞。Deferred不依赖于reactor,因此不在事件循环中使用Deferred。

Deferred对象以抽象化的方式表达了一种思想,即结果还尚不存在。它同样能够帮助管理产生这个结果所需要的回调链。当从函数中返回时,Deferred对象承诺在某个时刻函数将产生一个结果。返回的Deferred对象中包含所有注册到事件上的回调引用,因此在函数间只需要传递这一个对象即可,跟踪这个对象比单独管理所有的回调要简单的多。

1 快速使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
from twisted.internet import defer

def func(arg):
deferred = defer.Deferred()
deferred.callback(arg)
return deferred

def foo(arg):
# foo是func的回调函数,在这里执行计算,模拟延迟的情况 sleep 2秒
time.sleep(2)
print(arg+1)

# 假设需要完成一个计算过程,这个过程耗时很久,于是交给defer
# 可以让func返回deferred对象
deferred = func(3)
print("已经获得deferred对象")
# 注册回调函数,当调用deferred.callback后,自动执行注册的foo函数
# 把callback的arg参数传递到foo函数中
deferred.addCallback(foo)

2 结合reactor

虽然可以单独使用,但意义不大。因此可以结合reactor使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from twisted.internet import defer,reactor

def func(arg):
deferred = defer.Deferred()
# 3秒后调用deferred.callback函数,并把arg参数传递给它
# 它是异步调用的
reactor.callLater(3, deferred.callback,arg)
return deferred

def foo(arg):
print(arg+1)
reactor.stop()

deferred = func(3)
print("已经获得deferred对象")
# 注册回调函数,当调用deferred.callback后,自动执行注册的foo函数
# 把callback的arg参数(从反应堆的延迟任务中获得)传递到foo函数中
deferred.addCallback(foo)
print("回调函数注册成功")
reactor.run()

整个流程是:首先执行func,在这里实例化deferred对象,并且反应堆中有一个3秒后的定时调用,调用的函数就是deferred.callback,然后将foo函数注册到deferred的回调函数中,由于调用deferred.callback时,会执行注册的foo函数,因此效果就是3秒后执行了foo函数。

1
2
3
4
已经获得deferred对象
回调函数注册成功
(大约3秒后...)
4

3 错误回调

你也可以将错误处理逻辑errback添加到Deferred,当出现错误,数据不能正常取回时,Deferred会调用它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from twisted.internet import defer,reactor

def func(arg):
deferred = defer.Deferred()
if arg>10:
reactor.callLater(3, deferred.callback,arg)
else:
reactor.callLater(3, deferred.errback,ValueError("值错误"))
return deferred

def foo(arg):
print(arg+1)

def errback(error):
print(error.value)
print(error.type)
print("如果出现异常就执行errback")

deferred = func(3)
print("已经获得deferred对象")
deferred.addCallback(foo)
deferred.addErrback(errback)
print("回调函数注册成功")
reactor.run()

输出是这样的:

1
2
3
4
5
已经获得deferred对象
回调函数注册成功
值错误
<class 'ValueError'>
如果出现异常就执行errback

4 回调链

以上的例子,只添加了一个正确回调和错误回调,但其实,回调可以添加多个。Deferred对象包含一对回调链,一个是针对操作成功的回调,一个是针对操作失败的回调。初始状态下Deferred对象的两条链都为空。在事件处理的过程中,每个阶段都为其添加处理成功的回调和处理失败的回调。当一个异步结果到来时,Deferred对象就被“激活”,那么处理成功的回调和处理失败的回调就可以以合适的方式按照它们添加进来的顺序依次得到调用。

一般来说,异步代码会在结果准备就绪时,调用第一个callback,或是在出现错误时,调用第一个errback,然后Deferred对象就会将callback或errback的返回结果作为参数传递给链中的下一个函数。Deferred提供了四个方法添加回调链:

  • addCallback(callback):添加成功回调
  • addErrback(errback):添加错误回调
  • addCallbacks(callback,errback):分别添加成功回调和错误回调
  • addBoth(Anyback):在两个回调链都添加这个回调(不管成功失败都执行这个回调)

下面的代码演示了如何添加回调链:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from twisted.internet import defer

def main(arg):
deferred = defer.Deferred()
if arg >10:
deferred.callback("func1")
else:
deferred.errback(ValueError("错误"))
return deferred

def func1(arg):
print(arg)

def err1(error):
print(error.value)

def bothback(result):
# 不管成功还是失败都会执行
print(result)
deferred = main(100)
# 写法一,分开写
# deferred.addCallback(func1)
# deferred.addErrback(err1)

# 写法二,链式调用
# deferred.addCallback(func1).addErrback(err1)

# 写法三,使用addCallbacks
# deferred.addCallbacks(func1,err1)

# addBoth:无论成功还是失败都会执行该回调
deferred.addBoth(bothback)

下面演示回调链函数的依次调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from twisted.internet import defer

def main(num):
deferred = defer.Deferred()
deferred.callback(num)
return deferred

def callback1(num):
num+=1
print("callback1")
print(num)
return num

def callback2(num):
num+=1
print("callback2")
print(num)
return num

def callback3(num):
num+=1
print("callback3")
print(num)
return num
deferred = main(100)
# 回调链,会把上一个回调函数的返回值当作参数传入下一个回调函数中
deferred.addCallback(callback1).addCallback(callback2).addCallback(callback3)

输出结果为:

1
2
3
4
5
6
callback1
101
callback2
102
callback3
103

5 内联回调

内联回调inlineCallbacks是一个装饰器,它用来装饰生成器函数(yield语句函数)。它可以将生成器函数转化为异步回调,每个回调被yield分隔,yield的返回值会传递到下一个回调。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from twisted.internet import reactor,defer

def func1(num):
num+=1
print("func1执行了")
deferred = defer.Deferred()
reactor.callLater(5,deferred.callback,num+1)
return deferred
def func2(num):
num+=1
print("func2执行了")
deferred = defer.Deferred()
reactor.callLater(5,deferred.callback,num+1)
return deferred

def func3(num):
num+=1
print("func3执行了")
deferred = defer.Deferred()
reactor.callLater(5,deferred.callback,num+1)
return deferred

@defer.inlineCallbacks
def main():
res1 = yield func1(100)
print(res1) # 获取执行结果
res2 = yield func2(100)
print(res2) # 获取执行结果
res3 = yield func3(100)
print(res3) # 获取执行结果

main()
reactor.run()

内联回调使用yield和装饰器即可在一个函数中执行多次回调,省去了手动添加回调链的过程。