コルーチン¶
コルーチンは、Tornadoで非同期コードを書くための推奨される方法です。コルーチンは、Pythonの await
キーワードを使用して、コールバックの連鎖の代わりに実行を一時停止および再開します(geventのようなフレームワークで見られる協調的な軽量スレッドもコルーチンと呼ばれることがありますが、Tornadoではすべてのコルーチンは明示的なコンテキストスイッチを使用し、非同期関数として呼び出されます)。
コルーチンは、スレッドのコストをかけずに、ほとんど同期コードと同じくらい簡単です。また、コンテキストスイッチが発生する場所の数を減らすことで、並行処理について推論しやすくなります。
例
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
ネイティブコルーチンとデコレートされたコルーチン¶
Python 3.5では、async
および await
キーワードが導入されました(これらのキーワードを使用する関数は「ネイティブコルーチン」とも呼ばれます)。古いバージョンのPythonとの互換性のために、tornado.gen.coroutine
デコレーターを使用した「デコレートされた」または「yieldベースの」コルーチンを使用できます。
可能な限り、ネイティブコルーチンが推奨される形式です。古いバージョンのPythonとの互換性が必要な場合にのみ、デコレートされたコルーチンを使用してください。Tornadoドキュメントの例では、一般的にネイティブ形式を使用します。
2つの形式間の変換は、一般的に簡単です
# Decorated: # Native:
# Normal function declaration
# with decorator # "async def" keywords
@gen.coroutine
def a(): async def a():
# "yield" all async funcs # "await" all async funcs
b = yield c() b = await c()
# "return" and "yield"
# cannot be mixed in
# Python 2, so raise a
# special exception. # Return normally
raise gen.Return(b) return b
コルーチンの2つの形式のその他の違いを以下に示します。
ネイティブコルーチン
一般的に高速です。
async for
およびasync with
ステートメントを使用でき、いくつかのパターンがはるかに簡単になります。await
またはyield
するまで、まったく実行されません。デコレートされたコルーチンは、呼び出されるとすぐに「バックグラウンドで」実行を開始できます。両方の種類のコルーチンで、例外が発生した場合にどこかに移動できるように、await
またはyield
を使用することが重要であることに注意してください。
デコレートされたコルーチン
concurrent.futures
パッケージとの追加の統合があり、executor.submit
の結果を直接yieldできます。ネイティブコルーチンの場合は、代わりにIOLoop.run_in_executor
を使用してください。リストまたは辞書をyieldすることにより、複数のオブジェクトの待機をサポートするいくつかの省略形をサポートします。ネイティブコルーチンでこれを行うには、
tornado.gen.multi
を使用してください。変換関数のレジストリを介して、Twistedを含む他のパッケージとの統合をサポートできます。ネイティブコルーチンでこの機能にアクセスするには、
tornado.gen.convert_yielded
を使用してください。常に
Future
オブジェクトを返します。ネイティブコルーチンは、Future
ではないawaitableオブジェクトを返します。Tornadoでは、これら2つはほとんど互換性があります。
仕組み¶
このセクションでは、デコレートされたコルーチンの動作について説明します。ネイティブコルーチンは概念的には似ていますが、Pythonランタイムとの統合が追加されているため、少し複雑です。
yield
を含む関数は、ジェネレーターです。すべてのジェネレーターは非同期です。呼び出されると、完了まで実行する代わりにジェネレーターオブジェクトを返します。@gen.coroutine
デコレーターは、yield
式を介してジェネレーターと、Future
を返すことによってコルーチンの呼び出し元と通信します。
以下は、コルーチンデコレーターの内部ループの簡略化されたバージョンです
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
デコレーターは、ジェネレーターから Future
を受信し、その Future
が完了するのを(ブロッキングせずに)待機し、次に Future
を「アンラップ」し、yield
式の結果として、結果をジェネレーターに送り返します。ほとんどの非同期コードは、非同期関数によって返された Future
を yield
式に即座に渡す場合を除いて、Future
クラスに直接触れることはありません。
コルーチンの呼び出し方¶
コルーチンは通常の方法で例外を発生させません。発生した例外は、yieldされるまでawaitableオブジェクトにトラップされます。これは、コルーチンを適切な方法で呼び出すことが重要であることを意味します。そうしないと、エラーに気づかない可能性があります。
async def divide(x, y):
return x / y
def bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)
ほとんどの場合、コルーチンを呼び出す関数は、それ自体がコルーチンであり、呼び出しで await
または yield
キーワードを使用する必要があります。スーパークラスで定義されたメソッドをオーバーライドする場合は、コルーチンが許可されているかどうかをドキュメントで確認してください(ドキュメントには、メソッドが「コルーチンである可能性がある」または「Future
を返す可能性がある」と記載されている必要があります)。
async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)
場合によっては、結果を待たずにコルーチンを「起動して忘れる」必要がある場合があります。この場合は、IOLoop.spawn_callback
を使用して、呼び出しを IOLoop
に委ねることをお勧めします。失敗した場合、IOLoop
はスタックトレースをログに記録します
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
この方法で IOLoop.spawn_callback
を使用することは、@gen.coroutine
を使用する関数に対しては推奨されますが、async def
を使用する関数には必須です(そうしないと、コルーチンランナーが開始されません)。
最後に、プログラムのトップレベルでは、IOLoopがまだ実行されていない場合、IOLoop
を開始し、コルーチンを実行してから、IOLoop.run_sync
メソッドで IOLoop
を停止できます。これは、バッチ指向プログラムの main
関数を開始するためによく使用されます
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
コルーチンパターン¶
ブロッキング関数の呼び出し¶
コルーチンからブロッキング関数を呼び出す最も簡単な方法は、コルーチンと互換性のある Futures
を返す IOLoop.run_in_executor
を使用することです。
async def call_blocking():
await IOLoop.current().run_in_executor(None, blocking_func, args)
並列処理¶
multi
関数は、値が Futures
であるリストと辞書を受け入れ、それらのすべての Futures
を並行して待機します
from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}
デコレートされたコルーチンでは、リストまたは辞書を直接 yield
することが可能です
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
インターリーブ¶
場合によっては、すぐにyieldするのではなく、Future
を保存すると、待機する前に別の操作を開始できるため便利です。
from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = await fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
await self.flush()
デコレートされたコルーチンは呼び出されるとすぐに開始されるため、これを行うのは少し簡単です
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
ループ¶
ネイティブコルーチンでは、async for
を使用できます。古いバージョンの Python では、コルーチンでのループ処理は、for
ループや while
ループのすべての反復処理で yield
して、その結果を取得する方法がないため、トリッキーです。代わりに、Motor の例にあるように、ループ条件と結果へのアクセスを分離する必要があります。
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
バックグラウンドでの実行¶
PeriodicCallback
の代替として、コルーチンは while True:
ループを含み、tornado.gen.sleep
を使用できます。
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
より複雑なループが必要な場合もあります。たとえば、前のループは do_something()
の実行時間が N
である場合、60+N
秒ごとに実行されます。正確に 60 秒ごとに実行するには、上記のインターリーブパターンを使用します。
async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.