Python url() usage examples (source code)
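Most of the snippets below exercise the third-party validators package: validators.url() returns True for a valid URL and a falsy validation-failure object otherwise, so its result can be used directly in an if test. A minimal sketch (assuming validators is installed):

import validators

print(validators.url("https://example.com"))   # True
print(bool(validators.url("not a url")))       # False: invalid input yields a falsy failure object

if validators.url("https://example.com"):
    print("looks like a valid URL")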

microurl.py (project: microurl, author: francium)
def route_micro(micro):
    '''
        Micro to real URL redirection handler.
    '''
    try:
        temp = lookup_micro(micro)

        if urlcheck(temp):
            return redirect(temp)
        elif domaincheck(temp):
            return redirect("http://" + temp)
        elif ipcheck(temp.split(':')[0]) and urlcheck('http://' + temp):
            # checks for plain ip or an ip with something after it
            return redirect("http://" + temp)
        else:
            abort(404)
    except Exception as e:
        # If micro is not registered, handle the exception from trying to look
        # it up and raise a 404 HTTP error.
        sys.stderr.write(str(e))
        abort(404)
web.py (project: MalwrAgent, author: michaelschratt)
def __do_http_request(self, type_, url, data):
        """make http get and post requests"""
        parsed_url = self.__parse_url(url)
        parameter = self.__get_parameter_from_parsed_url(parsed_url)
        hostname = self.__get_host_from_parsed_url(parsed_url)
        url = hostname + parsed_url.path  # url is overwritten
        payload = {
            parameter: data
        }

        if type_ == 'GET':
            response = requests.get(url, params=payload)
        elif type_ == 'POST':
            response = requests.post(url, data=payload)
        else:
            response = None

        return self.__validate_request_status(response)
import_data.py (project: find-that-charity, author: TechforgoodCAST)
def import_extract_main(chars=None, datafile=os.path.join("data", "ccew", "extract_main_charity.csv")):
    # avoid the mutable-default-argument trap; callers pass in a dict of
    # charity records keyed by charity number
    if chars is None:
        chars = {}

    with open(datafile, encoding="latin1") as a:
        csvreader = csv.reader(a, doublequote=False, escapechar='\\')
        ccount = 0
        for row in csvreader:
            if len(row) > 1:
                row = clean_row(row)
                if row[1]:
                    chars[row[0]]["company_number"].append({
                        "number": parse_company_number(row[1]),
                        "url": "http://beta.companieshouse.gov.uk/company/" + parse_company_number(row[1]),
                        "source": "ccew"
                    })
                if row[9]:
                    chars[row[0]]["url"] = row[9]
                if row[6]:
                    chars[row[0]]["latest_income"] = int(row[6])
                ccount += 1
                if ccount % 10000 == 0:
                    print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount, end='')
        print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount)

    return chars
import_data.py (project: find-that-charity, author: TechforgoodCAST)
def clean_chars(chars=None, pc_es=None, es_pc_index="postcode", es_pc_type="postcode"):
    # avoid the mutable-default-argument trap
    if chars is None:
        chars = {}

    ccount = 0
    for c in chars:
        if pc_es:
            geo_data = fetch_postcode(chars[c]["geo"]["postcode"], pc_es, es_pc_index, es_pc_type)
            if geo_data:
                chars[c]["geo"]["location"] = geo_data[0]
                chars[c]["geo"]["areas"] = geo_data[1]

        chars[c]["url"] = parse_url(chars[c]["url"])
        chars[c]["domain"] = get_domain(chars[c]["url"])
        chars[c]['org-ids'] = add_org_id_prefix(chars[c])

        chars[c]["alt_names"] = [n["name"] for n in chars[c]["names"] if n["name"] != chars[c]["known_as"]]

        # @TODO capitalisation of names

        ccount += 1
        if ccount % 10000 == 0:
            print('\r', "[Geo] %s charites added location details" % ccount, end='')
    print('\r', "[Geo] %s charites added location details" % ccount)

    return chars
BTG.py (project: BTG, author: conix-security)
def checkType(self, argument):
        """
            Identify observable type
        """
        if validators.url(argument):
            return "URL"
        elif validators.md5(argument):
            return "MD5"
        elif validators.sha1(argument):
            return "SHA1"
        elif validators.sha256(argument):
            return "SHA256"
        elif validators.sha512(argument):
            return "SHA512"
        elif validators.ipv4(argument):
            return "IPv4"
        elif validators.ipv6(argument):
            return "IPv6"
        elif validators.domain(argument):
            return "domain"
        else:
            mod.display("MAIN", argument, "ERROR", "Unable to retrieve observable type")
            return None
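A quick sketch of the underlying validators checks this method chains, in the same order-sensitive spirit (assuming the validators package):

import validators

print(validators.md5("d41d8cd98f00b204e9800998ecf8427e"))  # True: 32 hex characters
print(validators.ipv4("8.8.8.8"))                          # True
print(validators.domain("example.com"))                    # True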
functions.py (project: poseidon, author: sidus-dev)
def do_import_bookmarks(filename):

    content = []
    first = _("Oops, import failed")
    second = _("could be corrupted or an invalid HTML bookmark file")

    with open(filename) as f:
        lines = f.readlines()

    if not re.findall("<!DOCTYPE NETSCAPE-Bookmark-file-1>", lines[0], re.IGNORECASE):
        dialog().error(first, "<span size='small'>\"<b>{}</b>\" {}.</span>".format(filename, second))
        return True

    title = re.findall(r'<a[^>]*>(.*?)</a>', str(lines), re.IGNORECASE)
    url = re.findall(r'<a[^>]* href="([^"]*)"', str(lines), re.IGNORECASE)

    for t, u in zip(title, url):
        if t and u: content.append([t, u])

    return content
functions.py (project: poseidon, author: sidus-dev)
def do_export_bookmarks(bookmarks):

    content = []

    header = "<!DOCTYPE NETSCAPE-Bookmark-file-1><!--This is an automatically generated file.\
    It will be read and overwritten. Do Not Edit! --><META HTTP-EQUIV=\"Content-Type\" CONTENT=\"text/html;\
    charset=UTF-8\"><Title>{}</Title><H1>{}</H1><DL><p>".format(_("Bookmarks"), _("Bookmarks"))
    footer = "</DL><p>"

    content.append(header)

    for i in bookmarks:

        timestamp = int(datetime.datetime.strptime(i[0], "%Y-%m-%d %H:%M").timestamp())
        title = i[1]
        url = i[2]

        content.append("<DT><A HREF=\"{}\" ADD_DATE=\"{}\">{}</A>".format(url, timestamp, title))

    content.append(footer)
    content = "".join([s for s in content])

    return content
poseidon.py (project: poseidon, author: sidus-dev)
def on_insert_bookmarks(self, title, url):

        with bookmarks_con:    
            cur = bookmarks_con.cursor()
            cur.execute("SELECT * FROM bookmarks;")
            urls = cur.fetchall()

            if len(urls) != 0:
                for i in urls:
                    if url == i[1]:
                        return True

            cur.execute("INSERT INTO bookmarks VALUES(?, ?, ?);",\
            (title.replace("\n","").strip(), url, time.strftime("%Y-%m-%d %H:%M")))

            self.refresh_liststore(1)

            return True
poseidon.py (project: poseidon, author: sidus-dev)
def on_js_switch(self, button, active):

        if not set_enable_javascript: return True

        page = self.tabs[self.current_page][0]
        settings = page.webview.get_settings()

        if button.get_active():
            settings.set_property("enable-javascript", True)
            self.js_label.set_markup(self.jse_label_text)
        else:
            settings.set_property("enable-javascript", False)
            self.js_label.set_markup(self.jsd_label_text)

        page.webview.set_settings(settings)
        url = page.webview.get_uri()
        if url and validators.url(url): page.webview.reload()
poseidon.py (project: poseidon, author: sidus-dev)
def on_decide_destination(self, download, name):

        url = download.get_request().get_uri()

        if not name: name = get_domain(url).replace(".", "_")
        if not "." in name:

            mime = download.get_response().get_mime_type()
            suf = mime.split("/")
            name = "{}.{}".format(name, suf[1])

        for i in self.dlview:
            for a in i:
                if type(a) == Gtk.ModelButton:
                    if a.get_name().split("/")[-1] == name:
                        self.downloads_menu.show()
                        return True

        if url: pathchooser().save(name, download, url)
poseidon.py (project: poseidon, author: sidus-dev)
def dynamic_title(self, view, title):

        url = view.get_uri()

        if not url and not title: title = tab_name
        if not title: title = url

        counter = 0

        for tab, widget in self.tabs:

            widget = self.check_tab(widget, 0)

            if tab.webview is view:
                if widget:
                    widget.set_text(minify(title, 50))
                    widget.set_tooltip_text("")
                    if len(title) > 50: widget.set_tooltip_text(title)

            counter += 1
app_helpers.py (project: uclapi, author: uclapi)
def is_url_safe(url):
    if not url.startswith("https://"):
        return False

    if not validators.url(url, public=True):
        return False

    whitelist_urls = os.environ["WHITELISTED_CALLBACK_URLS"].split(';')
    if url in whitelist_urls:
        return True

    forbidden_urls = os.environ["FORBIDDEN_CALLBACK_URLS"].split(';')
    for furl in forbidden_urls:
        if furl in url:
            return False

    return True
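A hypothetical usage sketch for is_url_safe, assuming a validators version that supports the public= flag used above; the environment variable names come from the snippet, the values here are invented:

import os

os.environ["WHITELISTED_CALLBACK_URLS"] = "https://app.example.com/callback"
os.environ["FORBIDDEN_CALLBACK_URLS"] = "https://evil.example.com"

print(is_url_safe("https://app.example.com/callback"))  # True: whitelisted
print(is_url_safe("http://app.example.com/callback"))   # False: not https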
scanner.py (project: hacker-scripts, author: restran)
def __init__(self, url, max_worker=10, timeout=3,
                 scan_dict=None, verbose=False, status=None):
        self.site_lang = ''
        self.raw_base_url = url
        self.base_url = url
        self.max_worker = max_worker
        self.timeout = timeout
        self.scan_dict = scan_dict
        self.verbose = verbose
        self.first_item = ''
        self.dict_data = {}
        self.first_queue = []
        self.found_items = {}
        if status is None or len(status) == 0:
            self.status = [200, 301, 302, 304, 401, 403]
        else:
            self.status = [int(t) for t in status]
scanner.py (project: hacker-scripts, author: restran)
def on_response(self, url, item, method, response, queue):
        if response.code in self.status:
            if item in self.found_items:
                return
            self.found_items[item] = None
            logger.warning('[Y] %s %s %s' % (response.code, method, url))
            # for script-like items (.php/.asp/.jsp), also queue HEAD probes
            # for likely backup copies of the file
            if any(map(item.endswith, ['.php', '.asp', '.jsp'])):
                bak_list = self.make_bak_file_list(item)
                bak_list = [(t, 'HEAD') for t in bak_list]
                queue.extendleft(bak_list)
        else:
            if response.code == 405 and method != 'POST':
                queue.appendleft((item, 'POST'))

            if self.verbose:
                logger.info('[N] %s %s %s' % (response.code, method, url))
scanner.py (project: hacker-scripts, author: restran)
def prepare_url(self):
        url_parsed = urlparse(self.raw_base_url)
        items = url_parsed.path.split('/')
        if len(items) > 0:
            item = items[-1]
            items = items[:-1]
            new_path = '/'.join(items)
        else:
            item = ''
            new_path = url_parsed.path
        url = urlunparse((url_parsed.scheme, url_parsed.netloc, new_path, '', '', ''))

        if item.endswith('.php'):
            self.site_lang = 'php'
        elif item.endswith('.asp'):
            self.site_lang = 'asp'
        elif item.endswith('.aspx'):
            self.site_lang = 'aspx'

        if self.site_lang != '':
            logger.info('site_lang: %s' % self.site_lang)
        self.base_url = url
        self.first_item = item
        logger.info('base_url: %s' % url)
        logger.info('first_item: %s' % item)
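A standalone illustration of the split prepare_url performs, written here against Python 3's urllib.parse (the snippet imports urlparse/urlunparse from its own project context):

from urllib.parse import urlparse, urlunparse

parsed = urlparse("http://example.com/admin/index.php")
parts = parsed.path.split("/")   # ['', 'admin', 'index.php']
base = urlunparse((parsed.scheme, parsed.netloc, "/".join(parts[:-1]), "", "", ""))
print(base)       # http://example.com/admin  -> becomes base_url
print(parts[-1])  # index.php                 -> becomes first_item, so site_lang is 'php'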
core_extract_comments.py (project: amazon-reviews-scraper, author: philipperemy)
def get_comments_based_on_keyword(search):
    logging.info('SEARCH = {}'.format(search))
    url = 'http://www.amazon.co.jp/s/ref=nb_sb_noss_2?url=search-alias%3Daps&field-keywords=' + \
          search + '&rh=i%3Aaps%2Ck%3A' + search
    soup = get_soup(url)
    items = []
    for a in soup.find_all('a', class_='s-access-detail-page'):
        if a.find('h2') is not None and validators.url(a.get('href')):
            name = str(a.find('h2').string)
            link = a.get('href')
            items.append((link, name))
    logging.info('Found {} items.'.format(len(items)))
    for (link, name) in items:
        logging.debug('link = {}, name = {}'.format(link, name))
        product_id = extract_product_id(link)
        get_comments_with_product_id(product_id)
__init__.py (project: emile-server, author: gsort)
def get_paginated_list(results, url, start, size, page_size=settings.PAGINATION_SIZE):
    # total number of results available
    count = size
    # make response
    obj = {}
    obj['start'] = start
    obj['page_size'] = page_size
    obj['count'] = count
    # make URLs
    # make previous url
    if start == 1:
        obj['previous'] = ''
    else:
        start_copy = max(1, start - page_size)
        obj['previous'] = url + '?start=%d' % (start_copy)
    # make next url
    if start + page_size > count:
        obj['next'] = ''
    else:
        start_copy = start + page_size
        obj['next'] = url + '?start=%d' % (start_copy)
    # attach the results for this page
    obj['results'] = results
    return obj
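A hypothetical call showing how the previous/next links come out (all values invented):

obj = get_paginated_list(results=[], url="/api/items", start=11, size=50, page_size=10)
print(obj["previous"])  # /api/items?start=1
print(obj["next"])      # /api/items?start=21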
__init__.py (project: emile-server, author: gsort)
def format_urls_in_text(text):
    new_text = []

    accepted_protocols = ['http://', 'https://', 'ftp://', 'ftps://']

    for word in str(text).split():
        new_word = word
        accepted = [protocol for protocol in accepted_protocols if protocol in new_word]

        if not accepted:
            new_word = 'http://{0}'.format(new_word)

        if validators.url(new_word):
            new_word = '<a href="{0}">{1}</a>'.format(new_word, word)
        else:
            new_word = word
        new_text.append(new_word)

    return ' '.join(new_text)
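Roughly what a call produces, traced by hand against the logic above (not run):

print(format_urls_in_text("docs at https://example.com here"))
# docs at <a href="https://example.com">https://example.com</a> here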
m3u8_loader.py (project: cumulus-tv-m3u8-loader, author: curif)
def loadm3u(url):
  hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
         'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
         'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
         'Accept-Encoding': 'none',
         'Accept-Language': 'en-US,en;q=0.8',
         'Connection': 'keep-alive'}

  req = urllib2.Request(url, headers=hdr)
  response = urllib2.urlopen(req)
  data = response.read()

  if 'EXTM3U' not in data:
    raise Exception(url + " is not a m3u8 file.")

  #return data.encode('utf-8')
  return data
m3u8_loader.py (project: cumulus-tv-m3u8-loader, author: curif)
def dictToM3U(cumulustv):
  channels = cumulustv["channels"]
  channelDataMap = [
    ("number", "tvg-id"),
    ("name", "tvg-name"),
    ("logo", "tvg-logo"),
    ("genres", "group-title"),
    ("country", "tvg-country"),
    ("lang", "tvg-language")
  ]
  m3uStr = "#EXTM3U\n"
  for channel in channels:
    m3uStr += "#EXTINF:-1"
    for dataId, extinfId in channelDataMap:
      if channel[dataId] is not None and channel[dataId] != "":
        m3uStr += " " + extinfId + "=\"" + channel[dataId].strip() + "\""
    m3uStr += "," + channel["name"].strip() + "\n"
    m3uStr += channel["url"] + "\n"

  return m3uStr
api_utils.py (project: lecli, author: rapid7)
def get_api_url():
    """
    Get management url from the config file
    """
    config_key = 'api_url'
    try:
        url = CONFIG.get(URL_SECTION, config_key)
        if validators.url(str(url)):
            return url
        else:
            print_config_error_and_exit(URL_SECTION, 'REST API URL(%s)' % config_key)
    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
        return DEFAULT_API_URL
api_utils.py (project: lecli, author: rapid7)
def build_url(nodes):
    """
    Build a url with the given array of nodes for the url and return path and url respectively
    Ordering is important
    """
    path = '/'.join(nodes)
    url = '/'.join([get_api_url(), path])
    return path, url
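For example, with hypothetical nodes:

path, url = build_url(["hosts", "123", "tags"])
# path == "hosts/123/tags"
# url  == get_api_url() + "/hosts/123/tags"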
oletools_analyze.py (project: lama, author: CSE-POST)
def parse_result(self):
        """
        Abstract parse_result method.
        It is called when the analysis is finished.
        It updates the malware record with indicators.
        """
        if not self._result:
            return

        json_ole = self.json_decode(self._result)
        if not json_ole:
            return

        for item in json_ole:
            if "IOC" in item["type"]:
                score = 7
                if "URL" in item['description'] and validators.url(item['keyword']):
                    extract_malware = self.malware.add_extract_malware(
                        self.module_cls_name, item['keyword'], Type.get_label(Type.URL))
                    Input.analyse_malware(extract_malware)
            elif "AutoExec" in item["type"]:
                score = 7
            elif "Suspicious" in item["type"]:
                score = 5
            elif "VBA string" in item["type"]:
                score = 3
            elif "Hex String" in item["type"]:
                score = 1
            else:
                score = -1

            indicator = Indicator.factory(module_cls_name=self.module_cls_name,
                                          name="item",
                                          content_type=Type.JSON,
                                          content=json.dumps(item),
                                          score=score)
            self._malware.get_module_status(self.module_cls_name
                                            ).add_indicator(indicator)
app.py (project: storperf, author: opnfv)
def success():
    try:
        URL = session["url"]
        if URL.find("jobs") is not -1 and URL.find("metadata") is -1:
            data = urllib.urlopen(URL).read()
            data = json.loads(data)
            temp = data["job_ids"]
            if temp:
                info = {}
                for ID in temp:
                    url = URL + "?id=" + ID + "&type=metadata"
                    data_temp = urllib.urlopen(url).read()
                    data_temp = json.loads(data_temp)
                    report_data = get_data(data_temp)[-1]
                    info[ID] = report_data
                return render_template('plot_jobs.html', results=info)
        if validators.url(URL):
            data = urllib.urlopen(URL).read()
        else:
            data = open("./static/testdata/" + URL).read()
        data = json.loads(data)
        response = get_data(data)
        if response[0] == "single":
            metrics, report_data = response[1], response[2]
            results = response[3]
            return render_template('plot_tables.html',
                                   metrics=metrics, report_data=report_data,
                                   results=results)
        else:
            return render_template('plot_multi_data.html',
                                   results=response[1])
    except Exception as e:
        session['server_error'] = e.message + ' ' + repr(e.args)
        return redirect(url_for('file_not_found'))
app.py (project: storperf, author: opnfv)
def url():
    if request.method == 'POST':
        url = request.form['url']
        session["url"] = url
        return redirect(url_for('success'))
scraper.py (project: open-event-scraper, author: fossasia)
def validate_result(current, default, type_):
    """
    Validates the data, whether it needs to be url, twitter, linkedin link etc.
    """
    if current is None:
        current = ""
    if default is None:
        default = ""
    if type == "URL" and validators.url(current, require_tld=True) and not validators.url(default, require_tld=True):
        return current
    if type == "EMAIL" and validators.email(current) and not validators.email(default):
        return current
    return default
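A hypothetical call, assuming a validators version that accepts the require_tld flag used above: the scraped value wins only when it validates and the default does not:

print(validate_result("https://fossasia.org", "", "URL"))  # https://fossasia.org
print(validate_result("not-a-url", "", "URL"))             # ""  (falls back to default)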
scraper.py (project: open-event-scraper, author: fossasia)
def fetch_tsv_data(gid):
    base_url = 'https://docs.google.com/spreadsheets/d/' + SHEET_ID + '/export?format=tsv'
    url = base_url + '&gid=' + gid
    logging.info('GET ' + url)
    res = urllib2.urlopen(url)
    return res.read()
omnihash.py (project: omnihash, author: Miserlou)
def iterate_bytechunks(hashme, is_string, use_json, hash_many):
    """
    Prep our bytes.
    """

    # URL
    if not is_string and validators.url(hashme):
        if not use_json:
            click.echo("Hashing content of URL " + click.style(hashme, bold=True) + "..", err=not hash_many)
        try:
            response = requests.get(hashme)
        except requests.exceptions.ConnectionError:
            raise ValueError("Not a valid URL. :(")
        except Exception as e:
            raise ValueError("Not a valid URL. {}.".format(e))
        if response.status_code != 200:
            click.echo("Response returned %s. :(" % response.status_code, err=True)
        bytechunks = response.iter_content()
    # File
    elif os.path.exists(hashme) and not is_string:
        if os.path.isdir(hashme):
            if not use_json:
                click.echo(click.style("Skipping", fg="yellow") + " directory " + "'" + hashme + "'..", err=True)
            return None

        if not use_json:
            click.echo("Hashing file " + click.style(hashme, bold=True) + "..", err=not hash_many)
        bytechunks = FileIter(open(hashme, mode='rb'))
    # String
    else:
        if not use_json:
            click.echo("Hashing string " + click.style(hashme, bold=True) + "..", err=not hash_many)
        bytechunks = (hashme.encode('utf-8'), )

    return bytechunks
test_pkg_metadata.py (project: quantopian-tools, author: Gitlitio)
def test_valid_project_url():
    assert validators.url(quantopian_tools.__project_url__)
frontend.py (project: mopidy-jukebox, author: palfrey)
def post(self):
        url = self.get_body_argument("url")
        if not validators.url(url):
            self.set_status(400, "bad URL")
            return
        with self._connect() as connection:
            try:
                createSource(connection, url)
            except sqlite3.IntegrityError:
                self.set_status(400, "duplicate URL")
                return
        self.set_status(201)

