def __init__(self, name, end=0, mode=0o666):
    """Open a pair of named pipes, ``name.in`` and ``name.out``, for
    communication with another process.

    One process should pass 1 for ``end`` and the other 0, so that each
    side reads from the pipe the other side writes to.  According to the
    original author's note, data exchanged over the pipes is marshalled
    with pickle (that happens outside this method).

    name -- path prefix; '.in' and '.out' are appended to build the FIFO paths.
    end  -- which side of the channel this process takes (tested for truth).
    mode -- permission bits handed to os.mkfifo() for newly created FIFOs.

    Raises OSError when a FIFO cannot be created for a reason other than
    the path already existing.
    """
    import errno  # function-scope import: only needed to recognise EEXIST

    self.in_name = name + '.in'
    self.out_name = name + '.out'
    for fifo_path in (self.in_name, self.out_name):
        try:
            os.mkfifo(fifo_path, mode)
        except OSError as err:
            # The path already existing is expected (e.g. the peer created
            # it first).  Any other failure must surface here; otherwise
            # the open(..., 'w') below could silently create a regular
            # file in place of the FIFO.
            if err.errno != errno.EEXIST:
                raise
    # NOTE: The order the ends are opened in is important - both ends
    # of pipe 1 must be opened before the second pipe can be opened.
    if end:
        self.inp = open(self.out_name, 'r')
        self.out = open(self.in_name, 'w')
    else:
        self.out = open(self.out_name, 'w')
        self.inp = open(self.in_name, 'r')
    self._open = True
python类mkfifo()的实例源码
def test_copytree_named_pipe(self):
    """copytree() on a tree containing a FIFO must raise shutil.Error
    carrying exactly one entry whose message names the pipe."""
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] is the list of (src, dst, message) failures.
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Best-effort cleanup of both the source and the destination tree.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def __init__(self, scene, game_server_guid, player_n):
    """
    It doesn't know env_id yet, it creates pipes and waits for client to send his env_id.

    scene            -- stored on the instance as-is.
    game_server_guid -- str used to build the pipe file names.
    player_n         -- int player index, zero-padded to two digits in the names.

    Two FIFOs, "<prefix>_actready" and "<prefix>_obsready", are (re)created
    under MULTIPLAYER_FILES_DIR.  Raises OSError if a FIFO cannot be created.
    """
    self.scene = scene
    assert isinstance(game_server_guid, str)
    assert isinstance(player_n, int)
    self.player_n = player_n
    self.prefix = MULTIPLAYER_FILES_DIR + "/multiplayer_%s_player%02i" % (game_server_guid, player_n)
    self.sh_pipe_actready_filename = self.prefix + "_actready"
    self.sh_pipe_obsready_filename = self.prefix + "_obsready"
    for fifo_path in (self.sh_pipe_actready_filename, self.sh_pipe_obsready_filename):
        # Drop any stale file first.  Removal stays best-effort, but only
        # filesystem errors are ignored so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        try:
            os.unlink(fifo_path)
        except OSError:
            pass
        os.mkfifo(fifo_path)
    print("Waiting %s" % self.prefix)
    self.need_reset = True
    self.need_response_tuple = False
def open_fifo(self):
    """Create the copytool event FIFO if needed and open it for reading.

    The FIFO path is self.copytool.event_fifo.  Before opening, lsof() is
    consulted; its result is iterated as {pid: {file: info}} where
    info['mode'] contains 'r' for a reader.  If any process already has
    the FIFO open for reading, FifoReaderConflict is raised with the set
    of those pids.  On success the non-blocking read descriptor is stored
    in self.reader_fd.

    Raises OSError if the FIFO cannot be created for a reason other than
    it already existing.
    """
    fifo_path = self.copytool.event_fifo
    try:
        os.mkfifo(fifo_path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            # Bare raise keeps the original traceback intact.
            raise
    pids = lsof(file=fifo_path)
    readers = set()
    for pid, files in pids.items():
        for info in files.values():
            if 'r' in info['mode']:
                readers.add(pid)
    if readers:
        raise FifoReaderConflict(readers)
    self.reader_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
    copytool_log.info("Opened %s for reading" % fifo_path)
def setup_reply_channel(self_or_session):
    """Extend a remote with shell code that provides a reply FIFO.

    The object returned by Remote._resolve() gets:
      * its ``pre`` hook wrapped so the generated script additionally
        creates a temporary directory and a FIFO inside it;
      * shell code prepended to ``post`` that cats the FIFO and then
        removes the FIFO and its directory;
      * a 'reply_fifo' entry in ``arg_config`` and ``required_names``.

    Returns the resolved remote object.
    """
    remote = Remote._resolve(self_or_session)
    wrapped_pre = remote.pre

    def pre_with_fifo(f):
        # Output of the previous hook followed by the FIFO setup snippet.
        return wrapped_pre(f) + '''
__reply_fifo_dir=$(mktemp -d)
__reply_fifo="${__reply_fifo_dir}/fifo"
mkfifo ${__reply_fifo}
'''

    remote.pre = pre_with_fifo
    teardown = '''
\ncat ${__reply_fifo}
rm ${__reply_fifo}
rmdir ${__reply_fifo_dir}
'''
    remote.post = teardown + remote.post
    remote.arg_config['reply_fifo'] = ('__reply_fifo', Args.string)
    remote.required_names.add('reply_fifo')
    return remote
def test_copytree_named_pipe(self):
    """copytree() on a tree containing a FIFO must raise shutil.Error
    carrying exactly one entry whose message names the pipe."""
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] is the list of (src, dst, message) failures.
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Best-effort cleanup of both the source and the destination tree.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def test_copytree_named_pipe(self):
    """copytree() on a tree containing a FIFO must raise shutil.Error
    carrying exactly one entry whose message names the pipe."""
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] is the list of (src, dst, message) failures.
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Best-effort cleanup of both the source and the destination tree.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def test_copytree_named_pipe(self):
    """copytree() on a tree containing a FIFO must raise shutil.Error
    carrying exactly one entry whose message names the pipe."""
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] is the list of (src, dst, message) failures.
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Best-effort cleanup of both the source and the destination tree.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def __init__(self, fifo_dir_path=None, is_master=True):
    """Set up the paths of the two IPC FIFOs and, on the master side, create them.

    fifo_dir_path -- directory holding the FIFOs; when None a temporary
                     directory is created and its removal is registered
                     with atexit.
    is_master     -- the master reads 'cpy2py_s2c.ipc' and writes
                     'cpy2py_c2s.ipc' and creates both FIFOs; the other
                     side uses the same two files with the roles swapped
                     and creates nothing.

    The FIFOs are not opened here; self._fifo_read and self._fifo_write
    start out as None.
    """
    self.is_master = is_master
    self._fifo_dir_path = fifo_dir_path
    if self._fifo_dir_path is None:
        self._fifo_dir_path = tempfile.mkdtemp()
        atexit.register(shutil.rmtree, self._fifo_dir_path)
    s2c_path = os.path.join(self._fifo_dir_path, 'cpy2py_s2c.ipc')
    c2s_path = os.path.join(self._fifo_dir_path, 'cpy2py_c2s.ipc')
    if is_master:
        self._fifo_read_path, self._fifo_write_path = s2c_path, c2s_path
        os.mkfifo(self._fifo_read_path)
        os.mkfifo(self._fifo_write_path)
    else:
        self._fifo_read_path, self._fifo_write_path = c2s_path, s2c_path
    self._fifo_read = self._fifo_write = None
def write(self, msg):
    """Write ``msg`` followed by a newline to the FIFO at self.filename.

    The FIFO is created on first use (an already existing path is fine)
    and the write handle is cached in self.FIFO for later calls.  Each
    message is flushed immediately so a reader sees it as soon as it is
    written rather than when the buffer fills or the file is closed.

    Note that opening a FIFO for writing normally blocks until some
    process opens it for reading.

    Raises OSError if the FIFO cannot be created for a reason other than
    it already existing.
    """
    # create the pipe if necessary
    try:
        os.mkfifo(self.filename)
    except OSError as oe:
        if oe.errno != errno.EEXIST:
            raise
    # open it up for writing if necessary
    if not self.FIFO:
        self.FIFO = open(self.filename, 'w')
    print(msg, file=self.FIFO)
    # Push the line through the file buffer so the reader is not starved.
    self.FIFO.flush()
###############################################
# Functions #
###############################################
def _create_pipes(self):
    """Pick a pipe identifier, create the outgoing FIFO and start the
    incoming-pipe listener thread.

    The identifier (seeding.hash_seed(None) modulo 2**32) is stored in
    self.pipe_name and in self.launch_vars['pipe_name'].  The outgoing
    FIFO is created unless self.disable_out_pipe is set; the listener
    thread is started unless self.disable_in_pipe is set.
    """
    # Creates named pipe for inter-process communication
    pipe_id = seeding.hash_seed(None) % 2 ** 32
    self.pipe_name = pipe_id
    self.launch_vars['pipe_name'] = pipe_id
    if not self.disable_out_pipe:
        out_path = '%s-out.%d' % (self.path_pipe_prefix, pipe_id)
        self.path_pipe_out = out_path
        os.mkfifo(out_path)
    # Launching a thread that will listen to incoming pipe
    # Thread exits if self.is_exiting = 1 or pipe_in is closed
    if not self.disable_in_pipe:
        listener = Thread(
            target=self._listen_to_incoming_pipe,
            kwargs={'pipe_name': pipe_id},
        )
        listener.start()
    # Cannot open output pipe now, otherwise it will block until
    # a reader tries to open the file in read mode - Must launch fceux first
def Start():
    """Record a live preview into a freshly created FIFO and start the player.

    Returns immediately when config.DEBUG is set or
    config.LIVE_PREVIEW_ENABLE is false.  Otherwise any existing file at
    config.LIVE_MOVIE_FILE is replaced by a FIFO, camera.RecordPreview()
    is called with that path and config.PREVIEW_DURATION, and the command
    from config.CMD_LIVE_PREVIEW_PLAY is launched through os.popen().
    """
    if config.DEBUG or not config.LIVE_PREVIEW_ENABLE:
        return
    # Start recording live preview
    pygameEngine.DrawCenterMessage(config.LIVE_PREVIEW_MSG, True)
    # print() with a single argument behaves the same on Python 2 and 3.
    print("Start recording live preview")
    if os.path.exists(config.LIVE_MOVIE_FILE):
        os.remove(config.LIVE_MOVIE_FILE)
    os.mkfifo(config.LIVE_MOVIE_FILE)
    # To avoid problem, wait
    while not os.path.exists(config.LIVE_MOVIE_FILE):
        time.sleep(.1)
    camera.RecordPreview(config.LIVE_MOVIE_FILE, config.PREVIEW_DURATION)
    # Playing live preview
    pygameEngine.DrawCenterMessage("")  # Clean screen before preview
    print("Playing live preview")
    os.popen(config.CMD_LIVE_PREVIEW_PLAY.format(filename=config.LIVE_MOVIE_FILE))
def test_copytree_named_pipe(self):
    """copytree() on a tree containing a FIFO must raise shutil.Error
    carrying exactly one entry whose message names the pipe."""
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] is the list of (src, dst, message) failures.
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Best-effort cleanup of both the source and the destination tree.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def test_copytree_named_pipe(self):
    """copytree() on a tree containing a FIFO must raise shutil.Error
    carrying exactly one entry whose message names the pipe."""
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] is the list of (src, dst, message) failures.
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Best-effort cleanup of both the source and the destination tree.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def test_copytree_named_pipe(self):
    """copytree() on a tree containing a FIFO must raise shutil.Error
    carrying exactly one entry whose message names the pipe."""
    os.mkdir(TESTFN)
    try:
        nested_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(nested_dir)
        fifo_path = os.path.join(nested_dir, "mypipe")
        os.mkfifo(fifo_path)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] is the list of (src, dst, message) failures.
            reported = exc.args[0]
            self.assertEqual(len(reported), 1)
            _src, _dst, message = reported[0]
            self.assertEqual("`%s` is a named pipe" % fifo_path, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Best-effort cleanup of both the source and the destination tree.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def open(self, flags):
    """Create the FIFO at self._pipe, start a daemon thread that runs
    self._client.get(self.url.path, self._pipe), then open the FIFO
    read-only and keep the descriptor in self._fd.

    flags is accepted but not used by this method.
    """
    fifo_path = self._pipe
    os.mkfifo(fifo_path)
    fetcher = threading.Thread(
        target=self._client.get,
        args=(self.url.path, fifo_path),
        daemon=True,
    )
    self._thread = fetcher
    fetcher.start()
    # The thread is started before this open so the FIFO has a peer.
    self._fd = os.open(fifo_path, os.O_RDONLY)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def setUp(self):
    """Run the parent class's setUp, then make sure a fresh FIFO exists
    at _pipepath (anything already at that path is removed first)."""
    super(TestStream, self).setUp()
    leftover_present = os.path.exists(_pipepath)
    if leftover_present:
        os.unlink(_pipepath)
    os.mkfifo(_pipepath)
def __init__(self, path):
    """Remember ``path`` and create a FIFO there.

    No descriptor is opened yet: self._fd starts out as None.
    os.mkfifo() raises OSError if the FIFO cannot be created.
    """
    self._fd = None
    self.path = path
    os.mkfifo(self.path)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)
def makefifo(self, tarinfo, targetpath):
    """Create a FIFO (named pipe) at targetpath.

    tarinfo is accepted but not consulted here.  Raises ExtractError
    when the os module offers no mkfifo on this platform.
    """
    if not hasattr(os, "mkfifo"):
        raise ExtractError("fifo not supported by system")
    os.mkfifo(targetpath)