python类HTTPError()的实例源码

grafana-config-copy.py 文件源码 项目:tidb-ansible 作者: pingcap 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def import_dashboard_via_user_pass(api_url, user, password, dashboard):
    """POST a dashboard to ``api/dashboards/db`` using HTTP Basic auth.

    The dashboard is sent with ``overwrite`` set to True.  ``api_url`` is
    concatenated directly with the endpoint path, so it has to end with a
    slash.

    Returns the decoded JSON body of the response.  When the server answers
    with an HTTP error status, the JSON body of that error response is
    decoded and returned instead.
    """
    credentials = base64.b64encode('%s:%s' % (user, password))
    request = urllib2.Request(
        api_url + 'api/dashboards/db',
        headers={
            'Authorization': "Basic {}".format(credentials),
            'Content-Type': 'application/json',
        },
        data=json.dumps({'dashboard': dashboard, 'overwrite': True}),
    )
    try:
        return json.load(urllib2.urlopen(request))
    except urllib2.HTTPError as error:
        # HTTPError is file-like, so its body can be parsed directly.
        return json.load(error)
rest.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get(self, url, proxy=None):
        """Fetch ``url`` and return the response body as a string.

        When ``proxy`` is given, an opener routing ``http`` traffic through
        it is installed with ``urllib2.install_opener`` before the request.

        ``self.status_code`` is set to the HTTP status of the response, also
        for HTTP error responses (whose body is returned).  When the server
        cannot be reached at all, the failure reason is returned as text and
        ``self.status_code`` is set to None.
        """
        if proxy:
            proxy = urllib2.ProxyHandler({'http': proxy})
            opener = urllib2.build_opener(proxy)
            urllib2.install_opener(opener)

        try:
            response = urllib2.urlopen(url)
        except HTTPError as e:
            resp = e.read()
            self.status_code = e.code
        except URLError as e:
            # A plain URLError (DNS failure, refused connection, ...) only
            # carries ``reason``; it has neither ``read()`` nor ``code``.
            resp = str(e.reason)
            self.status_code = None
        else:
            self.status_code = response.code
            resp = response.read()

        return resp
hippo.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run(self):
        """Send the analyzer's data to Hippocampe and report the JSON answer.

        The value returned by ``self.getData()`` is posted as
        ``{<data>: {"type": self.data_type}}`` to
        ``<self.url>/hippocampe/api/v1.0/<self.service>``.  The decoded
        response is passed to ``self.report``.  HTTP errors and connection
        errors go to ``self.error``; anything else goes to
        ``self.unexpectedError``.
        """
        observable = self.getData()
        body = json.dumps({observable: {"type": self.data_type}}).encode('utf-8')
        content_type = {'Content-Type': 'application/json'}

        try:
            endpoint = '{}/hippocampe/api/v1.0/{}'.format(self.url, self.service)
            reply = urllib2.urlopen(urllib2.Request(endpoint, body, content_type))
            self.report(json.loads(reply.read()))
        except urllib2.HTTPError as http_error:
            self.error("Hippocampe: " + str(http_error))
        except urllib2.URLError:
            self.error("Hippocampe: service is not available")
        except Exception as e:
            self.unexpectedError(e)
tools.py 文件源码 项目:mongoaudit 作者: Exploit-install 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def send_result(email, result, title, urn):
    """
    Post a result to https://mongoaud.it/results as JSON.

    The payload also carries the value returned by ``get_date()``.

    Args:
        email (str): address to send the results
        result (obj): results to send
        title (str): title submitted with the results
        urn (str): uniform resource name
    Returns:
        str: response body from the endpoint, or an apology message
        followed by the error text when the request fails
    """
    payload = {
        'email': email,
        'result': result,
        'title': title,
        'urn': urn,
        'date': get_date(),
    }
    try:
        request = urllib2.Request(
            'https://mongoaud.it/results',
            json.dumps(payload),
            {'Content-type': 'application/json', 'Accept': 'application/json'},
        )
        return urllib2.urlopen(request).read()
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        apology = "Sadly enough, we are having technical difficulties at the moment, " \
                  "please try again later.\n\n%s"
        return apology % str(exc)
tools.py 文件源码 项目:mongoaudit 作者: Exploit-install 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def check_version(version):
    # if application is binary then check for latest version
    if getattr(sys, 'frozen', False):
        try:
            url = "https://api.github.com/repos/stampery/mongoaudit/releases/latest"
            req = urllib2.urlopen(url)
            releases = json.loads(req.read())
            latest = releases["tag_name"]
            if version < latest:
                print("mongoaudit version " + version)
                print("There's a new version " + latest)
                _upgrade(releases)

        except (urllib2.HTTPError, urllib2.URLError):
            print("Couldn't check for upgrades")
        except os.error:
            print("Couldn't write mongoaudit binary")
get_all_songs.py 文件源码 项目:encore.ai 作者: dyelax 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def download_songs(url):
  """Save the lyric links found on the page at ``url`` to a text file.

  The file is ``artist_data/<artist>.txt``, where ``<artist>`` comes from
  the page's first ``<h1>``.  Every ``<a target="_blank">`` whose ``href``
  contains ``lyrics/`` contributes one line: the href without its first
  character.  An HTTP error only prints a message.
  """
  # Random pause of up to half a second before the request.
  time.sleep(random.random() * 0.5)
  try:
    html = urllib2.urlopen(url).read()
    soup = BeautifulSoup(html, 'html.parser')

    # Artist name: heading text minus its last 7 characters, lower-cased,
    # with spaces replaced by underscores.
    heading = soup.findAll('h1')[0].get_text()
    artist_name = heading[:-7].lower().replace(' ', '_')

    # Store all songs for the artist, one URL per line.
    with open('artist_data/' + artist_name + '.txt', 'wb') as out:
      for anchor in soup.findAll('a', {'target': '_blank'}):
        href = anchor['href']
        if 'lyrics/' in href:
          out.write(href[1:].strip() + '\n')
  except urllib2.HTTPError:
    print('404 not found')
wpforce.py 文件源码 项目:WPForce 作者: n00py 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def TestSite(url):
    """Probe ``url`` with a 3 second timeout and report what was found.

    ``protocheck(url)`` is called first.  An HTTP 405 answer is reported as
    the endpoint being found.  HTTP 404, a URL error and a socket timeout
    each print a message and exit the program.  Other HTTP error codes, and
    a successful response, produce no further output.
    """
    protocheck(url)
    print("Trying: " + url)
    try:
        urllib2.urlopen(url, timeout=3)
    except urllib2.HTTPError as http_err:
        status = http_err.code
        if status == 405:
            print(url + " found!")
            print("Now the brute force will begin!  >:)")
        elif status == 404:
            printout(str(http_err), YELLOW)
            print(" - XMLRPC has been moved, removed, or blocked")
            sys.exit()
    except urllib2.URLError:
        printout("Could not identify XMLRPC.  Please verify the domain.\n", YELLOW)
        sys.exit()
    except socket.timeout as timeout_err:
        print(type(timeout_err))
        printout("The socket timed out, try it again.", YELLOW)
        sys.exit()
sample.py 文件源码 项目:yelp-fusion 作者: Yelp 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def main():
    """Parse ``--term`` / ``--location`` from the command line and query the API.

    Both options fall back to ``DEFAULT_TERM`` and ``DEFAULT_LOCATION``.
    An ``HTTPError`` raised by ``query_api`` ends the program with a message
    holding the error code, the URL and the response body.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-q', '--term', dest='term', default=DEFAULT_TERM, type=str,
        help='Search term (default: %(default)s)')
    parser.add_argument(
        '-l', '--location', dest='location', default=DEFAULT_LOCATION,
        type=str, help='Search location (default: %(default)s)')
    args = parser.parse_args()

    try:
        query_api(args.term, args.location)
    except HTTPError as error:
        message = 'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'
        sys.exit(message.format(error.code, error.url, error.read()))
trakt.py 文件源码 项目:plex-trakt-scrobbler 作者: cristianmiranda 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _do_trakt_auth_post(self, url, data):
        """POST ``data`` to ``url`` with the session's bearer token.

        The token comes from ``self.get_session()`` and the API key from
        ``self.CLIENT_ID``.  The default socket timeout is set to 5 seconds
        before the request.  The response body is logged and returned.  An
        ``HTTPError`` is logged and re-raised.
        """
        try:
            token = self.get_session()

            request_headers = {
                'Content-Type': 'application/json',
                'Authorization': 'Bearer ' + token,
                'trakt-api-version': '2',
                'trakt-api-key': self.CLIENT_ID
            }

            # timeout in seconds, set as the default for new sockets
            socket.setdefaulttimeout(5)

            body = urllib2.urlopen(urllib2.Request(url, data, request_headers)).read()

            self.logger.info('Response: {0}'.format(body))
            return body
        except urllib2.HTTPError as e:
            self.logger.error('Unable to submit post data {url} - {error}'.format(url=url, error=e.reason))
            raise
darkMySQLi.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def GetThatShit(head_URL):
        """Fetch ``head_URL`` through the rotating openers and return the body.

        Every "+" in the URL is replaced with the global ``arg_eva`` and the
        request is sent with the global ``agent`` as its User-Agent.  Each
        attempt increments the global counters ``gets`` and ``proxy_num`` and
        uses the opener at ``proxy_list[proxy_num % proxy_len]``.  The loop
        repeats for as long as the body that was read is empty.

        Returns the response body.  After an ``urllib2.HTTPError`` the
        still-empty string is returned.  Any other exception is printed with
        the request details and re-raised.
        """
        source = ""
        global gets;global proxy_num
        head_URL = head_URL.replace("+",arg_eva)
        request_web = urllib2.Request(head_URL)
        request_web.add_header('User-Agent',agent)
        while len(source) < 1:
                # Debug output: proxy, user agent and URL of the upcoming attempt.
                if arg_debug == "on":
                        print "\n[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                try:
                        gets+=1;proxy_num+=1
                        source = proxy_list[proxy_num % proxy_len].open(request_web).read()
                except (KeyboardInterrupt, SystemExit):
                        # Never swallow a user interrupt or an exit request.
                        raise
                except (urllib2.HTTPError):
                        # NOTE(review): the message says "Trying again!" but the
                        # break below leaves the loop, so no retry happens -
                        # confirm which of the two is intended.
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Trying again!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        break
                except:
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Look at the error and try to figure it out!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        raise
        return source

#the guts and glory - Binary Algorithm that does all the guessing for the Blind Methodology
inside_pro_dl.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def getFile(link):
    """Download ``link`` into a new file and record its name in ``newfilelist``.

    The target is ``tmp_insidepropw_<x>.txt`` where ``<x>`` is the second
    ``=``-separated piece of the link.  If that file already exists, the
    name switches to ``<last path segment of link>.<n>`` with ``n`` counting
    up from 1 until an unused name is found.

    An HTTP error while opening the link prints the error and exits.  An
    ``IOError`` while copying prints a message and exits with status 1.
    """
    try:
        source = urllib2.urlopen(link)
    except urllib2.HTTPError as msg:
        print("\nError: %s" % (msg,))
        sys.exit()
    num = 1
    filename = 'tmp_insidepropw_'+link.split('=')[1]+'.txt'
    while os.path.isfile(filename):
        filename = link.rsplit("/",1)[1]+"."+str(num)
        num+=1
    try:
        # The with-block closes the output file even if the copy fails.
        with open(filename, "w+") as target:
            shutil.copyfileobj(source, target)
    except IOError:
        print("\nCannot write to `"+filename+"' (Permission denied).")
        sys.exit(1)
    finally:
        # Release the HTTP connection on every path, including the exit above.
        source.close()
    print("File downloaded %s" % (filename,))
    newfilelist.append(filename)
darkPGSQLi.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def GetThatShit(head_URL):
        """Fetch ``head_URL`` through the rotating openers and return the body.

        Every "+" in the URL is replaced with the global ``arg_eva`` and the
        request is sent with the global ``agent`` as its User-Agent.  Each
        attempt increments the global counters ``gets`` and ``proxy_num`` and
        uses the opener at ``proxy_list[proxy_num % proxy_len]``.  The loop
        repeats for as long as the body that was read is empty.

        Returns the response body.  After an ``urllib2.HTTPError`` the
        still-empty string is returned.  Any other exception is printed with
        the request details and re-raised.
        """
        source = ""
        global gets;global proxy_num
        head_URL = head_URL.replace("+",arg_eva)
        request_web = urllib2.Request(head_URL)
        request_web.add_header('User-Agent',agent)
        while len(source) < 1:
                # Debug output: proxy, user agent and URL of the upcoming attempt.
                if arg_debug == "on":
                        print "\n[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                try:
                        gets+=1;proxy_num+=1
                        source = proxy_list[proxy_num % proxy_len].open(request_web).read()
                except (KeyboardInterrupt, SystemExit):
                        # Never swallow a user interrupt or an exit request.
                        raise
                except (urllib2.HTTPError):
                        # NOTE(review): the message says "Trying again!" but the
                        # break below leaves the loop, so no retry happens -
                        # confirm which of the two is intended.
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Trying again!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        break
                except:
                        print "[-] Unexpected error:", sys.exc_info()[0],"\n[-] Look at the error and try to figure it out!"
                        print "[proxy]:",proxy_list_count[proxy_num % proxy_len]+"\n[agent]:",agent+"\n[debug]:",head_URL,"\n"
                        raise
        return source

#say hello
linksysbrute.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        password = getword()
        try:
            print "-"*12
            print "User:",username,"Password:",password
            req = urllib2.Request(sys.argv[1])
            passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
            passman.add_password(None, sys.argv[1], username, password)
            authhandler = urllib2.HTTPBasicAuthHandler(passman)
            opener = urllib2.build_opener(authhandler)
            fd = opener.open(req)
            print "\t\n\n[+] Login successful: Username:",username,"Password:",password,"\n"            
            print "[+] Retrieved", fd.geturl()
            info = fd.info()
            for key, value in info.items():
                    print "%s = %s" % (key, value)
            sys.exit(2)
        except (urllib2.HTTPError,socket.error):
            pass
webauthbrute.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def run(self):
        username, password = getword()
        try:
            print "-"*12
            print "User:",username,"Password:",password
            req = urllib2.Request(sys.argv[1])
            passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
            passman.add_password(None, sys.argv[1], username, password)
            authhandler = urllib2.HTTPBasicAuthHandler(passman)
            opener = urllib2.build_opener(authhandler)
            fd = opener.open(req)
            print "\t\n\nUsername:",username,"Password:",password,"----- Login successful!!!\n\n"           
            print "Retrieved", fd.geturl()
            info = fd.info()
            for key, value in info.items():
                    print "%s = %s" % (key, value)
            sys.exit(2)
        except (urllib2.HTTPError, httplib.BadStatusLine,socket.error), msg: 
            print "An error occurred:", msg
            pass
google.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def getCookie(self):
        """
        Open http://www.google.com/ncr through ``self.opener`` and read the
        response headers, also when Google answers with an HTTP error.

        Raises ``sqlmapConnectionException`` when Google cannot be reached.

        NOTE(review): the visible body keeps the headers in a local and does
        not use them further; the session cookie is presumably captured by
        the opener itself - confirm.
        """

        try:
            headers = self.opener.open("http://www.google.com/ncr").info()
        except urllib2.HTTPError as e:
            headers = e.info()
        except urllib2.URLError:
            raise sqlmapConnectionException("unable to connect to Google")
google.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def search(self, googleDork):
        """
        Run ``googleDork`` as a Google search through ``self.opener`` and
        return the matches parsed from the result page.

        Returns None when the dork is empty.  The body of an HTTP error
        response is parsed like a normal page.  Raises
        ``sqlmapConnectionException`` when Google cannot be reached.
        """

        if not googleDork:
            return None

        query = "q=%s&" % urlencode(googleDork)
        url = "".join(("http://www.google.com/search?", query,
                       "num=100&hl=en&safe=off&filter=0&btnG=Search"))

        try:
            page = self.opener.open(url).read()
        except urllib2.HTTPError as e:
            page = e.read()
        except urllib2.URLError:
            raise sqlmapConnectionException("unable to connect to Google")

        self.__matches = self.__parsePage(page)

        return self.__matches
couchbase.py 文件源码 项目:collectd-couchbase 作者: signalfx 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _api_call(url, opener):
    """
    Makes a REST call against the Couchbase API.
    Args:
    url (str): The URL to get, including endpoint
    opener: urllib2 opener, installed as the default before the request
    Returns:
    The decoded JSON response, or None when the request or the JSON
    parsing fails (the failure is logged through collectd.error)
    """
    try:
        urllib2.install_opener(opener)
        response = urllib2.urlopen(url, timeout=http_timeout)
    except (urllib2.HTTPError, urllib2.URLError) as request_error:
        collectd.error("Error making API call (%s) %s" % (request_error, url))
        return None

    try:
        decoded = json.load(response)
    except ValueError as parse_error:
        collectd.error("Error parsing JSON for API call (%s) %s" % (parse_error, url))
        decoded = None
    return decoded
woxikon_de_lookup.py 文件源码 项目:thesaurus_query.vim 作者: Ron89 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _woxikon_de_url_handler(target):
    '''
    Fetch the synonyms.woxikon.com German page for ``target``.

    The timeout is read from the 'tq_online_backends_timeout' variable.
    Returns a StringIO with the decoded, unescaped page on success, 1 on an
    HTTP error or a timeout, and -1 on any other URL error.
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        query_url = fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII')
        response = urlopen(query_url, timeout = time_out_choice)
        raw_page = response.read()
        web_content = StringIO(unescape(decode_utf_8(raw_page)))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        # A timeout wrapped in URLError counts as 1, everything else as -1.
        return 1 if isinstance(err.reason, socket.timeout) else -1
    except socket.timeout:  # timeout that was not wrapped in URLError
        return 1
    return web_content
jeck_ru_lookup.py 文件源码 项目:thesaurus_query.vim 作者: Ron89 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _jeck_ru_url_handler(target):
    '''
    Fetch the jeck.ru SynonymsDictionary page for ``target``.

    The timeout is read from the 'tq_online_backends_timeout' variable.
    Returns a StringIO with the decoded page on success, 1 on an HTTP error
    or a timeout, and -1 on any other URL error.
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        query_url = fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII')
        response = urlopen(query_url, timeout = time_out_choice)
        raw_page = response.read()
        web_content = StringIO(decode_utf_8(raw_page))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        # A timeout wrapped in URLError counts as 1, everything else as -1.
        return 1 if isinstance(err.reason, socket.timeout) else -1
    except socket.timeout:  # timeout that was not wrapped in URLError
        return 1
    return web_content
influx.py 文件源码 项目:weewx-influx 作者: matthewwall 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def post_request(self, request, payload=None):
        """Open ``request`` with ``payload`` as its data and ``self.timeout``.

        When ``self.server_url`` starts with 'https', an unverified SSL
        context is passed, so the server certificate is not checked.
        Returns whatever ``urllib2.urlopen`` returns.
        """
        # FIXME: provide full set of ssl options instead of this hack
        extra = {}
        if self.server_url.startswith('https'):
            import ssl
            extra['context'] = ssl._create_unverified_context()
        return urllib2.urlopen(request, data=payload, timeout=self.timeout,
                               **extra)

#    def post_request(self, request, payload=None):  # @UnusedVariable
#        try:
#            try:
#                _response = urllib2.urlopen(request, timeout=self.timeout)
#            except TypeError:
#                _response = urllib2.urlopen(request)
#        except urllib2.HTTPError, e:
#            logerr("post failed: %s" % e)
#            raise weewx.restx.FailedPost(e)
#        else:
#            return _response
struts2_hunt.py 文件源码 项目:struts2_check 作者: coffeehb 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def gethtml(url):
    """Fetch ``url`` and return a dict with keys "html", "code" and "url".

    The request carries browser-like User-Agent and Accept-Language headers,
    uses the URL itself as Referer and times out after 3 seconds.  The body
    is encoded to UTF-8; if that fails it is decoded as GBK (ignoring
    errors) and re-encoded.

    For an HTTP error response the error's body, code and URL are returned
    (with an empty body if it cannot be read).  Any other failure yields an
    empty body, code 404 and the requested URL.
    """
    try:
        request = urllib2.Request(url)
        request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0')
        request.add_header('Accept-Language', 'en-us;q=0.5,en;q=0.3')
        request.add_header('Referer', request.get_full_url())
        u = urllib2.urlopen(request , timeout = 3)
        content = u.read()
        try:
            content = content.encode("utf-8")
        except Exception:
            content = content.decode('gbk','ignore').encode("utf-8",'ignore')
        return {"html":content,"code":u.code,"url":u.geturl()}
    except urllib2.HTTPError as e:
        try:
            return {"html":e.read(),"code":e.code,"url":e.geturl()}
        except Exception:
            return {"html":'',"code":e.code,"url":e.geturl()}
    # ``Exception`` instead of a bare except, so that KeyboardInterrupt and
    # SystemExit are no longer turned into a fake 404 result.
    except Exception:
        return {"html":"","code":404, "url":url}
__init__.py 文件源码 项目:kcli 作者: karmab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def symlinks(user, repo):
    """Return the paths of the symbolic links on a GitHub repo's master branch.

    The master head's sha is looked up first, then the recursive tree for
    that sha is fetched.  Entries with mode '120000' (git's mode for
    symbolic links) are collected.

    Args:
        user: GitHub user or organisation name
        repo: repository name
    Returns:
        list of paths; an empty list when the tree cannot be fetched or
        parsed.  An HTTP error on the first request prints a message and
        exits with status 1.
    """
    url1 = 'https://api.github.com/repos/%s/%s/git/refs/heads/master' % (user, repo)
    try:
        r = urllib2.urlopen(url1)
    except urllib2.HTTPError:
        print("Invalid url %s.Leaving..." % url1)
        sys.exit(1)
    base = json.load(r)
    sha = base['object']['sha']
    url2 = 'https://api.github.com/repos/%s/%s/git/trees/%s?recursive=1' % (user, repo, sha)
    try:
        # The request is now inside the handler too: an HTTP or connection
        # failure here used to escape as an unhandled exception.
        base = json.load(urllib2.urlopen(url2))
    except (urllib2.URLError, ValueError):
        return []
    return [entry['path'] for entry in base['tree'] if entry['mode'] == '120000']
librus.py 文件源码 项目:Botek-Librus 作者: mRokita 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __login(self):
        """Log in to Librus and put the OAuth bearer token on the opener.

        The opener first visits https://synergia.librus.pl, the domain of
        the first cookie in the jar is rewritten to api.librus.pl, and a
        token is requested from https://api.librus.pl/OAuth/Token with
        ``config.login`` / ``config.password``.

        Raises ``WrongPasswordError`` when the server answers with HTTP 400.
        Any other HTTP error is re-raised unchanged.
        """
        # Obtain the cookies
        self.__opener.addheaders = [('Authorization', 'Basic MzU6NjM2YWI0MThjY2JlODgyYjE5YTMzZjU3N2U5NGNiNGY=')]

        try:
            self.__opener.open('https://synergia.librus.pl')
            list(self.__cj)[0].domain='api.librus.pl'
            tokens = loads(self.__opener.open('https://api.librus.pl/OAuth/Token',
                                              data=urlencode({
                                                  'grant_type': 'password',
                                                  'username': config.login,
                                                  'password': config.password,
                                                  'librus_long_term_token': '1',
                                              })).read())

        except urllib2.HTTPError as e:
            # The original compared the code to 400 but discarded the result,
            # so every HTTP error was reported as a wrong password.
            if e.getcode() == 400:
                raise WrongPasswordError('Nieprawid?owe has?o')
            raise
        self.__opener.addheaders = [('Authorization', 'Bearer %s' % tokens['access_token'])]
librus.py 文件源码 项目:Botek-Librus 作者: mRokita 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_announcements(self):
        """
        Download the school notices from https://api.librus.pl/2.0/SchoolNotices.

        The decoded response is printed before it is converted.

        :returns: list [{"author": id of the notice's AddedBy entry,
                         "title": subject (UTF-8 encoded),
                         "content": content (UTF-8 encoded),
                         "time": start date}]
        :raises SessionExpiredError: when the request ends in an HTTP error
        """
        # Load the announcements
        try:
            raw = self.__opener.open('https://api.librus.pl/2.0/SchoolNotices').read()
            data = loads(raw)
        except urllib2.HTTPError:
            raise SessionExpiredError
        print(data)
        announcements = []
        for notice in data[u'SchoolNotices']:
            announcements.append({
                'author': notice[u'AddedBy'][u'Id'],
                'title': notice[u'Subject'].encode('utf-8'),
                'content': notice[u'Content'].encode('utf-8'),
                'time': notice[u'StartDate'],
            })
        return announcements
basic_auth.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def basic_auth(server="http://127.0.0.1"):
    """
    Build a login function that checks credentials against ``server``
    using HTTP Basic auth.

    Usage with a different server:
    from gluon.contrib.login_methods.basic_auth import basic_auth
    auth.settings.login_methods.append(basic_auth('http://server'))
    """

    def basic_login_aux(username,
                        password,
                        server=server):
        # True when the server accepts the request, False on any URL/HTTP error.
        token = base64.b64encode(username + ':' + password)
        request = urllib2.Request(server, None, {'Authorization': 'Basic ' + token})
        try:
            urllib2.urlopen(request)
        except (urllib2.URLError, urllib2.HTTPError):
            return False
        else:
            return True
    return basic_login_aux
getServerSecretShare.py 文件源码 项目:incubator-milagro-mfa-server 作者: apache 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_server_secret(credentials, expires):
    """ Fetch server secret from CertiVox server

    The ``serverSecret`` endpoint under ``credentials['api_url']`` is called
    with the app id, the expiry and a signature that ``sign_message`` builds
    from path, app id and expiry using the app key.

    Returns the ``serverSecret`` field of the JSON response.  An HTTP error
    raises ``ScriptException`` with the response code and body; for code 408
    a hint is printed first.
    """
    path = 'serverSecret'
    signature = sign_message(
        '{}{}{}'.format(path, credentials['app_id'], expires),
        str(credentials['app_key'])
    )
    query = urllib.urlencode({
        'app_id': credentials['app_id'],
        'expires': expires,
        'signature': signature,
    })

    try:
        url = '{api_url}{end_point}?{params}'.format(
            api_url=credentials['api_url'],
            end_point=path,
            params=query,
        )
        response = urllib2.urlopen(url)
    except urllib2.HTTPError as e:
        if e.code == 408:
            print("Make sure your time it correct!")
        raise ScriptException('Response code: {} - {}'.format(e.code, e.read()))

    return json.loads(response.read())['serverSecret']
db_to_gtfs.py 文件源码 项目:db-api-to-gtfs 作者: patrickbr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def fetch_json(self, url):
        """Fetch remote json, retrying HTTP errors with a doubling delay.

        The delay starts at 1 second and doubles after every failed attempt.
        Once it exceeds ``MAX_TIMEOUT`` the last ``HTTPError`` is logged and
        re-raised.  Returns the decoded JSON document.
        """

        delay = 1
        while True:
            try:
                logging.debug('Opening %s.', url)
                response = urllib2.urlopen(url)
                break
            except urllib2.HTTPError as err:
                if delay > MAX_TIMEOUT:
                    logging.error('Error opening %s, error code %d, reason is %s.', url, err.code, err.reason)
                    # Bare raise keeps the original traceback.
                    raise
                logging.warning('Error opening %s, error code %d, reason is %s.', url, err.code, err.reason)
                logging.warning('Waiting for %ds before retrying.', delay)
                time.sleep(delay)
                delay *= 2

        try:
            return json.load(response)
        finally:
            # Release the connection even when the body is not valid JSON.
            response.close()
SwarfarmLogger.py 文件源码 项目:SWProxy-plugins 作者: lstern 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self):
        """Load the proxy config and fetch the accepted log types.

        ``swproxy.config`` is read as JSON when it exists; otherwise the
        config is empty.  The plugin is enabled unless the config sets
        ``disable_swarfarm_logger``.  When enabled, the accepted commands
        are read from the first line of the response at
        ``self.commands_url``.  If that request fails or the answer is not
        valid JSON, the failure is logged and the plugin disables itself.
        """
        super(SwarfarmLogger, self).__init__()
        self.plugin_enabled = True

        config_name = 'swproxy.config'
        if not os.path.exists(config_name):
            self.config = {}
        else:
            with open(config_name) as f:
                self.config = json.load(f)

        self.plugin_enabled = not self.config.get('disable_swarfarm_logger', False)

        if self.plugin_enabled:
            # Get the list of accepted commands from the server
            logger.info('SwarfarmLogger - Retrieving list of accepted log types from SWARFARM...')
            try:
                resp = urllib2.urlopen(self.commands_url)
                try:
                    self.accepted_commands = json.loads(resp.readline())
                finally:
                    # Close the response even when the JSON cannot be parsed.
                    resp.close()
                logger.info('SwarfarmLogger - Looking for the following commands to log:\r\n' + ', '.join(self.accepted_commands.keys()))
            # URLError is the base class of HTTPError, so connection failures
            # now disable the plugin too instead of aborting construction.
            except (urllib2.URLError, ValueError):
                logger.fatal('SwarfarmLogger - Unable to retrieve accepted log types. SWARFARM logging is disabled.')
                self.plugin_enabled = False
SwarfarmLogger.py 文件源码 项目:SWProxy-plugins 作者: lstern 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def process_data(self, req_json, resp_json):
        """Send the accepted fields of one request/response pair to ``self.log_url``.

        Nothing happens unless the request's ``command`` is a key of
        ``self.accepted_commands``.  That entry may list field names under
        'request' and 'response'; the listed fields are copied from
        ``req_json`` and ``resp_json`` (missing ones become None) and posted
        as a JSON string in the form field ``data``.  An HTTP error is
        logged as a warning, a successful post as info.
        """
        command = req_json.get('command')
        if command not in self.accepted_commands:
            return

        wanted = self.accepted_commands[command]
        result_data = {}
        for section, source in (('request', req_json), ('response', resp_json)):
            if section in wanted:
                result_data[section] = {item: source.get(item) for item in wanted[section]}

        # Nothing to log for this command.
        if not result_data:
            return

        payload = json.dumps(result_data)
        try:
            resp = urllib2.urlopen(self.log_url, data=urllib.urlencode({'data': payload}))
        except urllib2.HTTPError as e:
            logger.warn('SwarfarmLogger - Error: {}'.format(e.readline()))
        else:
            resp.close()
            logger.info('SwarfarmLogger - {} logged successfully'.format(command))
post_json.py 文件源码 项目:lain 作者: laincloud 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def main():
    """Ansible module entry point: POST ``body`` as JSON to ``url``.

    Parameters ``url`` and ``body`` are required; ``header`` is optional and
    its items are added as extra request headers.  An ``HTTPError`` makes
    the module fail with the reason, code and response body; otherwise the
    module exits with ``changed=True``.
    """
    module = AnsibleModule(
        argument_spec={
            'url': {'required': True},
            'body': {'required': True},
            'header': {'required': False},
        }
    )

    params = module.params
    url = params['url']
    body = params['body']
    header = params['header']

    request = Request(url)
    request.add_header('Content-Type', 'application/json')
    if header:
        for name, value in header.iteritems():
            request.add_header(name, value)

    try:
        urlopen(request, json.dumps(body))
    except HTTPError as e:
        module.fail_json(msg=e.reason, code=e.code, response=e.read())
    else:
        module.exit_json(changed=True)


问题


面经


文章

微信
公众号

扫码关注公众号