python类get()的实例源码

views.py 文件源码 项目:valentina 作者: valentinavc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def facebook(request):

    # abort if invalid request
    should_abort = _should_abort(request, 'POST')
    if should_abort:
        return should_abort

    form = FacebookSearchForm(request.POST)
    if not form.is_valid():
        return JsonResponse({'error': 'URL inválida.'})

    token = _get_token(request.user)
    person = GetFacebookData(form.cleaned_data.get('url'), token=token)

    # check if user already tagged this person
    chat = Chat.objects.filter(person=person.facebook_id).first()
    args = {'chat': chat, 'user': request.user}
    affiliation = Affiliation.objects.filter(**args).first()
    if affiliation:
        error = "Você já entrou na sala do(a) {} e a apelidou de “{}”."
        name = person.data.get('name')
        return JsonResponse({'error': error.format(name, affiliation.alias)})

    return JsonResponse(person.data)
views.py 文件源码 项目:valentina 作者: valentinavc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_affiliation(request):

    # abort if invalid request
    should_abort = _should_abort(request, 'POST')
    if should_abort:
        return should_abort

    form = AffiliationForm(request.POST)
    if not form.is_valid():
        return JsonResponse({'error': form.errors})

    # make sure chat exists
    chat_data = {'person': form.cleaned_data.get('person')}
    chat, created = Chat.objects.get_or_create(**chat_data)

    # create or update affiliation
    alias = {'alias': form.cleaned_data.get('alias')}
    fields = {'chat': chat, 'user': request.user, 'defaults': alias}
    affiliation, created = Affiliation.objects.update_or_create(**fields)

    return JsonResponse(_affiliation_to_dict(affiliation), status=201)
views.py 文件源码 项目:valentina 作者: valentinavc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _should_abort(request, allowed_methods, **kwargs):

    ajax_only = kwargs.get('ajax_only', True)
    should_redirect = False if ajax_only else True

    # abort if user is not valid
    should_abort = _should_abort_user(request, should_redirect)
    if should_abort:
        return should_abort

    # only accept AJAX requests
    if not request.is_ajax() and kwargs.get('ajax_only', True):
        return HttpResponse(status=422)

    # only accept certain methods
    if isinstance(allowed_methods, str):
        allowed_methods = [allowed_methods]
    if request.method not in allowed_methods:
        return HttpResponseNotAllowed(allowed_methods)

    return None
admin_user.py 文件源码 项目:hashtagtodo-open 作者: slackpad 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def search_user():
    query = request.args.get('q', '')
    results = User.search(query)
    simplified = list()
    for result in results:
        entry = {
            'link': '/admin/user/manage/%s' % result.doc_id
        }
        for field in result.fields:
            if field.name == 'created':
                entry[field.name] = arrow.get(field.value).humanize()
            else:
                entry[field.name] = field.value
        simplified.append(entry)
    extra = {
        'total': results.number_found,
        'shown': len(results.results),
    }
    return render_template('admin-user-search.html', results=simplified, extra=extra)
admin_user.py 文件源码 项目:hashtagtodo-open 作者: slackpad 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def freemium():
    if request.method == 'POST':
        operation = request.form.get('operation')
        if operation == "add":
            addresses = [email.strip() for email in request.form.get('addresses', '').split(',')]
            for email in addresses:
                Freemium.create_or_update(email)
        elif operation == 'remove':
            prefix = 'remove-'
            for key in request.form:
                if key.startswith(prefix):
                    email = key[len(prefix):]
                    entry = Freemium.get_by_email(email)
                    if entry is not None:
                        entry.key.delete()
        else:
            flash('Unknown operation')

    entries = Freemium.get_all()
    entries = [entry.key.id() for entry in entries]
    return render_template('admin-freemium.html', entries=entries)
app_user.py 文件源码 项目:hashtagtodo-open 作者: slackpad 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def delete():
    if request.method == 'POST':
        dos = arrow.get(g.user.created).humanize()
        calendars = len([c for c in Calendar.query(ancestor=g.user.key)])
        todos = len([e for e in Event.query(ancestor=g.user.key)])
        subject = '%s %s closed their account' % (g.user.first_name, g.user.last_name)
        body = '''
%s (joined %s, made %d todos and had %d calendars)

%s
''' % (g.user.email, dos, todos, calendars, request.form.get('feedback'))
        mail.send_mail(sender=EMAILS['alerts'], to=EMAILS['support'], subject=subject, body=body)

        calendars = Calendar.get_all(g.user.key)
        for calendar in calendars:
            for event in Event.get_all(calendar.key):
                event.key.delete()
            calendar.key.delete()

        User.unindex(g.user.key.urlsafe())
        g.user.key.delete()
        session.pop('user', None)
        return redirect('/index.html')

    return render_template('delete.html')
gcal_sync.py 文件源码 项目:hashtagtodo-open 作者: slackpad 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_todolist(user, client):
    # Try the primary calendar first, as this is the default for the
    # vast majority of users.
    default = client.calendars().get(calendarId='primary').execute()
    event = _get_todolist(user, client, default)
    if event is not None:
        return event, default

    # Now look at the user's other calendars to see if it's on there.
    calendars = client.calendarList().list().execute()
    for calendar in calendars.get('items', ()):
        if calendar['id'] == default['id']:
            continue

        if not calendar['accessRole'] in ('owner', 'writer'):
            continue

        event = _get_todolist(user, client, calendar)
        if event is not None:
            return event, calendar

    # They don't have one, so add it to their default calendar.
    return None, default
barkutils.py 文件源码 项目:bark 作者: kylerbrown 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def mk_entry():
    p = argparse.ArgumentParser(description="create a bark entry")
    p.add_argument("name", help="name of bark entry")
    p.add_argument("-a",
                   "--attributes",
                   action='append',
                   type=lambda kv: kv.split("="),
                   dest='keyvalues',
                   help="extra metadata in the form of KEY=VALUE")
    p.add_argument("-t",
                   "--timestamp",
                   help="format: YYYY-MM-DD or YYYY-MM-DD_HH-MM-SS.S")
    p.add_argument("-p",
                   "--parents",
                   help="no error if already exists, new meta-data written",
                   action="store_true")
    p.add_argument('--timezone',
                   help="timezone of timestamp, default: America/Chicago",
                   default='America/Chicago')
    args = p.parse_args()
    timestamp = arrow.get(args.timestamp).replace(
        tzinfo=tz.gettz(args.timezone)).datetime
    attrs = dict(args.keyvalues) if args.keyvalues else {}
    bark.create_entry(args.name, timestamp, args.parents, **attrs)
gpg.py 文件源码 项目:scriptworker 作者: mozilla-releng 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_latest_tag(path,
                         exec_function=asyncio.create_subprocess_exec):
    """Get the latest tag in path.

    Args:
        path (str): the path to run ``git describe --abbrev=0`` in.

    Returns:
        str: the tag name found.

    Raises:
        ScriptWorkerRetryException: on failure.

    """
    proc = await exec_function(
        'git', "describe", "--abbrev=0", cwd=path,
        stdout=PIPE, stderr=DEVNULL, stdin=DEVNULL, close_fds=True,
    )
    tag, err = await proc.communicate()
    exitcode = await proc.wait()
    if exitcode:
        raise ScriptWorkerRetryException(
            "Can't get tag at {}: {}!".format(path, err)
        )
    return tag.decode('utf-8').rstrip()
model.py 文件源码 项目:Orator-Google-App-Engine 作者: MakarenaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_related(self, related):
        """
        Get the related class.

        :param related: The related model or table
        :type related: Model or str

        :rtype: Model class
        """
        if not isinstance(related, basestring) and issubclass(related, Model):
            return related

        related_class = _Register.get(related)

        if related_class:
            return related_class

        raise RelatedClassNotFound(related)
model.py 文件源码 项目:Orator-Google-App-Engine 作者: MakarenaLabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_attribute(self, key, original=None):
        """
        Get an attribute from the model.

        :param key: The attribute to get
        :type key: str
        """
        in_attributes = key in self.__attributes

        if in_attributes:
            return self._get_attribute_value(key)

        if key in self.__relations:
            return self.__relations[key]

        relation = original or super(Model, self).__getattribute__(key)

        if relation:
            return self._get_relationship_from_method(key, relation)

        raise AttributeError(key)
model.py 文件源码 项目:Orator-Google-App-Engine 作者: MakarenaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_attribute_value(self, key):
        """
        Get a plain attribute.

        :param key: The attribute to get
        :type key: str
        """
        value = self._get_attribute_from_dict(key)

        if self._has_cast(key):
            value = self._cast_attribute(key, value)
        elif key in self.get_dates():
            if value is not None:
                return self.as_datetime(value)

        return value
model.py 文件源码 项目:Orator-Google-App-Engine 作者: MakarenaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_original(self, key=None, default=None):
        """
        Get the original values

        :param key: The original key to get
        :type key: str

        :param default: The default value if the key does not exist
        :type default: mixed

        :rtype: mixed
        """
        if key is None:
            return self.__original

        return self.__original.get(key, default)
api.py 文件源码 项目:homingbot 作者: homingbot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(request):
    '''
    Generate x number of emails
    '''
    count = int(get_post_param(request, 'count'))
    try:
        if count <= 0:
            raise InvalidUsage('count must be greater than 0')
    except KeyError:
        raise InvalidUsage('count must be present')
    live_emails = mail.Email.all()
    logger.debug("Generating %s emails.", count)
    email_gen = utils.generate_emails(count)
    emails = []
    addresses = []
    for n in range(count):
        email = next(email_gen)
        emails.append(email)
        addresses.append(email.address)
    db_service.batch_save(emails)
    return json({'accounts': addresses, "total_active": len(live_emails) + count}, status = 201)
test_topline_summary.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def generate_dates(submission_date_s3, ts_offset=0, creation_offset=0):
    # convert submission_date into profile_creation_date and timestamps
    submission = arrow.get(submission_date_s3, "YYYYMMDD")
    creation = submission.replace(days=+creation_offset)
    timestamp = submission.replace(days=+ts_offset)

    # variables for conversion
    epoch = arrow.get(0)
    seconds_per_day = topline.seconds_per_day
    nanoseconds_per_second = 10**9

    date_snippet = {
        "submission_date_s3": submission_date_s3,
        "profile_creation_date": (
            long((creation - epoch).total_seconds() / seconds_per_day)
        ),
        "timestamp": (
            long((timestamp - epoch).total_seconds() * nanoseconds_per_second)
        )
    }

    return date_snippet
bookmark_validation.py 文件源码 项目:python_mozetl 作者: mozilla 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main(start_date, end_date, bucket, prefix, input_bucket, input_prefix):
    spark = (SparkSession
             .builder
             .appName("sync_bookmark")
             .getOrCreate())

    version = 1
    input_path = "s3://{}/{}".format(input_bucket, input_prefix)

    # use the airflow date convention
    ds_format = "YYYYMMDD"
    start = arrow.get(start_date, ds_format)
    end = arrow.get(end_date if end_date else start_date, ds_format)

    for date in arrow.Arrow.range('day', start, end):
        current_date = date.format(ds_format)
        logger.info("Processing sync bookmark validation for {}"
                    .format(current_date))
        extract(spark, input_path, current_date)
        transform(spark)
        load(spark, bucket, prefix, version, current_date)

    spark.stop()
models.py 文件源码 项目:python-annict 作者: kk6 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse(cls, api, json):
        """Parse a JSON object into a model instance.

        :param api: instance of :class:`API <annict.api.API>` .
        :type api: annict.api.API
        :param dict json: JSON from Annict API.
        :return: :class:`User <User>` object
        :rtype: User

        """
        user = cls(api)
        user._json = json
        for k, v in json.items():
            if k == 'created_at':
                setattr(user, k, arrow.get(v).datetime)
            else:
                setattr(user, k, v)
        return user
models.py 文件源码 项目:python-annict 作者: kk6 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse(cls, api, json):
        """Parse a JSON object into a model instance.

        :param api: instance of :class:`API <annict.api.API>` .
        :type api: annict.api.API
        :param dict json: JSON from Annict API.
        :return: :class:`Work <Work>` object
        :rtype: Work

        """
        work = cls(api)
        work._json = json
        for k, v in json.items():
            if k == 'released_on':
                if v:
                    date = arrow.get(v).date()
                else:
                    date = None
                setattr(work, k, date)
            else:
                setattr(work, k, v)
        return work
models.py 文件源码 项目:python-annict 作者: kk6 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def parse(cls, api, json):
        """Parse a JSON object into a model instance.

        :param api: instance of :class:`API <annict.api.API>` .
        :type api: annict.api.API
        :param dict json: JSON from Annict API.
        :return: :class:`Program <Program>` object
        :rtype: Program

        """
        program = cls(api)
        program._json = json
        for k, v in json.items():
            if k == 'started_at':
                setattr(program, k, arrow.get(v).datetime)
            elif k == 'work':
                work = Work.parse(api, v)
                setattr(program, k, work)
            elif k == 'episode':
                episode = Episode.parse(api, v)
                setattr(program, k, episode)
            else:
                setattr(program, k, v)
        return program
webserver.py 文件源码 项目:LibreNews-Server 作者: milesmcc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get(self):
        try:
            req_resp = stats.request(str(get_ip(self.request)))
            say("Received API request (" + req_resp + ")")
        except:
            error("Errored while handling request IP -- still served...")
        self.set_header("Content-Type", "application/json")
        latest = -1
        try:
            latest = int(self.get_argument('latest'))
        except:
            pass # no latest flash specified
        data = {
            "server": "LibreNews Central",
            "channels": [k[2] for k in configuration.get_accounts()],
            "latest": [flash for flash in flashes.get_latest_flashes(25) if int(flash['id']) > int(latest)]
        }
        self.write(unicode(json.dumps(data, sort_keys=True, separators=(',',':'))))


问题


面经


文章

微信
公众号

扫码关注公众号