python类values()的实例源码

report.py 文件源码 项目:python-ares 作者: pynog 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def deployment():
  """Create an environment and/or deploy uploaded files into it.

  Reads from the current request: the uploaded ``file`` entries, the parallel
  ``File Type`` and ``file_code`` form lists, and the ``env`` / ``isNew``
  values.

  Returns:
    A ``(body, 500)`` tuple when a ``static`` or ``data`` upload has no file
    code, whatever ``deployFiles`` returns when files were uploaded,
    otherwise a ``(body, 200)`` tuple with an "Environment created" message.
  """
  uploads = request.files.getlist('file')
  # getlist() gives an empty list when the field is absent, so guard the
  # first-element check instead of indexing unconditionally (IndexError).
  hasFiles = bool(uploads and uploads[0])
  DATA = {'files': list(zip(uploads, request.form.getlist('File Type'), request.form.getlist('file_code')))}
  for _upload, fileType, fileCod in DATA['files']:
    if fileType in ('static', 'data') and not fileCod:
      return json.dumps('Error - You should specify a file code for statics and outputs'), 500

  env = request.values['env']
  # The environment is only created when the client flags it as new.
  if request.values['isNew'] == 'true':
    createEnv(env)
  if hasFiles:
    return deployFiles(env, DATA)

  return json.dumps("Environment created"), 200
webcrawler.py 文件源码 项目:simple-web-crawler 作者: fikander 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def crawl():
    """Crawl the site given by the ``url`` request value and return the result.

    Request values:
        url: Start URL; must use http or https and include a domain.
        depth: Optional crawl depth limit; defaults to 1 when absent.

    Returns:
        The JSON response built from ``crawler.crawled``, or a
        ``(message, 400)`` tuple when a parameter is missing or invalid.
    """
    # Distinguish "not supplied" (use the default) from "supplied but not a
    # number" (client error) without relying on a catch-all except clause.
    raw_depth = request.values.get('depth')
    if raw_depth is None:
        depth_limit = 1
    else:
        try:
            depth_limit = int(raw_depth)
        except ValueError:
            return "Depth parameter must be a number", 400

    if 'url' not in request.values:
        return "Missing url parameter", 400

    url = request.values['url']
    parsed_url = urlparse.urlsplit(url)
    if parsed_url.scheme not in ['http', 'https']:
        return "Only http and https protocols are supported", 400
    if parsed_url.netloc == '':
        return "Missing domain", 400

    # The crawl is restricted to the domain of the start URL.
    allowed_domains = [parsed_url.netloc]
    crawler = Crawler(allowed_domains, depth_limit)
    crawler.crawl(url)
    return jsonify(**crawler.crawled)
slack_helper.py 文件源码 项目:chatops_slack 作者: DennyZhang 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def verify_slack_username(http_method, api_method, method_acl_userlist):
    """Check that the calling user may invoke ``api_method``.

    The user name is read from the query string for GET and from the request
    values for POST. A user listed under the ``"*"`` key is allowed to call
    anything; otherwise, if ``api_method`` has an entry in
    ``method_acl_userlist``, the user must be listed in it. Methods without
    an entry are not restricted.

    Returns:
        None when the call is allowed.

    Raises:
        Exception: for an unsupported ``http_method`` or a user who is not
            allowed to call ``api_method``.
    """
    if http_method == 'GET':
        caller = request.args.get('user_name', '')
    elif http_method == 'POST':
        caller = request.values['user_name']
    else:
        raise Exception("Error: unsupported http_method(%s)" % (http_method))

    # Wildcard entry: these users bypass the per-method list.
    is_super_user = "*" in method_acl_userlist and caller in method_acl_userlist["*"]
    if is_super_user:
        return None

    # No entry for this method means no restriction.
    if api_method not in method_acl_userlist:
        return None

    if caller in method_acl_userlist[api_method]:
        return None

    raise Exception("Error: user(%s) not allowed to call api_method(%s)" %
                    (caller, api_method))
__init__.py 文件源码 项目:RSVPBot 作者: recursecenter 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def join(id):
    """Add the user identified by the request to the event with this id.

    The user is looked up from the ``user_param`` and ``user_param_value``
    request values.

    Returns:
        ``not_found()`` when no event matches, ``user_not_specified()`` when
        no user matches, otherwise a JSON body reporting ``joined: True``.
    """
    event = find_event(id)
    # Identity comparison: a missing record is signalled by None itself.
    if event is None:
        return not_found()

    user = find_user(request.values.get('user_param'), request.values.get('user_param_value'))
    if user is None:
        return user_not_specified()

    join_event(user, event)

    return jsonify({
        'joined': True,
        'rsvps_disabled': False,
        'event_archived': False,
        'over_capacity': False,
        'past_deadline': False,
    })
views.py 文件源码 项目:CactusAPI 作者: CactusDev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def chan_friends(channel):
    """
    Serve /api/v1/channel/<channel>/friend for the "Friend" model.

    <channel> is either all digits, in which case it is matched as the
    channel id, or a string that is matched (lower-cased) as the owner's
    username.
    """
    # A purely numeric path segment selects by id, anything else by owner.
    lookup = (
        {"channelId": int(channel)}
        if channel.isdigit()
        else {"owner": channel.lower()}
    )

    # NOTE(review): `results` is not defined inside this function; presumably
    # it comes from module scope - confirm before relying on this endpoint.
    body, status = generate_response(
        "Friend",
        request.path,
        request.method,
        request.values,
        data=results,
        fields=lookup
    )

    return make_response(jsonify(body), status)

    # There was an error!
    # if not str(code).startswith("2"):
    #    return make_response(jsonify(packet), code)
    # NOTE: Not needed currently, but this is how you would check


# TODO: Fix this endpoint to remove timing elements (friends are forever)
# TODO: Use Object.update(**changes) instead of Object(**updated_object).save()
views.py 文件源码 项目:CactusAPI 作者: CactusDev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def chan_messages(channel):
    """
    Serve /api/v1/channel/<channel>/messages for the "Message" model.

    An all-digit <channel> is matched as the channel id; any other value is
    lower-cased and matched as the owner.
    """
    lookup = (
        {"channelId": int(channel)}
        if channel.isdigit()
        else {"owner": channel.lower()}
    )

    body, status = generate_response(
        "Message",
        request.path,
        request.method,
        request.values,
        fields=lookup
    )

    return make_response(jsonify(body), status)
views.py 文件源码 项目:CactusAPI 作者: CactusDev 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def user_quotes(channel):
    """
    Serve /api/v1/channel/<channel>/quote for the "quote" model.

    An all-digit <channel> is matched as the channel id; any other value is
    lower-cased and matched as the owner. Only entries whose "deleted" field
    is False are selected.
    """
    if channel.isdigit():
        lookup = {"channelId": int(channel)}
    else:
        lookup = {"owner": channel.lower()}
    # Both lookups share the same soft-delete filter.
    lookup["deleted"] = False

    body, status = generate_response(
        "quote",
        request.path,
        request.method,
        request.values,
        fields=lookup
    )

    return make_response(jsonify(body), status)
views.py 文件源码 项目:CactusAPI 作者: CactusDev 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def user_commands(channel):
    """
    Serve /api/v1/channel/<channel>/command for the "Command" model.

    An all-digit <channel> is matched as the channel id; any other value is
    lower-cased and matched as the channel name. Only entries whose
    "deleted" field is False are selected.
    """
    if channel.isdigit():
        lookup = {"channelId": int(channel)}
    else:
        lookup = {"channelName": channel.lower()}
    # Both lookups share the same soft-delete filter.
    lookup["deleted"] = False

    body, status = generate_response(
        "Command",
        request.path,
        request.method,
        request.values,
        fields=lookup
    )

    return make_response(jsonify(body), status)
state.py 文件源码 项目:airbnb_clone 作者: bennettbuchanan 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def handle_states():
    '''Returns all the states from the database as JSON objects with a GET
    request, or adds a new state to the database with a POST request. Refer to
    exception rules of peewee `get()` method for additional explanation of
    how the POST request is handled:
    http://docs.peewee-orm.com/en/latest/peewee/api.html#SelectQuery.get

    POST responses: 400 when the `name` parameter is missing, 409 when a
    state with that name already exists, 201 with the new state otherwise.
    '''
    if request.method == 'GET':
        states = ListStyle().list(State.select(), request)
        return jsonify(states), 200

    elif request.method == 'POST':
        params = request.values

        # Check that all the required parameters are made in request.
        if "name" not in params:
            return jsonify(msg="Missing parameter."), 400

        # Read the value from the same mapping that was validated above, so
        # a name accepted by the check can never be missing here.
        name = params['name']

        try:
            State.select().where(State.name == name).get()
        except State.DoesNotExist:
            state = State.create(name=name)
            return jsonify(state.to_dict()), 201
        else:
            return jsonify(code=10001, msg="State already exists"), 409
state.py 文件源码 项目:airbnb_clone 作者: bennettbuchanan 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def handle_state_id(state_id):
    '''Select the state with the id from the database and return it with a
    GET request method. Remove the state with this id from the database with
    a DELETE request method. A 404 JSON message is returned when no state has
    this id.

    NOTE(review): no PUT branch is present in this function, so an update
    request falls through without a response - confirm whether that is
    intended.

    Keyword arguments:
    state_id: The id of the state from the database.
    '''
    try:
        state = State.select().where(State.id == state_id).get()
    except State.DoesNotExist:
        return jsonify(msg="There is no state with this id."), 404

    if request.method == 'GET':
        return jsonify(state.to_dict()), 200

    elif request.method == 'DELETE':
        # Existence was already established by the lookup above; building the
        # delete query does not itself raise DoesNotExist, so no handler is
        # needed around it. The row is removed when the query is executed.
        State.delete().where(State.id == state_id).execute()
        return jsonify(msg="State deleted successfully."), 200
twillio_handler.py 文件源码 项目:DecidePolitics 作者: georgeaf99 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def handle_sms():
    """Process an incoming SMS request, registering the sender if unknown.

    Reads the ``From`` and ``Body`` request values, creates a customer record
    when none exists for the sender's phone number, hands the message to the
    messaging layer and returns an encoded ``success`` payload.
    """
    sender_number = request.values["From"]
    message_text = request.values["Body"]

    customer = Customer.get_customer_by_phone_number(sender_number)
    if not customer:
        # First contact from this number: persist a new customer record.
        customer = Customer.create_new({CFields.PHONE_NUMBER: sender_number})
        customer.create()

        customer_lc.on_new_customer()

    messaging.on_message_recieve(customer, message_text)

    return jsonpickle.encode({"success": True})
views.py 文件源码 项目:dockmaster 作者: lioncui 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def engine():
    """Render the engine page for the engine selected by ``uuid``.

    The uuid comes from the request values when supplied (and is remembered
    in the session), otherwise from the session. Users without a login flag
    are redirected to the login page; a session without a username gets a
    401; any failure while loading or querying the engine gets a 503 with the
    error text.
    """
    if not session.get('login_in', None):
        return redirect('/login')
    if not session.get('username', None):
        return "Error 401, Authentication failed", 401

    requested_uuid = request.values.get('uuid', None)
    if requested_uuid:
        uuid = requested_uuid
        session['uuid'] = uuid
    else:
        uuid = session['uuid']
    username = session['username']

    try:
        record = models.Engine.query.filter_by(uuid = uuid, user_name = username).first()
        endpoint = "tcp://" + record.host + ":" + record.port
        client = DOCKER(base_url = endpoint, timeout = 5, version = "auto")
        return render_template('engine.html', host_info = client.get_info(), usage = client.monitor())
    except Exception as err:
        return str(err), 503
functions.py 文件源码 项目:dockmaster 作者: lioncui 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def change_passwd():
    """Change the logged-in user's password.

    Reads ``oldpassword`` and ``newpassword`` from the request values. The
    new password is stored only when the old one matches the stored hash.

    Returns:
        A redirect to ``/login`` when the session has no login flag or no
        username; otherwise a JSON ``result`` of "change sucessfull" or
        "change failed".
    """
    # Both session checks lead to the same redirect, so test them together.
    if not (session.get('login_in', None) and session.get('username', None)):
        return redirect('/login')

    oldpassword = request.values['oldpassword']
    newpassword = request.values['newpassword']
    try:
        user = models.User.query.filter_by(username = session['username']).first()
        if not check_password_hash(user.password, oldpassword):
            return jsonify(result="change failed")
        user.password = generate_password_hash(newpassword)
        db.session.add(user)
        db.session.commit()
        return jsonify(result="change sucessfull")
    except Exception:
        # Any failure (including an unknown user) is reported as a failed
        # change; interpreter-exit signals are no longer swallowed.
        db.session.rollback()
        return jsonify(result="change failed")
    finally:
        db.session.close()
files.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def readGcodeFilesForOrigin(origin):
    """Return the file list for ``origin`` as a JSON response.

    Request values:
        recursive: when one of the accepted "true" values, passed on to the
            file listing as ``recursive=True``.
        force: when one of the accepted "true" values, the cached entry for
            this origin is dropped before listing.

    Returns a 404 response for an origin other than LOCAL or SDCARD. For
    LOCAL the response also carries the free and total disk space of the
    uploads folder.
    """
    known_origins = (FileDestinations.LOCAL, FileDestinations.SDCARD)
    if origin not in known_origins:
        return make_response("Unknown origin: %s" % origin, 404)

    want_recursive = request.values.get("recursive", "false") in valid_boolean_trues
    drop_cached = request.values.get("force", "false") in valid_boolean_trues

    if drop_cached:
        with _file_cache_mutex:
            try:
                del _file_cache[origin]
            except KeyError:
                # Nothing cached for this origin yet.
                pass

    listing = _getFileList(origin, recursive=want_recursive)

    if origin != FileDestinations.LOCAL:
        return jsonify(files=listing)

    disk = psutil.disk_usage(settings().getBaseFolder("uploads"))
    return jsonify(files=listing, free=disk.free, total=disk.total)
printer.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def _get_temperature_data(preprocessor):
    """Return the current temperatures, optionally with recent history.

    Args:
        preprocessor: callable applied to every history entry and to the
            final temperature data before it is returned.

    Request values:
        history: when one of the accepted "true" values, a ``history`` list
            is added to the result.
        limit: numeric cap on the number of history entries (default 300).

    Returns:
        A 409 response when the printer is not operational, otherwise the
        preprocessed temperature data.
    """
    if not printer.is_operational():
        return make_response("Printer is not operational", 409)

    tempData = printer.get_current_temperatures()

    if "history" in request.values and request.values["history"] in valid_boolean_trues:
        tempHistory = printer.get_temperature_history()

        limit = 300
        if "limit" in request.values and unicode(request.values["limit"]).isnumeric():
            limit = int(request.values["limit"])

        history = list(tempHistory)
        limit = min(limit, len(history))

        # history[-0:] is the whole list, so a limit of 0 has to be handled
        # explicitly to yield no entries rather than all of them.
        recent = history[-limit:] if limit > 0 else []

        tempData.update({
            "history": [preprocessor(entry) for entry in recent]
        })

    return preprocessor(tempData)
languages.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def uploadLanguagePack():
    """Unpack an uploaded language pack archive into the translations folder.

    The upload's stored path and original name are read from the request
    values whose keys are ``file.`` plus the configured path and name
    suffixes.

    Returns:
        A 400 response when the upload is missing, has an unsupported file
        extension, or is neither a tarball nor a zip file; otherwise the
        result of ``getInstalledLanguagePacks()``.
    """
    input_name = "file"
    input_upload_path = input_name + "." + settings().get(["server", "uploads", "pathSuffix"])
    input_upload_name = input_name + "." + settings().get(["server", "uploads", "nameSuffix"])
    if input_upload_path not in request.values or input_upload_name not in request.values:
        return make_response("No file included", 400)

    upload_name = request.values[input_upload_name]
    upload_path = request.values[input_upload_path]

    # str.endswith accepts a tuple, which avoids depending on filter()
    # returning a list (it does not on Python 3).
    archive_suffixes = (".zip", ".tar.gz", ".tgz", ".tar")
    if not upload_name.lower().endswith(archive_suffixes):
        return make_response("File doesn't have a valid extension for a language pack archive", 400)

    target_path = settings().getBaseFolder("translations")

    # The extension is only a first filter; the actual content decides how
    # the upload is unpacked.
    if tarfile.is_tarfile(upload_path):
        _unpack_uploaded_tarball(upload_path, target_path)
    elif zipfile.is_zipfile(upload_path):
        _unpack_uploaded_zipfile(upload_path, target_path)
    else:
        return make_response("Neither zip file nor tarball included", 400)

    return getInstalledLanguagePacks()
__init__.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_locale(self):
        """Pick the locale for the current request.

        Sources are tried in this order and the first one that applies wins:
        the ``l10n`` request value, the ``X-Locale`` header, the identified
        user's ``interface.language`` setting, the configured
        ``appearance.defaultLanguage``, and finally the best match of the
        request's accepted languages against LANGUAGES.
        """
        # 1. explicit request parameter
        if "l10n" in request.values:
            return Locale.negotiate([request.values["l10n"]], LANGUAGES)

        # 2. explicit request header
        if "X-Locale" in request.headers:
            return Locale.negotiate([request.headers["X-Locale"]], LANGUAGES)

        # 3. per-user setting, when an identity is attached to the request
        if hasattr(g, "identity") and g.identity and userManager.enabled:
            try:
                preferred = userManager.getUserSetting(g.identity.id, ("interface", "language"))
                if preferred is not None and preferred != "_default":
                    return Locale.negotiate([preferred], LANGUAGES)
            except octoprint.users.UnknownUser:
                # Unknown user: fall through to the remaining sources.
                pass

        # 4. configured default, only if it is a known language
        configured = self._settings.get(["appearance", "defaultLanguage"])
        if configured is not None and configured != "_default" and configured in LANGUAGES:
            return Locale.negotiate([configured], LANGUAGES)

        # 5. browser preference
        return Locale.parse(request.accept_languages.best_match(LANGUAGES))
s3.py 文件源码 项目:threatstack-to-s3 作者: threatstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_alerts_by_form_parameters():
    '''
    Return the alerts between the ``start`` and ``end`` request parameters.

    Each parameter is taken from the query string first and from the form
    data otherwise. The response is a JSON object with ``success`` and
    ``alerts`` keys and status 200.
    '''
    start, end = (
        request.args.get(key) or request.form.get(key)
        for key in ('start', 'end')
    )

    _logger.info('{}: {} - {}'.format(request.method,
                                      request.path,
                                      request.values))

    # Convert to datetime objects
    alerts = s3_model.get_alerts_by_date(_parse_date(start), _parse_date(end))

    return jsonify({'success': True, 'alerts': alerts}), 200
user.py 文件源码 项目:flask_api 作者: nullcc 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def create():
    """
    Create a user from the ``username``, ``password``, ``email`` and
    ``gender`` request values and save it.
    :return: the result of ``success()``
    """
    values = request.values
    username, password, email = (
        values.get(key) for key in ("username", "password", "email")
    )
    # NOTE(review): a missing ``gender`` makes int() raise - confirm callers
    # always send it.
    gender = int(values.get("gender"))

    hashed = generate_password_hash(password, method='pbkdf2:sha1', salt_length=8)
    new_user = User(
        username=username,
        password_hash=hashed,
        email=email,
        gender=gender,
    )
    new_user.save()
    return success()
main.py 文件源码 项目:Awesome-Python 作者: JoMingyu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def session_test():
    """Demonstrate storing, reading and deleting a value in the session.

    DELETE removes ``username`` from the session. Any other method greets
    the stored user, or stores the ``username`` request value when the
    session has none yet.
    """
    if request.method == 'DELETE':
        # Passing a default keeps a DELETE on an empty session from raising
        # KeyError.
        session.pop('username', None)
        return 'Session deleted!'

    if 'username' in session:
        # A value is already stored: read it back.
        return 'Hello {0}'.format(session['username'])

    # Nothing stored yet: remember the submitted user name.
    session['username'] = request.values['username']
    return 'Session appended!'
app.py 文件源码 项目:hayabusa 作者: hirolovesbeer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, request, columns, collection):
        """Store the request parameters and run the queries straight away.

        Args:
            request: object whose ``values`` hold the datatable's filtering,
                sorting and paging parameters.
            columns: stored as ``self.columns``.
            collection: stored as ``self.collection``.
        """
        self.columns, self.collection = columns, collection

        # filtering / sorting / paging parameters sent by the datatable
        self.request_values = request.values

        # filled in from the database
        self.result_data = None

        # row count after filtering
        self.cardinality_filtered = 0

        # row count without filtering
        # NOTE(review): spelled "cadinality" here; kept as-is because other
        # code may read this attribute name - confirm before renaming.
        self.cadinality = 0

        # All state must be initialised before the queries run.
        self.run_queries()
views.py 文件源码 项目:pwnedhub 作者: lanmaster53 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def profile_change():
    """Update the current user's profile from the submitted request values.

    Nothing happens unless ``password``, ``question`` and ``answer`` are all
    present. When the password passes ``is_valid_password`` the name,
    password, question and answer are saved and a success message is
    flashed; otherwise a complexity message is flashed. The response is
    always a redirect to the profile page.
    """
    user = g.user
    required = ('password', 'question', 'answer')
    if all(field in request.values for field in required):
        password = request.values['password']
        if not is_valid_password(password):
            flash('Password does not meet complexity requirements.')
        else:
            # Read every submitted value before touching the user object.
            name, question, answer = (
                request.values[field] for field in ('name', 'question', 'answer')
            )
            user.name = name
            user.password = password
            user.question = question
            user.answer = answer
            db.session.add(user)
            db.session.commit()
            flash('Account information successfully changed.')
    return redirect(url_for('ph_bp.profile'))
guilds.py 文件源码 项目:rowboat 作者: b1naryth1ef 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def guild_config_history(guild):
    """Return one page (25 entries) of the guild's config changes as JSON.

    Changes are ordered newest first. The page number comes from the
    ``page`` request value and defaults to 1.
    """
    def to_payload(change):
        # One JSON-ready record per config change.
        return {
            'user': serialize_user(change.user_id),
            'before': unicode(change.before_raw),
            'after': unicode(change.after_raw),
            'created_at': change.created_at.isoformat(),
        }

    joined = GuildConfigChange.select(GuildConfigChange, User).join(
        User, on=(User.user_id == GuildConfigChange.user_id),
    )
    newest_first = joined.where(
        GuildConfigChange.guild_id == guild.guild_id
    ).order_by(GuildConfigChange.created_at.desc())

    page_number = int(request.values.get('page', 1))
    page = newest_first.paginate(page_number, 25)

    return jsonify([to_payload(change) for change in page])
views.py 文件源码 项目:distributed_health_app 作者: ReedAnders 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update():
    """Match a logged-in user's feature vector against the stored ones.

    For POST requests, reads ``user`` and a comma-separated ``vector`` from
    the request values, aborts with 404 when the user is not logged in, and
    returns a JSON body with the matched neighbor. Other methods produce no
    response value.
    """
    if request.method != 'POST':
        return None

    user = request.values['user']
    raw_vector = request.values['vector']
    if user not in logged_in:
        abort(404)
    vector = np.fromstring(raw_vector, dtype=float, sep=',')

    # Hard-wired switch between the local matcher and the Spark-backed one.
    use_feature_matrix = True
    if use_feature_matrix:
        nearest_neighbor = FeatureMatrix().match(user, vector)
    else:
        nearest_neighbor = querySpark.match(user, vector)

    return jsonify(result='Success', neighbor=nearest_neighbor)
views.py 文件源码 项目:distributed_health_app 作者: ReedAnders 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def login():
    """Log a registered user in.

    For POST requests, reads ``user`` and ``password`` from the request
    values.

    Returns:
        A JSON ``result='Success'`` body when the user is already logged in
        or the password matches the registered one. Aborts with 404 for an
        unregistered user and with 401 for a wrong password.
    """
    if request.method == 'POST':
        remote_ip = request.remote_addr
        print("Login from client IP",remote_ip)
        user = request.values['user']
        pwd = request.values['password']

        # NOTE(review): an already logged-in user name succeeds without a
        # password check - confirm this is intended.
        if user in logged_in:
            return jsonify(result='Success')

        if user not in registered_ids:
            abort(404)

        # A wrong password previously fell through and returned nothing,
        # which is not a valid response; reject it explicitly instead.
        if registered_ids[user] != pwd:
            abort(401)

        print("logging in",user)
        logged_in.append(user)
        return jsonify(result='Success')
__init__.py 文件源码 项目:cloud-monitoring 作者: hep-gc 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_history(targets, start='-1h', end='now'):
    """Retrieve the time series data for one or more metrics.

    Only the datapoints of the first series in the query response are used.

    Args:
        targets (str|List[str]): Graphite path, or list of paths.
        start (Optional[str]): Start of date range. Defaults to one hour ago.
        end (Optional[str]): End of date range. Defaults to now.

    Returns:
        A list of datapoints whose second element has been replaced by a
        ``YYYY-MM-DD HH:MM:SS`` string, or an empty list when the response
        cannot be parsed or has no series.
    """

    raw = query(targets, start, end)

    try:
        datapoints = json.loads(raw)[0]['datapoints']
    except (ValueError, TypeError, KeyError, IndexError):
        # Best effort: malformed JSON, a non-string response, an empty result
        # list or a missing key all mean "no data".
        return []

    # Convert unix timestamps to plot.ly's required date format
    # NOTE(review): fromtimestamp() uses the server's local timezone.
    for point in datapoints:
        point[1] = datetime.fromtimestamp(point[1]).strftime('%Y-%m-%d %H:%M:%S')

    return datapoints
__init__.py 文件源码 项目:cloud-monitoring 作者: hep-gc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def plotly(metrics=(), name='', color=None):
    """Convert a list of metric values to a Plot.ly object.

    Args:
        metrics: sequence of ``(value, timestamp)`` pairs. An empty sequence
            yields a trace with empty ``x`` and ``y``.
        name: trace name.
        color: optional marker color; when given it is stored under
            ``trace['marker']['color']``.

    Returns:
        A dict describing a scatter trace with timestamps on ``x`` and values
        on ``y`` (both tuples).
    """

    if metrics:
        values, timestamps = zip(*metrics)
    else:
        # zip(*[]) produces nothing to unpack, so handle "no data" directly.
        values, timestamps = (), ()

    trace = {
        'type': 'scatter',
        'x': timestamps,
        'y': values,
        'name': name,
    }

    if color:
        # The key must be the literal string 'color', not the color value.
        trace['marker'] = {'color': color}

    return trace
__init__.py 文件源码 项目:cloud-monitoring 作者: hep-gc 项目源码 文件源码 阅读 81 收藏 0 点赞 0 评论 0
def date_range():
    """Ensure the requested date range is in a format Graphite will accept.

    Reads the ``from`` and ``end`` request values (defaults ``-1h`` and
    ``now``). Each bound is handled on its own: a numeric value is divided
    by 1000 and truncated to an int, anything else is passed through as the
    original string.

    Returns:
        A ``(range_from, range_end)`` tuple.
    """

    def coerce(bound):
        # Numeric bounds are presumably millisecond timestamps - TODO confirm
        # with the client; relative strings go to Graphite unchanged.
        try:
            return int(float(bound) / 1000)
        except (TypeError, ValueError, OverflowError):
            return bound

    # Coercing each bound separately means a relative start (e.g. '-1h') no
    # longer prevents a numeric end from being converted.
    range_from = coerce(request.values.get('from', '-1h'))
    range_end = coerce(request.values.get('end', 'now'))

    return (range_from, range_end)
elasticsearchclient.py 文件源码 项目:rest_api 作者: opentargets 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_efo_info_from_code(self, efo_codes, **kwargs):
        """Look up EFO documents by id.

        Args:
            efo_codes: a single code or a list of codes; a single value is
                wrapped in a list.
            **kwargs: passed to ``SearchParams``; ``size``, ``facets`` and
                ``fields`` are read from the result.

        Returns:
            A ``PaginatedResult`` for the search, or None when ``efo_codes``
            is empty.
        """
        search_params = SearchParams(**kwargs)

        codes = efo_codes if isinstance(efo_codes, list) else [efo_codes]

        body = addict.Dict()
        body.query.ids["values"] = codes
        body.size = search_params.size
        if search_params.facets == 'true':
            # Optional aggregation over the therapeutic area labels.
            body.aggregations.significant_therapeutic_areas.significant_terms.field = "therapeutic_labels.keyword"
            body.aggregations.significant_therapeutic_areas.significant_terms.size = 25
            body.aggregations.significant_therapeutic_areas.significant_terms.min_doc_count = 2

        if search_params.fields:
            body._source = search_params.fields

        # No codes, no search.
        if not codes:
            return None

        found = self._cached_search(index=self._index_efo,
                                    doc_type=self._docname_efo,
                                    body=body.to_dict()
                                    )
        return PaginatedResult(found, search_params)
elasticsearchclient.py 文件源码 项目:rest_api 作者: opentargets 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_evidences_by_id(self, evidenceid, **kwargs):
        """Fetch evidence documents by id.

        Args:
            evidenceid: a single id string or a collection of ids; a single
                string is wrapped in a list.
            **kwargs: passed to ``SearchParams``. A DEFAULT data structure
                option is replaced by FULL.

        Returns:
            A ``SimpleResult`` whose data is the ``_source`` of every hit.
        """
        ids = [evidenceid] if isinstance(evidenceid, str) else evidenceid

        search_params = SearchParams(**kwargs)
        if search_params.datastructure == SourceDataStructureOptions.DEFAULT:
            search_params.datastructure = SourceDataStructureOptions.FULL

        # Ask for exactly as many documents as ids were requested.
        search_body = {
            "query": {
                "ids": {"values": ids},
            },
            "size": len(ids),
        }
        res = self._cached_search(index=self._index_data, body=search_body)

        sources = [hit['_source'] for hit in res['hits']['hits']]
        return SimpleResult(res, search_params, data=sources)


问题


面经


文章

微信
公众号

扫码关注公众号