twisted-1

一些关于twisted的简单内容

TCP SERVER

protocol

通常用于处理协议的子类为:twisted.internet.protocol.Protocol , 为了保存配置文件,使用工厂模式:twisted.internet.protocol.Factory

先看一个简单的例子

1
2
3
class Echo(Protocol):
def dataReceived(self, data):
self.transport.write(data)

这就是一个简单的协议,重写了dataReceived 当收到消息时,返回给 sender 同样内容。

1
2
3
4
5
from twisted.internet.protocol import Protocol
class QOTD(Protocol):
def connectionMade(self):
self.transport.write(b"An apple a day keeps the doctor away\r\n")
self.transport.loseConnection()

上面这个重写了connectionMade方法,这个时双方建立连接时的协商工作。

1
2
3
4
5
6
7
8
9
10
11
from twisted.protocols.basic import LineReceiver

class Answer(LineReceiver):

answers = {b'How are you?': b'Fine', None: b"I don't know what you mean"}

def lineReceived(self, line):
if line in self.answers:
self.sendLine(self.answers[line])
else:
self.sendLine(self.answers[None])

这是一个没收到一行就处理一次的Protocol。

运行server

1
2
3
4
5
6
7
8
9
10
11
12
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTDFactory(Factory):
def buildProtocol(self, addr):
return QOTD()

# 8007 is the port you want to run under. Choose something >1024
endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory())
reactor.run()

创建工程类,重写 buildProtocol 方法,返回你的 Protocol子类。然后通过 TCP4ServerEndpoint 绑定端口,然后指定工厂类。通过 reactor.run() 来启动整个流程。(目前我还不怎么会用reactor)

另一种工程创建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class QOTD(Protocol):

def connectionMade(self):
# self.factory was set by the factory's default buildProtocol:
self.transport.write(self.factory.quote + b'\r\n')
self.transport.loseConnection()


class QOTDFactory(Factory):

# This will be used by the default buildProtocol to create new protocols:
protocol = QOTD

def __init__(self, quote=None):
self.quote = quote or b'An apple a day keeps the doctor away'

endpoint = TCP4ServerEndpoint(reactor, 8007)
endpoint.listen(QOTDFactory(b"configurable quote"))
reactor.run()

工厂类的startup & shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class LoggingProtocol(LineReceiver):

def lineReceived(self, line):
print(line)
if line == b'quit':
# self.factory.stopFactory()
pass

self.factory.fp.write(line + b'\n')
self.transport.loseConnection()


class LogfileFactory(Factory):
protocol = LoggingProtocol

def __init__(self, fileName):
self.file = fileName

def startFactory(self):
self.fp = open(self.file, 'ab')

def stopFactory(self):
self.fp.close()

endpoint = TCP4ServerEndpoint(reactor, 8006)
endpoint.listen(LogfileFactory("1.txt"))
reactor.run()

这个需要通过 python test.py 来启动,然后通过 CTRL+C 来终止。如果时pycharm直接kill的话,无法调用 stopFactory 函数,导致无法写日志。官方文档说,用户不应该手动调用此方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding: utf-8 -*-
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor


class Echo(Protocol):

def __init__(self, factory):
self.factory = factory

def connectionMade(self):
self.factory.numProtocols = self.factory.numProtocols + 1
self.transport.write(
b"Welcome! There are currently %d open connections.\n" %
(self.factory.numProtocols,))

def connectionLost(self, reason):
self.factory.numProtocols = self.factory.numProtocols - 1

def dataReceived(self, data):
self.transport.write(data)


class EchoFactory(Factory):
numProtocols = 0

def buildProtocol(self, addr):
return Echo(self) # 通过self将factory实例传入

# 34 35的代码可以通过如下代码替换
# reactor.listenTCP(8006, EchoFactory())
endpoint = TCP4ServerEndpoint(reactor, 8006)
endpoint.listen(EchoFactory())
reactor.run()

listenTCP是将Factory连接到网络的方法。

TCP CLIENT

In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol.

文档意思是大多数情况下,Clinet只需要请求一次SERVER,所以不需要Factory。twisted.internet.endpoints提供了适当的API,特别是connectProtocol,它采用协议实例而不是工厂。

一个简单的协议子类与 SERVER 版本没什么区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
from twisted.internet.protocol import Protocol
from sys import stdout

class Echo(Protocol):
def dataReceived(self, data):
stdout.write(data)
########################################################
from twisted.internet.protocol import Protocol

class WelcomeMessage(Protocol):
def connectionMade(self):
self.transport.write("Hello server, I am the client!\r\n")
self.transport.loseConnection()

下面是官方文档使用的例子。主要是关注下它启动的方式。还有另一种启动方式,已经不推荐。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding: utf-8 -*-
from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol

class Greeter(Protocol):
def connectionMade(self):
self.transport.write(b"Hello server, I am the client!\r\n")
# self.transport.loseConnection()

def dataReceived(self, data):
self.sendMessage(b'yes')

def sendMessage(self, msg):
self.transport.write(b"MESSAGE %s\n" % msg)

def gotProtocol(p):
p.sendMessage(b"Hello")
reactor.callLater(1, p.sendMessage, b"This is sent in a second")
reactor.callLater(2, p.transport.loseConnection)

point = TCP4ClientEndpoint(reactor, "127.0.0.1", 1234)
d = connectProtocol(point, Greeter())
d.addCallback(gotProtocol)
reactor.run()

工厂方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from twisted.internet.protocol import Protocol, ClientFactory
from sys import stdout

class Echo(Protocol):
def dataReceived(self, data):
stdout.write(data)

class EchoClientFactory(ClientFactory):
def startedConnecting(self, connector):
print('Started to connect.')

def buildProtocol(self, addr):
print('Connected.')
return Echo()

def clientConnectionLost(self, connector, reason):
print('Lost connection. Reason:', reason)

def clientConnectionFailed(self, connector, reason):
print('Connection failed. Reason:', reason)

from twisted.internet import reactor
reactor.connectTCP('127.0.0.1', 1234, EchoClientFactory())
reactor.run()

无法建立连接时调用clientConnectionFailed,并且在建立连接然后断开连接时调用clientConnectionLost。

下面的例子是当连接超时或者出错,进行重新连接的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from sys import stdout

class Echo(Protocol):
def dataReceived(self, data):
stdout.write(data)

class EchoClientFactory(ReconnectingClientFactory):
def startedConnecting(self, connector):
print('Started to connect.')

def buildProtocol(self, addr):
print('Connected.')
print('Resetting reconnection delay')
self.resetDelay()
return Echo()

def clientConnectionLost(self, connector, reason):
print('Lost connection. Reason:', reason)
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

def clientConnectionFailed(self, connector, reason):
print('Connection failed. Reason:', reason)
ReconnectingClientFactory.clientConnectionFailed(self, connector,
reason)

schedule

对于 schedule ,官方给出3个使用案例。

延时调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# -*- coding: utf-8 -*-
from twisted.internet import task
from twisted.internet import reactor


def f(s):
print("This will run 3.5 seconds after it was scheduled: %s" % s)
return 'hehe'

d = task.deferLater(reactor, 1, f, "hello, world")
#也可以用下面的语句,但我还不知道着2个是否有什么区别
reactor.callLater(1, f, "This is sent in a second")

def called(result):
print(result)

d.addCallback(called)
reactor.run()

循环调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from twisted.internet import task
from twisted.internet import reactor

loopTimes = 3 # 循环次数
failInTheEnd = False
_loopCounter = 0 # 已经循环的次数

def runEverySecond():
"""
Called at ever loop interval.
"""
global _loopCounter

if _loopCounter < loopTimes:
_loopCounter += 1
print('A new second has passed.')
return

# 如果发生错误,则抛出异常
if failInTheEnd:
raise Exception('Failure during loop execution.')

# We looped enough times.
loop.stop()
return


def cbLoopDone(result):
"""
Called when loop was stopped with success.
"""
print("Loop done.")
reactor.stop()


def ebLoopFailed(failure):
"""
Called when loop execution failed.
"""
print(failure.getBriefTraceback())
reactor.stop()


loop = task.LoopingCall(runEverySecond)

# Start looping every 1 second.
loopDeferred = loop.start(1.0)

# Add callbacks for stop and failure.
loopDeferred.addCallback(cbLoopDone)
loopDeferred.addErrback(ebLoopFailed)

reactor.run()

取消调用

1
2
3
4
5
6
7
8
from twisted.internet import reactor

def f():
print("I'll never run.")

callID = reactor.callLater(5, f)
callID.cancel()
reactor.run()

Deferred

个人理解

Deferred是一个可以添加回调链的对象。上面代码中的Deferred都是通过调用延时来模拟的。实际上会有类似于请求WEB的这种API作为真实的Deferred参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from twisted.internet import reactor, defer

def getDummyData(inputData):
print('getDummyData called')
deferred = defer.Deferred()
# 通过callLater来模拟一个会产生延时的函数
reactor.callLater(2, deferred.callback, inputData * 3)
return deferred

def cbPrintData(result):
# 用于处理上面的输出
print('Result received: {}'.format(result))

deferred = getDummyData(3)
deferred.addCallback(cbPrintData)

# 设置结束
reactor.callLater(4, reactor.stop)
# start up the Twisted reactor (event loop handler) manually
print('Starting the reactor')
reactor.run()

  • 先从数据源获取数据,这个获取的方法会产生一个 Deferred 对象。
  • 然后开始回调。

关于回调函数

  1. 如果成功,则调用 .callback(result) 。如果失败,则 .errback(failure)
  2. 回调函数总是将上一个函数的返回作为下个函数的参数进行传递。
  3. 如果 callback 中产生异常,则切换到 errback 中。

对于一个正常的python处理异常:

1
2
3
4
5
6
try:
# code that may throw an exception
cookSpamAndEggs()
except (SpamException, EggException):
# Handle SpamExceptions and EggExceptions
...

如果用twisted的异常链,应该是如下内容:

1
2
3
4
5
6
def errorHandler(failure):
failure.trap(SpamException, EggException)
# Handle SpamExceptions and EggExceptions

d.addCallback(cookSpamAndEggs)
d.addErrback(errorHandler)

如果 failure.trap(…) 没有匹配到异常,则会抛出另外的异常。

另一个需要注意的就是下面两种添加会掉链的方式可能有所不同。

1
2
3
4
5
6
7
8
9
10
11
# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1) # A
d.addErrback(errback1) # B
d.addCallback(callback2)
d.addErrback(errback2)

# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1) # C
d.addCallbacks(callback2, errback2)

对于 case1 而言,在callback1发生异常时,errback1会被调用。 而对于 case2 而言,则errback2会被调用。

maybeDeferred

这个函数用来处理,当你不知道这个到底是 同步 还是 异步 时候使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 同步的验证函数
def synchronousIsValidUser(user):
'''
Return true if user is a valid user, false otherwise
'''
return user in ["Alice", "Angus", "Agnes"]
from twisted.internet import reactor, defer
# 异步函数
def asynchronousIsValidUser(user):
d = defer.Deferred()
reactor.callLater(2, d.callback, user in ["Alice", "Angus", "Agnes"])
return d
from twisted.internet import defer

# 返回结果
def printResult(result):
if result:
print("User is authenticated")
else:
print("User is not authenticated")

def authenticateUser(isValidUser, user):
d = defer.maybeDeferred(isValidUser, user)
d.addCallback(printResult)
# 通过同步验证
authenticateUser(synchronousIsValidUser,user)
# 通过异步验证
authenticateUser(asynchronousIsValidUser,user)

取消任务

动机

假如在请求某个连接时,一直在转圈圈,那么用户想停止请求它,就要取消。看一下下面模拟的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import random
from twisted.internet import task

def f():
return "Hopefully this will be called in 3 seconds or less"

def main(reactor):
delay = random.uniform(1, 5) # 随机等待 1-5 秒

def called(result):
print("{0} seconds later:".format(delay), result)

d = task.deferLater(reactor, delay, f)
d.addTimeout(3, reactor).addBoth(called) # 如果超过3秒,就会打断上面的命令,抛出异常
# 如果没有超过3秒,就不会打断上面命令

return d

# f() will be timed out if the random delay is greater than 3 seconds
task.react(main)

DeferredList

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# A callback that unpacks and prints the results of a DeferredList
def printResult(result):
for (success, value) in result:
if success:
print('Success:', value)
else:
print('Failure:', value.getErrorMessage())

# Create three deferreds.
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

# Pack them into a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

# Add our callback
dl.addCallback(printResult)

# Fire our three deferreds with various values.
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')

# At this point, dl will fire its callback, printing:
# Success: one
# Failure: bang!
# Success: three
# (note that defer.SUCCESS == True, and defer.FAILURE == False)

如果在将Deferred添加到DeferredList之后向Deferred添加回调,则该回调返回的值将不会提供给DeferredList的回调。为避免混淆,我们建议在DeferredList中使用后,不要向Deferred添加回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def printResult(result):
print(result)

def addTen(result):
return result + " ten"

# Deferred gets callback before DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred1.addCallback(addTen)
dl = defer.DeferredList([deferred1, deferred2])
dl.addCallback(printResult)
deferred1.callback("one") # fires addTen, checks DeferredList, stores "one ten"
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
# [(1, 'one ten'), (1, 'two')]

# Deferred gets callback after DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
dl = defer.DeferredList([deferred1, deferred2])
deferred1.addCallback(addTen) # will run *after* DeferredList gets its value
dl.addCallback(printResult)
deferred1.callback("one") # checks DeferredList, stores "one", fires addTen
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
# [(1, 'one), (1, 'two')]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from twisted.internet import defer

d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.gatherResults([d1, d2], consumeErrors=True)

def cbPrintResult(result):
print(result)

d.addCallback(cbPrintResult)

d1.callback("one")
# nothing is printed yet; d is still awaiting completion of d2
d2.callback("two")
# printResult prints ["one", "two"]