python类print_()的实例源码

test_bot.py 文件源码 项目:errcron 作者: attakei 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_timezone_in_plugin(capsys):
    class ActivateImpl(MockedImpl):
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'
test_bot.py 文件源码 项目:errcron 作者: attakei 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_timezone_in_config(capsys):
    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    setattr(plugin, 'bot_config', MockConfig())
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'
test_root_import.py 文件源码 项目:errcron 作者: attakei 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_timezone_in_plugin(capsys):
    class ActivateImpl(MockedImpl):
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'
test_root_import.py 文件源码 项目:errcron 作者: attakei 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_timezone_in_config(capsys):
    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    setattr(plugin, 'bot_config', MockConfig())
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'
enact.py 文件源码 项目:pyqubes 作者: tommilligan 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def echo(args, file=None):
    '''
    Echo a list of arguments (as given to ``subprocess.call``) to the given stream.

    This defaults to ``stdout``, but can be changed to any stream-like object such as a file handle.

    :param args: A string or list of strings
    :param file: A file-like object to stream output to. Defaults to ``sys.stdout``
    '''
    if file is None:
        file = sys.stdout

    if isinstance(args, six.string_types + (six.text_type,)):
        args = [args]

    six.print_(*args, file=file, flush=True)
jws.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sign(cls, args):
        """Sign."""
        key = args.alg.kty.load(args.key.read())
        args.key.close()
        if args.protect is None:
            args.protect = []
        if args.compact:
            args.protect.append('alg')

        sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
                       protect=set(args.protect))

        if args.compact:
            six.print_(sig.to_compact().decode('utf-8'))
        else:  # JSON
            six.print_(sig.json_dumps_pretty())
jws.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def verify(cls, args):
        """Verify."""
        if args.compact:
            sig = JWS.from_compact(sys.stdin.read().encode())
        else:  # JSON
            try:
                sig = JWS.json_loads(sys.stdin.read())
            except errors.Error as error:
                six.print_(error)
                return -1

        if args.key is not None:
            assert args.kty is not None
            key = args.kty.load(args.key.read()).public_key()
            args.key.close()
        else:
            key = None

        sys.stdout.write(sig.payload)
        return not sig.verify(key=key)
frozen.py 文件源码 项目:kripodb 作者: 3D-e-Chem 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _ingest_pairs(self, pairs, oid2nid, frame_size, limit, single_sided):
        oid2nid_v = np.vectorize(oid2nid.get)
        # whole pairs set does not fit in memory, so split it in frames with `frame_size` number of pairs.
        for start in range(0, limit, frame_size):
            stop = frame_size + start
            t1 = process_time()
            six.print_('Fetching pairs {0}:{1} of {2} ... '.format(start, stop, limit), end='', flush=True)
            raw_frame = pairs.read(start=start, stop=stop)
            t2 = process_time()
            six.print_('{0}s, Parsing ... '.format(int(t2 - t1)), flush=True)
            frame = self._translate_frame(raw_frame, oid2nid_v, single_sided)
            t3 = process_time()
            six.print_('Writing ... '.format(int(t3 - t2)), flush=True)
            # alternate direction, to make use of cached chunks of prev frame
            self._ingest_pairs_frame(frame)
            del frame
            t4 = process_time()
            six.print_('{0}s, Done with {1}:{2} in {3}s'.format(int(t4 - t3), start, stop, int(t4 - t1)), flush=True)
frozen.py 文件源码 项目:kripodb 作者: 3D-e-Chem 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def to_pairs(self, pairs):
        """Copies labels and scores from self to pairs matrix.

        Args:
            pairs (SimilarityMatrix):

        """
        six.print_('copy labels', flush=True)
        self.build_label_cache()
        pairs.labels.update(self.cache_l2i)

        six.print_('copy matrix to pairs', flush=True)
        limit = self.scores.shape[0]
        bar = ProgressBar()
        for query_id in bar(six.moves.range(0, limit)):
            subjects = self.scores[query_id, ...]
            filled_subjects_ids = subjects.nonzero()[0]
            filled_subjects = [(query_id, i, subjects[i]) for i in filled_subjects_ids if query_id < i]
            if filled_subjects:
                pairs.pairs.table.append(filled_subjects)
test_view.py 文件源码 项目:lookml-gen 作者: symphonyrm 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_pdt_view():
    testname = 'pdt_view'
    pdt = view.DerivedTable(sql="SELECT id, count(*) c FROM table GROUP BY id",
                            sql_trigger_value='DATE()',
                            indexes=['id'])
    v = view.View(testname)
    v.derived_table = pdt
    v.add_field(field.Dimension('id', type='number',
                                primary_key=True))
    v.add_field(field.Dimension('c', type='number'))
    v.add_field(field.Measure('sum_c', sql='${TABLE}.c', type='sum'))
    f = six.StringIO()
    v.generate_lookml(f, format_options=test_format_options)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname),
              'rt') as expected:
        assert lookml == expected.read()
jws.py 文件源码 项目:certbot 作者: nikoloskii 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sign(cls, args):
        """Sign."""
        key = args.alg.kty.load(args.key.read())
        args.key.close()
        if args.protect is None:
            args.protect = []
        if args.compact:
            args.protect.append('alg')

        sig = JWS.sign(payload=sys.stdin.read().encode(), key=key, alg=args.alg,
                       protect=set(args.protect))

        if args.compact:
            six.print_(sig.to_compact().decode('utf-8'))
        else:  # JSON
            six.print_(sig.json_dumps_pretty())
jws.py 文件源码 项目:certbot 作者: nikoloskii 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def verify(cls, args):
        """Verify."""
        if args.compact:
            sig = JWS.from_compact(sys.stdin.read().encode())
        else:  # JSON
            try:
                sig = JWS.json_loads(sys.stdin.read())
            except errors.Error as error:
                six.print_(error)
                return -1

        if args.key is not None:
            assert args.kty is not None
            key = args.kty.load(args.key.read()).public_key()
            args.key.close()
        else:
            key = None

        sys.stdout.write(sig.payload)
        return not sig.verify(key=key)
decision_task_poller.py 文件源码 项目:botoflow 作者: boto 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def single_poll(self, next_page_token=None):
        poll_time = time.time()
        try:
            kwargs = {'domain': self.domain,
                      'taskList': {'name': self.task_list},
                      'identity': self.identity}
            if next_page_token is not None:
                kwargs['nextPageToken'] = next_page_token
            # old botocore throws TypeError when unable to establish SWF connection
            return self.worker.client.poll_for_decision_task(**kwargs)

        except KeyboardInterrupt:
            # sleep before actually exiting as the connection is not yet closed
            # on the other end
            sleep_time = 60 - (time.time() - poll_time)
            six.print_("Exiting in {0}...".format(sleep_time), file=sys.stderr)
            time.sleep(sleep_time)
            raise
tempdir.py 文件源码 项目:klusta 作者: kwikteam 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def cleanup(self, _warn=False):
        if self.name and not self._closed:
            try:
                self._rmtree(self.name)
            except (TypeError, AttributeError) as ex:
                # Issue #10188: Emit a warning on stderr
                # if the directory could not be cleaned
                # up due to missing globals
                if "None" not in str(ex):
                    raise
                six.print_("ERROR: {!r} while cleaning up {!r}".format(ex,
                                                                       self,),
                           file=_sys.stderr)
                return
            self._closed = True
            if _warn:
                # This should be a ResourceWarning, but it is not available in
                # Python 2.x.
                self._warn("Implicitly cleaning up {!r}".format(self),
                           Warning)
gurobi_ampl_example.py 文件源码 项目:pyomo 作者: Pyomo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def print_model_suffixes(model):
    # Six.Print_ all suffix values for all model components in a nice table
    six.print_("\t",end='')
    for name,suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (name),end='')
    six.print_("")
    for i in model.s:
        six.print_(model.x[i].name+"\t",end='')
        for name,suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (suffix.get(model.x[i])),end='')
        six.print_("")
    for i in model.s:
        six.print_(model.con[i].name+"\t",end='')
        for name,suffix in active_import_suffix_generator(model):
            six.print_("%10s" % (suffix.get(model.con[i])),end='')
        six.print_("")
    six.print_(model.obj.name+"\t",end='')
    for name,suffix in active_import_suffix_generator(model):
        six.print_("%10s" % (suffix.get(model.obj)),end='')
    print("")
    print("")
futu_data_source.py 文件源码 项目:rqalpha-mod-futu 作者: FutunnOpen 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def on_recv_rsp(self, rsp_str):
        ret_code, ret_data = super(DataCache, self).on_recv_rsp(rsp_str)
        if ret_code == RET_ERROR or isinstance(ret_data, str):
            six.print_(_(u"push kline data error:{bar_data}").format(ret_data=ret_data))
        else:
            if ret_data.empty:
                self._cache['cur_kline'] = {}
            else:
                bar_data = ret_data.iloc[-1:].copy()
                del bar_data['code'], bar_data['k_type']  # ????????????
                for i in range(len(bar_data['time_key'])):  # ????
                    bar_data.loc[i, 'time_key'] = int(
                        bar_data['time_key'][i].replace('-', '').replace(' ', '').replace(':', ''))

                bar_data.rename(columns={'time_key': 'datetime', 'turnover': 'total_turnover'},
                                inplace=True)  # ??????????
                bar_data['volume'] = bar_data['volume'].astype('float64')  # ???????????float

                self._cache['cur_kline'][ret_data['code'][0]]=bar_data
                return ret_code, self._cache['cur_kline'][ret_data['code'][0]]
stub.py 文件源码 项目:errcron 作者: attakei 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def print_datetime(plugin, polled_time):
    six.print_(polled_time.strftime('%Y-%m-%d'), end='')
stub.py 文件源码 项目:errcron 作者: attakei 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def print_datetime_with_str(plugin, polled_time, prefix):
    six.print_(prefix + polled_time.strftime('%Y-%m-%d'), end='')
test_bot.py 文件源码 项目:errcron 作者: attakei 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_activate_instance_method(capsys):
    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out == '2016-01-01'
project.py 文件源码 项目:psyplot 作者: Chilipp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def show_plot_methods(self):
        """Print the plotmethods of this instance"""
        print_func = PlotterInterface._print_func
        if print_func is None:
            print_func = six.print_
        s = "\n".join(
            "%s\n    %s" % t for t in six.iteritems(self._plot_methods))
        return print_func(s)
simple_timing.py 文件源码 项目:bistiming 作者: ianlini 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, description="", logger=None, logging_level=logging.INFO,
                 verbose_start=True, verbose_end=True, end_in_new_line=True,
                 prefix="..."):
        if logger is not None:
            self.log = partial(logger.log, logging_level)
        else:
            self.log = six.print_
        self.description = prefix + description
        self.verbose_start = verbose_start
        self.verbose_end = verbose_end
        self.end_in_new_line = end_in_new_line
        self.start_time = None
        self.end_time = None
        self.elapsed_time = None
_github.py 文件源码 项目:ci-diff-helper 作者: dhermes 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _rate_limit_info(response):
    """Print response rate limit information to stderr.

    Args:
        response (requests.Response): A GitHub API response.
    """
    remaining = response.headers.get(_RATE_REMAINING_HEADER)
    rate_limit = response.headers.get(_RATE_LIMIT_HEADER)
    rate_reset = response.headers.get(_RATE_RESET_HEADER)
    msg = _RATE_LIMIT_TEMPLATE.format(remaining, rate_limit, rate_reset)
    six.print_(msg, file=sys.stderr)
    six.print_(_GH_ENV_VAR_MSG, file=sys.stderr)
computeengine.py 文件源码 项目:loman 作者: janusassetallocation 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def print_errors(self):
        """
        Print tracebacks for every node with state "ERROR" in a Computation 
        """
        for n in self.nodes():
            if self.s[n] == States.ERROR:
                six.print_("{}".format(n))
                six.print_("=" * len(n))
                six.print_()
                six.print_(self.v[n].traceback)
                six.print_()
sharder.py 文件源码 项目:EasyStorj 作者: lakewik 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def determine_shard_size(self, file_size, accumulator):

        # Based on <https://github.com/aleitner/shard-size-calculator/blob/master/src/shard_size.c>

        hops = 0

        if (file_size <= 0):
            return 0
            # if accumulator != True:
            # accumulator  = 0
        self.__logger.debug(accumulator)

        # Determine hops back by accumulator
        if ((accumulator - self.SHARD_MULTIPLES_BACK) < 0):
            hops = 0
        else:
            hops = accumulator - self.SHARD_MULTIPLES_BACK

        # accumulator = 10
        byte_multiple = self.shard_size(accumulator)

        check = file_size / byte_multiple
        # print_(check)
        if (check > 0 and check <= 1):
            while (hops > 0 and self.shard_size(hops) > self.MAX_SHARD_SIZE):
                if hops - 1 <= 0:
                    hops = 0
                else:
                    hops = hops - 1
            return self.shard_size(hops)

        # Maximum of 2 ^ 41 * 8 * 1024 * 1024
        if (accumulator > 41):
            return 0

            # return self.determine_shard_size(file_size, ++accumulator)
test_view.py 文件源码 项目:lookml-gen 作者: symphonyrm 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_dimension_group():
    testname = 'dimension_group_test'
    v = view.View(testname)
    v.add_field(field.DimensionGroup('dimension1', sql='${TABLE}.dim1'))
    f = six.StringIO()
    v.generate_lookml(f, format_options=test_format_options)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname),
              'rt') as expected:
        assert lookml == expected.read()
test_view.py 文件源码 项目:lookml-gen 作者: symphonyrm 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_dimension_group_no_timeframes():
    testname = 'dimension_group_no_timeframes_test'
    v = view.View(testname)
    v.add_field(field.DimensionGroup('dimension1', sql='${TABLE}.dim1'))
    f = six.StringIO()
    fo_omit_timeframes = base_generator.\
        GeneratorFormatOptions(warning_header_comment=None, omit_time_frames_if_not_set=True)
    v.generate_lookml(f, format_options=fo_omit_timeframes)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname),
              'rt') as expected:
        assert lookml == expected.read()
test_view.py 文件源码 项目:lookml-gen 作者: symphonyrm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_newlines():
    testname = 'newlines_test'
    v = view.View(testname)
    for l in ['a', 'b', 'c', 'd']:
        v.add_field(field.Dimension(l, type='number'))
        v.add_field(field.Measure('sum_' + l, type='sum', sql='${{{0}}}'.format(l)))

    f = six.StringIO()
    v.generate_lookml(f, format_options=test_format_options)
    lookml = f.getvalue()
    six.print_(lookml)
    with open(os.path.join(os.path.dirname(__file__),
                           'expected_output/%s.lkml' % testname),
              'rt') as expected:
        assert lookml == expected.read()
test_tailchaser.py 文件源码 项目:tailchaser 作者: thanos 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cmp_file(src_file, dst_file):
    six.print_('testing: ', src_file, dst_file)
    assert (Tailer.file_opener(src_file, 'rb').read() == Tailer.file_opener(dst_file, 'rb').read())
test_tailchaser.py 文件源码 项目:tailchaser 作者: thanos 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def emit(self, count):
        six.print_(count)
test_tailchaser.py 文件源码 项目:tailchaser 作者: thanos 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def doRollover(self):
        time.sleep(self.ROLL_DELAY)
        self.rolls += 1
        six.print_('rolls', self.rolls)
        return super(RotatingWithDelayFileHandler, self).doRollover()


问题


面经


文章

微信
公众号

扫码关注公众号