def get(self, number):
    '''Return the entry whose interval contains ``number``, or None.

    Each element of ``self.intervals`` is indexed as ``((start, end), entry)``;
    ``start`` is inclusive and ``end`` is exclusive. The lookup is a binary
    search, so it presumably relies on ``self.intervals`` being sorted and
    non-overlapping -- confirm against the code that builds the list.
    '''
    lo = 0
    hi = len(self.intervals) - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        candidate = self.intervals[mid]
        bounds = candidate[0]
        if number < bounds[0]:
            # Target lies before this interval: keep the lower half.
            hi = mid - 1
            continue
        if number >= bounds[1]:
            # Target lies at or past the exclusive end: keep the upper half.
            lo = mid + 1
            continue
        return candidate[1]
    # Search window is empty: no interval contains the number.
    return None
# Example source code using Python's get()
def get_logentry_at_time(*args):
    '''
    Find the LogEntry that covers a point in time.

    With one argument, that argument is taken as the unix time (in
    milliseconds). With any other number of arguments they are forwarded to
    ``timestamp_from_video`` (documented usage: clip id, then time within
    that clip in milliseconds) to obtain the unix time.

    Returns the first entry from ``get_logentries()`` for which
    ``starttime <= unixtime < endtime``, or None when no entry matches.
    '''
    unixtime = args[0] if len(args) == 1 else timestamp_from_video(*args)
    matching = (
        entry for entry in get_logentries()
        if entry.starttime <= unixtime < entry.endtime
    )
    return next(matching, None)
def _environ_cols_linux(fp):  # pragma: no cover
    """Return the terminal width (in columns) for ``fp``, or None.

    The width is first requested from the terminal driver with the
    ``TIOCGWINSZ`` ioctl on ``fp``. If that fails for any reason, the
    ``COLUMNS`` environment variable is used instead and ``COLUMNS - 1`` is
    returned.

    Returns None when ``termios``/``fcntl``/``array`` cannot be imported, or
    when the ioctl fails and ``COLUMNS`` is unset or not an integer.
    """
    try:
        from termios import TIOCGWINSZ
        from fcntl import ioctl
        from array import array
    except ImportError:
        return None

    try:
        # The ioctl fills four shorts; index 1 is the value this function
        # reports as the column count.
        return array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[1]
    except Exception:
        # Best effort: any failure here (not a tty, bad descriptor, ...)
        # falls through to the environment-variable fallback.
        pass

    # ``os.environ`` is a mapping, not a module, so it must be imported as a
    # name from ``os``; ``from os.environ import get`` always raised
    # ImportError, which made this fallback unreachable.
    from os import environ
    try:
        return int(environ['COLUMNS']) - 1
    except (KeyError, ValueError):
        return None
def get_data_home(ramp_kits_home=None):
    """Return the path of the ramp-kits data directory, creating it if needed.

    Resolution order for the directory:

    1. the ``ramp_kits_home`` argument, when it is not None;
    2. the ``RAMP-KITS`` environment variable, when set;
    3. ``~/ramp-kits`` otherwise.

    A leading ``~`` is expanded to the user home folder. If the resulting
    path does not exist yet, it is created (including parent directories).
    """
    if ramp_kits_home is None:
        default_location = join('~', 'ramp-kits')
        ramp_kits_home = environ.get('RAMP-KITS', default_location)

    resolved = expanduser(ramp_kits_home)
    if exists(resolved):
        return resolved

    makedirs(resolved)
    return resolved
def __getattr__(self, attr):
    """Resolve and cache a terminal capability such as ``bold``.

    Typical usage, per the original documentation: ``term.bold`` yields the
    string that turns bold on and ``term.normal`` the one that turns it off;
    ``term.bold('hi')`` wraps its argument; compound names such as
    ``term.bold_underline_red_on_bright_green('yowzers!')`` combine several
    capabilities; parametrized capabilities take their parameters, e.g.
    ``some_term.cup(line, column)``. See ``man terminfo`` for the list of
    capabilities.

    When ``self.does_styling`` is false, a ``NullCallableString`` is
    returned instead of a resolved formatter.
    """
    if self.does_styling:
        resolution = self._resolve_formatter(attr)
    else:
        resolution = NullCallableString()
    # Store the result on the instance so later lookups of the same name
    # find a real attribute and never reach __getattr__ again.
    setattr(self, attr, resolution)
    return resolution
def _height_and_width(self):
    """Return a tuple of (terminal height, terminal width).

    Start by trying TIOCGWINSZ (Terminal I/O-Control: Get Window Size) on
    ``self._init_descriptor`` and then on ``sys.__stdout__``, falling back to
    the environment variables ``LINES`` and ``COLUMNS``, and returning
    ``(None, None)`` if those are unset or are not valid integers.
    """
    # tigetnum('lines') and tigetnum('cols') update only if we call
    # setupterm() again.
    for descriptor in self._init_descriptor, sys.__stdout__:
        try:
            return struct.unpack(
                'hhhh', ioctl(descriptor, TIOCGWINSZ, '\000' * 8))[0:2]
        except IOError:
            # When the output stream or init descriptor is not a tty, such
            # as when stdout is piped to another program (e.g. tee(1)),
            # these ioctls raise IOError; try the next source.
            pass
    try:
        return int(environ.get('LINES')), int(environ.get('COLUMNS'))
    except (TypeError, ValueError):
        # TypeError: a variable is unset (int(None)).
        # ValueError: a variable is set but is not an integer.
        return None, None
def getDynamoDBConnection(self, config=None, endpoint=None, port=None,
                          local=False, use_instance_metadata=False):
    """Create a boto3 DynamoDB resource.

    :param config: dict of keyword arguments for ``boto3.resource``. A falsy
        value is replaced by ``{'region_name': 'us-west-2'}``.
    :param endpoint: host used to build the endpoint URL when ``local`` is
        true.
    :param port: port used to build the endpoint URL when ``local`` is true.
    :param local: when true, connect to ``http://<endpoint>:<port>`` and pass
        only ``region_name`` and ``endpoint_url`` to boto3; otherwise every
        key of ``config`` is forwarded.
    :param use_instance_metadata: accepted for compatibility; not read by
        this method.
    :raises ParameterException: if ``config`` is truthy but not a dict.
    :return: the object returned by ``boto3.resource('dynamodb', ...)``.
    """
    if not config:
        config = {'region_name': 'us-west-2'}
    elif not isinstance(config, dict):
        # Validate before the first ``config.get`` below; previously a
        # non-dict config failed there with AttributeError and the
        # validation was never reached.
        raise ParameterException("Invalid config")

    params = {
        'region_name': config.get('region_name', 'cn-north-1')
    }
    if local:
        params['endpoint_url'] = 'http://{endpoint}:{port}'.format(
            endpoint=endpoint, port=port)
    else:
        params.update(config)
    return boto3.resource('dynamodb', **params)
def update_item_by_set():
    """Exercise ``set`` update expressions on a ``Test`` item.

    Creates an item, then applies a sequence of ``set`` updates (plain, with
    ``attr_label``, with ``if_not_exists``, with ``list_append``, and
    combined with a keyword field update), asserting the item's state after
    each one. Progress is printed as it goes.
    """
    Test.create(realname='gs01', score=100, order_score=99.99, date_created=now)
    item = Test.get(realname='gs01', score=100)

    item.update(Test.order_score.set(80))
    # Each print takes one pre-formatted string so the output is the same
    # whether ``print`` is the Python 2 statement or the Python 3 function.
    print('set')
    assert item.order_score == 80

    item.update(Test.order_score.set(78.7, attr_label='os'))
    print('set with attr_label')
    assert item.order_score == 78.7

    item.update(Test.order_score.set(78.7, if_not_exists='order_score'))
    print('set with if_not_exists')
    assert item.order_score == 78.7

    item.update(Test.order_score.set(10, if_not_exists='ids[0]'))
    assert item.order_score == 10

    print('ids %s %s' % (item.ids, type(item.ids)))
    item.update(ids=[12])
    print('ids %s %s' % (item.ids, type(item.ids)))

    item.update(Test.ids.set([100], list_append=('ids', -1)))
    print('set with list_append')
    assert item.ids[-1] == 100

    item.update(Test.order_score.set(78.7, attr_label='os'),
                doc={'a': 'bbb'})
    print('set with attr_label and upate_field')
    assert item.doc['a'] == 'bbb'
def rrid_resolver_xml(exact, found_rrids):
    """Fetch the SciCrunch resolver XML for ``exact``.

    Prints the identifier and the HTTP status code, and records the status
    code in ``found_rrids[exact]``.

    Returns a tuple ``(response body, status code, resolver URL)``.
    """
    print('\t' + exact)
    resolver_uri = 'https://scicrunch.org/resolver/%s.xml' % exact
    response = requests.get(resolver_uri)
    code = response.status_code
    body = response.content
    print(code)
    found_rrids[exact] = code
    return body, code, resolver_uri
def initialize_snapshot_tags(event, context):
    """
    Check that every snapshot listed in the event has finished.

    The snapshot ids are read from the ``"snapshot-id"`` key of each item in
    ``event["backup-volumes"]``; the EC2 region comes from
    ``event["region"]``. Raises ``NotReady`` as soon as a snapshot whose
    state is not ``"completed"`` is found; otherwise returns the event
    unchanged. ``context`` is not used.
    """
    region = event.get("region")
    ec2 = boto3.session.Session(region_name=region).resource("ec2")
    pending_ids = [volume["snapshot-id"] for volume in event.get("backup-volumes")]
    if any(ec2.Snapshot(snapshot_id).state != "completed"
           for snapshot_id in pending_ids):
        raise NotReady("Snapshot not ready")
    return event
def terminate_instances(event, context):
    """Terminate the instance named by ``event["instance-id"]``.

    The EC2 region is taken from ``event["region"]``. Returns the event
    unchanged; ``context`` is not used.
    """
    region = event.get("region")
    ec2 = boto3.session.Session(region_name=region).resource("ec2")
    instance = ec2.Instance(event["instance-id"])
    instance.terminate()
    return event
def are_snapshots_tagged(region, snapshot_ids):
    """Return True if any given snapshot carries the validation tag.

    A snapshot counts as tagged when one of its tags has a ``"Key"`` equal to
    ``VALIDATION_TAG_KEY``. Snapshots without tags are skipped. Returns
    False when no snapshot is tagged.
    """
    ec2 = boto3.session.Session(region_name=region).resource('ec2')
    for snapshot_id in snapshot_ids:
        # ``tags`` may be empty or None; treat both as "no tags".
        tags = ec2.Snapshot(snapshot_id).tags or ()
        if any(tag.get("Key") == VALIDATION_TAG_KEY for tag in tags):
            return True
    return False
def index_page():
    """Serve the EPUB when the ``epub`` query argument is set, else the page."""
    wants_epub = request.args.get('epub')
    if not wants_epub:
        return create_page()
    return create_epub()
def create_page():
    """Render ``index.html`` for one page of toots.

    Query arguments read from the request:

    * ``page`` -- zero-based page index (default ``0``).
    * ``limit`` -- toots per page (default ``10``).
    * ``blacklisted`` -- when ``'ignore'``, accounts and instances are not
      filtered; otherwise they are filtered on ``blacklisted`` being true
      when the argument is non-empty and false when it is absent or empty.

    The template receives the toots, the account and instance queries, and a
    ``pagination`` dict with ``page`` (one-based), ``limit``, ``page_count``
    and, when applicable, ``next`` / ``previous`` links that carry over every
    query argument except ``page``.
    """
    page = int(request.args.get("page", "0"))
    limit = int(request.args.get("limit", "10"))
    offset = page * limit
    toots, count, count_all = get_toots(offset, limit)

    accounts = Account.query.order_by(Account.username)
    instances = Instance.query.order_by(Instance.domain)
    blacklisted = request.args.get('blacklisted')
    blacklist_status = bool(blacklisted)
    if blacklisted != 'ignore':
        accounts = accounts.filter(Account.blacklisted == blacklist_status)
        instances = instances.filter(Instance.blacklisted == blacklist_status)

    pagination = {'page': page + 1,
                  'limit': limit}
    # NOTE(review): the base link ends with '&' and every carried-over
    # argument is prefixed with '&', so links contain '&&'. Left unchanged
    # in case the template depends on the exact link shape.
    pagination['next'] = "/?page=%s&" % (page + 1)
    pagination['previous'] = "/?page=%s&" % (page - 1)
    # ``items()`` exists on both Python 2 and 3; ``iteritems()`` is
    # Python 2 only.
    for key, value in request.args.items():
        if key != "page":
            pagination['next'] += "&%s=%s" % (key, value)
            pagination['previous'] += "&%s=%s" % (key, value)
    if count < limit:
        # A short page means there is nothing after it.
        pagination.pop('next')
    if page == 0:
        pagination.pop('previous')

    # Ceiling division: ``int(count_all / limit) + 1`` reported one page too
    # many whenever count_all was an exact multiple of limit. Always report
    # at least one page, as before, when there are no toots.
    pagination['page_count'] = max(1, (count_all + limit - 1) // limit)

    return render_template('index.html',
                           toots=toots,
                           accounts=accounts,
                           instances=instances,
                           pagination=pagination)
def is_ci():
    ''' Check if boss is running in a Continuous Integration (CI) environment.

    True only when ``BOSS_RUNNING`` is ``'true'`` and at least one of ``CI``
    or ``CONTINUOUS_INTEGRATION`` is ``'true'`` in ``env``.
    '''
    if not env.get('BOSS_RUNNING') == 'true':
        return False
    ci_flagged = (
        env.get('CI') == 'true' or
        env.get('CONTINUOUS_INTEGRATION') == 'true'
    )
    return bool(ci_flagged)
def is_travis():
    ''' Check if boss is running under Travis CI.

    Requires ``is_ci()`` to hold and ``TRAVIS`` to be ``'true'`` in ``env``.
    '''
    if not is_ci():
        return False
    return env.get('TRAVIS') == 'true'
def get_ci_link(config):
    ''' Get CI build link for the current build deployment.

    Under Travis the link is ``ci.TRAVIS_BUILD_URL`` formatted with
    ``config['ci']['base_url']`` (trailing slashes removed) and the
    ``TRAVIS_REPO_SLUG`` / ``TRAVIS_BUILD_ID`` values from ``env``.
    Returns None for any other environment.
    '''
    if not is_travis():
        # Other CI providers aren't supported at the moment.
        # TODO: Add support for more providers.
        return None

    travis_base = config['ci']['base_url'].rstrip('/')
    return ci.TRAVIS_BUILD_URL.format(
        base_url=travis_base,
        repo_slug=env.get('TRAVIS_REPO_SLUG'),
        build_id=env.get('TRAVIS_BUILD_ID')
    )
def test_intersect_proximity():
    """Check ``intersect_proximity`` output against the reference image.

    Runs the correlation into ``test_filename`` and compares the pixel data,
    description, class names and map info with ``correlated_filename``.
    """
    correlation = environment.EnvironmentalCorrelation()
    correlation.intersect_proximity(mining_filename, vector_filename, proximity, test_filename)

    reference_image = spectral.open_image(correlated_filename)
    produced_image = spectral.open_image(test_filename)

    assert numpy.array_equal(reference_image.asarray(), produced_image.asarray())
    assert produced_image.metadata.get('description') == 'COAL '+pycoal.version+' environmental correlation image.'
    for field in ('class names', 'map info'):
        assert reference_image.metadata.get(field) == produced_image.metadata.get(field)