def dehydrate_stats(self, bundle):
    """Describe every statistic recorded for a storage resource.

    :param bundle: object whose ``obj`` attribute is the storage resource
        used to filter ``StorageResourceStatistic`` rows.
    :return: dict keyed by statistic name.  Each value is a dict with the
        keys ``name``, ``label``, ``type``, ``unit_name`` and ``data``.
        For ``statistics.BytesHistogram`` statistics ``type`` is
        ``'histogram'`` and ``data`` carries ``bin_labels`` and ``values``;
        for anything else ``type`` is ``'timeseries'`` and ``data`` is
        ``None``.
    :raises: whatever ``latest('time')`` raises when a histogram statistic
        has no stored times -- this is not handled here.
    """
    from chroma_core.models import SimpleHistoStoreTime
    from chroma_core.models import SimpleHistoStoreBin
    # Imported once up front instead of on every loop iteration.
    from django.db import transaction

    stats = {}
    for s in StorageResourceStatistic.objects.filter(storage_resource=bundle.obj):
        stat_props = s.storage_resource.get_statistic_properties(s.name)
        if isinstance(stat_props, statistics.BytesHistogram):
            with transaction.commit_manually():
                # NOTE(review): the leading commit presumably ends any open
                # transaction so the reads below see current data -- confirm.
                transaction.commit()
                try:
                    latest_time = SimpleHistoStoreTime.objects.filter(
                        storage_resource_statistic=s).latest('time')
                    histo_bins = SimpleHistoStoreBin.objects.filter(
                        histo_store_time=latest_time).order_by('bin_idx')
                finally:
                    # Always close the manual transaction, even if the
                    # lookup above raised.
                    transaction.commit()
            type_name = 'histogram'
            # Composite type
            data = {
                'bin_labels': [u'\u2264%s' % (bin_spec[1:] or '') for bin_spec in stat_props.bins],
                'values': [histo_bin.value for histo_bin in histo_bins],
            }
        else:
            type_name = 'timeseries'
            # Go get the data from <resource>/metrics/
            data = None

        # Fall back to the statistic's name when no label is configured.
        label = stat_props.label or s.name

        stats[s.name] = {
            'name': s.name,
            'label': label,
            'type': type_name,
            'unit_name': stat_props.get_unit_name(),
            'data': data,
        }

    return stats
# Example source code for Python's commit_manually()
def tearDown(self):
    """Run the parent tearDown, then remove the test client host if it exists.

    Every ``HostContactAlert`` for the host is deleted, along with any
    ``AlertEmail`` rows that reference it, before the host itself is marked
    deleted.  A missing host is silently ignored.
    """
    super(TestAgentRpc, self).tearDown()

    try:
        # NOTE(review): empty manual transaction, presumably to refresh this
        # connection's view of the database -- confirm.
        with transaction.commit_manually():
            transaction.commit()

        client_host = ManagedHost.objects.get(fqdn=self.CLIENT_NAME)
        for alert in HostContactAlert.filter_by_item(client_host):
            # Remove e-mails that reference the alert before the alert itself.
            AlertEmail.objects.filter(alerts__in=[alert]).delete()
            alert.delete()
        client_host.mark_deleted()
    except ManagedHost.DoesNotExist:
        # The host was never created (or is already gone): nothing to clean up.
        pass
def _get_command(self, command_id):
    """Return the ``Command`` whose primary key is ``command_id``.

    :param command_id: primary key passed to ``Command.objects.get``.
    :return: the matching ``Command`` instance.
    """
    # NOTE(review): commit inside an otherwise empty manual transaction,
    # presumably so the lookup sees current data -- confirm.
    with transaction.commit_manually():
        transaction.commit()

    command = Command.objects.get(pk=command_id)
    return command
def managed_transaction(func):
    """Decorator running ``func`` inside a manually managed transaction.

    The wrapped call is committed when ``func`` returns normally.  When it
    raises an ``Exception`` the transaction is rolled back and the exception
    is re-raised to the caller.

    According to the original author's note, this is used in place of
    ``django.db.transaction.commit_on_success`` where exceptions need to be
    reported, because ``commit_on_success`` swallows them.

    :param func: callable to wrap.
    :return: the wrapping callable, which returns whatever ``func`` returns.
    """
    @wraps(func)
    @transaction.commit_manually
    def _inner(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Undo any partial work, then let the caller see the failure.
            transaction.rollback()
            raise
        transaction.commit()
        return result

    return _inner
def tearDown(self):
    """Run the parent tearDown, then remove the test client host if it exists.

    All ``HostContactAlert`` rows for the host are deleted before the host
    is marked deleted.  A missing host is silently ignored.
    """
    super(TestHttpAgent, self).tearDown()

    try:
        # NOTE(review): empty manual transaction, presumably to refresh this
        # connection's view of the database -- confirm.
        with transaction.commit_manually():
            transaction.commit()

        client_host = ManagedHost.objects.get(fqdn=self.CLIENT_NAME)
        contact_alerts = HostContactAlert.filter_by_item(client_host)
        contact_alerts.delete()
        client_host.mark_deleted()
    except ManagedHost.DoesNotExist:
        # The host was never created (or is already gone): nothing to clean up.
        pass
def global_remove_resource(self, resource_id):
    """Delete the storage resource record identified by ``resource_id``.

    Runs while holding ``self._instance_lock``.  If no record with that
    primary key exists, an error is logged and nothing is deleted.

    :param resource_id: primary key of the ``StorageResourceRecord``.
    """
    with self._instance_lock:
        # Be extra-sure to see a fresh view (HYD-1301)
        with transaction.commit_manually():
            transaction.commit()

        with transaction.commit_on_success():
            log.debug("global_remove_resource: %s" % resource_id)

            try:
                resource_record = StorageResourceRecord.objects.get(pk=resource_id)
            except StorageResourceRecord.DoesNotExist:
                log.error("ResourceManager received invalid request to remove non-existent resource %s" % resource_id)
                return

            self._delete_resource(resource_record)
def _refresh_power_devices(self):
    """Add any ``PowerControlDevice`` not yet known to ``self._power_devices``.

    Devices are keyed by their ``sockaddr``; entries already present are
    left untouched.  The update happens while holding ``self._lock``.
    """
    # Ensure that we have a fresh view of the DB
    with transaction.commit_manually():
        transaction.commit()

    with self._lock:
        known_devices = self._power_devices
        for device in PowerControlDevice.objects.all():
            if device.sockaddr in known_devices:
                continue
            known_devices[device.sockaddr] = device
def on_data(self, fqdn, data):
    """Run an ``UpdateScan`` for the host named ``fqdn`` with ``data``.

    Any ``Exception`` raised while looking up the host or running the scan
    is logged with its traceback and not propagated.

    :param fqdn: fully qualified domain name used to look up the host.
    :param data: payload handed to ``UpdateScan().run``.
    """
    # NOTE(review): empty manual transaction, presumably to refresh this
    # connection's view of the database -- confirm.
    with transaction.commit_manually():
        transaction.commit()

    try:
        host = ManagedHost.objects.get(fqdn=fqdn)
        scan = UpdateScan()
        scan.run(host.id, data)
    except Exception:
        # Best effort: record the failure and carry on.
        traceback_lines = traceback.format_exception(*(sys.exc_info()))
        log.error("Error handling lustre message: %s", '\n'.join(traceback_lines))
def complete_job(self, job_id, errored):
    """Queue a ``complete_job`` request for ``job_id``.

    If this thread has an open database connection (one that is set and is
    not ``DISABLED_CONNECTION``), that fact is logged and the connection is
    committed before the request is queued.

    :param job_id: integer job identifier (formatted with ``%d``).
    :param errored: flag forwarded unchanged with the request.
    """
    db_connection = django.db.connection.connection
    if db_connection and db_connection != DISABLED_CONNECTION:
        log.info("Job %d: open DB connection during completion" % job_id)
        # Ensure that any changes made by this thread are visible to other threads before
        # we ask job_scheduler to advance
        with transaction.commit_manually():
            transaction.commit()

    request = ('complete_job', (job_id, errored), {})
    self.put(request)
def run(self):
    """Start the job scheduler service, wait for completion, then cancel jobs.

    Steps, in order:
      1. Mark commands and jobs left incomplete by a previous run as
         complete/cancelled.
      2. Create the ``JobScheduler`` and its queue, RPC and progress threads,
         start ``AgentRpc`` and the threads, then set ``_children_started``.
      3. Start the ``MailAlerts`` thread.
      4. Block until ``self._complete`` is set.
      5. Cancel every job that is still not complete, newest id first.
    """
    from chroma_core.services.job_scheduler.job_scheduler import JobScheduler
    from chroma_core.services.job_scheduler.job_scheduler_client import JobSchedulerRpc
    from chroma_core.services.job_scheduler.agent_rpc import AgentRpc

    super(Service, self).run()

    # Cancel anything that's left behind from a previous run
    for stale_command in Command.objects.filter(complete=False):
        stale_command.completed(True, True)
    not_complete = ~Q(state='complete')
    Job.objects.filter(not_complete).update(state='complete', cancelled=True)

    self._job_scheduler = JobScheduler()
    self._queue_thread = ServiceThread(QueueHandler(self._job_scheduler))
    self._rpc_thread = ServiceThread(JobSchedulerRpc(self._job_scheduler))
    self._progress_thread = ServiceThread(self._job_scheduler.progress)

    # AgentRpc goes first, then the worker threads in the same order as they
    # were created.
    AgentRpc.start()
    for worker in (self._queue_thread, self._rpc_thread, self._progress_thread):
        worker.start()
    self._children_started.set()

    self._mail_alerts_thread = MailAlerts(
        settings.EMAIL_SENDER,
        settings.EMAIL_SUBJECT_PREFIX,
        settings.EMAIL_HOST,
    )
    self._mail_alerts_thread.start()

    # Block here until the service is told it is complete.
    self._complete.wait()

    self.log.info("Cancelling outstanding jobs...")

    # Get a fresh view of the job table
    with transaction.commit_manually():
        transaction.commit()

    outstanding_jobs = Job.objects.filter(~Q(state='complete')).order_by('-id')
    for job in outstanding_jobs:
        self._job_scheduler.cancel_job(job.id)
def parse(self, fqdn, message):
    """Dispatch a log message to the handler registered for its selector.

    The message text is matched against the keys of ``self.selectors``.  If
    one matches and the host can be resolved, the corresponding handler is
    called inside a manually managed transaction: committed on success,
    rolled back (after logging the error) if the handler raises.

    :param fqdn: host name passed to ``self.get_host``.
    :param message: mapping whose ``'message'`` entry is the log line text.
    :return: ``None`` in all cases; handler failures are logged, not raised.
    """
    text = message['message']

    hit = find_one_in_many(text, self.selectors.keys())
    if not hit:
        # No selector matches this line.
        return

    h = self.get_host(fqdn)
    if h is None:
        # Host could not be resolved; nothing to attach the event to.
        return

    fn = self.selectors[hit]
    with transaction.commit_manually():
        try:
            fn(text, h)
        # ``as`` form is valid on Python 2.6+ and 3; the old comma form is
        # a syntax error on Python 3.
        except Exception as e:
            syslog_events_log.error("Failed to parse log line '%s' using handler %s: %s" % (text, fn, e))
            transaction.rollback()
        else:
            transaction.commit()
def test_timeout(self):
    """Check that an idle session is terminated after the contact timeout.

    After opening a session and leaving it idle for the timeout period,
    the http_agent service must emit a termination message on the RX
    channel, raise a ``HostContactAlert``, and queue a ``SESSION_TERMINATE``
    for the agent to collect on its next GET."""
    session_id = self._open_session()

    # No alert to begin with
    self.assertEqual(HostContactAlert.filter_by_item(self.host).count(), 0)

    idle_period = HostState.CONTACT_TIMEOUT + HostStatePoller.POLL_INTERVAL + RABBITMQ_GRACE_PERIOD
    time.sleep(idle_period)

    # Should be one SESSION_TERMINATE message to AMQP with a matching session ID
    expected_amqp_message = {
        'fqdn': self.CLIENT_NAME,
        'type': 'SESSION_TERMINATE',
        'plugin': self.PLUGIN,
        'session_seq': None,
        'session_id': session_id,
        'body': None
    }
    self.assertDictEqual(self._receive_one_amqp(), expected_amqp_message)

    # NOTE(review): empty manual transaction, presumably so the alert query
    # below sees rows written by the service -- confirm.
    with transaction.commit_manually():
        transaction.commit()

    self.assertEqual(HostContactAlert.filter_by_item(self.host).count(), 1)

    # Should be a message waiting for the agent telling it that its session was terminated
    # (timing out doesn't mean the agent is gone, it could just be experiencing network difficulties)
    # What's more, the agent doesn't necessarily *know* that it had network difficulties, e.g. if it
    # just got real slow and waited too long between GETs.
    # This has to cut both ways to be reliable:
    # * We have to tell the agent that we thought it went away, by sending a TERMINATE for sessions
    # * If the agent finds that a GET fails then it has to assume that we might have put session
    #   messages in that GET, and terminate all its sessions in case one of those GET messages
    #   was really a TERMINATE
    response = self._get()
    self.assertResponseOk(response)

    forwarded_messages = response.json()['messages']
    self.assertEqual(len(forwarded_messages), 1)

    # Same shape as the AMQP message, but without a session ID.
    expected_forwarded_message = dict(expected_amqp_message, session_id=None)
    self.assertDictEqual(forwarded_messages[0], expected_forwarded_message)