python类unlink()的实例源码

state.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def finalize(self):
        """Release what this object still holds before shutting down.

        Closes the build-id cache when one is open and removes the lock file
        named by the lock attribute. A lock file that cannot be removed is
        reported as a warning on stderr instead of raising.
        """
        assert (self.__asynchronous == 0) and not self.__dirty

        def warn(message):
            # Imported lazily so the helpers are only loaded when a warning
            # really has to be printed.
            from .tty import colorize
            from sys import stderr
            print(colorize(message, "33"), file=stderr)

        cache = self.__buildIdCache
        if cache is not None:
            cache.close()
            self.__buildIdCache = None

        if not self.__lock:
            return

        try:
            os.unlink(self.__lock)
        except FileNotFoundError:
            # Must be handled before OSError, of which it is a subclass.
            warn("Warning: lock file was deleted while Bob was still running!")
        except OSError as e:
            warn("Warning: cannot unlock workspace: "+str(e))
anita.py 文件源码 项目:anita 作者: gson1703 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def download_file(file, url, optional = False):
    """Download ``url`` to the local path ``file``, reporting on stdout.

    A "Downloading <url>..." prefix is written first and the outcome is
    printed on the same line. When the transfer raises IOError, either a
    reassuring note (``optional`` true) or the error itself is printed, any
    file left at ``file`` is removed, and the IOError is re-raised to the
    caller in both cases.
    """
    try:
        # Written without a newline so the outcome printed below lands on the
        # same line. The trailing space stands in for the separator that the
        # Python 2 print statement with a trailing comma used to supply.
        sys.stdout.write("Downloading " + url + "... ")
        sys.stdout.flush()
        my_urlretrieve(url, file)
        # A single parenthesized argument prints identically on Python 2
        # (print statement) and Python 3 (print function).
        print("OK")
        sys.stdout.flush()
    except IOError as e:
        if optional:
            print("missing but optional, so that's OK")
        else:
            print(e)
        sys.stdout.flush()
        # Do not leave a partial download behind.
        if os.path.exists(file):
            os.unlink(file)
        raise

# Create a file of the given size, containing NULs, without holes.
anita.py 文件源码 项目:anita 作者: gson1703 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def install(self):
        """Run the installation unless the disk image already exists.

        With the 'noemu' VMM the distribution is downloaded and the
        installation always runs. Otherwise nothing happens when the file at
        ``wd0_path()`` exists; if the installation fails, a file left at that
        path is deleted and the original exception propagates.
        """
        # This is needed for Xen and noemu, where we get the kernel
        # from the dist rather than the installed image
        self.dist.set_workdir(self.workdir)

        if self.vmm == 'noemu':
            self.dist.download()
            self._install()
            return

        already_installed = os.path.exists(self.wd0_path())
        if already_installed:
            return

        try:
            self._install()
        except:
            # Whatever interrupted the install (including KeyboardInterrupt),
            # drop the partial image so a later run starts from scratch.
            if os.path.exists(self.wd0_path()):
                os.unlink(self.wd0_path())
            raise

    # Boot the virtual machine (installing it first if it's not
    # installed already).  The vmm_args argument applies when
    # booting, but not when installing.  Does not wait for
    # a login prompt.
test_svg.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_plotscene():
    """Export a window holding two plots to an SVG file.

    The test passes when the export completes without raising; the output
    file is removed afterwards whether or not the export succeeded.
    """
    # delete=False keeps the file on disk after it is closed, so the name
    # stays reserved for the exporter instead of being freed immediately.
    with tempfile.NamedTemporaryFile(suffix='.svg', delete=False) as tmp:
        tempfilename = tmp.name
    try:
        print("using %s as a temporary file" % tempfilename)
        pg.setConfigOption('foreground', (0,0,0))
        w = pg.GraphicsWindow()
        w.show()
        p1 = w.addPlot()
        p2 = w.addPlot()
        p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
        p1.setXRange(0,5)
        p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
        app.processEvents()
        app.processEvents()

        ex = pg.exporters.SVGExporter(w.scene())
        ex.export(fileName=tempfilename)
    finally:
        # clean up after the test is done, even when it failed part-way
        os.unlink(tempfilename)
test_svg.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_plotscene():
    """Export a window holding two plots to an SVG file.

    The test passes when the export completes without raising; the output
    file is removed afterwards whether or not the export succeeded.
    """
    # delete=False keeps the file on disk after it is closed, so the name
    # stays reserved for the exporter instead of being freed immediately.
    with tempfile.NamedTemporaryFile(suffix='.svg', delete=False) as tmp:
        tempfilename = tmp.name
    try:
        print("using %s as a temporary file" % tempfilename)
        pg.setConfigOption('foreground', (0,0,0))
        w = pg.GraphicsWindow()
        w.show()
        p1 = w.addPlot()
        p2 = w.addPlot()
        p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
        p1.setXRange(0,5)
        p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
        app.processEvents()
        app.processEvents()

        ex = pg.exporters.SVGExporter(w.scene())
        ex.export(fileName=tempfilename)
    finally:
        # clean up after the test is done, even when it failed part-way
        os.unlink(tempfilename)
background.py 文件源码 项目:Gank-Alfred-Workflow 作者: hujiaweibujidao 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def is_running(name):
    """
    Report whether a task registered under ``name`` is still running.

    The PID stored in the task's PID file is checked; when that process no
    longer exists the PID file is deleted.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    pid_path = _pid_file(name)
    if not os.path.exists(pid_path):
        return False

    with open(pid_path, 'rb') as handle:
        contents = handle.read()
    pid = int(contents.strip())

    if _process_exists(pid):
        return True

    # The recorded process is gone: drop the stale PID file.
    if os.path.exists(pid_path):
        os.unlink(pid_path)
    return False
workflow.py 文件源码 项目:Gank-Alfred-Workflow 作者: hujiaweibujidao 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _delete_directory_contents(self, dirpath, filter_func):
        """Remove the selected entries of a directory.

        Sub-directories are removed recursively, anything else is unlinked,
        and each removal is logged at debug level. A missing ``dirpath`` is
        silently ignored.

        :param dirpath: path to directory to clear
        :type dirpath: ``unicode`` or ``str``
        :param filter_func: called with each entry's name (not its full
            path); the entry is deleted only when it returns a true value.
        :type filter_func: ``callable``
        """
        if not os.path.exists(dirpath):
            return

        for entry in os.listdir(dirpath):
            if filter_func(entry):
                target = os.path.join(dirpath, entry)
                remove = shutil.rmtree if os.path.isdir(target) else os.unlink
                remove(target)
                self.logger.debug('Deleted : %r', target)
background.py 文件源码 项目:Gank-Alfred-Workflow 作者: hujiaweibujidao 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_running(name):
    """
    Tell whether the task called ``name`` currently has a live process.

    Reads the PID recorded in the task's PID file. If no such process
    exists any more, the PID file is removed.

    :param name: name of task
    :type name: ``unicode``
    :returns: ``True`` if task with name ``name`` is running, else ``False``
    :rtype: ``Boolean``

    """
    path = _pid_file(name)
    if not os.path.exists(path):
        return False

    with open(path, 'rb') as fp:
        raw_pid = fp.read()
    pid = int(raw_pid.strip())

    alive = _process_exists(pid)
    if alive:
        return True

    # Stale PID file left by a process that has since exited.
    if os.path.exists(path):
        os.unlink(path)
    return False
host.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def service_resume(service_name, init_dir="/etc/init",
                   initd_dir="/etc/init.d"):
    """Resume a system service.

    Reenable starting again at boot. Start the service

    The way the service is re-enabled depends on what is detected, in this
    order: systemd, an Upstart job file in ``init_dir`` (its ``.override``
    file is removed if present), or a SysV script in ``initd_dir``.

    :returns: the result of the running check when the service is already
        running, otherwise the result of starting it.
    :raises ValueError: if none of the three init systems is detected.
    """
    upstart_conf = os.path.join(init_dir, "{}.conf".format(service_name))
    sysv_script = os.path.join(initd_dir, service_name)

    if init_is_systemd():
        service('enable', service_name)
    elif os.path.exists(upstart_conf):
        override = os.path.join(init_dir, '{}.override'.format(service_name))
        if os.path.exists(override):
            os.unlink(override)
    elif os.path.exists(sysv_script):
        subprocess.check_call(["update-rc.d", service_name, "enable"])
    else:
        message = (
            "Unable to detect {0} as SystemD, Upstart {1} or"
            " SysV {2}".format(service_name, upstart_conf, sysv_script))
        raise ValueError(message)

    # Only start the service when it is not running already.
    return service_running(service_name) or service_start(service_name)
archiveurl.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def download(self, source, dest):
        """
        Download an archive file.

        For http/https URLs that embed credentials (``user:pass@host``), the
        credentials are stripped from the URL and installed as a global
        basic-auth opener before the request is made. If writing ``dest``
        fails, a partially written file is removed and the error is
        re-raised.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception:
            # Do not leave a truncated archive behind.
            if os.path.isfile(dest):
                os.unlink(dest)
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            # Release the connection whether or not the copy succeeded.
            response.close()

    # Mandatory file validation via Sha1 or MD5 hashing.
test_rstviewer.py 文件源码 项目:rstviewer 作者: arne-cl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_cli_rs3tohtml():
    """conversion to HTML on the commandline"""
    # Only the name is needed: the CLI writes the output file itself.
    temp_html = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    temp_html.close()

    try:
        cli([RS3_FILEPATH, temp_html.name])
        with open(temp_html.name, 'r') as html_file:
            assert EXPECTED_HTML in html_file.read()
    finally:
        # Remove the file after it has been closed, and also when the
        # conversion or the assertion above fails.
        os.unlink(temp_html.name)
main.py 文件源码 项目:rstviewer 作者: arne-cl 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def rs3topng(rs3_filepath, png_filepath=None):
    """Convert a RS3 file into a PNG image of the RST tree.

    If no output filename is given, the PNG image is returned
    as a string (which is useful for embedding).

    The RS3 file is first converted to HTML, which is written to a
    temporary file, loaded in PhantomJS and captured as a screenshot. The
    temporary file is removed and the browser is shut down even when a step
    fails.

    Raises ImportError if selenium is not installed and WebDriverException
    if PhantomJS cannot be started.
    """
    try:
        from selenium import webdriver
        from selenium.common.exceptions import WebDriverException
    except ImportError:
        raise ImportError(
            'Please install selenium: pip install selenium')

    html_str = rs3tohtml(rs3_filepath)

    temp = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    try:
        temp.write(html_str.encode('utf8'))
        temp.close()

        try:
            driver = webdriver.PhantomJS()
        except WebDriverException as err:
            # err.msg may be None; avoid masking the hint with a TypeError.
            raise WebDriverException(
                'Please install phantomjs: http://phantomjs.org/\n'
                + (err.msg or ''))

        try:
            driver.get(temp.name)
            png_str = driver.get_screenshot_as_png()
        finally:
            # Shut the browser down so no PhantomJS process is left behind.
            driver.quit()
    finally:
        temp.close()  # no-op when already closed above
        os.unlink(temp.name)

    if png_filepath:
        # PNG data is binary and must not go through text-mode translation.
        with open(png_filepath, 'wb') as png_file:
            png_file.write(png_str)
    else:
        return png_str
workflow.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def release(self):
        """Release the lock by deleting `self.lockfile`.

        The lock is marked as released before the file is removed. A lock
        file that is already missing is not treated as an error; any other
        failure to delete it is re-raised.
        """
        self._locked = False
        try:
            os.unlink(self.lockfile)
        except (OSError, IOError) as err:  # pragma: no cover
            if err.errno == 2:
                # errno 2: the file does not exist, nothing left to release.
                return
            raise err
workflow.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def cache_data(self, name, data):
        """Store ``data`` in the cache under ``name``.

        Passing ``None`` as ``data`` deletes the corresponding cache file
        instead, if it exists. The serializer is looked up before anything
        else, so a lookup failure surfaces in both cases.

        :param name: name of datastore
        :param data: data to store. This may be any object supported by
                the cache serializer

        """
        serializer = manager.serializer(self.cache_serializer)
        cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))

        if data is not None:
            with atomic_writer(cache_path, 'wb') as file_obj:
                serializer.dump(data, file_obj)
            self.logger.debug('Cached data saved at : %s', cache_path)
            return

        # data is None: the caller asked for the cached entry to be removed.
        if os.path.exists(cache_path):
            os.unlink(cache_path)
            self.logger.debug('Deleted cache file : %s', cache_path)
test_k8s_client.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_bearer_token(self, m_cfg, m_get):
        """Check that a token read from a file is sent as a Bearer header.

        The token is written to a temporary file that the configuration
        points at; a GET through the client must then carry it in the
        ``Authorization`` header. The temporary file is always removed.
        """
        token_content = (
            "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3Nl"
            "cnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc"
            "3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bn"
            "Qvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLWh4M3QxIiwia3ViZXJuZXRlcy5"
            "pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQi"
            "LCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51a"
            "WQiOiIxYTkyM2ZmNi00MDkyLTExZTctOTMwYi1mYTE2M2VkY2ViMDUiLCJzdWIiOi"
            "JzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.lzcPef"
            "DQ-uzF5cD-5pLwTKpRvtvvxKB4LX8TLymrPLMTth8WGr1vT6jteJPmLiDZM2C5dZI"
            "iFJpOw4LL1XLullik-ls-CmnTWq97NvlW1cZolC0mNyRz6JcL7gkH8WfUSjLA7x80"
            "ORalanUxtl9-ghMGKCtKIACAgvr5gGT4iznGYQQRx_hKURs4O6Js5vhwNM6UuOKeW"
            "GDDAlhgHMG0u59z3bhiBLl6jbQktZsu8c3diXniQb3sYqYQcGKUm1IQFujyA_ByDb"
            "5GUtCv1BOPL_-IjYtvdJD8ZzQ_UnPFoYQklpDyJLB7_7qCGcfVEQbnSCh907NdKo4"
            "w_8Wkn2y-Tg")
        token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
        try:
            # Point the configuration at the file before filling it, so the
            # cleanup below always knows which path to remove.
            m_cfg.kubernetes.token_file = token_file.name
            token_file.write(token_content)
            token_file.close()
            m_cfg.kubernetes.ssl_verify_server_crt = False

            path = '/test'
            k8s_client.K8sClient(self.base_url).get(path)

            expected_headers = {
                'Authorization': 'Bearer {}'.format(token_content)}
            m_get.assert_called_once_with(
                self.base_url + path,
                cert=(None, None),
                headers=expected_headers,
                verify=False)
        finally:
            os.unlink(m_cfg.kubernetes.token_file)
executors.py 文件源码 项目:spoonybard 作者: notnownikki 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_exit_code(self):
        """Wait for the process to finish and return its exit status.

        The temporary script file is deleted once the process has ended.
        """
        proc = self.process
        proc.wait()
        os.unlink(self.tmp_script_filename)
        return proc.returncode
symlinklockfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def release(self):
        """Remove the lock file, provided this object is the lock holder.

        Raises NotLocked when nothing holds the lock and NotMyLock when
        the lock is held, but not by this object.
        """
        locked = self.is_locked()
        if locked and self.i_am_locking():
            os.unlink(self.lock_file)
            return
        if not locked:
            raise NotLocked("%s is not locked" % self.path)
        raise NotMyLock("%s is locked, but not by me" % self.path)
mkdirlockfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def release(self):
        """Give up the lock by removing the marker file and lock directory.

        Raises NotLocked when the lock is not held at all, and NotMyLock
        when it is held but this object's unique marker file is absent.
        """
        if not self.is_locked():
            raise NotLocked("%s is not locked" % self.path)

        marker = self.unique_name
        if not os.path.exists(marker):
            raise NotMyLock("%s is locked, but not by me" % self.path)

        # The marker has to go first: rmdir only succeeds on an empty
        # directory.
        os.unlink(marker)
        os.rmdir(self.lock_file)
mkdirlockfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def break_lock(self):
        """Forcibly remove the lock directory together with its entries.

        Does nothing when the lock directory does not exist.
        """
        lock_dir = self.lock_file
        if not os.path.exists(lock_dir):
            return

        # Empty the directory first so that rmdir can succeed.
        for entry in os.listdir(lock_dir):
            os.unlink(os.path.join(lock_dir, entry))
        os.rmdir(lock_dir)
linklockfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def acquire(self, timeout=None):
        """Acquire the lock by hard-linking the unique file to the lock file.

        ``timeout`` falls back to ``self.timeout`` when it is None. With a
        timeout of None the call keeps retrying until the link can be
        created. With a positive timeout, LockTimeout is raised once that
        many seconds have passed. With a zero or negative timeout,
        AlreadyLocked is raised as soon as the lock turns out to be taken.
        Returns None once the lock is held, including when this object
        already holds it.

        Raises LockFailed if the unique file cannot be created.
        """
        try:
            open(self.unique_name, "wb").close()
        except IOError:
            raise LockFailed("failed to create %s" % self.unique_name)

        timeout = timeout if timeout is not None else self.timeout
        end_time = time.time()
        if timeout is not None and timeout > 0:
            end_time += timeout

        # Pause between attempts: a tenth of a positive timeout, otherwise
        # 0.1s. Only positive timeouts are divided, so a negative timeout can
        # never hand time.sleep() a negative value; "or 0.1" keeps the
        # fallback for a quotient of zero (integer division).
        poll_interval = 0.1
        if timeout is not None and timeout > 0:
            poll_interval = timeout / 10 or 0.1

        while True:
            # Try and create a hard link to it.
            try:
                os.link(self.unique_name, self.lock_file)
            except OSError:
                # Link creation failed.  Maybe we've double-locked?
                nlinks = os.stat(self.unique_name).st_nlink
                if nlinks == 2:
                    # The original link plus the one I created == 2.  We're
                    # good to go.
                    return
                # Otherwise the lock creation failed.
                if timeout is not None and time.time() > end_time:
                    os.unlink(self.unique_name)
                    if timeout > 0:
                        raise LockTimeout("Timeout waiting to acquire"
                                          " lock for %s" %
                                          self.path)
                    raise AlreadyLocked("%s is already locked" %
                                        self.path)
                time.sleep(poll_interval)
            else:
                # Link creation succeeded.  We're good to go.
                return


问题


面经


文章

微信
公众号

扫码关注公众号