def represent_float_as_str(value):
"""Represent a float as a string without losing precision."""
# In Python 2, calling str() on a float object loses precision:
#
# In [1]: 1.23456789012345678
# Out[1]: 1.2345678901234567
#
# In [2]: 1.2345678901234567
# Out[2]: 1.2345678901234567
#
# In [3]: str(1.2345678901234567)
# Out[3]: '1.23456789012'
#
# The best way to ensure precision is not lost is to convert to string via Decimal:
# https://github.com/mogui/pyorient/pull/226/files
if not isinstance(value, float):
raise GraphQLInvalidArgumentError(u'Attempting to represent a non-float as a float: '
u'{}'.format(value))
with decimal.localcontext() as ctx:
ctx.prec = 20 # floats are max 80-bits wide = 20 significant digits
return u'{:f}'.format(decimal.Decimal(value))
python类localcontext()的实例源码
def __init__(self, default_position, undercut_market_by=0.01, minimum_return=1.005, market_fee=0.005):
'''
default_position is whether the Strategy should hold the minor currency (sell, False) or
the major currency (buy / True) after scalping the market. For example, in a USD-CAD market
True would mean CAD is held after scalping and False would mean USD is held after scalping.
It is assumed that the default_position is currently being held. If not, override it by setting
current_position after initializing the strategy.
'''
Strategy.__init__(self)
self._spread_size_indicator = SpreadSize(minimum_return, market_fee)
self.default_position = default_position
self.current_position = default_position
self._first_time_unprofitable = True #Prevents repeating the same message.
# undercut_market_by is used to make the strategy's order be the next one filled on the market.
with localcontext() as context:
context.prec = 8
self.undercut_market_by = Decimal(undercut_market_by)
def _load_(self, value, context):
if isinstance(value, decimal.Decimal):
if not self.get_options().allow_nan and not value.is_finite():
raise ValueError()
return value
elif isinstance(value, text_types):
try:
with decimal.localcontext() as ctx:
ctx.traps[decimal.InvalidOperation] = 1
value = decimal.Decimal(value)
if not self.get_options().allow_nan and not value.is_finite():
raise ValueError()
return value
except decimal.InvalidOperation:
raise ValueError()
elif isinstance(value, integer_types):
return decimal.Decimal(value)
elif isinstance(value, float):
if not self.get_options().allow_nan:
if math.isnan(value) or math.isinf(value):
raise ValueError()
return decimal.Decimal(value)
else:
raise ValueError()
def float2dec(ft, decimal_digits):
"""
Convert float (or int) to Decimal (rounding up) with the
requested number of decimal digits.
Arguments:
ft (float, int): Number to convert
decimal (int): Number of digits after decimal point
Return:
Decimal: Number converted to decima
"""
with decimal.localcontext() as ctx:
ctx.rounding = decimal.ROUND_UP
places = decimal.Decimal(10)**(-decimal_digits)
return decimal.Decimal.from_float(float(ft)).quantize(places)
# Sorting algos for rectangle lists
def _do_decode(self, state):
"""This is the internal function that does the JSON decoding.
Called by the decode() method, after it has performed any Unicode decoding, etc.
"""
buf = state.buf
self.skipws(state)
if buf.at_end:
state.push_error('No value to decode')
else:
if state.options.decimal_context:
dec_ctx = decimal.localcontext( state.options.decimal_context )
else:
dec_ctx = _dummy_context_manager
with dec_ctx:
state.obj = self.decodeobj(state, at_document_start=True )
if not state.should_stop:
# Make sure there's nothing at the end
self.skipws(state)
if not buf.at_end:
state.push_error('Unexpected text after end of JSON value')
def _format_number(cls, number):
if isinstance(number, (Number, str)):
try:
value = Decimal(number)
except InvalidOperation:
raise ValueError('Invalid coordinate format: %r' % number)
else:
with localcontext() as ctx:
ctx.prec = 13
ctx.rounding = ROUND_HALF_UP
return value + 0
elif isinstance(number, Decimal):
return number
else:
raise TypeError('Invalid coordinates type: %r' % number)
def getPriceGross(self):
with localcontext() as ctx:
ctx.rounding = ROUND_HALF_EVEN
vatPercent = Decimal(settings.INVOICE_VAT) * Decimal("0.01")
grossPrice = self.price + (self.price * vatPercent)
return grossPrice.quantize(Decimal("0.01"))
def getVATAmount(self):
with localcontext() as ctx:
ctx.rounding = ROUND_HALF_EVEN
vatPercent = Decimal(settings.INVOICE_VAT) * Decimal("0.01")
vatAmount = self.price * vatPercent
return vatAmount.quantize(Decimal("0.01"))
def working(maxnum=100):
sums = []
for i in range(1, maxnum+1):
rootint = int(i ** 0.5)
if rootint ** 2 != i:
with decimal.localcontext() as c:
c.prec = 102
rootstr = str(decimal.Decimal(i) ** decimal.Decimal('0.5'))
rootstr = rootstr.replace('.', '')
tmpsum = 0
for j in range(100):
tmpsum += int(rootstr[j])
sums.append(tmpsum)
return sum(sums)
def set_pd_mag_from_counts(photodict,
c='',
ec='',
lec='',
uec='',
zp=DEFAULT_ZP,
sig=DEFAULT_UL_SIGMA):
"""Set photometry dictionary from a counts measurement."""
with localcontext() as ctx:
if lec == '' or uec == '':
lec = ec
uec = ec
prec = max(
get_sig_digits(str(c), strip_zeroes=False),
get_sig_digits(str(lec), strip_zeroes=False),
get_sig_digits(str(uec), strip_zeroes=False)) + 1
ctx.prec = prec
dlec = Decimal(str(lec))
duec = Decimal(str(uec))
if c != '':
dc = Decimal(str(c))
dzp = Decimal(str(zp))
dsig = Decimal(str(sig))
photodict[PHOTOMETRY.ZERO_POINT] = str(zp)
if c == '' or float(c) < DEFAULT_UL_SIGMA * float(uec):
photodict[PHOTOMETRY.UPPER_LIMIT] = True
photodict[PHOTOMETRY.UPPER_LIMIT_SIGMA] = str(sig)
photodict[PHOTOMETRY.MAGNITUDE] = str(dzp - (D25 * (dsig * duec
).log10()))
dnec = Decimal('10.0') ** (
(dzp - Decimal(photodict[PHOTOMETRY.MAGNITUDE])) / D25)
photodict[PHOTOMETRY.E_UPPER_MAGNITUDE] = str(D25 * (
(dnec + duec).log10() - dnec.log10()))
else:
photodict[PHOTOMETRY.MAGNITUDE] = str(dzp - D25 * dc.log10())
photodict[PHOTOMETRY.E_UPPER_MAGNITUDE] = str(D25 * (
(dc + duec).log10() - dc.log10()))
photodict[PHOTOMETRY.E_LOWER_MAGNITUDE] = str(D25 * (
dc.log10() - (dc - dlec).log10()))
def set_pd_mag_from_flux_density(photodict,
fd='',
efd='',
lefd='',
uefd='',
sig=DEFAULT_UL_SIGMA):
"""Set photometry dictionary from a flux density measurement.
`fd` is assumed to be in microjanskys.
"""
with localcontext() as ctx:
if lefd == '' or uefd == '':
lefd = efd
uefd = efd
prec = max(
get_sig_digits(str(fd), strip_zeroes=False),
get_sig_digits(str(lefd), strip_zeroes=False),
get_sig_digits(str(uefd), strip_zeroes=False)) + 1
ctx.prec = prec
dlefd = Decimal(str(lefd))
duefd = Decimal(str(uefd))
if fd != '':
dfd = Decimal(str(fd))
dsig = Decimal(str(sig))
if fd == '' or float(fd) < DEFAULT_UL_SIGMA * float(uefd):
photodict[PHOTOMETRY.UPPER_LIMIT] = True
photodict[PHOTOMETRY.UPPER_LIMIT_SIGMA] = str(sig)
photodict[PHOTOMETRY.MAGNITUDE] = str(Decimal('23.9') - D25 * (
dsig * duefd).log10())
if fd:
photodict[PHOTOMETRY.E_UPPER_MAGNITUDE] = str(D25 * (
(dfd + duefd).log10() - dfd.log10()))
else:
photodict[PHOTOMETRY.MAGNITUDE] = str(Decimal('23.9') - D25 *
dfd.log10())
photodict[PHOTOMETRY.E_UPPER_MAGNITUDE] = str(D25 * (
(dfd + duefd).log10() - dfd.log10()))
photodict[PHOTOMETRY.E_LOWER_MAGNITUDE] = str(D25 * (
dfd.log10() - (dfd - dlefd).log10()))
def is_profitable(self, highest_bid, lowest_ask):
'''
Returns: True if the spread is profitable, False if it is not.
'''
with localcontext() as context:
context.prec = 8
lowest_ask = Decimal(lowest_ask)
highest_bid = Decimal(highest_bid)
spread = lowest_ask - highest_bid
return spread > self.threshold(highest_bid)
def satoshi_to_usd( satoshi ):
'''
Pre: satoshi is a Decimal.
Returns: A Decimal with eight significant decimal places. The value of the satoshi in USD.
'''
with localcontext() as context:
context.prec = 8
return satoshi * current_price_of_bitcoin()
def current_price_of_bitcoin():
'''
Pre: coinmarketcap's api can be accessed. This will require an internet connection.
Returns: The price of Bitcoin in USD according to coinmarketcap.com
The variable will be a Decimal with eight decimal places.
'''
with localcontext() as context:
context.prec = 8
r = requests.get("https://api.coinmarketcap.com/v1/ticker/bitcoin/")
return Decimal(r.json()[0]["price_usd"])
def test_decimal_extendedcontext_mismatched_infs_to_nan(self):
# Test adding Decimal INFs with opposite sign returns NAN.
inf = Decimal('inf')
data = [1, 2, inf, 3, -inf, 4]
with decimal.localcontext(decimal.ExtendedContext):
self.assertTrue(math.isnan(statistics._sum(data)[1]))
def test_decimal_basiccontext_mismatched_infs_to_nan(self):
# Test adding Decimal INFs with opposite sign raises InvalidOperation.
inf = Decimal('inf')
data = [1, 2, inf, 3, -inf, 4]
with decimal.localcontext(decimal.BasicContext):
self.assertRaises(decimal.InvalidOperation, statistics._sum, data)
def to_wei(number, unit):
"""
Takes a number of a unit and converts it to wei.
"""
if unit.lower() not in units:
raise ValueError(
"Unknown unit. Must be one of {0}".format('/'.join(units.keys()))
)
if is_integer(number) or is_string(number):
d_number = decimal.Decimal(value=number)
elif isinstance(number, float):
d_number = decimal.Decimal(value=str(number))
elif isinstance(number, decimal.Decimal):
d_number = number
else:
raise TypeError("Unsupported type. Must be one of integer, float, or string")
s_number = str(number)
if d_number == 0:
return 0
unit_value = units[unit.lower()]
if d_number < 1 and '.' in s_number:
with localcontext() as ctx:
multiplier = len(s_number) - s_number.index('.') - 1
ctx.prec = multiplier
d_number = decimal.Decimal(value=number, context=ctx) * 10**multiplier
unit_value /= 10**multiplier
result_value = d_number * unit_value
if result_value < MIN_WEI or result_value > MAX_WEI:
raise ValueError("Resulting wei value must be between 1 and 2**256 - 1")
return int(result_value)
def to_decimal(self):
"""Returns an instance of :class:`decimal.Decimal` for this
:class:`Decimal128`.
"""
high = self.__high
low = self.__low
sign = 1 if (high & _SIGN) else 0
if (high & _SNAN) == _SNAN:
return decimal.Decimal((sign, (), 'N'))
elif (high & _NAN) == _NAN:
return decimal.Decimal((sign, (), 'n'))
elif (high & _INF) == _INF:
return decimal.Decimal((sign, (0,), 'F'))
if (high & _EXPONENT_MASK) == _EXPONENT_MASK:
exponent = ((high & 0x1fffe00000000000) >> 47) - _EXPONENT_BIAS
return decimal.Decimal((sign, (0,), exponent))
else:
exponent = ((high & 0x7fff800000000000) >> 49) - _EXPONENT_BIAS
arr = bytearray(15)
mask = 0x00000000000000ff
for i in range(14, 6, -1):
arr[i] = (low & mask) >> ((14 - i) << 3)
mask = mask << 8
mask = 0x00000000000000ff
for i in range(6, 0, -1):
arr[i] = (high & mask) >> ((6 - i) << 3)
mask = mask << 8
mask = 0x0001000000000000
arr[0] = (high & mask) >> 48
# Have to convert bytearray to bytes for python 2.6.
digits = [int(digit) for digit in str(_from_bytes(bytes(arr), 'big'))]
with decimal.localcontext(_DEC128_CTX) as ctx:
return ctx.create_decimal((sign, digits, exponent))
def truncate(number, places):
if not isinstance(places, int):
raise ValueError("Decimal places must be an integer.")
if places < 1:
raise ValueError("Decimal places must be at least 1.")
# If you want to truncate to 0 decimal places, just do int(number).
with localcontext() as context:
context.rounding = ROUND_DOWN
exponent = Decimal(str(math.pow(10, - places)))
return Decimal(str(number)).quantize(exponent)
def test_decimal_mismatched_infs_to_nan(self):
# Test adding Decimal INFs with opposite sign returns NAN.
inf = Decimal('inf')
data = [1, 2, inf, 3, -inf, 4]
with decimal.localcontext(decimal.ExtendedContext):
self.assertTrue(math.isnan(statistics._sum(data)))
def test_decimal_mismatched_infs_to_nan(self):
# Test adding Decimal INFs with opposite sign raises InvalidOperation.
inf = Decimal('inf')
data = [1, 2, inf, 3, -inf, 4]
with decimal.localcontext(decimal.BasicContext):
self.assertRaises(decimal.InvalidOperation, statistics._sum, data)
def process_order_book(self, highest_bid, lowest_ask):
'''
Pre: Traders keep track of their balance / assets and do not attempt a trade when they do not
have the balance to buy with or the assets to sell.
'''
# Financial calculations need accurate decimals
with localcontext() as context:
context.prec = 8
# The spread has to be calculated using the values that the strategy will try to use, not what
# is already being used.
highest_bid = Decimal(highest_bid)+self.undercut_market_by
lowest_ask = Decimal(lowest_ask)-self.undercut_market_by
if self._spread_size_indicator.is_profitable(highest_bid, lowest_ask):
# Changing current_position causes the trader to vacillate between buying and selling.
# Traders will not enter a position when the trader does not have the balance/assets
# so this causes the Trader to wait until their position is exited before they enter the
# market again - with the opposite position.
if self.current_position:
self.notify_observers(False, lowest_ask)
self.current_position = False
else:
self.notify_observers(True, highest_bid)
self.current_position = True
self._first_time_unprofitable = True
else:
# Not profitable = hold default position.
if self.default_position == DefaultPosition.HOLD:
if self._first_time_unprofitable:
print("Spread is not profitable. Holding.")
self.notify_observers(None, -1) # market_value is irrelevant so it will be set to -1
else:
if self.default_position == DefaultPosition.BUY:
if self._first_time_unprofitable:
print("Spread is not profitable. Holding major currency.")
self.notify_observers(None, lowest_ask)
elif self.default_position == DefaultPosition.SELL:
if self._first_time_unprofitable:
print("Spread is not profitable. Holding minor currency.")
self.notify_observers(None, highest_bid)
self.current_position = self.default_position
self._first_time_unprofitable = False
def __init__(self, options, percentage_to_trade=1, start_by_buying=True, starting_amount=100):
'''
Pre: options is an instance of QuadrigaOptions and must have pair and ticker set.
Post: self.is_test = True until authenticate() is called
'''
# Trader is in test mode by default.
# minimum_trade is the minimum amount of assets that can be sold on a trade.
Trader.__init__(self, True, options.minimum_trade, options.ticker)
# Will be set to a number (order ID) when an order is placed.
self._waiting_for_order_to_fill = None
# In test mode: Is used to prevent the same transaction from being counted twice.
self._last_simulation_transaction_check = 0
# In test mode: tracks how much the trader's order has been filled.
self._expecting_simulation_balance = 0
self._expecting_simulation_assets = 0
self._filled_simulation_balance = 0
self._filled_simulation_assets = 0
# Used when aborting to determine if any positions need to be closed.
self._active_buy_order = False
self._active_sell_order = False
self.major_currency = options.major_currency
self.minor_currency = options.minor_currency
self.percentage_to_trade = Decimal(percentage_to_trade)
self.amount_precision = options.amount_precision
self.price_precision = options.price_precision
self.start_by_buying = start_by_buying
with localcontext() as context:
context.prec = 8
if self.start_by_buying:
self.balance = Decimal(starting_amount)
self.assets = Decimal(0)
else:
self.balance = Decimal(0)
self.assets = Decimal(starting_amount)
self.post_fee = Decimal(1) - Decimal(options.fee)
def __init__(self, trading_pair, percentage_to_trade=1, start_by_buying=True, starting_amount=1):
'''
Post: self.is_test = True until authenticate() is called
'''
# Split up the trading pair for currency specific options and for console output.
self.market_ticker = trading_pair
split_pair = trading_pair.split("_")
self.major_currency = split_pair[0]
self.minor_currency = split_pair[1]
# Trader is in test mode by default.
# minimum_trade is the minimum amount of assets that can be sold on a trade.
Trader.__init__(self, True, minimum_trade_for[self.minor_currency])
# Will be set to a number (order ID) when an order is placed.
self._waiting_for_order_to_fill = None
# In test mode: Is used to prevent the same transaction from being counted twice.
self._last_simulation_transaction_check = 0
# In test mode: tracks how much the trader's order has been filled.
self._expecting_simulation_balance = 0
self._expecting_simulation_assets = 0
self._filled_simulation_balance = 0
self._filled_simulation_assets = 0
# Needs to be None or the program will crash in simulation mode when it
# checks if the bot was shut off manually (a live-only feature)
self.emergency_shutdown_id = None
# Used when aborting to determine if any positions need to be closed.
self._active_buy_order = False
self._active_sell_order = False
self.percentage_to_trade = Decimal(percentage_to_trade)
self.amount_precision = 8
self.price_precision = 8
self.start_by_buying = start_by_buying
with localcontext() as context:
context.prec = 8
if self.start_by_buying:
self.balance = Decimal(starting_amount)
self.assets = Decimal(0)
else:
self.balance = Decimal(0)
self.assets = Decimal(starting_amount)
self.post_fee = Decimal(1) - cryptopia_fee
def _decimal_to_128(value):
"""Converts a decimal.Decimal to BID (high bits, low bits).
:Parameters:
- `value`: An instance of decimal.Decimal
"""
with decimal.localcontext(_DEC128_CTX) as ctx:
value = ctx.create_decimal(value)
if value.is_infinite():
return _NINF if value.is_signed() else _PINF
sign, digits, exponent = value.as_tuple()
if value.is_nan():
if digits:
raise ValueError("NaN with debug payload is not supported")
if value.is_snan():
return _NSNAN if value.is_signed() else _PSNAN
return _NNAN if value.is_signed() else _PNAN
significand = int("".join([str(digit) for digit in digits]))
bit_length = _bit_length(significand)
high = 0
low = 0
for i in range(min(64, bit_length)):
if significand & (1 << i):
low |= 1 << i
for i in range(64, bit_length):
if significand & (1 << i):
high |= 1 << (i - 64)
biased_exponent = exponent + _EXPONENT_BIAS
if high >> 49 == 1:
high = high & 0x7fffffffffff
high |= _EXPONENT_MASK
high |= (biased_exponent & 0x3fff) << 47
else:
high |= biased_exponent << 49
if sign:
high |= _SIGN
return high, low
def binomial_factorial(self):
r"""
Implementation of the binomial coefficient computation. Not meant for actual computation
as the other methods available are more efficient.
Parameters
----------
n : int
Number of possibilities
k : int
number of unordered outcomes
Returns
-------
float
The binomial coefficient
Notes
-----
The binomial coefficient equation (in compact form) is defined as:
.. math::
\binom{n}{k} = \frac{n!}{k!(n-k)!} \qquad 0 \leq k \leq n
References
----------
Binomial coefficient. (2017, April 17). In Wikipedia, The Free Encyclopedia.
From https://en.wikipedia.org/w/index.php?title=Binomial_coefficient&oldid=775905810
Press, W., Teukolsky, S., Vetterling, W., & Flannery, B. (2007). Numerical recipes (3rd ed.).
Cambridge: Cambridge University Press.
Weisstein, Eric W. "Binomial Coefficient." From MathWorld--A Wolfram Web Resource.
http://mathworld.wolfram.com/BinomialCoefficient.html
"""
nk = np.minimum(self.n, self.n - self.k)
if nk >= 100:
with localcontext() as ctx:
ctx.prec = 50
bico = Decimal(factorial(self.n)) / (Decimal(factorial(self.k)) * Decimal(factorial(nk)))
else:
bico = float(factorial(self.n)) / float(factorial(self.k) * factorial(nk))
return bico
def ramanujan(n, prec=100):
r"""
Approximates the factorial :math:`n!` given an integer :math:`n` using Ramanujan's formula.
Ramanujan's formula is just as or more accurate than several other factorial approximation
formulas.
Parameters
----------
n
Integer to approximate factorial
prec
Defines level of precision for factorials over 100. Default 100. Optional
Returns
-------
int or Decimal
Factorial of :math:`n` as approximated by Ramanujan's formula.
Notes
-----
Ramanujan's formula is another factorial approximation method known for its accuracy
in comparison to other factorial approximation approaches including Stirling's and
Gosper's approximations. Ramanujan's formula is defined as:
.. math::
n! \approx \sqrt{\pi} \left(\frac{n}{e}\right)^n \sqrt[6]{8n^3 + 4n^2 + n + \frac{1}{30}}
Examples
--------
>>> ramanujan(10)
3628800.3116126074
>>> ramanujan(5)
120.00014706585664
References
----------
Mortici, Cristinel. On Gosper's Formula for the Gamma Function. Valahia University of Targoviste,
Department of Mathematics. Retrieved from http://files.ele-math.com/articles/jmi-05-53.pdf
"""
if n != np.floor(n):
n = np.floor(n)
if n >= 100:
with localcontext() as ctx:
ctx.prec = prec
f = Decimal(
np.sqrt(np.pi) * n ** n * np.exp(-n) * (8. * n ** 3. + 4. * n ** 2. + n + 1. / 30.) ** (1. / 6.))
else:
f = np.sqrt(np.pi) * n ** n * np.exp(-n) * (8. * n ** 3. + 4. * n ** 2. + n + (1. / 30.)) ** (1. / 6.)
return f