def _add_checkpoint(self, id):
    self.job.checkpoint_archives.append(bin_to_hex(id))
    transaction.get().note('Added checkpoint archive %s for job %s' % (bin_to_hex(id), self.job.id))
    transaction.commit()
def get(self, id):
    """API"""
    repo_data = self.repository.get(id)
    client_data = self._repo_to_client(id, repo_data)
    return client_data
def _cache_sync_archive(self, archive_id):
    log.debug('Started cache sync')
    add_chunk = self._cache.chunks.add
    # Account for the archive metadata chunk itself.
    cdata = self._cache.repository.get(archive_id)
    _, data = self._cache.key.decrypt(archive_id, cdata)
    add_chunk(archive_id, 1, len(data), len(cdata))
    try:
        archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
    except (TypeError, ValueError, AttributeError) as error:
        log.error('Corrupted/unknown archive metadata: %s', error)
        return False
    if archive.version != 1:
        log.error('Unknown archive metadata version %r', archive.version)
        return False
    # Walk the item metadata stream and register every referenced chunk.
    unpacker = msgpack.Unpacker()
    for item_id, chunk in zip(archive.items, self._cache.repository.get_many(archive.items)):
        _, data = self._cache.key.decrypt(item_id, chunk)
        add_chunk(item_id, 1, len(data), len(chunk))
        unpacker.feed(data)
        for item in unpacker:
            if not isinstance(item, dict):
                log.error('Error: Did not get expected metadata dict - archive corrupted!')
                return False
            if b'chunks' in item:
                for chunk_id, size, csize in item[b'chunks']:
                    add_chunk(chunk_id, 1, size, csize)
    log.debug('Completed cache sync')
    return True
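# A minimal, self-contained sketch (not part of the original source) of the
# feed-then-iterate streaming pattern that msgpack.Unpacker supports and that
# the loop above relies on: bytes are fed incrementally, and every msgpack
# object that is complete so far can be read off by iterating the unpacker.
import msgpack

def iter_unpacked(chunks):
    """Yield whole msgpack objects from an iterable of byte chunks."""
    unpacker = msgpack.Unpacker()
    for chunk in chunks:
        unpacker.feed(chunk)
        for obj in unpacker:
            yield obj

packed = msgpack.packb([1, 2]) + msgpack.packb([3, 4])
# Splitting the stream in the middle of an object still yields whole objects.
assert list(iter_unpacked([packed[:2], packed[2:]])) == [[1, 2], [3, 4]]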
def test_celery__TransactionAwareTask__delay__6(celery_session_worker, zcml):
    """It allows overriding the principal."""
    auth = zope.component.getUtility(
        zope.authentication.interfaces.IAuthentication)
    principal = auth.getPrincipal('example.user')
    z3c.celery.celery.login_principal(principal)
    result = get_principal_title_task.delay(_principal_id_='zope.user')
    transaction.commit()
    assert 'User' == result.get()
def test_celery__TransactionAwareTask____call____1(celery_session_worker):
    """It aborts the transaction in case of an error during task execution."""
    result = exception_task.delay()
    transaction.commit()
    with pytest.raises(Exception) as err:
        result.get()
    # Celery wraps errors dynamically as celery.backends.base.<ErrorName>, so
    # we have to dig deep here.
    assert 'RuntimeError' == err.value.__class__.__name__
def conflict_task(bind=True, context=None, datetime=None):
    """Dummy task which injects a DataManager that votes a ConflictError."""
    transaction.get().join(VoteExceptionDataManager())
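# VoteExceptionDataManager is defined elsewhere in this test suite. A plausible
# minimal stand-in (an assumption, not the original implementation) is a
# transaction data manager whose tpc_vote raises, which is exactly where a
# ConflictError surfaces when transaction.commit() runs the two-phase commit.
from ZODB.POSException import ConflictError

class VotingConflictDataManager(object):
    """Hypothetical data manager that makes every commit vote fail."""

    def abort(self, txn):
        pass

    def tpc_begin(self, txn):
        pass

    def commit(self, txn):
        pass

    def tpc_vote(self, txn):
        # Failing the vote forces the whole transaction to abort.
        raise ConflictError()

    def tpc_finish(self, txn):
        pass

    def tpc_abort(self, txn):
        pass

    def sortKey(self):
        return 'voting-conflict-dm'

# Joining it, as conflict_task does with the real class, makes the next
# transaction.commit() raise ConflictError, which TransactionAwareTask retries.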
def test_celery__TransactionAwareTask____call____2(
        celery_session_worker, interaction):
    """It aborts the transaction and retries in case of a ConflictError."""
    result = conflict_task.delay()
    transaction.commit()
    with pytest.raises(Exception) as err:
        result.get()
    assert 'MaxRetriesExceededError' == err.value.__class__.__name__
def test_celery__TransactionAwareTask____call____3(
        celery_session_worker, zcml):
    """It runs as the given principal in asynchronous mode."""
    auth = zope.component.getUtility(
        zope.authentication.interfaces.IAuthentication)
    principal = auth.getPrincipal('example.user')
    z3c.celery.celery.login_principal(principal)
    result = get_principal_title_task.delay()
    transaction.commit()
    assert 'Ben Utzer' == result.get()
def test_celery__TransactionAwareTask____call____4(
        celery_session_worker, interaction):
    """It propagates the task_id to the worker."""
    job = get_task_id.apply_async(task_id='my-nice-task-id')
    transaction.commit()
    assert 'my-nice-task-id' == job.get()
def test_celery__TransactionAwareTask__run_in_worker__1(
        celery_session_worker, storage_file, interaction):
    """It handles specific exceptions in a new transaction after abort."""
    job = except_with_handler.delay()
    transaction.commit()
    with pytest.raises(Exception):
        job.get()
    with open_zodb_copy(storage_file) as app:
        assert [('data', ('a1', 'a2', 1, 4, u'User'))] == list(app.items())
def test_celery__TransactionAwareTask__run_in_worker__2(
        celery_session_worker, storage_file, interaction):
    """It handles a specific exception to abort the transaction but still
    count as a successful job."""
    job = success_but_abort_transaction.delay()
    transaction.commit()
    assert 'done' == job.get()
    with open_zodb_copy(storage_file) as app:
        assert 'flub' not in app
def commit_error_task(bind=True):
    # Like conflict_task above, but joins a data manager that fails at commit time.
    transaction.get().join(CommitExceptionDataManager())
def _join_transaction(self):
    if not self._needs_to_join:
        return
    dm = CeleryDataManager(self)
    transaction.get().join(dm)
    self._needs_to_join = False
def __repr__(self):
    """Custom repr."""
    return '<{0.__module__}.{0.__name__} for {1}, {2}>'.format(
        self.__class__, transaction.get(), self.session)
def update_item(storage, context):
    target_version = context.type_info.schema_version
    current_version = context.properties.get('schema_version', '')
    update = False
    errors = []
    properties = context.properties
    if target_version is None or current_version == target_version:
        # Already at the target version: only refresh unique keys and links.
        unique_keys = context.unique_keys(properties)
        links = context.links(properties)
        keys_add, keys_remove = storage._update_keys(context.model, unique_keys)
        if keys_add or keys_remove:
            update = True
        rels_add, rels_remove = storage._update_rels(context.model, links)
        if rels_add or rels_remove:
            update = True
    else:
        # Out of date: run the registered upgrader, re-validate, and store.
        properties = deepcopy(properties)
        upgrader = context.registry[UPGRADER]
        properties = upgrader.upgrade(
            context.type_info.name, properties, current_version, target_version,
            context=context, registry=context.registry)
        if 'schema_version' in properties:
            del properties['schema_version']
        schema = context.type_info.schema
        properties['uuid'] = str(context.uuid)
        validated, errors = validate(schema, properties, properties)
        # Do not send modification events to skip indexing
        context.update(validated)
        update = True
    return update, errors
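# The upgrader pulled from context.registry[UPGRADER] walks the properties from
# current_version up to target_version. A minimal, hypothetical stand-in for
# that behaviour (not snovault's implementation) chains per-version step
# functions keyed by the version they upgrade from:
def run_upgrade_steps(properties, current_version, target_version, steps):
    """steps maps a source version to (next_version, step_function)."""
    version = current_version
    properties = dict(properties)
    while version != target_version:
        next_version, step = steps[version]
        properties = step(properties)
        properties['schema_version'] = next_version
        version = next_version
    return properties

# Example: two steps take a record from version '1' to '3'.
steps = {
    '1': ('2', lambda props: dict(props, status='current')),
    '2': ('3', lambda props: dict(props, notes=props.get('notes', ''))),
}
upgraded = run_upgrade_steps({'schema_version': '1'}, '1', '3', steps)
assert upgraded['schema_version'] == '3'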
def batch_upgrade(request):
    request.datastore = 'database'
    transaction.get().setExtendedInfo('upgrade', True)
    batch = request.json['batch']
    root = request.root
    storage = request.registry[STORAGE].write
    session = storage.DBSession()
    results = []
    for uuid in batch:
        item_type = None
        update = False
        error = False
        # One savepoint per item so a single failure does not abort the batch.
        sp = session.begin_nested()
        try:
            item = find_resource(root, uuid)
            item_type = item.type_info.item_type
            update, errors = update_item(storage, item)
        except Exception as e:
            logger.error('Error %s updating: /%s/%s' % (e, item_type, uuid))
            sp.rollback()
            error = True
        else:
            if errors:
                # redmine 5161 sometimes error.path has an int
                errortext = [
                    '%s: %s' % ('/'.join([str(x) or '<root>' for x in error.path]), error.message)
                    for error in errors]
                logger.error(
                    'Validation failure: /%s/%s\n%s', item_type, uuid, '\n'.join(errortext))
                sp.rollback()
                error = True
            else:
                sp.commit()
        results.append((item_type, uuid, update, error))
    return {'results': results}
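# A condensed sketch (illustrative names, not from the original code) of the
# per-item savepoint pattern batch_upgrade uses: each item is processed inside
# its own nested transaction (a SQLAlchemy savepoint), so one failing item is
# rolled back on its own while the rest of the batch can still commit.
def process_batch(session, items, handle):
    results = []
    for item in items:
        sp = session.begin_nested()  # SAVEPOINT
        try:
            handle(item)
        except Exception:
            sp.rollback()            # undo only this item's work
            results.append((item, False))
        else:
            sp.commit()              # release the savepoint, keep the changes
            results.append((item, True))
    return results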
def includeme(config):
    registry = config.registry
    registry[STORAGE] = RDBStorage(registry[DBSESSION])
    global _DBSESSION
    _DBSESSION = registry[DBSESSION]
    if registry.settings.get('blob_bucket'):
        registry[BLOBS] = S3BlobStorage(
            registry.settings['blob_bucket'],
            read_profile_name=registry.settings.get('blob_read_profile_name'),
            store_profile_name=registry.settings.get('blob_store_profile_name'),
        )
    else:
        registry[BLOBS] = RDBBlobStorage(registry[DBSESSION])
def update(self, model, properties=None, sheets=None, unique_keys=None, links=None):
    session = self.DBSession()
    sp = session.begin_nested()
    try:
        session.add(model)
        self._update_properties(model, properties, sheets)
        if links is not None:
            self._update_rels(model, links)
        if unique_keys is not None:
            keys_add, keys_remove = self._update_keys(model, unique_keys)
        sp.commit()
    except (IntegrityError, FlushError):
        sp.rollback()
    else:
        return
    # Try again more carefully
    try:
        session.add(model)
        self._update_properties(model, properties, sheets)
        if links is not None:
            self._update_rels(model, links)
        session.flush()
    except (IntegrityError, FlushError):
        msg = 'UUID conflict'
        raise HTTPConflict(msg)
    assert unique_keys is not None
    conflicts = [pk for pk in keys_add if session.query(Key).get(pk) is not None]
    assert conflicts
    msg = 'Keys conflict: %r' % conflicts
    raise HTTPConflict(msg)
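# A stripped-down sketch of the two-pass shape used above (helper names are
# illustrative, not part of snovault): try the whole write optimistically
# inside a savepoint; only if the database rejects it, redo the work without
# the unique keys so the error can say whether the row itself or one of its
# keys collided. RuntimeError stands in for HTTPConflict here.
from sqlalchemy.exc import IntegrityError

def write_with_conflict_report(session, write_all, write_without_keys,
                               find_conflicting_keys):
    sp = session.begin_nested()
    try:
        write_all()
        sp.commit()
        return
    except IntegrityError:
        sp.rollback()
    # Second, careful pass: leave the unique keys out; if this still fails,
    # the row itself conflicts, otherwise the keys do.
    try:
        write_without_keys()
        session.flush()
    except IntegrityError:
        raise RuntimeError('UUID conflict')
    raise RuntimeError('Keys conflict: %r' % find_conflicting_keys())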
def get_blob(self, download_meta):
    blob_id = download_meta['blob_id']
    if isinstance(blob_id, str):
        blob_id = uuid.UUID(blob_id)
    session = self.DBSession()
    blob = session.query(Blob).get(blob_id)
    return blob.data
def __setitem__(self, key, value):
    current = self.data.get(key, None)
    if current is None:
        self.data[key] = current = CurrentPropertySheet(name=key, rid=self.rid)
    propsheet = PropertySheet(name=key, properties=value, rid=self.rid)
    current.propsheet = propsheet