In [1]:
import logging
from pathlib import Path
from functools import partial

from execlog import ChainRouter, Event
from execlog.routers import PathRouter
from execlog.listeners import PathListener

# Configure the root logger at DEBUG level so that the INFO/DEBUG records
# emitted by the `execlog` loggers show up in the cell outputs below.
logging.basicConfig(level=logging.DEBUG)

  from .autonotebook import tqdm as notebook_tqdm


# Router setup
Create individual "frame" routers, and attach them in a chain.

A matching event is first handled by `router1`: all of its matching callbacks run in parallel, and processing blocks until every one of them has completed. The event is then passed on to the next router (`router2`), which repeats the same process, and so on down the chain. Several events can follow this trajectory in parallel.

In [2]:
# Three independent path routers, one per stage ("frame") of the chain.
router1, router2, router3 = PathRouter(), PathRouter(), PathRouter()

# Link the stages in order: router1 -> router2 -> router3.
chain_router = ChainRouter([router1, router2, router3])

Register callbacks to each of the routers. The `Router` objects are of type `PathRouter`, so `.register` takes a path endpoint and a function that accepts `Event`s.

Events are created with the registered endpoint path, and a `name` parameter with the filename at that path target. As an illustration, suppose one callback is attached to Router 1, two to Router 2, and three to Router 3 (the cell below registers just one callback per router to keep the output short). A given matching event would then have a trajectory that looks like:

```
Event -> Router-1 -> C1-1 -- blocking --> Router-2 --> C2-1 -- blocking --> Router-3 --> C3-1 -->
                                                   \-> C2-2 -/                        \-> C3-2 -/
                                                                                      \-> C3-3 -/
```

In [3]:
# Attach one tagged `print` callback per router under the shared endpoint;
# the tag makes it visible in the output which router handled an event.
for router, tag in ((router1, 'R1 ::'), (router2, 'R2 ::'), (router3, 'R3 ::')):
    router.register('endpoint_proxy', partial(print, tag))

# Three sample events for that endpoint, one per file name.
events = [
    Event(endpoint='endpoint_proxy', name=filename)
    for filename in ('file1', 'file2', 'file3')
]

Submit the event list to an individual router. Each event is handled in its own thread until the thread limit is reached; any remaining events wait in a queue until they can be processed.

In [4]:
# multi-event, single router: hand the whole event list to router1 only.
# The bare call is the cell's last expression, so the returned list of
# Future objects is displayed as the cell result.
router1.submit(events)

INFO:execlog.router:Event [file1] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]


[<Future at 0x74da282c7740 state=running>,
 <Future at 0x74da282c7b00 state=running>,
 <Future at 0x74da282ec260 state=running>]

INFO:execlog.router:Event [file2] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]
INFO:execlog.router:Event [file3] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]


R1 ::R1 ::  Event(endpoint='endpoint_proxy', name='file1', action=None)Event(endpoint='endpoint_proxy', name='file2', action=None)

R1 :: Event(endpoint='endpoint_proxy', name='file3', action=None)
R1 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])


Submit the event list to the chain router. Each event will be processed independently and in parallel, so long as there are threads available. Each event will make its way through the router chain, blocking until all matching callbacks for a given router are completed.

In [5]:
# multi-event, chained routers: hand the same event list to the chain router.
# As the cell's last expression, the returned list of Future objects is
# displayed as the cell result.
chain_router.submit(events)

INFO:execlog.router:Event [file1] 

[<Future at 0x74da282ed880 state=running>,
 <Future at 0x74da282ed040 state=running>,
 <Future at 0x74da282ee060 state=running>]

matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]
INFO:execlog.router:Event [file1] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]
INFO:execlog.router:Event [file2] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]
INFO:execlog.router:Event [file2] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]
INFO:execlog.router:Event [file3] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R2 ::')]
INFO:execlog.router:Event [file3] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R3 ::')]


R2 :: Event(endpoint='endpoint_proxy', name='file1', action=None)
R2 :: Event(endpoint='endpoint_proxy', name='file2', action=None)
R2 :: Event(endpoint='endpoint_proxy', name='file3', action=None)
R3 :: Event(endpoint='endpoint_proxy', name='file1', action=None)
R3 :: Event(endpoint='endpoint_proxy', name='file2', action=None)
R3 :: Event(endpoint='endpoint_proxy', name='file3', action=None)
R2 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])
R3 :: Event(endpoint='endpoint_proxy', name='fileA', action=[<flags.CREATE: 256>])


In [6]:
# Obtain a listener from the chain router and start it; the log output of
# this cell reports it listening on the `endpoint_proxy` path.
listener = chain_router.get_listener()
listener.start()

# Print the listener's `watchmap` to see what is currently being watched.
print(listener.watchmap)

defaultdict(<function PathListener.__init__.<locals>.<lambda> at 0x74da282f4540>, {1: defaultdict(<class 'int'>, {(PosixPath('endpoint_proxy'), PosixPath('.')): 1986})})


INFO:execlog.listeners.path:Starting listener for 1 paths
INFO:execlog.listeners.path:> Listening on path endpoint_proxy for flags [<flags.MODIFY: 2>, <flags.MOVED_FROM: 64>, <flags.MOVED_TO: 128>, <flags.CREATE: 256>, <flags.DELETE: 512>, <flags.DELETE_SELF: 1024>]


In [7]:
# Create a file inside the watched directory, write to it, then remove it
# again to exercise the running listener. Statement order matters here: the
# log output of this cell shows CREATE and MODIFY events firing for `fileA`.
file_a = Path('endpoint_proxy/fileA')
file_a.write_text('test text')
file_a.unlink()

DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.CREATE: 256>]
INFO:execlog.router:Event [fileA] matched [**/!(.*|*.tmp|*~)] under [endpoint_proxy] for [functools.partial(<built-in function print>, 'R1 ::')]
DEBUG:execlog.listeners.path:Watcher fired for [fileA]: [<flags.MODIFY: 2>]


In [10]:
# Inspect the watch map again after the file was created and removed
# (bare expression, so the notebook shows its pretty-printed repr).
listener.watchmap

defaultdict(<function execlog.listeners.path.PathListener.__init__.<locals>.<lambda>()>,
            {1: defaultdict(int,
                         {(PosixPath('endpoint_proxy'),
                           PosixPath('.')): 1986})})