Source code for noob.node.map
import uuid
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import Annotated as A
from typing import TypeVar
from noob.event import Event, MetaSignal
from noob.node.base import Node
from noob.types import Epoch, Name
_TInput = TypeVar("_TInput")
[docs]
class Map(Node):
"""
Cardinality expansion
Given a node that emits 1 (iterable) event, split it into separate events.
```{admonition} Implementation Note
Map is a *special node* -
it is the only node where returning a list of values is interpreted as multiple events.
For all other nodes,
returning a list of values is interpreted as a single event with a list value.
To return multiple events from a single node, chain a `map` after it.
```
"""
stateful: bool = False
[docs]
def process(
self, value: Sequence[_TInput], epoch: Epoch
) -> tuple[A[list[Event[_TInput]] | MetaSignal, Name("value")], A[int | MetaSignal, Name("n")]]:
if len(value) == 0:
return MetaSignal.NoEvent, MetaSignal.NoEvent
subepochs = epoch.make_subepochs(self.id, len(value))
now = datetime.now(UTC)
ret = []
for item, subepoch in zip(value, subepochs):
ret.append(
Event(
id=uuid.uuid4().int,
timestamp=now,
node_id=self.id,
signal="value",
epoch=subepoch,
value=item,
)
)
return ret, len(ret)