I think this pipeline implementation does some things differently from what I wanted (but did not precisely describe). It seems that each step is run right away, as it is "added", rather than being collected and run when `terminate` is called. Also, each step can only consume the result of the previous step, not the results of earlier steps. If needed, this can be worked around by ending the pipeline and then starting multiple pipelines from the result of the first one. I think you would also need to import `Generic` and write something like `class Pipeline(Generic[T]):`? Or is `class Pipeline[T]:` a short form of that?

In my experiment I wanted to get a syntax like this:

    pipeline = Pipeline()

    ...some code here...

    pipeline.add_step(Step(...some meta data..., ...actual procedure to run...))

So then I would need generics for `Step` too, and then Pipeline would need to change result type with each call of `add_step`, which seems like current type checkers cannot statically check.
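To make the intended behaviour concrete, here is a minimal, deliberately untyped sketch of the deferred variant: `add_step` only records the step, and nothing runs until `terminate` is called. The exact signatures are assumptions for illustration (in particular, that `terminate` takes the input value and that `Step` takes a metadata dict followed by a callable); only the names `Pipeline`, `Step`, `add_step` and `terminate` come from the description above.

```python
from typing import Any, Callable


class Step:
    # Hypothetical shape: some metadata plus the procedure to run.
    def __init__(self, metadata: dict[str, Any], procedure: Callable[[Any], Any]) -> None:
        self.metadata = metadata
        self.procedure = procedure


class Pipeline:
    def __init__(self) -> None:
        self._steps: list[Step] = []

    def add_step(self, step: Step) -> None:
        # Only record the step; nothing is executed here.
        self._steps.append(step)

    def terminate(self, value: Any) -> Any:
        # Run the collected steps in order, feeding each result to the next.
        for step in self._steps:
            value = step.procedure(value)
        return value


calls: list[str] = []


def to_int(value: float) -> int:
    calls.append("to_int")  # record that the step actually ran
    return int(value)


pipeline = Pipeline()
pipeline.add_step(Step({"name": "to_int"}, to_int))
pipeline.add_step(Step({"name": "to_str"}, str))
print(calls)   # still empty: adding steps did not run anything
result = pipeline.terminate(3.14)
print(result)  # prints 3 (as a string)
```

Because `add_step` mutates the pipeline in place and returns nothing, everything is typed as `Any` here; that is exactly the part a static checker cannot follow.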

I think your solution may circumvent the problem, because you apply each step immediately. But then how would the generic type work? When is it bound to a specific type?

> Or is `class Pipeline[T]:` a short form of that?

Yes, since 3.12.

> Pipeline would need to change result type with each call of `add_step`, which seems like current type checkers cannot statically check.

Sounds like you want a *dynamic* type with your implementation (note the emphasis). Types shouldn't change at runtime, so that a type checker can do its job. I'd recommend rethinking the implementation.

This is the best I can do for now. It requires an internal cast, but the caller side is type-safe, and the same principle as above applies:

    from functools import reduce
    from typing import cast, Any, Callable, Mapping


    def _id_fn[T](value: T) -> T:
        return value


    class Step[T, U]:
        def __init__(
            self,
            metadata: Mapping[str, Any],
            procedure: Callable[[T], U],
        ) -> None:
            self._metadata = metadata
            self._procedure = procedure

        def run(self, value: T) -> U:
            return self._procedure(value)


    # The type parameter default (`TOutput = TInput`) requires Python 3.13+.
    class Pipeline[TInput, TOutput = TInput]:
        def __init__(
            self,
            steps: tuple[*tuple[Step[TInput, Any], ...], Step[Any, TOutput]] | None = None,
        ) -> None:
            self._steps: tuple[*tuple[Step[TInput, Any], ...], Step[Any, TOutput]] = (
                steps or (Step({}, _id_fn),)
            )
        
        def add_step[V](self, step: Step[TOutput, V]) -> 'Pipeline[TInput, V]':
            steps = (
                *self._steps,
                step,
            )
        
            return Pipeline(steps)

        def run(self, value: TInput) -> TOutput:
            return cast(
                TOutput,
                reduce(
                    lambda acc, val: val.run(acc),
                    self._steps,
                    value,
                ),
            )


    def _float_to_int(value: float) -> int:
        return int(value)


    def _int_to_str(value: int) -> str:
        return str(value)


    def main() -> None:
        step_a = Step({}, _float_to_int)
        step_b = Step({}, _int_to_str)

        foo = Pipeline[float]()\
            .add_step(step_a)\
            .add_step(step_b)\
            .run(3.14)
        print(foo)

        bar = Pipeline[float]()\
            .run(3.14)
        print(bar)


    if __name__ == '__main__':
        main()

Thanks for your efforts. I didn't expect and couldn't expect anyone to invest time into trying to make this work. I might try your version soon.