Example source code for Python reader()

ml.py (project: kaggle-spark-ml, author: imgoodman)
def loadRecord(line):
    """
    Parse a single line of CSV input.
    """
    input_line=StringIO.StringIO(line)
    #row=unicodecsv.reader(input_line, encoding="utf-8")
    #return row.next()
    #reader=csv.DictReader(input_line,fieldnames=["id","qid1","qid2","question1","question2","is_duplicate"])
    reader=csv.reader(input_line)
    return reader.next()
    #data=[]
    #for row in reader:
    #    print row
    #    data.append([unicode(cell,"utf-8") for cell in row])
    #return data[0]
    #return reader.next()

#raw_data=sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
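
The snippet above parses one CSV line at a time so it can be applied per record inside a Spark map(). A minimal standalone sketch of the same idea outside Spark, with an invented sample line (Python 2, matching the reader.next() call above):

import csv
import StringIO

def load_record(line):
    # Wrap the single line in a file-like object so csv.reader can handle
    # quoting and embedded commas correctly.
    return csv.reader(StringIO.StringIO(line)).next()

sample = '"1","q1","q2","What is X?","What is Y?","0"'
print load_record(sample)
# -> ['1', 'q1', 'q2', 'What is X?', 'What is Y?', '0']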
load_zmeskal_data.py (project: dprr-django, author: kingsdigitallab)
def read_notes_file_to_dict(ifname):
    """ Reads a notes file to a dict
        returns a dictionary where the key is the reference name
        and the value is the note
    """

    notes_dict = {}

    # csvfile = codecs.open(ifname, 'r', encoding='latin1')
    csvfile = open(ifname, 'r')

    # with open(ifname, 'rU') as csvfile:

    csv_reader = csv.reader(csvfile, delimiter=";")

    for row in csv_reader:
        row_text = row[2].strip()
        notes_dict[row[1].strip()] = row_text

    csvfile.close()

    return notes_dict
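
A hedged usage sketch for read_notes_file_to_dict above: it expects a semicolon-delimited file whose second column holds the reference name and whose third column holds the note text. The file name and contents below are invented for illustration:

# notes.csv (hypothetical contents, semicolon-delimited):
#   1;ref-a;First note
#   2;ref-b;Second note
notes = read_notes_file_to_dict('notes.csv')
print notes['ref-a']
# -> First note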
japanese.py (project: messenger-maid-chan, author: freedomofkeima)
def get_kanji(level, current_pos=1):
    """
    get_kanji returns the single record at line position current_pos

    level: 1 - 4 (N1 to N4)
    current_pos: 1-based line position, up to the number of records
    """
    kanji = {}
    with open(KANJI_FILENAMES[level], 'rb') as fobj:
        reader = csv.reader(fobj, delimiter=',', encoding='utf-8')
        num_of_lines = 0
        for line in reader:
            num_of_lines += 1
            if num_of_lines == current_pos:
                kanji = dict(zip(KANJI_FIELDS, line))
                break
    # Convert to UTF-8
    for key, value in kanji.iteritems():
        kanji[key] = value.encode("utf-8")
    return kanji
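
get_kanji depends on two module-level constants that the snippet does not show: KANJI_FILENAMES (a mapping from JLPT level to a CSV path) and KANJI_FIELDS (the column names zipped against each row); the csv name here also appears to be the unicodecsv package, since reader() is given an encoding argument. The values below are purely illustrative guesses at the constants' shape, not the project's actual definitions:

# Hypothetical constants; the real project defines these elsewhere.
KANJI_FILENAMES = {
    1: 'data/kanji_n1.csv',
    2: 'data/kanji_n2.csv',
    3: 'data/kanji_n3.csv',
    4: 'data/kanji_n4.csv',
}
KANJI_FIELDS = ['kanji', 'onyomi', 'kunyomi', 'meaning']

print get_kanji(4, current_pos=10)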
japanese.py (project: messenger-maid-chan, author: freedomofkeima)
def get_vocabulary(current_pos=1):
    """
    get_vocabulary returns the single record at line position current_pos

    current_pos: 1-based line position, up to the number of records
    """
    vocabulary = {}
    with open(VOCABULARY_FILENAME, 'rb') as fobj:
        reader = csv.reader(fobj, delimiter=',', encoding='utf-8')
        num_of_lines = 0
        for line in reader:
            num_of_lines += 1
            if num_of_lines == current_pos:
                vocabulary = dict(zip(VOCABULARY_FIELDS, line))
                break
    # Convert to UTF-8
    for key, value in vocabulary.iteritems():
        vocabulary[key] = value.encode("utf-8")
    return vocabulary
preprocess_data.py (project: kaggle_redefining_cancer_treatment, author: jorgemf)
def load_csv_dataset(filename):
    """
    Loads a csv filename as a dataset
    :param str filename: name of the file
    :return List[DataSample]: a list of DataSample
    """
    dataset = []
    with open(os.path.join(DIR_GENERATED_DATA, filename), 'rb') as file:
        reader = csv.reader(file, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL,
                            errors='ignore')
        for row in reader:
            id = int(row[0])
            text = row[1]
            gene = row[2]
            variation = row[3]
            try:
                real_class = int(row[4])
            except:
                real_class = None
            dataset.append(DataSample(id, text, gene, variation, real_class))
    return dataset
dictionaries_service.py (project: time_extract, author: blackbirdco)
def load(self):
        global dictionaries
        if dictionaries == {}:
            for file in os.listdir('./dictionaries'):
                metadata_name = re.sub(r'.dic', r'', file)
                print "Loading dictionary for %s" % metadata_name

                with open('./dictionaries/' + file, 'rb') as concepts_dictionary:
                    Tag = namedtuple('Tag', 'concept, pos, semanticType')
                    dictionary = []
                    for tag in map(Tag._make, unicodecsv.reader(concepts_dictionary, delimiter='\t', encoding='utf-8')):
                        dictionary.append(tag)

                    dictionaries[metadata_name] = dictionary

        return dictionaries
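
The pattern above (mapping Tag._make over a unicodecsv reader) turns every tab-separated row directly into a named tuple. A small standalone sketch of the same technique, with an invented dictionary file:

from collections import namedtuple
import unicodecsv

Tag = namedtuple('Tag', 'concept, pos, semanticType')

# Hypothetical file: each line is "concept<TAB>pos<TAB>semanticType".
with open('./dictionaries/example.dic', 'rb') as f:
    tags = [Tag._make(row) for row in
            unicodecsv.reader(f, delimiter='\t', encoding='utf-8')]

for tag in tags:
    print tag.concept, tag.semanticType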
__main__.py (project: troveharvester, author: wragge)
def restart_harvest(args):
    harvest = get_harvest(args)
    data_dir = os.path.join(os.getcwd(), 'data', harvest)
    meta = get_metadata(data_dir)
    if meta:
        try:
            with open(os.path.join(data_dir, 'results.csv'), 'rb') as csv_file:
                reader = csv.reader(csv_file, delimiter=',', encoding='utf-8')
                rows = list(reader)
            if len(rows) > 1:
                start = len(rows) - 2
                # Remove the last row in the CSV just in case there was a problem
                rows = rows[:-1]
                with open(os.path.join(data_dir, 'results.csv'), 'wb') as csv_file:
                    writer = csv.writer(csv_file, delimiter=',', encoding='utf-8')
                    for row in rows:
                        writer.writerow(row)
            else:
                start = 0
        except IOError:
            # Nothing's been harvested
            start = 0
        start_harvest(data_dir=data_dir, key=meta['key'], query=meta['query'], pdf=meta['pdf'], text=meta['text'], start=start, max=meta['max'])
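
The key step in restart_harvest is trimming the last, possibly truncated, row from results.csv before resuming the harvest. A standalone sketch of just that step with unicodecsv (the path is assumed):

import unicodecsv

def drop_last_row(path):
    # Read every row, then rewrite the file without the final row,
    # keeping the header even if it is the only line left.
    with open(path, 'rb') as f:
        rows = list(unicodecsv.reader(f, delimiter=',', encoding='utf-8'))
    if len(rows) > 1:
        with open(path, 'wb') as f:
            writer = unicodecsv.writer(f, delimiter=',', encoding='utf-8')
            for row in rows[:-1]:
                writer.writerow(row)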
nber_county_cbsa.py (project: skills-ml, author: workforce-data-initiative)
def cbsa_lookup():
    """
    Construct a County->CBSA Lookup table from NBER data
    Returns: dict of dicts
        the outer key is the State Code, the inner key is the 3-digit County FIPS code
        each value is a (CBSA FIPS code, CBSA Name) tuple
    """
    logging.info("Beginning CBSA lookup")
    cbsa_lookup = defaultdict(dict)
    download = requests.get(URL)
    decoded_content = download.content.decode('latin-1').encode('utf-8')
    reader = csv.reader(decoded_content.splitlines(), delimiter=',')
    # skip header line
    next(reader)
    for row in reader:
        state_code = row[1]
        fipscounty = row[3][-3:]
        cbsa = row[4]
        cbsaname = row[5]
        cbsa_lookup[state_code][fipscounty] = (cbsa, cbsaname)
    return cbsa_lookup
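
The returned lookup is nested: keyed first by state code and then by the three-digit county FIPS code. A usage sketch; the keys below only illustrate the shape and were not checked against the NBER file:

lookup = cbsa_lookup()
# state code, then 3-digit county FIPS -> (CBSA FIPS code, CBSA name)
cbsa_fips, cbsa_name = lookup['06']['075']
print cbsa_fips, cbsa_name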
freetext.py (project: skills-ml, author: workforce-data-initiative)
def _skills_lookup(self):
        """Create skills lookup

        Reads the object's filename containing skills into a lookup

        Returns: (dict) a mapping from O*NET-SOC code to a set of skill names
        """
        logging.info('Creating skills lookup from %s', self.skill_lookup_path)
        lookup = defaultdict(set)
        with smart_open(self.skill_lookup_path) as infile:
            reader = csv.reader(infile, delimiter='\t')
            header = next(reader)
            ksa_index = header.index(self.nlp.transforms[0])
            soc_index = header.index('O*NET-SOC Code')
            for row in reader:
                lookup[row[soc_index]].add(row[ksa_index])
            return lookup
base.py (project: knesset-data-datapackage, author: hasadna)
def fetch_from_datapackage(self, **kwargs):
        if not self._skip_resource(**kwargs):
            # IMPORTANT!
            # after this point - kwargs are ignored as we are fetching from previously prepared csv data
            if self.csv_path and os.path.exists(self.csv_path):
                with open(self.csv_path, 'rb') as csv_file:
                    csv_reader = unicodecsv.reader(csv_file)
                    header_row = None
                    for row in csv_reader:
                        if not header_row:
                            header_row = row
                        else:
                            csv_row = OrderedDict(zip(header_row, row))
                            parsed_row = []
                            for field in self.descriptor["schema"]["fields"]:
                                try:
                                    parsed_row.append((field["name"], self._get_field_original_value(csv_row[field["name"]], field)))
                                except Exception as e:
                                    import logging
                                    message = "error parsing field %s in file %s : %s" % (field["name"],self.csv_path, str(e))
                                    logging.exception(message)
                                    raise Exception(message)
                            yield OrderedDict(parsed_row)
data_cleaner.py (project: data-cleaner, author: datosgobar)
def _assert_no_duplicates(self, input_path, encoding, sep, quotechar):

        if input_path.endswith('.csv'):
            with open(input_path, 'r') as csvfile:
                reader = unicodecsv.reader(csvfile,
                                           encoding=encoding,
                                           delimiter=sep,
                                           quotechar=quotechar)
                fields = reader.next()

                for col in fields:
                    if fields.count(col) > 1:
                        raise DuplicatedField(col)

        # TODO: Implementar chequeo de que no hay duplicados para XLSX
        elif input_path.endswith('.xlsx'):
            pass
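
A stripped-down sketch of the same duplicate-header check outside the class, with DuplicatedField standing in for the project's own exception:

import unicodecsv

class DuplicatedField(Exception):
    pass

def assert_no_duplicate_headers(path, encoding='utf-8', sep=',', quotechar='"'):
    # Read only the header row and fail on the first repeated column name.
    with open(path, 'r') as f:
        fields = unicodecsv.reader(f, encoding=encoding,
                                   delimiter=sep, quotechar=quotechar).next()
    for col in fields:
        if fields.count(col) > 1:
            raise DuplicatedField(col)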
reromanize.py (project: epitran, author: dmort27)
def _load_reromanizer(self, table, decompose):
        path = os.path.join('data', 'reromanize', table + '.csv')
        try:
            path = pkg_resources.resource_filename(__name__, path)
        except:
            print('Could not locate {}.'.format(path), file=sys.stderr)
        if os.path.isfile(path):
            mapping = {}
            with open(path, 'rb') as f:
                reader = csv.reader(f, encoding='utf-8')
                next(reader)
                for ipa, rom in reader:
                    rom = normalize('NFD', rom) if decompose else normalize('NFC', rom)
                    mapping[ipa] = rom
            return mapping
        else:
            print('File {} does not exist.'.format(path), file=sys.stderr)
            return {}
featuretable.py (project: panphon, author: dmort27)
def _read_bases(self, fn, weights):
        fn = pkg_resources.resource_filename(__name__, fn)
        segments = []
        with open(fn, 'rb') as f:
            reader = csv.reader(f, encoding='utf-8')
            header = next(reader)
            names = header[1:]
            for row in reader:
                ipa = row[0]
                vals = [{'-': -1, '0': 0, '+': 1}[x] for x in row[1:]]
                vec = Segment(names,
                              {n: v for (n, v) in zip(names, vals)},
                              weights=weights)
                segments.append((ipa, vec))
        seg_dict = dict(segments)
        return segments, seg_dict, names
_panphon.py (project: panphon, author: dmort27)
def _read_table(self, filename):
        """Read the data from data/ipa_all.csv into self.segments, a
        list of 2-tuples of unicode strings and sets of feature tuples and
        self.seg_dict, a dictionary mapping from unicode segments to sets of
        feature tuples.
        """
        filename = pkg_resources.resource_filename(
            __name__, filename)
        segments = []
        with open(filename, 'rb') as f:
            reader = csv.reader(f, encoding='utf-8')
            header = next(reader)
            names = header[1:]
            for row in reader:
                seg = row[0]
                vals = row[1:]
                specs = set(zip(vals, names))
                segments.append((seg, specs))
        seg_dict = dict(segments)
        return segments, seg_dict, names
wikipedia_dump_index_creator.py (project: fact_extraction, author: janetzki)
def _create_filtered_index(self, source=dir_path + '../data/character_index.csv',
                               destination=dir_path + '../data/character_index_filtered.csv'):
        with io.open(source, 'rb') as fin_index, io.open(destination, 'w', encoding='utf8') as fout:
            total_lines_relations = line_counting.cached_counter.count_lines(self.path_relations)
            self.logger.print_info('Collecting important entities...')
            important_articles = set()
            nt_reader = NTReader(self.path_relations)
            for subject, predicate, object in tqdm(nt_reader.yield_cleaned_entry_names(), total=total_lines_relations):
                important_articles.add(subject)

            total_lines_index = line_counting.cached_counter.count_lines(source)
            self.logger.print_info('Filtering important entities...')
            index_reader = csv.reader(fin_index, delimiter=self.delimiter, encoding='utf-8', quoting=csv.QUOTE_NONE)
            for line in tqdm(index_reader, total=total_lines_index):
                subject, character_offset = line
                if subject in important_articles:
                    fout.write(subject + self.delimiter + character_offset + '\n')
reader.py (project: pykbart, author: chill17)
def __init__(self, file_handle, delimiter='\t'):
        self.reader = csv.reader(file_handle, delimiter=delimiter, encoding='utf-8')
        self.fields = list(six.next(self.reader))
reader.py (project: pykbart, author: chill17)
def __next__(self):
        return KbartRecord(six.next(self.reader), fields=self.fields)
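
Together, the two methods above read the KBART header row once in __init__ and wrap every following data row in a KbartRecord. Assuming they belong to a class named KbartReader (the class statement is not shown in these snippets), usage might look like:

with open('holdings.txt', 'rb') as fh:          # hypothetical KBART export
    reader = KbartReader(fh, delimiter='\t')    # class name assumed, not shown above
    print reader.fields                         # header columns read in __init__
    print reader.__next__()                     # KbartRecord for the first data row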
opinionTokenizer.py (project: scrapyProject, author: bedcode)
def positive_and_negative_to_full():
    fpos = open('positive.csv')
    positive_units = [row for row in csv.reader(fpos)]
    fneg = open('negative.csv')
    negative_units = [row for row in csv.reader(fneg)]
    for item in positive_units:
        item.append('positive')
    for item in negative_units:
        item.append('negative')
    del negative_units[0]
    positive_units[0][0] = 'review_content'
    positive_units[0][1] = 'sentiment'
    full = positive_units
    full.extend(negative_units)
    with open('positiveandnegative.csv', 'wb') as csvfile:
        writer = csv.writer(csvfile, dialect='excel')
        writer.writerows(full)



#this will open the review scraped data and write two files from that info:
#positive.csv, containing positive opinion units
#negative.csv, containing negative opinion units
preprocess_data.py (project: kaggle_redefining_cancer_treatment, author: jorgemf)
def load_csv_wikipedia_gen(filename):
    """
    Loads a csv filename as a wikipedia genes dataset
    :param str filename: name of the file
    :return List[WikipediaGene]: a list of WikipediaGene
    """
    dataset = []
    with open(os.path.join(DIR_GENERATED_DATA, filename)) as file:
        reader = csv.reader(file, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for row in reader:
            dataset.append(WikipediaGene(row[0], row[1]))
    return dataset
convert.py (project: csvtotable, author: vividvilla)
def convert(input_file_name, **kwargs):
    """Convert CSV file to HTML table"""
    delimiter = kwargs["delimiter"] or ","
    quotechar = kwargs["quotechar"] or "|"

    if six.PY2:
        delimiter = delimiter.encode("utf-8")
        quotechar = quotechar.encode("utf-8")

    # Read CSV and form a header and rows list
    with open(input_file_name, "rb") as input_file:
        reader = csv.reader(input_file,
                            encoding="utf-8",
                            delimiter=delimiter,
                            quotechar=quotechar)

        csv_headers = []
        if not kwargs.get("no_header"):
            # Read header from first line
            csv_headers = next(reader)

        csv_rows = [row for row in reader if row]

        # Set default column name if header is not present
        if not csv_headers and len(csv_rows) > 0:
            end = len(csv_rows[0]) + 1
            csv_headers = ["Column {}".format(n) for n in range(1, end)]

    # Render csv to HTML
    html = render_template(csv_headers, csv_rows, **kwargs)

    # Freeze all JS files in template
    return freeze_js(html)
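
A hedged usage sketch for convert() above: delimiter and quotechar must be present in kwargs (falsy values fall back to "," and "|"), and the return value is the rendered HTML; the real command-line wrapper passes further options through kwargs that are omitted here:

html = convert('data.csv', delimiter=',', quotechar='"', no_header=False)
with open('output.html', 'w') as out:
    out.write(html)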
resultAnalysis.py (project: jingjuSingingPhraseMatching, author: ronggong)
def compareMelodicSimiResults(path_largerPyin,list_lessNRank_phrase_name):
    '''
    Compare with the results of melodic similarity and find the intersection set:
    phrases whose melodic similarity ranking is > N and whose phonetic similarity ranking is < N
    :param path_largerPyin: path of the melodic similarity csv
    :param list_lessNRank_phrase_name: ranking less than N phrase name by phonetic similarity
    :return: intersection set of the phrase name
    '''
    phrase_names_largerN = []
    with open(path_largerPyin,'r') as openfile:
        csv_reader = csv.reader(openfile,delimiter=',')
        for row in csv_reader:
            phrase_names_largerN.append(row[0])

    return set.intersection(set(phrase_names_largerN),set(list_lessNRank_phrase_name))
csv.py (project: budgettracker, author: maximebf)
def parse_csv(self):
        if not getattr(self, 'csv', None):
            with codecs.open(self.filename) as f:
                self.csv = list(unicodecsv.reader(f))
        return self.csv
storage.py (project: budgettracker, author: maximebf)
def load_accounts(self):
        filename = self.get_accounts_filename()
        if not os.path.exists(filename):
            return []
        with codecs.open(filename) as f:
            return map(self._csv_row_to_account, unicodecsv.reader(f))
storage.py (project: budgettracker, author: maximebf)
def load_transactions(self, filename):
        if not os.path.exists(filename):
            return []
        with codecs.open(filename) as f:
            return map(self._csv_row_to_transaction, unicodecsv.reader(f))
__main__.py (project: troveharvester, author: wragge)
def get_results(data_dir):
    results = {}
    try:
        with open(os.path.join(data_dir, 'results.csv'), 'rb') as csv_file:
            reader = csv.reader(csv_file, delimiter=',', encoding='utf-8')
            rows = list(reader)
            results['num_rows'] = len(rows) - 1
            results['last_row'] = rows[-1]
    except IOError:
        results['num_rows'] = 0
        results['last_row'] = None
    return results
negative_positive_dict.py (project: skills-ml, author: workforce-data-initiative)
def negative_positive_dict():
    """
    Construct a dictionary of term sets used for job title screening: state names,
    state abbreviations, and place names (terms not expected in a job title),
    plus O*NET job titles
    Returns: dict of sets
    """
    logging.info("Beginning negative dictionary build")
    states = []
    states.extend(list(map(lambda x: x.lower(), list(us.states.mapping('name', 'abbr').keys()))))
    states.extend(list(map(lambda x: x.lower(), list(us.states.mapping('name', 'abbr').values()))))

    places = []
    download = requests.get(PLACEURL)
    reader = csv.reader(download.content.decode('latin-1').encode('utf-8').splitlines(), delimiter=',')
    next(reader)
    for row in reader:
        cleaned_placename = re.sub(r'\([^)]*\)', '', row[4]).rstrip()
        for suffix in SUFFIXES:
            if cleaned_placename.endswith(suffix):
                cleaned_placename = cleaned_placename.replace(suffix, '').rstrip()
        places.append(cleaned_placename.lower())

    places = list(set(places))
    places.remove('not in a census designated place or incorporated place')

    onetjobs = []
    download = requests.get(ONETURL)
    reader = csv.reader(download.content.splitlines(), delimiter='\t')
    next(reader)
    for row in reader:
        onetjobs.append(row[2].lower())
        onetjobs.append(row[3].lower())
    onetjobs = list(set(onetjobs))

    return {'states': states, 'places': places, 'onetjobs': onetjobs}
ua_cbsa.py (project: skills-ml, author: workforce-data-initiative)
def ua_cbsa():
    """
    Construct a UA->CBSA Lookup table from Census data
    Returns: dict
    { UA Fips: [(CBSA FIPS, CBSA Name)] }
    """
    logging.info("Beginning CBSA lookup")
    lookup = defaultdict(list)
    download = requests.get(URL)
    reader = csv.reader(
        download.content.decode('latin-1').encode('utf-8').splitlines(),
        delimiter=','
    )
    not_designated = 0
    total = 0
    # skip header line
    next(reader)
    for row in reader:
        total += 1
        ua_fips = row[0]
        cbsa_fips = row[2]
        cbsa_name = row[3]

        if cbsa_fips == '99999' or ua_fips == '99999':
            not_designated += 1
            continue

        lookup[ua_fips].append((cbsa_fips, cbsa_name))

    logging.info(
        'Done extracting CBSAs: %s total rows, %s not designated, %s found',
        total,
        not_designated,
        total - not_designated
    )

    return lookup
freetext.py (project: skills-ml, author: workforce-data-initiative)
def _skills_lookup(self):
        """Create skills lookup

        Reads the object's filename containing skills into a lookup

        Returns: (set) skill names
        """
        with smart_open(self.skill_lookup_path) as infile:
            reader = csv.reader(infile, delimiter='\t')
            next(reader)
            index = 3
            generator = (self.reg_ex(row[index]) for row in reader)

            return set(generator)
tests.py (project: knesset-data-datapackage, author: hasadna)
def test_committees(self):
        # fetching directly
        self.assertEqual(list(MockCommitteesResource().fetch()), [dict(COMMITTEE_EXPECTED_DATA, id=3)])
        self.assertEqual(list(MockCommitteesResource().fetch(committee_ids=[4])), [dict(COMMITTEE_EXPECTED_DATA, id=4)])
        self.assertEqual(list(MockCommitteesResource().fetch(all_committees=True)), [dict(COMMITTEE_EXPECTED_DATA, id=1),
                                                                                     dict(COMMITTEE_EXPECTED_DATA, id=2),
                                                                                     dict(COMMITTEE_EXPECTED_DATA, id=3),
                                                                                     dict(COMMITTEE_EXPECTED_DATA, id=4)])
        self.assertEqual(list(MockCommitteesResource().fetch(main_committees=True)),
                         [dict(COMMITTEE_EXPECTED_DATA, id=1),
                          dict(COMMITTEE_EXPECTED_DATA, id=2),])
        # making the resource
        data_root = self.given_temporary_data_root()
        MockCommitteesResource("committees", data_root).make()
        with open(os.path.join(data_root, "committees.csv")) as f:
            lines = unicodecsv.reader(f.readlines())
            self.assertEqual(list(lines), [
                ['id', 'type_id', 'parent_id', 'name', 'name_eng', 'name_arb', 'begin_date',
                 'end_date', 'description', 'description_eng', 'description_arb', 'note',
                 'note_eng', 'portal_link', 'scraper_errors'],
                ['3', '4', '', 'hebrew name', 'string', 'string', '1950-01-01T00:00:00',
                 '', 'hebrew description', 'string', 'string', 'string',
                 'string', 'can be used to link to the dedicated page in knesset website', '']
            ])
        # fetching from the made resource
        fetched_items = MockCommitteesResource("committees", data_root).fetch_from_datapackage()
        fetched_items = [dict(oredered_dict.items()) for oredered_dict in fetched_items]
        self.assertEqual(fetched_items, [dict(COMMITTEE_EXPECTED_DATA, id=3)])
tests.py (project: knesset-data-datapackage, author: hasadna)
def test_committee_meeting_protocols(self):
        # protocols only support appending
        resource = CommitteeMeetingProtocolsResource("committee-meeting-protocols", self.given_temporary_data_root())
        committee_id, meeting_id, meeting_datetime = 6, 7, datetime.datetime(1953,5,4)
        # a contextmanager for mock protocol
        @contextlib.contextmanager
        def meeting_protocol():
            yield type("MockProtocol", (object,), {"text": "Hello World!",
                                                   "parts": [type("MockProtocolPart", (object,), {"header": "mock header", "body": "mock body"}),
                                                             type("MockProtocolPart", (object,), {"header": "mock header 2", "body": "mock body 2"})],
                                                   "file_name": ""})
        # appending using the fake protocol
        resource.append_for_meeting(committee_id, meeting_id, meeting_datetime, meeting_protocol(), skip_exceptions=True)
        # checking the created files
        with open(resource.get_file_path(".csv")) as f:
            self.assertEqual(list(unicodecsv.reader(f.readlines())),
                             [['committee_id', 'meeting_id', 'text',
                               'parts',
                               'original',
                               'scraper_errors'],
                              ['6',            '7',          'committee_6/7_1953-05-04_00-00-00/protocol.txt',
                               'committee_6/7_1953-05-04_00-00-00/protocol.csv', '',
                               "error getting original file: [Errno 2] No such file or directory: ''"]])
        with open(resource.get_path("committee_6", "7_1953-05-04_00-00-00", "protocol.txt")) as f:
            self.assertEqual(f.readlines(), ["Hello World!"])
        with open(resource.get_path("committee_6", "7_1953-05-04_00-00-00", "protocol.csv")) as f:
            self.assertEqual(f.readlines(), ['header,body\r\n', 'mock header,mock body\r\n', 'mock header 2,mock body 2\r\n'])

