zmq

Todo

Currently only IPC is supported, so the zmq runner can't run across machines. Supporting TCP is a work in progress: it will require some degree of authentication among nodes to prevent arbitrary code execution, since we shouldn't count on users to properly firewall their runners.

Todo

The socket spawning and event handling here are awfully manual. We're leaving it as is because it's somewhat unlikely we'll need to generalize it, but if we do, it would be great to standardize socket names and have event handler decorators like:

@on_router(MessageType.sometype)
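One way such decorators could work is sketched below. This is a sketch under assumptions, not noob code. `MessageType` is a stand-in Enum. `on_router` only tags a method. A hypothetical `Dispatcher` base class collects the tagged methods into a per-class lookup table when each subclass is defined. The names `Dispatcher`, `dispatch`, `ExampleCommandNode`, and the dict-shaped messages are all illustrative.

```python
from enum import Enum
from typing import Any, Callable


class MessageType(Enum):
    # Hypothetical stand-in for a real message type enum
    identify = "identify"
    status = "status"


def on_router(msg_type: MessageType) -> Callable[[Callable], Callable]:
    """Tag a method as the ROUTER handler for one message type."""

    def decorator(func: Callable) -> Callable:
        func._router_type = msg_type  # read later by Dispatcher.__init_subclass__
        return func

    return decorator


class Dispatcher:
    """Hypothetical base class: maps message types to tagged handler methods."""

    _router_handlers: dict[MessageType, str] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Copy the parent's table so subclasses inherit handlers without sharing state
        cls._router_handlers = dict(cls._router_handlers)
        for name, attr in vars(cls).items():
            msg_type = getattr(attr, "_router_type", None)
            if msg_type is not None:
                cls._router_handlers[msg_type] = name

    def dispatch(self, msg_type: MessageType, msg: dict) -> Any:
        name = self._router_handlers.get(msg_type)
        if name is None:
            raise KeyError(f"no router handler for {msg_type}")
        return getattr(self, name)(msg)


class ExampleCommandNode(Dispatcher):
    @on_router(MessageType.identify)
    def on_identify(self, msg: dict) -> str:
        return f"identified {msg['node_id']}"

    @on_router(MessageType.status)
    def on_status(self, msg: dict) -> str:
        return f"{msg['node_id']} is {msg['status']}"
```

The registry is built once at class-definition time. Dispatch is then a single dict lookup, with no per-message `if`/`elif` chain.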

Main Runner

class ZMQRunner(tube: Tube, store: ~noob.store.EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[Callable[[Event | MetaEvent], None]] = <factory>, _logger: Logger = None, _runner_id: str | None = None, node_procs: dict[~typing.Annotated[str, ~pydantic.functional_validators.AfterValidator(func=~noob.types._is_identifier), ~pydantic.functional_validators.AfterValidator(func=~noob.types._not_reserved)], ~multiprocessing.Process] = <factory>, command: ~noob.runner.zmq.command.CommandNode | None = None, quit_timeout: float = 10, autoclear_store: bool = True, _initialized: ~multiprocessing.synchronize.Event = <factory>, _running: ~multiprocessing.synchronize.Event = <factory>, _init_lock: ~threading.RLock = <factory>, _running_lock: ~_thread.lock = <factory>, _ignore_events: bool = False, _return_node: ~noob.node.return_.Return | None = None, _to_throw: ~noob.network.message.ErrorValue | None = None, _current_epoch: ~noob.types.Epoch = (('tube', 0),), _epoch_futures: dict[~noob.types.Epoch, ~concurrent.futures._base.Future] = <factory>)

A concurrent runner that uses zmq to broker events between nodes running in separate processes.

On init(), creates a CommandNode in a thread in the main process, and spawns a set of NodeRunners for the nodes in a Tube.

The CommandNode is the broker between the ZMQRunner and the node runners, sending command messages and receiving events from each of the nodes (see network.message). Each NodeRunner subscribes directly to the nodes it depends on to receive their events, and the command node subscribes to every node to make their events available to the ZMQRunner, but events do not need to pass through the command node before being processed.
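The subscription topology described above can be modeled in plain Python. This is an illustrative sketch, not noob's implementation. Edges are assumed to be `(source, target)` pairs, where the target consumes events from the source, and the helper `subscriptions` is hypothetical. Each node subscribes only to its upstream nodes, while the command node subscribes to all of them.

```python
from collections import defaultdict


def subscriptions(
    nodes: list[str], edges: list[tuple[str, str]]
) -> tuple[dict[str, set[str]], set[str]]:
    """Return (per-node subscriptions, command node subscriptions).

    edges are hypothetical (source, target) pairs: target consumes source's events.
    """
    upstream: dict[str, set[str]] = defaultdict(set)
    for source, target in edges:
        # A node runner subscribes directly to the nodes it depends on
        upstream[target].add(source)
    # Nodes with no dependencies subscribe to no other node
    per_node = {node: set(upstream[node]) for node in nodes}
    # The command node listens to every node so the runner sees all events
    command_subs = set(nodes)
    return per_node, command_subs


per_node, command_subs = subscriptions(
    ["a", "b", "c"], [("a", "b"), ("a", "c"), ("b", "c")]
)
```

Here `c` subscribes to both `a` and `b`. Its events flow straight from those publishers, while the command node also receives a copy of everything.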

Note

Asset Handling

See NodeRunner for documentation about how Assets are handled in the ZMQRunner

node_procs: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Process]
command: CommandNode | None = None
quit_timeout: float = 10

Time in seconds to wait after calling deinit() before killing runner processes
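A timeout like this is commonly applied as "join, then kill whatever is still alive", sketched below. The helper name `shutdown_procs` is hypothetical. Sharing one deadline across all processes is an assumption, as is killing stragglers with `Process.kill()`; the documentation only says processes are killed after the wait.

```python
import time
from multiprocessing import Process


def shutdown_procs(procs: dict[str, Process], quit_timeout: float = 10) -> list[str]:
    """Give procs up to quit_timeout seconds in total to exit, then kill the rest.

    Returns the ids of the processes that had to be killed.
    """
    # One shared deadline, so the total wait is about quit_timeout rather than
    # len(procs) * quit_timeout
    deadline = time.monotonic() + quit_timeout
    for proc in procs.values():
        proc.join(timeout=max(0.0, deadline - time.monotonic()))
    killed = []
    for node_id, proc in procs.items():
        if proc.is_alive():
            proc.kill()  # SIGKILL on POSIX; same as terminate() on Windows
            proc.join()
            killed.append(node_id)
    return killed
```

In a runner, something like this would be called with node_procs after the deinit command has been sent.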

store: EventStore
autoclear_store: bool = True

If True (default), clear the event store after events are processed and returned. If False, don't clear events from the event store

property running: bool
property initialized: bool
init() → None
deinit() → None
process(**kwargs: Any) → None | dict[str, Any] | Any
iter(n: int | None = None) → Generator[None | dict[str, Any] | Any, None, None]

Iterate over results as they are available.

The tube runs in free-run mode for n iterations. This method is usually only useful for tubes with Return nodes. It yields only when a return value is available, so the tube may make more than n process calls if, e.g., gather nodes cause the return value to be empty for some epochs.

To call the tube a specific number of times and do something with the events other than returning a value, use callbacks and run()!

Note that backpressure control is not yet implemented! If the code consuming iter() is slow, or there is a bottleneck in your tube, you might incur some serious memory usage. Backpressure and observability are a WIP!

If you need a version of this method that always makes a fixed number of process calls, raise an issue!

run(n: int) → list[None | dict[str, Any] | Any]
run(n: None = None) → None

Run the tube in free-run mode: every node runs as soon as its dependencies are satisfied, without waiting for one epoch to complete before starting the next.

Blocks when n is not None. This is for consistency with the synchronous/asyncio runners, but may change in the future.

If n is None, does not block. Stop processing by calling stop() or by deinitializing (exiting the context manager, or calling deinit()).

stop() → None

Stop running the tube.

async on_event(msg: Message) → None
on_router(msg: Message) → None
collect_return(epoch: Epoch | None = None) → Any
enable_node(node_id: str) → None
disable_node(node_id: str) → None

Command Node

class CommandNode(runner_id: str, protocol: str = 'ipc', port: int | None = None)

PUB node that controls the state of the other nodes and announces their addresses, using:

  • one PUB socket to distribute commands

  • one ROUTER socket to receive return messages from runner nodes

  • one SUB socket to subscribe to all events

The wrapping runner should register callbacks with add_callback to handle incoming messages.

property pub_address: str

Address the publisher is bound to

property router_address: str

Address the return router is bound to

run() → None

Target for threading.Thread

init() → None
deinit() → None

Close the event loop, stop processing messages, and reset state

stop() → None
async announce() → None
async ping() → None

Send a ping message asking everyone to identify themselves

start(n: int | None = None) → None

Start running in free-run mode

process(epoch: Epoch, input: dict | None = None) → None

Emit a ProcessMsg to process a single round through the graph

async epoch_ended(epoch: Epoch) → None
await_ready(node_ids: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]], timeout: float = 10) → None

Wait until all the node_ids have announced themselves
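Since await_ready is synchronous while identify messages arrive on the networking thread, a `threading.Condition` is one natural way to implement the wait. In this sketch, `ReadyTracker` and `mark_identified` are hypothetical names. Raising `TimeoutError` on expiry is an assumption; the documentation above does not say what happens when the timeout passes.

```python
import threading


class ReadyTracker:
    """Hypothetical helper that records which node ids have identified themselves."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._identified: set[str] = set()

    def mark_identified(self, node_id: str) -> None:
        # Called from the networking thread when an identify message arrives
        with self._cond:
            self._identified.add(node_id)
            self._cond.notify_all()

    def await_ready(self, node_ids: list[str], timeout: float = 10) -> None:
        # Called from the main thread; blocks until every id has identified
        wanted = set(node_ids)
        with self._cond:
            ready = self._cond.wait_for(lambda: wanted <= self._identified, timeout=timeout)
            missing = sorted(wanted - self._identified)
        if not ready:
            raise TimeoutError(f"nodes did not identify within {timeout}s: {missing}")
```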

async on_router(message: Message) → None
async on_identify(msg: IdentifyMsg) → None
async on_status(msg: StatusMsg) → None

Node Runner

class NodeRunner(spec: NodeSpecification, runner_id: str, command_outbox: str, command_router: str, input_collection: InputCollection, asset_specs: dict[str, AssetSpecification] | None = None, asset_generations: dict[str, list[tuple[str, ...]]] | None = None, edges: list[Edge] | None = None, protocol: str = 'ipc')

Runner for a single node

  • DEALER to communicate with the command node

  • PUB (outbox) to publish events

  • SUB (inbox) to subscribe to events from other nodes.

Assets & Mutation.

Assets behave slightly differently in the ZMQRunner and NodeRunner than they do in the other, local runners. Assets are objects that are shared within some AssetScope, which in turn implies that mutations of that object will also be shared. However, since each NodeRunner runs in a separate process, shared asset state must work differently.

  • node-scoped assets work the same as in other runners.

  • process-scoped assets are instantiated for every root Epoch by the nodes in the first topological generation that depends on the asset, and the asset is then forwarded as an Event to subsequent generations. The asset will be initialized multiple times if multiple nodes in the same topological generation depend on it. The only guarantee about which version of the asset downstream nodes receive comes from the topology of the graph: e.g. if nodes a, b, c, d all depend on an asset, and a, b run in the first generation and c, d run in the second, c and d could receive the version of the asset emitted by either a or b.

  • runner-scoped assets are instantiated once per runner initialization by the nodes in the first topological generation that depends on the asset. The same caveats as for process-scoped assets apply. Additionally, unless an asset uses depends, mutations made in later topo generations are not propagated to subsequent epochs. E.g. if nodes a and b depend on an asset and run in the first and second generation respectively, and each increments the asset by 1, the values will be

    • Epoch 0: {a: 1, b: 2};

    • Epoch 1: {a: 2, b: 3};

    rather than a receiving 2 (the value after b's mutation) at the start of Epoch 1.
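The runner-scoped example above can be reproduced with a toy model of two processes. This is a simulation, not noob code. `copy.deepcopy` stands in for the serialization that happens when an asset is emitted as an event to another process, and `simulate_runner_scoped` is a hypothetical name.

```python
import copy


def simulate_runner_scoped(n_epochs: int) -> list[dict[str, int]]:
    """Toy model of nodes a (generation 1) and b (generation 2) sharing a counter asset.

    a initializes the asset once per runner init and keeps its own copy;
    b only ever sees the copy that a emits each epoch.
    """
    a_copy = {"count": 0}  # lives in a's process for the whole run
    seen = []
    for _ in range(n_epochs):
        a_copy["count"] += 1  # a mutates its local copy
        # Emitting an event serializes the asset, so b gets an independent copy
        b_copy = copy.deepcopy(a_copy)
        b_copy["count"] += 1  # b's mutation never travels back to a's process
        seen.append({"a": a_copy["count"], "b": b_copy["count"]})
    return seen


print(simulate_runner_scoped(2))  # [{'a': 1, 'b': 2}, {'a': 2, 'b': 3}]
```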

As is the case with all runners, if you want to mutate an asset with multiple nodes in a tube, the safest and most predictable way to do that is to encode the mutation order in the topology of the graph: have only the first node that mutates an asset depend on it directly, and pass the asset through the graph by emitting it as an event. To persist data between epochs, use depends to store the value after the last desired mutation. See Persisting Data Between Epochs.

property outbox_address: str
property depends: tuple[tuple[str, str], ...] | None

(node, signal) tuples of the wrapped node’s dependencies

property subscribes_to: set[str]

The set of node IDs that we should subscribe to: all the nodes we depend on, plus all the nodes we receive mutated assets from

property inits_assets: set[str]

The set of assets that we should initialize, because either

  • they are node-scoped and we depend on them, or

  • we are in the first topo generation that uses them

property publishes_assets: set[str]

The set of assets that we publish for downstream nodes to consume

property receives_assets_from: dict[str, set[str]]

The map of assets that we must receive from other nodes (since we are not in the first topo generation that uses them) to the set of nodes that we may receive them from.
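The generation-based rules behind inits_assets and receives_assets_from can be sketched with `graphlib.TopologicalSorter`. Everything named here is hypothetical: the `deps` and `uses` input shapes and the functions `generations` and `asset_roles`. The sketch ignores node-scoped assets and assets stored via depends. It assumes that each later user may receive the asset from any user in the most recent earlier generation that used it.

```python
from graphlib import TopologicalSorter


def generations(deps: dict[str, set[str]]) -> list[set[str]]:
    """Group nodes into topological generations (all deps in earlier groups).

    deps maps every node to the set of nodes it depends on.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    gens = []
    while sorter.is_active():
        ready = sorter.get_ready()
        gens.append(set(ready))
        sorter.done(*ready)  # releasing the whole batch yields the next generation
    return gens


def asset_roles(
    deps: dict[str, set[str]], uses: dict[str, set[str]]
) -> tuple[dict[str, set[str]], dict[str, dict[str, set[str]]]]:
    """Return (inits_assets, receives_assets_from) for every node in deps."""
    inits: dict[str, set[str]] = {node: set() for node in deps}
    receives: dict[str, dict[str, set[str]]] = {node: {} for node in deps}
    last_users: dict[str, set[str]] = {}  # asset -> users in latest generation seen
    for gen in generations(deps):
        gen_users: dict[str, set[str]] = {}
        for node in gen:
            for asset in uses.get(node, set()):
                if asset in last_users:
                    # not the first generation to use it: receive from earlier users
                    receives[node][asset] = set(last_users[asset])
                else:
                    # first generation to use it: every user here initializes it
                    inits[node].add(asset)
                gen_users.setdefault(asset, set()).add(node)
        # update only after the whole generation, so peers don't feed each other
        last_users.update(gen_users)
    return inits, receives
```

With nodes a, b in the first generation and c, d in the second, all using one asset, a and b both initialize it and c, d may receive it from either. This mirrors the process-scoped example above.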

property has_input: bool
property status: NodeStatus
classmethod run(spec: NodeSpecification, **kwargs: Any) → None

Target for multiprocessing.Process: initialize the class and start it!

async await_inputs() → AsyncGenerator[tuple[tuple[Any], dict[str, Any], Epoch]]

Iterate inputs as they are ready

Handles multiple modes of running:

  • process-based: run a single epoch, or a set of epochs, at a time

  • free-run-based: run until told to stop

And multiple types of nodes:

  • stateful: must call epochs and subepochs in order

  • stateless: call whichever epoch whenever its dependencies are met
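The stateful/stateless distinction amounts to an ordering gate on ready epochs. A minimal sketch follows. `InputGate` is hypothetical, epochs are simplified to integers, and subepochs are ignored, whereas noob's Epoch type is a richer tuple. A stateless node runs any epoch as soon as it is ready. A stateful node holds epochs in a min-heap and releases them strictly in order.

```python
import heapq


class InputGate:
    """Hypothetical sketch: decide which ready epochs a node may process now."""

    def __init__(self, stateful: bool) -> None:
        self.stateful = stateful
        self._next = 0  # next epoch a stateful node is allowed to run
        self._pending: list[int] = []  # min-heap of ready but held-back epochs

    def ready(self, epoch: int) -> list[int]:
        """Register that an epoch's inputs are complete; return epochs to run now."""
        if not self.stateful:
            return [epoch]  # stateless: run whenever dependencies are met
        heapq.heappush(self._pending, epoch)
        runnable = []
        # Stateful: release epochs strictly in order, holding back past any gap
        while self._pending and self._pending[0] == self._next:
            runnable.append(heapq.heappop(self._pending))
            self._next += 1
        return runnable
```

If inputs complete in the order 1, 0, 2, a stateful gate returns `[]`, then `[0, 1]`, then `[2]`. A stateless gate returns each epoch immediately.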

async publish_events(events: list[Event], epoch: Epoch) → None
async init() → None
async deinit() → None

Deinitialize the node class after receiving the on_deinit message and draining the end of the _process_loop.

async identify() → None

Send the command node an announcement to say we're alive

async update_status(status: NodeStatus) → None

Update our internal status and announce it to the command node

async init_node() → None
async on_inbox(message: Message) → None
async on_announce(msg: AnnounceMsg) → None

Store the announced address map and connect to the nodes we depend on

async on_event(msg: EventMsg) → None
async on_start(msg: StartMsg) → None

Start running in free-run mode

async on_process(msg: ProcessMsg) → None

Process a single graph iteration

async on_stop(msg: StopMsg) → None

Stop processing (but stay responsive)

async on_deinit(msg: DeinitMsg) → None

Deinitialize the node and close the networking thread.

Causes the main loop to end, which calls deinit()

async error(err: Exception) → None

Capture the error and traceback context from an exception using traceback.TracebackException and send them to the command node to be re-raised
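`traceback.TracebackException` turns an exception into plain data that can cross a process boundary. The sketch below assumes a string-only payload with hypothetical keys `type`, `message`, and `traceback`. Noob's actual ErrorValue structure is not described here. Wrapping the error in `RuntimeError` on the receiving side is also an illustrative choice.

```python
import traceback


def error_payload(err: BaseException) -> dict[str, str]:
    """Capture an exception as plain strings that can be serialized and sent."""
    tb_exc = traceback.TracebackException.from_exception(err)
    return {
        "type": type(err).__name__,
        "message": str(err),
        # format() yields the lines of the standard printed traceback
        "traceback": "".join(tb_exc.format()),
    }


def reraise(payload: dict[str, str]) -> None:
    """Receiving side: raise a new error that carries the remote traceback text."""
    raise RuntimeError(
        f"{payload['type']}: {payload['message']}\n\n"
        f"Remote traceback:\n{payload['traceback']}"
    )
```

The original exception object never has to be pickled. Only strings travel, so exceptions with unpicklable attributes still produce a readable error in the main process.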

async on_epoch_ended(msg: EpochEndedMsg) → None

Command node has told us that an epoch has ended. Under most conditions, we don’t need to be told this explicitly, but when we are a node that stores an asset from a later node, we need to mark the asset as done manually.

async await_node(epoch: Epoch | None = None) → list[MetaEvent]

Block until a node is ready

Parameters:

epoch (Epoch, None) – if an Epoch, wait until the node is ready in the given epoch; otherwise, wait until the node is ready in any epoch