lenskit.parallel.ray.TaskLimiter

class lenskit.parallel.ray.TaskLimiter(limit=None)

Limit task concurrency using ray.wait().

This class provides three key operations:

  • Add a task to the limiter with add_task().

  • Wait with wait_for_limit() until the number of pending tasks is less than the limit.

  • Wait for all tasks with drain().

Parameters:

limit (int | None) – The maximum number of pending tasks. Defaults to the LensKit process count (see lenskit.parallel.initialize()).
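The limiter pattern described above can be illustrated without Ray. The following is a minimal sketch, not LensKit's implementation. It assumes concurrent.futures.Future stands in for ray.ObjectRef and concurrent.futures.wait(..., return_when=FIRST_COMPLETED) stands in for ray.wait(). The class name BoundedTasks is hypothetical. The sketch also assumes the caller invokes wait_for_limit() before each add_task(), so at most limit tasks are ever pending.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


class BoundedTasks:
    """Hypothetical stand-in for TaskLimiter, built on concurrent.futures."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.finished = 0
        self._pending: set[Future] = set()

    @property
    def pending(self) -> int:
        return len(self._pending)

    def add_task(self, task: Future) -> None:
        # Start tracking an already-submitted task; this never blocks.
        self._pending.add(task)

    def wait_for_limit(self) -> None:
        # Block until fewer than `limit` tasks are pending.
        while self._pending and len(self._pending) >= self.limit:
            done, self._pending = wait(self._pending, return_when=FIRST_COMPLETED)
            for fut in done:
                fut.result()  # re-raise task errors, discard the value
                self.finished += 1

    def drain(self) -> None:
        # Block until every pending task has finished.
        done, self._pending = wait(self._pending)
        for fut in done:
            fut.result()
            self.finished += 1


# Usage: square 10 numbers with at most 3 tasks in flight.
with ThreadPoolExecutor(max_workers=8) as pool:
    limiter = BoundedTasks(limit=3)
    peak = 0
    for i in range(10):
        limiter.wait_for_limit()  # make room before submitting
        limiter.add_task(pool.submit(pow, i, 2))
        peak = max(peak, limiter.pending)
    limiter.drain()
```

The executor has more workers than the limit. This shows that the limiter, not the pool size, is what bounds the number of in-flight tasks.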

limit: int

The maximum number of pending tasks.

finished: int

The number of tasks completed.

property pending: int

The number of pending tasks.

Return type:

int

throttle[Elt](tasks, *, ordered=True)

Throttle a generator of Ray tasks, only requesting new tasks from the generator as the limit has space.

Parameters:
  • tasks (collections.abc.Generator[ray.ObjectRef[Elt]]) – A generator of tasks (usually the result of calling a Ray remote function).

  • ordered (bool)

Returns:

A generator of the results (already awaited with ray.get()).

Return type:

collections.abc.Generator[Elt, None, int]
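The lazy pulling that throttle() performs can be sketched with the standard library. This is a minimal illustration under stated assumptions, not LensKit's code. Futures and concurrent.futures.wait() stand in for Ray object references and ray.wait(), and the throttle and make_tasks functions here are hypothetical. The sketch does not model the ordered flag: it always yields results in completion order. It also assumes the generator's int return value is the number of finished tasks.

```python
from collections.abc import Generator, Iterator
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import TypeVar

T = TypeVar("T")


def throttle(tasks: Iterator[Future[T]], limit: int) -> Generator[T, None, int]:
    """Hypothetical sketch: keep at most `limit` tasks in flight, pulling new
    ones from `tasks` only when there is room. Yields results in completion
    order (`ordered` is not modeled) and returns the number finished."""
    pending: set[Future[T]] = set()
    finished = 0
    exhausted = False
    while True:
        # Request new tasks from the generator only while below the limit.
        while not exhausted and len(pending) < limit:
            try:
                pending.add(next(tasks))
            except StopIteration:
                exhausted = True
        if not pending:
            return finished
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for fut in done:
            finished += 1
            yield fut.result()  # like ray.get(): re-raises task errors


def make_tasks(pool: ThreadPoolExecutor, log: list[int]) -> Iterator[Future[int]]:
    # Lazily submit tasks, recording each time the generator is advanced.
    for i in range(10):
        log.append(i)
        yield pool.submit(pow, i, 2)


with ThreadPoolExecutor(max_workers=4) as pool:
    submitted: list[int] = []
    gen = throttle(make_tasks(pool, submitted), limit=3)
    results = [next(gen)]
    ahead = len(submitted)  # only `limit` tasks have been requested so far
    while True:
        try:
            results.append(next(gen))
        except StopIteration as stop:
            count = stop.value  # the generator's return value
            break
```

After the first result arrives, only three tasks have been pulled from make_tasks(). This lazy consumption is what keeps a long task generator from flooding the scheduler.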

imap(function: ray.remote_function.RemoteFunction, items: collections.abc.Iterable[Any], *, ordered: bool = True) → collections.abc.Generator[Any, None, int]
imap(function: collections.abc.Callable[[A], R], items: collections.abc.Iterable[A], *, ordered: bool = True) → collections.abc.Generator[R, None, int]

add_task(task)

Add a task to the limiter.

Parameters:

task (ray.ObjectRef | ray.ObjectID)

results_until_limit()

Iterate over available results until the number of pending tasks is less than the limit, blocking as needed.

This is a generator that yields task result references. The iterator stops once the pending task list is under the limit. No guarantee is made on the order of returned results.

Return type:

collections.abc.Generator[ray.ObjectRef, None, None]
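The stopping condition of results_until_limit() can be sketched with the standard library. This is a minimal illustration, not LensKit's code. A plain set of concurrent.futures.Future objects stands in for the limiter's pending list, and concurrent.futures.wait() stands in for ray.wait(). The free function results_until_limit below is hypothetical.

```python
import time
from collections.abc import Generator
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


def results_until_limit(pending: set[Future], limit: int) -> Generator[Future, None, None]:
    """Hypothetical sketch: yield finished futures (references, not values),
    in no guaranteed order, until fewer than `limit` remain in `pending`,
    which is updated in place."""
    while pending and len(pending) >= limit:
        done, _ = wait(pending, return_when=FIRST_COMPLETED)
        pending.difference_update(done)
        yield from done


with ThreadPoolExecutor(max_workers=5) as pool:
    pending = {pool.submit(time.sleep, 0.01 * i) for i in range(5)}
    collected = list(results_until_limit(pending, limit=2))
```

Because wait() can report several finished tasks at once, the generator may leave fewer than limit - 1 tasks pending when it stops.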

wait_for_limit()

Wait until the pending tasks are back under the limit.

This method calls ray.get() on each task result it collects, so that any task errors are raised, but discards the return values.

drain_results()

Iterate over all remaining tasks until the pending task list is empty, blocking as needed.

This is a generator that yields task result references. No guarantee is made on the order of returned results.

Return type:

collections.abc.Generator[ray.ObjectRef, None, None]

drain()

Wait until all pending tasks are finished.

This method calls ray.get() on the result of each pending task, so that any task errors are raised, but discards the return values.
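Calling ray.get() on every drained result is what makes task failures visible to the caller. The following is a comparable minimal sketch with the standard library, not LensKit's implementation. Future.result() plays the role of ray.get(), since it re-raises any exception the task raised. The drain and fail functions are hypothetical, and returning the number of drained tasks is an addition of this sketch.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait


def drain(pending: set[Future]) -> int:
    """Hypothetical sketch: wait for every pending future, call result() on
    each so task exceptions propagate, and discard the values."""
    done, _ = wait(pending)  # default return_when is ALL_COMPLETED
    pending.clear()
    for fut in done:
        fut.result()  # raises if the task raised; the value is discarded
    return len(done)


def fail(x: int) -> int:
    # Hypothetical task that fails for one input.
    if x == 3:
        raise ValueError(f"task {x} failed")
    return x


with ThreadPoolExecutor(max_workers=4) as pool:
    ok = {pool.submit(fail, i) for i in (0, 1, 2)}
    drained = drain(ok)
    bad = {pool.submit(fail, i) for i in (2, 3)}
    error = None
    try:
        drain(bad)
    except ValueError as exc:
        error = str(exc)
```

The pending set is emptied before any error is re-raised, so a failed drain still leaves no tasks tracked.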