tornado.queues – コルーチン用のキュー

バージョン4.2の新機能。

コルーチン用の非同期キュー。これらのクラスは、標準ライブラリのasyncioパッケージで提供されているものと非常に似ています。

警告

標準ライブラリのqueueモジュールとは異なり、ここで定義されているクラスはスレッドセーフではありません。別のスレッドからこれらのキューを使用するには、IOLoop.add_callbackを使用して、キューメソッドを呼び出す前にIOLoopスレッドに制御を移してください。

クラス

Queue

class tornado.queues.Queue(maxsize: int = 0)[source]

プロデューサーとコンシューマーのコルーチンを調整します。

maxsizeが0の場合(デフォルト)、キューサイズは無制限です。

import asyncio
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await asyncio.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    for item in range(5):
        await q.put(item)
        print('Put %s' % item)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')

asyncio.run(main())
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

ネイティブコルーチンがないPythonのバージョン(3.5より前)では、consumer()は次のように記述できます。

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

バージョン4.3での変更: Python 3.5でasync forサポートを追加しました。

property maxsize: int

キューに許可されるアイテムの数。

qsize() int[source]

キュー内のアイテムの数。

put(item: _T, timeout: Optional[Union[float, timedelta]] = None) Future[None][source]

アイテムをキューに入れます。空きができるまで待つ場合があります。

tornado.util.TimeoutErrorをタイムアウト後に発生させるFutureを返します。

timeoutは、tornado.ioloop.IOLoop.timeと同じスケール(通常はtime.time)の数値、または現在の時刻からの期限を示すdatetime.timedeltaオブジェクトです。

put_nowait(item: _T) None[source]

ブロッキングせずにアイテムをキューに入れます。

すぐに空きスロットがない場合は、QueueFullを発生させます。

get(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_T][source]

キューからアイテムを削除して返します。

アイテムが利用可能になると解決される、またはタイムアウト後にtornado.util.TimeoutErrorを発生させるawaitableを返します。

timeoutは、tornado.ioloop.IOLoop.timeと同じスケール(通常はtime.time)の数値、または現在の時刻からの期限を示すdatetime.timedeltaオブジェクトです。

注記

このメソッドのtimeout引数は、標準ライブラリのqueue.Queue.getのものとは異なります。そのメソッドは数値を相対的なタイムアウトとして解釈しますが、これは絶対的な期限として解釈し、相対的なタイムアウトにはtimedeltaオブジェクトが必要です(Tornadoの他のタイムアウトと一貫性があります)。

get_nowait() _T[source]

ブロッキングせずにキューからアイテムを削除して返します。

アイテムがすぐに利用可能な場合はアイテムを返し、そうでない場合はQueueEmptyを発生させます。

task_done() None[source]

以前にエンキューされたタスクが完了したことを示します。

キューコンシューマーによって使用されます。getを使用してタスクを取得するたびに、後続のtask_done呼び出しによって、キューにタスクの処理が完了したことが伝えられます。

もしjoinがブロックしている場合、すべてのアイテムが処理された時、つまり、すべてのputtask_doneと対応付けられた時に再開します。

ValueErrorは、putよりも多くの回数呼び出された場合に発生します。

join(timeout: Optional[Union[float, timedelta]] = None) Awaitable[None][source]

キュー内のすべてのアイテムが処理されるまでブロックします。

タイムアウト後にtornado.util.TimeoutErrorを発生させるawaitableを返します。

PriorityQueue

class tornado.queues.PriorityQueue(maxsize: int = 0)[source]

優先順位順(低い順)にエントリを取得するQueueです。

エントリは通常、(priority number, data)のようなタプルです。

import asyncio
from tornado.queues import PriorityQueue

async def main():
    q = PriorityQueue()
    q.put((1, 'medium-priority item'))
    q.put((0, 'high-priority item'))
    q.put((10, 'low-priority item'))

    print(await q.get())
    print(await q.get())
    print(await q.get())

asyncio.run(main())
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize: int = 0)[source]

最後に追加されたアイテムを最初に取得するQueueです。

import asyncio
from tornado.queues import LifoQueue

async def main():
    q = LifoQueue()
    q.put(3)
    q.put(2)
    q.put(1)

    print(await q.get())
    print(await q.get())
    print(await q.get())

asyncio.run(main())
1
2
3

例外

QueueEmpty

exception tornado.queues.QueueEmpty[source]

キューにアイテムがない場合、Queue.get_nowaitによって発生します。

QueueFull

exception tornado.queues.QueueFull[source]

キューが最大サイズに達した場合、Queue.put_nowaitによって発生します。