python类default()的实例源码

views.py 文件源码 项目:nanami 作者: theeluwin 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def response(status, data):
    hr = HttpResponse()
    hr['Access-Control-Allow-Origin'] = '*'
    hr['Content-Type'] = 'application/json'
    hr['charset'] = 'utf-8'
    hr.status_code = status
    if type(data) == str or type(data) == unicode:
        data = {
            'message': data,
        }
    try:
        hr.write(json.dumps(data))
    except:
        try:
            hr.write(json.dumps(data, default=json_util.default))
        except:
            hr.status_code = 500
            hr.write(json.dumps({
                'error': "json serialize failed",
            }))
    return hr
ctrlapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def execQuery (tree, query, aggcode=None):
    hosts = check_source (tree, query['name'])
    if send_source (hosts, tree, query['name']) == False:
        return []

    if aggcode:
        hosts = check_source (tree, aggcode['name'])
        if send_source (hosts, tree, aggcode['name']) == False:
            return []

    req = buildReq ('execQuery', tree, query, aggcode)
    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] != '200':
        return []
    else:
        return json.loads (content, object_hook=json_util.object_hook)
ctrlapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def registerQuery (filepath):
    filename = os.path.basename (filepath)

    try:
        with open (filepath, 'r') as f:
            filedata = f.read()
    except EnvironmentError:
        return [False]

    req = {'api': 'registerQuery'}
    req.update ({'name': filename})
    req.update ({'data': filedata})

    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] != '200':
        return []
    else:
        return json.loads (content, object_hook=json_util.object_hook)
processquery.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def execRequest (req, url):
    global results
    workers = []
    tree = req['tree']
    for child in tree['controller']['child']:
        t = Thread (target = wrapper, args = (httpcmd, (child, req, url),
                                              results))
        workers.append (t)

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    data = []
    for res in results:
        resp, content = res
        if resp['status'] == '200':
            data += json.loads (content, object_hook=json_util.object_hook)

    results = []
    return json.dumps (data, default=json_util.default)
read_data_from_mongo.py 文件源码 项目:entity_binding 作者: JasperGuo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    questions = db.test_questions.find({
        "is_bound": True
    })

    table_key_set = set()
    question_str_list = list()
    for question in questions:
        table_key_set.add(get_table_key(question["table_loc"]))
        question_str_list.append(json.dumps(question, default=json_util.default))

    save("test_questions.txt", question_str_list)

    query_table(table_key_set)

    print("Questions: %d" % questions.count())
    print("Tables: %d" % len(table_key_set))
manage_dorks.py 文件源码 项目:dorky 作者: recordedfuture 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def on_get(self, request):
        """ Various getters.
        """
        what = request.form['what']
        resp = {}
        if what == 'results':
            dbid = request.form['dbid']
            resp['results'] = list(self.storage.get_results(dbid))
        elif what == 'dorks':
            dorks = list(self.storage.get_dorks())
            resp['categories'] = list(set([d['category'] for d in dorks]))
            resp['dorks'] = dorks
        elif what == 'blacklist':
            resp['blacklist'] = {'url': [], 'text': []}
            for bl in self.storage.get_blacklist():
                resp['blacklist'][bl['type']].append(bl['term'])
        else:
            resp['error'] = 'Unknown'
        return Response(json.dumps(resp, default=json_util.default), mimetype='application/json')
cli.py 文件源码 项目:tracboat 作者: nazavode 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def TRAC_OPTIONS(func):  # pylint: disable=invalid-name
    @click.option(
        '--trac-uri',
        default='http://localhost/xmlrpc',
        show_default=True,
        help='uri of the Trac instance XMLRpc endpoint',
    )
    @click.option(
        '--ssl-verify / --no-ssl-verify',
        default=True,
        show_default=True,
        help='Enable/disable SSL certificate verification'
    )
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper
CVEScan.py 文件源码 项目:Plugins 作者: cve-search 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def handle_scan(self, scan, action, tags, notes, store=False):
    try:
      nmap     = self._parseNMap(scan)
      enhanced = self._enhance(nmap)
      if store: self._store_in_db(nmap, tags=tags, notes=notes)
      if   action == "json":
        returndata = json.dumps(enhanced, indent=2, default=json_util.default)
      elif action == "pdf":
        returndata = str(base64.b64encode(self._generatePDF(enhanced)), "utf-8")
      elif action == "webview":
        app = Flask(__name__, template_folder=os.path.join(callLocation, "templates"))
        with app.test_request_context("/"):
          returndata = render_template(self.html, scan=enhanced)
      return returndata
    except Exception as e:
      traceback.print_exc()
mongo_base.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def to_json(self, obj):
        return json.dumps(obj, default=json_util.default)
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def serialize(value):
        return dumps(value, default=json_util.default)
helperapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def httpcmd (node, req):
    return restapi.post (node, json.dumps (req, default=json_util.default), "pathdump")
agent.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getpathdumppost():
    if not request.json or not 'api' in request.json:
        abort (404)
    else:
        output = handleRequest (request.json)
        return json.dumps (output, default=json_util.default)
agent.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getpathdumpget():
    if not request.json or not 'api' in request.json:
        abort (404)
    else:
        output = handleRequest (request.json)
        return json.dumps (output, default=json_util.default)
ctrlapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def installQuery (tree, query, interval):
    hosts = check_source (tree, query['name'])
    if send_source (hosts, tree, query['name']) == False:
        return []

    req = buildReq ('installQuery', tree, query, None, interval)
    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] != '200':
        return []
    else:
        return json.loads (content, object_hook=json_util.object_hook)
ctrlapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def check_source (tree, filename):
    req = {'api': 'check_source'}
    req.update ({'tree': tree})
    req.update ({'name': filename})

    resp, content = r.post (controller, json.dumps (req, default=json_util.default), "pathdump")
    return json.loads (content, object_hook=json_util.object_hook)
ctrlapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def send_source (hosts, tree, filename):
    if source_available_at (hosts):
        return True

    # need to send a copy of source to hosts which don't have it
    send_tree = remove_hosts_from_tree (hosts, tree)

    req = {'api': 'send_source'}
    req.update ({'tree': send_tree})
    req.update ({'name': filename})

    resp, content = r.post (controller, json.dumps (req, default=json_util.default), "pathdump")
    return source_available_at (json.loads (content, object_hook=json_util.object_hook))
ctrlapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def getAggTree (groupnodes):
    req = {'api': 'getAggTree'}
    req.update ({'groupnodes': groupnodes})

    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] != '200':
        return {}
    else:
        return json.loads (content, object_hook=json_util.object_hook)[0]
ctrlapi.py 文件源码 项目:tool 作者: PathDump 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getFlowCollectionDir():
    req = {'api': 'getFlowCollDir'}

    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] != '200':
        return ''
    else:
        return json.loads (content, object_hook=json_util.object_hook)[0]
jinja_filters.py 文件源码 项目:sacredboard 作者: chovanecm 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def dump_json(obj):
    """Dump Python object as JSON string."""
    return simplejson.dumps(obj, ignore_nan=True, default=json_util.default)
api_old.py 文件源码 项目:neon-wallet-db 作者: CityOfZion 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def db2json(db_obj):
    return json.loads(json.dumps(db_obj, indent=4, default=json_util.default))

# return a dictionary of spent (txids, vout) => transaction when spent
# TODO: add vout to this
api.py 文件源码 项目:neon-wallet-db 作者: CityOfZion 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def db2json(db_obj):
    return json.loads(json.dumps(db_obj, indent=4, default=json_util.default))

# return a dictionary of spent (txids, vout) => transaction when spent
# TODO: add vout to this
export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _object_schema_to_line_tuples(cls, object_schema, columns_to_get, field_prefix):
        """ Get the list of tuples describing lines in object_schema

        - Sort fields by count
        - Add the tuples describing each field in object
        - Recursively add tuples for nested objects

        :param object_schema: dict
        :param columns_to_get: iterable
            columns to create for each field
        :param field_prefix: str, default ''
            allows to create full name.
            '.' is the separator for object subfields
            ':' is the separator for list of objects subfields
        :return line_tuples: list of tuples describing lines
        """
        line_tuples = []
        sorted_fields = sorted(list(object_schema.items()),
                               key=lambda x: (-x[1]['count'], x[0]) if 'count' in x[1] else x[0])

        for field, field_schema in sorted_fields:
            line_columns = cls._field_schema_to_columns(
                field, field_schema, field_prefix, columns_to_get)
            line_tuples.append(line_columns)

            types = field_schema.get('types_count', [field_schema['type']])

            if 'object' in field_schema:
                if 'ARRAY' in types:
                    current_prefix = field_prefix + field + ':'
                elif 'OBJECT' in types:
                    current_prefix = field_prefix + field + '.'
                else:
                    logger.warning('Field {} has key "object" but has types {} while should have '
                                   '"OBJECT" or "ARRAY"'.format(field, types))
                    continue
                line_tuples += cls._object_schema_to_line_tuples(
                    field_schema['object'], columns_to_get, field_prefix=current_prefix)

        return line_tuples
export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _field_schema_to_columns(cls, field_name, field_schema, field_prefix, columns_to_get):
        """ Given fields information, returns a tuple representing columns_to_get.

        :param field_name:
        :param field_schema:
        :param field_prefix: str, default ''
        :param columns_to_get: iterable
            columns to create for each field
        :return field_columns: tuple
        """
        field_columns = list()
        for column in columns_to_get:
            field_columns.append(cls.make_column_value(column, field_schema, field_name, field_prefix))

        return tuple(field_columns)
export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _format_types_count(types_count, array_types_count=None):
        """ Format types_count to a readable sting.

        >>> format_types_count({'integer': 10, 'boolean': 5, 'null': 3, })
        'integer : 10, boolean : 5, null : 3'

        >>> format_types_count({'ARRAY': 10, 'null': 3, }, {'float': 4})
        'ARRAY(float : 4) : 10, null : 3'

        :param types_count: dict
        :param array_types_count: dict, default None
        :return types_count_string : str
        """
        if types_count is None:
            return str(None)

        types_count = sorted(types_count.items(), key=lambda x: x[1], reverse=True)

        type_count_list = list()
        for type_name, count in types_count:
            if type_name == 'ARRAY':
                array_type_name = _SchemaPreProcessing._format_types_count(array_types_count)
                type_count_list.append('ARRAY(' + array_type_name + ') : ' + str(count))
            else:
                type_count_list.append(str(type_name) + ' : ' + str(count))

        types_count_string = ', '.join(type_count_list)
        return types_count_string
export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, data, category='schema', without_counts=False, **kwargs):
        """
        :param data: json like structure - schema, mapping, ...
        :param without_counts: bool - default False, remove all count fields in output if True
        :param kwargs: unused - exists for a unified interface with other subclasses of BaseOutput
        """
        data_processor = OutputPreProcessing(category)
        if without_counts:
            self.data = data_processor.filter_data(data)
        else:
            self.data = data
export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_default_columns(cls):
        """List default columns by category"""
        return {
            'schema': cls._default_columns.get('schema', _SchemaPreProcessing.default_columns),
            'mapping': cls._default_columns.get('mapping', _MappingPreProcessing.default_columns),
            'diff': cls._default_columns.get('diff', _DiffPreProcessing.default_columns)}
export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write_data(self, file_descr):
        """Use json module dump function to write into file_descr (opened with opener)."""
        json.dump(self.data, file_descr, indent=4, ensure_ascii=False,
                  default=json_util.default, sort_keys=True)
export.py 文件源码 项目:pymongo-schema 作者: pajachiet 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def transform_data_to_file(data, formats, output=None, category='schema', **kwargs):
    """
    Transform data into each of output_formats and write result to output_filename or stdout.

    :param data: dict (schema, mapping or diff)
    :param formats: list of str - extensions of output desired among:
                            'json', 'yaml' (hierarchical formats)
                            'tsv', 'html', 'md' or 'xlsx' (list like formats)
    :param output: str full path to file where formatted output will be saved saved
                            (default is std out)
    :param category: string in 'schema', 'mapping', 'diff' - describe input data
    :param kwargs: may contain additional specific arguments
           columns: list of columns to display in the output for list like formats
           without_counts: bool to display count fields in output for hierarchical formats
    """
    wrong_formats = set(formats) - {'tsv', 'xlsx', 'json', 'yaml', 'html', 'md'}

    if wrong_formats:
        raise ValueError("Output format should be tsv, xlsx, html, md, json or yaml. "
                         "{} is/are not supported".format(wrong_formats))

    for output_format in formats:
        output_maker = rec_find_right_subclass(output_format)(
            data, category=category,
            columns_to_get=kwargs.get('columns'), without_counts=kwargs.get('without_counts'))
        with output_maker.open(output) as file_descr:
            output_maker.write_data(file_descr)
preprocess_table.py 文件源码 项目:entity_binding 作者: JasperGuo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def process_table(table):
    columns = group_table_by_column(table["rows"])

    clean_columns = list()
    column_name = list()
    for c in columns:
        column_name.append(c.pop(0))
        clean_columns.append(list(set(c)))

    column_type = list()
    for c in clean_columns:
        data_type = list()
        for value in c:
            data_type.append(check_value_type(value))
        column_type.append(check_column_type(data_type))
    """
    print(table["_id"])
    print("column_name: ", column_name)
    print("column_type: ", column_type)
    print("columns: ", clean_columns)
    """

    table_info = {
        "columns": clean_columns,
        "column_name": column_name,
        "column_type": column_type,
        "table_name": process_value(table["title"]),
        "map_id": table["map_id"],
        "_id": table["_id"]
    }

    return json.dumps(table_info, default=json_util.default)
read_data_from_mongo.py 文件源码 项目:entity_binding 作者: JasperGuo 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def query_table(table_key_set):
    tables = db.tables.find(
        {
            "map_id": {
                "$in": list(table_key_set)
            }
        }
    )
    table_str_list = list()
    for table in tables:
        table_str_list.append(json.dumps(table, default=json_util.default))
    save("test_tables.txt", table_str_list)


问题


面经


文章

微信
公众号

扫码关注公众号