python类default()的实例源码

read_data_from_mongo.py 文件源码 项目:entity_binding 作者: JasperGuo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def fetch_all_table():
    tables = db.tables.find()
    table_str_list = list()
    for table in tables:
        table_str_list.append(json.dumps(table, default=json_util.default))
    save("all_tables.txt", table_str_list)
utils.py 文件源码 项目:dbs-back 作者: Beit-Hatfutsot 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def default(self, obj):
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()
        elif isinstance(obj, ObjectId):
            return unicode(obj)
        return json.JSONEncoder.default(self, obj)
utils.py 文件源码 项目:dbs-back 作者: Beit-Hatfutsot 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def jsonify(*args, **kwargs):
    """ jsonify with support for MongoDB ObjectId
        See https://gist.github.com/akhenakh/2954605
    """
    return Response(json.dumps(dict(*args, **kwargs),
                    default=json_util.default,
                    indent=2,
                    cls=MongoJsonEncoder),
                    mimetype='application/json')
utils.py 文件源码 项目:dbs-back 作者: Beit-Hatfutsot 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_conf(must_have_keys=DEAFULT_CONF_REQUIRED_KEYS,
             config_file=DEFAULT_CONF_FILE,
             env_config_file=DEFAULT_ENV_CONF_FILE,
             with_pardir_fallback=True):
    ''' Read a configuration file, ensure all the `must_have_keys` are present
        and return a config dict.
        The file is read from `config_file` and if it's not there and it's a
        default request, `conf/app_server.yaml` is used.
    '''
    if os.path.exists(config_file):
        # this will load either the default conf file if it exists or the given config_file parameter
        fh = open(config_file)
    elif os.environ.get("BH_ENV") and os.path.exists(env_config_file.format(BH_ENV=os.environ["BH_ENV"])):
        # environment is set in BH_ENV and corresponding conf file exists
        fh = open(env_config_file.format(BH_ENV=os.environ["BH_ENV"]))
    elif with_pardir_fallback and os.path.exists(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, 'conf', 'app_server.yaml')):
        # fallback to local file for development
        fh = open(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, 'conf', 'app_server.yaml'))
    else:
        raise Exception("Could not find a conf file")

    conf = yaml.load(fh)
    if not conf:
        raise ValueError('Empty config file')
    # Check that all the must_have_keys are present
    config_keys = set(conf.keys())
    missing_keys = list(must_have_keys.difference(config_keys))
    if missing_keys != []:
        keys_message = gen_missing_keys_error(missing_keys)
        error_message = 'Invalid configuration file: ' + keys_message
        raise ValueError(error_message)

    return Struct(**conf) # Enables dot access
utils.py 文件源码 项目:dbs-back 作者: Beit-Hatfutsot 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def humanify(obj, status_code=200):
    """ Gets an obj and possibly a status code and returns a flask Resonse
        with a jsonified obj, not suitable for humans
    >>> humanify({"a": 1})
    <Response 8 bytes [200 OK]>
    >>> humanify({"a": 1}, 404)
    <Response 8 bytes [404 NOT FOUND]>
    >>> humanify({"a": 1}).get_data()
    '{"a": 1}'
    >>> humanify([1,2,3]).get_data()
    '[1, 2, 3]'
    """
    # TODO: refactor the name to `response`
    # jsonify function doesn't work with lists
    if type(obj) == list:
        data = json.dumps(obj, default=json_util.default)
    elif type(obj) == pymongo.cursor.Cursor:
        rv = []
        for doc in obj:
            doc['_id'] = str(doc['_id'])
            rv.append(dumps(doc))
        data = '[' + ',\n'.join(rv) + ']' + '\n'
    else:
        data = dumps(obj,
                          default=json_util.default,
                          cls=MongoJsonEncoder)
    resp = Response(data, mimetype='application/json')
    resp.status_code = status_code
    return resp
server.py 文件源码 项目:taobaobao 作者: 1dot75cm 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def toJson(data):
    return json.dumps(
               data,
               default=json_util.default,
               ensure_ascii=False
           )
httpresponse.py 文件源码 项目:flask-template-project 作者: andreffs18 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def MyResponse(response, status, mimetype="application/json", **kwargs):
    return Response(json.dumps(response, default=json_util.default),
                    status=status, mimetype="application/json", **kwargs)
inspector.py 文件源码 项目:Rocket.Chat.Audit 作者: peak6 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def to_json(l):
    return json.dumps(list(l), indent=2, default=json_util.default)
dashboard.py 文件源码 项目:Twitter-Hashtag-Tracking 作者: xuwenyihust 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def keywords():
    connection = MongoClient(MONGODB_HOST, MONGODB_PORT)
    collection = connection[DBS_NAME][COLLECTION_NAME0]
    projects = collection.find(projection=FIELDS0)
    json_projects = []
    for project in projects:
        json_projects.append(project)
    json_projects = json.dumps(json_projects, default=json_util.default)
    connection.close()
    return json_projects
dashboard.py 文件源码 项目:Twitter-Hashtag-Tracking 作者: xuwenyihust 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def hashtags():
        connection = MongoClient(MONGODB_HOST, MONGODB_PORT)
        collection = connection[DBS_NAME][COLLECTION_NAME1]
        projects = collection.find(projection=FIELDS1)
        json_projects = []
        for project in projects:
                json_projects.append(project)
        json_projects = json.dumps(json_projects, default=json_util.default)
        connection.close()
        return json_projects
dashboard.py 文件源码 项目:Twitter-Hashtag-Tracking 作者: xuwenyihust 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def counts():
        connection = MongoClient(MONGODB_HOST, MONGODB_PORT)
        collection = connection[DBS_NAME][COLLECTION_NAME2]
        projects = collection.find(projection=FIELDS2)
        json_projects = []
        for project in projects:
                json_projects.append(project)
        json_projects = json.dumps(json_projects, default=json_util.default)
        connection.close()
        return json_projects
dashboard.py 文件源码 项目:Twitter-Hashtag-Tracking 作者: xuwenyihust 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ratio():
        connection = MongoClient(MONGODB_HOST, MONGODB_PORT)
        collection = connection[DBS_NAME][COLLECTION_NAME3]
        projects = collection.find(projection=FIELDS3)
        json_projects = []
        for project in projects:
                json_projects.append(project)
        json_projects = json.dumps(json_projects, default=json_util.default)
        connection.close()
        return json_projects
dashboard.py 文件源码 项目:Twitter-Hashtag-Tracking 作者: xuwenyihust 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tracking_word():
        connection = MongoClient(MONGODB_HOST, MONGODB_PORT)
        collection = connection[DBS_NAME][COLLECTION_NAME4]
        projects = collection.find(projection=FIELDS4)
        json_projects = []
        for project in projects:
                json_projects.append(project)
        json_projects = json.dumps(json_projects, default=json_util.default)
        connection.close()
        return json_projects
dashboard.py 文件源码 项目:Twitter-Hashtag-Tracking 作者: xuwenyihust 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def time():
        connection = MongoClient(MONGODB_HOST, MONGODB_PORT)
        collection = connection[DBS_NAME][COLLECTION_NAME6]
        projects = collection.find(projection=FIELDS6)
        json_projects = []
        for project in projects:
                json_projects.append(project)
        json_projects = json.dumps(json_projects, default=json_util.default)
        connection.close()
        return json_projects
runs.py 文件源码 项目:pypers 作者: frankosan 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self, type):

            parser = reqparse.RequestParser()
            parser.add_argument('user'  , type=str, default=None)
            parser.add_argument('name'  , type=str, default=None)
            parser.add_argument('status', type=str, default=[], action='append')

            args = parser.parse_args()


            query = {}
            if type == 'steps':
                query = {
                    'single_step' : True,
                }
            elif type == 'pipelines':
                query = {
                    'single_step' : {'$nin': [True]},
                    'name' :  args['name']
                }

            query['user'] = args['user']
            arg_status = args.get('status', [])
            if arg_status:
                query['status'] = {"$in": arg_status}

            # filter None values from query
            query = {k: v for k, v in query.items() if not v == None}

            return db.pipelines.find(query).count()
runs.py 文件源码 项目:pypers 作者: frankosan 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self):
            """
            Get the number of runs (useful for pagination)
            """
            parser = reqparse.RequestParser()
            parser.add_argument('user'  , type=str, default=None)

            args = parser.parse_args()

            #stats =  {'total': db.pipelines.find({}).count()}
            stats = { 'pipelines': [ ], 'totals': { 'stats':{}}}

            for pipeline in pipelines:
                stats['pipelines'].append(dbmodel.get_stats({'name':pipeline['name']}))

            # Get user stat
            if args['user']:
                stats['user'] = dbmodel.get_stats({'user': args['user']}, 'user')

            # compute totals
            tottot = 0
            for pipeline in stats['pipelines']:
                for stat, value in pipeline['stats'].iteritems():
                    if stat in stats['totals']['stats']:
                        stats['totals']['stats'][stat] += value
                        tottot += value
                    else:
                        stats['totals']['stats'][stat] = value
                        tottot += value
            stats['totals']['total'] = tottot

            return stats
crawl.py 文件源码 项目:open-house-crawler 作者: data-skeptic 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)
crawl.py 文件源码 项目:open-house-crawler 作者: data-skeptic 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_expiration(url, expiration_rules):
    exp = expiration_rules['default']
    sw = expiration_rules['starts_with']
    prefixes = sw.keys()
    for prefix in prefixes:
        if url.startswith(prefix):
            exp = sw[prefix]
    return exp
crawl.py 文件源码 项目:open-house-crawler 作者: data-skeptic 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process_one(url, s3, expiration_rules, headers):
    tld = tldextract.extract(url)
    if tld.subdomain != '' and tld.subdomain != 'www':
        tld = tld.subdomain + '.' + tld.domain + '.' + tld.suffix
    else:
        tld = tld.domain + '.' + tld.suffix
    i = url.find(tld)
    s3key = tld + url[i+len(tld):]
    exp = get_expiration(url, expiration_rules)
    try:
        o = s3.ObjectSummary(bucket, s3key)
        lm = o.last_modified
        now = datetime.datetime.utcnow()
        diff = exp - now
        expires_on = now - diff
        if lm.replace(tzinfo=None) < expires_on:
            exists = False
        else:
            exists = True
    except botocore.exceptions.ClientError as e:
        exists = False
    if not(exists):
        logger.info('Processing: ' + url)
        crawl = crawl_one(url, expiration_rules, headers)
        contents = json.dumps(crawl, default=json_util.default)
        fake_handle = StringIO(contents)
        b = s3.create_bucket(Bucket=bucket)
        res = b.put_object(Key=s3key, Body=fake_handle)
        # TODO: check for errors
        dt = datetime.datetime.today().strftime('%Y-%m-%d')
        trackStats(tld, dt, True)
        summaryKey = dt
        trackStats(summaryKey, dt, True)
        summaryKey = tld + "|" + dt
        trackStats(summaryKey, dt, True)
        return True
    return False
__init__.py 文件源码 项目:hel 作者: hel-repo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __call__(self, value, system):
        from bson import json_util
        request = system.get('request')
        if request is not None:
            if hasattr(request, 'response'):
                request.response.content_type = 'application/json'
        return json.dumps(value, default=json_util.default)


# https://gist.github.com/kamalgill/b1f682dbdc6d6df4d052


问题


面经


文章

微信
公众号

扫码关注公众号