Python examples using csv.QUOTE_MINIMAL
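csv.QUOTE_MINIMAL is the csv module's default quoting mode: the writer quotes a field only when it contains the delimiter, the quote character, or a character from the line terminator. A minimal standalone sketch of the behavior (the buffer and values are illustrative):

import csv
import io

# QUOTE_MINIMAL (the default): only fields that need quoting are quoted.
buf = io.StringIO()
csv.writer(buf, quoting=csv.QUOTE_MINIMAL).writerow(['plain', 'has,comma', 'has "quote"'])
print(buf.getvalue())  # plain,"has,comma","has ""quote"""

# QUOTE_ALL quotes every field, for comparison.
buf = io.StringIO()
csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(['plain', 'has,comma', 'has "quote"'])
print(buf.getvalue())  # "plain","has,comma","has ""quote"""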

CourseInfo.py (project: GLT, author: MikeTheGreat)
def write_data_file(self, output_file_path):
        """Write our course info file information out.
        output_file_path: path to the output file"""
        with open(output_file_path, "w") as f_out:
            f_out.write("Assignments\n")

            for assign in self.assignments:
                # One assignment per line: name, internal name, id.
                f_out.write(assign.name + "," + assign.internal_name + "," + str(assign.id) + "\n")

            f_out.write("Roster\n")
            csvwriter = csv.writer(f_out, quoting=csv.QUOTE_MINIMAL)
            for student in self.roster.students_no_errors:
                csvwriter.writerow([student.first_name,
                                    student.last_name,
                                    student.email,
                                    student.glid])
run_webcam.py (project: Self-DriverFlow, author: davsuacar)
def callback(data):
    print("callback called...")
    global i
    global vc
    rospy.loginfo("I heard %s", data)
    with open('dataset.csv', 'a') as csvfile:
        spamwriter = csv.writer(csvfile, delimiter=' ',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
        spamwriter.writerow([data.angular.z])

        if vc.isOpened():  # try to get the first frame
            rval, frame = vc.read()
        else:
            rval = False

        rval, frame = vc.read()
        if rval:  # skip writing if the read failed and frame is None
            cv2.imwrite("images/image_" + str(i) + ".png", frame)
            i += 1
test_csv.py (project: kbe_server, author: xiaohaoppy)
def _test_default_attrs(self, ctor, *args):
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(AttributeError, delattr, obj.dialect, 'delimiter')
        self.assertRaises(AttributeError, setattr, obj.dialect, 'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)
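As these assertions show, the dialect attached to a live reader or writer is read-only. A quick illustrative sketch:

import csv
import io

reader = csv.reader(io.StringIO("a,b\n"))
print(reader.dialect.quoting == csv.QUOTE_MINIMAL)  # True, the default
try:
    reader.dialect.delimiter = ';'
except AttributeError as e:
    print(e)  # the attribute is read-only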
test_csv.py (project: kbe_server, author: xiaohaoppy)
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
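The QUOTE_NONE cases above are worth a closer look: with quoting disabled, the escape character is the only way to represent a delimiter or quote character, and omitting it makes the writer raise csv.Error. A small illustrative sketch:

import csv
import io

buf = io.StringIO()
writer = csv.writer(buf, escapechar='\\', quoting=csv.QUOTE_NONE)
writer.writerow(['a', 1, 'p,q'])  # delimiter is escaped, not quoted
print(buf.getvalue())             # a,1,p\,q

# Without an escapechar, QUOTE_NONE has no way to emit a delimiter
# inside a field and raises csv.Error.
buf = io.StringIO()
writer = csv.writer(buf, quoting=csv.QUOTE_NONE)
try:
    writer.writerow(['p,q'])
except csv.Error as e:
    print(e)  # need to escape, but no escapechar set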
utils.py (project: catwalk, author: dssg)
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table
    """
    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
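A hedged usage sketch for the helper above; MyModel, the connection URL, and the column values are illustrative assumptions, not part of the source:

# Assumed setup, for illustration only.
engine = sqlalchemy.create_engine('postgresql:///mydb')
objects = [MyModel(id=1, name='a'), MyModel(id=2, name='b')]
save_db_objects(engine, objects)  # bulk-loads via COPY rather than per-row INSERTs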
s3grouper.py (project: s3bundler, author: awslabs)
def dl_and_parse(s3, headers, keylist, bucket):
    for key in keylist:
        with TemporaryFile() as fp:
            try:
                s3.Bucket(bucket).download_fileobj(key, fp)
            except ClientError as e:
                logger.error('Unable to download s3://{}/{}'.format(bucket, key))
                logger.debug('Received error: {}'.format(e))
                sys.exit(5)
            fp.seek(0)
            with TextIOWrapper(GzipFile(fileobj=fp, mode='r')) as f:
                try:
                    reader = csv.DictReader(f, fieldnames=headers, delimiter=',', quoting=csv.QUOTE_MINIMAL)
                    for row in reader:
                        yield row
                except csv.Error as e:
                    # csv readers track their position as line_num
                    logger.error("Unable to read CSV at line {}".format(reader.line_num))
                    logger.debug('Received error: {}'.format(e))
                    sys.exit(3)

_table.py (project: pgreaper, author: vincentlaucsb)
def to_string(table):
    ''' Return table as a StringIO object for writing via copy() '''

    string = StringIO()
    writer = csv.writer(string, delimiter=",", quoting=csv.QUOTE_MINIMAL)
    dict_encoder = json.JSONEncoder()

    jsonb_cols = set([i for i, j in enumerate(table.col_types) if j == 'jsonb'])
    datetime_cols = set([i for i, j in enumerate(table.col_types) if j == 'datetime'])

    for row in table:
        for i in jsonb_cols:
            row[i] = dict_encoder.encode(row[i])
        for i in datetime_cols:
            # Adapt the cell value (not the loop index) for Postgres.
            row[i] = psycopg2.extensions.adapt(row[i])

        writer.writerow(row)

    string.seek(0)
    return string
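The returned StringIO is typically handed to a COPY call; a hedged sketch using psycopg2's copy_expert (the cursor and destination table name are assumptions):

# Hypothetical cursor and destination table, for illustration only.
cur.copy_expert("COPY my_table FROM STDIN WITH (FORMAT csv)", to_string(table))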
migrations.py (project: LOST, author: kylemh)
def create_csv(csv_filename, header_list, sql_query_string):
    """Transfer records from database query to a new .csv file.

    Keyword arguments:
    csv_filename -- String with a '.csv' suffix
    header_list -- List of strings (as CSV header names)
    sql_query_string -- SQL query whose selected columns match header_list
    """

    with open(csv_filename, 'w', newline='\n') as csvfile:
        csv_writer = csv.writer(csvfile, quotechar="'", quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(header_list)

        CUR.execute(sql_query_string)
        records = CUR.fetchall()
        CONN.commit()
        for entry in records:
            csv_writer.writerow(entry)
samplesheet.py (project: cellranger, author: 10XGenomics)
def write_csv_rows(rows, path):
    """
    Write CSV rows in a standard format.
    :type rows: list[list[string]]
    :type path: string
    """
    with open(path, 'w') as outfile:
        writer = csv.writer(outfile, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        writer.writerows(rows)
contact.py (project: fccforensics, author: RagtagOpen)
def run_query(source, fn, rows=200):
    print('writing data for %s to %s' % (source, fn))
    query['query']['function_score']['query']['bool']['must'][0]['term']['analysis.source'] = source
    emails = set()
    batches = 0
    print(json.dumps(query))
    total = None
    with open(fn, 'w', newline='') as outfile:
        writer = csv.writer(outfile, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(['email', 'name', 'date', 'comment', 'url'])
        while len(emails) < rows and batches < 10:
            offset = batches * 100
            if total and offset > total:
                break
            resp = es.search(index='fcc-comments', body=query, size=100, from_=offset)
            if batches == 0:
                total = resp['hits']['total']
                print('\t%s matches' % (total))
            else:
                print('\tbatch %s: have %s' % (batches+1, len(emails)))
            batches += 1
            for doc in resp['hits']['hits']:
                if len(emails) == rows:
                    break
                data = doc['_source']
                if data['contact_email'] in emails:
                    continue
                emails.add(data['contact_email'])
                writer.writerow([data['contact_email'], data['filers'][0]['name'],
                    data['date_received'], data['text_data'],
                    'https://www.fcc.gov/ecfs/filing/%s' % doc['_id']
                ])
main.py (project: speechless, author: JuliusKunze)
def validate_to_csv(model_name: str, last_epoch: int,
                        configuration: Configuration = Configuration.german(),
                        step_count=10, first_epoch: int = 0,
                        csv_directory: Path = None) -> List[
        Tuple[int, ExpectationsVsPredictionsInGroupedBatches]]:

        # A default value cannot reference another parameter, so resolve it here.
        if csv_directory is None:
            csv_directory = configuration.default_data_directories.test_results_directory

        step_size = (last_epoch - first_epoch) / (step_count - 1)

        epochs = distinct(list(int(first_epoch + index * step_size) for index in range(step_count)))
        log("Testing model {} on epochs {}.".format(model_name, epochs))

        model = configuration.load_model(model_name, last_epoch,
                                         allowed_characters_for_loaded_model=configuration.allowed_characters,
                                         use_kenlm=True,
                                         language_model_name_extension="-incl-trans")

        def get_result(epoch: int) -> ExpectationsVsPredictionsInGroupedBatches:
            log("Testing epoch {}.".format(epoch))

            model.load_weights(
                allowed_characters_for_loaded_model=configuration.allowed_characters,
                load_model_from_directory=configuration.directories.nets_base_directory / model_name, load_epoch=epoch)

            return configuration.test_model_grouped_by_loaded_corpus_name(model)

        results_with_epochs = []

        csv_file = csv_directory / "{}.csv".format(model_name + "-incl-trans")
        import csv
        with csv_file.open('w', encoding='utf8') as opened_csv:
            writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            for epoch in epochs:
                result = get_result(epoch)
                results_with_epochs.append((epoch, result))
                writer.writerow((epoch, result.average_loss, result.average_letter_error_rate,
                                 result.average_word_error_rate, result.average_letter_error_count,
                                 result.average_word_error_count))

        return results_with_epochs
corpus.py (project: speechless, author: JuliusKunze)
def summarize_to_csv(self, summary_csv_file: Path) -> None:
        import csv
        with summary_csv_file.open('w', encoding='utf8') as csv_summary_file:
            writer = csv.writer(csv_summary_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            for row in self.csv_rows():
                writer.writerow(row)
corpus.py (project: speechless, author: JuliusKunze)
def save(self, corpus_csv_file: Path, use_relative_audio_file_paths: bool = True):
        import csv
        with corpus_csv_file.open('w', encoding='utf8') as opened_csv:
            writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            examples_and_phase = [(e, Phase.training) for e in self.training_examples] + \
                                 [(e, Phase.test) for e in self.test_examples]

            for e, phase in examples_and_phase:
                writer.writerow(
                    (e.id, str(e.audio_file.relative_to(
                        corpus_csv_file.parent) if use_relative_audio_file_paths else e.audio_file),
                     e.label, phase.value, e.positional_label.serialize() if e.positional_label else ""))
utils.py (project: us_zipcodes_congress, author: OpenSourceActivismTech)
def csv_writer(filename, data, fields=None, delimiter=',', quoting=csv.QUOTE_MINIMAL):
    log.info('writing', filename)
    rows = 0
    with open(filename, 'w') as f:
        if not fields:
            fields = data[0].keys()
        writer = csv.DictWriter(f, fieldnames=fields, delimiter=delimiter, quoting=quoting)
        writer.writeheader()
        for row in data:
            writer.writerow(row)
            rows += 1
    log.info('wrote %d rows' % rows)
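A short usage sketch for csv_writer; the file name, field names, and rows are illustrative:

rows = [
    {'zipcode': '98101', 'state': 'WA'},
    {'zipcode': '10001', 'state': 'NY'},
]
# Field order comes from the first row unless `fields` is given explicitly.
csv_writer('zipcodes.csv', rows, fields=['zipcode', 'state'])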
utils.py (project: us_zipcodes_congress, author: OpenSourceActivismTech)
def load_csv_columns(filename, column_names=None, skip=0, delimiter=',', quoting=csv.QUOTE_MINIMAL):
    r = []
    log.info('opening', filename)
    with open(filename, 'r') as f:
        data_file = csv_reader_converter(f, delimiter=delimiter, quoting=quoting)
        for i in range(skip):
            next(data_file)
        headers = next(data_file, None)  # parse the headers
        columns = {}
        for (i, h) in enumerate(headers):
            h = h.strip()
            if (not column_names) or h in column_names:
                columns[i] = h
        log.info("headers", headers)
        log.info("columns", column_names)

        for line in data_file:
            d = {}
            if not line:
                continue
            # columns maps column index -> header name
            for (index, header) in columns.items():
                if column_names:
                    rename = column_names[header]
                else:
                    rename = headers[index]
                value = line[index].strip()
                d[rename] = value
            r.append(d)
        log.info('read %d lines' % len(r))
        return r
output.py (project: croissance, author: biosustain)
def __init__(self, file, include_default_phase: bool = True):
        writer = csv.writer(file, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(['name', 'phase', 'slope', 'intercept', 'N0', 'SNR', 'rank'])

        self._writer = writer
        self._file = file
        self._include_default_phase = include_default_phase
add_mrr3_notes.py (project: dprr-django, author: kingsdigitallab)
def run():

    note_files = ['promrep/scripts/data/MRR3ShortNotesV2.csv',
                  'promrep/scripts/data/MRR3LongNotesV2.csv']

    source = SecondarySource.objects.get(abbrev_name='Broughton MRR3')

    for ifname in note_files:
        print 'Will read notes from file', ifname, '\n\n'

        ifile = codecs.open(ifname, 'r', encoding='latin1')
        log_fname = os.path.splitext(os.path.basename(ifname))[0] + '.log'

        with open(log_fname, 'wb') as log_file:
            spamwriter = csv.writer(
                log_file,
                delimiter=',',
                quotechar='"',
                quoting=csv.QUOTE_MINIMAL)
            spamwriter.writerow(('id', 'note'))

            for i, line in enumerate(ifile):

                note_text = line.encode('utf-8').replace("**", ";").strip('"')

                print str(i) + ":", note_text

                note = PostAssertionNote.objects.create(
                    text=note_text, secondary_source=source)
                spamwriter.writerow((note.id, note_text[0:20]))
generate_analysis_csv.py (project: pfb-network-connectivity, author: azavea)
def handle(self, *args, **options):

        tmpdir = tempfile.mkdtemp()

        try:
            queryset = AnalysisJob.objects.all().filter(status=AnalysisJob.Status.COMPLETE)
            filter_set = AnalysisJobFilterSet()
            queryset = filter_set.filter_latest(queryset, 'latest', True)

            tmp_csv_filename = os.path.join(tmpdir, 'results.csv')
            with open(tmp_csv_filename, 'w') as csv_file:
                writer = None
                fieldnames = []

                for job in queryset:
                    row_data = {}
                    for export in EXPORTS:
                        columns, values = export(job)
                        if writer is None:
                            fieldnames = fieldnames + columns
                        for column, value in zip(columns, values):
                            row_data[column] = value
                    if writer is None:
                        writer = csv.DictWriter(csv_file,
                                                fieldnames=fieldnames,
                                                dialect=csv.excel,
                                                quoting=csv.QUOTE_MINIMAL)
                        writer.writeheader()
                    writer.writerow(row_data)

            s3_client = boto3.client('s3')
            now = datetime.utcnow()
            s3_key = 'analysis-spreadsheets/results-{}.csv'.format(now.strftime('%Y-%m-%dT%H%M'))
            s3_client.upload_file(tmp_csv_filename, settings.AWS_STORAGE_BUCKET_NAME, s3_key)
            logger.info('File uploaded to: s3://{}/{}'
                        .format(settings.AWS_STORAGE_BUCKET_NAME, s3_key))
        finally:
            shutil.rmtree(tmpdir)
helper_functions.py (project: LegalNetworks, author: brschneidE3)
def add_row_to_csv(directory, filename, row, columns_to_skip):
    # Left-pad the row with empty cells for the skipped columns.
    row = ['' for i in range(columns_to_skip)] + row
    with open(directory + '/' + filename + '.csv', 'a') as csvfile:
        spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
        spamwriter.writerow(row)
    # The with block closes the file on exit; no explicit close() is needed.
io_helpers.py (project: retrieve-and-rank-tuning, author: rchaks)
def __init__(self, *args, **kwargs):
        super(RankerRelevanceFileQueryStream, self).__init__(*args, **kwargs)

        # Use a dialect instance; mutating the csv.excel class itself would
        # change the default excel dialect for every other user in the process.
        dialect = csv.excel()

        # The following explicit assignments shadow the dialect defaults
        # but are necessary to avoid strange behavior while called by
        # certain unit tests. Please do not delete.
        dialect.doublequote = True
        dialect.quoting = csv.QUOTE_MINIMAL
        dialect.skipinitialspace = True

        self.__reader__ = csv.reader(self.query_file, dialect=dialect)

