Transducers Explained: Pipelines
This is the second article of the Transducers Explained tutorial. In the first article, we progressed from reducing functions to transformers to using the `map` transducer with `transduce`. In this article, we will introduce four new transducers: `filter`, `remove`, `drop` and `take`. We will show how transducers can be composed into pipelines and talk about the order of transformation. We will also modify our `reduce` implementation to handle early termination with `reduced` values.
So, where were we?
Transformer
In the first article, we formalized the steps to `reduce` by defining a transformer protocol. All transformers contain 3 methods:

- Initialize the transformation with an initial value, `init`
- Combine the value with each item using a reducing function, `step`
- Convert the last value to the final output using `result`
In the first article, we transformed input sources using the reducing functions `mult`, `add` and `append`.

```javascript
function append(value, item){
  value.push(item);
  return value;
}
```

In this article, we will only use `append`, which pushes items onto the end of an array.
Reduce
We defined our own implementation of `reduce` which accepts:

- A transformer, or a reducing function to be wrapped as a transformer
- An initial value (like `[]`)
- An input source (like an array to reduce)

The current implementation uses the `step` function of the transformer as the reducing function for the native array reduce. We will modify the implementation later in this article.

During iteration, the `step` function is called with two parameters: `value` and `item`. The initial value is supplied by the caller, and every subsequent value uses the return value of the `step` function.

The item is provided by some process of iteration. We showed two processes in the first article: reducing over elements of an array and manually calling the `step` function with each new item (we will see more examples in a future article).
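As a quick refresher, manually stepping a few items with `append` might look like this sketch (it assumes the `wrap` helper from the first article, repeated here so the example is self-contained):

```javascript
// append and wrap, as defined in the first article
function append(value, item){
  value.push(item);
  return value;
}

function wrap(stepper){
  return {
    init: function(){ throw new Error('init not supported'); },
    step: stepper,
    result: function(value){ return value; }
  };
}

// the caller supplies the initial value; every subsequent value
// is the return value of the previous step
var xf = wrap(append);
var value = [];
value = xf.step(value, 1); // [1]
value = xf.step(value, 2); // [1,2]
value = xf.step(value, 3); // [1,2,3]
var output = xf.result(value); // [1,2,3]
```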
Transducer
We created the mapping transducer.

```javascript
function map(f){
  return function(xf){
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){
        var mapped = f(item);
        return xf.step(value, mapped);
      },
      result: function(value){ return xf.result(value); }
    };
  };
}
```

The `map` transducer accepts a mapping function, `f`, and returns a transducer which:

- Accepts an existing transformer
- Returns a new transformer which maps items by `f`
- Delegates additional handling to the wrapped transformer

The function `f` can be any function that accepts an item and maps it to a new value.

```javascript
function plus1(input){ return input + 1; }
function plus2(input){ return input + 2; }
```

We'll use these simple functions again for our examples.
Transduce
We defined a new function, `transduce`, that accepts:

- A transducer that defines the transformation
- A stepper function or transformer (like `append`)
- An initial value for the stepper function (like `[]`)
- The input source (e.g. an array to transform)

The transducer is called with the `stepper` function to create a transformer, which is then passed to `reduce` with the initial value and input source.

We showed that we can use the same transducer to create different results by changing the initial value and stepper function provided to `transduce`.
Composition
Finally, we showed that you can create new transducers by composing existing ones.

```javascript
function compose2(fn1, fn2){
  return function(item){
    var value = fn2(item);
    value = fn1(value);
    return value;
  };
}

var transducer = compose2(
    compose2(map(plus1), map(plus2)),
    map(plus1));
var stepper = append;
var init = [];
var input = [2,3,4];
var output = transduce(transducer, stepper, init, input);
// [6,7,8]
```

The transformation of each composed transducer runs left-to-right.

This is where we will pick up today. If this example does not feel familiar, please refer to the first article.
Pipelines
To begin, let's define a new function that composes an arbitrary number of functions passed as arguments.
```javascript
function compose(/*fns*/){
  var fns = arguments;
  return function(xf){
    var i = fns.length - 1;
    for(; i >= 0; i--){
      xf = fns[i](xf);
    }
    return xf;
  };
}
```

This is just an extension of `compose2` defined above. Function composition is right-to-left: the returned function accepts an initial argument used to call the last function on the right. The return value of that call is used as the argument to the next function to the left. This process is repeated for all functions passed to `compose`.
We can create `plus4` by composing `plus1` and `plus2`.

```javascript
// calling manually
var value = plus1(plus1(plus2(5))); // 9

// using a composed function (allows reuse)
var plus4 = compose(plus1, plus1, plus2);
var value = plus4(5); // 9
```
Let's create a `plus5` transducer by composing `map` transducers.

```javascript
var transducer = compose(
    map(plus1),  // [3,4,5]
    map(plus2),  // [5,6,7]
    map(plus1),  // [6,7,8]
    map(plus1)); // [7,8,9]
var stepper = append;
var init = [];
var input = [2,3,4];
var output = transduce(transducer, stepper, init, input);
// [7,8,9]
```
The comments next to each transducer describe the transformation as an array, since it is easier to follow the transformation a step at a time.

Remember, though, that transducers step through the transformation one item at a time without creating intermediate aggregates. The comments only indicate what happens as a result of the transformation at each step of the pipeline. We'll discuss this more as we progress.

It turns out that, although composition is right-to-left, the order of transformation is actually left-to-right (top-down as displayed in the example). The order does not matter in this example, but we'll see that it does in the next example.
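One way to see the item-at-a-time behavior is to build the transformer ourselves and push a single item through the whole pipeline. This sketch repeats the `compose`, `map`, `plus1`, `plus2` and `append` definitions from above so it stands alone:

```javascript
function append(value, item){ value.push(item); return value; }
function plus1(input){ return input + 1; }
function plus2(input){ return input + 2; }

function map(f){
  return function(xf){
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){ return xf.step(value, f(item)); },
      result: function(value){ return xf.result(value); }
    };
  };
}

function compose(/*fns*/){
  var fns = arguments;
  return function(xf){
    for(var i = fns.length - 1; i >= 0; i--){
      xf = fns[i](xf);
    }
    return xf;
  };
}

// a transformer that appends into an array
var appendXf = {
  init: function(){ return []; },
  step: append,
  result: function(value){ return value; }
};

var transducer = compose(map(plus1), map(plus2), map(plus1), map(plus1));
var xf = transducer(appendXf);

// a single step sends the item 2 through every map before it is appended:
// ((2+1)+2)+1)+1 = 7, and no intermediate arrays like [3,4,5] are ever built
var output = xf.step([], 2); // [7]
```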
Filter
Let's define the `filter` transducer.

```javascript
function filter(predicate){
  return function(xf){
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){
        var allow = predicate(item);
        if(allow){
          value = xf.step(value, item);
        }
        return value;
      },
      result: function(value){ return xf.result(value); }
    };
  };
}
```

Notice that the transformer returned by `filter` only delegates to the next transformer if the predicate returns a truthy value. If the predicate returns a falsey value, the item is ignored and the previous result is returned unchanged.
Let's create a transducer that filters all odd values.

```javascript
function isOdd(num){ return num % 2 === 1; }

var transducer = filter(isOdd);
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [1,3,5]
```

We created a transducer that filters items using the `isOdd` predicate and then used `transduce` over an input array. The output array only contains odd values.
Let's also create a function that returns a predicate to check equality.

```javascript
// another predicate
function isEqual(y){
  return function(x){
    return x === y;
  };
}

var transducer = filter(isEqual(2));
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [2]
```

We can see that creating the predicate with `isEqual(2)` will only allow the number 2 in the output array.
One more helper that accepts another predicate and negates it:

```javascript
function not(predicate){
  return function(x){
    return !predicate(x);
  };
}

var transducer = filter(not(isEqual(2)));
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [1,3,4,5]
```

This modifies the previous example to negate the `isEqual(2)` predicate, which creates a transducer that removes all twos from the input source.
We now have an additional weapon in our pipeline arsenal. Let's wield it.
Pipeline order
Let's increment each item and then filter odd values.
```javascript
var transducer = compose(
    map(plus1),      // [2,3,4,5,6]
    filter(isOdd));  // [3,5]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [3,5]
```

The `map(plus1)` transducer runs first and increments each item, then calls `step` on the next transformation, which filters all odd values.
Let's swap these transducers and see what happens.
```javascript
var transducer = compose(
    filter(isOdd),  // [1,3,5]
    map(plus1));    // [2,4,6]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [2,4,6]
```

Here we first filter all odd values. The `filter(isOdd)` transformer will only step items that are odd to the next transformer. All stepped items are mapped with `plus1`.
This demonstrates two important properties of composed transducers:

- Although composition is right-to-left, transformation happens left-to-right
- It may be more efficient to use transducers that reduce the number of items earlier in the pipeline, if possible

Notice in the last example that `map(plus1)` is only called with a subset of the values. Again, intermediate arrays are not created; the comments are only for illustration.
Remove
Ready for another transducer?

```javascript
function remove(predicate){
  return filter(not(predicate));
}
```

That was easy. It turns out that we can create the `remove` transducer by negating the predicate and reusing `filter`.
Here it is in action.
```javascript
var transducer = compose(
    filter(isOdd),        // [1,3,5]
    map(plus1),           // [2,4,6]
    remove(isEqual(4)));  // [2,6]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [2,6]
```

We first filter odd items, then increment each, then remove `4`.
Drop
What if we want to skip `n` items at the beginning of the iteration? This is a job for the `drop` transducer.

```javascript
function drop(n){
  return function(xf){
    var left = n;
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){
        if(left > 0){
          left--;
        } else {
          value = xf.step(value, item);
        }
        return value;
      },
      result: function(value){ return xf.result(value); }
    };
  };
}
```

You can use it like so:

```javascript
var transducer = drop(2);
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [3,4,5]
```
`drop` accepts the number of items to drop from the front of the list. It is our first example of a transducer that creates a stateful transformation. Each time the `drop` transducer is called to create a transformation, the variable `left` is created to keep track of how many items are left to drop for that transformation, initialized to `n`.

Notice that we created a stateful transformer using a stateless transducer. This is an important distinction. It means we can reuse the `drop(2)` transducer as many times as we want without worrying about any state: state is created with the transformer, not the transducer.
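To see that reuse in action, here is a sketch (repeating the `drop` and `append` definitions from above) that applies the same `drop(2)` transducer to two different inputs; each application gets its own `left` counter:

```javascript
function append(value, item){ value.push(item); return value; }

function drop(n){
  return function(xf){
    var left = n; // state lives here, in the transformer
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){
        if(left > 0){
          left--;
        } else {
          value = xf.step(value, item);
        }
        return value;
      },
      result: function(value){ return xf.result(value); }
    };
  };
}

var appendXf = {
  init: function(){ return []; },
  step: append,
  result: function(value){ return value; }
};

var dropTwo = drop(2);

// each call to the transducer creates a fresh transformer with fresh state
var xf1 = dropTwo(appendXf);
var first = [1,2,3,4].reduce(xf1.step, []);  // [3,4]

var xf2 = dropTwo(appendXf);
var second = [5,6,7,8].reduce(xf2.step, []); // [7,8]
```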
Take
What if, instead of dropping items, we wanted to take the first `n` items and drop the rest? We will assume `n > 0` to ease implementation.

Let's give it a shot.

```javascript
function take(n){
  return function(xf){
    var left = n;
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){
        value = xf.step(value, item);
        if(--left <= 0){
          // how do we stop???
        }
        return value;
      },
      result: function(value){ return xf.result(value); }
    };
  };
}
```
Uh-oh. We've hit a snag. We know how to step each item and maintain a count of how many items are left by keeping transformer state. But how do we stop the iteration over the rest of the items?

Why would we want to signal that we are done accepting additional items? Not only is continuing wasteful, there is also no guarantee that our iteration will ever complete: it may be infinite. We will certainly want to stop infinite iteration if we can.

So how do we signal early termination? We're going to have to take another look at our source of iteration: `transduce`.
Reduce redux
Here are our current definitions of `transduce` and `reduce` from the first article:

```javascript
function transduce(transducer, stepper, init, input){
  if(typeof stepper === 'function'){
    stepper = wrap(stepper);
  }
  var xf = transducer(stepper);
  return reduce(xf, init, input);
}

function reduce(xf, init, input){
  if(typeof xf === 'function'){
    xf = wrap(xf);
  }
  // how do we stop??
  var value = input.reduce(xf.step, init);
  return xf.result(value);
}

function wrap(stepper){
  return {
    init: function(){ throw new Error('init not supported'); },
    step: stepper,
    result: function(value){ return value; }
  };
}
```
If we take a look at the implementation, we will remember that we are currently iterating with the native array reduce using the reducing function from our transformer. In a future article, we will remove the assumption that the input source is an array, but we will stick with it for now. Let's define our own implementation of `arrayReduce`.

```javascript
function reduce(xf, init, input){
  if(typeof xf === 'function'){
    xf = wrap(xf);
  }
  return arrayReduce(xf, init, input);
}

function arrayReduce(xf, init, array){
  var value = init;
  var idx = 0;
  var length = array.length;
  for(; idx < length; idx++){
    value = xf.step(value, array[idx]);
    // We need to break here, but how do we know?
  }
  return xf.result(value);
}
```

The `arrayReduce` implementation accepts a transformer, an initial value and an input array. It then iterates over each item using a `for` loop and calls the transformer's `step` function with the `value` and array `item`.
We need a way to break out of this loop, depending on some marker value. As luck would have it, there is an existing transducer protocol we can adopt.

To signal early termination after calling `step` on a transformer, we can wrap our reduced value in an object with two attributes:

- `value` with the actual wrapped value
- `__transducers_reduced__` of `true` to signal the object is reduced

Which leads to our implementation.

```javascript
function reduced(value){
  return {
    value: value,
    __transducers_reduced__: true
  };
}
```
We will also add a predicate to determine whether a value is reduced.

```javascript
function isReduced(value){
  return value && value.__transducers_reduced__;
}
```

In addition, we need a way to extract, or `deref`, the reduced value.

```javascript
function deref(reducedValue){
  return reducedValue.value;
}
```
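A quick sanity check of how these three helpers fit together:

```javascript
function reduced(value){
  return { value: value, __transducers_reduced__: true };
}
function isReduced(value){
  return value && value.__transducers_reduced__;
}
function deref(reducedValue){
  return reducedValue.value;
}

var r = reduced([1,2,3]);
var done = isReduced(r);          // true
var notDone = isReduced([1,2,3]); // undefined (falsey): plain arrays are not marked
var value = deref(r);             // [1,2,3]
```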
We can now adjust `arrayReduce` to handle early termination with reduced values.

```javascript
function arrayReduce(xf, init, array){
  var value = init;
  var idx = 0;
  var length = array.length;
  for(; idx < length; idx++){
    value = xf.step(value, array[idx]);
    if(isReduced(value)){
      value = deref(value);
      break;
    }
  }
  return xf.result(value);
}
```

We iterate over each item as we did before, but after every call to `step` we check whether we have a reduced value. If so, we extract the value and terminate the iteration by breaking the loop. We still call `result` on the final value, regardless of whether it came from a reduced value or from a complete iteration.
Take 2
Now we are ready to complete our implementation of `take`:

```javascript
function take(n){
  return function(xf){
    var left = n;
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){
        value = xf.step(value, item);
        if(--left <= 0){
          // we are done, so signal reduced
          value = reduced(value);
        }
        return value;
      },
      result: function(value){ return xf.result(value); }
    };
  };
}
```

The only thing we were missing was wrapping the value with `reduced` after we detected the transformation was complete.

Let's see if it works:

```javascript
var transducer = take(3);
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [1,2,3]
```
Sure does.
Just like any other transducer, you can compose `drop` and `take` in a pipeline.

```javascript
var transducer = compose(
    drop(1),  // [2,3,4,5]
    take(3),  // [2,3,4]
    drop(1)); // [3,4]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [3,4]
```

The first `drop` skips the first item and then steps every remaining item to the next transformer. The `take` transformer steps the first 3 items received from the first `drop` and then terminates the iteration. The last `drop` drops the first item sent from `take` and steps the remaining two items sent before termination.
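To verify that the pipeline really terminates early, we can record each item as it enters the pipeline. This sketch gathers the definitions from above into one runnable piece; the `seen` array and `log` function are introduced only for this illustration:

```javascript
function append(value, item){ value.push(item); return value; }

function map(f){
  return function(xf){
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){ return xf.step(value, f(item)); },
      result: function(value){ return xf.result(value); }
    };
  };
}

function reduced(value){ return { value: value, __transducers_reduced__: true }; }
function isReduced(value){ return value && value.__transducers_reduced__; }
function deref(reducedValue){ return reducedValue.value; }

function take(n){
  return function(xf){
    var left = n;
    return {
      init: function(){ return xf.init(); },
      step: function(value, item){
        value = xf.step(value, item);
        if(--left <= 0){ value = reduced(value); }
        return value;
      },
      result: function(value){ return xf.result(value); }
    };
  };
}

function compose(/*fns*/){
  var fns = arguments;
  return function(xf){
    for(var i = fns.length - 1; i >= 0; i--){ xf = fns[i](xf); }
    return xf;
  };
}

function wrap(stepper){
  return {
    init: function(){ throw new Error('init not supported'); },
    step: stepper,
    result: function(value){ return value; }
  };
}

function arrayReduce(xf, init, array){
  var value = init;
  for(var idx = 0; idx < array.length; idx++){
    value = xf.step(value, array[idx]);
    if(isReduced(value)){ value = deref(value); break; }
  }
  return xf.result(value);
}

function transduce(transducer, stepper, init, input){
  if(typeof stepper === 'function'){ stepper = wrap(stepper); }
  return arrayReduce(transducer(stepper), init, input);
}

var seen = [];
function log(x){ seen.push(x); return x; }

var output = transduce(compose(map(log), take(3)), append, [], [1,2,3,4,5]);
// output: [1,2,3]; seen: [1,2,3], so items 4 and 5 never entered the pipeline
```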
Conclusion of part 2...
We began by summarizing what we learned in the first article. We added four new transducers: `filter`, `remove`, `take` and `drop`. We created transformer pipelines by composing transducers and saw that the order of transformation is left-to-right.

We saw that, in addition to changing items during transformation, a transformer could decide to skip any item by not calling `step` on the next transformer. It is the implementation of each transformer that decides what to step to the next one. There are cases where a transformer may send multiple values, for example `cat` or transduce-string.
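As an illustration of a transformer that steps multiple values, here is a hedged sketch of a `cat`-like transducer. This is a simplified version for illustration only, not the actual transducers-js implementation (which, among other things, also has to handle reduced values from the inner steps):

```javascript
function append(value, item){ value.push(item); return value; }

// cat: the transformer steps every element of each (array) item
// into the next transformer: one input item, many step calls
function cat(xf){
  return {
    init: function(){ return xf.init(); },
    step: function(value, item){
      for(var i = 0; i < item.length; i++){
        value = xf.step(value, item[i]);
      }
      return value;
    },
    result: function(value){ return xf.result(value); }
  };
}

var appendXf = {
  init: function(){ return []; },
  step: append,
  result: function(value){ return value; }
};

var xf = cat(appendXf);
var output = [[1,2],[3],[4,5]].reduce(xf.step, []);
// output: [1,2,3,4,5]
```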
We also saw examples of transducers that create stateful transformations. The state is managed by the transformer, not the transducer. This allows reuse of stateless transducers, even if the transformers they create manage state.
When implementing `take`, we realized we needed a way to signal early termination. We changed our `reduce` implementation to handle and unwrap `reduced` values, and implemented `take` to terminate the iteration on completion.
Anything else?
There are still a few loose ends to tie up in the final article of this introductory tutorial. We have glossed over the purpose of the transformer `init` and of `reduce`; we will address this. We will add `into` and generalize our `reduce` implementation to support iterators.

We will also see that the input source could be anything that produces a sequence of values: lazy lists, indefinite sequence generation, CSP, Node.js streams, iterators, generators, immutable-js data structures, etc.
Want to be notified of new articles? Follow simplectic on Twitter.
I'm ready now!
Ready to use transducers? You should be in good shape if you made it through this article: see transducers-js and transducers.js. We followed the transducers-js implementation closely, but the concepts apply to transducers.js as well.
Also check out underarm if you like Underscore.js. It is based on the transduce libraries that allow defining APIs against a common protocol supported by both transducers.js and transducers-js.