Python open(): example source code

base_collection.py (project: smappdragon, author: SMAPPNYU)
def dump_to_csv(self, output_csv, input_fields, write_header=True, top_level=False, mode='a', encoding='utf-8', compression=None):
        if compression == 'bz2':
            mode = binary_mode(mode)
            filehandle = bz2.open(output_csv, mode)
        elif compression == 'gzip':
            mode = binary_mode(mode)
            filehandle = gzip.open(output_csv, mode)
        else:
            filehandle = open(output_csv, mode)

        writer = csv.writer(filehandle)
        if write_header:
            writer.writerow(input_fields)
        tweet_parser = TweetParser()

        for tweet in self.get_iterator():
            if top_level:
                ret = list(zip(input_fields, [tweet.get(field) for field in input_fields]))
            else:
                ret = tweet_parser.parse_columns_from_tweet(tweet,input_fields)
            ret_values = [col_val[1] for col_val in ret]
            writer.writerow(ret_values)
        filehandle.close()
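
The snippet above shows the dispatch-on-compression pattern that most of the smappdragon examples on this page repeat: choose bz2.open, gzip.open, or the built-in open depending on a compression argument. A minimal standalone sketch of that idea (the helper name open_output and the sample path are illustrative, not part of smappdragon; unlike the original, it keeps compressed output in text mode, which is what csv.writer expects in Python 3):

import bz2
import gzip

def open_output(path, mode='a', compression=None):
    """Return a writable file handle, compressed if requested."""
    if compression == 'bz2':
        return bz2.open(path, mode if 'b' in mode else mode + 't')
    if compression == 'gzip':
        return gzip.open(path, mode if 'b' in mode else mode + 't')
    return open(path, mode)

# Hypothetical usage: append one line to a gzip-compressed CSV.
with open_output('tweets.csv.gz', mode='a', compression='gzip') as fh:
    fh.write('id,text\n')
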
csv_collection.py (project: smappdragon, author: SMAPPNYU)
def get_iterator(self):
        tweet_parser = TweetParser()
        if self.compression == 'bz2':
            self.mode = binary_mode(self.mode)
            csv_handle = bz2.open(self.filepath, self.mode, encoding=self.encoding)
        elif self.compression == 'gzip':
            self.mode = binary_mode(self.mode)
            csv_handle = gzip.open(self.filepath, self.mode, encoding=self.encoding)
        else:       
            csv_handle = open(self.filepath, self.mode, encoding=self.encoding)
        for count, tweet in enumerate(csv.DictReader(csv_handle)):
            if self.limit < count+1 and self.limit != 0:
                csv_handle.close()
                return
            elif tweet_parser.tweet_passes_filter(self.filter, tweet) \
            and tweet_parser.tweet_passes_custom_filter_list(self.custom_filters, tweet):
                if self.should_strip:
                    yield tweet_parser.strip_tweet(self.keep_fields, tweet) 
                else: 
                    yield dict(tweet)
        csv_handle.close()
files.py (project: udapi-python, author: udapi)
def _token_to_filenames(token):
        if token[0] == '!':
            pattern = token[1:]
            filenames = glob.glob(pattern)
            if not filenames:
                raise RuntimeError('No filenames matched "%s" pattern' % pattern)
        elif token[0] == '@':
            filelist_name = sys.stdin if token == '@-' else token[1:]
            with open(filelist_name) as filelist:
                filenames = [line.rstrip('\n') for line in filelist]
            directory = os.path.dirname(token[1:])
            if directory != '.':
                filenames = [f if f[0] != '/' else directory + '/' + f for f in filenames]
        else:
            filenames = token
        return filenames
files.py (project: udapi-python, author: udapi)
def next_filehandle(self):
        """Go to the next file and retrun its filehandle or None (meaning no more files)."""
        filename = self.next_filename()
        if filename is None:
            fhandle = None
        elif filename == '-':
            fhandle = sys.stdin
        else:
            filename_extension = filename.split('.')[-1]
            if filename_extension == 'gz':
                myopen = gzip.open
            elif filename_extension == 'xz':
                myopen = lzma.open
            elif filename_extension == 'bz2':
                myopen = bz2.open
            else:
                myopen = open
            fhandle = myopen(filename, 'rt', encoding=self.encoding)
        self.filehandle = fhandle
        return fhandle
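
The extension-to-opener dispatch in next_filehandle can be reduced to a small lookup table. A standalone sketch (the table, the helper name open_text, and any paths are illustrative only):

import bz2
import gzip
import lzma

OPENERS = {'gz': gzip.open, 'xz': lzma.open, 'bz2': bz2.open}

def open_text(path, encoding='utf-8'):
    """Open a possibly compressed file in text mode, keyed on its extension."""
    opener = OPENERS.get(path.rsplit('.', 1)[-1], open)
    return opener(path, 'rt', encoding=encoding)
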
di.py (project: HOPSTACK, author: willpatterson)
def get_data(self, save_directory):
        """
        Retrieves data from remote location
        saves data in: save_directory
        TODO:
            figure out how to handle local file paths
            consider directory downloads from HTML pages with hyperlinks
            ** Implement custom URL schemes -- now needs to be done in lasubway.py
            How does raw data fit into this function?
        """

        url = urlunparse(self)
        file_name = os.path.basename(os.path.normpath(self.path))
        save_path = os.path.join(save_directory, file_name)
        with closing(urlopen(url)) as request:
            with open(save_path, 'wb') as sfile:
                shutil.copyfileobj(request, sfile)
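
The download step in get_data is just urlopen plus shutil.copyfileobj. A self-contained sketch with a placeholder URL and destination path:

import shutil
from contextlib import closing
from urllib.request import urlopen

url = 'https://example.com/data.csv'  # placeholder URL
with closing(urlopen(url)) as request, open('data.csv', 'wb') as sfile:
    shutil.copyfileobj(request, sfile)
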
curse.py (project: mccurse, author: khardix)
def find(cls: Type['Game'], name: str, *, gamedb: Path = SUPPORTED_GAMES) -> 'Game':
        """Find and create instance of a supported game.

        Keyword arguments:
            name: Name of the game to instantiate.
            gamedb: Path to the YAML dictionary of supported games.

        Returns:
            Instance of the supported game.

        Raises:
            UnsupportedGameError: When the name is not found among supported games.
        """

        with gamedb.open(encoding='utf-8') as gamestream:
            games = yaml.load(gamestream)

        defaults = games.get(name.lower(), None)
        if defaults is None:
            msg = _("Game not supported: '{name}'").format_map(locals())
            raise UnsupportedGameError(msg)

        return cls(name=name.capitalize(), **defaults)
test_utils.py (project: xphyle, author: jdidion)
def test_decompress_file(self):
        path = self.root.make_file()
        gzfile = path + '.gz'
        with gzip.open(gzfile, 'wt') as o:
            o.write('foo')

        path2 = decompress_file(gzfile, keep=True)
        self.assertEqual(path, path2)
        self.assertTrue(os.path.exists(gzfile))
        self.assertTrue(os.path.exists(path))
        with open(path, 'rt') as i:
            self.assertEqual(i.read(), 'foo')

        with open(gzfile, 'rb') as i:
            path2 = decompress_file(i, keep=True)
            self.assertEqual(path, path2)
            self.assertTrue(os.path.exists(gzfile))
            self.assertTrue(os.path.exists(path))
            with open(path, 'rt') as i:
                self.assertEqual(i.read(), 'foo')
test_utils.py (project: xphyle, author: jdidion)
def test_pending(self):
        file1 = self.root.make_file(suffix='.gz')
        with gzip.open(file1, 'wt') as o:
            o.write('foo\nbar\n')
        f = FileInput(char_mode=TextMode)
        self.assertTrue(f._pending)
        f.add(file1)
        list(f)
        self.assertTrue(f.finished)
        self.assertFalse(f._pending)
        file2 = self.root.make_file(suffix='.gz')
        with gzip.open(file2, 'wt') as o:
            o.write('baz\n')
        f.add(file2)
        self.assertTrue(f._pending)
        self.assertFalse(f.finished)
        self.assertEqual('baz\n', f.readline())
        self.assertEqual('', f.readline())
        with self.assertRaises(StopIteration):
            next(f)
        self.assertTrue(f.finished)
        self.assertFalse(f._pending)
test_utils.py (project: xphyle, author: jdidion)
def test_rolling_fileoutput_write(self):
        path = self.root.make_file()
        with textoutput(
                path + '{index}.txt', file_output_type=RollingFileOutput,
                lines_per_file=3) as out:
            for i in range(6):
                out.write(i, False)
            for ch in ('a', 'b', 'c'):
                out.write(ch, False)
            out.write("d\ne\nf")
        with open(path + '0.txt', 'rt') as infile:
            self.assertEqual('0\n1\n2\n', infile.read())
        with open(path + '1.txt', 'rt') as infile:
            self.assertEqual('3\n4\n5\n', infile.read())
        with open(path + '2.txt', 'rt') as infile:
            self.assertEqual('a\nb\nc\n', infile.read())
        with open(path + '3.txt', 'rt') as infile:
            self.assertEqual('d\ne\nf\n', infile.read())
RemoteCpp.py (project: RemoteCpp, author: ruibm)
def clear_local_caches():
  files = []
  roots = set()
  for window in sublime.windows():
    # All views in a window share the same settings.
    view = window.views()[0]
    cwd = s_cwd(view)
    local_root = File.local_root_for_cwd(cwd)
    roots.add(local_root)
  for root in roots:
      log('Deleting local cache directory [{0}]...'.format(root))
      if os.path.exists(root):
        shutil.rmtree(root)
  for file in files:
    log("Refreshing open file [{0}]...".format(file.remote_path()))
    download_file(file)
RemoteCpp.py (project: RemoteCpp, author: ruibm)
def on_text_command(self, view, command_name, args):
    # log('cmd={cmd} args={args}'.format(cmd=command_name, args=args))
    if RemoteCppListFilesCommand.owns_view(view) and \
        command_name == 'insert' and args['characters'] == '\n':
      all_lines = get_multiple_sel_lines(view)
      paths = []
      for line in all_lines:
        if self._is_valid_path(line):
          paths.append(line)
      def run_in_background():
        for path in paths:
          file = File(cwd=s_cwd(), path=path)
          Commands.open_file(view, file.to_args())
      if len(paths) > 10:
        msg = ('This will open {0} files which could be slow. \n'
               'Are you sure you want to do that?').format(len(paths),)
        button_text = 'Open {0} Files'.format(len(paths))
        if not sublime.ok_cancel_dialog(msg, button_text):
          return None
      THREAD_POOL.run(run_in_background)
    return None
json_collection.py (project: smappdragon, author: SMAPPNYU)
def get_iterator(self):
        tweet_parser = TweetParser()
        if self.compression == 'bz2':
            self.mode = binary_mode(self.mode)
            json_handle = bz2.open(self.filepath, self.mode, encoding=self.encoding)
        elif self.compression == 'gzip':
            self.mode = binary_mode(self.mode)
            json_handle = gzip.open(self.filepath, self.mode, encoding=self.encoding)
        else:    
            json_handle = open(self.filepath, self.mode, encoding=self.encoding)
        bad_lines = 0
        for count, tweet in enumerate(json_handle):
            if not self.throw_error:
                try:
                    tweet = json_util.loads(tweet)
                except:
                    bad_lines += 1
            else:
                tweet = json_util.loads(tweet)
            if self.limit != 0 and self.limit <= count:
                return
            elif tweet_parser.tweet_passes_filter(self.filter, tweet) \
            and tweet_parser.tweet_passes_custom_filter_list(self.custom_filters, tweet):
                if self.should_strip:
                    yield tweet_parser.strip_tweet(self.keep_fields, tweet)
                else:
                    yield tweet
        if self.verbose:
            print("{} rows are ok.".format(count - bad_lines))
            print("{} rows are corrupt.".format(bad_lines))
        json_handle.close()
base_collection.py (project: smappdragon, author: SMAPPNYU)
def dump_to_bson(self, output_bson):
        filehandle = open(output_bson, 'ab+')

        for tweet in self.get_iterator():
            filehandle.write(BSON.encode(tweet))
        filehandle.close()
base_collection.py (project: smappdragon, author: SMAPPNYU)
def dump_to_json(self, output_json, compression=None, mode='a'):
        if compression == 'bz2':
            mode = binary_mode(mode)
            filehandle = bz2.open(output_json, mode)
        elif compression == 'gzip':
            mode = binary_mode(mode)
            filehandle = gzip.open(output_json, mode)
        else:
            filehandle = open(output_json, mode)

        for tweet in self.get_iterator():
            filehandle.write(json_util.dumps(tweet)+'\n')
        filehandle.close()
__init__.py (project: chess_book_learning, author: Aloril)
def load_evaluations(filename):
    d = {}
    if filename.endswith(".bz2"):
        fp = bz2.open(filename, "rt")
    else:
        fp = open(filename)
    for line in fp:
        key = fen2key(line)
        l = line.strip().split()
        fen = " ".join(l[:6])
        score_type = l[6]
        score = l[7]
        pv = " ".join(l[8:])
        d[key] = (fen, score_type, score, pv)
    return d
__init__.py (project: chess_book_learning, author: Aloril)
def init_log(logname):
    global log_fp
    if log_fp is None and logname:
        log_fp = open(logname, "a")
doodle.py (project: neural-doodle, author: alexjc)
def load_data(self):
        """Open the serialized parameters from a pre-trained network, and load them into the model created.
        """
        vgg19_file = os.path.join(os.path.dirname(__file__), 'vgg19_conv.pkl.bz2')
        if not os.path.exists(vgg19_file):
            error("Model file with pre-trained convolution layers not found. Download here...",
                  "https://github.com/alexjc/neural-doodle/releases/download/v0.0/vgg19_conv.pkl.bz2")

        data = pickle.load(bz2.open(vgg19_file, 'rb'))
        params = lasagne.layers.get_all_param_values(self.network['main'])
        lasagne.layers.set_all_param_values(self.network['main'], data[:len(params)])
preprocessing.py (project: visualizations, author: ContentMine)
def get_raw(filename):
    with open(filename) as infile:
        raw = infile.read()
        # the next line needs rewriting as soon as the zenodo-dump conforms to 'records'-format
        # [{k:v}, {k:v},...]
        rawfacts = pd.read_json('[%s]' % ','.join(raw.splitlines()), orient='records')
    return rawfacts


### functions for ingesting from CProject



### functions for preprocessing
preprocessing.py (project: visualizations, author: ContentMine)
def get_preprocessed_df(cacheddatapath=None, rawdatapath=None):
    try:
        with gzip.open(os.path.join(cacheddatapath, "preprocessed_df.pklz"), "rb") as infile:
            df = pickle.load(infile)
    except:
        df = preprocess(rawdatapath)
        if rawdatapath is None:
            pass
            # needs an io error for missing rawdatapath
        with gzip.open(os.path.join(cacheddatapath, "preprocessed_df.pklz"), "wb") as outfile:
            pickle.dump(df, outfile, protocol=4)
    return df
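
get_preprocessed_df above and the preprocessing.py helpers that follow all share the same cache-or-recompute shape. A compact sketch of that pattern, with the bare except narrowed to the errors that actually signal a missing or unreadable cache (the helper name load_or_build is illustrative):

import gzip
import pickle

def load_or_build(cachepath, build):
    """Return the cached object if readable, otherwise build and cache it."""
    try:
        with gzip.open(cachepath, 'rb') as infile:
            return pickle.load(infile)
    except (OSError, EOFError, pickle.UnpicklingError):
        result = build()
        with gzip.open(cachepath, 'wb') as outfile:
            pickle.dump(result, outfile, protocol=4)
        return result
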
preprocessing.py (project: visualizations, author: ContentMine)
def get_wikidata_dict(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "wikidata_dict.pklz"), "rb") as infile:
            wikidataIDs = pickle.load(infile)
    except:
        wikidataIDs = make_wikidata_dict(cacheddatapath, rawdatapath)
        with gzip.open(os.path.join(cacheddatapath, "wikidata_dict.pklz"), "wb") as outfile:
            pickle.dump(wikidataIDs, outfile, protocol=4)
    return wikidataIDs

## functions to extract features
preprocessing.py (project: visualizations, author: ContentMine)
def get_series(cacheddatapath, rawdatapath, column):
    try:
        with gzip.open(os.path.join(cacheddatapath, column+"_series.pklz"), "rb") as infile:
            series = pickle.load(infile)
    except:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        series = make_series(df, column)
        with gzip.open(os.path.join(cacheddatapath, column+"_series.pklz"), "wb") as outfile:
            pickle.dump(series, outfile, protocol=4)
    return series
preprocessing.py (project: visualizations, author: ContentMine)
def get_coocc_features(cacheddatapath, rawdatapath):
    try:
        with bz2.open(os.path.join(cacheddatapath, "coocc_features.pklz2"), "r") as infile:
            coocc_features = pickle.load(infile)
    except:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        coocc_features = count_cooccurrences(df)
        with bz2.BZ2File(os.path.join(cacheddatapath, "coocc_features.pklz2"), "w") as outfile:
            pickle.dump(coocc_features, outfile, protocol=4)
    return coocc_features
preprocessing.py (project: visualizations, author: ContentMine)
def get_timeseries_features(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "timeseries_features.pklz"), "rb") as infile:
            ts_features = pickle.load(infile)
    except:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        ts_features = make_timeseries(df)
        with gzip.open(os.path.join(cacheddatapath, "timeseries_features.pklz"), "wb") as outfile:
            pickle.dump(ts_features, outfile, protocol=4)
    return ts_features
preprocessing.py (project: visualizations, author: ContentMine)
def get_journal_features(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "journal_features.pklz"), "rb") as infile:
            journ_raw = pickle.load(infile)
    except:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        journ_raw = make_journal_features(df)
        with gzip.open(os.path.join(cacheddatapath, "journal_features.pklz"), "wb") as outfile:
            pickle.dump(journ_raw, outfile, protocol=4)
    return journ_raw
preprocessing.py (project: visualizations, author: ContentMine)
def get_distribution_features(cacheddatapath, rawdatapath):
    try:
        with gzip.open(os.path.join(cacheddatapath, "dist_features.pklz"), "rb") as infile:
            dist_features = pickle.load(infile)
    except:
        df = get_preprocessed_df(cacheddatapath, rawdatapath)
        dist_features = make_distribution_features(df)
        with gzip.open(os.path.join(cacheddatapath, "dist_features.pklz"), "wb") as outfile:
            pickle.dump(dist_features, outfile, protocol=4)
    return dist_features
enhance.py (project: supic, author: Hirico)
def load_perceptual(self):
        """Open the serialized parameters from a pre-trained network, and load them into the model created.
        """
        vgg19_file = os.path.join(os.path.dirname(__file__), 'vgg19_conv.pkl.bz2')
        if not os.path.exists(vgg19_file):
            error("Model file with pre-trained convolution layers not found. Download here...",
                  "https://github.com/alexjc/neural-doodle/releases/download/v0.0/vgg19_conv.pkl.bz2")

        data = pickle.load(bz2.open(vgg19_file, 'rb'))
        layers = lasagne.layers.get_all_layers(self.last_layer(), treat_as_input=[self.network['percept']])
        for p, d in zip(itertools.chain(*[l.get_params() for l in layers]), data): p.set_value(d)
enhance.py (project: supic, author: Hirico)
def save_generator(self):
        def cast(p): return p.get_value().astype(np.float16)
        params = {k: [cast(p) for p in l.get_params()] for (k, l) in self.list_generator_layers()}
        config = {k: getattr(args, k) for k in ['generator_blocks', 'generator_residual', 'generator_filters'] + \
                                               ['generator_upscale', 'generator_downscale']}

        pickle.dump((config, params), bz2.open(self.get_filename(absolute=True), 'wb'))
        print('  - Saved model as `{}` after training.'.format(self.get_filename()))
enhance.py (project: supic, author: Hirico)
def load_model(self):
        if not os.path.exists(self.get_filename(absolute=True)):
            if args.train: return {}, {}
            error("Model file with pre-trained convolution layers not found. Download it here...",
                  "https://github.com/alexjc/neural-enhance/releases/download/v%s/%s"%(__version__, self.get_filename()))
        print('  - Loaded file `{}` with trained model.'.format(self.get_filename()))
        return pickle.load(bz2.open(self.get_filename(absolute=True), 'rb'))
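
A minimal round trip of the bz2-plus-pickle storage used by save_generator and load_model above; the filename and payload are placeholders:

import bz2
import pickle

state = {'weights': [0.1, 0.2, 0.3]}
with bz2.open('model.pkl.bz2', 'wb') as fh:
    pickle.dump(state, fh, protocol=pickle.HIGHEST_PROTOCOL)
with bz2.open('model.pkl.bz2', 'rb') as fh:
    assert pickle.load(fh) == state
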
example.py (project: python-cookbook-3rd, author: tuanavu)
def gen_opener(filenames):
    '''
    Open a sequence of filenames one at a time producing a file object.
    The file is closed immediately when proceeding to the next iteration. 
    '''
    for filename in filenames:
        if filename.endswith('.gz'):
            f = gzip.open(filename, 'rt')
        elif filename.endswith('.bz2'):
            f = bz2.open(filename, 'rt')
        else:
            f = open(filename, 'rt')
        yield f
        f.close()
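
A hypothetical use of gen_opener from the recipe above, streaming lines out of a mix of plain and compressed logs (the directory, glob pattern, and search term are placeholders; gen_opener itself is assumed to be in scope):

import os
from glob import glob

lognames = glob(os.path.join('logs', 'access-log*'))
for fh in gen_opener(lognames):
    for line in fh:
        if 'python' in line:
            print(line, end='')
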
di.py (project: HOPSTACK, author: willpatterson)
def __init__(self, path):
        self.path = path
        self.accessor = self.open()

