-
Notifications
You must be signed in to change notification settings - Fork 0
/
groups.py
50 lines (36 loc) · 1.53 KB
/
groups.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from __future__ import absolute_import
from cell.actors import Actor
from cell.agents import dAgent
from kombu.entity import Exchange
# Module-level authorship metadata.
__author__ = 'rumi'
class Group(Actor):
    """Convenience actor that spawns a group of actors of one type.

    **Example usage**

    Here we spawn two groups, of 10 :class:`Logger` actors each.

    .. code-block:: python

        >>> exception_group = agent.spawn(Group, Logger, 10)
        >>> warning_group = agent.spawn(Group, Logger, 10)
        >>> exception_group.scatter('log_msg', 'some exception msg...')
        >>> warning_group.scatter('log_msg', 'some warning msg...')

    :param act_type: the actor to spawn.
    :param number: the number of actor instances to spawn.

    """

    def __init__(self, act_type, number, **kwargs):
        """Initialise the base actor, then record the group settings.

        :param act_type: actor type each group member is spawned as.
        :param number: how many members to spawn.
        :param kwargs: forwarded unchanged to the :class:`Actor` initialiser.
        """
        super(Group, self).__init__(**kwargs)
        # Kept on the state object so on_agent_ready can read them back later.
        group_state = self.state
        group_state.act_type = act_type
        group_state.number = number

    class state(object):
        """State holder whose :meth:`config` performs the actual spawning."""

        def config(self, act_type, number):
            """Spawn ``number`` actors of ``act_type``.

            Each spawned actor is handed the name of the owning actor's
            ``inbox_scatter`` exchange under the ``group_exchange`` key.

            :param act_type: the actor to spawn.
            :param number: the number of actor instances to spawn.
            """
            # NOTE(review): ``self.actor`` is not assigned anywhere in this
            # class -- presumably attached by the Actor base; confirm.
            spawner = dAgent(self.actor.connection)

            for _ in range(number):
                # A fresh kwargs dict per member, exactly one spawn per turn.
                member_kwargs = {
                    'group_exchange': self.actor.inbox_scatter.name,
                }
                spawner.spawn(act_type, member_kwargs)

    def get_scatter_exchange(self):
        """Return a fanout :class:`kombu.Exchange` named after this actor.

        The exchange name is built from ``self.name`` and ``self.id`` and the
        exchange is declared with ``auto_delete=True``.
        """
        exchange_name = 'cl.scatter.%s.%s' % (self.name, self.id)
        return Exchange(exchange_name, 'fanout', auto_delete=True)

    def on_agent_ready(self):
        """Spawn the group members using the settings stored by ``__init__``."""
        group_state = self.state
        group_state.config(group_state.act_type, group_state.number)

    def get_queues(self):
        """Return an empty list of queues.

        NOTE(review): presumably this overrides the base actor's queue list
        so the group itself declares no queues -- confirm against ``Actor``.
        """
        return []