def __init__(self, addr):
    """
    Open an RFCOMM Bluetooth connection to ``addr``, retrying until it succeeds.

    Each attempt creates a fresh stream socket and connects it to channel 1
    of ``addr``. When an attempt fails, the error is logged, the failed
    socket is closed and the next attempt is made after 10 seconds. Once
    connected, the socket is stored in ``self.sock`` with a 5 second timeout.

    A ``KeyboardInterrupt`` raised during an attempt exits the process with
    status 0.
    """
    sock = None
    while sock is None:
        attempt = None
        try:
            attempt = socket.socket(socket.AF_BLUETOOTH,
                                    socket.SOCK_STREAM,
                                    socket.BTPROTO_RFCOMM)
            attempt.connect((addr, 1))
            sock = attempt
        except KeyboardInterrupt:
            if attempt is not None:
                attempt.close()
            sys.exit(0)
        except Exception as e:
            # Close the failed socket so that repeated retries do not leak
            # one file descriptor per attempt.
            if attempt is not None:
                attempt.close()
            logging.error("Exception: %s", e)
            logging.info("Could not connect. Waiting 10s")
            time.sleep(10)
    self.sock = sock
    # set the timeout to 5 seconds.
    self.sock.settimeout(5)
## Read data from the GPS feed
#
# @returns Data read from the GPS
# Example source code using Python's socket.BTPROTO_RFCOMM
def BluetoothSocket():
    """Create and return a new RFCOMM Bluetooth stream socket."""
    return socket.socket(
        family=socket.AF_BLUETOOTH,
        type=socket.SOCK_STREAM,
        proto=socket.BTPROTO_RFCOMM,
    )
def _connect_bluetooth(self, host: str) -> None:
    """
    Create a socket that holds a Bluetooth connection to an EV3.

    The socket is stored in ``self._socket`` and connected to RFCOMM
    channel 1 of ``host``. If connecting fails, the socket is closed before
    the exception is re-raised, so a failed attempt does not leak it.
    """
    sock = socket.socket(socket.AF_BLUETOOTH,
                         socket.SOCK_STREAM,
                         socket.BTPROTO_RFCOMM)
    self._socket = sock
    try:
        sock.connect((host, 1))
    except BaseException:
        # Release the descriptor on any failure, then let the caller see
        # the original error unchanged.
        sock.close()
        raise
def start(self):
    """
    Start the Bluetooth server if it is not already running.

    The server needs to be started before connections can be made. Calling
    this while the server is already running does nothing.

    Raises an ``Exception`` if the Bluetooth adapter is not powered, and
    re-raises the socket error if the server address cannot be bound (the
    server socket is closed first in that case).
    """
    if self._running:
        return

    if self._power_up_device:
        self.adapter.powered = True

    if not self.adapter.powered:
        raise Exception("Bluetooth device {} is turned off".format(self.adapter.device))

    # register the serial port profile with Bluetooth
    register_spp(self._port)

    # open the Bluetooth server socket
    server_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
    self._server_sock = server_sock
    server_sock.settimeout(BLUETOOTH_TIMEOUT)
    try:
        server_sock.bind((self.server_address, self.port))
    except (socket.error, OSError) as e:
        # The socket is unusable after a failed bind; close it rather than
        # leaving it open until garbage collection.
        server_sock.close()
        if e.errno == errno.EADDRINUSE:
            print("Bluetooth address {} is already in use - is the server already running?".format(self.server_address))
        raise
    server_sock.listen(1)

    # wait for a client connection on a separate thread
    self._conn_thread = WrapThread(target=self._wait_for_connection)
    self._conn_thread.start()

    self._running = True
def connect(self):
    """
    Connect to a Bluetooth server.

    Does nothing if already connected. Otherwise the server is looked up in
    the adapter's paired devices by comparing ``self._server`` with the
    first two fields of each entry; the first field of the matching entry is
    used as the address to connect to.

    Raises an ``Exception`` if the Bluetooth adapter is not powered or the
    server is not among the paired devices. Errors from binding or
    connecting the socket are re-raised after the socket has been closed.
    """
    if self._connected:
        return

    if self._power_up_device:
        self.adapter.powered = True

    if not self.adapter.powered:
        raise Exception("Bluetooth device {} is turned off".format(self.adapter.device))

    # try and find the server name or MAC address in the paired devices list
    server_mac = next(
        (
            device[0]
            for device in self.adapter.paired_devices
            if self._server == device[0] or self._server == device[1]
        ),
        None,
    )
    if server_mac is None:
        raise Exception("Server {} not found in paired devices".format(self._server))

    # create a socket
    client_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
    self._client_sock = client_sock
    try:
        client_sock.bind((self.adapter.address, self._port))
        client_sock.connect((server_mac, self._port))
    except BaseException:
        # Do not leave a half-set-up socket open when the connection
        # attempt fails.
        client_sock.close()
        raise

    self._connected = True

    # run self._read on a separate thread
    self._conn_thread = WrapThread(target=self._read)
    self._conn_thread.start()
def _open(self) -> None:
    """Open a Bluetooth socket connection to a sensor.

    Logs an error and returns without creating a socket when no server MAC
    address is set. An ``OSError`` raised while creating or connecting the
    socket is logged rather than propagated; a socket created by the failed
    attempt is closed.
    """
    if not self._server_mac_address:
        self.logger.error('MAC address of server not set')
        return

    sock = None
    try:
        sock = socket.socket(socket.AF_BLUETOOTH,
                             socket.SOCK_STREAM,
                             socket.BTPROTO_RFCOMM)
        self._sock = sock
        sock.connect((self._server_mac_address, self._port))
    except OSError as e:
        # Close the socket of a failed attempt so that repeated calls do
        # not accumulate open descriptors.
        if sock is not None:
            sock.close()
        self.logger.error(e)