__init__.py 文件源码

python
阅读 17 收藏 0 点赞 0 评论 0

项目:pymoku 作者: liquidinstruments 项目源码 文件源码
def __init__(self, ip_addr, load_instruments=None, force=False):
        """Create a connection to the Moku:Lab unit at the given IP address

        An encrypted (CURVE) ZeroMQ REQ connection is attempted first. If that
        times out and *force* is set, a plain unencrypted connection is tried
        instead; otherwise the timeout is re-raised.

        :type ip_addr: string
        :param ip_addr: The address to connect to. This should be in IPv4 dotted notation.

        :type load_instruments: bool or None
        :param load_instruments: Leave default (*None*) unless you know what you're doing.
        When *None*, the value is derived from the device: instruments are loaded only
        if :any:`get_bootmode` reports ``'normal'``.

        :type force: bool
        :param force: Ignore firmware and network compatibility checks and force the instrument
        to deploy. This is dangerous on many levels, leave *False* unless you know what you're doing.

        :raises zmq.error.Again: if the initial request times out and *force* is *False*.
        :raises MokuException: if *force* is *False* and the firmware build is reported
        as incompatible with this version of pymoku.
        """
        self._ip = ip_addr
        self._seq = 0
        self._instrument = None
        self._known_mokus = []

        self._ctx = zmq.Context.instance()
        self._conn_lock = threading.RLock()

        try:
            self._conn = self._ctx.socket(zmq.REQ)
            self._conn.setsockopt(zmq.LINGER, 5000)
            self._conn.curve_publickey, self._conn.curve_secretkey = zmq.curve_keypair()
            self._conn.curve_serverkey, _ = zmq.auth.load_certificate(os.path.join(data_folder, '000'))
            self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

            # Getting the serial should be fairly quick; it's a simple operation. More importantly we
            # don't want to block the fall-back operation for too long
            self._conn.setsockopt(zmq.SNDTIMEO, 1000)
            self._conn.setsockopt(zmq.RCVTIMEO, 1000)

            self.serial = self.get_serial()
            # NOTE(review): presumably replaces the short 1 s probe timeouts with the
            # normal operating timeouts -- confirm against _set_timeout.
            self._set_timeout()
        except zmq.error.Again:
            # The encrypted socket is of no further use: we either give up or replace it
            # with a plaintext one. Close it explicitly (discarding the undeliverable
            # request) so it does not leak in the shared context or stall context
            # termination while lingering.
            self._conn.close(linger=0)

            if not force:
                print("Connection failed, either the Moku cannot be reached or the firmware is out of date")
                raise

            # If we're force-connecting, try falling back to non-encrypted.
            self._conn = self._ctx.socket(zmq.REQ)
            self._conn.setsockopt(zmq.LINGER, 5000)
            self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

            self._set_timeout()

            self.serial = self.get_serial()

        self.name = None
        self.led = None
        self.led_colours = None

        # Check that pymoku is compatible with the Moku:Lab's firmware version
        if not force:
            build = self.get_firmware_build()
            # Deliberately an equality test against False rather than a truthiness test:
            # the result might be None = unknown, and we don't raise for that.
            if cp.firmware_is_compatible(build) == False:
                raise MokuException(
                    ("The connected Moku appears to be incompatible with this version of pymoku. "
                     "Please run 'moku --ip={} firmware check_compat' for more information.").format(self._ip))

        # An explicit caller choice wins; otherwise only load instruments when the
        # device reports a 'normal' boot mode.
        if load_instruments is not None:
            self.load_instruments = load_instruments
        else:
            self.load_instruments = self.get_bootmode() == 'normal'
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号