Source code for irc3.asynchronous

# -*- coding: utf-8 -*-
from .compat import asyncio
import functools
import re


class event:
    """Temporary event matching a line against a formatted regexp.

    Instances are built by :func:`async_events` from a ``meta`` mapping and
    the user supplied parameters, then used as a decorator around the shared
    callback.

    Keyword arguments:

    - ``meta``: mapping describing the event. ``meta['match']`` is a
      ``str.format`` template producing the regular expression.
    - every keyword argument (``meta`` included) is passed to
      ``meta['match'].format()``.
    """

    # NOTE(review): ``iotype`` and ``iscoroutine`` are not read in this
    # module; presumably the event dispatcher consumes them - confirm there.
    iotype = 'in'
    iscoroutine = True

    def __init__(self, **kwargs):
        self.meta = kwargs.get('meta')
        # The template is rendered with all keyword arguments.
        regexp = self.meta['match'].format(**kwargs)
        self.regexp = regexp
        # Use the ``re`` attribute when the pattern object exposes one,
        # otherwise the pattern itself.
        regexp = getattr(self.regexp, 're', self.regexp)
        self.cregexp = re.compile(regexp).match

    def compile(self, *args, **kwargs):
        """Return the bound ``match`` method of the compiled regexp.

        Arguments are accepted and ignored; the pattern is compiled once in
        ``__init__``.
        """
        return self.cregexp

    def __repr__(self):
        s = getattr(self.regexp, 'name', self.regexp)
        name = self.__class__.__name__
        return '<temp_event {0} {1}>'.format(name, s)

    def __call__(self, callback):
        """Bind ``callback`` to this event and return the event itself.

        ``self.callback`` becomes a coroutine function which calls
        ``callback(self, *args, **kwargs)``.
        """
        # ``asyncio.coroutine`` was removed in Python 3.11; a native
        # coroutine function provides the same "call returns an awaitable"
        # contract.
        async def wrapper(*args, **kwargs):
            result = callback(self, *args, **kwargs)
            # ``asyncio.coroutine`` awaited awaitable return values; keep
            # doing so for callbacks returning a future or a coroutine.
            if hasattr(result, '__await__'):
                result = await result
            return result

        self.callback = wrapper
        return self


def default_result_processor(self=None, results=None, **value):  # pragma: no cover
    """Merge collected event results into a single dict.

    ``self`` is unused and optional, so the function works both when called
    with an instance as first argument and when called with keyword
    arguments only.

    ``results`` is the list of dicts collected by the events. It is stored
    under the ``'results'`` key of ``value``; when it contains exactly one
    item, that item's keys are also merged into ``value``.

    Return the updated ``value`` dict.
    """
    value['results'] = results
    # ``results`` defaults to None, which has no length.
    if results is not None and len(results) == 1:
        value.update(results[0])
    return value


def async_events(context, events, send_line=None,
                 process_results=default_result_processor,
                 timeout=30, **params):
    """Register temporary events and return a future holding their results.

    ``context`` must provide ``loop``, ``attach_events()``,
    ``detach_events()`` and, when ``send_line`` is given, ``send_line()``.

    ``events`` is an iterable of dicts. Each one needs a ``'match'`` format
    template and may set ``'multi'`` (keep the event attached after it
    fired) and ``'final'`` (complete the future once it fired).

    ``send_line`` is an optional format template; it is rendered with
    ``params`` and sent once the events are attached.

    ``process_results`` is called with the keyword arguments ``results``
    (list of the keyword dicts received by the events) and ``timeout``
    (bool); its return value becomes the result of the future.

    ``timeout`` is the number of seconds to wait before completing the
    future with whatever was collected so far.

    ``params`` are used to format the events regexps and ``send_line``.
    """

    loop = context.loop
    task = asyncio.Future(loop=loop)  # async result
    results = []  # store events results
    events_ = []  # reference registered events

    # async timeout. ``asyncio.sleep()`` no longer accepts ``loop`` (removed
    # in Python 3.10); ``ensure_future`` binds the sleep to the right loop.
    timeout_task = asyncio.ensure_future(asyncio.sleep(timeout), loop=loop)

    def end(t=None):
        """t can be a future (timeout done) or False (result success)"""
        if not task.done():
            # cancel timeout if needed
            if t is False:
                timeout_task.cancel()
            # detach events
            context.detach_events(*events_)
            # clean refs
            events_[:] = []
            # set results
            task.set_result(process_results(results=results, timeout=bool(t)))

    # end on timeout
    timeout_task.add_done_callback(end)

    def callback(e, **kw):
        """common callback for all events"""
        if e not in events_:
            # The event was already detached (it fired before, or the future
            # completed) but a call was still pending: ignore it rather than
            # failing on ``events_.remove()`` or altering delivered results.
            return
        results.append(kw)
        if e.meta.get('multi') is not True:
            context.detach_events(e)
            events_.remove(e)
        if e.meta.get('final') is True:
            # end on success
            end(False)

    events_.extend([event(meta=kw, **params)(callback) for kw in events])

    # NOTE(review): ``insert=True`` presumably gives these events priority
    # over already registered ones - confirm in ``attach_events``.
    context.attach_events(*events_, insert=True)

    if send_line:
        context.send_line(send_line.format(**params))

    return task


class AsyncEvents:
    """Asynchronous events.

    Class attributes provide the defaults used by ``__call__``:
    ``timeout``, the ``send_line`` template and the ``events`` descriptions
    which are handed to ``async_events``.
    """

    timeout = 30
    send_line = None
    events = []

    def __init__(self, context):
        self.context = context

    def process_results(self, results=None, **value):  # pragma: no cover
        """Process results.

        results is a list of dict caught during event.
        value is a dict containing some metadata (like timeout=(True/False)).
        """
        # ``default_result_processor`` takes ``self`` as first positional
        # argument.
        return default_result_processor(self, results=results, **value)

    def __call__(self, **kwargs):
        """Register events and callbacks, then return an `asyncio.Future`.

        Events regexp are compiled with `params`.
        """
        kwargs.setdefault('timeout', self.timeout)
        kwargs.setdefault('send_line', self.send_line)
        # Always route results through this instance, even if the caller
        # supplied its own ``process_results``.
        kwargs['process_results'] = self.process_results
        return async_events(self.context, self.events, **kwargs)