Higher-Order Transform Streams: Sequentially Injecting Streams Within Streams
August 23, 2025
Streaming Parallel Recursive AI Swarms
Here’s a fun demo: 50 LLM agents streaming recursively and in parallel to summarize the entire Star Wars franchise in order, in 30 seconds. Normally this would be a 5½-minute blocking operation, but with stream injection, you can stream different parts of the response simultaneously.
In this contrived example, I built a franchise series summary agent swarm with three levels of agents: a franchise agent that lists the installments, an installment agent that lists out the acts, and an act synopsis agent that provides a detailed, multi-paragraph synopsis of each act (watch until the end):
This produces a lot of tokens very fast. So many, in fact, that this run on Claude 4 Sonnet cost me 30¢ (a single run on 4 Opus costs $1.50)! To compare the speed, I ran the same task through a monolithic prompt in a single agent call.
The traditional monolithic prompt produced about 10k tokens in 5½ minutes, roughly 30 tokens/sec. The parallel streaming approach, by contrast, spawned 50 agents and produced about 8.7k tokens in 30 seconds, a blended rate of 281 tokens/sec!
This is a parallel recursive streaming AI swarm in action. By injecting granular child agents, it enables recursive parallel streams. In this example, it’s summarizing Star Wars. The output is sequential, but the child agents are not blocked. After the main franchise agent spawns a child agent, it keeps generating tokens. While that first child agent is still running, it can spawn another child agent. These child agents can then spawn another child agent, and another, all running simultaneously. The client gets tokens the instant they’re ready, while downstream tokens that were already produced get unleashed immediately. This enables streams to produce other streams running in parallel while maintaining sequential order.
Watch the demo closely and notice it begins by printing movie titles. To start, there’s a network delay while it swaps to the act-writing stream (this can be reduced with HTTP request pre-warming or with cluster-local LLMs to decrease round-trip time). Then the act agent starts streaming the first act of “The Phantom Menace”. By the time Act 1 completes, the remaining acts are almost done, causing it to rapidly jump between acts. Blink and you’ll miss it: once the first movie completes, we start jumping entire movies at a time, first to the end of “Attack of the Clones” then to the end of “Revenge of the Sith”. By the time the prequels are complete, all 9 movies are already done and we zoom ahead to the end. The tokens come so fast, the output needed to be throttled just to show what’s going on.
The speed is insane, but the bigger win is that splitting work across child agents keeps each prompt simple. If you want to play around with it, check out my proof of concept repo.
Stream Injection
The ability to sequentially inject a stream within another stream enables remarkably powerful behavior. Each injected stream can perform its own work in parallel, while consumers of the output stream still get content in order without worrying about what’s happening inside the stream. Streams composed within streams are higher-order streams.
When combined with streaming LLM calls, stream injection lets us split complex monolithic prompts into child agents, each with their own simpler, more focused prompts, solving vast swaths of prompt complexity issues. Additionally, stream injection allows each of the child streams to run in parallel, simultaneously constructing different parts of the response ahead of what’s currently streaming. Not only do we get better answers, they finish at a ludicrous speed.
Spawning parallel recursive AI agents was already possible on our backend, but existing methods break streaming output and often block on parent agent output before spawning children.
Prompt Complexity and Attention Dilution
I discovered this need while exploring a way to embed JSX component rendering in markdown responses to power AI agent responses at my company Vetted. I’m very excited about the new UI experiences it will power within our AI answers, but getting agents to output JSX component tags in the correct format has been pushing the latest LLM models to their limit. We hit a complexity wall where current model limits prevent us from adding more advanced component composition to our answers.
When a prompt gets too large and complicated on the backend, our standard approach has been to break it down into smaller tasks and synthesize those outputs into an answer. We’ve already implemented agent swarms in our backend to distribute research and keep agents accurate with tightly scoped tasks, but the problem with that approach on the frontend is that the client needs to support streaming.
On the backend, this isn’t an issue. We can run agents in parallel, have agents spawn subagents recursively, and wait for each block of work to complete before processing it. However, on the client, blocking is not an option. Our answers take a while to produce, and while users are willing to wait for high quality research, they want content to start streaming as early as possible. It’s the same as the time-to-first-paint problem: you don’t need the entire page ready and interactive immediately, but you need to show something is happening as soon as you can.
Breaking down a monolithic agent into simpler tasks would enable several very important capabilities. It would solve “attention dilution” problems where an overloaded prompt becomes unable to follow complex instructions accurately because instruction attention gets spread too thin. It would also enable optimizing our model choices. With a monolithic prompt, only one model can be used at a time, meaning tradeoffs must be made between speed, cost, and specialization. By breaking up a prompt, we could choose the optimal model for each subtask.
Stream injection also makes it possible to interpolate actively streaming agents with data from any external source, such as databases, caches, and APIs. An injected stream doesn’t need to be an AI stream; it could be any data stream. Injecting newly discovered work as it streams in can enable significant streaming orchestration and optimization improvements.
To do all this, we need a way to provide ordered consumption of parallel production over streams.
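For contrast, here’s a minimal sketch of the blocking baseline (using a hypothetical runChildAgent helper): running children in parallel with Promise.all preserves order, but nothing can stream until every child has finished.

```ts
// A minimal sketch of the blocking baseline, assuming a hypothetical
// runChildAgent helper. Parallel and ordered, but nothing streams until the
// slowest child completes.
declare function runChildAgent(task: string): Promise<string>;

async function blockingSwarm(tasks: string[]): Promise<string> {
  const parts = await Promise.all(tasks.map(runChildAgent)); // run in parallel
  return parts.join(""); // ordered output, but only after everything is done
}
```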
Injecting and Hot Swapping Streams
So what’s the solution? How do you execute multiple prompts in parallel without breaking streaming?
First, you need a way to swap between streams. When a parent agent spawns a child agent, the child agent needs to take over streaming duties. Second, you need a way to split a stream in the middle of its response, injecting the child stream into the parent stream, then swapping to it and back. Third, you need to do this on the fly so a parent stream can spawn a child stream in the middle of stream execution.
Conceptually, you could do this with a transform stream that parses its input stream for directives to spawn a child stream. You could then orchestrate those streams to swap from the main stream to the child stream and back again. I later found out that these are called higher-order streams. They’re a common pattern in functional streaming, but don’t seem to have an equivalent in procedural streaming.
A Simple Solution to a Hard Problem
When I first started working on the solution, it seemed like I had my work cut out for me. At first, I thought I would need some complex central orchestrator to keep track of all the child agents and nested child agents and so forth, along with tremendous memory buffering overhead.
Then I started breaking down the problem more. I wanted to use every tool available, so I started reading the web streams spec front to back. While doing that, I was reminded of something: streams in JavaScript are also async iterables. That opened up a new angle I wasn’t considering. I didn’t need to orchestrate streams, I needed to orchestrate async iterables. What if, instead of a complex central orchestration layer, I could combine async iterables together?
Refreshing my memory on async iterables, I found the next key component. Generators allow you to delegate to other iterables using `yield*`, enabling you to swap from one stream to another. Still, I needed the ability to spawn new work dynamically as it was discovered on the stream; otherwise, the generator would start with a finite chain of iterators and end without ever adding new child agents. I needed the generator to stop and wait for a new iterator when it ran out of iterators to delegate to. Naturally, this called for a promise.
At this point, I had all the components I needed. The solution was to combine async iterables, generator delegation, and an unresolved promise that would eventually resolve with the next iterable or a termination signal.
The Relay Pattern
I’ve looked extensively and haven’t found this pattern mentioned anywhere, so I called it the “Relay Pattern”. Like a relay race, each iterable runs through a stint. When it completes its stint, it awaits an iterable promise (waiting for the next racer) to be ready and then delegates the main iterator to the next iterator (passing the baton). At its core, it uses promises to coordinate delegation handoffs to the next iterator. This lets you chain new iterables on the fly without knowing about those iterables ahead of time. When a consumer consumes the main async iterable sequence, they can do so transparently without needing to be aware that a swap even happened.
While checking prior art, I found a similar approach done at the item level using `yield`, but couldn’t find the pattern using `yield*`. When paired with `yield`, the pattern essentially turns a pull-only generator into a push/pull stream, but when `yield*` is used instead, it turns it into a higher-order push/pull stream akin to the `flatten` operation.
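To illustrate the distinction, here’s a rough sketch (not the library code) of the same relay idea at the item level with `yield` versus at the iterable level with `yield*`:

```ts
// Illustrative only: the relay idea at two levels of granularity.
// nextItem / nextIterable stand in for a promise that resolves with the next
// piece of work, or null to terminate.
async function* itemRelay(nextItem: () => Promise<string | null>) {
  let item: string | null;
  while ((item = await nextItem()) !== null) {
    yield item; // push/pull one value per handoff
  }
}

async function* iterableRelay(
  nextIterable: () => Promise<AsyncIterable<string> | null>
) {
  let iterable: AsyncIterable<string> | null;
  while ((iterable = await nextIterable()) !== null) {
    yield* iterable; // delegate to the whole sub-stream, flattening it in place
  }
}
```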
AsyncIterableSequencer
To implement this pattern, I’ve built a library: `async-iterable-sequencer`. As the name suggests, it creates an async iterable that sequences other async iterables. The code is so simple I can paste it here. Excluding types, it’s only 25 lines of code:
export type AnyIterable<T> = AsyncIterable<T> | Iterable<T>;
export type Chainable<T> = AnyIterable<T> | null;
export type Chain<T> = (iterator: Chainable<T>) => void;

export interface AsyncIterableSequencerReturn<T> {
  sequence: AsyncGenerator<T>;
  chain: Chain<T>;
}

export function asyncIterableSequencer<T>(): AsyncIterableSequencerReturn<T> {
  const queue: Promise<Chainable<T>>[] = [];
  let resolver: Chain<T>;

  // Each call queues the next unresolved promise and creates a resolver that
  // both prepares the following promise and fulfills the current one.
  const next = () => {
    const { promise, resolve } = Promise.withResolvers<Chainable<T>>();
    queue.push(promise);
    resolver = (nextIterator) => {
      next();
      resolve(nextIterator);
    };
  };

  next();

  return {
    // Delegate to each chained iterable in order; a null link ends the sequence.
    sequence: (async function* () {
      let iterator: Chainable<T> | undefined;
      while ((iterator = await queue.shift())) {
        yield* iterator;
      }
    })(),
    chain: (iterator) => {
      resolver(iterator);
    },
  };
}
The factory function returns an async iterator called `sequence` and a bound function called `chain` for adding new iterators. Because `chain` fulfills a promise, it can be called both synchronously and asynchronously at any point in time.
This pattern allows us to create a chain of async iterables, and because web streams are a superset of async iterables, it enables us to enqueue entire streams: the main requirement for stream injection. As async iterables do not need to be complete at the time they’re chained, we can connect multiple batches of async work to be completed in any order and in parallel, while still maintaining their sequential ordering. Because this uses iterator delegation, there’s no additional buffering overhead for orchestrating the streams; we can use language-level imperative constructs to do this in a cleaner way.
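Here’s a small usage sketch based on the code above: chain iterables as they become available, then chain `null` to terminate the sequence.

```ts
import { asyncIterableSequencer } from "async-iterable-sequencer";

// A small usage sketch: consumption starts immediately and simply waits for
// the next link in the chain to arrive.
const { sequence, chain } = asyncIterableSequencer<string>();

const consumer = (async () => {
  for await (const chunk of sequence) {
    process.stdout.write(chunk);
  }
})();

chain(["Hello, "]); // a plain synchronous iterable
chain(
  (async function* () {
    yield "world"; // an async iterable produced later
    yield "!\n";
  })()
);
chain(null); // terminate the sequence

await consumer; // prints "Hello, world!\n" in order
```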
`AsyncIterableSequencer`s enable the parallel streaming part of parallel recursive streaming AI swarms, but we still need to solve the recursive part.
DelegateStream
That brings us to the next library I created: `delegate-stream`. This library takes the `AsyncIterableSequencer` and combines it with a `TransformStream` interface. A transform stream allows you to intercept a stream and process chunks as they pass through, optionally transforming chunks from an input stream and enqueuing them onto an output stream. The difference is that a `DelegateStream` doesn’t enqueue chunks like a transform stream; it chains entire streams, as well as any other async iterables.
It is a higher-order version of `TransformStream` that streams other streams. It’s akin to `flatMap` in functional streaming libraries, but unlike `flatMap`, it adheres to imperative interface-based streaming patterns, allowing it to be used with common standard library stream interfaces like `ReadableStream` with `pipeThrough`.
This means that an in-progress stream can be parsed for a directive to generate a new stream, and then that stream can be injected into the middle of the in-progress output stream by chaining it before continuing. Since those injected streams can be another delegate stream, delegate streams can inject delegate streams recursively, fulfilling the recursive streaming part of parallel recursive streaming AI swarms.
While the chaining is recursive, the output is flattened. When consuming the stream, the internal implementation details are completely transparent, adding no additional requirements for consumption. While the concept of flattening streams exists in functional stream programming, I have yet to see higher-order streams implemented around a `TransformStream` interface.
It’s a “Delegate” because it is powered by the `yield*` used under the hood of `AsyncIterableSequencer`. Yielding to another generator is called delegation, so when a stream is added to the chain, it will eventually delegate iteration to the chained stream.
Because async iterable sequencers are so simple, delegate streams are too:
import { asyncIterableSequencer, Chain } from "async-iterable-sequencer";

export interface DelegateStreamOptions<I, O> {
  start?: (chain: Chain<O>) => void;
  transform: (chunk: I, chain: Chain<O>) => void;
  finish?: (chain: Chain<O>) => void;
}

export class DelegateStream<I, O> {
  public readable: ReadableStream<O>;
  public writable: WritableStream<I>;

  constructor({ start, transform, finish }: DelegateStreamOptions<I, O>) {
    const { sequence, chain } = asyncIterableSequencer<O>();

    // The readable side is fed by the sequencer; the writable side hands each
    // incoming chunk to the transform callback along with the chain function.
    this.readable = ReadableStream.from<O>(sequence);
    this.writable = new WritableStream<I>({
      write: (chunk) => {
        transform(chunk, chain);
      },
      close: () => {
        finish?.(chain);
      },
    });

    start?.(chain);
  }
}
It’s little more than a class wrapper that implements the transform stream interface and exposes some lifecycle callbacks. All the magic happens within the async iterable sequencer. The delegate stream simply allows you to interface with it in the middle of another stream.
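As a usage sketch (the directive format and `fetchChildStream` helper here are hypothetical), a delegate stream can pass ordinary chunks through while injecting an entire child stream whenever it sees a directive:

```ts
import { DelegateStream } from "delegate-stream";

// Hypothetical inputs and helpers for illustration.
declare const parentStream: ReadableStream<string>;
declare function fetchChildStream(directive: string): ReadableStream<string>;

const delegate = new DelegateStream<string, string>({
  transform: (chunk, chain) => {
    if (chunk.startsWith("<child ")) {
      chain(fetchChildStream(chunk)); // inject an entire stream, in order
    } else {
      chain([chunk]); // pass the chunk through as-is
    }
  },
  finish: (chain) => chain(null), // close the output once the input ends
});

// Consumers read the flattened output without knowing a swap ever happened.
const output = parentStream.pipeThrough(delegate);
```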
With all the building blocks in place, we can now build the engine to power parallel recursive streaming AI swarms.
A Missing Abstraction?
Surprisingly, I’ve failed to find any reference or implementation of higher-order transform streams. For first-order streams there’s an abstraction spectrum spanning async generators serving as imperative pull streams, interface-based streams, and functional streams. These abstractions each have their uses, with a tradeoff between control and composability.
- Async generators are the least abstract, providing low-level imperative control and allowing you to suspend execution and emit values with `yield`. They provide the maximum amount of control, exposing control flow operations; however, their semantics make them harder to compose.
- Interface-based streams are in between, wrapping streams in a context that can encapsulate a procedure while exposing a controller for output emission. They strike a balance, giving you procedural lifecycle control with a middle ground for composability.
- Functional streams are the most abstract, coordinating pipelines with declarative transformations. While they have the least granular control based on function types, they provide the most safety and composability.
Looking at the transform use case, there’s a clear progression between the abstractions:
yield → TransformStream → stream.map
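To make the progression concrete, here’s the same chunk transform sketched at each level (the functional `map` is shown as a generic helper, since its exact shape depends on the streaming library):

```ts
// 1. Async generator: imperative pull, full control via yield.
async function* upperGen(source: AsyncIterable<string>) {
  for await (const chunk of source) {
    yield chunk.toUpperCase();
  }
}

// 2. Interface-based: a standard TransformStream.
const upperTransform = new TransformStream<string, string>({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

// 3. Functional: a declarative map (exact API varies by library; this helper
// is a hypothetical stand-in).
declare function map<I, O>(fn: (chunk: I) => O): TransformStream<I, O>;
const upperMap = map((chunk: string) => chunk.toUpperCase());
```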
But what about higher-order streams of streams? There seems to be a gap:
yield* → ???Stream → stream.flatMap
What’s the interface-based stream equivalent in this progression? I couldn’t find it, which is why I implemented my own `DelegateStream`.
AI Swarm Orchestration
For the AI swarm proof of concept demo, I’ve created a CLI tool called `swarm-weaver`. It has two kinds of delegators: one for prompts and one for agent execution. The `PromptDelegator` takes a stream of content that it interpolates into a prompt, then chains an `AgentDelegator` onto it once the prompt input stream is ready to be sent to an LLM provider for content generation. The output of that agent delegator is then parsed for child agent directives. For the proof of concept, I used XML tags.
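As a rough conceptual sketch (not the actual swarm-weaver implementation; `callLLM`, `buildInstallmentPrompt`, and the assumption that each directive arrives as its own chunk are all simplifications), the recursive wiring looks something like this:

```ts
import { DelegateStream } from "delegate-stream";

// Hypothetical helpers for illustration.
declare function callLLM(prompt: string): ReadableStream<string>;
declare function buildInstallmentPrompt(directive: string): string;

function agentDelegator(prompt: string): ReadableStream<string> {
  return callLLM(prompt).pipeThrough(
    new DelegateStream<string, string>({
      transform: (chunk, chain) => {
        if (chunk.startsWith("<installment")) {
          // A directive spawns a child agent; its stream is injected in place
          // and can itself spawn further children recursively.
          chain(agentDelegator(buildInstallmentPrompt(chunk)));
        } else {
          chain([chunk]);
        }
      },
      finish: (chain) => chain(null),
    })
  );
}
```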
Franchise Example In Depth
In the example video, it generated a synopsis of a media franchise using multiple layers of child agents. The entrypoint `main` prompt is used to generate text containing XML directives, which are used to spawn child agents. Here is an example of what those prompts look like (simplified for brevity):
`main` prompt
You are an agent that outlines the installments of book series, movie franchises, and similar large-scale series. Your task is to take a user's request about a series and output a specific template format listing all installments.
**Input:**
{{_content_}}
**Output Format:**
# [series title]
## [installment title]
<installment series="[series name]" title="[installment title]"/>
## [installment title]
<installment series="[series name]" title="[installment title]"/>
`{{_content_}}` is an interpolation placeholder that gets replaced with the prompt’s input stream. For the root prompt, this stream comes from stdin.
The tool also substitutes `{{_context_}}` with the tokens generated so far by the parent stream, to make it easier to pass in the full context generated so far; however, it’s not used in these examples.
When the prompt generates XML tags, those tags trigger a child prompt. In this example, the root `main` agent spawns child agents for the `installment` prompt:
`installment` prompt
Generate chronological act summaries for the specified media content. Each act should be a single, concise sentence that captures the essential narrative progression and provides sufficient context for further elaboration.
**Input:**
Series: {{series}}
Title: {{title}}
**Output Format:**
### Act [number]
<act series="[series name]" title="[title]">
[Single sentence act summary]
</act>
Here, instead of receiving `{{_content_}}`, the prompt receives attribute interpolation values for `{{series}}` and `{{title}}`. These come from the `main` agent’s output of `<installment series="[series name]" title="[installment title]"/>`. The `installment` agent then spawns another child agent, feeding its output into the `act` prompt:
`act` prompt
You are tasked with creating an engaging 1-3 paragraph synopsis for the following media based on the provided act outline.
**Input:**
- Series: {{series}}
- Title: {{title}}
**Output Format:**
[1 - 3 paragraphs]
The `act` prompt receives the series and title values again, as well as the special `{{_content_}}` value, which is populated with the act summary that was generated inside the `<act>` tag.
Executing the Swarm
Once the application starts, the `main` agent executes first. It outputs something like this:
# Star Wars
## Episode I: The Phantom Menace
<installment series="Star Wars" title="Episode I: The Phantom Menace"/>
...
## Episode IX: The Rise of Skywalker
<installment series="Star Wars" title="Episode IX: The Rise of Skywalker"/>
Non-XML content is chained onto the output stream as a string, while XML tags trigger the chaining of a new prompt delegator. The chain created by the `main` agent then looks like this:
stdout < string < installment_prompt < string < installment_prompt < ...
Once the content sent to the `installment` prompts is complete, those prompt delegators chain the corresponding agent delegators in their place. At this point the chain looks like this:
stdout < string < installment_agent < string < installment_agent < ...
The `installment` agents now generate their own content. The resulting output looks like this:
# Star Wars
## Episode I: The Phantom Menace
### Act 1
<act series="Star Wars" title="Episode I: The Phantom Menace">
Jedi Master Qui-Gon Jinn and his apprentice Obi-Wan Kenobi are dispatched to negotiate a trade dispute with the Trade Federation, but discover a sinister plot involving an invasion of the peaceful planet Naboo and encounter a young slave named Anakin Skywalker on Tatooine.
</act>
...
The chain has now been expanded with newly injected delegate streams:
stdout < string < act_agent < string < act_agent < ... < string < installment_agent < ...
Finally, the `act` agents receive the summary sentences generated by the `installment` agents and use that context to generate a synopsis with more detail. As each summary sentence contains the starting and ending context of its act, the generated text flows seamlessly from one act to the next.
Each delegator expands its work directly in place within the chain, maintaining sequential order while creating new subtasks at its position.
This approach solves the problem of overly complicated prompts. By breaking problems down without interrupting streams, we can create focused prompts that do fewer things more effectively. Even better, breaking out the prompts unlocks ludicrous speed. You not only get the time-to-first-token benefits of streaming, but also a swarm of agents generating and expanding content in place and in parallel with no downside.
The biggest issue I’ve encountered is that the output is way too fast. Presenting it reasonably to the user, without huge chunks of content suddenly appearing, will require smoothing and throttling.
Meta-Prompting
Representing child agent directives as XML tags got me thinking: what would happen if I put an agent inside another agent? Since text inside a child agent tag is interpolated into its prompt, nesting one agent tag inside another should logically send the inner agent’s output to the outer agent’s prompt.
By sending the output of one agent to another, we enable meta-prompting: agents prompting agents. To accomplish this, I implemented a tag stack that allows inner agent tags to stream their content to the prompt delegator of the outer agent tag. This content can include yet another nested agent tag, allowing meta-prompts to stream recursively up the chain of nested agent tags.
With this approach, you can use sub-agents as idea generators or context generators for the containing agent. The inner tags don’t even have to spawn agents. They could be standard subroutines that stream in crawl data, a database call, an API call, or any other source of information you can think of.
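For example, here’s a small sketch (the `<context/>` directive and URL are illustrative) where a directive chains in the decoded body of a fetch response instead of an agent:

```ts
import { DelegateStream } from "delegate-stream";

// A small sketch: the injected stream doesn't have to come from an LLM. A
// <context/> directive starts a fetch immediately, so it downloads in
// parallel, and chains the decoded response body in place.
const withExternalData = new DelegateStream<string, string>({
  transform: (chunk, chain) => {
    if (chunk.trim() === "<context/>") {
      const body = fetch("https://example.com/context").then((response) =>
        response.body!.pipeThrough(new TextDecoderStream())
      );
      chain(
        (async function* () {
          yield* await body; // delegate once the sequencer reaches this link
        })()
      );
    } else {
      chain([chunk]);
    }
  },
  finish: (chain) => chain(null),
});
```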
I believe meta-prompting can enable incredibly powerful patterns, and with the relay pattern powering it all, content is unblocked, processed, and streamed as soon as it is resolved and available.
Beyond AI
While functional streaming libraries excel at pipeline processing with declarative transformations, the lifecycle control provided by interface-based streams simplifies certain architectural patterns, particularly those involving complex control flow with shared context. In the meta-prompting example, lifecycle hooks remain accessible while parsing nested tags and managing their nested streams.
By making higher-order streams accessible to other paradigms, I hope these powerful patterns can be integrated seamlessly into a wider range of environments. I’m currently exploring their use in streaming web rendering frameworks, where I’ve found lifecycle control helps modularize the complex data flow of modern frameworks.
This all started when I set out to solve a specific problem: parallel AI prompts couldn’t be streamed without breaking output order. Higher-order streams offered an elegant approach, but weren’t available in the interface-based streaming environment I needed. The relay pattern provided a solution in only 25 lines of code. It brings this powerful pattern from functional streaming paradigms to interface-based streams.
Any async operation can be injected as a stream. From database queries to API calls to file processing, anything can be composed and processed with higher-order streams. Finding the right abstraction cuts through complexity, and often the simplest solutions are the most powerful.