2014-10-23

Transducers Explained: Pipelines

This is the second article of the Transducers Explained tutorial. In the first article, we progressed from reducing functions to transformers to using the map transducer with transduce. In this article, we will introduce four new transducers: filter, remove, drop and take. We will show how transducers can be composed into pipelines and talk about the order of transformation. We will also modify our reduce implementation to handle early termination with reduced values.

So, where were we?

Transformer

In the first article, we formalized the steps to reduce by defining a transformer protocol.

All transformers contain 3 methods:

  1. Initialize transformation with initial value, init
  2. Combine value with each item using reducing function, step
  3. Convert the last value to final output using result
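
As a refresher, a transformer for building up an array might look like the following sketch (based on the protocol above; the first article derives this in detail):

// a sketch of a transformer that builds up an array
var arrayBuilder = {
  // 1. initialize the transformation with an empty array
  init: function(){
    return [];
  },
  // 2. combine the value with each item by appending
  step: function(value, item){
    value.push(item);
    return value;
  },
  // 3. the final value is the accumulated array
  result: function(value){
    return value;
  }
};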

In the first article, we transformed input sources using the reducing functions mult, add and append.

function append(value, item){
  value.push(item);
  return value;
}

In this article, we will only use append, which pushes items onto the end of an array.

Reduce

We defined our own implementation of reduce which accepts:

  1. A transformer or reducing function to be wrapped as a transformer
  2. An initial value (like [])
  3. An input source (like an array to reduce)

The current implementation uses the step function of the transformer as the reducing function for the native array reduce. We will modify the implementation later in this article.

During iteration, the step function is called with two parameters: value and item. The initial value is supplied by the caller, and every subsequent value uses the return value of the step function.

The item is provided by some process of iteration. We showed two processes in the first article: reducing over elements of an array and manually calling the step function with each new item (we will see more examples in a future article).
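
Concretely, using the reduce and wrap definitions from the first article (both repeated later in this article), the two processes look something like this:

// reducing over elements of an array
var output = reduce(append, [], [1,2,3]);
// [1,2,3]

// manually calling the step function with each new item
var xf = wrap(append);
var value = xf.step([], 1);   // [1]
value = xf.step(value, 2);    // [1,2]
xf.result(value);             // [1,2]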

Transducer

We created the mapping transducer.

function map(f){
  return function(xf){
    return {
      init: function(){
        return xf.init();
      },
      step: function(value, item){
        var mapped = f(item);
        return xf.step(value, mapped);
      },
      result: function(value){
        return xf.result(value);
      }
    }
  }
}

The map transducer accepts a mapping function, f, and returns a transformer which:

  1. Accepts an existing transformer
  2. Returns a new transformer which maps items by f
  3. Delegates additional handling to the wrapped transformer

The function f can be any function that accepts an item and maps it to a new value.

function plus1(input){
  return input + 1;
}

function plus2(input){
  return input + 2;
}

We'll use these simple functions again for our examples.

Transduce

We defined a new function, transduce that accepts:

  1. A transducer that defines the transformation
  2. A stepper function or transformer (like append)
  3. An initial value for the stepper function (like [])
  4. The input source (e.g. an array to transform)

The transducer is called with the stepper function to create a transformer, which is then passed to reduce with the initial value and input source.

We showed that we can use the same transducer to create different results by changing the initial value and stepper function provided to transduce.
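
For example, the same map(plus1) transducer can build up an array with append and [], or compute a sum, assuming an add reducing function like the one from the first article:

var transducer = map(plus1);
var input = [2,3,4];

// build an array
transduce(transducer, append, [], input);
// [3,4,5]

// sum the mapped values (add as in the first article)
function add(value, item){
  return value + item;
}
transduce(transducer, add, 0, input);
// 12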

Composition

Finally, we showed that you can create new transducers by composing existing ones.

function compose2(fn1, fn2){
  return function(item){
    var value = fn2(item);
    value = fn1(value);
    return value;
  }
}

var transducer = compose2(
      compose2(map(plus1), map(plus2)),
      map(plus1));
var stepper = append;
var init = [];
var input = [2,3,4];
var output = transduce(transducer, stepper, init, input);
// [6,7,8]

The transformation of each composed transducer is run left-to-right.

This is where we will pick up today. If this example does not feel familiar, please refer to the first article.

Pipelines

To begin, let's define a new function that composes an arbitrary number of functions passed as arguments.

function compose(/*fns*/){
  var fns = arguments;
  return function(xf){
    var i = fns.length - 1;
    for(; i >= 0; i--){
      xf = fns[i](xf);
    }
    return xf;
  }
}

This is just an extension of compose2 defined above. Function composition is right-to-left: the returned function accepts an initial argument used to call the last function on the right. The return value of that call is used as the argument to the next function to the left. This process is repeated for all functions passed to compose.

We can create plus4 by composing plus1 and plus2.

// calling manually
var value = plus1(plus1(plus2(5)));
// 9

// using composed function (allows reuse)
var plus4 = compose(plus1, plus1, plus2);
var value = plus4(5);
// 9

Let's create a plus5 transducer by composing map transducers.

var transducer = compose(
      map(plus1),  // [3,4,5]
      map(plus2),  // [5,6,7]
      map(plus1),  // [6,7,8]
      map(plus1)); // [7,8,9]
var stepper = append;
var init = [];
var input = [2,3,4];
var output = transduce(transducer, stepper, init, input);
// [7,8,9]

The comments next to each transducer describe the result of each transformation as an array, since it is easier to follow the transformation one step at a time.

Remember though, that transducers step through the transformation one item at a time without creating intermediate aggregates. The comments are only indicators to show what happens as the result of the transformation on each step of the pipeline. We'll discuss this more as we progress.
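
One way to see this is to add a logging map to a pipeline (a sketch; the console output is only for illustration):

// a map transducer that logs each item it sees and passes it through
function logging(label){
  return map(function(item){
    console.log(label, item);
    return item;
  });
}

var transducer = compose(
      logging('before'),
      map(plus1),
      logging('after'));
var output = transduce(transducer, append, [], [2,3,4]);
// logs: before 2, after 3, before 3, after 4, before 4, after 5
// [3,4,5]

Each item flows through the entire pipeline before the next item is stepped, and no intermediate array is ever built.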

It turns out that, although composition is right-to-left, the order of transformation is actually left-to-right (top-down as displayed in the example). The order does not matter in this example, but we'll see that it does with the next example.

Filter

Let's define the filter transducer.

function filter(predicate){
  return function(xf){
    return {
      init: function(){
        return xf.init();
      },
      step: function(value, item){
        var allow = predicate(item);
        if(allow){
          value = xf.step(value, item);
        }
        return value;
      },
      result: function(value){
        return xf.result(value);
      }
    }
  }
}

Notice that the transformer returned by filter only delegates to the next transformer if the predicate returns a truthy value. If predicate returns a falsey value, the item is ignored, and the previous result is returned unchanged.

Let's create a transducer that filters all odd values.

function isOdd(num){
  return num % 2 === 1;
}
var transducer = filter(isOdd);
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [1,3,5]

We created a transducer that filters items using the isOdd predicate and then used transduce over an input array. The output array only contains odd values.

Let's also create a function that returns a predicate to check equality.

// another predicate 
function isEqual(y){
  return function(x){
    return x === y;
  }
}
var transducer = filter(isEqual(2));
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [2]

We can see that filtering with the predicate isEqual(2) only allows the number 2 into the output array.

One more helper that accepts another predicate and negates it:

function not(predicate){
  return function(x){
    return !predicate(x);
  }
}
var transducer = filter(not(isEqual(2)));
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [1,3,4,5]

This modifies the previous example to negate the isEqual(2) predicate, which creates a transducer that removes all twos from the input source.

We now have an additional weapon in our pipeline arsenal. Let's wield it.

Pipeline order

Let's increment each item and then filter odd values.

var transducer = compose(
      map(plus1),         // [2,3,4,5,6]
      filter(isOdd));     // [3,5]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [3,5]

The map(plus1) transformation runs first and increments each item, then calls step on the next transformation, which keeps only the odd values.

Let's swap these transducers and see what happens.

var transducer = compose(
      filter(isOdd),      // [1,3,5]
      map(plus1));        // [2,4,6]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [2,4,6]

Here we first filter the input, keeping only the odd values. The filter(isOdd) transformer only steps odd items through to the next transformer. All stepped items are then mapped with plus1.

This demonstrates two important properties of composed transducers:

  1. Although composition is right-to-left, transformation happens left-to-right
  2. It may be more efficient to use transducers that reduce the number of items earlier in the pipeline, if possible.

Notice in the last example that map(plus1) is only called with a subset of the values. Again, intermediate arrays are not created; the comments are only for illustration.
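
To make the efficiency point concrete, here is a rough sketch that counts how many times the mapping function runs (countedPlus1 and the calls counter are purely illustrative):

var calls = 0;
function countedPlus1(input){
  calls++;
  return plus1(input);
}

transduce(compose(map(countedPlus1), filter(isOdd)),
          append, [], [1,2,3,4,5]);
// [3,5], and calls is 5: the map ran for every item

calls = 0;
transduce(compose(filter(isOdd), map(countedPlus1)),
          append, [], [1,2,3,4,5]);
// [2,4,6], and calls is 3: only the odd items reached the map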

Remove

Ready for another transducer?

function remove(predicate){
  return filter(not(predicate));
}

That was easy. It turns out that we can create the remove transducer by negating the predicate and reusing filter.

Here it is in action.

var transducer = compose(
      filter(isOdd),        // [1,3,5]
      map(plus1),           // [2,4,6] 
      remove(isEqual(4)));  // [2,6]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [2,6]

We first filter odd items, then increment each, then remove 4.

Drop

What if we want to skip n items at the beginning of the iteration? This is a job for the drop transducer.

function drop(n){
  return function(xf){
    var left = n;
    return {
      init: function(){
        return xf.init();
      },
      step: function(value, item){
        if(left > 0){
          left--;
        } else {
          value = xf.step(value, item);
        }
        return value;
      },
      result: function(value){
        return xf.result(value);
      }
    }
  }
}

You can use it like so:

var transducer = drop(2);
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [3,4,5]

Drop accepts the number of items to drop from the front of the list. It is our first example of a transducer that creates a stateful transformation. Each time the drop transducer is called to create a transformation, the variable left is created, initialized to n, to keep track of how many items are left to drop for that transformation.

Notice that we created a stateful transformer using a stateless transducer. This is an important distinction. It means we can reuse the drop(2) transducer as many times as we want without worrying about any state. State is created with the transformer, not the transducer.
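
For example, the same drop(2) transducer can be handed to transduce as many times as we like; each call creates a new transformer with its own left counter:

var dropTwo = drop(2);

transduce(dropTwo, append, [], [1,2,3,4,5]);
// [3,4,5]

transduce(dropTwo, append, [], [10,20,30]);
// [30]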

Take

What if, instead of dropping items, we wanted to take the first n items and drop the rest? We will assume n > 0 to ease the implementation.

Let's give it a shot.

function take(n){
  return function(xf){
    var left = n;
    return {
      init: function(){
        return xf.init();
      },
      step: function(value, item){
        value = xf.step(value, item);
        if(--left <= 0){
          // how do we stop???
        }
        return value;
      },
      result: function(value){
        return xf.result(value);
      }
    }
  }
}

Uh-oh. We've hit a snag. We know how to step each item and maintain a count of how many items are left by keeping transformer state. But how do we stop iterating over the rest of the items?

Why do we want to signal that we are done accepting additional items? Continuing the iteration would not only be wasteful, there is also no guarantee that it will ever complete. It may be infinite. We will certainly want to stop infinite iteration if we can.

So how do we signal early termination? We're going to have to take another look at our source of iteration: transduce.

Reduce redux

Here are our current definitions of transduce and reduce from the first article:

function transduce(transducer, stepper, init, input){
  if(typeof stepper === 'function'){
    stepper = wrap(stepper);
  }

  var xf = transducer(stepper);
  return reduce(xf, init, input);
}

function reduce(xf, init, input){
  if(typeof xf === 'function'){
    xf = wrap(xf);
  }
  // how do we stop?? 
  var value = input.reduce(xf.step, init); 
  return xf.result(value);
}

function wrap(stepper){
  return {
    init: function(){
      throw new Error('init not supported');
    },
    step: stepper,
    result: function(value){
      return value;
    }
  }
}

If we take a look at the implementation, we will remember that we are currently iterating with the native array reduce using the reducing function from our transformer. In a future article, we will remove the assumption that the input source is an array, but we will stick with it for now. Let's define our own implementation of arrayReduce.

function reduce(xf, init, input){
  if(typeof xf === 'function'){
    xf = wrap(xf);
  }

  return arrayReduce(xf, init, input);
}

function arrayReduce(xf, init, array){
  var value = init;
  var idx = 0;
  var length = array.length;
  for(; idx < length; idx++){
    value = xf.step(value, array[idx]);
    // We need to break here, but how do we know?
  }
  return xf.result(value);
}

The arrayReduce implementation accepts a transformer, initial value and input array. It then iterates over each item using a for loop and calls the transformer step function with the value and array item.

We need a way to break out of this loop, depending on some marker value. As luck would have it, there is an existing transducer protocol we can adopt.

To signal early termination after calling step on a transformer, we can wrap our reduced value in an object with two attributes:

  1. value, holding the actual wrapped value
  2. __transducers_reduced__, set to true to signal that the object is reduced

Which leads to our implementation.

function reduced(value){
  return {
    value: value,
    __transducers_reduced__: true
  }
}

We will also add a predicate to determine if a value is reduced.

function isReduced(value){
  return value && value.__transducers_reduced__;
}

In addition, we need a way to extract, or deref the reduced value.

function deref(reducedValue){
  return reducedValue.value;
}
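
A quick check of these helpers:

var r = reduced([1,2]);
isReduced(r);      // true
isReduced([1,2]);  // undefined (falsey)
deref(r);          // [1,2]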

We can now adjust arrayReduce to handle early termination of reduced values.

function arrayReduce(xf, init, array){
  var value = init;
  var idx = 0;
  var length = array.length;
  for(; idx < length; idx++){
    value = xf.step(value, array[idx]);
    if(isReduced(value)){
      value = deref(value);
      break;
    }
  }
  return xf.result(value);
}

We iterate each item as we did before, but after every call to step we check to see if we have a reduced value. If so, we extract the value and terminate the iteration (break the loop). We still call result on the final value regardless of whether it came from a reduced value or from a complete iteration.

Take 2

Now we are ready to complete our implementation of take:

function take(n){
  return function(xf){
    var left = n;
    return {
      init: function(){
        return xf.init();
      },
      step: function(value, item){
        value = xf.step(value, item);
        if(--left <= 0){
          // we are done, so signal reduced
          value = reduced(value);
        }
        return value;
      },
      result: function(value){
        return xf.result(value);
      }
    }
  }
}

The only thing we were missing was wrapping the value with reduced after detecting that the transformation was complete.

Let's see if it works:

var transducer = take(3);
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [1,2,3]

Sure does.

Just like any other transducer, you can compose drop and take in a pipeline.

var transducer = compose(
    drop(1),    // [2,3,4,5]
    take(3),    // [2,3,4]
    drop(1));   // [3,4]
var stepper = append;
var init = [];
var input = [1,2,3,4,5];
var output = transduce(transducer, stepper, init, input);
// [3,4]

The first drop skips the first item, then steps every remaining item to the next transformer. The take transformer steps the first 3 items received from the first drop and then terminates the iteration. The last drop skips the first item it receives from take and steps the remaining two items before the iteration terminates.

Conclusion of part 2...

We began by summarizing what we learned in the first article. We added four new transducers: filter, remove, take, drop. We created transformer pipelines by composing transducers and saw that the order of transformation is left-to-right.

We saw that in addition to changing items during transformation, a transformer could decide to skip any item by not calling step on the next transformer. It is the implementation of each transformer that decides what to step to the next one. There are cases where a transformer may send multiple values, for example, cat or transduce-string.

We also saw examples of transducers that create stateful transformations. The state is managed by the transformer, not the transducer. This allows reuse of stateless transducers, even if the transformers they create manage state.

When implementing take, we realized we needed to add a method for signaling early termination. We changed our reduce implementation to handle and unwrap reduced values and implemented take to terminate the iteration on completion.

Anything else?

There are still a few loose ends to tie up in our final article of the introductory tutorial. We have still glossed over the purpose of transformer init and reduce; we will address this. We will add into and generalize our reduce implementation to support iterators.

We will also see that the input source could be anything that produces a sequence of values: lazy lists, indefinite sequence generation, CSP, Node.js streams, iterators, generators, immutable-js data structures, etc.

Want to be notified of new articles? Follow simplectic on Twitter.

I'm ready now!

Ready to use transducers? You should be in good shape if you made it through this article: see transducers-js and transducers.js. We followed the transducers-js implementation closely, but the concepts apply to transducers.js as well.

Also check out underarm if you like Underscore.js. It is based on the transduce libraries that allow defining APIs against a common protocol supported by both transducers.js and transducers-js.