python类QUOTE_ALL的实例源码

handlers.py 文件源码 项目:naziscore 作者: rbanffy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get(self):
        """Serve the highest-scoring accounts as fully quoted CSV.

        A previously stored body under the memcache key ``worst_handler`` is
        written out as-is.  Otherwise the rows are queried, streamed into the
        response, and the resulting text is stored under the same key.
        """
        body = memcache.get('worst_handler')
        if body:
            self.response.out.write(body)
            return

        # Ask downstream caches to keep a freshly built response for one day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'

        out = csv.writer(self.response, delimiter=',', quoting=csv.QUOTE_ALL)

        # GQL is used here as an experiment; per the original note this
        # query needs a new datastore index.
        top_scores = ndb.gql(
            'select distinct screen_name, twitter_id, score '
            'from Score order by score desc limit 20000')
        for entity in top_scores:
            out.writerow(
                [entity.screen_name, entity.twitter_id, entity.score])

        memcache.set('worst_handler', self.response.text, 86400)
handlers.py 文件源码 项目:naziscore 作者: rbanffy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get(self):
        """Serve the 100 most frequent hashtags as fully quoted CSV.

        Hashtags are lower-cased and counted over the 5000 ``Score`` entities
        with the highest score.  The rendered body is stored in memcache under
        ``worst_hashtags`` and replayed on later requests.
        """
        body = memcache.get('worst_hashtags')
        if body:
            self.response.out.write(body)
            return

        # Ask downstream caches to keep a freshly built response for one day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'

        out = csv.writer(self.response, delimiter=',', quoting=csv.QUOTE_ALL)

        tag_totals = Counter()
        top_scores = Score.query().order(-Score.score).iter(
            limit=5000, projection=Score.hashtags)
        for entity in top_scores:
            if entity.hashtags is None:
                continue
            tag_totals.update(tag.lower() for tag in entity.hashtags)

        for tag, tag_count in tag_totals.most_common(100):
            out.writerow([tag, tag_count])

        memcache.set('worst_hashtags', self.response.text, 86400)
handlers.py 文件源码 项目:naziscore 作者: rbanffy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self):
        """Serve the 200 most frequent websites as fully quoted CSV.

        Websites are lower-cased and counted over the 5000 ``Score`` entities
        with the highest score.  The rendered body is stored in memcache under
        the key ``worst_websitess`` (spelling kept, since the same key is used
        for both lookup and store) and replayed on later requests.
        """
        body = memcache.get('worst_websitess')
        if body:
            self.response.out.write(body)
            return

        # Ask downstream caches to keep a freshly built response for one day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'

        out = csv.writer(self.response, delimiter=',', quoting=csv.QUOTE_ALL)

        site_totals = Counter()
        top_scores = Score.query().order(-Score.score).iter(
            limit=5000, projection=Score.websites)
        for entity in top_scores:
            if entity.websites is None:
                continue
            site_totals.update(site.lower() for site in entity.websites)

        for site, site_count in site_totals.most_common(200):
            out.writerow([site, site_count])

        memcache.set('worst_websitess', self.response.text, 86400)
handlers.py 文件源码 项目:naziscore 作者: rbanffy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self):
        """Serve the 200 most frequent *unknown* websites as fully quoted CSV.

        Websites are lower-cased and counted over the 5000 ``Score`` entities
        with the highest score, skipping any site found in the ``KNOWN_SITES``
        collection.  The rendered body is stored in memcache under
        ``worst_unknown_websites`` and replayed on later requests.
        """
        cached = memcache.get('worst_unknown_websites')
        if cached:
            self.response.out.write(cached)
            return

        response_writer = csv.writer(
            self.response, delimiter=',', quoting=csv.QUOTE_ALL)

        # Instruct endpoint to cache for 1 day.
        self.response.headers['Cache-control'] = 'public, max-age=86400'

        # The counter used to be named ``c`` as well, which shadowed the
        # callable below and made ``c('KNOWN_SITES')`` call a Counter instance
        # (TypeError).
        # NOTE(review): ``c`` is presumably a module-level settings accessor
        # returning a container of known site names - confirm against the
        # module's imports.
        known_sites = c('KNOWN_SITES')

        site_counts = Counter()
        for s in Score.query().order(-Score.score).iter(
                    limit=5000, projection=(Score.websites)):
            if s.websites is not None:
                site_counts.update(
                    site for site in (h.lower() for h in s.websites)
                    if site not in known_sites)
        for site, site_count in site_counts.most_common(200):
            response_writer.writerow(
                [site, site_count])
        memcache.set('worst_unknown_websites', self.response.text, 86400)
test_csv.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _test_arg_valid(self, ctor, arg):
        """Check that *ctor* rejects missing, malformed or unknown arguments.

        Args:
            ctor: Callable under test; it is invoked with *arg* plus one
                invalid positional or keyword argument at a time.
            arg: A valid first positional argument for *ctor*.
        """
        # No argument at all, then an unusable first argument.
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)

        for bad_kwargs in (
                {'bad_attr': 0},
                {'delimiter': 0},
                {'delimiter': 'XX'}):
            self.assertRaises(TypeError, ctor, arg, **bad_kwargs)

        # An unregistered dialect name is reported as csv.Error instead.
        self.assertRaises(csv.Error, ctor, arg, 'foo')

        for bad_kwargs in (
                {'delimiter': None},
                {'delimiter': 1},
                {'quotechar': 1},
                {'lineterminator': None},
                {'lineterminator': 1},
                {'quoting': None},
                {'quoting': csv.QUOTE_ALL, 'quotechar': ''},
                {'quoting': csv.QUOTE_ALL, 'quotechar': None}):
            self.assertRaises(TypeError, ctor, arg, **bad_kwargs)
test_csv.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _test_dialect_attrs(self, ctor, *args):
        """Check that *ctor* picks up every option from a dialect class.

        Args:
            ctor: Callable under test; it receives ``*args`` followed by the
                dialect class and must return an object exposing ``.dialect``.
            *args: Leading positional arguments for *ctor*.
        """
        # Now try with dialect-derived options
        class custom_dialect:
            delimiter = '-'
            doublequote = False
            escapechar = '^'
            lineterminator = '$'
            quotechar = '#'
            quoting = csv.QUOTE_ALL
            skipinitialspace = True
            strict = False

        expected = (
            ('delimiter', '-'),
            ('doublequote', False),
            ('escapechar', '^'),
            ('lineterminator', "$"),
            ('quotechar', '#'),
            ('quoting', csv.QUOTE_ALL),
            ('skipinitialspace', True),
            ('strict', False),
        )

        obj = ctor(*(args + (custom_dialect,)))
        for attr_name, attr_value in expected:
            self.assertEqual(getattr(obj.dialect, attr_name), attr_value)
feed.py 文件源码 项目:pubtransit 作者: pubtransit 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_table(zip_file, table_name, columns, dtypes=None):
    """Load selected columns of ``<table_name>.txt`` from an open archive.

    Args:
        zip_file: Object whose ``open(name, 'r')`` yields a binary stream
            usable as a context manager.
        table_name: Archive member name without the ``.txt`` suffix.
        columns: Column names to extract, in the order they are returned.
        dtypes: Optional mapping of column name to the dtype passed to
            ``numpy.asarray``; missing columns use ``None``.

    Returns:
        A list with one numpy array per entry of *columns*.
    """
    dtypes = dtypes or {}

    with zip_file.open(table_name + '.txt', 'r') as csv_stream:
        # The first line is consumed by hand so that the column names can be
        # cleaned with GET_COLUMN_NAME_REGEX before pandas sees them.
        header_line = csv_stream.readline().strip()
        names = []
        for raw_name in header_line.split(b','):
            cleaned = GET_COLUMN_NAME_REGEX.sub(b'', raw_name)
            names.append(cleaned.decode('ascii'))

        frame = pandas.read_csv(
            csv_stream, names=names, quotechar='"', quoting=csv.QUOTE_ALL,
            usecols=list(columns))

    arrays = []
    for column in columns:
        values = remove_nans(frame[column])
        arrays.append(numpy.asarray(values, dtype=dtypes.get(column)))
    return arrays
crawl2csv.py 文件源码 项目:vangogh 作者: gfolego 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def gen_csv(csvfile, page_list):
    """Write a header row plus one fully quoted CSV row per page.

    Args:
        csvfile: Writable text file-like object.
        page_list: Iterable of objects exposing the page attributes listed in
            ``columns`` below.
    """
    # (CSV column title, attribute read from each page), in output order.
    columns = (
        ('PageID', 'page_id'),
        ('DescriptionURL', 'description_url'),
        ('ImageURL', 'img_url'),
        ('ImageSHA1', 'img_sha1'),
        ('PixelHeight', 'img_height'),
        ('PixelWidth', 'img_width'),
        ('PaintingID', 'paint_id'),
        ('Artist', 'artist'),
        ('RealDimensions', 'dim'),
    )

    out = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
    out.writerow([title for title, _ in columns])

    for page in page_list:
        out.writerow([getattr(page, attr) for _, attr in columns])


# Main
views.py 文件源码 项目:django-csv-export-view 作者: benkonrath 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_csv_writer_fmtparams(self):
        """Return the csv writer format parameters: excel dialect, quote all."""
        return dict(dialect='excel', quoting=csv.QUOTE_ALL)
get_london_location_data.py 文件源码 项目:Smelly-London 作者: Smelly-London 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def write_dict_to_csv(list_of_dictionaries, output_file):
    """Write location records to *output_file* as fully quoted CSV.

    Args:
        list_of_dictionaries: Iterable of dicts keyed by the column names
            listed in ``columns`` below.
        output_file: Path of the file to create or overwrite.
    """
    columns = ['centroid_lon', 'centroid_lat', 'feature_type', 'name', 'source']

    # newline='' leaves line endings to the csv module.
    with open(output_file, 'w', newline='') as handle:
        writer = csv.DictWriter(
            handle, fieldnames=columns, quoting=csv.QUOTE_ALL)
        writer.writeheader()
        for record in list_of_dictionaries:
            writer.writerow(record)
nepse_csv.py 文件源码 项目:nepse-stock 作者: sachin38 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def write_to_csv(received_data=None, *args):
    """Append one fully quoted row to ``datafile.csv`` and empty the input list.

    The column header is written first when ``datafile.csv`` does not exist
    yet.  After a successful write the caller's list is cleared in place, as
    before.

    Args:
        received_data: Mutable sequence of cell values for one row.  When
            omitted, an empty row is written.
        *args: Accepted for backward compatibility and ignored.
    """
    header = ["Company", "Last Traded Price", "Change", "Total Listed Shares", "Paid Up Value", "Total Paid Up Value", "Closing Market Price", "Market Capitalization"]

    # A fresh list per call replaces the former shared mutable default.
    if received_data is None:
        received_data = []

    # Decide before opening: append mode creates the file if it is missing.
    needs_header = not os.path.isfile("datafile.csv")

    with open("datafile.csv", "a") as output:
        writer = csv.writer(output, quoting=csv.QUOTE_ALL)
        if needs_header:
            writer.writerow(header)
        writer.writerow(received_data)

    # Callers rely on their list being emptied once the row is stored.
    received_data[:] = []
csv.py 文件源码 项目:pentestly 作者: praetorian-inc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def module_run(self):
        """Export every non-empty row of the configured table to a CSV file.

        Reads ``self.options['table']`` and ``self.options['filename']``,
        fetches all rows through ``self.query``, writes each row that has at
        least one truthy cell as a fully quoted CSV line, and reports the
        number of rows written through ``self.output``.
        """
        filename = self.options['filename']
        table = self.options['table']

        # Identifiers cannot be bound as SQL parameters; doubling embedded
        # double quotes keeps the name inside its quoted identifier.
        quoted_table = table.replace('"', '""')

        # Query before opening the file so that a failing query does not
        # leave a truncated output file behind.
        rows = self.query('SELECT * FROM "%s" ORDER BY 1' % (quoted_table))

        cnt = 0
        # codecs module not used because the csv module converts to ascii
        with open(filename, 'w') as outfile:
            # One writer serves the whole export instead of one per row.
            csvwriter = csv.writer(outfile, quoting=csv.QUOTE_ALL)
            for row in rows:
                # Falsy cells (None, empty string, 0) become empty strings.
                row = [x if x else '' for x in row]
                if any(row):
                    cnt += 1
                    # NOTE(review): encoding each cell to UTF-8 bytes matches
                    # the Python 2 csv module; cells are assumed to be text.
                    csvwriter.writerow([s.encode("utf-8") for s in row])
        self.output('%d records added to \'%s\'.' % (cnt, filename))
run.py 文件源码 项目:twitter_mongodb_helper 作者: IDEA-NTHU-Taiwan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def write_csv_file(filename, result, path):
    """Write each value of *result* as its own fully quoted CSV row.

    The output file is ``path + filename + '.csv'`` (plain concatenation, so
    *path* must already end with a separator if one is wanted).

    Args:
        filename   (str): Base name of the file to write, without extension.
        result     (iterable): Values to write, one per row.
        path       (str): Directory prefix to use.
    """
    # The context manager guarantees the file is flushed and closed; the
    # handle used to be left open.
    # NOTE(review): binary mode suits the Python 2 csv module; under Python 3
    # csv.writer needs a text-mode file - confirm the target interpreter.
    with open(path + filename + '.csv', 'wb') as output:
        writer = csv.writer(output, quoting=csv.QUOTE_ALL, lineterminator='\n')
        for val in result:
            writer.writerow([val])
athena_cli.py 文件源码 项目:athena-cli 作者: guardian 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def execute(self, statement):
        """Run *statement*, wait for it to finish and print the outcome.

        The query is started through ``self.athena`` against ``self.dbname``
        and polled every 200 ms until its state is SUCCEEDED, FAILED or
        CANCELLED.  A failed query prints its state-change reason; a
        successful one is printed according to ``self.format`` (CSV,
        CSV_HEADER, TSV, TSV_HEADER, VERTICAL, anything else is rendered as
        an aligned table).  Nothing is printed for a cancelled query or when
        no execution id is returned.
        """
        execution_id = self.athena.start_query_execution(self.dbname, statement)
        if not execution_id:
            return

        finished_states = ['SUCCEEDED', 'FAILED', 'CANCELLED']
        stats = self.athena.get_query_execution(execution_id)
        status = stats['QueryExecution']['Status']['State']
        while status not in finished_states:
            time.sleep(0.2)  # 200ms
            stats = self.athena.get_query_execution(execution_id)
            status = stats['QueryExecution']['Status']['State']

        if status == 'FAILED':
            print(stats['QueryExecution']['Status']['StateChangeReason'])
            return
        if status != 'SUCCEEDED':
            return

        results = self.athena.get_query_results(execution_id)
        column_info = results['ResultSet']['ResultSetMetadata']['ColumnInfo']
        headers = [column['Name'].encode("utf-8") for column in column_info]

        if self.format in ['CSV', 'CSV_HEADER']:
            csv_writer = csv.writer(sys.stdout, quoting=csv.QUOTE_ALL)
            if self.format == 'CSV_HEADER':
                csv_writer.writerow(headers)
            encoded_rows = [
                [text.encode("utf-8") for text in row]
                for row in self.athena.yield_rows(results, headers)]
            csv_writer.writerows(encoded_rows)
        elif self.format == 'TSV':
            rows = list(self.athena.yield_rows(results, headers))
            print(tabulate(rows, tablefmt='tsv'))
        elif self.format == 'TSV_HEADER':
            rows = list(self.athena.yield_rows(results, headers))
            print(tabulate(rows, headers=headers, tablefmt='tsv'))
        elif self.format == 'VERTICAL':
            # One two-column (header, value) table per record.
            records = self.athena.yield_rows(results, headers)
            for position, row in enumerate(records, 1):
                print('--[RECORD {}]--'.format(position))
                print(tabulate(zip(headers, row), tablefmt='presto'))
        else:  # ALIGNED
            rows = list(self.athena.yield_rows(results, headers))
            print(tabulate(rows, headers=headers, tablefmt='presto'))
Output_Utils.py 文件源码 项目:EEGSignalAnalysis 作者: pprakhar30 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def Y_Output():
    """Write the fixed label row to ``sandeep_Y.csv`` and print ``prakhar_Y.csv``.

    NOTE(review): the file that is read back ('prakhar_Y.csv') is not the one
    written here ('sandeep_Y.csv') - confirm that this is intended.
    """
    mylist = [1,1,0,0,1,1,1,1,0,1,1,0,1,1,1,0,0,1,0,0,1,1,1,1,0]

    # Close the output explicitly; the handle used to be left open, so the
    # row was only flushed whenever the interpreter released the file object.
    # NOTE(review): the binary modes match the Python 2 csv module.
    with open("sandeep_Y.csv", 'wb') as myfile:
        wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
        wr.writerow(mylist)

    with open('prakhar_Y.csv', 'rb') as csvfile:
        spamreader = csv.reader(csvfile)
        for row in spamreader:
            # Parenthesised form prints the same under Python 2 and also
            # parses under Python 3.
            print(row)
test_csv.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_write_quoting(self):
        """Exercise writer output for each quoting mode on a delimiter-bearing row."""
        sample = ['a', 1, 'p,q']

        # With no quoting argument only the field containing the delimiter
        # is expected to be quoted.
        self._write_test(list(sample), 'a,1,"p,q"')

        # QUOTE_NONE is expected to fail on the field containing the delimiter.
        with self.assertRaises(csv.Error):
            self._write_test(list(sample), 'a,1,p,q',
                             quoting=csv.QUOTE_NONE)

        for mode, expected in (
                (csv.QUOTE_MINIMAL, 'a,1,"p,q"'),
                (csv.QUOTE_NONNUMERIC, '"a",1,"p,q"'),
                (csv.QUOTE_ALL, '"a","1","p,q"')):
            self._write_test(list(sample), expected, quoting=mode)

        # An embedded newline stays inside its quoted field.
        self._write_test(['a\nb', 1], '"a\nb","1"', quoting=csv.QUOTE_ALL)
test_csv.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_write_quoting(self):
        """Exercise writer output for each quoting mode on a delimiter-bearing row."""
        sample = ['a', 1, 'p,q']

        # With no quoting argument only the field containing the delimiter
        # is expected to be quoted.
        self._write_test(list(sample), 'a,1,"p,q"')

        # QUOTE_NONE is expected to fail on the field containing the delimiter.
        self._write_error_test(csv.Error, list(sample),
                               quoting=csv.QUOTE_NONE)

        for mode, expected in (
                (csv.QUOTE_MINIMAL, 'a,1,"p,q"'),
                (csv.QUOTE_NONNUMERIC, '"a",1,"p,q"'),
                (csv.QUOTE_ALL, '"a","1","p,q"')):
            self._write_test(list(sample), expected, quoting=mode)

        # An embedded newline stays inside its quoted field.
        self._write_test(['a\nb', 1], '"a\nb","1"', quoting=csv.QUOTE_ALL)
test_csv.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_write_quoting(self):
        """Check the written text for every quoting mode, plus the QUOTE_NONE error."""
        delimiter_row = ['a', 1, 'p,q']
        expected_by_mode = [
            (csv.QUOTE_MINIMAL, 'a,1,"p,q"'),
            (csv.QUOTE_NONNUMERIC, '"a",1,"p,q"'),
            (csv.QUOTE_ALL, '"a","1","p,q"'),
        ]

        # Without a quoting argument the expected text matches QUOTE_MINIMAL.
        self._write_test(list(delimiter_row), 'a,1,"p,q"')

        # QUOTE_NONE is expected to raise for the field holding a delimiter.
        self._write_error_test(csv.Error, list(delimiter_row),
                               quoting=csv.QUOTE_NONE)

        for mode, expected in expected_by_mode:
            self._write_test(list(delimiter_row), expected, quoting=mode)

        # A field with an embedded newline is written inside quotes.
        self._write_test(['a\nb', 1], '"a\nb","1"', quoting=csv.QUOTE_ALL)
test_csv.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_quoting(self):
        """Check validation of ``quoting`` and ``quotechar`` on a Dialect subclass.

        The class attributes are mutated step by step, so the order of the
        statements below matters.
        """
        class SampleDialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE

        instance = SampleDialect()
        self.assertEqual(instance.quoting, csv.QUOTE_NONE)

        # quoting = None is expected to be rejected on instantiation.
        SampleDialect.quoting = None
        with self.assertRaises(csv.Error):
            SampleDialect()

        SampleDialect.doublequote = True
        SampleDialect.quoting = csv.QUOTE_ALL
        SampleDialect.quotechar = '"'
        instance = SampleDialect()
        self.assertEqual(instance.quoting, csv.QUOTE_ALL)
        self.assertEqual(instance.quotechar, '"')
        self.assertTrue(instance.doublequote)

        # A two-character string and a non-string quotechar each produce
        # their own error message.
        for bad_quotechar, message in (
                ("''", '"quotechar" must be an 1-character string'),
                (4, '"quotechar" must be string, not int')):
            SampleDialect.quotechar = bad_quotechar
            with self.assertRaises(csv.Error) as caught:
                SampleDialect()
            self.assertEqual(str(caught.exception), message)
create_samplecsv.py 文件源码 项目:ppytrading 作者: yusukemurayama 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __create_stocklist(self):
        """Write one sample stock-list CSV per market and return the symbols.

        ``self.num_stocks`` generated rows are dealt out round-robin over the
        markets in ``const.MARKET_DATA``; each market's rows are written to
        ``<market name>.csv`` inside ``self.dest_dir_stocklist``.

        Returns:
            A list of the generated symbols, grouped by market.
        """
        def get_row(idx):
            """Build the (symbol, name, sector) row for stock number *idx*."""
            symbol = 'TEST{}'.format(idx)
            name = 'Test {}'.format(idx)
            sector = 'Sector {}'.format(idx % 10)
            return (symbol, name, sector,)

        header = ('Symbol', 'Name', 'Sector',)
        num_markets = len(const.MARKET_DATA.keys())

        # Market number m receives stocks m, m + num_markets, m + 2 * num_markets, ...
        lis = [
            [get_row(stock_no)
             for stock_no in range(market_no, self.num_stocks, num_markets)]
            for market_no in range(num_markets)]

        for rows, market_name in zip(lis, const.MARKET_DATA.values()):
            csv_path = os.path.join(
                self.dest_dir_stocklist, market_name + '.csv')
            with open(csv_path, 'w',
                      encoding=const.DEFAULT_FILE_ENCODING, newline='') as fp:
                writer = csv.writer(fp, quoting=csv.QUOTE_ALL)
                writer.writerow(header)
                writer.writerows(rows)

        # Only the symbol (first column) of every row is returned.
        return [symbol for symbol, _name, _sector in chain.from_iterable(lis)]


问题


面经


文章

微信
公众号

扫码关注公众号