Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

I'm using io.js. I have to read several files as streams, process each of them with the same function, and join the results back into a single stream. So I decided to create my own solution with Transform and PassThrough streams.

  "use strict";

  let fs=require('fs');
  let Stream = require('stream');
  let StringDecoder = require('string_decoder').StringDecoder;

  let filePaths=['./tmp/h.txt','./tmp/s.txt'];

   /**
    * Runs every pushed source stream through its own StreamTransformer and
    * hands the results back, concatenated in push order, as one readable
    * stream.
    *
    * The transformed output of each source is collected in memory while the
    * source is being read. A transform whose readable side is never consumed
    * stalls as soon as its internal buffers fill up, so simply waiting for it
    * to finish would hang on any input larger than those buffers.
    */
   class StreamProcessor {

     constructor() {
       // Transform streams, in the order their sources were pushed.
       this.process_streams = [];
       // Collected output; process_outputs[i] belongs to process_streams[i].
       this.process_outputs = [];
     }

     /**
      * Starts processing a source stream.
      *
      * The created transform gets a `processed` promise that resolves with
      * `true` once all of its output has been collected, and rejects if
      * either the source or the transform emits 'error'.
      *
      * @param {stream.Readable} source_stream stream to read and transform
      */
     push(source_stream) {
       const transform = new StreamTransformer();
       const chunks = [];

       transform.processed = new Promise(function (resolve, reject) {
         // Reading the output keeps the transform flowing until 'end'.
         transform.on('data', function (chunk) {
           chunks.push(chunk);
         });
         transform.on('end', function () {
           resolve(true);
         });
         transform.on('error', reject);
         // pipe() does not forward errors, so listen on the source as well;
         // without a listener a read error would crash the process.
         source_stream.on('error', reject);
       });
       // A failure that happens before done() is called must not count as
       // an unhandled rejection; done() still observes it via Promise.all.
       transform.processed.catch(function () {});

       source_stream.pipe(transform);
       this.process_streams.push(transform);
       this.process_outputs.push(chunks);
     }

     /**
      * Waits until every pushed stream has been processed.
      *
      * @param {function(?Error, stream.PassThrough=)} callback called exactly
      *   once by this method: with `(null, combined_stream)` on success, or
      *   with `(err)` for the first failure.
      */
     done(callback) {
       // Snapshot, so streams pushed after this call are not half-included.
       const streams = this.process_streams.slice();
       const outputs = this.process_outputs.slice();
       const pending = streams.map(function (stream) {
         return stream.processed;
       });

       // Two-argument then(): an exception thrown by the callback itself
       // must not trigger a second callback invocation.
       Promise.all(pending).then(
         function () {
           const combined_stream = new Stream.PassThrough();
           outputs.forEach(function (chunks) {
             chunks.forEach(function (chunk) {
               combined_stream.write(chunk);
             });
           });
           // End only after the last source so nothing is written after end.
           combined_stream.end();
           callback(null, combined_stream);
         },
         function (err) {
           callback(err);
         }
       );
     }

   }

      /**
       * Transform stream that drops the last two characters (UTF-16 code
       * units) of its input and terminates the output with a newline.
       *
       * The truncation is applied once to the whole stream rather than to
       * every chunk, so the result does not depend on how the input happens
       * to be split into chunks. Input that arrives as a single chunk
       * produces the same output as truncating that chunk directly.
       */
      class StreamTransformer extends Stream.Transform {

        constructor() {
          super();
          // Reassembles multi-byte UTF-8 sequences split across chunks.
          this.text_decoder = new StringDecoder('utf8');
          // The most recent (at most two) characters, withheld until it is
          // known whether they are the final ones and must be dropped.
          this.held_tail = '';
        }

        /**
         * Emits everything except the trailing two characters seen so far.
         *
         * @param {Buffer} chunk next piece of input
         * @param {string} enc encoding reported by the stream machinery (unused)
         * @param {function} transformed completion callback
         */
        _transform(chunk, enc, transformed) {
          const text = this.held_tail + this.text_decoder.write(chunk);
          const ready = text.slice(0, -2);
          this.held_tail = text.slice(-2);
          if (ready.length > 0) {
            this.push(ready);
          }
          transformed();
        }

        /**
         * Discards the withheld tail and appends the closing newline.
         *
         * @param {function} flushed completion callback
         */
        _flush(flushed) {
          // end() returns whatever an incomplete trailing byte sequence
          // decodes to, so it takes part in the final truncation too.
          const text = this.held_tail + this.text_decoder.end();
          this.held_tail = '';
          const ready = text.slice(0, -2);
          if (ready.length > 0) {
            this.push(ready);
          }
          this.push('\n');
          flushed();
        }

        /**
         * @returns {Promise<boolean>} resolves with `true` when the stream
         *   emits 'finish', rejects with the error if it emits 'error'.
         */
        wait() {
          const stream = this;

          return new Promise(function (resolve, reject) {
            stream.on('finish', function () {
              resolve(true);
            });
            stream.on('error', function (err) {
              reject(err);
            });
          });
        }

      }

         // Usage: feed every input file through the processor and print the
         // combined result.

          let process_stream = new StreamProcessor();

          filePaths.forEach(function (fpath) {
              process_stream.push(fs.createReadStream(fpath));
          });

          process_stream.done(function (err, combined_stream) {
              // On failure done() passes only the error and no stream, so
              // bail out before touching combined_stream.
              if (err) {
                  console.error('Failed to process input files:', err);
                  process.exitCode = 1;
                  return;
              }
              // Consume the combined stream.
              combined_stream.pipe(process.stdout);
          });

The test files contain 'hello' and 'stream', and the program outputs:

     hell
     stream

But I think this can be improved further.

share|improve this question

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Browse other questions tagged or ask your own question.