def __init__(
        self, exchange_name, connection_url, qkwargs, config_file=None):
    """
    Initializes a new Service instance.

    Reads the configuration file, opens the bus connection, declares the
    topic exchange, binds every requested queue to the connection's
    default channel and creates a producer for publishing.

    :param exchange_name: Name of the topic exchange. If None, the
        ``bus_exchange`` value from the configuration file is used
        when present.
    :type exchange_name: str or None
    :param connection_url: Kombu connection url. If None, the
        ``bus_uri`` value from the configuration file is used when
        present.
    :type connection_url: str or None
    :param qkwargs: One or more dicts keyword arguments for queue creation
    :type qkwargs: list
    :param config_file: Path to the configuration file location.
    :type config_file: str or None
    """
    name = self.__class__.__name__
    self.logger = logging.getLogger(name)
    self.logger.debug('Initializing %s', name)

    # Read the configuration file. The class-level default path is handed
    # over as well; presumably read_config_file falls back to it when
    # config_file is None -- confirm against read_config_file.
    self._config_data = read_config_file(
        config_file, self._default_config_file)

    # Explicit arguments win; the config file only fills in values the
    # caller left as None.
    if connection_url is None and 'bus_uri' in self._config_data:
        connection_url = self._config_data.get('bus_uri')
        self.logger.debug(
            'Using connection_url=%s from config file', connection_url)
    # Test for the same key that is read ('bus_exchange'), and assign
    # before logging so the message shows the value actually used.
    if exchange_name is None and 'bus_exchange' in self._config_data:
        exchange_name = self._config_data.get('bus_exchange')
        self.logger.debug(
            'Using exchange_name=%s from config file', exchange_name)

    self.connection = Connection(connection_url)
    self._channel = self.connection.default_channel
    self._exchange = Exchange(
        exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Set up queues: each one is attached to the topic exchange and then
    # bound to the channel.
    self._queues = []
    for kwargs in qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug(queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)
    self.logger.debug('Initializing of %s finished', name)
评论列表
文章目录