lenskit.parallel.ray.TaskLimiter
================================

.. py:class:: lenskit.parallel.ray.TaskLimiter(limit = None)

   Limit task concurrency using :func:`ray.wait`.

   This class provides three key operations:

   - Add a task to the limiter with :meth:`add_task`.
   - Wait until the number of pending tasks is less than the limit with
     :meth:`wait_for_limit`.
   - Wait for all tasks with :meth:`drain`.

   :param limit:
       The maximum number of pending tasks.  Defaults to the LensKit process
       count (see :func:`lenskit.parallel.initialize`).

   .. py:attribute:: limit
      :type:  int

      The maximum number of pending tasks.

   .. py:attribute:: finished
      :type:  int

      The number of tasks completed.

   .. py:property:: pending
      :type: int

      The number of pending tasks.

   .. py:method:: throttle[Elt](tasks, *, ordered = True)

      Throttle a generator of Ray tasks, requesting new tasks from the
      generator only when the limiter has room for them.

      :param tasks:
          A generator of tasks (usually the result of calling a Ray remote
          function).

      :returns:
          A generator of the results (already awaited with :func:`ray.get`).

   .. py:method:: imap(function: ray.remote_function.RemoteFunction, items: collections.abc.Iterable[Any], *, ordered: bool = True) -> collections.abc.Generator[Any, None, int]
                  imap(function: collections.abc.Callable[[A], R], items: collections.abc.Iterable[A], *, ordered: bool = True) -> collections.abc.Generator[R, None, int]

   .. py:method:: add_task(task)

   .. py:method:: results_until_limit()

      Iterate over available results until the number of pending tasks is
      less than the limit, blocking as needed.

      This is a generator that yields the task result references.  The
      iterator stops when the pending task list is under the limit.  No
      guarantee is made on the order of returned results.

   .. py:method:: wait_for_limit()

      Wait until the pending tasks are back under the limit.

      This method calls :func:`ray.get` on the result of each finished task
      to resolve errors, but discards the return value.

   .. py:method:: drain_results()

      Iterate over all remaining tasks until the pending task list is empty,
      blocking as needed.

      This is a generator that yields the task result references.  No
      guarantee is made on the order of returned results.

   .. py:method:: drain()

      Wait until all pending tasks are finished.

      This method calls :func:`ray.get` on the result of each finished task
      to resolve errors, but discards the return value.
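
The add, wait, and drain operations described above can be illustrated without a Ray cluster. The following is a minimal sketch that uses :func:`concurrent.futures.wait` from the standard library in place of :func:`ray.wait`; it is not LensKit's implementation. ``SimpleLimiter``, ``work``, and ``run_demo`` are hypothetical names introduced for illustration, and the method names only mirror those documented for ``TaskLimiter``.

.. code-block:: python

   import threading
   import time
   from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


   class SimpleLimiter:
       """Hypothetical stand-in for the limiter pattern (assumption, not LensKit code)."""

       def __init__(self, limit):
           self.limit = limit
           self.finished = 0
           self._pending = set()

       @property
       def pending(self):
           return len(self._pending)

       def add_task(self, future):
           self._pending.add(future)

       def results_until_limit(self):
           # Yield finished futures until fewer than `limit` remain pending.
           while len(self._pending) >= self.limit:
               done, self._pending = wait(self._pending, return_when=FIRST_COMPLETED)
               for fut in done:
                   self.finished += 1
                   yield fut

       def wait_for_limit(self):
           for fut in self.results_until_limit():
               fut.result()  # re-raises task errors; the value is discarded

       def drain_results(self):
           # Yield finished futures until nothing is pending.
           while self._pending:
               done, self._pending = wait(self._pending, return_when=FIRST_COMPLETED)
               for fut in done:
                   self.finished += 1
                   yield fut

       def drain(self):
           for fut in self.drain_results():
               fut.result()


   def run_demo(n_tasks=10, limit=2):
       lock = threading.Lock()
       active = 0
       peak = 0

       def work(x):
           # Track how many tasks run at once so the bound can be checked.
           nonlocal active, peak
           with lock:
               active += 1
               peak = max(peak, active)
           time.sleep(0.01)
           with lock:
               active -= 1
           return x * x

       limiter = SimpleLimiter(limit)
       # The pool has more workers than the limit, so the limiter is what
       # bounds the number of in-flight tasks.
       with ThreadPoolExecutor(max_workers=8) as pool:
           for i in range(n_tasks):
               limiter.wait_for_limit()  # block until there is room
               limiter.add_task(pool.submit(work, i))
           limiter.drain()  # wait for the stragglers
       return limiter.finished, limiter.pending, peak

In this sketch the caller waits for room before each submission, so at most ``limit`` tasks are ever in flight, and a final ``drain()`` collects whatever is still running. With Ray, the futures would instead be object references returned by a remote function call.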