python类List()的实例源码

neo4j_client.py 文件源码 项目:saapy 作者: ashapochka 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_batch_query(self, query: str,
                        labels: List[str] = None,
                        params: List[dict] = None,
                        chunk_size: int = 1000):
        """Run ``query`` over ``params`` in chunks through ``run_in_tx``.

        The query is prefixed with ``UNWIND {params} AS params`` and any
        ``$labels`` placeholder in it is replaced with the colon-joined
        ``labels`` (empty string when no labels are given).

        :param query: query text appended after the UNWIND prefix
        :param labels: optional node labels substituted for ``$labels``
        :param params: rows to send; ``None`` is treated as no rows
        :param chunk_size: maximum number of rows sent per yielded chunk
        :return: whatever ``self.run_in_tx`` returns
        """
        node_labels = ':' + ':'.join(labels) if labels else ''
        labeled_query = Template(
            "UNWIND {params} AS params " + query
        ).safe_substitute(labels=node_labels)

        # ``params`` defaults to None; without this guard ``len(None)`` would
        # raise TypeError as soon as the generator below is started.
        rows = params if params is not None else []

        def batch():
            # Each yield hands one (query, parameters) pair to the consumer;
            # the value sent back into the generator is only logged.
            for start in range(0, len(rows), chunk_size):
                logger.debug('starting chunk %s', start)
                result = (yield labeled_query,
                                dict(params=rows[start:start + chunk_size]))
                logger.debug(result)

        return self.run_in_tx(batch(), chunk_count=1)
test_python_driver.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _send_receive(self, nummsgs: int, outformat: str='json',
                      dataupdate: Optional[Dict[AnyStr, Any]]=None,
                      restart_data: bool=True) -> List[Response]:
        """Queue ``nummsgs`` messages, process them and return the results.

        When ``restart_data`` is true the data is reset through
        ``_restart_data`` first; a truthy ``dataupdate`` is then merged into
        ``self.data``. The messages are added to the send buffer, the buffer
        is rewound, and a processor wired to the send/receive buffers
        consumes it. The return value is ``self._loadResults(outformat)``.
        """
        if restart_data:
            self._restart_data(outformat)
        if dataupdate:
            self.data.update(dataupdate)

        self._add_to_buffer(nummsgs, outformat)
        # Rewind so the processor reads from the start of what was written.
        self.sendbuffer.seek(0)

        buffers = dict(custom_outbuffer=self.recvbuffer,
                       custom_inbuffer=self.sendbuffer)
        processor, _unused = get_processor_instance(outformat, **buffers)
        processor.process_requests(self.sendbuffer)

        return self._loadResults(outformat)
methods.py 文件源码 项目:djaio 作者: Sberned 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(
            self,
            description: str = None,
            pre_hooks: (List, Tuple) = None,
            post_hooks: (List, Tuple) = None
    ):
        """Initialise the instance with unset state and the given hooks.

        :param description: stored as ``self.description``
        :param pre_hooks: stored as ``self.pre_hooks``
        :param post_hooks: stored as ``self.post_hooks``
        """
        # All of these start out as None, in this order.
        unset_fields = (
            'result', 'total', 'success', 'errors', 'params', 'output',
            'pagination', 'limit', 'offset', 'app', 'settings',
        )
        for field_name in unset_fields:
            setattr(self, field_name, None)

        self.description = description
        self.pre_hooks, self.post_hooks = pre_hooks, post_hooks
        self.meta = {}
acceptability.py 文件源码 项目:trf 作者: aistairc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def calc_norm_lp_div_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""Return the negated ratio of model log-probability to unigram score.

    .. math::
        - \frac{%
            \log P_\text{model}\left(\xi\right)
        }{%
            \log P_\text{unigram}\left(\xi\right)
        }

    The inputs are paired element-wise. An entry is ``None`` when its
    log-probability is ``None`` or its unigram score is numerically zero.

    >>> '{:.3f}'.format(calc_norm_lp_div_scores([-14.7579], [-35.6325])[0])
    '-0.414'
    """
    return [
        None
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05)
        else (-1.0) * float(log_prob) / float(unigram_score)
        for log_prob, unigram_score in zip(log_prob_scores, unigram_scores)
    ]
acceptability.py 文件源码 项目:trf 作者: aistairc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def calc_norm_lp_sub_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""Return the model log-probability minus the unigram score.

    .. math::
        \log P_\text{model}\left(\xi\right)
            - \log P_\text{unigram}\left(\xi\right)

    The inputs are paired element-wise. An entry is ``None`` when its
    log-probability is ``None`` or its unigram score is numerically zero.

    >>> '{:.3f}'.format(calc_norm_lp_sub_scores([-14.7579], [-35.6325])[0])
    '20.875'
    """
    return [
        None
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05)
        else float(log_prob) - float(unigram_score)
        for log_prob, unigram_score in zip(log_prob_scores, unigram_scores)
    ]
acceptability.py 文件源码 项目:trf 作者: aistairc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def calc_slor_scores(norm_lp_sub_scores: List[float],
                     lengths: List[int]) -> List[Union[None, float]]:
    r"""Calculate SLOR (Syntactic Log-Odds Ratio)

    .. math::
        \frac{%
            \log P_\text{model}\left(\xi\right)
                - \log P_\text{unigram}\left(\xi\right)
        }{%
            \text{length}\left(\xi\right)
        }

    Scores and lengths are paired element-wise. An entry is ``None`` when
    its score is ``None`` or its length is zero.

    >>> '{:.3f}'.format(calc_slor_scores([20.8746], [4])[0])
    '5.219'
    """
    return [
        None if (score is None) or length == 0 else score / length
        for score, length in zip(norm_lp_sub_scores, lengths)
    ]
scraper.py 文件源码 项目:pylongecity 作者: cyrbon 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def scrape_all_posts_unflat(url: str, verbose: bool, cache: bool) -> List[List['Post']]:
    """Scrape every page of the thread at ``url``, one result per page.

    The first page is fetched to read the page count from the
    ``.pagejump`` link text; the per-page URLs are then scraped with
    ``scrape_posts`` on a pool of four threads.

    Args:
        url: URL of the thread (optionally already ending in ``page-N``).
        verbose: passed through to ``scrape_posts``.
        cache: when true, both the initial download and ``scrape_posts``
            are wrapped with ``memory.cache``.

    Returns:
        One ``scrape_posts`` result per page, in page order.
    """
    fget = memory.cache(requests.get) if cache else requests.get
    page = fget(url).text  # Downloads the page twice.
    # ^ we can scrape_page(page), .append, [urls - url], but KISS.
    # Gets '10' from 'Page 1 of 10'
    n_of_pages = pq(page).find('.pagejump > a').eq(0).text().strip().split(' ')[-1]

    # Compare by value: ``is ''`` tests object identity, which is not
    # guaranteed for strings and triggers a SyntaxWarning on Python 3.8+.
    if n_of_pages == '':
        # If there is only one page
        urls = [url]
    else:
        url_prefix_match = re.match('(.*)(page-[0-9]+)', url)
        url_prefix = url if url_prefix_match is None else url_prefix_match.group(1)
        if url_prefix[-1] != '/':
            url_prefix += '/'
        urls = [(url_prefix + 'page-' + str(n + 1)) for n in range(int(n_of_pages))]

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fscrape = memory.cache(scrape_posts, ignore=['verbose']) if cache else scrape_posts
        futures = [executor.submit(fscrape, page_url, verbose) for page_url in urls]
        # ``concurrent.futures.wait`` returns an unordered *set* of finished
        # futures, which scrambled the page order. ``result()`` blocks until
        # its future is done, so walking the submission list keeps the order.
        return [future.result() for future in futures]
scraper.py 文件源码 项目:pylongecity 作者: cyrbon 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_posts(*urls: str, **kwargs):
    '''
    Scrape one or more threads and return all their posts as one flat list.

    Args:
        *urls (str): Urls, where each url is a unique thread
        verbose (bool): Verbosity
        cache (bool): Cache results across calls
        disambiguate_threads (bool): When scraping multiple threads will add url to html of the first post to show thread.
            Defaults to True. It is consumed here; all other keyword
            arguments are forwarded to scrape_all_posts.
    '''
    disambiguate_threads = kwargs.pop('disambiguate_threads', True)
    label_threads = disambiguate_threads and len(urls) > 1

    all_posts = []
    for url in urls:
        posts = scrape_all_posts(url, **kwargs)
        # Displaying a link title to show which posts come from which thread if
        # we are getting multiple threads. A thread that produced no posts has
        # no first post to label, so it is skipped instead of raising IndexError.
        if label_threads and posts:
            posts[0].html = '''
            <div style="background-color: #3B6796;">
                <a href="{0}"><h1 style="font-size: 40px; color: white;">{0}</h1></a>
            </div>'''.format(url) + posts[0].html
        all_posts.extend(posts)

    return all_posts
frame_manager.py 文件源码 项目:pyppeteer 作者: miyakogi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def querySelectorAll(self, selector: str) -> List['ElementHandle']:
        """Get all elements which match `selector`.

        Evaluates ``Array.from(document.querySelectorAll(selector))`` through
        ``_rawEvaluate``, reads the own properties of the resulting remote
        object, and wraps every enumerable property whose value has subtype
        ``node`` in an ``ElementHandle``. All other property values, and the
        array object itself, are released.

        Declared ``async`` because the body awaits; with a plain ``def`` the
        ``await`` expressions are a SyntaxError.
        """
        remoteObject = await self._rawEvaluate(
            'selector => Array.from(document.querySelectorAll(selector))',
            selector,
        )
        response = await self._client.send('Runtime.getProperties', {
            'objectId': remoteObject.get('objectId', ''),
            'ownProperties': True,
        })
        properties = response.get('result', {})
        result: List[ElementHandle] = []
        # The array wrapper is always released; non-node values join it below.
        releasePromises = [helper.releaseObject(self._client, remoteObject)]
        for prop in properties:
            value = prop.get('value', {})
            if prop.get('enumerable') and value.get('subtype') == 'node':
                result.append(ElementHandle(self._client, value, self._mouse,
                                            self._touchscreen))
            else:
                releasePromises.append(
                    helper.releaseObject(self._client, value))
        # Wait for all releases together before returning.
        await asyncio.gather(*releasePromises)
        return result

    #: Alias to querySelector
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, exe, cache=None):
        # type: (str, Optional[Cache]) -> None
        """Set up the formatter wrapper for the executable ``exe``.

        A non-absolute ``exe`` is first resolved with ``which``. ``cache`` is
        stored as-is and passed on wherever the instance uses it.
        """
        resolved = exe if os.path.isabs(exe) else which(exe)  # type: ignore
        self.exe = unifilename(resolved)
        self._styledefinition = styledef_make()
        self.initial_style = style_make()
        self.version_string = formatter_version(resolved)

        self.cache = cache
        self.allow_encoding_change = False
        self.keeptempfiles = False
        self.languages = []  # type: List[str]
        # The are deleted after one call to minimize_errors
        self.globaltempfiles = set()  # type: Set[str]
        # These are deleted after each round of attempts
        self.tempfiles = set()  # type: Set[str]
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def can_process_in_parallel(self, filenames):
        # type: (List[str]) -> bool
        """
        Tell whether all files are small enough to be processed in parallel.

        Returns False if at least one file exceeds
        MAX_FILESIZE_FOR_MULTIPROCESSING; a warning is reported for every
        such file, not only the first. Returns True otherwise.
        """
        oversized = False
        for path in filenames:
            size = len(get_cached_file(path))
            if size <= MAX_FILESIZE_FOR_MULTIPROCESSING:
                continue
            reportwarning('Warning: %s has a size of %s bytes.' % (path, size))
            reportwarning('  This may cause memory swapping so we only use'
                          ' a single processor core.')
            oversized = True
        return not oversized
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def nested_derivations(self, style):
        # type: (Style) -> List[Style]
        """Return copies of ``style`` with each applicable nested option set.

        An option is applied only when the style definition knows it, the
        wanted value is among its configurations, and ``style`` does not
        already have that value.
        """
        derived = []
        for optionname, value in [('BreakBeforeBraces', 'Custom')]:
            optdef = styledef_option(self.styledefinition, optionname)
            # We can only use this nested option if the clang version in use supports it.
            unsupported = optdef is None or value not in option_configs(optdef)
            if unsupported or style.get(optionname) == value:
                continue
            candidate = Style(copy.deepcopy(style))
            set_option(candidate, optionname, value)
            derived.append(candidate)
        return derived
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def variants_for(self, option):
        # type: (Option) -> List[Style]
        """Return the style variants to try for ``option``.

        The candidate values are, in order of preference: the option's own
        configurations, the column limit candidates when the option is the
        column limit, or a fixed set of small integers for ``int`` options.
        Any other option yields an empty list.
        """
        stylename = option_name(option)
        styletype = option_type(option)
        configs = option_configs(option)

        if configs:
            candidates = configs
        elif stylename == self.columnlimitname:
            candidates = self.column_limit_candidates
        elif styletype == 'int':
            candidates = [0, 1, 2, 4, 8, 16]
        else:
            return []
        return stylevariants(stylename, candidates)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def cmdargs_for_style(self, formatstyle, filename=None):
        # type: (Style, Optional[str]) -> List[str]
        """Return the command line arguments that select ``formatstyle``.

        The style text is written to a config file in the temp directory
        whose name contains the hash of its content, unless that file is
        already registered. ``filename`` is not used by this implementation.
        """
        assert isinstance(formatstyle, Style)
        configdata = bytestr(self.styletext(formatstyle))
        cfgname = 'whatstyle_uncrustify_%s.cfg' % shahex(configdata)
        cfg = os.path.join(tempfile.gettempdir(), cfgname)
        if not self.tempfile_exists(cfg):
            writebinary(cfg, configdata)
            self.add_tempfile(cfg)

        # The filename extension might be ambiguous so we choose from the languages
        # registered in identify_language.
        if not self.languages:
            return ['-c', cfg]
        return ['-c', cfg, '-l', self.languages[0]]
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def register_options(self):
        # type: () -> None
        """Build the style definition from the executable's ``--help`` text.

        Options are parsed from text like this:
        Preferences:
          [+|-]alignArguments                                        Enable/disable ...
          ...
          [+|-]spacesWithinPatternBinders                            Enable/disable ...
          -alignSingleLineCaseStatements.maxArrowIndent=[1-100]      Set Maximum number ...
          -indentSpaces=[1-10]                                       Set Number of spaces ...

        A line with a ``=[start-end]`` range becomes an ``int`` option over
        that inclusive range; any other matching line becomes a ``bool``.
        """
        exeresult = run_executable(self.exe, ['--help'], cache=self.cache)
        helptext = unistr(exeresult.stdout)
        pattern = r'^  (\[\+\|-\]|-)([a-z][a-zA-Z.]+)(?:=\[(\d+)-(\d+)\])?'

        options = []
        for match in re.finditer(pattern, helptext, re.MULTILINE):
            # The prefix group is matched but not needed here.
            _prefix, optionname, lower, upper = match.groups()
            if lower is not None:
                optiontype = 'int'
                configs = list(inclusiverange(int(lower), int(upper)))  # type: List[OptionValue]
            else:
                optiontype = 'bool'
                configs = [True, False]
            options.append(option_make(optionname, optiontype, configs))
        self.styledefinition = styledef_make(options)
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def cmdargs_for_style(self, formatstyle, filename=None):
        # type: (Style, Optional[str]) -> List[str]
        """Return the ``--config-path`` arguments that select ``formatstyle``.

        The style text is written to ``self.configfilename`` inside a temp
        subdirectory named after the hash of the content. The directory is
        registered as a temp file only when this call created it.
        ``filename`` is not used by this implementation.
        """
        assert isinstance(formatstyle, Style)
        configdata = bytestr(self.styletext(formatstyle))
        relpath = 'whatstyle_rustfmt_%s/%s' % (shahex(configdata), self.configfilename)
        cfg = os.path.join(tempfile.gettempdir(), relpath)
        dirpath = os.path.dirname(cfg)
        try:
            os.makedirs(dirpath)
            self.add_tempfile(dirpath)
        except OSError as exc:
            # An already existing directory is fine; anything else is not.
            if exc.errno != errno.EEXIST:
                raise
        if not self.tempfile_exists(cfg):
            writebinary(cfg, configdata)
            self.add_tempfile(cfg)
        return ['--config-path', cfg]
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        """Return the values for ``keys``, with None for missing keys.

        While ``support_mget`` is set, keys are fetched in groups of at most
        ``sqlite_limit_variable_number`` using the ``kv_mget`` statement. If
        sqlite raises OperationalError the flag is cleared and this call, as
        well as later ones, fetch the keys one at a time.
        """
        if self.support_mget:
            found = []
            try:
                with self.conn as conn:
                    for somekeys in grouper(self.sqlite_limit_variable_number, keys):
                        batch = list(somekeys)
                        placeholders = ','.join('?' for _ in batch)
                        found.extend(conn.execute(self.kv_mget % placeholders, batch))
                resultdict = dict(found)  # type: Dict[str, bytes]
                return [resultdict.get(k) for k in keys]
            except sqlite3.OperationalError:
                self.support_mget = False
        return [self.__get(k) for k in keys]
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        """Return the values for ``keys``, with None for missing keys.

        The parent store maps each key to a content key; the content keys
        that exist are then looked up in ``self.kvstore`` in one call. The
        result keeps the order of ``keys``.
        """
        if not keys:
            return []
        contentkeys = super(DedupKeyValueStore, self).mget(keys)

        cached = []
        uncached = []  # type: List[Tuple[int, Optional[bytes]]]
        for idx, contentkey in enumerate(contentkeys):
            if contentkey is not None:
                cached.append((idx, unistr(binary_type(contentkey))))
            else:
                uncached.append((idx, None))

        if not cached:
            return [None] * len(uncached)

        indices, existing_keys = zip(*cached)
        existing_values = self.kvstore.mget(existing_keys)
        # Sorting by the original position interleaves hits and misses again.
        merged = sorted(uncached + list(zip(indices, existing_values)))
        return [value for _, value in merged]
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def table_iter(pairs,            # type: List[BytesPair]
               uniqueidx,        # type: int
               enc='utf-8',      # type: str
               fromdesc='',      # type: str
               todesc='',        # type: str
               numlines=2,       # type: int
               wrapcolumn=0      # type: int
               ):
    # type: (...) -> Iterator[Tuple[str, str, str]]
    """Yield ``(tablestart, tbody, tableend)`` triples of an HTML diff table.

    The table is built by an ``HtmlMultiDiff`` (tab size 8) from ``pairs``
    in context mode and then split with ``iter_tbodies``.
    """
    differ = HtmlMultiDiff(tabsize=8, wrapcolumn=wrapcolumn)
    differ.uniqueidx = uniqueidx
    options = dict(fromdesc=fromdesc,
                   todesc=todesc,
                   context=True,
                   numlines=numlines)
    table = differ.table_from_pairs(pairs, enc, **options)
    for tablestart, tbody, tableend in iter_tbodies(table):
        yield tablestart, tbody, tableend
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def update_evaluations(formatter,  # type: CodeFormatter
                       evaluations,  # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist  # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    """Advance the search by one step using the best pending attempt.

    Returns a ``(finished, nested_round, bestdist)`` tuple. ``evaluations``
    and ``finished_styles`` are heaps and are modified in place.
    """
    candidate = heapq.heappop(evaluations)

    if bestdist is None or (distquality(candidate.distance) < distquality(bestdist)):
        # Still improving: keep the candidate in play with its new best distance.
        heapq.heappush(evaluations, candidate)
        return False, False, candidate.distance

    # We found a style that could no longer be improved by adding a single option value.
    heapq.heappush(finished_styles, candidate)
    derived = formatter.nested_derivations(candidate.formatstyle)
    if not derived:
        # This formatstyle does not unlock more options.
        return True, False, bestdist

    # Restart the optimization from scratch with the attemptresult augmented with
    # every nested option as seed styles.
    seed_distance = (HUGE_DISTANCE,) * 4
    evaluations[:] = [AttemptResult(seed_distance, s) for s in derived]
    return False, True, None
whatstyle.py 文件源码 项目:whatstyle 作者: mikr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Yield the scaled absolute difference of average line lengths.

    For each ``(filename, content)`` pair the average line length of the
    cached file and of ``content`` is computed (0.0 when there are no
    lines); the absolute difference is multiplied by 10000 and truncated
    to an int.
    """
    for filename1, content2 in diffargs:
        linelen1 = get_num_lines(filename1)
        filelen1 = len(get_cached_file(filename1))
        avg1 = float(filelen1) / linelen1 if linelen1 > 0 else 0.0

        linelen2 = count_content_lines(content2)
        filelen2 = len(content2)
        avg2 = float(filelen2) / linelen2 if linelen2 > 0 else 0.0

        yield int(abs(10000.0 * (avg1 - avg2)))
channel.py 文件源码 项目:telegram-autoposter 作者: vaniakosmos 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def command_add(self, bot: Bot, update: Update, args: List[str]):
        """Handle ``/add``: store subreddit/score-limit pairs, then list them.

        ``args`` must be a non-empty, even-length list of alternating
        subreddit names and decimal score limits. On any violation the usage
        text is sent and nothing is stored.
        """
        usage_string = ('Nothing was added.\n'
                        'Usage: `/add <subreddit> <score_limit> [<subreddit> <score_limit>]*`')
        if len(args) == 0 or len(args) % 2 != 0:
            update.message.reply_text(usage_string, parse_mode=ParseMode.MARKDOWN)
            return

        subreddits = {}
        # Walk the arguments as (name, score) pairs.
        for name, score in zip(args[::2], args[1::2]):
            if not score.isdecimal():
                # The usage text contains Markdown backticks, so it is sent
                # with the same parse mode as the reply above.
                update.message.reply_text(usage_string, parse_mode=ParseMode.MARKDOWN)
                return
            subreddits[name] = int(score)

        self.store.add(subreddits)
        self.command_list(bot, update)
operations.py 文件源码 项目:PicoSim 作者: Vadman97 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def ripple(a: List[bool], b: List[bool], cin: bool = False, invert_b: bool = False) -> List[bool]:
    """Add bit lists ``a`` and ``b`` with a chain of ``full_adder`` calls.

    Bits are processed from the highest index down to index 0, feeding each
    carry into the next step, starting with ``cin``. With ``invert_b`` every
    bit of ``b`` is negated before it is added.

    No sign extension is done: both inputs are indexed over
    ``max(len(a), len(b))`` positions.
    """
    # allocate result bits
    # NOTE(review): the slots start as the integers 0..REGISTER_WIDTH-1, so
    # any position not overwritten below keeps an int, not a bool - confirm
    # that inputs always fill the whole register.
    result = list(range(0, Memory.REGISTER_WIDTH))  # type: List[bool]
    carry_wire = cin  # type: bool

    width = max(len(a), len(b))
    # go backwards to preserve carry propagation
    for i in reversed(range(width)):
        a_bit = a[i]
        b_bit = not b[i] if invert_b else b[i]
        result[i], carry_wire = full_adder(carry_wire, a_bit, b_bit)

    return result
neo4j_client.py 文件源码 项目:saapy 作者: ashapochka 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def import_nodes(self, nodes: List[dict],
                     labels: List[str] = None,
                     chunk_size: int = 1000):
        """Import ``nodes`` in chunks through ``run_in_tx``.

        The ``$labels`` placeholder of ``self.import_nodes_template`` is
        replaced with the colon-joined ``labels`` (empty string when no
        labels are given).

        :param nodes: node property dicts, sent as the ``props`` parameter
        :param labels: optional node labels
        :param chunk_size: maximum number of nodes sent per yielded chunk
        :return: whatever ``self.run_in_tx`` returns
        """
        node_labels = ':' + ':'.join(labels) if labels else ''
        query = self.import_nodes_template.safe_substitute(labels=node_labels)

        def batch():
            # Each yield hands one (query, parameters) pair to the consumer;
            # the value sent back into the generator is only logged.
            for start in range(0, len(nodes), chunk_size):
                logger.debug('starting chunk %s', start)
                chunk = nodes[start:start + chunk_size]
                result = (yield query, dict(props=chunk))
                logger.debug(result)

        return self.run_in_tx(batch(), chunk_count=1)
actor.py 文件源码 项目:saapy 作者: ashapochka 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse_name(self, name: str) -> 'ParsedName':
        """
        classifies a name as a known role name or as a proper name

        The lower-cased name is looked up in ``self.role_names``. A match
        stores the mapped role type together with the lower-cased name;
        otherwise the type is ``'proper'`` and the name is passed through
        ``cleanup_proper_name``.

        :param name: potentially human name
        :return: a ``ParsedName`` with ``name_type`` and ``name`` set
        """
        # The return annotation used to be List[str], but a ParsedName is
        # what is built and returned; a string annotation avoids needing the
        # name at definition time.
        parsed_name = ParsedName(**su.empty_dict(PARSED_NAME_FIELDS))
        lower_name = name.lower()
        if lower_name in self.role_names:
            parsed_name.name_type = self.role_names[lower_name]
            parsed_name.name = lower_name
        else:
            parsed_name.name_type = 'proper'
            parsed_name.name = cleanup_proper_name(name)
        return parsed_name
lexeme.py 文件源码 项目:saapy 作者: ashapochka 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse_lexeme(self, lexeme: str) -> List:
        """Split ``lexeme`` into parts and segment each part into words.

        A lexeme whose noise-stripped, lower-cased form is in ``self.terms``
        is kept as a single part; any other lexeme is split with
        ``split_lexeme``. The result is a list of ``(part, segments)``
        tuples. On any exception the failure is logged and a single 'miss'
        entry covering the whole lexeme is returned.
        """
        try:
            clean_lexeme = strip_noise(lexeme)
            if clean_lexeme.lower() in self.terms:
                parts = [clean_lexeme]
            else:
                parts = split_lexeme(lexeme)
            return [(part, self.segment_into_words(lexeme, part))
                    for part in parts]
        except Exception:
            logger.exception('failed to parse lexeme {}'.format(lexeme))
            return [(lexeme, [SegmentMap('miss', lexeme, None, lexeme, [0])])]
test_python_driver.py 文件源码 项目:python-driver 作者: bblfsh 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _loadResults(self, format_: str) -> List[Response]:
        """Read all msgs from the recvbuffer.

        The buffer is rewound first; ``format_`` is not used here.
        """
        self.recvbuffer.seek(0)
        return list(self._extract_docs(self.recvbuffer))
youtube.py 文件源码 项目:Zoom2Youtube 作者: Welltory 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_files_from_dir(self, path: str, ext: str) -> List[str]:
        """
        Return the names of the entries in ``path`` that end with ``.ext``

        :param path: directory to list
        :param ext: extension without the leading dot
        :return: matching entry names, in ``os.listdir`` order
        """
        suffix = '.{}'.format(ext)
        return list(filter(lambda name: name.endswith(suffix), os.listdir(path)))
db.py 文件源码 项目:djaio 作者: Sberned 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def execute(self, db_name: str, query: str, values: List, _type: str):
        """
        Execute SQL query in connection pool

        Deprecated: a DeprecationWarning is emitted on every call.

        Declared ``async`` because the body uses ``async with`` and
        ``await``; with a plain ``def`` those are a SyntaxError.

        :param db_name: key into ``self.dbs``
        :param query: SQL text passed to ``cursor.execute``
        :param values: query parameters passed to ``cursor.execute``
        :param _type: one of 'select', 'insert', 'update', 'delete'
        :return: all fetched rows for 'select', otherwise the cursor's
            ``rowcount``
        :raises RuntimeError: for an unknown ``_type`` or when the master
            pool of ``db_name`` is not set
        """
        warnings.warn("Use single methods!", DeprecationWarning)

        if _type not in ('select', 'insert', 'update', 'delete'):
            raise RuntimeError(
                'Wrong request type {}'.format(_type)
            )
        if not self.dbs[db_name]['master']:
            raise RuntimeError(
                'db {} master is not initialized'.format(db_name)
            )

        # Selects go to the slave pool when one is configured; everything
        # else uses the master pool.
        pool = self.dbs[db_name]['master']
        if _type == 'select' and 'slave' in self.dbs[db_name]:
            pool = self.dbs[db_name]['slave']

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                if _type == 'select':
                    data = await cursor.fetchall()
                else:
                    data = cursor.rowcount
        return data
db.py 文件源码 项目:djaio 作者: Sberned 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
async def select(self, query: str, values: Union[List, Dict],
                     db_name: str = 'default') -> List[DictRow]:
        """Run a select query by delegating to ``self._select``.

        Declared ``async`` because the body awaits; with a plain ``def`` the
        ``await`` expression is a SyntaxError.

        :param query: SQL text
        :param values: query parameters
        :param db_name: database name, ``'default'`` when omitted
        :return: the awaited result of ``self._select``
        """
        return await self._select(query=query, values=values, db_name=db_name)


问题


面经


文章

微信
公众号

扫码关注公众号