一、介绍
twisted是基于事件驱动的网络引擎框架,事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。
twisted支持许多常见的传输及应用层协议,包括TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP。
twisted的架构与单线程、多线程的对比如下:

和asyncio一样,twisted的编程模型可以在单线程下实现并发编程,即事件驱动版本的程序中,多个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他耗时的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。
二、反应堆reactor
1 hello world
1 2 3 4 5 6 7 8 9 10
| from twisted.internet import reactor
def func(arg): print(arg) reactor.stop()
reactor.callWhenRunning(func,"hello,twisted") reactor.run()
|
调用callWhenRunning
方法,传入回调函数,即可获得输出。
导入的reactor.py
源码如下:
1 2 3 4 5 6 7 8
| import sys
del sys.modules["twisted.internet.reactor"]
from twisted.internet import default
default.install()
|
导入default.py
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| __all__ = ["install"]
from twisted.python.runtime import platform
def _getInstallFunction(platform): try: if platform.isLinux(): try: from twisted.internet.epolreactor import install except ImportError: from twisted.internet.pollreactor import install elif platform.getType() == "posix" and not platform.isMacOSX(): from twisted.internet.pollreactor import install else: from twisted.internet.selectreactor import install except ImportError: from twisted.internet.selectreactor import install return install
install = _getInstallFunction(platform)
|
当然,这是源码层面自动选择的IO多路复用模型,也可以手动指定:
1 2 3 4 5 6 7 8 9 10 11
| from twisted.internet import selectreactor selectreactor.install()
from twisted.internet import reactor
def func(arg): print(arg) reactor.stop()
reactor.callWhenRunning(func,"hello,twisted") reactor.run()
|
2 定时调用
接下来介绍定时调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from twisted.internet import reactor
def stop(arg): print(arg) reactor.stop()
def func(arg): print(arg) reactor.callLater(5, stop, "stop被执行了") print("它是异步的")
reactor.callWhenRunning(func, "hello,twisted") reactor.run()
|
和asyncio一样,twisted是异步的,输出结果为:
1 2 3
| hello,twisted 它是异步的 stop被执行了
|
还可以多次定时调用,实现循环:
1 2 3 4 5 6 7 8 9 10 11 12
| from twisted.internet import reactor
def loop(arg): print(arg) reactor.callLater(5, func)
def func(): reactor.callLater(5, loop, "loop调用...") print("它是异步的")
reactor.callWhenRunning(func) reactor.run()
|
输出结果为:
1 2 3 4 5 6
| 它是异步的 loop调用... 它是异步的 loop调用... 它是异步的 loop调用...
|
twisted的task模块还提供了专门的循环调用,可以简化以上过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from twisted.internet import reactor from twisted.internet import task
def foo(arg): print(arg)
def main(): loop = task.LoopingCall(foo, "hello") loop.start(interval=3,now=False) print("main执行完毕")
reactor.callWhenRunning(main) reactor.run()
|
它会一直循环下去,可以显式使用loop.stop
停止调用。
三、异步回调defer
由于整个调用是异步的,那么能否进行异步回调就成了关键,如果无法获取异步调用的结果,整个异步代码就会难以维护。twisted提供了Deferred实现异步回调管理,可以在异步任务的执行过程进行监控,防止出现因为等待某个任务而发生阻塞。Deferred不依赖于reactor,因此不在事件循环中使用Deferred。
Deferred对象以抽象化的方式表达了一种思想,即结果还尚不存在。它同样能够帮助管理产生这个结果所需要的回调链。当从函数中返回时,Deferred对象承诺在某个时刻函数将产生一个结果。返回的Deferred对象中包含所有注册到事件上的回调引用,因此在函数间只需要传递这一个对象即可,跟踪这个对象比单独管理所有的回调要简单的多。
1 快速使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import time from twisted.internet import defer
def func(arg): deferred = defer.Deferred() deferred.callback(arg) return deferred
def foo(arg): time.sleep(2) print(arg+1)
deferred = func(3) print("已经获得deferred对象")
deferred.addCallback(foo)
|
2 结合reactor
虽然可以单独使用,但意义不大。因此可以结合reactor使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from twisted.internet import defer,reactor
def func(arg): deferred = defer.Deferred() reactor.callLater(3, deferred.callback,arg) return deferred
def foo(arg): print(arg+1) reactor.stop()
deferred = func(3) print("已经获得deferred对象")
deferred.addCallback(foo) print("回调函数注册成功") reactor.run()
|
整个流程是:首先执行func,在这里实例化deferred对象,并且反应堆中有一个3秒后的定时调用,调用的函数就是deferred.callback,然后将foo函数注册到deferred的回调函数中,由于调用deferred.callback时,会执行注册的foo函数,因此效果就是3秒后执行了foo函数。
1 2 3 4
| 已经获得deferred对象 回调函数注册成功 (大约3秒后...) 4
|
3 错误回调
你也可以将错误处理逻辑errback添加到Deferred,当出现错误,数据不能正常取回时,Deferred会调用它:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| from twisted.internet import defer,reactor
def func(arg): deferred = defer.Deferred() if arg>10: reactor.callLater(3, deferred.callback,arg) else: reactor.callLater(3, deferred.errback,ValueError("值错误")) return deferred
def foo(arg): print(arg+1)
def errback(error): print(error.value) print(error.type) print("如果出现异常就执行errback")
deferred = func(3) print("已经获得deferred对象") deferred.addCallback(foo) deferred.addErrback(errback) print("回调函数注册成功") reactor.run()
|
输出是这样的:
1 2 3 4 5
| 已经获得deferred对象 回调函数注册成功 值错误 <class 'ValueError'> 如果出现异常就执行errback
|
4 回调链
以上的例子,只添加了一个正确回调和错误回调,但其实,回调可以添加多个。Deferred对象包含一对回调链,一个是针对操作成功的回调,一个是针对操作失败的回调。初始状态下Deferred对象的两条链都为空。在事件处理的过程中,每个阶段都为其添加处理成功的回调和处理失败的回调。当一个异步结果到来时,Deferred对象就被“激活”,那么处理成功的回调和处理失败的回调就可以以合适的方式按照它们添加进来的顺序依次得到调用。
一般来说,异步代码会在结果准备就绪时,调用第一个callback,或是在出现错误时,调用第一个errback,然后Deferred对象就会将callback或errback的返回结果作为参数传递给链中的下一个函数。Deferred提供了四个方法添加回调链:
addCallback(callback)
:添加成功回调
addErrback(errback)
:添加错误回调
addCallbacks(callback,errback)
:分别添加成功回调和错误回调
addBoth(Anyback)
:在两个回调链都添加这个回调(不管成功失败都执行这个回调)
下面的代码演示了如何添加回调链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| from twisted.internet import defer
def main(arg): deferred = defer.Deferred() if arg >10: deferred.callback("func1") else: deferred.errback(ValueError("错误")) return deferred
def func1(arg): print(arg)
def err1(error): print(error.value)
def bothback(result): print(result) deferred = main(100)
deferred.addBoth(bothback)
|
下面演示回调链函数的依次调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| from twisted.internet import defer
def main(num): deferred = defer.Deferred() deferred.callback(num) return deferred
def callback1(num): num+=1 print("callback1") print(num) return num
def callback2(num): num+=1 print("callback2") print(num) return num
def callback3(num): num+=1 print("callback3") print(num) return num deferred = main(100)
deferred.addCallback(callback1).addCallback(callback2).addCallback(callback3)
|
输出结果为:
1 2 3 4 5 6
| callback1 101 callback2 102 callback3 103
|
5 内联回调
内联回调inlineCallbacks
是一个装饰器,它用来装饰生成器函数(yield语句函数)。它可以将生成器函数转化为异步回调,每个回调被yield分隔,yield的返回值会传递到下一个回调。
举个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| from twisted.internet import reactor,defer
def func1(num): num+=1 print("func1执行了") deferred = defer.Deferred() reactor.callLater(5,deferred.callback,num+1) return deferred def func2(num): num+=1 print("func2执行了") deferred = defer.Deferred() reactor.callLater(5,deferred.callback,num+1) return deferred
def func3(num): num+=1 print("func3执行了") deferred = defer.Deferred() reactor.callLater(5,deferred.callback,num+1) return deferred
@defer.inlineCallbacks def main(): res1 = yield func1(100) print(res1) res2 = yield func2(100) print(res2) res3 = yield func3(100) print(res3)
main() reactor.run()
|
内联回调使用yield和装饰器即可在一个函数中执行多次回调,省去了手动添加回调链的过程。