Source code for noob.testing.nodes

import asyncio
import random
import string
from collections.abc import Generator, Iterator
from datetime import datetime
from itertools import count
from itertools import cycle as cycle_
from time import sleep
from typing import Annotated as A
from typing import Any

from faker import Faker
from pydantic import Field

from noob import Name, NodeSpecification, process_method
from noob.edge import Signal, Slot
from noob.event import MetaSignal
from noob.node import Node
from noob.types import Epoch, EventMap


[docs] def count_source( limit: int = 10000, start: int = 0 ) -> Generator[A[int, Name("index")], None, None]: counter = count(start=start) if limit == 0: while True: yield next(counter) else: while (val := next(counter)) < limit: yield val
[docs] def letter_source() -> Generator[A[str, Name("letter")]]: yield from cycle_(string.ascii_lowercase)
[docs] def word_source() -> Generator[A[str, Name("word")]]: fake = Faker() while True: word = fake.word() if len(word) > 1: yield word
[docs] def multi_words_source(n: int) -> Generator[A[list[str], Name("multi_words")]]: fake = Faker() while True: yield [fake.unique.word() for _ in range(n)]
[docs] def sporadic_word(every: int = 3) -> Generator[A[str, Name("word")] | None, None, None]: fake = Faker() i = 0 while True: i += 1 if i % every == 0: yield fake.word() else: yield None
[docs] def word_counts() -> Generator[tuple[A[str, Name("word")], A[list[int], Name("counts")]]]: fake = Faker() while True: n_counts = random.randint(2, 5) yield fake.unique.word(), [random.randint(1, 100) for _ in range(n_counts)]
[docs] def multiply(left: int, right: int = 2) -> int: """ Return value purposely unnamed, to be used as `{nodename}.value` """ return left * right
[docs] def divide(numerator: int, denominator: int = 5) -> A[float, Name("ratio")]: return numerator / denominator
[docs] def concat(strings: list[str]) -> str: return "".join(strings)
[docs] def multi_concat(**kwargs: list[str]) -> str: return "".join("".join(letter for letter in word) for word in kwargs.values())
[docs] def exclaim(string: str, hype: int = 1) -> str: return string + ("!" * hype)
[docs] def repeat(string: str, times: int) -> str: return string * times
[docs] def dictify(key: str, items: list[Any]) -> dict[str, Any]: return {key: items}
[docs] def error(value: Any) -> None: raise ValueError("This node just emits errors")
[docs] class CountSource(Node): limit: int = 1000 start: int = 0
[docs] def process(self) -> Generator[A[int, Name("index")], None, None]: counter = count(start=self.start) while (val := next(counter)) < self.limit: yield val
[docs] class UnannotatedGenerator(Node): limit: int = 1000 start: int = 0
[docs] def process(self): # noqa: ANN201 counter = count(start=self.start) while (val := next(counter)) < self.limit: yield val
[docs] class Multiply(Node):
[docs] def process(self, left: int, right: int = 2) -> A[int, Name("product")]: return multiply(left=left, right=right)
[docs] class VolumeProcess: def __init__(self, height: int = 2): self.height = height
[docs] def process(self, width: int, depth: int) -> A[int, Name("volume")]: return self.height * multiply(left=width, right=depth)
[docs] class Volume: def __init__(self, height: int = 2): self.height = height
[docs] @process_method def volume(self, width: int, depth: int) -> A[int, Name("volume")]: return self.height * multiply(left=width, right=depth)
[docs] class Now: def __init__(self): self.now = datetime.now()
[docs] @process_method def print(self, prefix: str = "Now: ") -> A[str, Name("timestamp")]: return f"{prefix}{self.now.isoformat()}"
[docs] class CountSourceDecor: def __init__(self, start: int = 0) -> None: self.gen = count(start=start)
[docs] @process_method def process(self) -> Generator[A[int, Name("count")], None, None]: yield from self.gen
[docs] def input_party( one: int, two: float, three: str, four: bool, five: list, six: dict, seven: set ) -> A[bool, Name("works")]: return True
[docs] def long_add(value: float, sleep_for: float = 0.25) -> float: sleep(sleep_for) return value + 1
[docs] async def number_to_letter(number: int, offset: int = 0) -> str: sleep_for = random.random() / 10 await asyncio.sleep(sleep_for) return string.ascii_lowercase[(number + offset) % len(string.ascii_lowercase)]
[docs] class NumberToLetterCls: def __init__(self, offset: int = 0): self.offset = offset
[docs] async def process(self, number: int) -> str: sleep_for = random.random() / 10 await asyncio.sleep(sleep_for) return string.ascii_lowercase[(number + self.offset) % len(string.ascii_lowercase)]
[docs] async def async_error(value: Any) -> None: """Just raise an error!""" raise ValueError("This is the error that should be raised")
[docs] class StatefulMultiply: def __init__(self, start: int = 0) -> None: self.start = start self.current = self.start
[docs] def process(self, left: float, right: float = 1) -> float: value = left * right * self.current self.current += 1 return value
[docs] def fast_forward(generator: count, n: int = 1) -> tuple[A[int, Name("next")]]: for _ in range(n): val = next(generator) return val
[docs] def jump(generator: count, n: int = 1) -> tuple[A[count, Name("skirttt")], A[int, Name("next")]]: value = 0 for _ in range(n): value = next(generator) return generator, value
[docs] def rewind(generator: count, n: int = 1) -> A[count, Name("skrittt")]: """Purposely designed to mutate the input and return a new object""" return count(next(generator) - n)
[docs] def zip_iter(*args: Iterator) -> tuple[Any, ...]: return tuple(next(a) for a in args)
[docs] def increment( iterator: Iterator[int], increment: int = 1 ) -> tuple[A[Iterator[int], Name("iterator")], A[int, Name("value")]]: for _ in range(increment): value = next(iterator) return iterator, value
[docs] def passthrough(value: Any, *args: Any, **kwargs: Any) -> Any: return value, args, kwargs
[docs] class InitCounter(Node): """Count how many times we have been initialized and deinitalized""" def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self._inits = 0 self._deinits = 0
[docs] def process(self) -> tuple[A[int, Name("inits")], A[int, Name("deinits")]]: # sleep to just not have this flood the networking modules. sleep(0.01) return self._inits, self._deinits
[docs] def init(self) -> None: self._inits += 1
[docs] def deinit(self) -> None: self._deinits += 1
[docs] def inject_epoch(epoch: Epoch) -> Epoch: return epoch
[docs] def inject_eventmap(special_value: Any, events: EventMap) -> EventMap: return events
[docs] def just_wait(value: Any, wait_for: float = 0.5) -> Any: sleep(wait_for) return value
[docs] def switch() -> ( Generator[tuple[A[str, Name("fruits")], A[str, Name("vegetables")], A[str, Name("minerals")]]] ): """Yield in a cycle from different signals, noeventing in the others""" fruits = cycle_(["apple", "banana", "cherry"]) vegetables = cycle_(["daikon", "eggplant", "fiddlehead"]) minerals = cycle_(["galaxite", "halite", "iolite"]) while True: yield next(fruits), MetaSignal.NoEvent, MetaSignal.NoEvent yield MetaSignal.NoEvent, next(vegetables), MetaSignal.NoEvent yield MetaSignal.NoEvent, MetaSignal.NoEvent, next(minerals)
[docs] def this_or_that( this: Any | None = None, that: Any | None = None, the_other: Any | None = None ) -> dict: """Optional inputs!""" ret = {} if this is not None: ret["this"] = this if that is not None: ret["that"] = that if the_other is not None: ret["the_other"] = the_other return ret
[docs] class DynamicSignals(Node): """Node whose signals and slots are dynamic""" signals_: list[str] = Field(default_factory=list) slots_: list[str] = Field(default_factory=list)
[docs] def process(self, **kwargs: Any) -> Any | None: return tuple(v for v in kwargs.values())
[docs] @classmethod def get_signals(cls, spec: NodeSpecification | None = None) -> dict[str, Signal]: if not spec or not spec.params: return {} return {sig: Signal(name=sig, annotation=Any) for sig in spec.params.get("signals_", [])}
[docs] @classmethod def get_slots(cls, spec: NodeSpecification | None = None) -> dict[str, Slot]: if not spec or not spec.params: return {} return { sig: Slot(name=sig, annotation=Any, required=True) for sig in spec.params.get("slots_", []) }
[docs] def cycle(start: int = 0, stop: int = 100, step: int = 1) -> Generator[float]: while True: yield from range(start, stop, step)