I'm trying to build a framework similar to Scrapy's item pipelines or Pipeless. The motivation is to be able to build generic data pipelines by defining a modular collection of "pipe" classes, each of which handles a distinct step within the pipeline. I've taken some tips from here. I'm looking for general feedback.
interfaces.py
from zope.interface import Interface, Attribute
class IPipe(Interface):
    """A single processing step of a pipeline.

    A pipe is handed one item at a time and produces an iterable of
    results for that item.
    """

    def process_item(item):
        """Process one item and return an iterable of resulting items."""
class IPipeline(Interface):
    """Top-level pipeline interface."""

    pipe_runners = Attribute('a collection of pipe_runners')

    def run(it):
        """Start executing the pipeline over the given iterable."""
class IPipelineRunner(Interface):
    """Runs a single pipe and takes care of skip-item exceptions."""

    pipe = Attribute('single pipe being wrapped')

    def execute(it):
        """Run the wrapped pipe over the given iterable."""
components.py
from pipeable.interfaces import IPipeline, IPipe, IPipelineRunner
from pipeable.exceptions import InvalidPipeInput, SkipPipeItem
from zope.interface import implements, implementer
import collections
@implementer(IPipeline)
class Pipeline(object):
    """Chain a sequence of pipes into one lazily evaluated pipeline.

    Every entry of ``pipes`` is called once to create a pipe instance, which
    is wrapped in a ``PipeRunner``. ``run`` then feeds the output of each
    runner into the next one.
    """

    def __init__(self, pipes):
        """Instantiate and wrap every pipe in ``pipes``.

        :param pipes: iterable of zero-argument callables (typically pipe
            classes); each one is called to build the pipe instance.
        """
        # TODO: only a list of pipes is handled for now; later handle a dict
        # with order numbers and YAML-file constructor params.
        self.pipe_runners = [
            PipeRunner(implementer(IPipe)(pipe())) for pipe in pipes
        ]

    def run(self, it):
        """Run ``it`` through every pipe runner in order.

        :param it: iterable of input items. Strings are rejected even though
            they are iterable on Python 3, matching the Python 2 behaviour
            where ``str`` has no ``__iter__``.
        :returns: the output of the last runner's ``execute`` call, or ``it``
            itself when the pipeline has no pipes.
        :raises InvalidPipeInput: if ``it`` is a string or is not iterable.
        """
        if isinstance(it, (str, bytes)) or not hasattr(it, '__iter__'):
            # %-formatting instead of ``+``: concatenating a non-string
            # would raise TypeError and mask the intended exception.
            raise InvalidPipeInput(
                "Pipeline requires iterable input, got: %r" % (it,))
        res = it
        for pipe_runner in self.pipe_runners:
            res = pipe_runner.execute(res)
        return res
@implementer(IPipelineRunner)
class PipeRunner(object):
    """Wrap a single pipe and feed it items one at a time."""

    def __init__(self, pipe):
        """Store the pipe whose ``process_item`` will be called.

        :param pipe: object exposing ``process_item(item)``.
        """
        self.pipe = pipe

    def execute(self, item_gen):
        """Yield every result the pipe produces for each incoming item.

        This is a generator, so nothing is processed until it is iterated.
        An item whose processing raises ``SkipPipeItem`` is abandoned;
        results already yielded for it before the exception are kept.

        :param item_gen: iterable of items to pass to the pipe.
        """
        for item in item_gen:
            try:
                # process_item can produce multiple results per item
                for res in self.pipe.process_item(item):
                    yield res
            except SkipPipeItem:
                continue
tests/example
import unittest
class TestPipeline(unittest.TestCase):
    """Unit tests for ``pipeable.components.Pipeline``."""

    def _getTargetClass(self):
        """Return the class under test."""
        from pipeable.components import Pipeline
        return Pipeline

    def _makeOne(self, pipes):
        """Build a pipeline from the given list of pipe classes."""
        return self._getTargetClass()(pipes)

    def test_class_conforms_to_IPipeline(self):
        from zope.interface.verify import verifyClass
        from pipeable.interfaces import IPipeline
        verifyClass(IPipeline, self._getTargetClass())

    def test_should_run_simple_pipes_add_one_add_two(self):
        pipe_line = self._makeOne(self._getSimpleTestPipes())
        res_gen = pipe_line.run([1])
        # next() builtin works on both Python 2.6+ and Python 3
        self.assertEqual(4, next(res_gen))

    def test_should_yield_forward_and_backward(self):
        pipe_line = self._makeOne(self._getForwardReverseTitles())
        # Materialize once: the result is a generator, so a second
        # list() call would always produce an empty list.
        res = list(pipe_line.run(['foo', 'bar']))
        self.assertEqual(['Foo', 'Oof', 'Bar', 'Rab'], res)

    def test_should_skip_foo(self):
        from pipeable.exceptions import SkipPipeItem

        class ForwardReverseNoFoo(object):
            def process_item(self, str_item):
                if str_item == 'foo':
                    raise SkipPipeItem
                yield str_item
                yield str_item[::-1]

        pipe_line = self._makeOne([ForwardReverseNoFoo])
        res = list(pipe_line.run(['foo', 'bar']))
        self.assertEqual(['bar', 'rab'], res)

    def test_should_raise_invalid_pipe_input_on_non_iterable(self):
        from pipeable.exceptions import InvalidPipeInput
        pipe_line = self._makeOne(self._getForwardReverseTitles())
        self.assertRaises(InvalidPipeInput, pipe_line.run,
                          'bad_input_non_iterable')

    def _getSimpleTestPipes(self):
        """Return two pipes that add 1 and 2 to an item respectively."""
        class TestPipe1(object):
            def process_item(self, item):
                yield item + 1

        class TestPipe2(object):
            def process_item(self, item):
                yield item + 2

        return [TestPipe1, TestPipe2]

    def _getForwardReverseTitles(self):
        """Return a pipe emitting a string and its reverse, then a title-caser."""
        class ForwardReverse(object):
            def process_item(self, str_item):
                yield str_item
                yield str_item[::-1]

        class Title(object):
            def process_item(self, str_item):
                yield str_item.title()

        return [ForwardReverse, Title]
`Pipeline.run()` requires an iterable, but you then proceed to hand that iterable to multiple pipeline runners; this won't work because an iterable can only be iterated over once. – Martijn Pieters Nov 13 '13 at 16:02
[…] `IPipeline` interface here either, and why the runner needs to be separated. Won't a simpler `IPipeline` interface that can be run directly, containing just one actual sequence of pipes, be easier to understand and use? – Martijn Pieters Nov 13 '13 at 16:03
[…] `IPipe` interface has no access to anything else; should implementations take care of referencing a per-pipeline global state themselves? Without a pipeline context, they have no means to distinguish one run from another. – Martijn Pieters Nov 13 '13 at 16:07