python类open()的实例源码

create_res_sim_mat.py 文件源码 项目:fem 作者: mlp6 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def read_header(dispout):
    """ Read header (first 3 words) from disp.dat

    Each word is 4 bytes and is unpacked as a native float before being
    converted to int.  A filename ending in '.xz' is read through lzma.

    :param dispout: disp.dat filename
    :returns: header dict with keys num_nodes, num_dims, num_timesteps
    :raises struct.error: if fewer than three complete words can be read

    """
    import struct

    word_size = 4  # bytes
    if dispout.endswith('.xz'):
        import lzma
        opener = lzma.open
    else:
        opener = open

    # the context manager guarantees the handle is closed, including when
    # unpacking fails on a truncated file
    with opener(dispout, 'rb') as d:
        num_nodes = struct.unpack('f', d.read(word_size))
        num_dims = struct.unpack('f', d.read(word_size))
        num_timesteps = struct.unpack('f', d.read(word_size))

    header = {'num_nodes': int(num_nodes[0]),
              'num_dims': int(num_dims[0]),
              'num_timesteps': int(num_timesteps[0])}
    return header
create_res_sim_mat.py 文件源码 项目:fem 作者: mlp6 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def extract_dt(dyn_file):
    """ extract time step (dt) from dyna input deck

    assumes that input deck is comma-delimited

    Scans for the first line containing '*DATABASE_NODOUT', skips following
    lines whose first field contains '$' (comments), and converts the first
    field of the next line to float.

    :param dyn_file: input.dyn filename
    :returns: dt from input.dyn binary data save parameter
    :raises ValueError: if no value follows a *DATABASE_NODOUT keyword, or
        the first field of that line is not a number

    """
    found_database = False
    with open(dyn_file, 'r') as d:
        for dyn_line in d:
            if not found_database:
                found_database = '*DATABASE_NODOUT' in dyn_line
                continue
            first_item = dyn_line.split(',')[0]
            # make sure we're not dealing with a comment
            if '$' in first_item:
                continue
            return float(first_item)

    # previously this fell through to 'return dt' with dt never assigned,
    # which surfaced as an opaque UnboundLocalError
    raise ValueError(
        'no *DATABASE_NODOUT time step found in %s' % dyn_file)
companies.py 文件源码 项目:jarbas 作者: datasciencebr 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def save_companies(self):
        """
        Read the xz-compressed CSV at self.path and create a Company object
        for each row. The activities returned by self.save_activities(row)
        are attached to the new object, and the running count is printed
        after every row.
        """
        skip = ('main_activity', 'secondary_activity')
        # Compare field *names* with the skip list: a field object is never
        # equal to a string, so testing the object itself filtered nothing.
        keys = tuple(
            f.name for f in Company._meta.fields if f.name not in skip
        )
        with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
            for row in csv.DictReader(file_handler):
                main, secondary = self.save_activities(row)

                # only pass columns that correspond to Company fields
                filtered = {k: v for k, v in row.items() if k in keys}
                obj = Company.objects.create(**self.serialize(filtered))
                for activity in main:
                    obj.main_activity.add(activity)
                for activity in secondary:
                    obj.secondary_activity.add(activity)
                obj.save()

                self.count += 1
                self.print_count(Company, count=self.count)
cdparser.py 文件源码 项目:C2L2 作者: CoNLL-UD-2017 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_embeddings(self, filename, xz=False):
        """Initialise word-lookup rows from a whitespace-separated embedding file.

        Each non-blank line is split on whitespace: the first field is the
        word (passed through strong_normalize), the rest are parsed as
        floats.  Only words present in self._vocab are used.

        Args:
            filename: Path of the embedding file.
            xz: When true, read the file through lzma as UTF-8 text,
                ignoring decoding errors; otherwise use plain open().

        Returns:
            self.  If the file does not exist a message is printed and
            nothing else happens.
        """
        if not os.path.isfile(filename):
            print(filename, "does not exist")
            return self

        if xz:
            f = lzma.open(filename, "rt", encoding="utf-8", errors="ignore")
        else:
            f = open(filename, "r")
        found_set = set()
        # 'with' closes the handle even if a line fails to parse
        with f:
            for line in f:
                fields = line.split()
                if not fields:
                    # a blank line used to raise IndexError on fields[0]
                    continue
                word = strong_normalize(fields[0])
                vec = [float(x) for x in fields[1:]]
                if word in self._vocab:
                    found_set.add(word)
                    self._word_lookup.init_row(self._vocab[word], vec)
        print("Loaded embeddings from", filename)
        print(len(found_set), "hits with vocab size of", len(self._vocab))

        return self
files.py 文件源码 项目:udapi-python 作者: udapi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _token_to_filenames(token):
        if token[0] == '!':
            pattern = token[1:]
            filenames = glob.glob(pattern)
            if not filenames:
                raise RuntimeError('No filenames matched "%s" pattern' % pattern)
        elif token[0] == '@':
            filelist_name = sys.stdin if token == '@-' else token[1:]
            with open(filelist_name) as filelist:
                filenames = [line.rstrip('\n') for line in filelist]
            directory = os.path.dirname(token[1:])
            if directory != '.':
                filenames = [f if f[0] != '/' else directory + '/' + f for f in filenames]
        else:
            filenames = token
        return filenames
files.py 文件源码 项目:udapi-python 作者: udapi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def next_filehandle(self):
        """Go to the next file and return its filehandle or None (meaning no more files).

        '-' stands for standard input.  Any other name is opened in text
        mode with self.encoding; the text after the last '.' selects gzip,
        lzma or bz2 decompression, and everything else uses plain open().
        The handle is also stored in self.filehandle.
        """
        filename = self.next_filename()
        if filename is None:
            handle = None
        elif filename == '-':
            handle = sys.stdin
        else:
            openers = {'gz': gzip.open, 'xz': lzma.open, 'bz2': bz2.open}
            suffix = filename.rpartition('.')[2]
            opener = openers.get(suffix, open)
            handle = opener(filename, 'rt', encoding=self.encoding)
        self.filehandle = handle
        return handle
vwoptimize.py 文件源码 项目:vwoptimize 作者: denik 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def open_regular_or_compressed(filename):
    """Return a readable file object for `filename`.

    None yields sys.stdin, and an object that already has a 'read'
    attribute is returned as is.  Otherwise the lower-cased text after the
    last '.' picks gzip ('gz'), bz2 ('bz2') or lzma ('xz'); any other name
    is opened with the builtin open().
    """
    if filename is None:
        return sys.stdin
    if hasattr(filename, 'read'):
        return filename

    ext = filename.lower().rsplit('.', 1)[-1]
    if ext == 'gz':
        import gzip
        return gzip.GzipFile(filename)
    if ext == 'bz2':
        import bz2
        return bz2.BZ2File(filename)
    if ext == 'xz':
        import lzma
        return lzma.open(filename)
    return open(filename)
vwoptimize.py 文件源码 项目:vwoptimize 作者: denik 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _convert_any_to_vw(source, format, output, weights, preprocessor, columnspec, named_labels, remap_label, ignoreheader):
    """Convert every row of `source` to a VW line and write it to `output`.

    Rows come from open_anything(source, format, ignoreheader=...); each is
    converted with convert_row_to_vw and written to the file named by
    `output`, which is opened in binary write mode.  `named_labels`, when
    given, must be a collection rather than a single string and is turned
    into a set.  A row that fails to convert is logged and the exception
    is re-raised.
    """
    if named_labels is not None:
        assert not isinstance(named_labels, basestring)
        named_labels = set(named_labels)

    rows_source = open_anything(source, format, ignoreheader=ignoreheader)
    output = open(output, 'wb')

    try:
        for row in rows_source:
            try:
                vw_line = convert_row_to_vw(row, columnspec, preprocessor=preprocessor, weights=weights, named_labels=named_labels, remap_label=remap_label)
            except Exception:
                log_always('Failed to parse: %r', row)
                raise
            output.write(vw_line)
    except BaseException:
        # do not leak the output handle when conversion or writing fails
        output.close()
        raise

    flush_and_close(output)
compression.py 文件源码 项目:atropos 作者: jdidion 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def open_compressed_file(filename, mode):
    """Open a compressed file, determining the compression type based on the
    file name.

    Args:
        filename: The file to open.
        mode: The file open mode.

    Returns:
        The opened file.

    Raises:
        ValueError: If no opener is registered for the file's extension.
    """
    # os.path.splitext returns (root, ext); only the extension is wanted.
    # NOTE(review): assumes get_file_opener expects the extension string
    # (e.g. '.gz') -- confirm against its definition.
    ext = os.path.splitext(filename)[1]
    opener = get_file_opener(ext)
    if not opener:
        # the placeholder was previously never filled in
        raise ValueError(
            "{} is not a recognized compression format".format(ext))
    return opener(filename, mode)
xopen.py 文件源码 项目:grocsvs 作者: grocsvs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, path, mode='w'):
        """Open `path` and start a compressor process writing into it.

        'pigz' is tried first and 'gzip' is used if starting pigz raises
        OSError; self.program records which one was started.  The process
        reads from a pipe, writes its stdout to the opened file and its
        stderr to os.devnull.  If no process can be started, both files are
        closed and the error is re-raised.
        """
        self.outfile = open(path, mode)
        self.devnull = open(os.devnull, 'w')
        self.closed = False

        # Setting close_fds to True in the Popen arguments is necessary due to
        # <http://bugs.python.org/issue12786>.
        popen_options = {
            'stdin': PIPE,
            'stdout': self.outfile,
            'stderr': self.devnull,
            'close_fds': True,
        }
        try:
            try:
                self.process = Popen(['pigz'], **popen_options)
                self.program = 'pigz'
            except OSError:
                # binary not found, try regular gzip
                self.process = Popen(['gzip'], **popen_options)
                self.program = 'gzip'
        except (IOError, OSError):
            # neither an IOError from pigz nor any failure of gzip is
            # recoverable: release both handles before propagating
            self.outfile.close()
            self.devnull.close()
            raise
ios.py 文件源码 项目:objection 作者: sensepost 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def download(self):
        """
            Fetches the latest iOS gadget and writes the raw response
            stream to self.ios_dylib_gadget_archive_path.

            :return: this instance
        """

        # request the gadget as a stream so the body is not held in memory
        response = requests.get(self._get_download_url(), stream=True)

        # copy the raw stream straight into the archive file
        with open(self.ios_dylib_gadget_archive_path, 'wb') as archive:
            message = 'Downloading iOS dylib to {0}...'.format(self.ios_dylib_gadget_archive_path)
            click.secho(message, fg='green', dim=True)

            shutil.copyfileobj(response.raw, archive)

        return self
ios.py 文件源码 项目:objection 作者: sensepost 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set_application_binary(self, binary: str = None) -> None:
        """
            Sets the binary that will be patched.

            When a binary name is given it is joined onto the application
            folder.  Otherwise the application's Info.plist is parsed, the
            CFBundleIdentifier is printed and the CFBundleExecutable entry
            is used as the binary name.

            :param binary:
            :return:
        """

        if binary is None:
            plist_path = os.path.join(self.app_folder, 'Info.plist')
            with open(plist_path, 'rb') as plist_file:
                info_plist = plistlib.load(plist_file)

            # print the bundle identifier
            identifier_message = 'Bundle identifier is: {0}'.format(info_plist['CFBundleIdentifier'])
            click.secho(identifier_message, fg='green', bold=True)

            executable = info_plist['CFBundleExecutable']
            self.app_binary = os.path.join(self.app_folder, executable)
            return

        click.secho('Using user provided binary name of: {0}'.format(binary))
        self.app_binary = os.path.join(self.app_folder, binary)
android.py 文件源码 项目:objection 作者: sensepost 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def download(self):
        """
            Fetches the latest Android gadget for this architecture and
            writes the raw response stream to the packed library path.

            :return: this instance
        """

        # request the library as a stream so the body is not held in memory
        response = requests.get(self._get_download_url(), stream=True)
        destination = self.get_frida_library_path(packed=True)

        # copy the raw stream straight into the destination file
        with open(destination, 'wb') as target:
            message = 'Downloading {0} library to {1}...'.format(self.architecture, destination)
            click.secho(message, fg='green', dim=True)

            shutil.copyfileobj(response.raw, target)

        return self
create_res_sim_mat.py 文件源码 项目:fem 作者: mlp6 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def open_dispout(dispout):
    """open dispout file for binary reading

    names ending in '.xz' are opened through lzma, anything else with the
    builtin open()

    :param dispout: (str) dispout filename (disp.dat)
    :return: dispout file object
    """
    if not dispout.endswith('.xz'):
        return open(dispout, 'rb')

    import lzma
    return lzma.open(dispout, 'rb')
create_disp_dat.py 文件源码 项目:fem 作者: mlp6 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_dat(nodout="nodout", dispout="disp.dat", legacynodes=False):
    """create binary data file

    A line containing 'nodal' starts a time step; the following lines are
    parsed and collected until a blank line ends it, at which point the
    collected data is handed to process_timestep_data.  The header is
    generated and written when the first time step completes.

    :param str nodout: nodout file created by ls-dyna (default="nodout")
    :param str dispout: default = "disp.dat"
    :param boolean legacynodes: node IDs written every timestep (default=False)
    :returns: 0
    """
    wrote_header = False
    in_timestep = False
    n_timesteps = 0
    writenode = True

    with open(nodout, 'r') as nodout_fh, open_dispout(dispout) as dispout_fh:
        for line in nodout_fh:
            if 'nodal' in line:
                in_timestep = True
                n_timesteps += 1
                data = []
                continue

            if not in_timestep:
                continue

            if line != '\n':
                # still inside the time step: collect the parsed values
                data.append(list(parse_line(line)))
                continue

            # a blank line marks the end of the time step
            in_timestep = False
            if not wrote_header:
                # the whole first time step had to be read before the
                # header could be generated from its data
                header = generate_header(data, nodout_fh)
                write_headers(dispout_fh, header)
                wrote_header = True
                print('Time Step: ', end="", flush=True)
            if n_timesteps > 1 and not legacynodes:
                writenode = False
            print("%i, " % n_timesteps, end="", flush=True)
            process_timestep_data(data, dispout_fh, writenode)

    print("done.", flush=True)

    return 0
create_disp_dat.py 文件源码 项目:fem 作者: mlp6 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def count_timesteps(outfile):
    """count timesteps written to nodout

    counts the lines containing 'time', and then removes 1 extra entry that
    occurs for t = 0

    grep will be used on linux systems (way faster); if grep cannot be
    started the python implementation is used instead

    :param outfile: usually 'nodout'
    :returns: int ts_count

    """
    from sys import platform

    print("Reading number of time steps... ", end="", flush=True)
    ts_count = None
    if platform == "linux":
        from subprocess import PIPE, Popen
        try:
            # An argument list (no shell) keeps file names with spaces or
            # shell metacharacters intact; 'grep -c' counts matching lines,
            # which is what 'grep ... | wc -l' produced.
            p = Popen(['grep', '-c', '-e', 'time', '--', outfile],
                      stdout=PIPE)
        except OSError:
            # grep is not available -> fall back to python below
            pass
        else:
            grep_out = p.communicate()[0].strip()
            # grep prints nothing when it cannot read the file; the old
            # pipeline reported 0 lines in that case
            ts_count = int(grep_out.decode()) if grep_out else 0
    else:
        print("Non-linux OS detected -> using slower python implementation",
              flush=True)

    if ts_count is None:
        with open(outfile, 'r') as f:
            ts_count = sum(1 for line in f if 'time' in line)

    ts_count -= 1  # rm extra time count

    print('there are {}.'.format(ts_count), flush=True)

    return ts_count
receipts_text.py 文件源码 项目:jarbas 作者: datasciencebr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def receipts(self):
        """Returns a Generator with batches of receipts text.

        Rows of the xz-compressed CSV at self.path are serialized and
        grouped into lists of self.batch_size items.  The last batch is
        always yielded, so it may be shorter or even empty.
        """
        print('Loading receipts text dataset…', end='\r')
        # Decode explicitly as UTF-8, like the other dataset loaders, so the
        # result does not depend on the locale's default encoding.
        with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
            batch = []
            for row in csv.DictReader(file_handler):
                batch.append(self.serialize(row))
                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []
            yield batch
suspicions.py 文件源码 项目:jarbas 作者: datasciencebr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def suspicions(self):
        """Returns a Generator with batches of suspicions.

        Serialized rows are collected until self.batch_size is reached;
        whatever is left at the end (possibly nothing) is yielded last.
        """
        print('Loading suspicions dataset…', end='\r')
        with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
            pending = []
            for record in csv.DictReader(file_handler):
                pending.append(self.serialize(record))
                if len(pending) < self.batch_size:
                    continue
                yield pending
                pending = []
            yield pending
reimbursements.py 文件源码 项目:jarbas 作者: datasciencebr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def reimbursements(self):
        """Returns a Generator with a dict object for each row.

        Rows are read from the xz-compressed CSV at self.path.
        """
        # Decode explicitly as UTF-8, like the other dataset loaders, so the
        # result does not depend on the locale's default encoding.
        with lzma.open(self.path, 'rt', encoding='utf-8') as file_handler:
            yield from DictReader(file_handler)
sysstat.py 文件源码 项目:sarjitsu 作者: distributed-system-analysis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def verify_contents(thefile, tgt_hostname=None, callback=None):
    """
    Verify that a sysstat binary data file contains well formed data.

    The file is first read through lzma; if that raises lzma.LZMAError it
    is opened again as a plain binary file.  The actual checks are done by
    verify_contents_fp, which receives the open file together with the two
    optional arguments:

        tgt_hostname: checked against the hostname stored in the file
                      header

        callback: an instance of the ContentAction class whose methods are
                  invoked for each magic structure, file header, file
                  activity set, record header and record payload read,
                  with the 'eof' method invoked at the end

    One of the following exceptions will be raised if a problem is found
    with the file:

        Invalid: The file header or record header metadata values do not
                 make sense in relation to each other

        Corruption: The file appears to be corrupted in some way

        Truncated: The file does not appear to contain all the data as
                   described by the file header or a given record header
    """
    def _verify_with(opener):
        with opener(thefile, "rb") as fp:
            verify_contents_fp(fp, tgt_hostname, callback)

    try:
        _verify_with(lzma.open)
    except lzma.LZMAError:
        # not xz-compressed: read the raw file instead
        _verify_with(open)
sysstat.py 文件源码 项目:sarjitsu 作者: distributed-system-analysis 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def fetch_fileheader(thefile):
    """
    Return the result of fetch_fileheader_with_fp for the given file path.

    The file is first read through lzma; if that raises lzma.LZMAError it
    is opened again as a plain binary file.
    """
    try:
        with lzma.open(thefile, "rb") as compressed:
            return fetch_fileheader_with_fp(compressed)
    except lzma.LZMAError:
        # not xz-compressed: read the raw file instead
        with open(thefile, "rb") as raw:
            return fetch_fileheader_with_fp(raw)
cdparser.py 文件源码 项目:C2L2 作者: CoNLL-UD-2017 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def load_vocab(self, filename):
        """Unpickle the vocabulary stored in `filename` and hand it to
        self._load_vocab; returns self.

        NOTE: pickle can execute arbitrary code, so only load trusted files.
        """
        with open(filename, "rb") as stream:
            unpickled = pickle.load(stream)
        self._load_vocab(unpickled)
        return self
cdparser.py 文件源码 项目:C2L2 作者: CoNLL-UD-2017 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def save_vocab(self, filename):
        """Pickle self._fullvocab into `filename`; returns self."""
        with open(filename, "wb") as stream:
            pickle.dump(self._fullvocab, stream)
        return self
cdparser.py 文件源码 项目:C2L2 作者: CoNLL-UD-2017 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def save_model(self, filename):
        """Save the parser under the prefix `filename`; returns self.

        Writes three files: '<filename>.vocab' via save_vocab,
        '<filename>.params' holding the pickled self._args, and
        '<filename>.model' via self._model.save.
        """
        self.save_vocab(filename + ".vocab")
        params_path = filename + ".params"
        with open(params_path, "wb") as stream:
            pickle.dump(self._args, stream)
        self._model.save(filename + ".model")
        return self
cdparser.py 文件源码 项目:C2L2 作者: CoNLL-UD-2017 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_model(self, filename, **kwargs):
        """Restore a parser saved under the prefix `filename`; returns self.

        Loads '<filename>.vocab', unpickles the arguments stored in
        '<filename>.params', overrides them with `kwargs`, creates the
        parser from the result, initialises the model and finally loads
        '<filename>.model' into it.

        NOTE: pickle can execute arbitrary code, so only load trusted files.
        """
        self.load_vocab(filename + ".vocab")
        params_path = filename + ".params"
        with open(params_path, "rb") as stream:
            stored_args = pickle.load(stream)
            # explicit keyword arguments win over the stored ones
            stored_args.update(kwargs)
            self.create_parser(**stored_args)
        self.init_model()
        self._model.load(filename + ".model")
        return self
vwoptimize.py 文件源码 项目:vwoptimize 作者: denik 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write_file(filename, data):
    """Write `data` to `filename`, or to stdout for a name in STDOUT_NAMES.

    `data` is either a str or a list of strings, which is concatenated
    before writing.
    """
    if isinstance(data, list):
        data = ''.join(data)
    else:
        assert isinstance(data, str), type(data)

    if filename in STDOUT_NAMES:
        sys.stdout.write(data)
        return

    target = open(filename, 'w')
    target.write(data)
    flush_and_close(target)
vwoptimize.py 文件源码 项目:vwoptimize 作者: denik 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_num_features(filename):
    """Return the number of lines that follow the first ':0' line.

    Lines are compared after stripping whitespace.  Returns 0 when the
    file contains no ':0' line.
    """
    # 'with' closes the handle; iterating over a bare open() leaked it
    with open(filename) as fobj:
        for line in fobj:
            if line.strip() == ':0':
                # everything left in the file after the marker is counted
                return sum(1 for _ in fobj)
    return 0
datasets.py 文件源码 项目:FreeDiscovery 作者: FreeDiscovery 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _load_erdm_ground_truth(outdir):
    """A helper function to load Legal TREC 2009 data"""
    with open(os.path.join(outdir, 'seed_relevant.txt'), 'rt') as fh:
        relevant_files = [el.strip() for el in fh.readlines()]

    with open(os.path.join(outdir, 'seed_non_relevant.txt'), 'rt') as fh:
        non_relevant_files = [el.strip() for el in fh.readlines()]

    if platform.system() == 'Windows':
        relevant_files = [el.replace('/', '\\') for el in relevant_files]
        non_relevant_files = [el.replace('/', '\\') for el in non_relevant_files]
    return non_relevant_files, relevant_files
compression.py 文件源码 项目:atropos 作者: jdidion 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, path, mode='w'):
        """Open `path` and start a gzip process that writes into it.

        The process reads from a pipe, writes its stdout to the opened
        file and its stderr to os.devnull.  If starting it raises IOError,
        both files are closed and the error is re-raised.

        Args:
            path: The output file path.
            mode: The mode used to open the output file.
        """
        self.name = path
        self.outfile = open(path, mode)
        self.devnull = open(os.devnull, 'w')
        self.closed = False
        try:
            gzip_command = [get_program_path('gzip')]
            # Setting close_fds to True is necessary due to
            # http://bugs.python.org/issue12786
            self.process = Popen(
                gzip_command,
                stdin=PIPE,
                stdout=self.outfile,
                stderr=self.devnull,
                close_fds=True,
            )
        except IOError:
            self.outfile.close()
            self.devnull.close()
            raise
compression.py 文件源码 项目:atropos 作者: jdidion 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def open_gzip_file(filename, mode, use_system=True):
    """Open a gzip file, preferring the system gzip program if `use_system`
    is True, falling back to the gzip python library.

    Args:
        filename: The file to open.
        mode: The file open mode.
        use_system: Whether to try to use the system gzip program.

    Returns:
        The opened file. On the system path it is a GzipReader or
        GzipWriter, wrapped in io.TextIOWrapper when the mode contains 't'.
        On the library path it is the result of gzip.open, wrapped in a
        buffered reader or writer when the mode contains 'b'.
    """
    if use_system:
        try:
            if 'r' in mode:
                gzfile = GzipReader(filename)
            else:
                gzfile = GzipWriter(filename)
            if 't' in mode:
                gzfile = io.TextIOWrapper(gzfile)
            return gzfile
        except Exception:
            # Best effort: any failure of the system program falls through
            # to the library. A bare 'except' would also have swallowed
            # KeyboardInterrupt and SystemExit.
            pass

    gzfile = gzip.open(filename, mode)
    if 'b' in mode:
        if 'r' in mode:
            gzfile = io.BufferedReader(gzfile)
        else:
            gzfile = io.BufferedWriter(gzfile)
    return gzfile


问题


面经


文章

微信
公众号

扫码关注公众号