def __init__(self, *args, **kw):
self._read_only = False
OrderedDict.__init__(self, *args, **kw)
self._read_only = True
error = self._read_only_error
self.clear = self.update = error
self.pop = self.setdefault = self.popitem = error
python类__init__()的实例源码
def __init__(self, offset, name=None):
self.offset = offset
if not name:
minutes = self.offset.days * 1440 + self.offset.seconds // 60
if minutes < 0:
hours, minutes = divmod(-minutes, 60)
hours = -hours
else:
hours, minutes = divmod(minutes, 60)
name = 'UTC%+03d:%02d' % (hours, minutes)
self.name = name
def __init__(self):
for typ, keys in self._types.items():
for key in keys.split():
self[key] = typ
self['_%s' % key] = '%s[]' % typ
# this could be a static method in Python > 2.6
def __init__(self, obj):
self.obj = obj
def __init__(self, db):
"""Initialize type cache for connection."""
super(DbTypes, self).__init__()
self._regtypes = False
self._get_attnames = db.get_attnames
self._typecasts = Typecasts()
self._typecasts.get_attnames = self.get_attnames
self._typecasts.connection = db
db = db.db
self.query = db.query
self.escape_string = db.escape_string
def __init__(self, result, fields):
"""Create query from given result rows and field names."""
self.result = result
self.fields = fields
def __init__(self, db, event, callback=None,
arg_dict=None, timeout=None, stop_event=None):
"""Initialize the notification handler.
You must pass a PyGreSQL database connection, the name of an
event (notification channel) to listen for and a callback function.
You can also specify a dictionary arg_dict that will be passed as
the single argument to the callback function, and a timeout value
in seconds (a floating point number denotes fractions of seconds).
If it is absent or None, the callers will never time out. If the
timeout is reached, the callback function will be called with a
single argument that is None. If you set the timeout to zero,
the handler will poll notifications synchronously and return.
You can specify the name of the event that will be used to signal
the handler to stop listening as stop_event. By default, it will
be the event name prefixed with 'stop_'.
"""
self.db = db
self.event = event
self.stop_event = stop_event or 'stop_%s' % event
self.listening = False
self.callback = callback
if arg_dict is None:
arg_dict = {}
self.arg_dict = arg_dict
self.timeout = timeout
def __init__(self, *args, **kw):
"""Create a new connection
You can pass either the connection parameters or an existing
_pg or pgdb connection. This allows you to use the methods
of the classic pg interface with a DB-API 2 pgdb connection.
"""
if not args and len(kw) == 1:
db = kw.get('db')
elif not kw and len(args) == 1:
db = args[0]
else:
db = None
if db:
if isinstance(db, DB):
db = db.db
else:
try:
db = db._cnx
except AttributeError:
pass
if not db or not hasattr(db, 'db') or not hasattr(db, 'query'):
db = connect(*args, **kw)
self._closeable = True
else:
self._closeable = False
self.db = db
self.dbname = db.db
self._regtypes = False
self._attnames = {}
self._pkeys = {}
self._privileges = {}
self._args = args, kw
self.adapter = Adapter(self)
self.dbtypes = DbTypes(self)
db.set_cast_hook(self.dbtypes.typecast)
self.debug = None # For debugging scripts, this can be set
# * to a string format specification (e.g. in CGI set to "%s<BR>"),
# * to a file object to write debug statements or
# * to a callable object which takes a string argument
# * to any other true value to just print debug statements
def __init__(self, *args, **kw):
if len(args) > 1 or kw:
raise TypeError
items = args[0] if args else []
if isinstance(items, dict):
raise TypeError
items = list(items)
self._keys = [item[0] for item in items]
dict.__init__(self, items)
self._read_only = True
error = self._read_only_error
self.clear = self.update = error
self.pop = self.setdefault = self.popitem = error
def __init__(self, offset, name=None):
self.offset = offset
if not name:
minutes = self.offset.days * 1440 + self.offset.seconds // 60
if minutes < 0:
hours, minutes = divmod(-minutes, 60)
hours = -hours
else:
hours, minutes = divmod(minutes, 60)
name = 'UTC%+03d:%02d' % (hours, minutes)
self.name = name
def __init__(self):
for typ, keys in self._types.items():
for key in keys.split():
self[key] = typ
self['_%s' % key] = '%s[]' % typ
# this could be a static method in Python > 2.6
def __init__(self, obj):
self.obj = obj
def __init__(self, db):
self.db = db
self.encode_json = db.encode_json
db = db.db
self.escape_bytea = db.escape_bytea
self.escape_string = db.escape_string
def __init__(self, db):
"""Initialize type cache for connection."""
super(DbTypes, self).__init__()
self._regtypes = False
self._get_attnames = db.get_attnames
self._typecasts = Typecasts()
self._typecasts.get_attnames = self.get_attnames
self._typecasts.connection = db
db = db.db
self.query = db.query
self.escape_string = db.escape_string
def __init__(self, db, event, callback=None,
arg_dict=None, timeout=None, stop_event=None):
"""Initialize the notification handler.
You must pass a PyGreSQL database connection, the name of an
event (notification channel) to listen for and a callback function.
You can also specify a dictionary arg_dict that will be passed as
the single argument to the callback function, and a timeout value
in seconds (a floating point number denotes fractions of seconds).
If it is absent or None, the callers will never time out. If the
timeout is reached, the callback function will be called with a
single argument that is None. If you set the timeout to zero,
the handler will poll notifications synchronously and return.
You can specify the name of the event that will be used to signal
the handler to stop listening as stop_event. By default, it will
be the event name prefixed with 'stop_'.
"""
self.db = db
self.event = event
self.stop_event = stop_event or 'stop_%s' % event
self.listening = False
self.callback = callback
if arg_dict is None:
arg_dict = {}
self.arg_dict = arg_dict
self.timeout = timeout
def __init__(self, *args, **kw):
"""Create a new connection
You can pass either the connection parameters or an existing
_pg or pgdb connection. This allows you to use the methods
of the classic pg interface with a DB-API 2 pgdb connection.
"""
if not args and len(kw) == 1:
db = kw.get('db')
elif not kw and len(args) == 1:
db = args[0]
else:
db = None
if db:
if isinstance(db, DB):
db = db.db
else:
try:
db = db._cnx
except AttributeError:
pass
if not db or not hasattr(db, 'db') or not hasattr(db, 'query'):
db = connect(*args, **kw)
self._closeable = True
else:
self._closeable = False
self.db = db
self.dbname = db.db
self._regtypes = False
self._attnames = {}
self._pkeys = {}
self._privileges = {}
self._args = args, kw
self.adapter = Adapter(self)
self.dbtypes = DbTypes(self)
db.set_cast_hook(self.dbtypes.typecast)
self.debug = None # For debugging scripts, this can be set
# * to a string format specification (e.g. in CGI set to "%s<BR>"),
# * to a file object to write debug statements or
# * to a callable object which takes a string argument
# * to any other true value to just print debug statements
def __init__(self, maxsize=100):
OrderedDict.__init__(self)
self._maxsize = maxsize
def __init__(self, maxsize=100):
dict.__init__(self)
self._maxsize = maxsize
def __init__ (self, rc = None, data = None, is_warn = False):
self.rc_list = []
if (rc != None):
self.rc_list.append(TupleRC(rc, data, is_warn))
def __init__(self, maxlen = 20, *args, **kwargs):
OrderedDict.__init__(self, *args, **kwargs)
self.maxlen = maxlen