test_assoc.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:pynetdicom3 作者: pydicom 项目源码 文件源码
def test_scp_assoc_ap_abort_reply(self):
        """Test the SCP sending an A-ABORT instead of an A-ASSOCIATE response"""
        class DummyAE(threading.Thread, AE):
            """Dummy AE used for testing"""
            def __init__(self, scp_sop_class, port):
                """Initialise the class"""
                AE.__init__(self, scp_sop_class=scp_sop_class, port=port)
                threading.Thread.__init__(self)
                self.daemon = True

            def run(self):
                """The thread run method"""
                self.start_scp()

            def start_scp(self):
                """new runner"""
                self._bind_socket()
                while True:
                    try:
                        if self._quit:
                            break
                        self._monitor_socket()
                        self.cleanup_associations()

                    except KeyboardInterrupt:
                        self.stop()

            def _monitor_socket(self):
                """Override the normal method"""
                try:
                    read_list, _, _ = select.select([self.local_socket], [], [], 0)
                except ValueError:
                    return

                # If theres a connection
                if read_list:
                    client_socket, _ = self.local_socket.accept()
                    client_socket.setsockopt(socket.SOL_SOCKET,
                                             socket.SO_RCVTIMEO,
                                             pack('ll', 10, 0))

                    # Create a new Association
                    # Association(local_ae, local_socket=None, max_pdu=16382)
                    assoc = Association(self,
                                        client_socket,
                                        max_pdu=self.maximum_pdu_size,
                                        acse_timeout=self.acse_timeout,
                                        dimse_timeout=self.dimse_timeout)
                    # Set the ACSE to abort association requests
                    assoc._a_p_abort_assoc_rq = True
                    assoc.start()
                    self.active_associations.append(assoc)

        scp = DummyAE(scp_sop_class=[VerificationSOPClass], port=11112)
        scp.start()

        ae = AE(scu_sop_class=[VerificationSOPClass])
        assoc = ae.associate('localhost', 11112)
        self.assertFalse(assoc.is_established)

        scp.stop()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号