def __init__(self, hosts_conf, exchange_name='', exchange_type='', exchange_arguments=None,
             queue_name='', routing_key='', queue_arguments=None, callback=None, no_ack=True):
    """Create the broker connection plus one exchange and one queue bound to it.

    :param hosts_conf: Host configuration, stored on the instance before
        ``create_hosts()`` is called (that method is defined elsewhere;
        presumably it reads this value -- confirm).
    :param exchange_name: Name passed to the ``Exchange``.
    :param exchange_type: Type passed to the ``Exchange``.
    :param exchange_arguments: Extra arguments passed to the ``Exchange``.
    :param queue_name: Name passed to the ``Queue``.
    :param routing_key: Routing key passed to the ``Queue``.
    :param queue_arguments: Extra arguments passed to the ``Queue``.
    :param callback: Stored as ``self.callback``; not used in this method.
    :param no_ack: Stored as ``self.no_ack``; not used in this method.
    """
    # hosts_conf has to be in place before create_hosts() runs.
    self.hosts_conf = hosts_conf
    self.hosts = self.create_hosts()
    self.connection = Connection(self.hosts)

    exchange = Exchange(
        name=exchange_name,
        type=exchange_type,
        arguments=exchange_arguments,
    )
    self.task_exchange = exchange

    task_queue = Queue(
        name=queue_name,
        exchange=exchange,
        routing_key=routing_key,
        queue_arguments=queue_arguments,
    )
    self.task_queues = [task_queue]

    self.callback = callback
    self.no_ack = no_ack
# Example source code for the Python class Connection()
def queue_send(self, recipient, message=None, sender_action=None):
    """Publish a single send event, serialized as JSON, to the send queue.

    The event carries ``recipient``, ``message``, ``sender_action`` and this
    instance's ``page_access_token``. A durable direct exchange and a queue
    whose routing key equals the queue name are declared on publish.

    :param recipient: Value stored under the ``recipient`` key.
    :param message: Value stored under the ``message`` key.
    :param sender_action: Value stored under the ``sender_action`` key.
    """
    send_exchange = Exchange(self.send_exchange, 'direct', durable=True)
    send_queue = Queue(
        self.send_queue,
        exchange=send_exchange,
        routing_key=self.send_queue,
    )
    with Connection(self.send_transport) as conn:
        publisher = conn.Producer(serializer='json')
        payload = dict([
            ('recipient', recipient),
            ('message', message),
            ('sender_action', sender_action),
            ('page_access_token', self.page_access_token),
        ])
        publisher.publish(
            payload,
            exchange=send_exchange,
            routing_key=send_queue.routing_key,
            declare=[send_queue],
        )
def queue_events(self, events):
    """Publish every item of ``events`` as JSON over a single connection.

    A durable direct exchange and a queue whose routing key equals the queue
    name are declared with each publish.

    :param events: Iterable of JSON-serializable events.
    """
    target_exchange = Exchange(self.exchange, 'direct', durable=True)
    target_queue = Queue(
        self.queue,
        exchange=target_exchange,
        routing_key=self.queue,
    )
    with Connection(self.transport) as conn:
        publisher = conn.Producer(serializer='json')
        for item in events:
            publisher.publish(
                item,
                exchange=target_exchange,
                routing_key=target_queue.routing_key,
                declare=[target_queue],
            )
def _listen(self):
    """Yield the payload of each message received on this object's queue.

    This is an endless generator: it blocks until a message arrives,
    acknowledges it, then yields its payload.

    :returns: Generator of message payloads.
    """
    # Entering the connection as a context manager releases it when the
    # generator is closed or an error escapes; it used to be left open.
    with kombu.Connection(self.url) as reader_conn:
        reader_queue = self._queue(reader_conn)
        with reader_conn.SimpleQueue(reader_queue) as queue:
            while True:
                message = queue.get(block=True)
                # The message is acknowledged before the payload is handed
                # to the caller, so it is not redelivered if the caller fails.
                message.ack()
                yield message.payload
def taste_soup(queue, broker_url):
    """Return the number of messages waiting in ``queue`` on the broker.

    This is a best-effort probe: any failure (unreachable broker, bad URL,
    ...) is reported as ``0`` rather than raised.

    :param queue: Name of the queue to inspect.
    :param broker_url: Broker URL passed to ``Connection``.
    :returns: Queue size, or ``0`` when it cannot be determined.
    """
    import logging

    try:
        with Connection(broker_url) as conn:
            # Close the SimpleQueue as well as the connection.
            with conn.SimpleQueue(queue) as simple_queue:
                return simple_queue.qsize()
    except Exception:
        # Keep the "0 on failure" contract, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            'Could not read size of queue %r', queue, exc_info=True)
        return 0
def connect(self):
    """
    'Connects' to the bus.

    Opens the connection and a channel, declares the topic exchange, binds
    one queue per entry of ``self.qkwargs`` and creates the producer. When a
    connection already exists only a warning is logged.

    :returns: The same instance.
    :rtype: commissaire_http.bus.Bus
    """
    if self.connection is not None:
        # Logger.warn is a deprecated alias; warning() is the supported name.
        self.logger.warning('Bus already connected.')
        return self

    self.connection = Connection(self.connection_url)
    self._channel = self.connection.channel()
    self._exchange = Exchange(
        self.exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Create queues
    self._queues = []
    for kwargs in self.qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug('Created queue %s', queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)
    self.logger.debug('Bus connection finished')
    return self
def __init__(self):
    """Build the controller, the RabbitMQ connection and the bus adapter.

    The connection uses the URL from ``config.rabbitmq_url()`` with a
    heartbeat of 540, and is shared by the retry adapter and the bus adapter.
    """
    self._controller = Controller(DatabaseAdapter())

    bus_connection = Connection(config.rabbitmq_url(), heartbeat=540)
    retries = RetryAdapter(bus_connection)

    self._busAdapter = BusAdapter(
        self._controller,
        bus_connection,
        retries,
    )
def setup_connection_mock(self):
    """Install a flexmock-wrapped connection and channel on the test case.

    The mock transport gets the same ``recoverable_connection_errors`` as the
    pyamqp transport, and ``connection.channel`` is stubbed to return the
    wrapped default channel.
    """
    mocks.Transport.recoverable_connection_errors = (
        pyamqp.Transport.recoverable_connection_errors
    )

    mocked_connection = flexmock(Connection(transport=mocks.Transport))
    self.connection = mocked_connection

    self.channel_mock = flexmock(self.connection.default_channel)
    self.connection.should_receive('channel').and_return(self.channel_mock)
def test_declare_retry_exchanges_retries_if_it_fails(self):
    """Build a RetryAdapter on a connection that fails twice before connecting.

    ``_establish_connection`` is expected exactly three times: the first two
    calls raise ``IOError`` and the third returns an established connection.
    """
    connection = flexmock(Connection(transport=mocks.Transport))

    # Register the expectation first, then compute the success value, which
    # is the order in which the original chained expression was evaluated.
    expectation = connection.should_receive('_establish_connection').times(3)
    expectation = expectation.and_raise(IOError).and_raise(IOError)
    expectation.and_return(connection.transport.establish_connection())

    self.retry_adapter = RetryAdapter(connection)
def __init__(self, exchange_name, broker_url, mode=ASYNC):
    """Store the settings and create the exchange and broker connection.

    Args:
        exchange_name (string): Name given to the ``direct`` exchange.
        broker_url (string): URL passed to ``Connection``.
        mode (int): Stored unchanged on the instance; defaults to ``ASYNC``.
            NOTE(review): what the mode controls is not visible here.
    """
    self.exchange_name, self.broker_url, self.mode = (
        exchange_name,
        broker_url,
        mode,
    )
    self.exchange = Exchange(exchange_name, type='direct')
    self.connection = Connection(broker_url)
def __init__(self, broker_url, queue_name, fetch_count=10):
    """Store the settings and create the broker connection and queue.

    Args:
        broker_url (string): URL passed to ``Connection``.
        queue_name (string): Name given to the ``Queue``.
        fetch_count (int): Stored unchanged on the instance.
            NOTE(review): presumably a prefetch/batch size -- confirm.
    """
    self.queue_name, self.broker_url, self.fetch_count = (
        queue_name,
        broker_url,
        fetch_count,
    )
    self.connection = Connection(broker_url)
    self.queue = Queue(queue_name)
def _listen(self):
    """Yield the payload of each message received on this object's queue.

    This is an endless generator: it blocks until a message arrives,
    acknowledges it, then yields its payload.

    :returns: Generator of message payloads.
    """
    # Entering the connection as a context manager releases it when the
    # generator is closed or an error escapes; it used to be left open.
    with kombu.Connection(self.url) as reader_conn:
        reader_queue = self._queue(reader_conn)
        with reader_conn.SimpleQueue(reader_queue) as queue:
            while True:
                message = queue.get(block=True)
                # The message is acknowledged before the payload is handed
                # to the caller, so it is not redelivered if the caller fails.
                message.ack()
                yield message.payload
def start_connection(self):
    """
    Open a new RabbitMQ connection and the producer that publishes on it.

    The connection is created with ``confirm_publish`` enabled; the producer
    uses a fresh channel and the orchestrator's exchange.
    :return:
    """
    class_logger = logging.getLogger(self.__class__.__name__)
    class_logger.info("starting new rabbit mq connection")

    # todo: do we need to make confirm_publish configurable?
    broker_url = self.get_config().rabbitmq_url
    self._conn = Connection(
        broker_url,
        transport_options={'confirm_publish': True},
    )

    # setup producer to push to error and dlqs
    producer_channel = self._conn.channel()
    self._producer = Producer(
        channel=producer_channel,
        exchange=self._orchestrator.get_exchange(),
    )
def main():
    """Parse the command line, pick an event handler and run the consumer.

    Exits with status ``-1`` when a required option (password, domain, API
    token) is missing, or when the mode/output combination is not one of the
    supported ones. An unsupported combination used to fail later with an
    unbound-variable error.
    """
    parser = cli_parser()
    opts, _ = parser.parse_args(sys.argv)

    # Required options, checked in the original order; the first missing one
    # aborts the program.
    required = (
        (opts.password, 'Password required, see help (-h)'),
        (opts.domain, 'yourdomain.efflux.io required, see help (-h)'),
        (opts.token, 'API token required, see help (-h)'),
    )
    for value, message in required:
        if not value:
            logger.error(message)
            sys.exit(-1)

    auth = 'amqp://{}:{}@{}:{}'.format(
        opts.username,
        opts.password,
        opts.host,
        opts.port
    )

    if opts.mode == 'json':
        efflux = CBEventHandler(opts.domain, opts.token)
        events = [
            'watchlist.hit.process'
        ]
    elif opts.mode == 'protobuf':
        events = [
            'ingress.event.netconn',
            'ingress.event.procstart'
        ]
        if opts.output == 'api':
            efflux = CBProtobufHandler(opts.domain, opts.token, mode='post')
        elif opts.output == 'file':
            efflux = CBProtobufHandler(opts.domain, opts.token, mode='file')
            # NOTE(review): hard-coded, user-specific log path kept from the
            # original; consider exposing it as a command-line option.
            efflux.set_logfile(path='/Users/jtm/telemetry/cb.log')
        else:
            # No handler can be built for this output.
            logger.error('Unsupported output %r, see help (-h)', opts.output)
            sys.exit(-1)
    else:
        # Neither handler nor event list exists for this mode.
        logger.error('Unsupported mode %r, see help (-h)', opts.mode)
        sys.exit(-1)

    with Connection(auth) as connection:
        CarbonBlackConsumer(
            connection,
            efflux.handle_event,
            events=events).run()
def __init__(
        self, exchange_name, connection_url, qkwargs, config_file=None):
    """
    Initializes a new Service instance.

    When ``connection_url`` or ``exchange_name`` is None, the value is taken
    from the ``bus_uri`` / ``bus_exchange`` key of the configuration file if
    that key is present.

    :param exchange_name: Name of the topic exchange.
    :type exchange_name: str
    :param connection_url: Kombu connection url.
    :type connection_url: str
    :param qkwargs: One or more dicts keyword arguments for queue creation
    :type qkwargs: list
    :param config_file: Path to the configuration file location.
    :type config_file: str or None
    """
    name = self.__class__.__name__
    self.logger = logging.getLogger(name)
    self.logger.debug('Initializing {}'.format(name))

    # Read the configuration file; the class-level default path is passed
    # along with the explicit one.
    self._config_data = read_config_file(
        config_file, self._default_config_file)

    if connection_url is None and 'bus_uri' in self._config_data:
        connection_url = self._config_data.get('bus_uri')
        self.logger.debug(
            'Using connection_url=%s from config file', connection_url)

    # Test the same key that is read, and log only after the assignment.
    # The original checked 'exchange_name', read 'bus_exchange' and always
    # logged None.
    if exchange_name is None and 'bus_exchange' in self._config_data:
        exchange_name = self._config_data.get('bus_exchange')
        self.logger.debug(
            'Using exchange_name=%s from config file', exchange_name)

    self.connection = Connection(connection_url)
    self._channel = self.connection.default_channel
    self._exchange = Exchange(
        exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Set up queues
    self._queues = []
    for kwargs in qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug(queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)
    self.logger.debug('Initializing of {} finished'.format(name))
def action_task_inject_process(config):
    """Build task messages from a JSON description and put them on 'celery'.

    Reads the process list from ``config.function_files``, builds one message
    dict per entry and sends them to the ``celery`` queue of the broker at
    ``<config.broker_type>://<config.target>``.

    :param config: Object providing ``function_files``, ``broker_type`` and
        ``target``.
    """
    if config.function_files is None:
        log.error(" - input .json file with process files is needed")
        return

    # --------------------------------------------------------------------------
    # Load process information
    # --------------------------------------------------------------------------
    with open(config.function_files, "r") as f:
        f_info = json.load(f)

    log.error(" - Building process...")

    # Search and inject process
    injections = []
    for p in f_info:
        # Order the positional arguments by their declared position. The
        # original relied on the order of the entries in the file, which
        # ignores "param_position".
        by_position = {
            x["param_position"]: x["param_value"] for x in p['parameters']
        }
        ordered_args = [by_position[pos] for pos in sorted(by_position)]

        # --------------------------------------------------------------------------
        # Fill process information
        # --------------------------------------------------------------------------
        inject_process = {
            "args": ordered_args,
            "callbacks": None,
            "chord": None,
            "errbacks": None,
            "eta": None,
            "expires": None,
            "id": uuid.uuid1(),
            "kwargs": {},
            "retries": 0,
            "task": p["function"],
            "taskset": None,
            "timelimit": [
                None,
                None
            ],
            "utc": True
        }
        injections.append(inject_process)

    # --------------------------------------------------------------------------
    # Re-inject messages
    # --------------------------------------------------------------------------
    log.warning(" - Trying to connect with server...")
    url = '%s://%s' % (config.broker_type, config.target)

    with Connection(url) as conn:
        # Close the SimpleQueue as well as the connection.
        with conn.SimpleQueue('celery') as in_queue:
            log.error(" - Sending processes to '%s'" % config.target)
            for i, e in enumerate(injections, 1):
                log.warning(" %s) %s" % (i, e['task']))
                # NOTE: messages are pickled, as in the original; pickle is
                # unsafe to load from untrusted sources on the consumer side.
                in_queue.put(e, serializer="pickle")
def action_proc_list_tasks(config):
    """List the processes seen on the 'celery' queue and optionally export them.

    Collects ``(process, args)`` pairs from ``list_remote_process``, keeping
    the first arguments seen for each process. Unless ``config.no_stream`` is
    set, it keeps polling until at least one process has been found. The
    result is logged and, when ``config.template`` is given, written as a
    JSON template.

    :param config: Object providing ``broker_type``, ``target``,
        ``no_stream`` and ``template``.
    """
    log.warning(" - Trying to connect with server...")
    url = '%s://%s' % (config.broker_type, config.target)

    with Connection(url) as conn:
        process_info = {}

        # Get remote process. The SimpleQueue is closed once polling ends.
        with conn.SimpleQueue('celery') as in_queue:
            first_msg = True
            while True:
                for remote_process, remote_args, _ in list_remote_process(config, in_queue):
                    # Keep the first arguments seen for each process.
                    process_info.setdefault(remote_process, remote_args)

                if config.no_stream is not False or process_info:
                    break

                if first_msg:
                    log.error(" -> Not messages found. Waiting ...")
                    first_msg = False
                sleep(0.1)

        # --------------------------------------------------------------------------
        # Try to identify parameters types
        # --------------------------------------------------------------------------
        # Display info
        log.error(" - Remote process found:")
        for p, v in process_info.items():
            log.error(" -> %s (%s)" % (
                p,
                ", ".join("param_%s:%s" % (i, get_param_type(x)) for i, x in enumerate(v))
            ))

        # Export to template enabled?
        if config.template is not None:
            log.warning(" - Building template...")
            export_data = export_process(process_info, config)

            # --------------------------------------------------------------------------
            # Save template
            # --------------------------------------------------------------------------
            # Build path in current dir
            export_path = os.path.abspath(config.template)

            # Test the suffix, not a substring: ".json" elsewhere in the path
            # (e.g. a directory name) must not suppress the extension.
            if not export_path.endswith(".json"):
                export_path += ".json"

            # dumps -- the context manager flushes and closes the file
            with open(export_path, "w") as template_file:
                json.dump(export_data, template_file)

            log.error(" - Template saved at: '%s'" % export_path)