lenskit.parallel.ray.TaskLimiter
- class lenskit.parallel.ray.TaskLimiter(limit=None)
Limit task concurrency using ray.wait().
This class provides the following key operations:
Add a task to the limiter with add_task().
Wait for tasks until the number of pending tasks is less than the limit.
Wait for all tasks with drain().
- Parameters:
limit (int | None) – The maximum number of pending tasks. Defaults to the LensKit process count (see
lenskit.parallel.initialize()).
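The add-then-wait pattern described above can be sketched without Ray. The example below is not LensKit's implementation: it is a minimal stdlib analog that swaps in `concurrent.futures.wait()` for `ray.wait()` and `Future.result()` for `ray.get()`. The names `SketchLimiter`, `square`, and `run_all` are hypothetical and exist only for illustration.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


class SketchLimiter:
    """Hypothetical stand-in for TaskLimiter, built on concurrent.futures."""

    def __init__(self, limit: int):
        self.limit = limit
        self.pending: set[Future] = set()
        self.finished = 0

    def add_task(self, task: Future) -> None:
        # Register a task that has already been submitted.
        self.pending.add(task)

    def wait_for_limit(self) -> None:
        # Block until fewer than `limit` tasks remain pending.
        while len(self.pending) >= self.limit:
            self._wait_some()

    def drain(self) -> None:
        # Block until no tasks remain pending.
        while self.pending:
            self._wait_some()

    def _wait_some(self) -> None:
        # Wait for at least one task to finish, then keep the unfinished ones.
        done, self.pending = wait(self.pending, return_when=FIRST_COMPLETED)
        for task in done:
            task.result()  # re-raises any task error; the value is discarded
            self.finished += 1


def square(x: int) -> int:
    return x * x


def run_all(n_tasks: int = 10, limit: int = 3) -> tuple[int, int]:
    """Submit n_tasks tasks, never leaving more than `limit` pending."""
    limiter = SketchLimiter(limit)
    max_pending = 0
    with ThreadPoolExecutor(max_workers=limit) as pool:
        for i in range(n_tasks):
            limiter.add_task(pool.submit(square, i))
            max_pending = max(max_pending, len(limiter.pending))
            limiter.wait_for_limit()
        limiter.drain()
    return limiter.finished, max_pending
```

Calling `wait_for_limit()` after every `add_task()` keeps the submitting loop from racing ahead of the workers, and the final `drain()` ensures that errors from the last few tasks are not silently dropped.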
- throttle[Elt](tasks, *, ordered=True)
Throttle a generator of Ray tasks, only requesting new tasks from the generator as the limit has space.
- Parameters:
tasks (collections.abc.Generator[ray.ObjectRef[Elt]]) – A generator of tasks (usually the result of calling a Ray remote function).
ordered (bool)
- Returns:
A generator of the results (already awaited with ray.get()).
- Return type:
collections.abc.Generator[Elt, None, int]
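The key idea in throttling is that the task generator is only advanced while there is room under the limit, so tasks are created lazily. The sketch below illustrates that idea with stdlib futures rather than Ray object references. It is an assumption-laden analog, not the library's code: `throttle_sketch` and `demo` are hypothetical names, and the sketch yields results in completion order with no attempt to preserve input order.

```python
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


def throttle_sketch(tasks: Iterator[Future], limit: int) -> Iterator[int]:
    """Yield task results while keeping at most `limit` tasks pending."""
    pending: set[Future] = set()
    exhausted = False
    while pending or not exhausted:
        # Request new tasks from the source only while the limit has space.
        while not exhausted and len(pending) < limit:
            try:
                pending.add(next(tasks))
            except StopIteration:
                exhausted = True
        if pending:
            # Block until at least one task finishes, then hand back its value.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for task in done:
                yield task.result()


def demo(n: int = 8, limit: int = 2) -> tuple[list[int], list[int]]:
    submitted: list[int] = []
    with ThreadPoolExecutor(max_workers=limit) as pool:

        def task_source() -> Iterator[Future]:
            # Each task is submitted only when the consumer asks for it.
            for i in range(n):
                submitted.append(i)
                yield pool.submit(pow, i, 2)

        results = list(throttle_sketch(task_source(), limit))
    return sorted(results), submitted
```

Because submission happens inside the generator, a very long or unbounded task source never floods the scheduler: at most `limit` tasks exist at any moment.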
- imap(function: ray.remote_function.RemoteFunction, items: collections.abc.Iterable[Any], *, ordered: bool = True) collections.abc.Generator[Any, None, int]
- imap(function: collections.abc.Callable[[A], R], items: collections.abc.Iterable[A], *, ordered: bool = True) collections.abc.Generator[R, None, int]
- add_task(task)
- Parameters:
task (ray.ObjectRef | ray.ObjectID)
- results_until_limit()
Iterate over available results until the number of pending tasks is less than the limit, blocking as needed.
This is a generator, returning the task result references. The iterator will stop when the pending tasks list is under the limit. No guarantee is made on the order of returned results.
- Return type:
collections.abc.Generator[ray.ObjectRef, None, None]
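Since this generator returns references rather than values, the caller decides when (and whether) to fetch each result. The sketch below shows that contract with stdlib futures standing in for Ray object references and `Future.result()` standing in for `ray.get()`. `results_until_limit_sketch` and `collect` are hypothetical names, and the code is an illustration under those assumptions rather than the library's implementation.

```python
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


def results_until_limit_sketch(pending: set[Future], limit: int) -> Iterator[Future]:
    """Yield finished futures (not their values) until len(pending) < limit."""
    while len(pending) >= limit:
        done, _ = wait(pending, return_when=FIRST_COMPLETED)
        for task in done:
            pending.discard(task)
            yield task  # the caller resolves it, much like calling ray.get()


def collect(n_tasks: int = 6, limit: int = 2) -> list[int]:
    pending: set[Future] = set()
    values: list[int] = []
    with ThreadPoolExecutor(max_workers=limit) as pool:
        for i in range(n_tasks):
            pending.add(pool.submit(abs, -i))
            for ref in results_until_limit_sketch(pending, limit):
                values.append(ref.result())
        # With limit=1 the loop runs until nothing is pending,
        # which plays the role of draining all remaining results.
        for ref in results_until_limit_sketch(pending, 1):
            values.append(ref.result())
    return sorted(values)
```

The values are sorted before being returned because, as with the documented method, completion order is not guaranteed.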
- wait_for_limit()
Wait until the pending tasks are back under the limit.
This method calls ray.get() on the result of each pending task to resolve errors, but discards the return value.
- drain_results()
Iterate over all remaining tasks until the pending task list is empty, blocking as needed.
This is a generator, returning the task result references. No guarantee is made on the order of returned results.
- Return type:
collections.abc.Generator[ray.ObjectRef, None, None]
- drain()
Wait until all pending tasks are finished.
This method calls ray.get() on the result of each pending task to resolve errors, but discards the return value.