python类open()的实例源码

test.py 文件源码 项目:pycreate2 作者: MomsFriendlyRobotCompany 项目源码 文件源码 阅读 60 收藏 0 点赞 0 评论 0
def write():
    os.remove(filename)
    cap = cv2.VideoCapture(0)
    db = shelve.open(filename)
    imgs = []
    data = range(100)

    for i in range(100):
        ret, frame = cap.read()

        if ret:
            # jpg = frame  # 29 MB
            # jpg = cv2.imencode('.jpg', frame)  # make much smaller (1.9MB), otherwise 29MB
            jpg = cv2.imencode('.jpg', frame)[1].tostring()  # no bennefit with doing string (1.9MB)
            imgs.append(jpg)
            print('frame[{}] {}'.format(i, frame.shape))

        time.sleep(0.03)

    db['imgs'] = imgs
    db['data'] = data
    cap.release()
    db.close()
cache.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def open(self):
        if self.opened:
            return

        self.lock = open(SETTINGS.CACHE_PATH + '.lock', 'ab')
        try:
            fcntl.flock(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
            mode = 'c'
        except IOError:
            logger.warn("Cache locked, using read-only")
            mode = 'r'
            self.lock.close()
            self.lock = None

        try:
            self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
        except Exception as e:
            if mode != 'c':
                raise
            logger.warn("Dropping corrupted cache on %s", e)
            self.lock.truncate(0)
            self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
        self.opened = True
sys_shelve_importer.py 文件源码 项目:pymotw3 作者: reingart 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, path_entry):
        # Loading shelve causes an import recursive loop when it
        # imports dbm, and we know we are not going to load the
        # module # being imported, so when we seem to be
        # recursing just ignore the request so another finder
        # will be used.
        if ShelveFinder._maybe_recursing:
            raise ImportError
        try:
            # Test the path_entry to see if it is a valid shelf
            try:
                ShelveFinder._maybe_recursing = True
                with shelve.open(path_entry, 'r'):
                    pass
            finally:
                ShelveFinder._maybe_recursing = False
        except Exception as e:
            print('shelf could not import from {}: {}'.format(
                path_entry, e))
            raise
        else:
            print('shelf added to import path:', path_entry)
            self.path_entry = path_entry
        return
cli.py 文件源码 项目:proxyguy 作者: yudori 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def new_profile(profile_name, address, port, username, password, activate):
    """
    create a network proxy configuration profile
    """
    store = shelve.open(db_file)
    try:
        store[str(profile_name)] = {
            'address': address,
            'port': port,
            'username': username,
            'password': password
        }
        click.echo("Profile '{}' successfully created".format(profile_name))
    finally:
        store.close()

    if activate:
        activate_profile(profile_name)
cli.py 文件源码 项目:proxyguy 作者: yudori 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def activate_profile(profile_name):
    store = shelve.open(db_file)
    try:
        if profile_name == "":
            util.write(None)
            try:
                del store['active']
            finally:
                click.echo("No proxy mode activated")
        else:
            if profile_name is not "active":
                util.write(store[str(profile_name)])
                store['active'] = str(profile_name)
                click.echo("Profile '{}' successfully activated".
                           format(profile_name))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
cli.py 文件源码 项目:proxyguy 作者: yudori 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def delete_profile(profile_name):
    """
    delete specified profile
    """
    store = shelve.open(db_file)
    try:
        if profile_name is not "active":
            del store[str(profile_name)]
            try:
                if str(store["active"]) == profile_name:
                    del store["active"]
            except KeyError:
                pass
        click.echo("Profile '{}' successfully deleted".format(str(profile_name)))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
docs_resolv.py 文件源码 项目:sequana 作者: sequana 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_data(url):
    """Helper function to get data over http or from a local file"""
    if url.startswith('http://'):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            data = StringIO(data)
            data = gzip.GzipFile(fileobj=data).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()

    return data
docs_resolv.py 文件源码 项目:sequana 作者: sequana 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes"""

    # shelve keys need to be str in python 2
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')

    cached_file = os.path.join(gallery_dir, 'searchindex')
    search_index = shelve.open(cached_file)
    if url in search_index:
        data = search_index[url]
    else:
        data = _get_data(url)
        search_index[url] = data
    search_index.close()

    return data
main.py 文件源码 项目:roguelike-tutorial 作者: Wolfenswan 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def save_game():
    """ open a new empty shelve (possibly overwriting an old one) to write the game data """
    with shelve.open('savegames/savegame', 'n') as savefile:
        gv.cursor.deactivate()
        savefile['map'] = gv.game_map
        savefile['objects'] = gv.game_objects
        savefile['log'] = gv.game_log
        savefile['gamestate'] = gv.gamestate
        savefile['dlevel'] = gv.dungeon_level

        # Store the index of special objects, so they can be later restored from the gv.game_objects array
        savefile['p_index'] = gv.game_objects.index(gv.player)
        savefile['c_index'] = gv.game_objects.index(gv.cursor)
        savefile['sd_index'] = gv.game_objects.index(gv.stairs_down)
        savefile['su_index'] = gv.game_objects.index(gv.stairs_up)

        savefile.close()
main.py 文件源码 项目:roguelike-tutorial 作者: Wolfenswan 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def load_game():
    """ load an existing savegame """
    with shelve.open('savegames/savegame', 'r') as savefile:
        gv.game_map = savefile['map']
        gv.game_objects = savefile['objects']
        gv.game_log = savefile['log']
        gv.gamestate = savefile['gamestate']
        gv.dungeon_level = savefile['dlevel']

        # Restore special objects
        gv.player = gv.game_objects[savefile['p_index']]
        gv.cursor = gv.game_objects[savefile['c_index']]
        gv.stairs_down = gv.game_objects[savefile['sd_index']]
        gv.stairs_up = gv.game_objects[savefile['su_index']]

        msgbox('Welcome back stranger to level {0} of {1}!'.format(gv.dungeon_level, settings.DUNGEONNAME),width=35, text_color=colors.red)
feature.py 文件源码 项目:Treehacks 作者: andrewsy97 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def extract_all(data):
    resp = dict(data)
    # text = get_sanitized(data)
    text = data['text']
    # request feature extraction.
    text_hash = hashlib.sha256(text.encode('ascii', 'ignore')).hexdigest()
    print 'text_hash', text_hash
    cache_db = shelve.open(path.join(CACHE_DIR, 'feature'))
    if not cache_db.has_key(text_hash):
        print 'new call'
        call = alchemyapi.combined('text', text)
        cache_db[text_hash] = call
    else:
        print 'cached call'
        call = cache_db[text_hash]
    cache_db.close()
    # filter results.
    whitelist = ['concepts', 'entities', 'keywords', 'taxonomy']
    for key in whitelist:
        if key not in call:
            resp[key] = []
            continue
        resp[key] = call[key]
    return resp
__main__.py 文件源码 项目:em 作者: nhynes 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def proj_create(args, config, _extra_args):
    """Creates a new em-managed project."""
    tmpl_repo = config['project']['template_repo']
    try:
        pygit2.clone_repository(tmpl_repo, args.dest)

        # delete history of template
        shutil.rmtree(osp.join(args.dest, '.git'), ignore_errors=True)
        pygit2.init_repository(args.dest)
    except ValueError:
        pass  # already in a repo

    for em_dir in ['experiments', 'data']:
        dpath = osp.join(args.dest, em_dir)
        if not osp.isdir(dpath):
            os.mkdir(dpath)

    with shelve.open(osp.join(args.dest, '.em')) as emdb:
        emdb['__em__'] = {}
__main__.py 文件源码 项目:em 作者: nhynes 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def resume(args, config, prog_args):
    """Resume a stopped experiment."""
    name = args.name

    repo = pygit2.Repository('.')

    with shelve.open('.em') as emdb:
        if name not in emdb:
            return _die(E_NO_EXP.format(name))
        info = emdb[name]
        if 'pid' in info or info.get('status') == 'running':
            return _die(E_IS_RUNNING.format(name))
        try:
            repo.lookup_branch(name)
        except pygit2.GitError:
            return _die(E_NO_EXP.format(name))

    prog_args.append('--resume')
    if args.epoch:
        prog_args.append(args.epoch)

    return _run_job(name, config, args.gpu, prog_args, args.background)
__main__.py 文件源码 项目:em 作者: nhynes 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def list_experiments(args, _config, _extra_args):
    """List experiments."""
    import subprocess

    if args.filter:
        filter_key, filter_value = args.filter.split('=')

        def _filt(stats):
            return filter_key in stats and stats[filter_key] == filter_value

    with shelve.open('.em') as emdb:
        if args.filter:
            names = {name
                     for name, info in sorted(emdb.items()) if _filt(info)}
        else:
            names = emdb.keys()
        names -= {EM_KEY}
        if not names:
            return

    subprocess.run(
        ['column'], input='\n'.join(sorted(names)) + '\n', encoding='utf8')
__main__.py 文件源码 项目:em 作者: nhynes 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def show(args, _config, _extra_args):
    """Show details about an experiment."""
    import pickle
    import pprint

    name = args.name

    with shelve.open('.em') as emdb:
        if name not in emdb or name == EM_KEY:
            return _die(E_NO_EXP.format(name))
        for info_name, info_val in sorted(emdb[name].items()):
            if isinstance(info_val, datetime.date):
                info_val = info_val.ctime()
            print(f'{info_name}: {info_val}')

    if not args.opts:
        return

    opts_path = _expath(name, 'run', 'opts.pkl')
    with open(opts_path, 'rb') as f_opts:
        print('\noptions:')
        opts = pickle.load(f_opts)
        cols = shutil.get_terminal_size((80, 20)).columns
        pprint.pprint(vars(opts), indent=2, compact=True, width=cols)
retriever.py 文件源码 项目:PyperGrabber 作者: pykong 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_pdf(pdf_link):

    # check whether value already existing in permanent storage:
    pdf_name = pdf_link.rsplit('/', 1)[-1]  # set filename according to last element of link
    if not check_db(pdf_name) and not check_db(pdf_link):
        # print 'Downloading: {}'.format(pdf_link)
        try:
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', USER_AGENT)]

            r = opener.open(pdf_link)

            path = tmp_dir + pdf_name

            with open(path, "wb") as code:  # 'w'
                code.write(r.read())

            # log successful download:
            log_download('DOWNLOADED: {}'.format(pdf_link))

        except Exception as e:
            log_download('FAILURE: {} | {}'.format(pdf_link, e))
    else:
        log_download('File already downloaded: {}'.format(pdf_name))
docs_resolv.py 文件源码 项目:bolero 作者: rock-learning 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_data(url):
    """Helper function to get data over http(s) or from a local file"""
    if urllib_parse.urlparse(url).scheme in ('http', 'https'):
        resp = urllib_request.urlopen(url)
        encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            data = data.decode('utf-8')
        elif encoding == 'gzip':
            data = BytesIO(data)
            data = gzip.GzipFile(fileobj=data).read().decode('utf-8')
        else:
            raise RuntimeError('unknown encoding')
    else:
        with codecs.open(url, mode='r', encoding='utf-8') as fid:
            data = fid.read()

    return data
docs_resolv.py 文件源码 项目:bolero 作者: rock-learning 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes"""

    # shelve keys need to be str in python 2
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')

    cached_file = os.path.join(gallery_dir, 'searchindex')
    search_index = shelve.open(cached_file)
    if url in search_index:
        data = search_index[url]
    else:
        data = _get_data(url)
        search_index[url] = data
    search_index.close()

    return data
Assistant.py 文件源码 项目:Personal_AI_Assistant 作者: PratylenClub 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fill_tf_idf_shelve(self):
        tf_idf_shelve = shelve.open(self.tf_idf_shelve_file_name, writeback=True)
        if TF not in tf_idf_shelve:
            tf_idf_shelve[TF] = {}
        if DF not in tf_idf_shelve:
            tf_idf_shelve[DF] = {}
        if D not in tf_idf_shelve:
            tf_idf_shelve[D] = 0
        if TF_IDF not in tf_idf_shelve:
            tf_idf_shelve[TF_IDF] = {}
        if CENTROID not in tf_idf_shelve:
            tf_idf_shelve[CENTROID] = {}

        for action,trigger_txt in self.trigger_dict.iteritems():
            if action not in tf_idf_shelve[TF].keys():
                trigger = self.tokenize_text(trigger_txt)
                tf_idf_shelve[TF][action] = Counter(trigger)
                for word in unique(trigger):
                    if word not in tf_idf_shelve[DF].keys():
                        tf_idf_shelve[DF][word] = 0
                    tf_idf_shelve[DF][word] += 1
        tf_idf_shelve[D] = len(tf_idf_shelve[TF])
        tf_idf_shelve.close()
        self.compute_tf_idf()
        self.compute_centroids()
Assistant.py 文件源码 项目:Personal_AI_Assistant 作者: PratylenClub 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def add_list_of_words_in_w2v_model(self, unknown_words):
        huge_w2v_model_file = open(self.w2v_huge_model_path, "r")
        current_w2v_model_file = open(self.w2v_model_path, "a")
        line = huge_w2v_model_file.readline()
        unknown_words_left = len(unknown_words)
        while line and unknown_words_left:
            word = line.split()[0]
            if word in unknown_words:
                current_w2v_model_file.write(line)
                unknown_words = unknown_words - set([word])
                unknown_words_left -= 1
            line = huge_w2v_model_file.readline()
        for word in list(unknown_words):
            random_position = random(self.w2v_model.vector_size)*2-1
            current_w2v_model_file.write(" ".join(([word]+[str(x) for x in random_position])))
            print "warning random positions introduced for new words ... in the future this should be solved"
        current_w2v_model_file.close()
        huge_w2v_model_file.close()
test_eol.py 文件源码 项目:Personal_AI_Assistant 作者: PratylenClub 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_list_of_words_in_w2v_model(self, unknown_words):
        huge_w2v_model_file = open(self.w2v_huge_model_path, "r")
        current_w2v_model_file = open(self.w2v_model_path, "a")
        line = huge_w2v_model_file.readline()
        unknown_words_left = len(unknown_words)
        while line and unknown_words_left:
            word = line.split()[0]
            if word in unknown_words:
                current_w2v_model_file.write(line)
                unknown_words = unknown_words - set([word])
                unknown_words_left -= 1
            line = huge_w2v_model_file.readline()
        for word in list(unknown_words):
            random_position = random(self.w2v_model.vector_size)*2-1
            current_w2v_model_file.write(" ".join(([word]+[str(x) for x in random_position])))
            print "warning random positions introduced for new words ... in the future this should be solved"
        current_w2v_model_file.close()
        huge_w2v_model_file.close()
import_video.py 文件源码 项目:video-importer 作者: tnc-ca-geo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def list_files(self, path):
        storage = self.args.storage
        shelve_name = os.path.join(os.path.dirname(__file__), storage)
        db = shelve.open(shelve_name)
        status = []
        for key, value in db.items():
            value['hash'] = key
            status.append(value)
            status.sort(lambda a,b: cmp(a['filename'],b['filename']))
        stream = StringIO.StringIO()
        writer = csv.writer(stream)
        writer.writerow(('FILENAME','GIVEN_NAME','CREATED_ON','DISCOVERED_ON','UPLOADED_ON'))
        for params in status:
            writer.writerow((params['filename'], params['given_name'],params['camera'],
                             params['timestamp'], params['discovered_on'], params['uploaded_on']))
        return stream.getvalue()
docs_resolv.py 文件源码 项目:multi-diffusion 作者: chemical-diffusion 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_data(url):
    """Helper function to get data over http or from a local file"""
    if url.startswith('http://'):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            data = StringIO(data)
            data = gzip.GzipFile(fileobj=data).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()

    return data
docs_resolv.py 文件源码 项目:multi-diffusion 作者: chemical-diffusion 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes"""

    # shelve keys need to be str in python 2
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')

    cached_file = os.path.join(gallery_dir, 'searchindex')
    search_index = shelve.open(cached_file)
    if url in search_index:
        data = search_index[url]
    else:
        data = _get_data(url)
        search_index[url] = data
    search_index.close()

    return data
test.py 文件源码 项目:pycreate2 作者: MomsFriendlyRobotCompany 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def read():
    db = shelve.open(filename)
    imgs = db['imgs']
    data = db['data']

    for i in range(len(imgs)):
        d = data[i]
        print(i, d)
        img = imgs[i]
        img = np.fromstring(img, np.uint8)
        frame = cv2.imdecode(img, 1)
        print('frame[{}] {}'.format(i, frame.shape))
        cv2.imshow('camera', frame)
        cv2.waitKey(300)

    print('bye ...')
    cv2.destroyAllWindows()
    db.close()
__init__.py 文件源码 项目:mycroft-skill-jupiter-broadcasting 作者: the7erm 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_latest_episode(self, url, media=False):
        storage_path = join(self.file_system.path, 'feedcache')
        LOGGER.debug("storage_path:%s" % storage_path)
        storage = shelve.open(storage_path)
        ttl = 60 * 60
        link = ""
        try:
            fc = cache.Cache(storage, timeToLiveSeconds=ttl)
            parsed_data = fc.fetch(url)
            print "parsed_data.feed.title:", parsed_data.feed.title
            for entry in parsed_data.entries:
                pprint(entry)
                if media:
                    media_content = entry.media_content
                    if media_content:
                        link = entry.media_content[0]['url']
                else:
                    link = entry.link
                if link:
                    break
        finally:
            storage.close()
        return link
framework.py 文件源码 项目:yt 作者: yt-project 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def dump(self, result_storage):
        # The 'tainted' attribute is automatically set to 'True'
        # if the dataset required for an answer test is missing
        # (see can_run_ds() and can_run_sim()).
        # This logic check prevents creating a shelve with empty answers.
        storage_is_tainted = result_storage.get('tainted', False)
        if self.answer_name is None or storage_is_tainted:
            return
        # Store data using shelve
        ds = shelve.open(self.answer_name, protocol=-1)
        for ds_name in result_storage:
            answer_name = "%s" % ds_name
            if answer_name in ds:
                mylog.info("Overwriting %s", answer_name)
            ds[answer_name] = result_storage[ds_name]
        ds.close()
chat80.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 81 收藏 0 点赞 0 评论 0
def val_dump(rels, db):
    """
    Make a ``Valuation`` from a list of relation metadata bundles and dump to
    persistent database.

    :param rels: bundle of metadata needed for constructing a concept
    :type rels: list of dict
    :param db: name of file to which data is written.
               The suffix '.db' will be automatically appended.
    :type db: str
    """
    concepts = process_bundle(rels).values()
    valuation = make_valuation(concepts, read=True)
    db_out = shelve.open(db, 'n')

    db_out.update(valuation)

    db_out.close()
chat80.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def label_indivs(valuation, lexicon=False):
    """
    Assign individual constants to the individuals in the domain of a ``Valuation``.

    Given a valuation with an entry of the form ``{'rel': {'a': True}}``,
    add a new entry ``{'a': 'a'}``.

    :type valuation: Valuation
    :rtype: Valuation
    """
    # collect all the individuals into a domain
    domain = valuation.domain
    # convert the domain into a sorted list of alphabetic terms
    # use the same string as a label
    pairs = [(e, e) for e in domain]
    if lexicon:
        lex = make_lex(domain)
        with open("chat_pnames.cfg", 'w') as outfile:
            outfile.writelines(lex)
    # read the pairs into the valuation
    valuation.update(pairs)
    return valuation
ext_01.py 文件源码 项目:_ 作者: zengchunyun 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def auth(self):
        connect_database = shelve.open("../database/database")
        database = connect_database.get("data")
        verify = UserVerify(database)
        conn = self.request
        str()
        print(bytes(u"??????"))
        conn.send(bytes("??????:"))
        username = conn.recv(1024)
        conn.send(bytes("?????:"))
        password = conn.recv(1024)
        login = None
        if username and password:
            try:
                login = verify.login(user=username, password=password)
            except SystemExit as e:
                print(e)
                conn.close()
        if not login:
            return False
        else:
            return True


问题


面经


文章

微信
公众号

扫码关注公众号