Source code for noob.event

import sys
from enum import StrEnum
from typing import Annotated as A
from typing import Any, Generic, Literal, TypeAlias

from pydantic import AfterValidator, Discriminator, Tag, TypeAdapter

from noob.const import META_SIGNAL
from noob.types import Epoch, Picklable, SerializableDatetime

if sys.version_info < (3, 12):
    from typing_extensions import TypedDict, TypeVar
elif sys.version_info < (3, 13):
    from typing import TypedDict

    from typing_extensions import TypeVar
else:
    from typing import TypedDict, TypeVar

# Type of the payload carried in an event's ``value`` field; falls back to ``Any``
# when ``Event`` is used without an explicit type parameter.
_TEvent = TypeVar("_TEvent", default=Any)


class Event(TypedDict, Generic[_TEvent]):
    """
    One value returned by one :meth:`.Node.process` call, together with
    the metadata describing where and when it was produced.
    """

    id: int
    """Identifier that is unique to this event"""
    timestamp: SerializableDatetime
    """When the :class:`.TubeRunner` received the event"""
    node_id: str
    """ID of the node that emitted the event"""
    signal: str
    """Name of the signal the event was emitted on"""
    epoch: Epoch
    """Number of the epoch in which the event was emitted"""
    value: Picklable[_TEvent]
    """The value the processing node emitted"""
def is_event(instance: Any) -> bool:
    """
    Structurally test whether ``instance`` looks like an :class:`Event`.

    ``TypedDict`` classes cannot be used with ``isinstance``. Events are kept
    as typed dicts for performance, so this check stands in for the missing
    instance check.

    Args:
        instance: Any object.

    Returns:
        ``True`` when ``instance`` is a ``dict`` that contains every key
        annotated on :class:`Event`, otherwise ``False``. Extra keys are
        permitted and the values themselves are not inspected.
    """
    return isinstance(instance, dict) and all(
        field in instance for field in Event.__annotations__
    )
[docs] class MetaEventType(StrEnum): """ Types of meta events emitted by tubes, schedulers, stores, and runners. """ NodeReady = "NodeReady" """ A node was made ready in the toplogical sorting graph. The value of the event is the node_id of the node that is ready. """ EpochEnded = "EpochEnded" """ An epoch has ended, the value of the event contains which epoch has ended """
class MetaEvent(Event):
    """
    An event produced by an internal process instead of by a node.

    Meta events coordinate the tube and give other code a way to hook into
    tube execution. They are not stored in the :class:`.EventStore`; they are
    emitted by callbacks and consumed internally.

    The individual kinds of meta event are described on :class:`.MetaEventType`.
    """

    # Both fields narrow the ``str`` types declared on Event. mypy seems not to
    # allow narrowing a field type in a TypedDict subclass, hence the ignores.
    node_id: Literal["meta"]  # type: ignore
    signal: MetaEventType  # type: ignore
class MetaSignal(StrEnum):
    """
    Special marker values that can stand in for an ordinary event value.

    Each member's string value is the ``META_SIGNAL`` prefix followed by the
    member's name.
    """

    # NOTE(review): presumably marks that no event was produced; the name and
    # the ``NoEventable`` alias suggest this, confirm against callers.
    NoEvent = f"{META_SIGNAL}NoEvent"
def _type_discriminator(v: dict | Event | MetaEvent) -> str:
    """
    Pick the tag of the :data:`EventUnion` member that should validate ``v``.

    Args:
        v: The raw event mapping being validated.

    Returns:
        ``"meta"`` when ``v["node_id"]`` equals ``"meta"``; ``"event"`` in every
        other case, including when ``node_id`` is absent.
    """
    return "meta" if v.get("node_id") == "meta" else "event"


def _meta_signals_to_enum(evt: Event) -> Event:
    """
    If the event's value is a meta signal, cast it to the enum member
    rather than leaving it as the plain string value.

    The value is converted only when it is a string that starts with
    ``META_SIGNAL`` and the remainder after that prefix is the name of a
    :class:`MetaSignal` member. The event is modified in place.

    Args:
        evt: The event to inspect.

    Returns:
        The same ``evt`` object, with ``value`` replaced by the matching
        :class:`MetaSignal` member when applicable.
    """
    value = evt["value"]
    if isinstance(value, str) and value.startswith(META_SIGNAL):
        # Strip only the leading prefix. ``str.replace`` would also delete any
        # later occurrence of the prefix and could turn an unrelated string
        # into a valid member name.
        member = MetaSignal.__members__.get(value.removeprefix(META_SIGNAL))
        if member is not None:
            evt["value"] = member
    return evt


# Tagged union of ordinary events and meta events. ``_type_discriminator``
# selects the branch; ordinary events additionally have meta-signal string
# values cast back to ``MetaSignal`` after validation.
EventUnion = A[
    A[
        Event,
        AfterValidator(_meta_signals_to_enum),
        Tag("event"),
    ]
    | A[MetaEvent, Tag("meta")],
    Discriminator(_type_discriminator),
]
# Adapter built once at import time for validating/serializing ``EventUnion``.
EventAdapter = TypeAdapter[EventUnion](EventUnion)

_NoEventableInner = TypeVar("_NoEventableInner")
NoEventable: TypeAlias = (
    _NoEventableInner | "Event[_NoEventableInner]" | Literal[MetaSignal.NoEvent]
)
"""Convenience generic type to indicate that some signal can be a NoEvent"""