diff --git a/README.rst b/README.rst index dd3ed4b..06d77fd 100644 --- a/README.rst +++ b/README.rst @@ -19,6 +19,25 @@ producers/consumers. Pipes simply have an input and an output channel; creating and using them is pretty straightforward: +.. code:: python + + import asyncio + + from pipekit.pipe import QueuePipe + + async def main(): + mypipe = QueuePipe('my-pipe') + await mypipe.start() + await mypipe.send('Hello world') + print(await mypipe.receive()) + + asyncio.run(main()) + # Hello world + +The current examples are still being worked on: + + + .. code:: python from pipekit import ThreadPipe diff --git a/pipekit/component.py b/pipekit/component.py index 8a6c54f..7904243 100644 --- a/pipekit/component.py +++ b/pipekit/component.py @@ -40,7 +40,7 @@ def __init__(self, *args, id=None, workflow=None, parent=None, logger=_l, **kwar self._event_lock = set() self._debug = {'events'} - self._settings = Box(self.configure(*args, **kwargs) or dict()) + self._settings = Box(self.configure(**kwargs) or dict()) if not workflow: workflow = self settings = [f'{k}={v}' for k, v in workflow.safe_settings(self._settings).items()] @@ -52,6 +52,9 @@ def configure(self, **settings): def settings(self, **override): return Box(self._settings, **override) + def safe_settings(self, settings): + return settings + @property def type(self): return type(self).__name__ @@ -175,7 +178,10 @@ def __getattr__(self, name): def _proxied_logging_method(self, method, *args, **kwargs): if method == 'debug': - debug = (self.workflow or self).settings().logging.debug + if logging in (self.workflow or self).settings(): + debug = (self.workflow or self).settings().logging.debug + else: + debug = [] if not ('all' in debug or self.type in debug or (self.id in debug)): return lambda *a, **kw: None diff --git a/pipekit/pipe.py b/pipekit/pipe.py index b8c4777..285e377 100644 --- a/pipekit/pipe.py +++ b/pipekit/pipe.py @@ -158,8 +158,9 @@ async def receiver(self): class QueuePipe(Pipe): - def configure(self, queue=None, maxsize=1): + def configure(self, queue=None, maxsize=1, **kwargs): self._queue = queue or asyncio.Queue(maxsize=maxsize) + return kwargs async def send(self, *args, **kwargs): return await self._queue.put(*args, **kwargs)