def facebook(request):
# abort if invalid request
should_abort = _should_abort(request, 'POST')
if should_abort:
return should_abort
form = FacebookSearchForm(request.POST)
if not form.is_valid():
return JsonResponse({'error': 'URL inválida.'})
token = _get_token(request.user)
person = GetFacebookData(form.cleaned_data.get('url'), token=token)
# check if user already tagged this person
chat = Chat.objects.filter(person=person.facebook_id).first()
args = {'chat': chat, 'user': request.user}
affiliation = Affiliation.objects.filter(**args).first()
if affiliation:
error = "Você já entrou na sala do(a) {} e a apelidou de “{}”."
name = person.data.get('name')
return JsonResponse({'error': error.format(name, affiliation.alias)})
return JsonResponse(person.data)
python类get()的实例源码
def create_affiliation(request):
# abort if invalid request
should_abort = _should_abort(request, 'POST')
if should_abort:
return should_abort
form = AffiliationForm(request.POST)
if not form.is_valid():
return JsonResponse({'error': form.errors})
# make sure chat exists
chat_data = {'person': form.cleaned_data.get('person')}
chat, created = Chat.objects.get_or_create(**chat_data)
# create or update affiliation
alias = {'alias': form.cleaned_data.get('alias')}
fields = {'chat': chat, 'user': request.user, 'defaults': alias}
affiliation, created = Affiliation.objects.update_or_create(**fields)
return JsonResponse(_affiliation_to_dict(affiliation), status=201)
def _should_abort(request, allowed_methods, **kwargs):
ajax_only = kwargs.get('ajax_only', True)
should_redirect = False if ajax_only else True
# abort if user is not valid
should_abort = _should_abort_user(request, should_redirect)
if should_abort:
return should_abort
# only accept AJAX requests
if not request.is_ajax() and kwargs.get('ajax_only', True):
return HttpResponse(status=422)
# only accept certain methods
if isinstance(allowed_methods, str):
allowed_methods = [allowed_methods]
if request.method not in allowed_methods:
return HttpResponseNotAllowed(allowed_methods)
return None
def search_user():
query = request.args.get('q', '')
results = User.search(query)
simplified = list()
for result in results:
entry = {
'link': '/admin/user/manage/%s' % result.doc_id
}
for field in result.fields:
if field.name == 'created':
entry[field.name] = arrow.get(field.value).humanize()
else:
entry[field.name] = field.value
simplified.append(entry)
extra = {
'total': results.number_found,
'shown': len(results.results),
}
return render_template('admin-user-search.html', results=simplified, extra=extra)
def freemium():
if request.method == 'POST':
operation = request.form.get('operation')
if operation == "add":
addresses = [email.strip() for email in request.form.get('addresses', '').split(',')]
for email in addresses:
Freemium.create_or_update(email)
elif operation == 'remove':
prefix = 'remove-'
for key in request.form:
if key.startswith(prefix):
email = key[len(prefix):]
entry = Freemium.get_by_email(email)
if entry is not None:
entry.key.delete()
else:
flash('Unknown operation')
entries = Freemium.get_all()
entries = [entry.key.id() for entry in entries]
return render_template('admin-freemium.html', entries=entries)
def delete():
if request.method == 'POST':
dos = arrow.get(g.user.created).humanize()
calendars = len([c for c in Calendar.query(ancestor=g.user.key)])
todos = len([e for e in Event.query(ancestor=g.user.key)])
subject = '%s %s closed their account' % (g.user.first_name, g.user.last_name)
body = '''
%s (joined %s, made %d todos and had %d calendars)
%s
''' % (g.user.email, dos, todos, calendars, request.form.get('feedback'))
mail.send_mail(sender=EMAILS['alerts'], to=EMAILS['support'], subject=subject, body=body)
calendars = Calendar.get_all(g.user.key)
for calendar in calendars:
for event in Event.get_all(calendar.key):
event.key.delete()
calendar.key.delete()
User.unindex(g.user.key.urlsafe())
g.user.key.delete()
session.pop('user', None)
return redirect('/index.html')
return render_template('delete.html')
def get_todolist(user, client):
# Try the primary calendar first, as this is the default for the
# vast majority of users.
default = client.calendars().get(calendarId='primary').execute()
event = _get_todolist(user, client, default)
if event is not None:
return event, default
# Now look at the user's other calendars to see if it's on there.
calendars = client.calendarList().list().execute()
for calendar in calendars.get('items', ()):
if calendar['id'] == default['id']:
continue
if not calendar['accessRole'] in ('owner', 'writer'):
continue
event = _get_todolist(user, client, calendar)
if event is not None:
return event, calendar
# They don't have one, so add it to their default calendar.
return None, default
def mk_entry():
p = argparse.ArgumentParser(description="create a bark entry")
p.add_argument("name", help="name of bark entry")
p.add_argument("-a",
"--attributes",
action='append',
type=lambda kv: kv.split("="),
dest='keyvalues',
help="extra metadata in the form of KEY=VALUE")
p.add_argument("-t",
"--timestamp",
help="format: YYYY-MM-DD or YYYY-MM-DD_HH-MM-SS.S")
p.add_argument("-p",
"--parents",
help="no error if already exists, new meta-data written",
action="store_true")
p.add_argument('--timezone',
help="timezone of timestamp, default: America/Chicago",
default='America/Chicago')
args = p.parse_args()
timestamp = arrow.get(args.timestamp).replace(
tzinfo=tz.gettz(args.timezone)).datetime
attrs = dict(args.keyvalues) if args.keyvalues else {}
bark.create_entry(args.name, timestamp, args.parents, **attrs)
def get_latest_tag(path,
exec_function=asyncio.create_subprocess_exec):
"""Get the latest tag in path.
Args:
path (str): the path to run ``git describe --abbrev=0`` in.
Returns:
str: the tag name found.
Raises:
ScriptWorkerRetryException: on failure.
"""
proc = await exec_function(
'git', "describe", "--abbrev=0", cwd=path,
stdout=PIPE, stderr=DEVNULL, stdin=DEVNULL, close_fds=True,
)
tag, err = await proc.communicate()
exitcode = await proc.wait()
if exitcode:
raise ScriptWorkerRetryException(
"Can't get tag at {}: {}!".format(path, err)
)
return tag.decode('utf-8').rstrip()
def _get_related(self, related):
"""
Get the related class.
:param related: The related model or table
:type related: Model or str
:rtype: Model class
"""
if not isinstance(related, basestring) and issubclass(related, Model):
return related
related_class = _Register.get(related)
if related_class:
return related_class
raise RelatedClassNotFound(related)
def get_attribute(self, key, original=None):
"""
Get an attribute from the model.
:param key: The attribute to get
:type key: str
"""
in_attributes = key in self.__attributes
if in_attributes:
return self._get_attribute_value(key)
if key in self.__relations:
return self.__relations[key]
relation = original or super(Model, self).__getattribute__(key)
if relation:
return self._get_relationship_from_method(key, relation)
raise AttributeError(key)
def _get_attribute_value(self, key):
"""
Get a plain attribute.
:param key: The attribute to get
:type key: str
"""
value = self._get_attribute_from_dict(key)
if self._has_cast(key):
value = self._cast_attribute(key, value)
elif key in self.get_dates():
if value is not None:
return self.as_datetime(value)
return value
def get_original(self, key=None, default=None):
"""
Get the original values
:param key: The original key to get
:type key: str
:param default: The default value if the key does not exist
:type default: mixed
:rtype: mixed
"""
if key is None:
return self.__original
return self.__original.get(key, default)
def get(request):
'''
Generate x number of emails
'''
count = int(get_post_param(request, 'count'))
try:
if count <= 0:
raise InvalidUsage('count must be greater than 0')
except KeyError:
raise InvalidUsage('count must be present')
live_emails = mail.Email.all()
logger.debug("Generating %s emails.", count)
email_gen = utils.generate_emails(count)
emails = []
addresses = []
for n in range(count):
email = next(email_gen)
emails.append(email)
addresses.append(email.address)
db_service.batch_save(emails)
return json({'accounts': addresses, "total_active": len(live_emails) + count}, status = 201)
def generate_dates(submission_date_s3, ts_offset=0, creation_offset=0):
# convert submission_date into profile_creation_date and timestamps
submission = arrow.get(submission_date_s3, "YYYYMMDD")
creation = submission.replace(days=+creation_offset)
timestamp = submission.replace(days=+ts_offset)
# variables for conversion
epoch = arrow.get(0)
seconds_per_day = topline.seconds_per_day
nanoseconds_per_second = 10**9
date_snippet = {
"submission_date_s3": submission_date_s3,
"profile_creation_date": (
long((creation - epoch).total_seconds() / seconds_per_day)
),
"timestamp": (
long((timestamp - epoch).total_seconds() * nanoseconds_per_second)
)
}
return date_snippet
def main(start_date, end_date, bucket, prefix, input_bucket, input_prefix):
spark = (SparkSession
.builder
.appName("sync_bookmark")
.getOrCreate())
version = 1
input_path = "s3://{}/{}".format(input_bucket, input_prefix)
# use the airflow date convention
ds_format = "YYYYMMDD"
start = arrow.get(start_date, ds_format)
end = arrow.get(end_date if end_date else start_date, ds_format)
for date in arrow.Arrow.range('day', start, end):
current_date = date.format(ds_format)
logger.info("Processing sync bookmark validation for {}"
.format(current_date))
extract(spark, input_path, current_date)
transform(spark)
load(spark, bucket, prefix, version, current_date)
spark.stop()
def parse(cls, api, json):
"""Parse a JSON object into a model instance.
:param api: instance of :class:`API <annict.api.API>` .
:type api: annict.api.API
:param dict json: JSON from Annict API.
:return: :class:`User <User>` object
:rtype: User
"""
user = cls(api)
user._json = json
for k, v in json.items():
if k == 'created_at':
setattr(user, k, arrow.get(v).datetime)
else:
setattr(user, k, v)
return user
def parse(cls, api, json):
"""Parse a JSON object into a model instance.
:param api: instance of :class:`API <annict.api.API>` .
:type api: annict.api.API
:param dict json: JSON from Annict API.
:return: :class:`Work <Work>` object
:rtype: Work
"""
work = cls(api)
work._json = json
for k, v in json.items():
if k == 'released_on':
if v:
date = arrow.get(v).date()
else:
date = None
setattr(work, k, date)
else:
setattr(work, k, v)
return work
def parse(cls, api, json):
"""Parse a JSON object into a model instance.
:param api: instance of :class:`API <annict.api.API>` .
:type api: annict.api.API
:param dict json: JSON from Annict API.
:return: :class:`Program <Program>` object
:rtype: Program
"""
program = cls(api)
program._json = json
for k, v in json.items():
if k == 'started_at':
setattr(program, k, arrow.get(v).datetime)
elif k == 'work':
work = Work.parse(api, v)
setattr(program, k, work)
elif k == 'episode':
episode = Episode.parse(api, v)
setattr(program, k, episode)
else:
setattr(program, k, v)
return program
def get(self):
try:
req_resp = stats.request(str(get_ip(self.request)))
say("Received API request (" + req_resp + ")")
except:
error("Errored while handling request IP -- still served...")
self.set_header("Content-Type", "application/json")
latest = -1
try:
latest = int(self.get_argument('latest'))
except:
pass # no latest flash specified
data = {
"server": "LibreNews Central",
"channels": [k[2] for k in configuration.get_accounts()],
"latest": [flash for flash in flashes.get_latest_flashes(25) if int(flash['id']) > int(latest)]
}
self.write(unicode(json.dumps(data, sort_keys=True, separators=(',',':'))))