Python reader() examples from open-source projects

DataGatherer.py (project: pubchem-ranker, author: jacobwindsor)
def harvest(self, limit=None, offset=None):
        """

        Harvest the data from the file
        :param offset: Integer offset for the row - starts from 0
        :param limit: Interger limit of the rows to iterate over - starts from 0
        :return: list of tuples containing CAS number and IUPAC name
        """
        response = []
        for i, row in enumerate(list(self.reader)[offset:]):
            if limit is not None and i == limit:
                break

            cas = row[0].split(' ', 1)[0]
            iupac = row[0].split('(', 1)[1].rsplit(')', 1)[0]

            response.append({
                "CAS": cas,
                "IUPAC": iupac
            })

        return response
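The row format is not documented in the snippet, but the slicing implies cells like "<CAS> ... (<IUPAC>)". A minimal, self-contained sketch of the same rule on an assumed sample value:

cell = "50-00-0 something (formaldehyde)"        # assumed cell layout
cas = cell.split(' ', 1)[0]                      # text before the first space
iupac = cell.split('(', 1)[1].rsplit(')', 1)[0]  # between first '(' and last ')'
assert cas == "50-00-0" and iupac == "formaldehyde"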
read_csv_files.py (project: AFSCbot, author: HadManySons)
def add_afsc(dict, fname):
    """
    Add AFSCs from given filename into the dictionary.
    :param dict: empty dictionary
    :param fname: CSV file using '#' as delimiter
    """
    with open(CSV_FOLDER + fname, newline='') as f:
        reader = csv.reader(f, delimiter='#')
        for row in reader:
            base_afsc = row[0]
            job_title = row[1]
            afsc_dict = {"base_afsc": base_afsc,
                         "job_title": job_title,
                         "shreds": {},
                         "link": ""}
            dict[base_afsc] = afsc_dict
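A self-contained usage sketch of the same row-to-dict mapping, using an in-memory file so it runs without CSV_FOLDER (the sample AFSC row is an assumption about the file's contents):

import csv
import io

sample = io.StringIO("1A0X1#In-Flight Refueling\n")  # hypothetical row
afscs = {}
for row in csv.reader(sample, delimiter='#'):
    afscs[row[0]] = {"base_afsc": row[0], "job_title": row[1], "shreds": {}, "link": ""}
assert afscs["1A0X1"]["job_title"] == "In-Flight Refueling"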
read_csv_files.py (project: AFSCbot, author: HadManySons)
def add_shreds(afsc_dict, fname):
    """
    Add shreds from given filename into the dictionary.
    :param dict: either enlisted_dict or officer_dict
    :param fname: CSV file using '#' as delimiter
    """
    with open(CSV_FOLDER + fname, newline='') as f:
        reader = csv.reader(f, delimiter=',')
        for row in reader:
            base_afsc = row[0]
            shred_char = row[1]
            shred_title = row[2]
            # if the shred's base AFSC is not in the dictionary, skip it
            try:
                afsc_dict[base_afsc]["shreds"][shred_char] = shred_title
            except KeyError:
                pass
sync_users.py (project: safetyculture-sdk-python, author: SafetyCulture)
def process_desired_state(server_state, desired_state):
    """
    Processes the user-provided input file and determines the actions needed when the server state differs from the desired state.
    :param server_state: The list of all users and their associated groups in the server
    :param desired_state: The input provided by the user
    :return: None
    """
    with open(desired_state) as csvDataFile:
        csvReader = csv.reader(csvDataFile)
        next(csvReader, None)  # skip the header row
        for email, lastname, firstname, groups in csvReader:
            group_list = []
            if groups != "":
                group_list = groups.split(",")
                group_list = [group.strip(' ') for group in group_list]
            if email not in server_state:
                actions[email] = {'action': 'add', 'groups': group_list, 'user_id':'', 'user_data': {'firstname': firstname, 'lastname': lastname, 'email': email, 'reset_password_required': True} }
            else:
                group_names_server = server_state[email]['groups'][0::2]
                group_names_desired = groups.split(',')
                group_names_desired = [group.strip(' ') for group in group_names_desired]
                group_diff = [i for i in group_names_desired if i not in group_names_server]
                if group_diff != [] and group_diff != ['']:
                    actions[email] = {'action': 'add to group', 'groups': group_diff, 'user_id': server_state[email]['user_id'] }
database_unittests.py (project: Causality, author: vcla)
def extractRows(fileName):
    fileName = 'results/cvpr_db_results/'+fileName+'.csv'
    with open(fileName, 'r') as csvfile:
        lines = csv.reader(csvfile)
        for row in lines:
            if row[0] != "name":
                retval = [int(x) if x != "" else -100 for x in row[1:-2] ]
            else:
                nameRow = row[1:-2]
            if row[0] == "causalgrammar":
                causalRow = retval
            elif row[0] == "origdata":
                origRow =  retval
            elif row[0] == "random":
                randomRow =  retval
        return {"nameRow": nameRow, "causalRow": causalRow, "origRow": origRow, "randomRow": randomRow}
util.py (project: python-, author: secondtonone1)
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        if ct is None or not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
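A hedged call sketch (the URL is illustrative, not from the source; the helper swallows every failure and returns {}, so callers can inspect the result without their own try/except):

# data = _get_external_data('https://example.com/package-metadata.json')  # assumed URL
# if data:   # {} signals any failure: redirect, non-JSON response, bad payload
#     print(data.get('name'))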
util.py (project: python-, author: secondtonone1)
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
util.py (project: my-first-blog, author: AnkurBegining)
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        if ct is None or not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
util.py (project: my-first-blog, author: AnkurBegining)
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
gpUtils.py (project: MKLMM, author: omerwe)
def loadData(bfile, extractSim, phenoFile, missingPhenotype='-9', loadSNPs=False, standardize=True):
    bed = Bed(bfile)

    if (extractSim is not None):
        with open(extractSim) as f:
            csvReader = csv.reader(f)
            extractSnpsSet = set(l[0] for l in csvReader)
        # xrange: this snippet targets Python 2
        keepSnpsInds = [i for i in xrange(bed.sid.shape[0]) if bed.sid[i] in extractSnpsSet]
        bed = bed[:, keepSnpsInds]

    phe = None
    if (phenoFile is not None): bed, phe = loadPheno(bed, phenoFile, missingPhenotype)

    if (loadSNPs):
        bed = bed.read()
        if (standardize): bed = bed.standardize()   

    return bed, phe
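Bed is presumably pysnptools' PLINK reader, given the .sid, .read() and .standardize() calls; a hypothetical call sketch, with all file names assumed:

# bed, phe = loadData('genotypes',            # PLINK .bed/.bim/.fam prefix (assumed name)
#                     extractSim='snps.csv',  # SNP ids to keep, first column per row
#                     phenoFile='pheno.txt',
#                     loadSNPs=True)          # read genotypes into memory and standardize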
utils.py (project: GeoInfoMiner, author: jingge326)
def prepare_data(imagery_path, train_file_path, split_points):

    # Read tiff image
    image_tuple = read_image(imagery_path)

    # Read samples
    original_data_list = []
    with open(train_file_path, encoding='utf-8') as train_file:
        csv_reader = csv.reader(train_file)
        for row in csv_reader:
            original_data_list.append(row)
    original_data_array = np.array(original_data_list)

    # Split training data into variables and labels
    x_s = original_data_array[:, split_points[0]:split_points[1]]
    y_s = original_data_array[:, split_points[1]]

    return x_s, y_s, image_tuple


samplesheet.py (project: cellranger, author: 10XGenomics)
def read_csv_rows(path):
    """
    Extract the rows from the CSV at the specified path.
    Will throw an error if the file doesn't exist.

    :type path: string
    :rtype: list[list[string]]
    """
    with open(path, 'rU') as infile:
        reader = csv.reader(infile, delimiter=',')
        rows = [row for row in reader]
        # eliminate trailing cols that have no entries (CSI-215)
        for idx, row in enumerate(rows):
            clipIndex = 0
            for col in row[::-1]:
                if not col:
                    clipIndex -= 1
                else:
                    break
            if clipIndex < 0:
                rows[idx] = rows[idx][:clipIndex]
    return rows
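A runnable worked example of the trailing-column clipping (CSI-215): the reversed scan counts only trailing empty cells, so blanks in the middle of a row survive.

row = ['a', '', 'b', '', '']
clipIndex = 0
for col in row[::-1]:          # walk the row backwards
    if not col:
        clipIndex -= 1         # count trailing empties
    else:
        break                  # stop at the first non-empty cell
assert clipIndex == -2 and row[:clipIndex] == ['a', '', 'b']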
load.py (project: Renewables_Scenario_Gen_GAN, author: chennnnnyize)
def load_solar_data():
    with open('solar label.csv', 'r') as csvfile:
        reader = csv.reader(csvfile)
        rows = [row for row in reader]
    labels = np.array(rows, dtype=int)
    print(shape(labels))

    with open('solar.csv', 'r') as csvfile:
        reader = csv.reader(csvfile)
        rows = [row for row in reader]
    rows = np.array(rows, dtype=float)
    rows = rows[:104832, :]
    print(shape(rows))
    trX = np.reshape(rows.T, (-1, 576))
    print(shape(trX))
    m = np.ndarray.max(rows)
    print("maximum value of solar power", m)
    trY = np.tile(labels, (32, 1))
    trX = trX / m
    return trX, trY
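The shape arithmetic behind the magic numbers, derived from the snippet itself (the 32-site reading of np.tile is an assumption, since the CSV is not shown):

# 104832 = 182 * 576, so each column of rows.T (one series per site) folds
# into 182 windows of 576 readings; np.tile(labels, (32, 1)) then suggests
# 32 sites, which would make trX shape (32 * 182, 576) = (5824, 576).
assert 182 * 576 == 104832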
validate_hashes.py (project: chash, author: luhsra)
def read_from_csv(filename, column_names):
    # note: column_names is accepted but unused; one list is created per header column
    data = []
    with open(filename) as csv_file:
        reader = csv.reader(csv_file)
        is_header_row = True
        for row in reader:
            if is_header_row:
                for col in row:
                    data.append([])
                is_header_row = False
            else:
                colnum = 0
                for col in row:
                    data[colnum].append(float(col))
                    colnum += 1
    return data
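The result is laid out column-wise rather than row-wise; a self-contained demonstration of the same loop over an in-memory file (io.StringIO stands in for the real file, which is an assumption of this sketch):

import csv
import io

reader = csv.reader(io.StringIO("x,y\n1,2\n3,4\n"))
header = next(reader)                # the header row only sizes the output
data = [[] for _ in header]
for row in reader:
    for colnum, col in enumerate(row):
        data[colnum].append(float(col))
assert data == [[1.0, 3.0], [2.0, 4.0]]  # one list of floats per column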
evaluate_data_with_stop.py (project: chash, author: luhsra)
def read_from_csv(filename, column_names):
    # note: column_names is accepted but unused; one list is created per header column
    data = []
    with open(filename) as csv_file:
        reader = csv.reader(csv_file)
        is_header_row = True
        for row in reader:
            if is_header_row:
                for col in row:
                    data.append([])
                is_header_row = False
            else:
                colnum = 0
                for col in row:
                    data[colnum].append(float(col))
                    colnum += 1
    return data
pokedex.py (project: Jumper-Cogs, author: Redjumpman)
def collect_moves(self, reader, name):
        Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])
        if name.split('-')[-1].isdigit():
            for row in reader:
                if name == row[0]:
                    pokemon = name.split('-')[0].title()
                    generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                    moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                    return Moves(pokemon, generation, color, moves, versions)
        else:
            for row in reader:
                if name in row[0]:
                    pokemon = name.title()
                    generation, color = switcher[row[1]], int(ast.literal_eval(row[2]))
                    moves, versions = ast.literal_eval(row[3]), ast.literal_eval(row[4])
                    return Moves(pokemon, generation, color, moves, versions)
search_command.py (project: mongodb-monitoring, author: jruaux)
def _records_protocol_v1(self, ifile):

        reader = csv.reader(ifile, dialect=CsvDialect)

        try:
            fieldnames = reader.next()
        except StopIteration:
            return

        mv_fieldnames = {name: name[len('__mv_'):] for name in fieldnames if name.startswith('__mv_')}

        if len(mv_fieldnames) == 0:
            for values in reader:
                yield OrderedDict(izip(fieldnames, values))
            return

        for values in reader:
            record = OrderedDict()
            for fieldname, value in izip(fieldnames, values):
                if fieldname.startswith('__mv_'):
                    if len(value) > 0:
                        record[mv_fieldnames[fieldname]] = self._decode_list(value)
                elif fieldname not in record:
                    record[fieldname] = value
            yield record
studymcmc.py (project: bayestsa, author: thalesians)
def main():
    indexfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-index.txt'
    chainfilepath = r'C:\Users\Paul Bilokon\Documents\dev\alexandria\bilokon-msc\dissertation\code\winbugs\svl2\dataset-1\coda-for-chain-1.txt'

    index = readindexfile(indexfilepath)
    print(index)

    data = []
    indexoffset = None
    with open(chainfilepath, 'rt') as chainfile:
        reader = csv.reader(chainfile, delimiter='\t')
        for row in reader:
            index, value = int(row[0]), float(row[1])
            if not data: indexoffset = index
            print(index, indexoffset)
            assert index == len(data) + indexoffset
            data.append(value)
    print(data)
util.py (project: Flask_Blog, author: sugarguo)
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        if headers.get('Content-Type') != 'application/json':
            logger.debug('Unexpected response for JSON request')
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
util.py (project: Flask_Blog, author: sugarguo)
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
util.py (project: pip-update-requirements, author: alanhamlett)
def _get_external_data(url):
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        resp = urlopen(url)
        headers = resp.info()
        ct = headers.get('Content-Type')
        if ct is None or not ct.startswith('application/json'):
            logger.debug('Unexpected response for JSON request: %s', ct)
        else:
            reader = codecs.getreader('utf-8')(resp)
            #data = reader.read().decode('utf-8')
            #result = json.loads(data)
            result = json.load(reader)
    except Exception as e:
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
util.py (project: pip-update-requirements, author: alanhamlett)
def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a progress
        callable (if specified) or write progress information to sys.stderr.
        """
        progress = self.progress
        verbose = self.verbose
        while True:
            s = stream.readline()
            if not s:
                break
            if progress is not None:
                progress(s, context)
            else:
                if not verbose:
                    sys.stderr.write('.')
                else:
                    sys.stderr.write(s.decode('utf-8'))
                sys.stderr.flush()
        stream.close()
search_command.py (project: Splunk_CBER_App, author: MHaggis)
def _records_protocol_v1(self, ifile):

        reader = csv.reader(ifile, dialect=CsvDialect)

        try:
            fieldnames = reader.next()
        except StopIteration:
            return

        mv_fieldnames = {name: name[len('__mv_'):] for name in fieldnames if name.startswith('__mv_')}

        if len(mv_fieldnames) == 0:
            for values in reader:
                yield OrderedDict(izip(fieldnames, values))
            return

        for values in reader:
            record = OrderedDict()
            for fieldname, value in izip(fieldnames, values):
                if fieldname.startswith('__mv_'):
                    if len(value) > 0:
                        record[mv_fieldnames[fieldname]] = self._decode_list(value)
                elif fieldname not in record:
                    record[fieldname] = value
            yield record
ml.py (project: kaggle-spark-ml, author: imgoodman)
def loadRecord(line):
    """
    ????csv??
    """
    input_line=StringIO.StringIO(line)
    #row=unicodecsv.reader(input_line, encoding="utf-8")
    #return row.next()
    #reader=csv.DictReader(input_line,fieldnames=["id","qid1","qid2","question1","question2","is_duplicate"])
    reader=csv.reader(input_line)
    return reader.next()
    #data=[]
    #for row in reader:
    #    print row
    #    data.append([unicode(cell,"utf-8") for cell in row])
    #return data[0]
    #return reader.next()

#raw_data=sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
random.py (project: Harmonbot, author: Harmon758)
def __init__(self, bot):
        self.bot = bot
        # Add commands as random subcommands
        for name, command in inspect.getmembers(self):
            if isinstance(command, commands.Command) and command.parent is None and name != "random":
                self.bot.add_command(command)
                self.random.add_command(command)
        # Add fact subcommands as subcommands of corresponding commands
        for command, parent in ((self.fact_cat, self.cat), (self.fact_date, self.date), (self.fact_number, self.number)):
            utilities.add_as_subcommand(self, command, parent, "fact")
        # Add random subcommands as subcommands of corresponding commands
        self.random_subcommands = ((self.color, "Resources.color"), (self.giphy, "Resources.giphy"), (self.map, "Resources.map"), (self.streetview, "Resources.streetview"), (self.uesp, "Search.uesp"), (self.wikipedia, "Search.wikipedia"), (self.xkcd, "Resources.xkcd"))
        for command, parent_name in self.random_subcommands:
            utilities.add_as_subcommand(self, command, parent_name, "random")
        # Import jokes
        self.jokes = []
        try:
            with open("data/jokes.csv", newline = "") as jokes_file:
                jokes_reader = csv.reader(jokes_file)
                for row in jokes_reader:
                    self.jokes.append(row[0])
        except FileNotFoundError:
            pass
selectseq.py (project: piperine, author: DNA-and-Natural-Algorithms-Group)
def main():
    print()

    csvnames = sys.argv[1:]

    columns_named = False
    scores = []
    for csvname in csvnames:
        print("Reading scores from " + csvname + " .")

        with open(csvname, 'r') as csvfile:
            score_reader = csv.reader(csvfile)
            for row in score_reader:
                print(', '.join(row))
                if 'Winner' in row[0]:
                    continue
                if 'Index' in row[0] and columns_named:
                    continue
                else:
                    scores.append(row)
                    columns_named = True

    winner = metarank(scores)
util2.py (project: code-uai16, author: thanhan)
def read_turk_dic_proton():
    """
    dic of abs_id to answers to (wid, q3,q4) for each worker
    """
    f = open("data/proton-beam-RawTurkResults.csv")

    first_line = f.readline()

    csv_reader = csv.reader(f)

    turk_dic = {}
    for row in csv_reader:        
        (AssignmentId, WorkerId, HITId, AcceptTime, SubmitTime, ApprovalTime, TimeToComplete, AbstractId, Question1, Question2, Question3, Question4, Relevant) = tuple(row)
        AbstractId = int(AbstractId)
        if AbstractId not in turk_dic: turk_dic[AbstractId] = []
        turk_dic[AbstractId].append( (Question3, Question4) )

    return turk_dic
util.py (project: code-uai16, author: thanhan)
def get_pub_dic_csv(dataset):
    filename = "data/" + dataset + "-text.csv"
    f = open(filename)
    f.readline()  # skip the header row
    csv_reader = csv.reader(f)

    # Create dic of : id -> text features
    pub_dic = {}

    for row in csv_reader:
        if dataset.startswith("RCT"):
            (abstract_id, abstract, title) = tuple(row)[0:3]
        else:
            (abstract_id, title, publisher, abstract) = tuple(row)[0:4]

        abstract_id = int(abstract_id)
        text = title + abstract

        pub_dic[abstract_id] = text

    return pub_dic
corpus.py (project: speechless, author: JuliusKunze)
def load(corpus_csv_file: Path,
             sampled_training_example_count: Optional[int] = None) -> 'Corpus':
        import csv
        with corpus_csv_file.open(encoding='utf8') as opened_csv:
            reader = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            def to_absolute(audio_file_path: Path) -> Path:
                return audio_file_path if audio_file_path.is_absolute() else Path(
                    corpus_csv_file.parent) / audio_file_path

            examples = [
                (
                    LabeledExampleFromFile(
                        audio_file=to_absolute(Path(audio_file_path)), id=id, label=label,
                        positional_label=None if positional_label == "" else PositionalLabel.deserialize(
                            positional_label)), Phase[phase])
                for id, audio_file_path, label, phase, positional_label in reader]

            return Corpus(training_examples=[e for e, phase in examples if phase == Phase.training],
                          test_examples=[e for e, phase in examples if phase == Phase.test],
                          sampled_training_example_count=sampled_training_example_count)
test_util.py (project: FRG-Crowdsourcing, author: 97amarnathk)
def test_UnicodeWriter(self):
        """Test UnicodeWriter class works."""
        tmp = tempfile.NamedTemporaryFile()
        uw = util.UnicodeWriter(tmp)
        fake_csv = ['one, two, three, {"i": 1}']
        for row in csv.reader(fake_csv):
            # change it for a dict
            row[3] = dict(i=1)
            uw.writerow(row)
        tmp.seek(0)
        err_msg = "It should be the same CSV content"
        with open(tmp.name, 'rb') as f:
            reader = csv.reader(f)
            for row in reader:
                for item in row:
                    assert item in fake_csv[0], err_msg

