python类HTTPError()的实例源码

blockchain_info.py 文件源码 项目:bitencrypt 作者: OriginalMy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def broadcast_tx(self, tx):
        s = io.BytesIO()
        tx.stream(s)
        tx_as_hex = b2h(s.getvalue())
        data = urlencode(dict(tx=tx_as_hex)).encode("utf8")
        URL = "http://blockchain.info/pushtx"
        try:
            d = urlopen(URL, data=data).read()
            return d
        except HTTPError as ex:
            try:
                d = ex.read()
                ex.message = d
            except:
                pass
            raise ex
github.py 文件源码 项目:QCrash 作者: ColinDuquesnoy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_access_token(self, code, state=None):
        '''
        In callback url: http://host/callback?code=123&state=xyz

        use code and state to get an access token.
        '''
        kw = dict(client_id=self._client_id, client_secret=self._client_secret, code=code)
        if self._redirect_uri:
            kw['redirect_uri'] = self._redirect_uri
        if state:
            kw['state'] = state
        opener = build_opener(HTTPSHandler)
        request = Request('https://github.com/login/oauth/access_token', data=_encode_params(kw))
        request.get_method = _METHOD_MAP['POST']
        request.add_header('Accept', 'application/json')
        try:
            response = opener.open(request, timeout=TIMEOUT)
            r = _parse_json(response.read())
            if 'error' in r:
                raise ApiAuthError(str(r.error))
            return str(r.access_token)
        except HTTPError as e:
            raise ApiAuthError('HTTPError when get access token')
GoProHero.py 文件源码 项目:GPArrayController 作者: eastpiger 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def command(self, command, value=None):
        func_str = 'GoProHero.command({}, {})'.format(command, value)

        if command in self.commandMaxtrix:
            args = self.commandMaxtrix[command]
            # accept both None and '' for commands without a value
            if value == '':
                value = None
            # for commands with values, translate the value
            if value is not None and value in args['translate']:
                value = args['translate'][value]
            # build the final url
            url = self._commandURL(args['cmd'], value)

            # attempt to contact the camera
            try:
                urlopen(url, timeout=self.timeout).read()
                logging.info('{} - http success!'.format(func_str))
                return True
            except (HTTPError, URLError, socket.timeout) as e:
                logging.warning('{}{} - error opening {}: {}{}'.format(
                    Fore.YELLOW, func_str, url, e, Fore.RESET))

        # catchall return statement
        return False
hatebu_info.py 文件源码 项目:taikutsu_blog_works 作者: hokekiyoo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def rank_checker(url,hatebu_url):
    try:
        html = request.urlopen(hatebu_url)
    except request.HTTPError as e:
        print(e.reason)
    except request.URLError as e:
        print(e.reason)
    soup = BeautifulSoup(html,"lxml")
    a = soup.find("a",href=url)
    if a == None:
        rank = None
    else:
        rank = a.get("data-entryrank")
    return rank

# ????????????????????
browser.py 文件源码 项目:CloudPrint 作者: William-An 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def do_request(self, req):
        if DEBUG:
            print('requesting', req.get_method(), req.get_full_url())

        opener = self.build_opener()
        opener.add_handler(self._cookie_processor)
        try:
            self._response = opener.open(req)
        except HTTPError as e:
            self._response = e

        self.url = self._response.geturl()
        self.path = get_selector(Request(self.url))
        self.data = self._response.read()
        self.status = self._response.code
        self._forms = None
        self.form = None

        return self.get_response()
__init__.py 文件源码 项目:epsg_ident 作者: cmollet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def call_api(self):
        url = 'http://prj2epsg.org/search.json?'
        params = {
                'mode': 'wkt',
                'terms': self.prj
                }
        try:
            req = request.urlopen(url+urlencode(params))
        except request.HTTPError as http_exc:
            logger.warning("""Failed to retrieve data from prj2epsg.org API:\n
                            Status: %s \n
                            Message: %s""" % (http_exc.code, http_exc.msg))
        else:
            raw_resp = req.read()
            try:
                resp = json.loads(raw_resp.decode('utf-8'))
            except json.JSONDecodeError:
                logger.warning('API call succeeded but response\
                        is not JSON: %s' % raw_resp)
            self.process_api_result(resp)
http.py 文件源码 项目:OpenAPI-Python 作者: KKBOX 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _do_post(self, url, data, headers):
        try:  # Python 3
            if data != None:
                data = bytes(data, 'utf-8')
        except:  # Python 2
            pass

        try:
            req = url_request.Request(url=url, data=data, headers=headers)
            req.add_header('User-Agent', KKBOXHTTP.USER_AGENT)
            f = url_request.urlopen(req)
            r = f.read()
        except url_request.HTTPError as e:
            print(e.fp.read())
            raise (e)
        except Exception as e:
            raise (e)

        try:  # Python 3
            r = str(r, 'utf-8')
        except:  # Python 2
            pass

        json_object = json.loads(r)
        return json_object
setup.py 文件源码 项目:iterfzf 作者: dahlia 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def download_fzf_binary(plat, arch, overwrite=False, access_token=None):
    bin_path = fzf_windows_bin_path if plat == 'windows' else fzf_bin_path
    if overwrite or not os.path.isfile(bin_path):
        asset = get_fzf_binary_url(plat, arch, access_token)
        url, ext = asset
        if access_token:
            url = '{0}?access_token={1}'.format(url, access_token)
        try:
            r = urllib2.urlopen(url)
        except urllib2.HTTPError as e:
            if e.code == 403 and e.info().get('X-RateLimit-Remaining') == 0:
                raise RuntimeError(
                    'GitHub rate limit reached. To increate the limit use '
                    '-g/--github-access-token option.\n  ' + str(e)
                )
            elif e.code == 401 and access_token:
                raise RuntimeError('Invalid GitHub access token.')
            raise
        extract(r, ext, bin_path)
        r.close()
    mode = os.stat(bin_path).st_mode
    if not (mode & 0o111):
        os.chmod(bin_path, mode | 0o111)
controller.py 文件源码 项目:boltkit 作者: neo4j-contrib 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def install(self, version, core_count, read_replica_count, initial_port, password, verbose=False):
        try:
            package = _create_controller().download("enterprise", version, self.path, verbose=verbose)
            port_gen = count(initial_port)

            initial_discovery_members = self._install_cores(self.path, package, core_count, port_gen)
            self._install_read_replicas(self.path, package, initial_discovery_members, read_replica_count, port_gen)
            self._set_initial_password(password)

            return realpath(self.path)

        except HTTPError as error:
            if error.code == 401:
                raise RuntimeError("Missing or incorrect authorization")
            elif error.code == 403:
                raise RuntimeError("Could not download package from %s (403 Forbidden)" % error.url)
            else:
                raise
core_parse.py 文件源码 项目:neo 作者: Sausky 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_location(url):
    try:
        response = request.urlopen(url)
        # urllib will follow redirections and it's too much code to tell urllib
        # not to do that
        return response.geturl()
    except socket.timeout:
        print('request timeout')
        exit()
    except request.HTTPError as e:
        print(e.code)
    except request.URLError as e:
        print(e.reason)
        exit()

    return "fail"
common.py 文件源码 项目:neo 作者: Sausky 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_location(url):
    try:
        response = request.urlopen(url)
        # urllib will follow redirections and it's too much code to tell urllib
        # not to do that
        return response.geturl()
    except socket.timeout:
        print('request timeout')
        exit()
    except request.HTTPError as e:
        print(e.code)
    except request.URLError as e:
        print(e.reason)
        exit()

    return "fail"
ves_app.py 文件源码 项目:barometer 作者: opnfv 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def send_data(self, event):
        """Send event to VES"""
        server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
            's' if self._app_config['UseHttps'] else '',
            self._app_config['Domain'], int(self._app_config['Port']),
            '{}'.format('/{}'.format(self._app_config['Path']) if len(
                self._app_config['Path']) > 0 else ''),
            int(self._app_config['ApiVersion']), '{}'.format(
                '/{}'.format(self._app_config['Topic']) if len(
                    self._app_config['Topic']) > 0 else ''))
        logging.info('Vendor Event Listener is at: {}'.format(server_url))
        credentials = base64.b64encode('{}:{}'.format(
            self._app_config['Username'],
            self._app_config['Password']).encode()).decode()
        logging.info('Authentication credentials are: {}'.format(credentials))
        try:
            request = url.Request(server_url)
            request.add_header('Authorization', 'Basic {}'.format(credentials))
            request.add_header('Content-Type', 'application/json')
            event_str = json.dumps(event).encode()
            logging.debug("Sending {} to {}".format(event_str, server_url))
            url.urlopen(request, event_str, timeout=1)
            logging.debug("Sent data to {} successfully".format(server_url))
        except url.HTTPError as e:
            logging.error('Vendor Event Listener exception: {}'.format(e))
        except url.URLError as e:
            logging.error(
                'Vendor Event Listener is is not reachable: {}'.format(e))
        except Exception as e:
            logging.error('Vendor Event Listener error: {}'.format(e))
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def request(self, host, handler, request_body, verbose=0):
        """Send XMLRPC request"""
        uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
                                                  host=host, handler=handler)

        if self._passmgr:
            self._passmgr.add_password(None, uri, self._username,
                                       self._password)
        if self.verbose:
            _LOGGER.debug("FabricTransport: {0}".format(uri))

        opener = urllib2.build_opener(*self._handlers)

        headers = {
            'Content-Type': 'text/xml',
            'User-Agent': self.user_agent,
        }
        req = urllib2.Request(uri, request_body, headers=headers)

        try:
            return self.parse_response(opener.open(req))
        except (urllib2.URLError, urllib2.HTTPError) as exc:
            try:
                code = -1
                if exc.code == 400:
                    reason = 'Permission denied'
                    code = exc.code
                else:
                    reason = exc.reason
                msg = "{reason} ({code})".format(reason=reason, code=code)
            except AttributeError:
                if 'SSL' in str(exc):
                    msg = "SSL error"
                else:
                    msg = str(exc)
            raise InterfaceError("Connection with Fabric failed: " + msg)
        except BadStatusLine:
            raise InterfaceError("Connection with Fabric failed: check SSL")
transport.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def request(self, url, method="GET", body=None, headers={}):
        req = urllib2.Request(url, body, headers)
        try:
            f = self.request_opener(req, timeout=self._timeout)
            return f.info(), f.read()
        except urllib2.HTTPError as f:
            if f.code != 500:
                raise
            return f.info(), f.read()
filelister.py 文件源码 项目:crypto-detector 作者: Wind-River 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def download_file(url, download_directory):
        """Download a remote file

        Args:
            download_directory: (string)

        Returns:
            (string) that path of the file that was just downloaded. If something failed during
                download, return None

        Raises:
            DownloadError
        """
        Output.print_information("Downloading " + url + " ...")

        parsed_url = urlparse(url)
        if parsed_url.path in ["/", ""]:
            file_name = parsed_url.netloc
        else:
            file_name = parsed_url.path.split("/")[-1]
        download_path = abspath(join(download_directory, file_name))

        try:
            with open(download_path, 'wb') as file_object:
                file_object.write(urlopen(url).read())
                return download_path

        except HTTPError as expn:
            raise DownloadError("HTTP error code " + str(expn.code) + " while retrieving " \
             + url + "\n" + str(expn.reason))
        except URLError as expn:
            raise DownloadError("HTTP URL error while retrieving " + url + "\n" + str(expn.reason))
        except Exception as expn:
            raise DownloadError("Unable to retrieve " + url + "\n" + str(expn))
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def request(self, host, handler, request_body, verbose=0):
        """Send XMLRPC request"""
        uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
                                                  host=host, handler=handler)

        if self._passmgr:
            self._passmgr.add_password(None, uri, self._username,
                                       self._password)
        if self.verbose:
            _LOGGER.debug("FabricTransport: {0}".format(uri))

        opener = urllib2.build_opener(*self._handlers)

        headers = {
            'Content-Type': 'text/xml',
            'User-Agent': self.user_agent,
        }
        req = urllib2.Request(uri, request_body, headers=headers)

        try:
            return self.parse_response(opener.open(req))
        except (urllib2.URLError, urllib2.HTTPError) as exc:
            try:
                code = -1
                if exc.code == 400:
                    reason = 'Permission denied'
                    code = exc.code
                else:
                    reason = exc.reason
                msg = "{reason} ({code})".format(reason=reason, code=code)
            except AttributeError:
                if 'SSL' in str(exc):
                    msg = "SSL error"
                else:
                    msg = str(exc)
            raise InterfaceError("Connection with Fabric failed: " + msg)
        except BadStatusLine:
            raise InterfaceError("Connection with Fabric failed: check SSL")
transport.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def request(self, url, method="GET", body=None, headers={}):
        req = urllib2.Request(url, body, headers)
        try:
            f = self.request_opener(req, timeout=self._timeout)
            return f.info(), f.read()
        except urllib2.HTTPError as f:
            if f.code != 500:
                raise
            return f.info(), f.read()
transport.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def request(self, url, method="GET", body=None, headers={}):
        req = urllib2.Request(url, body, headers)
        try:
            f = self.request_opener(req, timeout=self._timeout)
            return f.info(), f.read()
        except urllib2.HTTPError as f:
            if f.code != 500:
                raise
            return f.info(), f.read()
github.py 文件源码 项目:QCrash 作者: ColinDuquesnoy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _http(self, _method, _path, **kw):
        data = None
        params = None
        if _method=='GET' and kw:
            _path = '%s?%s' % (_path, _encode_params(kw))
        if _method in ['POST', 'PATCH', 'PUT']:
            data = bytes(_encode_json(kw), 'utf-8')
        url = '%s%s' % (_URL, _path)
        opener = build_opener(HTTPSHandler)
        request = Request(url, data=data)
        request.get_method = _METHOD_MAP[_method]
        if self._authorization:
            request.add_header('Authorization', self._authorization)
        if _method in ['POST', 'PATCH', 'PUT']:
            request.add_header('Content-Type', 'application/x-www-form-urlencoded')
        try:
            response = opener.open(request, timeout=TIMEOUT)
            is_json = self._process_resp(response.headers)
            if is_json:
                return _parse_json(response.read().decode('utf-8'))
        except HTTPError as e:
            is_json = self._process_resp(e.headers)
            if is_json:
                json = _parse_json(e.read().decode('utf-8'))
            else:
                json = e.read().decode('utf-8')
            req = JsonObject(method=_method, url=url)
            resp = JsonObject(code=e.code, json=json)
            if resp.code==404:
                raise ApiNotFoundError(url, req, resp)
            raise ApiError(url, req, resp)
request.py 文件源码 项目:github-notifications 作者: unknownuser88 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    request = urllib.Request(url)
    # print('API request url:', request.get_full_url())
    if method:
        request.get_method = lambda: method
    token = token if token != None else token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')

    if data is not None:
        request.add_data(bytes(data.encode('utf8')))

    # print('API request data:', request.get_data())
    # print('API request header:', request.header_items())
    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
    #                                  urllib.ProxyHandler({'https': https_proxy}))

    #     urllib.install_opener(opener)

    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            else:
                return json.loads(response.read().decode('utf8', 'ignore'))

    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
lambda_utils.py 文件源码 项目:aws-cidr-finder 作者: awslabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_response(event, context, response_status, reason=None, response_data={}):
    body = {
        "Status": response_status,
        "PhysicalResourceId": context.log_stream_name,
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
    }

    print("Responding: {}".format(response_status))

    if reason:
        print(reason)
        body["Reason"] = reason

    if response_data:
        print(response_data)
        body["Data"] = response_data

    body = json.dumps(body).encode("utf-8")

    req = Request(event["ResponseURL"], data=body, headers={
        "Content-Length": len(body),
        "Content-Type": "",
    })
    req.get_method = lambda: "PUT"

    try:
        urlopen(req)
        return True
    except HTTPError as e:
        print("Failed executing HTTP request: {}".format(e.code))
        return False
    except URLError as e:
        print("Failed to reach the server: {}".format(e.reason))
        return False
GoProHero.py 文件源码 项目:GPArrayController 作者: eastpiger 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test(self, url, toHex=True):
        try:
            url = 'http://{}/{}'.format(self._ip, url)

            print(url)
            response = urlopen(
                url, timeout=self.timeout).read()

            if toHex:
                response = response.encode('hex')

            print(response)
        except (HTTPError, URLError, socket.timeout) as e:
            print(e)
transport.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def request(self, url, method="GET", body=None, headers={}):
        req = urllib2.Request(url, body, headers)
        try:
            f = self.request_opener(req, timeout=self._timeout)
            return f.info(), f.read()
        except urllib2.HTTPError as f:
            if f.code != 500:
                raise
            return f.info(), f.read()
speedtest_cli.py 文件源码 项目:automated-speedtests 作者: meowimacow 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def catch_request(request):
    """Helper function to catch common exceptions encountered when
    establishing a connection with a HTTP/HTTPS request

    """

    try:
        uh = urlopen(request)
        return uh, False
    except (HTTPError, URLError, socket.error):
        e = sys.exc_info()[1]
        return None, e
speedtest_cli.py 文件源码 项目:automated-speedtests 作者: meowimacow 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency
    """

    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for i in range(0, 3):
            try:
                if urlparts[0] == 'https':
                    h = HTTPSConnection(urlparts[1])
                else:
                    h = HTTPConnection(urlparts[1])
                headers = {'User-Agent': user_agent}
                start = timeit.default_timer()
                h.request("GET", urlparts[2], headers=headers)
                r = h.getresponse()
                total = (timeit.default_timer() - start)
            except (HTTPError, URLError, socket.error):
                cum.append(3600)
                continue
            text = r.read(9)
            if int(r.status) == 200 and text == 'test=test'.encode():
                cum.append(total)
            else:
                cum.append(3600)
            h.close()
        avg = round((sum(cum) / 6) * 1000, 3)
        results[avg] = server
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest

    return best
speedtest_cli.py 文件源码 项目:autoscript 作者: blazevpn 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency
    """

    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for i in range(0, 3):
            try:
                if urlparts[0] == 'https':
                    h = HTTPSConnection(urlparts[1])
                else:
                    h = HTTPConnection(urlparts[1])
                start = timeit.default_timer()
                h.request("GET", urlparts[2])
                r = h.getresponse()
                total = (timeit.default_timer() - start)
            except (HTTPError, URLError, socket.error):
                cum.append(3600)
                continue
            text = r.read(9)
            if int(r.status) == 200 and text == 'test=test'.encode():
                cum.append(total)
            else:
                cum.append(3600)
            h.close()
        avg = round((sum(cum) / 6) * 1000, 3)
        results[avg] = server
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest

    return best
transport.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def request(self, url, method="GET", body=None, headers={}):
        req = urllib2.Request(url, body, headers)
        try:
            f = self.request_opener(req, timeout=self._timeout)
            return f.info(), f.read()
        except urllib2.HTTPError as f:
            if f.code != 500:
                raise
            return f.info(), f.read()
speedtest.py 文件源码 项目:vsi_common 作者: VisionSystemsInc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def catch_request(request):
  """Helper function to catch common exceptions encountered when
  establishing a connection with a HTTP/HTTPS request

  """

  try:
    uh = urlopen(request)
    return uh
  except (HTTPError, URLError, socket.error):
    return False
speedtest.py 文件源码 项目:vsi_common 作者: VisionSystemsInc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getBestServer(servers):
  """Perform a speedtest.net latency request to determine which
  speedtest.net server has the lowest latency
  """

  results = {}
  for server in servers:
    cum = []
    url = '%s/latency.txt' % os.path.dirname(server['url'])
    urlparts = urlparse(url)
    for i in range(0, 3):
      try:
        if urlparts[0] == 'https':
          h = HTTPSConnection(urlparts[1])
        else:
          h = HTTPConnection(urlparts[1])
        headers = {'User-Agent': user_agent}
        start = timeit.default_timer()
        h.request("GET", urlparts[2], headers=headers)
        r = h.getresponse()
        total = (timeit.default_timer() - start)
      except (HTTPError, URLError, socket.error):
        cum.append(3600)
        continue
      text = r.read(9)
      if int(r.status) == 200 and text == 'test=test'.encode():
        cum.append(total)
      else:
        cum.append(3600)
      h.close()
    avg = round((sum(cum) / 6) * 1000, 3)
    results[avg] = server
  fastest = sorted(results.keys())[0]
  best = results[fastest]
  best['latency'] = fastest

  return best
blockchain_info.py 文件源码 项目:docforever 作者: Udala 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send_tx(tx):
    s = io.BytesIO()
    tx.stream(s)
    tx_as_hex = b2h(s.getvalue())
    data = urlencode(dict(tx=tx_as_hex)).encode("utf8")
    URL = "http://blockchain.info/pushtx"
    try:
        d = urlopen(URL, data=data).read()
        return d
    except HTTPError as ex:
        d = ex.read()
        print(ex)


问题


面经


文章

微信
公众号

扫码关注公众号