python类O_RDWR的实例源码

test_tempfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_usable_template(self):
        """Check that tempfile.gettempprefix() yields a usable prefix string.

        A scratch directory is created without using the prefix, and a file
        named ``prefix + 'xxxxxx.xxx'`` is then created inside it.  Any
        failure of ``os.open`` is reported through ``self.failOnException``.
        """
        file_name = tempfile.gettempprefix() + "xxxxxx.xxx"
        scratch_dir = tempfile.mkdtemp(prefix="")
        try:
            full_path = os.path.join(scratch_dir, file_name)
            try:
                handle = os.open(full_path, os.O_CREAT | os.O_RDWR)
            except:
                self.failOnException("os.open")
            # Clean up the probe file before the directory is removed.
            os.close(handle)
            os.unlink(full_path)
        finally:
            os.rmdir(scratch_dir)
test_tempfile.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_usable_template(self):
        """Verify that the string from tempfile.gettempprefix() works in a file name.

        The test makes a temporary directory (with an empty prefix so the
        prefix under test is not involved), then creates and removes a file
        called ``prefix + 'xxxxxx.xxx'`` in it.  ``self.failOnException`` is
        invoked if ``os.open`` raises.
        """
        candidate = tempfile.gettempprefix() + "xxxxxx.xxx"
        tmp_dir = tempfile.mkdtemp(prefix="")
        try:
            candidate = os.path.join(tmp_dir, candidate)
            try:
                descriptor = os.open(candidate, os.O_CREAT | os.O_RDWR)
            except:
                self.failOnException("os.open")
            os.close(descriptor)
            os.unlink(candidate)
        finally:
            # The directory is removed whether or not the probe succeeded.
            os.rmdir(tmp_dir)
recipe-578476.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def do_magic(self):
        """Try to take the lock at LOCK_PATH and set ``self.is_running`` on contention.

        When OS_WIN is false the lock file is opened for writing and locked
        with a non-blocking exclusive ``fcntl.lockf``.  When OS_WIN is true
        any existing lock file is removed and re-created with ``O_EXCL``.
        Environment errors that are not recognised as "lock is held" are
        re-raised.
        """
        if not OS_WIN:
            try:
                self.fh = open(LOCK_PATH, 'w')
                fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except EnvironmentError:
                # With no file handle the failure came from open(); propagate it.
                if self.fh is None:
                    raise
                self.is_running = True
            return

        try:
            if os.path.exists(LOCK_PATH):
                os.unlink(LOCK_PATH)
            self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except EnvironmentError as err:
            # errno 13 is treated as "lock file in use"; anything else is re-raised.
            if err.errno != 13:
                raise
            self.is_running = True
master.py 文件源码 项目:kAFL 作者: RUB-SysSec 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def wipe(self):
        """Zero-fill the two filter bitmap files under /dev/shm.

        ``/dev/shm/kafl_filter0`` is resized to
        ``self.config.config_values['BITMAP_SHM_SIZE']`` bytes and
        ``/dev/shm/kafl_tfilter`` to 0x1000000 bytes; each file is created if
        missing, memory-mapped, and overwritten with zero bytes.

        The mapping and the descriptor are released even if an error occurs.
        """
        targets = (
            ("/dev/shm/kafl_filter0", self.config.config_values['BITMAP_SHM_SIZE']),
            ("/dev/shm/kafl_tfilter", 0x1000000),
        )
        for path, size in targets:
            filter_bitmap_fd = os.open(path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
            try:
                os.ftruncate(filter_bitmap_fd, size)
                filter_bitmap = mmap.mmap(filter_bitmap_fd, size, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
                try:
                    # One slice assignment replaces millions of per-byte
                    # assignments; a bytes literal works on Python 2 and 3.
                    filter_bitmap[:] = b'\x00' * size
                finally:
                    filter_bitmap.close()
            finally:
                os.close(filter_bitmap_fd)
qemu.py 文件源码 项目:kAFL 作者: RUB-SysSec 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __set_binary(self, filename, binaryfile, max_size):
        """Copy `binaryfile` into a zero-filled, memory-mapped file of `max_size` bytes.

        :param filename: path of the file to create/resize and map.
        :param binaryfile: path of the file whose contents are copied in.
        :param max_size: size in bytes the mapped file is truncated to.

        The mapped file is first overwritten with zero bytes, then the
        contents of `binaryfile` are written from offset 0 in 1024-byte
        chunks.  Writing more than `max_size` bytes makes ``mmap.write``
        raise.  All handles are released even when an error occurs.
        """
        shm_fd = os.open(filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        try:
            os.ftruncate(shm_fd, max_size)
            shm = mmap.mmap(shm_fd, max_size, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
            try:
                shm.seek(0x0)
                # bytes literal: valid for mmap.write on both Python 2 and 3.
                shm.write(b'\x00' * max_size)
                shm.seek(0x0)

                with open(binaryfile, "rb") as f:
                    while True:
                        chunk = f.read(1024)
                        # An empty read marks EOF; comparing against "" would
                        # never match the b"" returned on Python 3.
                        if not chunk:
                            break
                        shm.write(chunk)
            finally:
                shm.close()
        finally:
            os.close(shm_fd)
qemu.py 文件源码 项目:kAFL 作者: RUB-SysSec 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def init(self):
        """Connect the control socket and map the bitmap and payload files.

        The UNIX socket connect is retried without delay until it succeeds.
        The bitmap file is sized to ``self.bitmap_size`` and the payload file
        to 128 KiB (``128 << 10``); both are created if missing and mapped
        shared, readable and writable.

        :return: always ``True``.
        """
        self.control = socket.socket(socket.AF_UNIX)
        while True:
            try:
                self.control.connect(self.control_filename)
            except socket_error:
                # Retry immediately until the connect succeeds.
                continue
            break

        open_flags = os.O_RDWR | os.O_SYNC | os.O_CREAT
        map_prot = mmap.PROT_WRITE | mmap.PROT_READ
        payload_size = 128 << 10

        self.kafl_shm_f = os.open(self.bitmap_filename, open_flags)
        self.fs_shm_f = os.open(self.payload_filename, open_flags)
        os.ftruncate(self.kafl_shm_f, self.bitmap_size)
        os.ftruncate(self.fs_shm_f, payload_size)

        self.kafl_shm = mmap.mmap(self.kafl_shm_f, self.bitmap_size, mmap.MAP_SHARED, map_prot)
        self.fs_shm = mmap.mmap(self.fs_shm_f, payload_size, mmap.MAP_SHARED, map_prot)

        return True
Utils.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Write `data` to the file `f` through a non-inheritable descriptor.

    :param f: path of the file to create or truncate.
    :param data: content to write; on Python 3 in a non-binary mode it is
        encoded with `encoding` and the mode is switched to binary.
    :param m: open mode passed to ``os.fdopen``; ``'b'`` adds ``O_BINARY``
        and ``'+'`` adds ``O_RDWR`` to the ``os.open`` flags.
    :param encoding: encoding used for the Python 3 text-to-bytes conversion.
    :raises IOError: if the file cannot be opened.
    """
    needs_encoding = sys.hexversion > 0x3000000 and 'b' not in m
    if needs_encoding:
        data = data.encode(encoding)
        m = m + 'b'

    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    flags |= os.O_BINARY if 'b' in m else 0
    flags |= os.O_RDWR if '+' in m else 0

    try:
        handle = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)

    stream = os.fdopen(handle, m)
    try:
        stream.write(data)
    finally:
        stream.close()
Utils.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Create or truncate `f` and write `data` to it via ``os.open``/``os.fdopen``.

    The descriptor is opened with ``O_NOINHERIT``.  On Python 3, when `m` is
    not a binary mode, `data` is encoded with `encoding` and ``'b'`` is
    appended to the mode.  ``O_BINARY`` is set for binary modes and
    ``O_RDWR`` is added when `m` contains ``'+'``.

    :raises IOError: if ``os.open`` fails with ``OSError``.
    """
    if sys.hexversion > 0x3000000 and 'b' not in m:
        data = data.encode(encoding)
        m = m + 'b'

    open_flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    binary_mode = 'b' in m
    update_mode = '+' in m
    if binary_mode:
        open_flags = open_flags | os.O_BINARY
    if update_mode:
        open_flags = open_flags | os.O_RDWR

    try:
        raw_fd = os.open(f, open_flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)

    out = os.fdopen(raw_fd, m)
    try:
        out.write(data)
    finally:
        out.close()
_twistd_unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def daemonize():
    """Detach the process using a double fork and point fds 0-2 at /dev/null.

    The parent of each fork exits immediately with ``os._exit(0)``; the
    surviving process starts a new session, sets its umask to 0o077 and
    duplicates a read/write ``/dev/null`` descriptor over file descriptors
    0, 1 and 2.  ``EBADF`` from ``os.dup2`` is ignored; other ``OSError``
    values propagate.
    """
    # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent
    os.setsid()
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent again.
    # 0o077 / "except ... as" are accepted by Python 2.6+ and Python 3 alike.
    os.umask(0o077)
    null = os.open('/dev/null', os.O_RDWR)
    for i in range(3):
        try:
            os.dup2(null, i)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
    os.close(null)
maildir.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def createTempFile(self):
        """Create a uniquely named file under ``<mbox path>/tmp`` and store its fd.

        A fresh name from ``_generateMaildirName()`` is tried on every
        attempt.  The file is opened through ``self.osopen`` with
        ``O_RDWR | O_CREAT | O_EXCL`` (plus ``O_NOINHERIT`` / ``O_NOFOLLOW``
        where the platform defines them) and permission bits 0o600.  On
        success the descriptor is left in ``self.fh`` and the path in
        ``self.tmpname``.

        After more than 500 failed attempts ``self.defer`` is errbacked with
        a ``RuntimeError`` and reset to ``None``; ``self.fh`` then stays -1.

        :return: always ``None``.
        """
        attr = (os.O_RDWR | os.O_CREAT | os.O_EXCL
                | getattr(os, "O_NOINHERIT", 0)
                | getattr(os, "O_NOFOLLOW", 0))
        tries = 0
        self.fh = -1
        while True:
            self.tmpname = os.path.join(self.mbox.path, "tmp", _generateMaildirName())
            try:
                # 0o600 is the octal spelling valid on Python 2.6+ and 3.
                self.fh = self.osopen(self.tmpname, attr, 0o600)
                return None
            except OSError:
                tries += 1
                if tries > 500:
                    self.defer.errback(RuntimeError("Could not create tmp file for %s" % self.mbox.path))
                    self.defer = None
                    return None
pty.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def slave_open(tty_name):
    """slave_open(tty_name) -> slave_fd
    Open the pty slave named `tty_name` read/write and return its file
    descriptor.  Where ``fcntl`` provides ``I_PUSH``, the "ptem" and
    "ldterm" modules are pushed onto the descriptor; an IOError while
    pushing is ignored.
    Deprecated, use openpty() instead."""

    slave_fd = os.open(tty_name, os.O_RDWR)
    try:
        from fcntl import ioctl, I_PUSH
    except ImportError:
        # No I_PUSH available here: the plain descriptor is all we return.
        return slave_fd
    try:
        for stream_module in ("ptem", "ldterm"):
            ioctl(slave_fd, I_PUSH, stream_module)
    except IOError:
        pass
    return slave_fd
InputDevice.py 文件源码 项目:enigma2 作者: OpenLD 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getInputDevices(self):
        """Scan /dev/input and register every device whose name can be read.

        Each entry is opened non-blocking and queried with the
        ``EVIOCGNAME(256)`` ioctl.  The name is cut at the first NUL and
        replaced by ``'keyboard'`` when it contains "Keyboard".  Devices with
        a non-empty name are stored in ``self.Devices``; on boxes whose
        ``boxtype`` starts with ``'et'`` ``self.setDefaults`` is also called.
        IOError/OSError for a device is printed and that device is skipped.
        """
        devices = os.listdir("/dev/input/")

        for evdev in devices:
            try:
                buffer = "\0"*512
                self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
                try:
                    self.name = ioctl(self.fd, EVIOCGNAME(256), buffer)
                finally:
                    # Close the descriptor even when the ioctl fails.
                    os.close(self.fd)
                self.name = self.name[:self.name.find("\0")]
                if str(self.name).find("Keyboard") != -1:
                    self.name = 'keyboard'
            except (IOError, OSError) as err:
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print('[iInputDevices] getInputDevices  <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >')
                self.name = None

            if self.name:
                self.Devices[evdev] = {'name': self.name, 'type': self.getInputDeviceType(self.name),'enabled': False, 'configuredName': None }
                if boxtype.startswith('et'):
                    self.setDefaults(evdev) # load default remote control "delay" and "repeat" values for ETxxxx ("QuickFix Scrollspeed Menues" proposed by Xtrend Support)
filestore.py 文件源码 项目:nstock 作者: ybenitezf 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_file(self, name, excl=False, mode="wb", **kwargs):
        """Create the file `name` in this storage and wrap it in a StructFile.

        :param name: the name for the new file.
        :param excl: if True, the file is opened with ``O_CREAT | O_EXCL``
            (plus ``O_BINARY`` where available) so creation fails if it
            already exists.
        :param mode: the mode flags with which to open the file. The default
            is ``"wb"``.
        :param kwargs: extra keyword arguments forwarded to ``StructFile``.
        :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
        :raises ReadOnlyError: if the storage is read-only.
        """

        if self.readonly:
            raise ReadOnlyError

        path = self._fpath(name)
        if not excl:
            fileobj = open(path, mode)
        else:
            exclusive_flags = (os.O_CREAT | os.O_EXCL | os.O_RDWR
                               | getattr(os, "O_BINARY", 0))
            fileobj = os.fdopen(os.open(path, exclusive_flags), mode)

        return StructFile(fileobj, name=name, **kwargs)
generator.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def generate(self):
        """Return the source text of the POSIX daemonize snippet.

        The returned code only acts when ``os.name == 'posix'``: it records
        ``pupy.infos['daemonize']``, double-forks, starts a new session, sets
        the umask and duplicates ``/dev/null`` over file descriptors 0-2.

        ``errno`` is imported by the snippet because its ``except`` branch
        compares against ``errno.EBADF``.
        """
        return textwrap.dedent("""
        import pupy, os, errno

        if os.name == 'posix':
            pupy.infos['daemonize']=True
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent
            os.setsid()
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent again.
            os.umask(022)   # Don't allow others to write
            null=os.open('/dev/null', os.O_RDWR)
            for i in range(3):
                try:
                    os.dup2(null, i)
                except OSError, e:
                    if e.errno != errno.EBADF:
                        raise
            os.close(null)
        """)
sftp_server.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _convert_pflags(self, pflags):
        """Convert SFTP-style open() flags to python's os.open() flags.

        The access mode is ``O_RDWR`` when both the read and write bits are
        set, ``O_WRONLY`` when only the write bit is set, and ``O_RDONLY``
        otherwise.  The append, create, truncate and exclusive bits each add
        the matching ``os`` flag.
        """
        wants_read = pflags & SFTP_FLAG_READ
        wants_write = pflags & SFTP_FLAG_WRITE
        if wants_write:
            flags = os.O_RDWR if wants_read else os.O_WRONLY
        else:
            flags = os.O_RDONLY

        modifier_map = (
            (SFTP_FLAG_APPEND, os.O_APPEND),
            (SFTP_FLAG_CREATE, os.O_CREAT),
            (SFTP_FLAG_TRUNC, os.O_TRUNC),
            (SFTP_FLAG_EXCL, os.O_EXCL),
        )
        for sftp_bit, os_bit in modifier_map:
            if pflags & sftp_bit:
                flags |= os_bit
        return flags
filelock.py 文件源码 项目:ZeroExploit 作者: 5alt 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def acquire(self):
        """ Acquire the lock by exclusively creating ``self.lockfile``.

            While the file already exists the attempt is repeated every
            ``self.delay`` seconds.  Once ``self.timeout`` seconds have
            elapsed a FileLockTimeoutException is raised.  Any OSError other
            than EEXIST is propagated unchanged.  On success the descriptor
            is stored in ``self.fd`` and ``self.is_locked`` is set.
        """
        began = time.time()
        acquired = False
        while not acquired:
            try:
                self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR)
            except OSError as exc:
                if exc.errno != errno.EEXIST:
                    raise
                elapsed = time.time() - began
                if elapsed >= self.timeout:
                    raise FileLockTimeoutException("%d seconds passed." % self.timeout)
                time.sleep(self.delay)
            else:
                acquired = True
        self.is_locked = True
util.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file that must not already exist.

    The file is created with ``O_CREAT | O_EXCL | O_RDWR``, so the call
    fails with ``OSError`` if `path` exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.
    :returns: the file object produced by ``os.fdopen``.

    """
    creation_flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
    if chmod is None:
        descriptor = os.open(path, creation_flags)
    else:
        descriptor = os.open(path, creation_flags, chmod)

    if buffering is None:
        return os.fdopen(descriptor, mode)
    return os.fdopen(descriptor, mode, buffering)
master.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def tty_create_child(*args):
    """Fork a child whose stdio is the slave side of a new pty and exec `args`.

    :param args: the command line; ``args[0]`` is the program handed to
        ``os.execvp`` and `args` is its argument vector.
    :return: ``(pid, master_fd)`` in the parent -- the child's PID and the
        master side of the pty.
    """
    master_fd, slave_fd = os.openpty()
    # disable_echo is defined outside this block; presumably it turns off
    # terminal echo on the given descriptor -- confirm against its definition.
    disable_echo(master_fd)
    disable_echo(slave_fd)

    pid = os.fork()
    if not pid:
        # Child process: make the pty slave fds 0, 1 and 2.
        mitogen.core.set_block(slave_fd)
        os.dup2(slave_fd, 0)
        os.dup2(slave_fd, 1)
        os.dup2(slave_fd, 2)
        close_nonstandard_fds()
        os.setsid()
        # Re-open (and immediately close) the tty behind fd 1 after setsid();
        # presumably this makes it the controlling terminal -- TODO confirm.
        os.close(os.open(os.ttyname(1), os.O_RDWR))
        # NOTE(review): if execvp raises, the exception propagates inside the
        # child process; nothing here terminates the child in that case.
        os.execvp(args[0], args)

    # Parent process: only the master side is kept.
    os.close(slave_fd)
    LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
              pid, master_fd, os.getpid(), Argv(args))
    return pid, master_fd
daemon.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def redirect_stream(system_stream, target_stream):
    """ Redirect a system stream to a specified file.

        :param system_stream: A file object representing a standard I/O
            stream.
        :param target_stream: The target file object for the redirected
            stream, or ``None`` to specify the null device.
        :return: ``None``.

        `system_stream` is a standard system stream such as
        ``sys.stdout``. `target_stream` is an open file object that
        should replace the corresponding system stream object.

        If `target_stream` is ``None``, defaults to opening the
        operating system's null device and using its file descriptor.
        That temporary descriptor is closed again once it has been
        duplicated onto the system stream's descriptor.

        """
    opened_null_device = target_stream is None
    if opened_null_device:
        target_fd = os.open(os.devnull, os.O_RDWR)
    else:
        target_fd = target_stream.fileno()

    system_fd = system_stream.fileno()
    try:
        os.dup2(target_fd, system_fd)
    finally:
        # Only close a descriptor this function opened itself, and never
        # when os.open() happened to return the system stream's own fd
        # (possible if that fd number was free): dup2 is then a no-op and
        # the descriptor IS the redirected stream.
        if opened_null_device and target_fd != system_fd:
            os.close(target_fd)
generator.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def generate(self):
        """Return the source text of the POSIX daemonize snippet.

        The returned code only acts when ``os.name == 'posix'``: it records
        ``pupy.infos['daemonize']``, double-forks, starts a new session, sets
        the umask and duplicates ``/dev/null`` over file descriptors 0-2.

        ``errno`` is imported by the snippet because its ``except`` branch
        compares against ``errno.EBADF``.
        """
        return textwrap.dedent("""
        import pupy, os, errno

        if os.name == 'posix':
            pupy.infos['daemonize']=True
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent
            os.setsid()
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent again.
            os.umask(022)   # Don't allow others to write
            null=os.open('/dev/null', os.O_RDWR)
            for i in range(3):
                try:
                    os.dup2(null, i)
                except OSError, e:
                    if e.errno != errno.EBADF:
                        raise
            os.close(null)
        """)
gadgetfs_phy.py 文件源码 项目:sahara_emulator 作者: bkerler 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def _open_endpoint_fd(self, ep):
        '''
        Open the gadgetfs file for an endpoint and store the descriptor on it.

        The file name is ``ep<number><in|out>`` inside ``self.gadgetfs_dir``
        and is opened read/write and non-blocking; the resulting descriptor
        is assigned to ``ep.fd``.

        :param ep: USBEndpoint object

        :raises Exception: if no endpoint file found, or failed to open

        .. todo:: detect transfer-type specific endpoint files
        '''
        ep_number = ep.number
        if ep.direction == USBEndpoint.direction_out:
            direction_name = 'out'
        else:
            direction_name = 'in'
        ep_file = 'ep%d%s' % (ep_number, direction_name)
        ep_fd = os.open(os.path.join(self.gadgetfs_dir, ep_file),
                        os.O_RDWR | os.O_NONBLOCK)
        self.debug('Opened endpoint %d' % (ep_number))
        self.debug('ep: %d dir: %s file: %s fd: %d' % (ep_number, direction_name, ep_file, ep_fd))
        ep.fd = ep_fd
filelock.py 文件源码 项目:PY-Snip 作者: MrKiven 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def with_multi_lock(tag, n, unlock_after_with=True):
    """Try up to `n` lock files for `tag` and yield whether one was locked.

    Lock files live in ``/tmp/`` and are named ``<tag>.lock`` for slot 0 and
    ``<tag>.lock<i>`` for the following slots.  Each file is opened (created
    with mode 0o660 if missing) and passed to ``trylock``.  The generator
    yields ``True`` for the first slot that locks and stops; if no slot can
    be locked it yields ``False``.

    :param tag: base name of the lock files.
    :param n: number of slots to try.
    :param unlock_after_with: when true, each descriptor is closed once the
        generator moves past it; when false the descriptors are left open.

    NOTE(review): the single-yield shape suggests this is wrapped by a
    context-manager decorator outside this block -- confirm.
    """
    for i in range(n):
        # Slot 0 carries no numeric suffix.
        lock_file_path = os.path.join(
            '/tmp/', tag + '.lock' + (str(i) if i else ''))
        # 0o660 is the octal spelling accepted by Python 2.6+ and Python 3.
        fd = os.open(lock_file_path, os.O_CREAT | os.O_RDWR, 0o660)
        try:
            if trylock(fd):
                yield True
                break
        finally:
            if unlock_after_with:
                os.close(fd)
    else:
        yield False
InputDevice.py 文件源码 项目:enigma2 作者: Openeight 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getInputDevices(self):
        """Scan /dev/input and register every device whose name can be read.

        Each entry is opened non-blocking and queried with the
        ``EVIOCGNAME(256)`` ioctl; the name is cut at the first NUL.  Devices
        with a non-empty name are stored in ``self.Devices``.  IOError/OSError
        for a device is printed and that device is skipped.
        """
        devices = os.listdir("/dev/input/")

        for evdev in devices:
            try:
                buffer = "\0"*512
                self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
                try:
                    self.name = ioctl(self.fd, EVIOCGNAME(256), buffer)
                finally:
                    # Close the descriptor even when the ioctl fails.
                    os.close(self.fd)
                self.name = self.name[:self.name.find("\0")]
            except (IOError, OSError) as err:
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print('[iInputDevices] getInputDevices ' + evdev + ' <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >')
                self.name = None

            if self.name:
                self.Devices[evdev] = {'name': self.name, 'type': self.getInputDeviceType(self.name),'enabled': False, 'configuredName': None }
filestore.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def create_file(self, name, excl=False, mode="wb", **kwargs):
        """Open a new file called `name` in this storage as a StructFile.

        :param name: the name for the new file.
        :param excl: if True, try to open the file in "exclusive" mode
            (``O_CREAT | O_EXCL | O_RDWR``, with ``O_BINARY`` where defined).
        :param mode: the mode flags with which to open the file. The default
            is ``"wb"``.
        :param kwargs: passed through to the ``StructFile`` constructor.
        :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
        :raises ReadOnlyError: when ``self.readonly`` is set.
        """

        if self.readonly:
            raise ReadOnlyError

        target = self._fpath(name)
        if excl:
            open_flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
            open_flags |= getattr(os, "O_BINARY", 0)
            raw = os.fdopen(os.open(target, open_flags), mode)
        else:
            raw = open(target, mode)

        return StructFile(raw, name=name, **kwargs)
pty.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _open_terminal():
    """Open pty master and return (master_fd, tty_name).
    SGI and generic BSD version, for when openpty() fails.

    If the ``sgi`` module can be imported its ``_getpty`` is used; otherwise
    the ``/dev/pty??`` device names are tried in order and the first one
    that opens is returned with the matching ``/dev/tty??`` name.  Raises
    ``os.error`` when no device can be opened."""
    try:
        import sgi
    except ImportError:
        sgi = None

    if sgi is not None:
        try:
            tty_name, master_fd = sgi._getpty(os.O_RDWR, 0o666, 0)
        except IOError as msg:
            raise os.error(msg)
        return master_fd, tty_name

    for bank in 'pqrstuvwxyzPQRST':
        for unit in '0123456789abcdef':
            suffix = bank + unit
            try:
                master = os.open('/dev/pty' + suffix, os.O_RDWR)
            except os.error:
                continue
            return (master, '/dev/tty' + suffix)
    raise os.error('out of pty devices')
watchdog.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def daemonize():
    """Detach via a double fork and redirect fds 0, 1 and 2 to /dev/null.

    Each fork's parent exits with ``os._exit(0)``.  The remaining process
    becomes a session leader between the forks, sets umask 0o077, and
    duplicates a read/write ``/dev/null`` descriptor onto the three standard
    descriptors, ignoring ``EBADF``.
    """
    # See http://www.steve.org.uk/Reference/Unix/faq_2.html#SEC16
    first_child = os.fork()
    if first_child:
        os._exit(0)  # original process ends here
    os.setsid()
    second_child = os.fork()
    if second_child:
        os._exit(0)  # intermediate process ends here
    # some argue that this umask should be 0, but that's annoying.
    os.umask(0o077)
    devnull_fd = os.open('/dev/null', os.O_RDWR)
    for std_fd in (0, 1, 2):
        try:
            os.dup2(devnull_fd, std_fd)
        except OSError as exc:
            if exc.errno == errno.EBADF:
                continue
            raise
    os.close(devnull_fd)
util.py 文件源码 项目:certbot 作者: nikoloskii 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file, refusing to reuse an existing one.

    The underlying ``os.open`` uses ``O_CREAT | O_EXCL | O_RDWR``; an
    existing `path` therefore results in ``OSError``.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.
    :returns: the file object produced by ``os.fdopen``.

    """
    exclusive_create = os.O_CREAT | os.O_EXCL | os.O_RDWR
    if chmod is not None:
        fd = os.open(path, exclusive_create, chmod)
    else:
        fd = os.open(path, exclusive_create)

    if buffering is not None:
        return os.fdopen(fd, mode, buffering)
    return os.fdopen(fd, mode)
client.py 文件源码 项目:bt 作者: kracekumar 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self, torrent, savedir):
        """Set up piece bookkeeping and open the output file for `torrent`.

        :param torrent: object exposing ``info.pieces`` and ``name``.
        :param savedir: directory for the download; ``'.'`` means the
            torrent name is used as a relative path as-is.
        """
        self.torrent = torrent
        self.total_pieces = len(self.torrent.info.pieces)
        self.peers = {}
        # TODO: Come up with different data structure to store
        # states of different pieces and blocks. Probably dict or set?
        self.pending_blocks = []
        self.ongoing_pieces = []
        self.have_pieces = []
        self.missing_pieces = self.make_pieces()
        # NOTE(review): the original comment says "Seconds", but 300 * 1000
        # looks like a millisecond value -- confirm the unit.
        self.max_pending_time = 300 * 1000
        self.progress_bar = Bar('Downloading', max=self.total_pieces)
        in_current_dir = savedir == '.'
        name = (self.torrent.name if in_current_dir
                else os.path.join(savedir, self.torrent.name))
        self.fd = os.open(name, os.O_RDWR | os.O_CREAT)
filestore.py 文件源码 项目:WhooshSearch 作者: rokartnaz 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def create_file(self, name, excl=False, mode="wb", **kwargs):
        """Create a storage file named `name` and return it as a StructFile.

        :param name: the name for the new file.
        :param excl: if True, the file is created exclusively: ``os.open``
            is called with ``O_CREAT | O_EXCL | O_RDWR`` (and ``O_BINARY``
            if the platform has it) before wrapping the descriptor.
        :param mode: the mode flags with which to open the file. The default
            is ``"wb"``.
        :param kwargs: forwarded unchanged to ``StructFile``.
        :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
        :raises ReadOnlyError: if this storage is read-only.
        """

        if self.readonly:
            raise ReadOnlyError

        full_path = self._fpath(name)
        if not excl:
            return StructFile(open(full_path, mode), name=name, **kwargs)

        flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
        binary_flag = getattr(os, "O_BINARY", 0)
        descriptor = os.open(full_path, flags | binary_flag)
        wrapped = os.fdopen(descriptor, mode)
        return StructFile(wrapped, name=name, **kwargs)
handler.py 文件源码 项目:topics 作者: jrconlin 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def post(self, *args, **kwargs):
        """Accept and log the push endpoint data.

        The request body must parse as JSON; the raw body is then written to
        the file named by ``self._settings.storage`` (created with mode
        0o644 if missing) and "Ok" is sent back.  Any exception is logged,
        answered with status 400 and its ``repr``.  ``self.finish()`` is
        always called.

        NOTE(review): the file is opened without O_TRUNC or O_APPEND, so the
        body overwrites the start of any existing content -- confirm this is
        intended.
        """
        try:
            # make sure it's valid JSON, even if you don't do anything with it.
            body = json.loads(self.request.body)
            # 0o644 is the octal spelling accepted by Python 2.6+ and Python 3.
            out = os.open(self._settings.storage,
                          os.O_RDWR|os.O_CREAT,
                          0o644)
            try:
                self.log.info("Writing body: {}".format(body))
                os.write(out, self.request.body)
            finally:
                # Release the descriptor even if the write fails.
                os.close(out)
            self.write("Ok")
            self.log.info("Done!")
        except Exception as x:
            self.log.error(repr(x))
            self.set_status(400)
            self.write(repr(x) + "\n\n")
        self.finish()


问题


面经


文章

微信
公众号

扫码关注公众号