def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
python类get()的实例源码
def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
connections.py 文件源码
项目:ServerlessCrawler-VancouverRealState
作者: MarcelloLins
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
def _create_ssl_ctx(self, sslp):
if isinstance(sslp, ssl.SSLContext):
return sslp
ca = sslp.get('ca')
capath = sslp.get('capath')
hasnoca = ca is None and capath is None
ctx = ssl.create_default_context(cafile=ca, capath=capath)
ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
if 'cert' in sslp:
ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
if 'cipher' in sslp:
ctx.set_ciphers(sslp['cipher'])
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
return ctx
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length
def _get_descriptions(self):
"""Read a column descriptor packet for each column in the result."""
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range_type(self.field_count):
field = self.connection._read_packet(FieldDescriptorPacket)
self.fields.append(field)
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data is encoded in ascii
encoding = 'ascii'
else:
encoding = None
converter = self.connection.decoders.get(field_type)
if converter is through:
converter = None
if DEBUG: print("DEBUG: field={}, converter={}".format(field, converter))
self.converters.append((encoding, converter))
eof_packet = self.connection._read_packet()
assert eof_packet.is_eof_packet(), 'Protocol error, expecting EOF'
self.description = tuple(description)
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length
def _get_descriptions(self):
"""Read a column descriptor packet for each column in the result."""
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range_type(self.field_count):
field = self.connection._read_packet(FieldDescriptorPacket)
self.fields.append(field)
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data is encoded in ascii
encoding = 'ascii'
else:
encoding = None
converter = self.connection.decoders.get(field_type)
if converter is through:
converter = None
if DEBUG: print("DEBUG: field={}, converter={}".format(field, converter))
self.converters.append((encoding, converter))
eof_packet = self.connection._read_packet()
assert eof_packet.is_eof_packet(), 'Protocol error, expecting EOF'
self.description = tuple(description)
connections.py 文件源码
项目:ServerlessCrawler-VancouverRealState
作者: MarcelloLins
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length
connections.py 文件源码
项目:ServerlessCrawler-VancouverRealState
作者: MarcelloLins
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _get_descriptions(self):
"""Read a column descriptor packet for each column in the result."""
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range_type(self.field_count):
field = self.connection._read_packet(FieldDescriptorPacket)
self.fields.append(field)
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data is encoded in ascii
encoding = 'ascii'
else:
encoding = None
converter = self.connection.decoders.get(field_type)
if converter is through:
converter = None
if DEBUG: print("DEBUG: field={}, converter={}".format(field, converter))
self.converters.append((encoding, converter))
eof_packet = self.connection._read_packet()
assert eof_packet.is_eof_packet(), 'Protocol error, expecting EOF'
self.description = tuple(description)
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length
def _get_descriptions(self):
"""Read a column descriptor packet for each column in the result."""
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range_type(self.field_count):
field = self.connection._read_packet(FieldDescriptorPacket)
self.fields.append(field)
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data is encoded in ascii
encoding = 'ascii'
else:
encoding = None
converter = self.connection.decoders.get(field_type)
if converter is through:
converter = None
if DEBUG: print("DEBUG: field={}, converter={}".format(field, converter))
self.converters.append((encoding, converter))
eof_packet = self.connection._read_packet()
assert eof_packet.is_eof_packet(), 'Protocol error, expecting EOF'
self.description = tuple(description)
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length
def _get_descriptions(self):
"""Read a column descriptor packet for each column in the result."""
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range_type(self.field_count):
field = self.connection._read_packet(FieldDescriptorPacket)
self.fields.append(field)
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data is encoded in ascii
encoding = 'ascii'
else:
encoding = None
converter = self.connection.decoders.get(field_type)
if converter is through:
converter = None
if DEBUG: print("DEBUG: field={}, converter={}".format(field, converter))
self.converters.append((encoding, converter))
eof_packet = self.connection._read_packet()
assert eof_packet.is_eof_packet(), 'Protocol error, expecting EOF'
self.description = tuple(description)
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length
def _get_descriptions(self):
"""Read a column descriptor packet for each column in the result."""
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range_type(self.field_count):
field = self.connection._read_packet(FieldDescriptorPacket)
self.fields.append(field)
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data is encoded in ascii
encoding = 'ascii'
else:
encoding = None
converter = self.connection.decoders.get(field_type)
if converter is through:
converter = None
if DEBUG: print("DEBUG: field={}, converter={}".format(field, converter))
self.converters.append((encoding, converter))
eof_packet = self.connection._read_packet()
assert eof_packet.is_eof_packet(), 'Protocol error, expecting EOF'
self.description = tuple(description)
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length
def _get_descriptions(self):
"""Read a column descriptor packet for each column in the result."""
self.fields = []
self.converters = []
use_unicode = self.connection.use_unicode
conn_encoding = self.connection.encoding
description = []
for i in range_type(self.field_count):
field = self.connection._read_packet(FieldDescriptorPacket)
self.fields.append(field)
description.append(field.description())
field_type = field.type_code
if use_unicode:
if field_type == FIELD_TYPE.JSON:
# When SELECT from JSON column: charset = binary
# When SELECT CAST(... AS JSON): charset = connection encoding
# This behavior is different from TEXT / BLOB.
# We should decode result by connection encoding regardless charsetnr.
# See https://github.com/PyMySQL/PyMySQL/issues/488
encoding = conn_encoding # SELECT CAST(... AS JSON)
elif field_type in TEXT_TYPES:
if field.charsetnr == 63: # binary
# TEXTs with charset=binary means BINARY types.
encoding = None
else:
encoding = conn_encoding
else:
# Integers, Dates and Times, and other basic data is encoded in ascii
encoding = 'ascii'
else:
encoding = None
converter = self.connection.decoders.get(field_type)
if converter is through:
converter = None
if DEBUG: print("DEBUG: field={}, converter={}".format(field, converter))
self.converters.append((encoding, converter))
eof_packet = self.connection._read_packet()
assert eof_packet.is_eof_packet(), 'Protocol error, expecting EOF'
self.description = tuple(description)
def get_column_length(self):
if self.type_code == FIELD_TYPE.VAR_STRING:
mblen = MBLENGTH.get(self.charsetnr, 1)
return self.length // mblen
return self.length