python类SOCK_SEQPACKET的实例源码

hfp.py 文件源码 项目:nOBEX 作者: nccgroup 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _connect_hfp(address, control_chan=True, audio_chan=True):
        connection = None

        # Connect to RFCOMM control channel on HF (car kit)
        if control_chan:
            port = find_service("hf", address)
            print("HFP connecting to %s on port %i" % (address, port))
            connection = common.Socket()
            time.sleep(0.5)
            connection.connect((address, port))

        if audio_chan and hasattr(socket, "BTPROTO_SCO"):
            asock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO)
            time.sleep(0.5)
            asock.connect(bytes(address, encoding="UTF-8"))
            print("HFP SCO audio socket established")

        return connection
juniper-vpn-wrap.py 文件源码 项目:service-juniper-vpn 作者: docksal 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def tncc_start(self):
        """Launch the tncc host checker as a Java subprocess.

        Calls ``self.tncc_init()`` first when ``self.tncc_jar`` is not set.
        Creates an AF_UNIX/SOCK_SEQPACKET socket pair: one end is kept as
        ``self.tncc_socket`` and the other becomes the child's stdin. The
        child's stdout is sent to ``os.devnull``. The resulting ``Popen``
        object is stored in ``self.tncc_process``.
        """
        # tncc is the host checker app. It can check different
        # security policies of the host and report back. We have
        # to send it a preauth key (from the DSPREAUTH cookie)
        # and it sends back a new cookie value we submit.
        # After logging in, we send back another cookie to tncc.
        # Subsequently, it contacts https://<vpn_host>:443 every
        # 10 minutes.

        if not self.tncc_jar:
            self.tncc_init()

        self.tncc_socket, sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            # The devnull handle is only needed while the child is spawned.
            with open(os.devnull, 'w') as null:
                self.tncc_process = subprocess.Popen(['java',
                    '-classpath', self.tncc_jar + ':' + self.plugin_jar,
                    self.class_name,
                    'log_level', '3',
                    'postRetries', '6',
                    'ivehost', self.vpn_host,
                    'home_dir', os.path.expanduser('~'),
                    'Parameter0', '',
                    'user_agent', self.user_agent,
                    ], env={'LD_PRELOAD': self.tncc_preload}, stdin=sock, stdout=null)
        finally:
            # Release the parent's copy of the child-side end explicitly
            # (also when Popen fails) instead of relying on garbage
            # collection; a spawned child keeps its own stdin descriptor.
            sock.close()
test_socket.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True
test_socket.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create and bind the server-side RDS socket.

        The socket is stored in ``self.serv`` and closed on cleanup; the port
        chosen by ``support.bind_port`` is stored in ``self.port``. The test
        is skipped when binding raises OSError.
        """
        server_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.serv = server_sock
        self.addCleanup(server_sock.close)
        try:
            self.port = support.bind_port(server_sock)
        except OSError:
            self.skipTest('unable to bind RDS socket')
test_socket.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def clientSetUp(self):
        """Create the client-side RDS socket and try to bind it.

        The socket is stored in ``self.cli``; on a successful bind its
        address is stored in ``self.cli_addr``. A bind failure (OSError) is
        ignored here.
        """
        client_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.cli = client_sock
        try:
            # RDS sockets must be bound explicitly to send or receive data
            client_sock.bind((HOST, 0))
            self.cli_addr = client_sock.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass
test_socket.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testCreateSocket(self):
        """Check that an RDS socket can be created and then closed."""
        # The socket object is not used; the context manager closes it.
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0):
            pass
test_socket.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True
test_socket.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create and bind the server-side RDS socket.

        The socket is stored in ``self.serv`` and closed on cleanup; the port
        chosen by ``support.bind_port`` is stored in ``self.port``. The test
        is skipped when binding raises OSError.
        """
        server_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.serv = server_sock
        self.addCleanup(server_sock.close)
        try:
            self.port = support.bind_port(server_sock)
        except OSError:
            self.skipTest('unable to bind RDS socket')
test_socket.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def clientSetUp(self):
        """Create the client-side RDS socket and try to bind it.

        The socket is stored in ``self.cli``; on a successful bind its
        address is stored in ``self.cli_addr``. A bind failure (OSError) is
        ignored here.
        """
        client_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.cli = client_sock
        try:
            # RDS sockets must be bound explicitly to send or receive data
            client_sock.bind((HOST, 0))
            self.cli_addr = client_sock.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass
test_socket.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testCreateSocket(self):
        """Check that an RDS socket can be created and then closed."""
        # The socket object is not used; the context manager closes it.
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0):
            pass
test_socket.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True
test_socket.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create and bind the server-side RDS socket.

        The socket is stored in ``self.serv`` and closed on cleanup; the port
        chosen by ``support.bind_port`` is stored in ``self.port``. The test
        is skipped when binding raises OSError.
        """
        server_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.serv = server_sock
        self.addCleanup(server_sock.close)
        try:
            self.port = support.bind_port(server_sock)
        except OSError:
            self.skipTest('unable to bind RDS socket')
test_socket.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def clientSetUp(self):
        """Create the client-side RDS socket and try to bind it.

        The socket is stored in ``self.cli``; on a successful bind its
        address is stored in ``self.cli_addr``. A bind failure (OSError) is
        ignored here.
        """
        client_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.cli = client_sock
        try:
            # RDS sockets must be bound explicitly to send or receive data
            client_sock.bind((HOST, 0))
            self.cli_addr = client_sock.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass
test_socket.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR
test_socket.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testCreateSocket(self):
        """Check that an RDS socket can be created and then closed."""
        # The socket object is not used; the context manager closes it.
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0):
            pass
dualshock4.py 文件源码 项目:derplearning 作者: John-Ellis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, config, full_config):
        """Initialise controller state and try to connect to a DualShock 4.

        Runs ``hcitool scan --flush`` up to five times. On the first scan
        result named "Wireless Controller", both L2CAP sockets are connected,
        the interrupt socket is made non-blocking, ``self.ready`` is set to
        True and the constructor returns. If no such device is found, the
        constructor finishes without assigning ``self.ready``.
        """
        super(Dualshock4, self).__init__(config, full_config)

        # bluetooth control socket
        self.report_id = 0x11
        self.report_size = 79
        l2cap_args = (socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
        self.ctrl_socket = socket.socket(*l2cap_args)
        self.intr_socket = socket.socket(*l2cap_args)

        # Prepare packet: a zeroed buffer with a few fixed bytes filled in
        self.packet = bytearray(79)
        for offset, value in ((0, 0x52), (1, self.report_id), (2, 0x80), (4, 0xFF)):
            self.packet[offset] = value

        # The deadzone of the analog sticks to ignore them
        self.deadzone = self.config['deadzone']

        # Every input starts out inactive
        self.left_analog_active = self.right_analog_active = False
        self.left_trigger_active = self.right_trigger_active = False
        self.up_active = self.down_active = False
        self.left_active = self.right_active = False

        max_attempts = 5
        scan_cmd = ["hcitool", "scan", "--flush"]
        for attempt_no in range(1, max_attempts + 1):
            print("Attempt %i of %i" % (attempt_no, max_attempts), end='\r')
            raw = subprocess.check_output(scan_cmd, stderr=subprocess.STDOUT)
            # The first output line is skipped; each remaining line is
            # expected to split into exactly three tab-separated fields.
            for line in raw.decode("utf8").splitlines()[1:]:
                _, address, name = line.split("\t")
                if name != "Wireless Controller":
                    continue
                self.ctrl_socket.connect((address, 0x11))
                self.intr_socket.connect((address, 0x13))
                self.intr_socket.setblocking(False)
                self.ready = True
                return
unit.py 文件源码 项目:diafuzzer 作者: phil777 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dwr_handler(fuzzed, f):
  """Relay messages between the ``fuzzed`` callable and socket ``f``.

  ``fuzzed`` runs in a ``WrappedThread`` and is handed one end of a
  SOCK_SEQPACKET socket pair. Whatever it sends is forwarded to ``f``
  unchanged. Messages received from ``f`` are forwarded to the thread, except
  requests with code 280 (Device-Watchdog in RFC 6733), which are answered
  directly on ``f`` and never reach the thread.

  The loop ends when either side returns an empty read or when reading from
  the thread's socket fails. Returns whatever ``child.join()`` returns.
  """
  (own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

  child = WrappedThread(fuzzed_plug, target=fuzzed, args=[fuzzed_plug])
  child.start()

  try:
    while True:
      (readable, _, _) = sl.select([own_plug, f], [], [])

      if own_plug in readable:
        try:
          b = own_plug.recv(dm.U24_MAX)
        except Exception:
          # A failed read ends the relay. KeyboardInterrupt and SystemExit
          # are deliberately not caught, so they propagate.
          break
        if len(b) == 0:
          break
        f.send(b)
      elif f in readable:
        b = f.recv(dm.U24_MAX)
        if len(b) == 0:
          break

        m = dm.Msg.decode(b)
        if m.code == 280 and m.R:
          # Answer the watchdog request here, echoing its end-to-end and
          # hop-by-hop identifiers.
          dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
            dm.Avp(code=264, M=True, data=local_host),
            dm.Avp(code=296, M=True, data=local_realm),
            dm.Avp(code=268, M=True, u32=2001),
            dm.Avp(code=278, M=True, u32=0xcafebabe)])
          f.send(dwa.encode())
        else:
          own_plug.send(b)
  finally:
    # Close our end even when the loop raises (e.g. on an undecodable
    # message), so the descriptor is not leaked.
    own_plug.close()

  return child.join()
fuzz.py 文件源码 项目:diafuzzer 作者: phil777 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def dwr_handler(scenario, f):
  """Relay messages between ``scenario`` and socket ``f``, recording them.

  ``scenario`` runs in a ``Thread`` and is handed one end of a SOCK_SEQPACKET
  socket pair. Its messages are decoded, recorded as ``(msg, True)`` and
  forwarded to ``f``. Messages from ``f`` are recorded as ``(msg, False)`` and
  forwarded to the thread, except requests with code 280, which are answered
  directly on ``f`` and not recorded.

  Returns ``(exc_info, msgs)`` where ``exc_info`` is the result of
  ``child.join()``.
  """
  (own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)
  msgs = []

  child = Thread(target=scenario, args=[fuzzed_plug])
  child.start()

  while True:
    ready = sl.select([own_plug, f], [], [])[0]

    if own_plug in ready:
      # Traffic from the scenario thread: record it and pass it on.
      outgoing = own_plug.recv(dm.U24_MAX)
      if not outgoing:
        break
      msgs.append((dm.Msg.decode(outgoing), True))
      f.send(outgoing)
      continue

    if f not in ready:
      continue

    incoming = f.recv(dm.U24_MAX)
    if not incoming:
      break

    m = dm.Msg.decode(incoming)
    if not (m.code == 280 and m.R):
      msgs.append((m, False))
      own_plug.send(incoming)
      continue

    # Code-280 request: reply on f, echoing the request's identifiers.
    dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
      dm.Avp(code=264, M=True, data=local_host),
      dm.Avp(code=296, M=True, data=local_realm),
      dm.Avp(code=268, M=True, u32=2001),
      dm.Avp(code=278, M=True, u32=0xcafebabe)])
    f.send(dwa.encode())

  own_plug.close()
  return (child.join(), msgs)
fuzz.py 文件源码 项目:diafuzzer 作者: phil777 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def fuzz_handler(scenario, f, fuzz):
  """Relay messages between ``scenario`` and ``f``, sending outbound ones via ``fuzz``.

  ``fuzz`` must be a ``FuzzScenario``; it is bound to ``f`` before the
  scenario thread starts. Messages from the scenario thread are decoded,
  recorded as ``(msg, True)`` and handed to ``fuzz.send``. Messages from ``f``
  are recorded as ``(msg, False)`` and forwarded to the thread, except
  requests with code 280, which are answered directly on ``f`` and not
  recorded.

  Returns ``(exc_info, msgs)`` where ``exc_info`` is the result of
  ``child.join()``.
  """
  assert isinstance(fuzz, FuzzScenario)

  msgs = []
  (own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)

  fuzz.bind(f)

  child = WrappedThread(fuzzed_plug, target=scenario, args=[fuzzed_plug])
  child.start()

  while True:
    ready = sl.select([own_plug, f], [], [])[0]

    if own_plug in ready:
      # Traffic from the scenario thread goes out through the fuzzer.
      outgoing = own_plug.recv(dm.U24_MAX)
      if not outgoing:
        break
      decoded = dm.Msg.decode(outgoing)
      msgs.append((decoded, True))
      assert isinstance(decoded, dm.Msg)
      fuzz.send(decoded)
      continue

    if f not in ready:
      continue

    incoming = f.recv(dm.U24_MAX)
    if not incoming:
      break

    m = dm.Msg.decode(incoming)
    if not (m.code == 280 and m.R):
      msgs.append((m, False))
      own_plug.send(incoming)
      continue

    # Code-280 request: reply on f, echoing the request's identifiers.
    dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
      dm.Avp(code=264, M=True, data=local_host),
      dm.Avp(code=296, M=True, data=local_realm),
      dm.Avp(code=268, M=True, u32=2001),
      dm.Avp(code=278, M=True, u32=0xcafebabe)])
    f.send(dwa.encode())

  own_plug.close()
  return (child.join(), msgs)


问题


面经


文章

微信
公众号

扫码关注公众号