python类get()的实例源码

bark.py 文件源码 项目:bark 作者: kylerbrown 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def convert_timestamp(obj, default_tz='America/Chicago'):
    """Makes a Bark timestamp from an object.

    If the object is not timezone-aware, the timezone is set to be `default_tz.`

    Args:
        obj: time object; see :meth:`timestamp_to_datetime` for supported types
        default_tz (str): timezone to use if `obj` is timezone-naive; must be a
            string from the `tz database
            <https://en.wikipedia.org/wiki/List_of_tz_database_time_zones>`_.

    Returns:
        Arrow: Bark timestamp
    """
    dt = timestamp_to_datetime(obj)
    if dt.tzinfo:
        return arrow.get(dt).isoformat()
    else:
        return arrow.get(dt, default_tz).isoformat()
models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def share(self, profile):
        """Share this content as the profile given."""
        if self.content_type != ContentType.CONTENT:
            # TODO: support sharing replies too
            raise ValidationError("Can only share top level content.")
        if self.author == profile:
            raise ValidationError("Cannot share own content")
        if not self.visible_for_user(profile.user):
            raise ValidationError("Content to be shared is not visible to sharer.")
        if self.shares.filter(author=profile).exists():
            raise ValidationError("Profile has already shared this content.")
        # Use get or created as a safety to stop duplicates
        share, _created = Content.objects.get_or_create(author=profile, share_of=self, defaults={
            "visibility": self.visibility,
        })
        delete_memoized(Content.has_shared, self.id, profile.id)
        return share
trader.py 文件源码 项目:algo-trading-pipeline 作者: NeuralKnot 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def sell_positions(self):
        q = Query()
        test_func = lambda closed: not closed
        docs = self.position_db.search(q.closed.test(test_func))

        # Sell and remove position if >1hr old
        for doc in docs:
            if arrow.get(doc["at"]) < (arrow.now() - datetime.timedelta(hours=1)):
                self.logger.log("Trader/Seller", "informative", "Selling position for contract " + doc["contract_id"] + "!")

                if self.web_interface.have_position_in_market(doc["contract_id"]):
                    self.web_interface.sell(doc["contract_id"], doc["side"], doc["amount"])

                self.position_db.update({ "closed": True }, eids=[doc.eid])

    # Make a trade based on the result
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_event(self, container_factory, rabbit_config):

        from examples.retry import Service

        container = container_factory(Service, rabbit_config)
        container.start()

        timestamp = arrow.utcnow().replace(seconds=+1)

        dispatch = event_dispatcher(rabbit_config)
        with entrypoint_waiter(
            container, 'handle_event', callback=wait_for_result
        ) as result:
            payload = {'timestamp': timestamp.isoformat()}
            dispatch("src_service", "event_type", payload)

        res = result.get()
        assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_message(self, container_factory, rabbit_config):

        from examples.retry import Service

        container = container_factory(Service, rabbit_config)
        container.start()

        timestamp = arrow.utcnow().replace(seconds=+1)

        publish = publisher(rabbit_config)
        with entrypoint_waiter(
            container, 'handle_message', callback=wait_for_result
        ) as result:
            payload = {'timestamp': timestamp.isoformat()}
            publish(payload, routing_key="messages")

        res = result.get()
        assert arrow.get(re.match("Time is (.+)", res).group(1)) >= timestamp
core.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_revision_list(self, article_path):
        """Get a list of revision objects for a given article title
        """
        revisions = []

        for commit in self.get_commits(article_path):
            committed_date = arrow.get(commit.committed_date)

            revisions.append({
                'id': commit.hexsha,
                'message': commit.message,
                'author': commit.author.name,
                'email': commit.author.email,
                'committed': str(committed_date),
                'committed_humanized': committed_date.humanize()
            })

        return revisions
core.py 文件源码 项目:bock 作者: afreeorange 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_revision(self, article_path, sha):
        """Get a single revision from a blob object for a given article
        title and commit ID
        """
        commit = self.get_commit(article_path, sha)

        if not commit:
            return None

        commit_date = arrow.get(commit.committed_date)
        blob = self.get_blob(article_path, commit)
        raw_article_content = (
            blob.data_stream.read().decode('UTF-8').replace('\u00a0', '')
            if blob
            else self.raw_article(article_path)
        )

        return {
            'title': self.article_title(article_path),
            'html': self.markdown_to_html(raw_article_content),
            'raw': raw_article_content,
            'committed': str(commit_date),
            'committed_humanized': commit_date.humanize(),
        }
invites.py 文件源码 项目:pluralsight 作者: tonybaloney 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_invites(self, email=None, note=None, team_id=None):
        """
        Get invitations matching certain filters

        :param email: The users' email address
        :type  email: ``str``

        :param team_id: The team identifier
        :type  team_id: ``str``

        :param note: Additional notes on the user
        :type  note: ``str``

        :return: A list of :class:`Invite`
        :rtype: ``list`` of :class:`Invite`
        """
        params = {}
        if email is not None:
            params['email'] = email
        if note is not None:
            params['note'] = note
        if team_id is not None:
            params['teamId'] = team_id
        invites = self.client.get('invites', params=params)
        return [self._to_invite(i) for i in invites['data']]
commontasks.py 文件源码 项目:activity-browser 作者: LCA-ActivityBrowser 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_activity_data(datasets):
    # if not fields:
    #     fields = ["name", "reference product", "amount", "location", "unit", "database"]
    # obj = {}
    # for field in fields:
    #     obj.update({field: key.get(field, '')})
    # obj.update({"key": key})
    for ds in datasets:
        obj = {
            'Activity': ds.get('name', ''),
            'Reference product': ds.get('reference product', ''),  # only in v3
            'Location': ds.get('location', 'unknown'),
            # 'Amount': "{:.4g}".format(key['amount']),
            'Unit': ds.get('unit', 'unknown'),
            'Database': ds['database'],
            'Uncertain': "True" if ds.get("uncertainty type", 0) > 1 else "False",
            'key': ds,
        }
        yield obj
osfexplorer.py 文件源码 项目:QOpenScienceFramework 作者: dschreij 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __upload_refresh_item(self, reply, parent_item, *args, **kwargs):
        """ Called by __upload_finished, if it is possible to add the new item
        at the correct position in the tree, without refreshing the whole tree.
        """
        item = json.loads(safe_decode(reply.readAll().data()))
        # Remove old item first, before adding new one
        updateIndex = kwargs.get('updateIndex')
        if not updateIndex is None:
            parent_item.takeChild(updateIndex)
        # Add the item as a new item to the tree
        new_item, kind = self.tree.add_item(parent_item, item['data'])
        # Set new item as currently selected item
        self.tree.setCurrentItem(new_item)
        # Store item in kwargs so callback functions can use it
        kwargs['new_item'] = new_item
        # Perform the afterUploadCallback if it has been specified
        after_upload_cb = kwargs.pop('afterUploadCallback', None)
        if callable(after_upload_cb):
            after_upload_cb(*args, **kwargs)
aws.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_item(self, data):
        """Method to get an item

        Args:
            data (dict): A dictionary of attributes to put

        Returns:
            (dict)
        """
        try:
            response = self.table.get_item(Key=data,
                                           ConsistentRead=True)
        except ClientError as err:
            raise IOError("Error getting item: {}".format(err.message))

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise IOError("Error getting item: {}".format(response['ResponseMetadata']))

        if "Item" in response:
            return response["Item"]
        else:
            return None
aws.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def delete_item(self, data):
        """Method to get an item

        Args:
            data (dict): A dictionary of attributes to access an item (hash and sort keys)

        Returns:
            None
        """
        try:
            response = self.table.delete_item(Key=data)

        except ClientError as err:
            raise IOError("Error deleting item: {}".format(err.message))

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise IOError("Error deleting item: {}".format(response['ResponseMetadata']))
aws.py 文件源码 项目:donatemates 作者: donatemates 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def scan_table(self, eval_function, result_dict, attribute_str):
        """Method to scan a table, passing each item into an evaluation function

        Args:
            eval_function (function): Function to process the items
            attribute_str (str): Comma separated string of attributes to get

        Returns:
            (dict) Result of the scan
        """
        client = boto3.client('dynamodb', region_name="us-east-1")
        paginator = client.get_paginator('scan')
        params = {"ProjectionExpression": attribute_str,
                  "TableName": self.table_name,
                  "PaginationConfig": {'PageSize': 500}
                  }

        response_iterator = paginator.paginate(**params)
        for page in response_iterator:
            result_dict = eval_function(page["Items"], result_dict)

        return result_dict
websearch.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _youtube_info(*video_ids):
    results = googleapi(
        'youtube', 'v3', 'videos',
        part='contentDetails,snippet,statistics', id=','.join(video_ids))

    return [dict(
        title=r['snippet']['title'],
        duration=r['contentDetails']['duration'][2:].lower(),
        likes=r.get('statistics', {}).get('likeCount'),
        dislikes=r.get('statistics', {}).get('dislikeCount'),
        views=r.get('statistics', {}).get('viewCount'),
        channel=r['snippet']['channelTitle'],
        date=r['snippet']['publishedAt'][:10])
        for r in results]


###############################################################################
websearch.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def steam(inp, title, _cache={}):
    """Find steam games by their title."""
    if not _cache:
        data = requests.get(
            'http://api.steampowered.com/ISteamApps/GetAppList/v0001/').json()
        data = data['applist']['apps']['app']
        data = {i['name'].lower(): i['appid'] for i in data}
        _cache.update(data)
    if title in _cache:
        return get_steam_game(_cache[title])
    for k, v in _cache.items():
        if title in k:
            return get_steam_game(v)
    return lex.steam.not_found


###############################################################################
websearch.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dictionary(inp, *, query):
    """Look up dictionary definition of a word or a phrase."""
    url = 'http://ninjawords.com/' + query
    soup = bs4.BeautifulSoup(requests.get(url).text, 'lxml')
    word = soup.find(class_='word')

    if not word or not word.dl:
        return lex.dictionary.not_found

    output = ['\x02{}\x02 - '.format(word.dt.text)]
    for line in word.dl('dd'):
        if 'article' in line['class']:
            output.append('\x02{}\x02:'.format(line.text))
            idx = 1
        elif 'entry' in line['class']:
            text = line.find(class_='definition').text.strip().lstrip('°')
            output.append('{}. {}'.format(idx, text))
            idx += 1
        elif 'synonyms' in line['class']:
            strings = [i for i in line.stripped_strings if i != ','][1:]
            output.append('\x02Synonyms\x02: ' + ', '.join(strings) + '.')
    return ' '.join(output)
websearch.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def tvtropes(inp, *, query):
    """Show laconic description of the trope, and a link to the full page."""
    query = query.title().replace(' ', '')
    baseurl = 'http://tvtropes.org/{}/' + query
    url = baseurl.format('Laconic')
    soup = bs4.BeautifulSoup(requests.get(url).text, 'lxml')
    text = soup.find(class_='page-content').find('hr')
    if text is None:
        return lex.tvtropes.not_found
    text = reversed(list(text.previous_siblings))
    text = [i.text if hasattr(i, 'text') else i for i in text]
    text = [str(i).strip() for i in text]
    return '{} {}'.format(' '.join(text), baseurl.format('Main'))

###############################################################################
# Kaktuskast
###############################################################################
websearch.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _parse_kk(url):
    url = 'https://www.djkakt.us/' + url
    soup = bs4.BeautifulSoup(requests.get(url).text, 'lxml')

    episodes = []
    for epi in soup.find(class_='blog-list')('article'):
        date = epi.find(class_='entry-dateline-link').text
        date = ' '.join(date.split())
        date = arrow.get(date, 'MMMM D, YYYY').format('YYYY-MM-DD')
        title = epi.find(class_='entry-title').text.strip()

        index = _extract_episode_index(title)
        if not index:
            continue

        text = epi.find(class_='sqs-block-content').text
        url = epi.find(class_='entry-title').a['href']
        url = urllib.parse.urljoin('https://www.djkakt.us/', url)

        episodes.append(utils.AttrDict(
            date=date, title=title, index=index, text=text, url=url))

    episodes = list(sorted(episodes, key=lambda x: x.date, reverse=True))
    return episodes
parser.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def alert(pr):
    aset = pr.subparser('set')

    aset.add_argument(
        'date',
        type=arrow.get,
        nargs='?',
        help="""Date in YYYY-MM-DD format.""")

    aset.add_argument(
        'span',
        re='(\d+[dhm])+',
        help="""Time to wait before the alert, for example 2d3h4m.""")

    aset.exclusive('date', 'span', required=True)

    aset.add_argument(
        'message',
        nargs='+',
        action='join',
        help="""Alert text.""")

    ###########################################################################

    pr.subparser('echo')
notes.py 文件源码 项目:jarvis 作者: anqxyr 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_quote(inp, *, user, index):
    """Retrieve a quote."""
    if index is not None and index <= 0:
        return lex.input.bad_index

    if user:
        query = db.Quote.find(channel=inp.channel, user=user)
    else:
        query = db.Quote.find(channel=inp.channel)

    if not query.exists():
        return lex.quote.not_found

    index = index or random.randint(1, query.count())
    if index > query.count():
        return lex.quote.index_error
    quote = query.order_by(db.Quote.time).limit(1).offset(index - 1)[0]

    return lex.quote.get(
        index=index,
        total=query.count(),
        time=str(quote.time)[:10],
        user=quote.user,
        text=quote.text)
graylog_api.py 文件源码 项目:glog-cli 作者: globocom 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self, url, **kwargs):
        params = {}

        for label, item in six.iteritems(kwargs):
            if isinstance(item, list):
                params[label + "[]"] = item
            else:
                params[label] = item

        r = requests.get(self.base_url + url, params=params, headers=self.get_header, auth=(self.username, self.password), proxies=self.proxies)
        if r.status_code == requests.codes.ok:
            return r.json()
        elif r.status_code == 401:
            click.echo("API error: {} Message: User authorization denied.".format(r.status_code))
            exit()
        else:
            click.echo("API error: URL: {} Status: {} Message: {}".format(self.base_url + url, r.status_code, r.content))
            exit()
dateutils.py 文件源码 项目:glog-cli 作者: globocom 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def datetime_parser(s):
    try:
        ts = arrow.get(s)
        if ts.tzinfo == arrow.get().tzinfo:
            ts = ts.replace(tzinfo=LOCAL_TIMEZONE)
    except:
        c = pdt.Calendar()
        result, what = c.parse(s)

        ts = None
        if what in (1, 2, 3):
            ts = datetime.datetime(*result[:6])


            ts = arrow.get(ts)
            ts = ts.replace(tzinfo=LOCAL_TIMEZONE)
            return ts

    if ts is None:
         raise ValueError("Cannot parse timestamp '"+s+"'")

    return ts
clikraken_utils.py 文件源码 项目:clikraken 作者: zertrin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_config():
    """Load configuration parameters from the settings file"""

    if not os.path.exists(gv.USER_SETTINGS_PATH):
        logger.info("The user settings file {} was not found! "
                    "Using hardcoded default values.".format(gv.USER_SETTINGS_PATH))

    config = configparser.ConfigParser()
    config.read_string(gv.DEFAULT_SETTINGS_INI)
    config.read(gv.USER_SETTINGS_PATH)

    conf = config['clikraken']

    # Get the default currency pair from environment variable if available
    # otherwise take the value from the config file.
    gv.DEFAULT_PAIR = os.getenv('CLIKRAKEN_DEFAULT_PAIR', conf.get('currency_pair'))
    gv.TICKER_PAIRS = os.getenv('CLIKRAKEN_TICKER_PAIRS', conf.get('ticker_currency_pairs'))

    # Get the default asset pair from the config file
    gv.DEFAULT_ASSET = os.getenv('CLIKRAKEN_DEFAULT_ASSET', conf.get('asset'))

    gv.TZ = conf.get('timezone')
    gv.TRADING_AGREEMENT = conf.get('trading_agreement')
fetcher.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _cache_refresh(self, s, auth):
        resp = s.get(self.remote, stream=True, auth=auth, timeout=self.fetcher_timeout, verify=self.verify_ssl)

        if resp.status_code == 200:
            return resp

        if resp.status_code == 429 or resp.status_code in [500, 502, 503, 504]:
            n = RETRIES
            retry_delay = RETRIES_DELAY
            while n != 0:
                if resp.status_code == 429:
                    logger.info('Rate Limit Exceeded, retrying in %ss' % retry_delay)
                else:
                    logger.error('%s found, retrying in %ss' % (resp.status_code, retry_delay))

                sleep(retry_delay)
                resp = s.get(self.remote, stream=True, auth=auth, timeout=self.fetcher_timeout,
                             verify=self.verify_ssl)
                if resp.status_code == 200:
                    return resp

                n -= 1
zcifv2.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _post(self, uri, data):
        if type(data) == dict:
            data = [data]

        if type(data[0]) != dict:
            raise RuntimeError('submitted data must be a dictionary')

        data = json.dumps(data)

        if self.nowait:
            uri = "{0}?nowait=1".format(uri)

        logger.debug('uri: %s' % uri)

        body = self.session.post(uri, data=data, verify=self.verify_ssl)
        logger.debug('status code: ' + str(body.status_code))
        if body.status_code > 299:
            logger.error('request failed: %s' % str(body.status_code))
            logger.error(json.loads(body.text).get('message'))
            return None

        body = json.loads(body.text)
        return body
archiver.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, indicator=None, group='everyone', provider=None, firsttime=None, lasttime=None, tags=None):

        self.indicator = indicator
        self.group = group
        self.provider = provider
        self.firsttime = firsttime
        self.lasttime = lasttime
        self.tags = tags

        if isinstance(group, list):
            self.group = group[0]

        if isinstance(self.tags, list):
            self.tags.sort()
            self.tags = ','.join(self.tags)

        if self.lasttime and isinstance(self.lasttime, basestring):
            self.lasttime = arrow.get(self.lasttime).datetime

        if self.firsttime and isinstance(self.firsttime, basestring):
            self.firsttime = arrow.get(self.firsttime).datetime


# http://www.pythoncentral.io/sqlalchemy-orm-examples/
test_nltk.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_nltk_vnc():
    x = text_to_list(vnc)

    ips = set()
    ts = set()
    tags = set()

    for i in x:
        ips.add(i.indicator)
        ts.add(i.lasttime)
        tags.add(i.tags[0])

    assert '190.10.9.246' in ips
    assert '68.135.40.6' in ips
    assert '114.32.32.66' in ips
    assert '80.13.221.76' in ips

    assert 'vncprobe' in tags
    assert arrow.get('2015-12-06T21:09:17.000000Z').datetime in ts
test_nltk.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_nltk_ssh():
    x = text_to_list(ssh)

    ips = set()
    ts = set()
    tags = set()

    for i in x:
        ips.add(i.indicator)
        ts.add(i.lasttime)
        tags.add(i.tags[0])

    assert '96.242.156.153' in ips
    assert '83.132.9.42' in ips

    assert 'sshpwauth' in tags
    assert arrow.get('2016-03-22T06:19:56Z').datetime in ts
views.py 文件源码 项目:valentina 作者: valentinavc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def save_message(request, pk):

    form = MessageForm(request.POST)
    if not form.is_valid():
        return JsonResponse({'error': form.errors}, status=400)

    chat = Chat.objects.get(pk=pk)
    if not chat:
        return JsonResponse({'error': 'Invalid chat'}, status=401)

    affiliation = Affiliation.objects.filter(chat=chat, user=request.user)
    if not affiliation:
        return JsonResponse({'error': 'User does not belongs to chat'},
                            status=401)

    message = Message.objects.create(chat=chat, user=request.user,
                                     content=form.cleaned_data.get('content'))

    return JsonResponse(_message_to_dict(request, message), status=201)
views.py 文件源码 项目:valentina 作者: valentinavc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def chat_preferences(request):

    # abort if invalid request
    should_abort = _should_abort(request, ['POST'])
    if should_abort:
        return should_abort

    form = ChatPreferencesForm(request.POST)
    if not form.is_valid():
        return JsonResponse({'error': form.errors}, status=400)

    pk = Affiliation.get_id_from_hash(form.cleaned_data.get('key'))
    affiliation = get_object_or_404(Affiliation, pk=pk, user=request.user)
    affiliation.active = form.cleaned_data.get('active')
    affiliation.save()
    return JsonResponse(_affiliation_to_dict(affiliation))


问题


面经


文章

微信
公众号

扫码关注公众号