def test_main():
try:
dsp = ossaudiodev.open('w')
except (ossaudiodev.error, OSError) as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
dsp.close()
support.run_unittest(__name__)
python类EBUSY的实例源码
def test_main():
try:
dsp = linuxaudiodev.open('w')
except linuxaudiodev.error, msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
dsp.close()
run_unittest(LinuxAudioDevTests)
def test_main():
try:
dsp = ossaudiodev.open('w')
except (ossaudiodev.error, IOError), msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
dsp.close()
test_support.run_unittest(__name__)
def initiate_sendfile(self):
"""A wrapper around sendfile."""
try:
sent = sendfile(self._fileno, self._filefd, self._offset,
self.ac_out_buffer_size)
except OSError as err:
if err.errno in _ERRNOS_RETRY or err.errno == errno.EBUSY:
return
elif err.errno in _ERRNOS_DISCONNECTED:
self.handle_close()
else:
if self.tot_bytes_sent == 0:
logger.warning(
"sendfile() failed; falling back on using plain send")
raise _GiveUpOnSendfile
else:
raise
else:
if sent == 0:
# this signals the channel that the transfer is completed
self.discard_buffers()
self.handle_close()
else:
self._offset += sent
self.tot_bytes_sent += sent
# --- utility methods
def test_flags(self):
try:
os.sendfile(self.sockno, self.fileno, 0, 4096,
flags=os.SF_NODISKIO)
except OSError as err:
if err.errno not in (errno.EBUSY, errno.EAGAIN):
raise
def test_main():
try:
dsp = ossaudiodev.open('w')
except (ossaudiodev.error, OSError) as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
dsp.close()
support.run_unittest(__name__)
def test_write_config_errors_if_unexpected_exception(self):
dnsconfig = DNSConfig()
exception = IOError(errno.EBUSY, factory.make_string())
self.patch(config, 'atomic_write', Mock(side_effect=exception))
self.assertRaises(IOError, dnsconfig.write_config)
def connect_miniterm(port):
try:
ser = serial.Serial(port, BAUDRATE, parity=PARITY, rtscts=False,
xonxoff=False)
return Miniterm(ser, echo=False)
except serial.SerialException as e:
if e.errno == errno.ENOENT:
sys.stderr.write(
"Device %r not found. Check your "
"MicroPython device path settings.\n" % port)
elif e.errno == errno.EBUSY:
# Device is busy. Explain what to do.
sys.stderr.write(
"Found the device, but it is busy. "
"Wait up to 20 seconds, or "
"press the reset button on the "
"back of the device next to the yellow light; "
"then try again.\n"
)
elif e.errno == errno.EACCES:
sys.stderr.write("Found the device, but could not connect.".format(port))
sys.stderr.write(e)
sys.stderr.write('On linux, try adding yourself to the "dialout" group')
sys.stderr.write('sudo usermod -a -G dialout <your-username>')
else:
# Try to be as helpful as possible.
sys.stderr.write("Found the device, but could not connect via" +
" port %r: %s\n" % (port, e))
sys.stderr.write("I'm not sure what to suggest. :-(\n")
input("Press ENTER to continue")
sys.exit(1)
def open(self, usb_bus, dev_adr):
self.usb_dev = None
self.usb_out = None
self.usb_inp = None
for dev in self.context.getDeviceList(skip_on_error=True):
if ((dev.getBusNumber() == usb_bus and
dev.getDeviceAddress() == dev_adr)):
break
else:
log.error("no device {0} on bus {1}".format(dev_adr, usb_bus))
raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
try:
first_setting = dev.iterSettings().next()
except StopIteration:
log.error("no usb configuration settings, please replug device")
raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def transfer_type(x):
return x & libusb.TRANSFER_TYPE_MASK
def endpoint_dir(x):
return x & libusb.ENDPOINT_DIR_MASK
for endpoint in first_setting.iterEndpoints():
ep_addr = endpoint.getAddress()
ep_attr = endpoint.getAttributes()
if transfer_type(ep_attr) == libusb.TRANSFER_TYPE_BULK:
if endpoint_dir(ep_addr) == libusb.ENDPOINT_IN:
if not self.usb_inp:
self.usb_inp = endpoint
if endpoint_dir(ep_addr) == libusb.ENDPOINT_OUT:
if not self.usb_out:
self.usb_out = endpoint
if not (self.usb_inp and self.usb_out):
log.error("no bulk endpoints for read and write")
raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
try:
# workaround the PN533's buggy USB implementation
self._manufacturer_name = dev.getManufacturer()
self._product_name = dev.getProduct()
except libusb.USBErrorIO:
self._manufacturer_name = None
self._product_name = None
try:
self.usb_dev = dev.open()
self.usb_dev.claimInterface(0)
except libusb.USBErrorAccess:
raise IOError(errno.EACCES, os.strerror(errno.EACCES))
except libusb.USBErrorBusy:
raise IOError(errno.EBUSY, os.strerror(errno.EBUSY))
except libusb.USBErrorNoDevice:
raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except IOError as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except (TypeError, AttributeError):
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize/8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time (%s) > 10%% off of expected time (%s)" %
(elapsed_time, expected_time))
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except IOError, msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except TypeError:
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize//8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time > 10% off of expected time")
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except IOError, msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except TypeError:
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize//8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time > 10% off of expected time")
def rmtree(path, selinux=False, exclude=()):
"""Version of shutil.rmtree that ignores no-such-file-or-directory errors,
tries harder if it finds immutable files and supports excluding paths"""
if os.path.islink(path):
raise OSError("Cannot call rmtree on a symbolic link")
try_again = True
retries = 0
failed_to_handle = False
failed_filename = None
if path in exclude:
return
while try_again:
try_again = False
try:
names = os.listdir(path)
for name in names:
fullname = os.path.join(path, name)
if fullname not in exclude:
try:
mode = os.lstat(fullname).st_mode
except OSError:
mode = 0
if stat.S_ISDIR(mode):
try:
rmtree(fullname, selinux=selinux, exclude=exclude)
except OSError as e:
if e.errno in (errno.EPERM, errno.EACCES, errno.EBUSY):
# we alrady tried handling this on lower level and failed,
# there's no point in trying again now
failed_to_handle = True
raise
else:
os.remove(fullname)
os.rmdir(path)
except OSError as e:
if failed_to_handle:
raise
if e.errno == errno.ENOENT: # no such file or directory
pass
elif exclude and e.errno == errno.ENOTEMPTY: # there's something excluded left
pass
elif selinux and (e.errno == errno.EPERM or e.errno == errno.EACCES):
try_again = True
if failed_filename == e.filename:
raise
failed_filename = e.filename
os.system("chattr -R -i %s" % path)
elif e.errno == errno.EBUSY:
retries += 1
if retries > 1:
raise
try_again = True
getLog().debug("retrying failed tree remove after sleeping a bit")
time.sleep(2)
else:
raise
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except IOError as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except (TypeError, AttributeError):
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize/8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time (%s) > 10%% off of expected time (%s)" %
(elapsed_time, expected_time))
def set_memory_hardlimit(cgrp, limit):
"""Set the cgroup hard-limits to the desired value.
The logic here is complicated since the ordering of operations depends on
weither we are lowering or raising the value.
"""
def _lower_memory_hardlimit(cgrp, limit):
"""Lower the cgroup memory hardlimit."""
cgroups.set_value('memory', cgrp,
'memory.limit_in_bytes', limit)
cgroups.set_value('memory', cgrp,
'memory.memsw.limit_in_bytes', limit)
def _raise_memory_hardlimit(cgrp, limit):
"""Raise the cgroup memory hardlimit."""
cgroups.set_value('memory', cgrp,
'memory.memsw.limit_in_bytes', limit)
cgroups.set_value('memory', cgrp,
'memory.limit_in_bytes', limit)
memory_hardlimit_funs = [_lower_memory_hardlimit, _raise_memory_hardlimit]
while memory_hardlimit_funs:
try:
memory_hardlimit_fun = memory_hardlimit_funs.pop(0)
memory_hardlimit_fun(cgrp, limit)
break
except IOError as err:
if err.errno == errno.EBUSY:
# Resource busy, this means the cgroup is already using more
# then the limit we are trying to set.
raise TreadmillCgroupError('Unable to set hard limit to %d. '
'Cgroup %r memory over limit.' %
(limit, cgrp))
elif err.errno == errno.EINVAL:
# This means we did the hard limit operation in the wrong
# order. If we have a different ordering to try, go ahead.
if memory_hardlimit_funs:
continue
# For any other case, raise the exception
raise
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except IOError, msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except TypeError:
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize//8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time > 10% off of expected time")
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except OSError as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except (TypeError, AttributeError):
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize/8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time (%s) > 10%% off of expected time (%s)" %
(elapsed_time, expected_time))
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except IOError, msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except TypeError:
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize//8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time > 10% off of expected time")
def play_sound_file(self, data, rate, ssize, nchannels):
try:
dsp = ossaudiodev.open('w')
except OSError as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise unittest.SkipTest(msg)
raise
# at least check that these methods can be invoked
dsp.bufsize()
dsp.obufcount()
dsp.obuffree()
dsp.getptr()
dsp.fileno()
# Make sure the read-only attributes work.
self.assertFalse(dsp.closed)
self.assertEqual(dsp.name, "/dev/dsp")
self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)
# And make sure they're really read-only.
for attr in ('closed', 'name', 'mode'):
try:
setattr(dsp, attr, 42)
except (TypeError, AttributeError):
pass
else:
self.fail("dsp.%s not read-only" % attr)
# Compute expected running time of sound sample (in seconds).
expected_time = float(len(data)) / (ssize/8) / nchannels / rate
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
t1 = time.time()
dsp.write(data)
dsp.close()
t2 = time.time()
elapsed_time = t2 - t1
percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
self.assertTrue(percent_diff <= 10.0,
"elapsed time (%s) > 10%% off of expected time (%s)" %
(elapsed_time, expected_time))
def remove(self, pkgplan):
path = self.get_installed_path(pkgplan.image.get_root())
try:
os.rmdir(path)
except OSError as e:
if e.errno == errno.ENOENT:
pass
elif e.errno in (errno.EEXIST, errno.ENOTEMPTY):
# Cannot remove directory since it's
# not empty.
pkgplan.salvage(path)
elif e.errno == errno.ENOTDIR:
# Either the user or another package has changed
# this directory into a link or file. Salvage
# what's there and drive on.
pkgplan.salvage(path)
elif e.errno == errno.EBUSY and os.path.ismount(path):
# User has replaced directory with mountpoint,
# or a package has been poorly implemented.
if not self.attrs.get("implicit"):
err_txt = _("Unable to remove {0}; it is "
"in use as a mountpoint. To "
"continue, please unmount the "
"filesystem at the target "
"location and try again.").format(
path)
raise apx.ActionExecutionError(self,
details=err_txt, error=e,
fmri=pkgplan.origin_fmri)
elif e.errno == errno.EBUSY:
# os.path.ismount() is broken for lofs
# filesystems, so give a more generic
# error.
if not self.attrs.get("implicit"):
err_txt = _("Unable to remove {0}; it "
"is in use by the system, another "
"process, or as a "
"mountpoint.").format(path)
raise apx.ActionExecutionError(self,
details=err_txt, error=e,
fmri=pkgplan.origin_fmri)
elif e.errno != errno.EACCES: # this happens on Windows
raise