コルーチン¶
コルーチンは、Tornadoで非同期コードを書くための推奨される方法です。コルーチンは、Pythonの await キーワードを使用して、コールバックの連鎖の代わりに実行を一時停止および再開します(geventのようなフレームワークで見られる協調的な軽量スレッドもコルーチンと呼ばれることがありますが、Tornadoではすべてのコルーチンは明示的なコンテキストスイッチを使用し、非同期関数として呼び出されます)。
コルーチンは、スレッドのコストをかけずに、ほとんど同期コードと同じくらい簡単です。また、コンテキストスイッチが発生する場所の数を減らすことで、並行処理について推論しやすくなります。
例
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
ネイティブコルーチンとデコレートされたコルーチン¶
Python 3.5では、async および await キーワードが導入されました(これらのキーワードを使用する関数は「ネイティブコルーチン」とも呼ばれます)。古いバージョンのPythonとの互換性のために、tornado.gen.coroutine デコレーターを使用した「デコレートされた」または「yieldベースの」コルーチンを使用できます。
可能な限り、ネイティブコルーチンが推奨される形式です。古いバージョンのPythonとの互換性が必要な場合にのみ、デコレートされたコルーチンを使用してください。Tornadoドキュメントの例では、一般的にネイティブ形式を使用します。
2つの形式間の変換は、一般的に簡単です
# Decorated: # Native:
# Normal function declaration
# with decorator # "async def" keywords
@gen.coroutine
def a(): async def a():
# "yield" all async funcs # "await" all async funcs
b = yield c() b = await c()
# "return" and "yield"
# cannot be mixed in
# Python 2, so raise a
# special exception. # Return normally
raise gen.Return(b) return b
コルーチンの2つの形式のその他の違いを以下に示します。
ネイティブコルーチン
一般的に高速です。
async forおよびasync withステートメントを使用でき、いくつかのパターンがはるかに簡単になります。awaitまたはyieldするまで、まったく実行されません。デコレートされたコルーチンは、呼び出されるとすぐに「バックグラウンドで」実行を開始できます。両方の種類のコルーチンで、例外が発生した場合にどこかに移動できるように、awaitまたはyieldを使用することが重要であることに注意してください。
デコレートされたコルーチン
concurrent.futuresパッケージとの追加の統合があり、executor.submitの結果を直接yieldできます。ネイティブコルーチンの場合は、代わりにIOLoop.run_in_executorを使用してください。リストまたは辞書をyieldすることにより、複数のオブジェクトの待機をサポートするいくつかの省略形をサポートします。ネイティブコルーチンでこれを行うには、
tornado.gen.multiを使用してください。変換関数のレジストリを介して、Twistedを含む他のパッケージとの統合をサポートできます。ネイティブコルーチンでこの機能にアクセスするには、
tornado.gen.convert_yieldedを使用してください。常に
Futureオブジェクトを返します。ネイティブコルーチンは、Futureではないawaitableオブジェクトを返します。Tornadoでは、これら2つはほとんど互換性があります。
仕組み¶
このセクションでは、デコレートされたコルーチンの動作について説明します。ネイティブコルーチンは概念的には似ていますが、Pythonランタイムとの統合が追加されているため、少し複雑です。
yield を含む関数は、ジェネレーターです。すべてのジェネレーターは非同期です。呼び出されると、完了まで実行する代わりにジェネレーターオブジェクトを返します。@gen.coroutine デコレーターは、yield 式を介してジェネレーターと、Future を返すことによってコルーチンの呼び出し元と通信します。
以下は、コルーチンデコレーターの内部ループの簡略化されたバージョンです
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
デコレーターは、ジェネレーターから Future を受信し、その Future が完了するのを(ブロッキングせずに)待機し、次に Future を「アンラップ」し、yield 式の結果として、結果をジェネレーターに送り返します。ほとんどの非同期コードは、非同期関数によって返された Future を yield 式に即座に渡す場合を除いて、Future クラスに直接触れることはありません。
コルーチンの呼び出し方¶
コルーチンは通常の方法で例外を発生させません。発生した例外は、yieldされるまでawaitableオブジェクトにトラップされます。これは、コルーチンを適切な方法で呼び出すことが重要であることを意味します。そうしないと、エラーに気づかない可能性があります。
async def divide(x, y):
return x / y
def bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)
ほとんどの場合、コルーチンを呼び出す関数は、それ自体がコルーチンであり、呼び出しで await または yield キーワードを使用する必要があります。スーパークラスで定義されたメソッドをオーバーライドする場合は、コルーチンが許可されているかどうかをドキュメントで確認してください(ドキュメントには、メソッドが「コルーチンである可能性がある」または「Futureを返す可能性がある」と記載されている必要があります)。
async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)
場合によっては、結果を待たずにコルーチンを「起動して忘れる」必要がある場合があります。この場合は、IOLoop.spawn_callback を使用して、呼び出しを IOLoop に委ねることをお勧めします。失敗した場合、IOLoop はスタックトレースをログに記録します
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
この方法で IOLoop.spawn_callback を使用することは、@gen.coroutine を使用する関数に対しては推奨されますが、async def を使用する関数には必須です(そうしないと、コルーチンランナーが開始されません)。
最後に、プログラムのトップレベルでは、IOLoopがまだ実行されていない場合、IOLoop を開始し、コルーチンを実行してから、IOLoop.run_sync メソッドで IOLoop を停止できます。これは、バッチ指向プログラムの main 関数を開始するためによく使用されます
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
コルーチンパターン¶
ブロッキング関数の呼び出し¶
コルーチンからブロッキング関数を呼び出す最も簡単な方法は、コルーチンと互換性のある Futures を返す IOLoop.run_in_executor を使用することです。
async def call_blocking():
await IOLoop.current().run_in_executor(None, blocking_func, args)
並列処理¶
multi 関数は、値が Futures であるリストと辞書を受け入れ、それらのすべての Futures を並行して待機します
from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}
デコレートされたコルーチンでは、リストまたは辞書を直接 yield することが可能です
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
インターリーブ¶
場合によっては、すぐにyieldするのではなく、Future を保存すると、待機する前に別の操作を開始できるため便利です。
from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = await fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
await self.flush()
デコレートされたコルーチンは呼び出されるとすぐに開始されるため、これを行うのは少し簡単です
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
ループ¶
ネイティブコルーチンでは、async for を使用できます。古いバージョンの Python では、コルーチンでのループ処理は、for ループや while ループのすべての反復処理で yield して、その結果を取得する方法がないため、トリッキーです。代わりに、Motor の例にあるように、ループ条件と結果へのアクセスを分離する必要があります。
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
バックグラウンドでの実行¶
PeriodicCallback の代替として、コルーチンは while True: ループを含み、tornado.gen.sleep を使用できます。
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
より複雑なループが必要な場合もあります。たとえば、前のループは do_something() の実行時間が N である場合、60+N 秒ごとに実行されます。正確に 60 秒ごとに実行するには、上記のインターリーブパターンを使用します。
async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.