lenskit.logging.multiprocess#

Multiprocess logging support for Ray integration. Most code doesn’t need to touch this.

Stability:

Internal

Classes#

Monitor

LensKit monitor controller.

RecordSink

Generic interface for record sinks.

WorkerContext

Activate (and deactivate) a worker context. This handles setup and teardown

WorkerLogConfig

Configuration for worker logging.

Functions#

get_monitor()

Get the monitor, starting it if it is not yet running.

send_task(task)

Package Contents#

class lenskit.logging.multiprocess.Monitor(handle_logging=True)#

LensKit monitor controller.

The monitor does several things:

  • Receive and re-inject log messages from worker processes.

  • Track work in progress and periodically write work logs.

The monitor is managed and used internally, and neither LensKit client code nor component implementations often need to interact with it.

Parameters:

handle_logging (bool) – Whether or not to handle log messages.

zmq: Monitor.zmq[Monitor.zmq[bytes]]#
log_address: str | None#
refreshables: dict[uuid.UUID, MonitorRefreshable]#
record_sinks: dict[uuid.UUID, lenskit.logging.multiprocess._records.RecordSink[Any]]#
lock: threading.Lock#
add_refreshable(obj)#
Parameters:

obj (MonitorRefreshable)

Return type:

uuid.UUID

remove_refreshable(uuid)#
Parameters:

uuid (uuid.UUID)

add_record_sink(sink)#
Parameters:

sink (lenskit.logging.multiprocess._records.RecordSink[Any])

remove_record_sink(sink)#
Parameters:

sink (lenskit.logging.multiprocess._records.RecordSink[Any] | uuid.UUID)

await_quiesce(*, ms=100)#

Wait for the monitor to quiesce.

Parameters:

ms (int) – The number of milliseconds of quiet to expect for quiescence.

shutdown()#
lenskit.logging.multiprocess.get_monitor()#

Get the monitor, starting it if it is not yet running.

Return type:

Monitor

class lenskit.logging.multiprocess.RecordSink#

Bases: Protocol, Generic[R]

Generic interface for record sinks.

Stability:
Internal (see Stability Levels).
sink_id: uuid.UUID#
record(record, /)#

Record the timings for a pipeline run.

Parameters:

record (R)

class lenskit.logging.multiprocess.WorkerContext(config, *, driver=False, zmq=None)#

Activate (and deactivate) a worker context. This handles setup and teardown of logging, etc.

Only one worker context can be active, regardless of how many threads are active.

Stability:
Internal (see Stability Levels).
Parameters:
config: WorkerLogConfig#
zmq: WorkerContext.zmq[WorkerContext.zmq[bytes]]#
static active()#
Return type:

WorkerContext | None

start()#

Start the logging context.

shutdown()#
send_task(task)#
Parameters:

task (lenskit.logging.tasks.Task)

send_progress(update)#

Send a progress update event.

Parameters:

update (lenskit.logging.multiprocess._protocol.ProgressMessage)

send_record(sink_id, record)#

Send a record to a record sink.

Parameters:
__enter__()#
__exit__(*args)#
class lenskit.logging.multiprocess.WorkerLogConfig#

Configuration for worker logging.

address: str#
level: int#
authkey: bytes | None = None#
classmethod current() Self#
classmethod current(*, from_monitor: bool = True) Self | None

Get the current worker logging configuration.

lenskit.logging.multiprocess.send_task(task)#
Parameters:

task (lenskit.logging.tasks.Task)