filesystem_ipc.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:py-aspio 作者: hexhex 项目源码 文件源码
def __init__(self) -> None:
        '''Create a temporary named pipe (FIFO).

        A fresh temporary directory (prefix `pyaspio_`) is created and a FIFO called `pipe` is made inside it.
        The path to the named pipe can be retrieved from the `name` attribute on the returned object;
        the owning `tempfile.TemporaryDirectory` is kept in the `tmpdir` attribute.

        To ensure proper removal of the pipe after use,
        make sure to call `cleanup()` on the returned object, or use it as a context manager.

        Raises `OSError` if the named pipe could not be created
        (the temporary directory is removed again before the error propagates).
        Raises `NotImplementedError` if `mkfifo` from the builtin `os` module is not available.
        '''
        if not hasattr(os, 'mkfifo'):
            # Notes about a possible Windows implementation:
            #   Windows provides named pipes, see the CreateNamedPipe function in the Windows API: https://msdn.microsoft.com/en-us/library/aa365150(v=vs.85).aspx
            #   We can use the `ctypes` module to call this function, e.g.:
            #
            #       import ctypes
            #       kernel32 = ctypes.windll.kernel32
            #       pipe_handle = kernel32.CreateNamedPipeW(name, ...)
            #
            #   Related functions: ConnectNamedPipe, WriteFile, DisconnectNamedPipe, CloseHandle.
            #
            #   Overall the Windows pipe system seems more complicated,
            #   and I was not yet able to make it work without also changing the client code (which expects to read from a file).
            #
            #   For now, no pipe is created on such platforms; this constructor only signals the missing feature.
            #   NOTE(review): callers presumably catch this and fall back to a temporary file -- confirm.
            raise NotImplementedError('os.mkfifo is not available on this platform')

        # A private temporary directory avoids any race conditions on the pipe's path.
        self.tmpdir = tempfile.TemporaryDirectory(prefix='pyaspio_')
        self.name = os.path.join(self.tmpdir.name, 'pipe')
        try:
            os.mkfifo(self.name)  # may raise OSError
        except OSError:
            # Do not leave the directory behind for the TemporaryDirectory finalizer
            # (which would only remove it implicitly, with a ResourceWarning).
            self.tmpdir.cleanup()
            raise

        # Register `_warn_cleanup` to run with the class name and pipe path when this object
        # is garbage collected, or at interpreter exit (`atexit = True`).
        # NOTE(review): presumably it warns that `cleanup()` was never called -- confirm against `_warn_cleanup`.
        self._cleanup_warning = weakref.finalize(self, _warn_cleanup, type(self).__name__, self.name)  # type: ignore
        self._cleanup_warning.atexit = True
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号