python类embed()的实例源码

release.py 文件源码 项目:scibot 作者: SciCrunch 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def clean_dupes(get_annos, repr_issues=False):
    """Drop all but the most recently updated copy of each duplicated annotation.

    Fetches the annotations by calling ``get_annos()``, removes the older
    copies of every annotation whose ``id`` occurs more than once from that
    list in place, and then opens an interactive shell via ``embed()`` so the
    result can be inspected.

    Args:
        get_annos: Zero-argument callable returning a mutable list of
            annotation objects exposing ``id`` and ``updated`` attributes.
        repr_issues: When true, skip printing each duplicate (the printout
            builds a ``HypothesisHelper`` for every copy).

    Raises:
        AssertionError: If duplicates exist and some id occurs more than twice.
    """
    annos = get_annos()
    seen = set()
    # ``seen.add`` returns None, so the ``or`` branch only records first
    # sightings; an id that occurs k times lands in ``dupes`` k - 1 times.
    dupes = [a.id for a in annos if a.id in seen or seen.add(a.id)]
    dupe_ids = set(dupes)  # constant-time membership for the scans below
    preunduped = [a for a in annos if a.id in dupe_ids]
    for id_ in dupes:
        print('=====================')
        anns = sorted((a for a in annos if a.id == id_), key=lambda a: a.updated)
        if not repr_issues:
            for a in anns:
                print(a.updated, HypothesisHelper(a, annos))
        for a in anns[:-1]:  # all but latest
            annos.remove(a)
    # Not used below; kept so it is available inside the embed() session.
    deduped = [a for a in annos if a.id in dupe_ids]
    if dupes:
        # Skipped when nothing is duplicated: the ratio would otherwise divide
        # by zero.
        assert len(preunduped) // len(dupes) == 2, 'Somehow you have managed to get more than 1 duplicate!'
    # get_annos.memoize_annos(annos)
    embed()
main.py 文件源码 项目:Video-Classification-Action-Recognition 作者: qijiezhao 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test(epoch):
    """Run the network over the test loader and write one prediction per batch.

    Each batch from ``testing_data_loader`` is reshaped to
    ``(-1, 3, training_size, training_size)``, passed through the sub-network
    selected by the module-level ``model`` string, and reduced to a single
    index by averaging the outputs over axis 0 and taking the argmax. The
    indices are written one per line to a result file whose name embeds
    ``save_prefix``, ``model`` and ``epoch``; ``compute_test_result.py`` is
    then invoked on that file name through ``os.system``.

    Relies on module-level names not visible in this block: ``Net``,
    ``testing_data_loader``, ``training_size``, ``cuda``, ``model`` and
    ``save_prefix``.

    Args:
        epoch: Value appended (via ``str``) to the result file name.
    """
    Net.eval()
    preditions=[]
    # Iteration numbers start at 1; labelss is unpacked but not used below.
    for iteration,(inputss,labelss) in enumerate(testing_data_loader,1):
        #embed()
        # Fold all leading dimensions into the first axis.
        inputss=Variable(inputss.view(-1,3,training_size,training_size))
        if cuda:
            inputss=inputss.cuda()
        #embed()
        # NOTE(review): if `model` matches none of the three names below,
        # `prediction` is never bound and the reduction line fails - confirm
        # the set of valid model names.
        if model=='resnet101':
            prediction=Net.module.resnet101(inputss).cpu().data.numpy()
        elif model=='inception_v3':
            prediction=Net.module.inception_v3(inputss).cpu().data.numpy()
        elif model=='inception_v4':
            prediction=Net.module.inception_v4(inputss).cpu().data.numpy()
        # Average over axis 0, then take the index of the largest mean.
        prediction=prediction.mean(0).argmax()
        preditions.append(str(prediction))
        #print 'video num: ',iteration,' predition: ',str(prediction)
    # The output directory is hard-coded to an absolute path.
    with open('/S2/MI/zqj/video_classification/data/ucf101/tmp_result/{}result_{}_new_epoch'.format(save_prefix,model)+str(epoch)+'.txt','w')as fw:
        fw.write('\n'.join(preditions))
    # Only the bare file name (no directory) is handed to the scoring script.
    str_out='python compute_test_result.py {}result_{}_new_epoch'.format(save_prefix,model)+str(epoch)+'.txt'
    os.system(str_out)
shell.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def handle(self, **options):
        """Start an interactive shell: IPython, else bpython, else the stdlib console.

        Args:
            **options: Accepted but not read by this method.
        """
        print('... starting jawaf shell ...')
        # Not referenced again in this method; it remains a local, and the
        # stdlib fallback at the bottom hands locals() to the console.
        waf = Jawaf(settings.PROJECT_NAME)
        # Use IPython if it exists
        try:
            import IPython
            IPython.embed()
            return
        except ImportError:
            pass
        # Use bpython if it exists
        try:
            import bpython
            bpython.embed()
            return
        except ImportError:
            pass
        # Ok, just do the pumpkin spice python shell.
        import code
        code.interact(local=locals())
test_embed.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ipython_embed():
    """Check that a script calling `IPython.embed()` runs and exits cleanly."""
    with NamedFileInTemporaryDirectory('file_with_embed.py') as script:
        script.write(_sample_embed)
        script.flush()
        # Close the handle first; otherwise Windows ("msft") cannot read the file.
        script.close()

        # Equivalent to running `python file_with_embed.py`.
        argv = [sys.executable, script.name]
        child_env = os.environ.copy()
        child_env['IPY_TEST_SIMPLE_PROMPT'] = '1'
        proc = subprocess.Popen(
            argv,
            env=child_env,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        raw_out, _ = proc.communicate(_exit)
        text = raw_out.decode('UTF-8')

        nt.assert_equal(proc.returncode, 0)
        nt.assert_in('3 . 14', text)
        if os.name != 'nt':
            # TODO: Fix up our different stdout references, see issue gh-14
            nt.assert_in('IPython', text)
        nt.assert_in('bye!', text)
motion_editor.py 文件源码 项目:multi-contact-zmp 作者: stephane-caron 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def launch_ipython():
    """Print the welcome banner, run an IPython shell, then end the process.

    Once the embedded shell returns, ``root.quit()`` is called (``root`` is a
    module-level object defined outside this block), the function sleeps
    briefly, and the process is terminated with ``os._exit``.
    """
    # Single-argument print(...) calls emit the same text under the Python 2
    # print statement and the Python 3 print function.
    print("")
    print("===================================================================")
    print("")
    print("                   Welcome to the motion editor!                   ")
    print("")
    print("To quit, click on the 'Quit' button in the GUI.")
    print("")
    print("From this shell, available function calls include:")
    print("")
    print("    plot_trajectories()")
    print("")
    print("===================================================================")
    IPython.embed()
    root.quit()
    time.sleep(0.2)
    os._exit(0)  # dirty but avoids hangs from IPython's atexit callback
command_line.py 文件源码 项目:yt 作者: yt-project 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __call__(self, args):
        """Open an IPython shell with the loaded dataset in its namespace.

        The shell namespace is a copy of the ``yt.mods`` module dictionary with
        the dataset bound to both ``ds`` and ``pf``, plus ``yt`` itself.

        Args:
            args: Parsed arguments; ``args.ds`` is the dataset, or ``None``
                when the file could not be loaded.

        Raises:
            SystemExit: If ``args.ds`` is ``None``.
        """
        # Imported before first use: a function-level ``import sys`` makes
        # ``sys`` local to the whole body, so importing it after the
        # ``sys.exit()`` call below raised UnboundLocalError instead of exiting.
        import sys

        if args.ds is None:
            print("Could not load file.")
            sys.exit()
        import yt.mods
        import yt

        import IPython

        local_ns = yt.mods.__dict__.copy()
        local_ns['ds'] = args.ds
        local_ns['pf'] = args.ds
        local_ns['yt'] = yt

        # Config lives in traitlets when available, otherwise in IPython itself.
        try:
            from traitlets.config.loader import Config
        except ImportError:
            from IPython.config.loader import Config
        cfg = Config()
        # prepend sys.path with current working directory
        sys.path.insert(0,'')
        IPython.embed(config=cfg,user_ns=local_ns)
battle.py 文件源码 项目:pyAoEM 作者: kampffrosch94 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def go_interpreter():
    """Open an IPython shell with the turn-order entities at hand.

    Two locals are defined before the shell starts: ``actors`` (the entities
    returned for ``game.TurnOrderSystem``) and the helper ``print_actors``.
    """
    actors = _world.get_system_entities(game.TurnOrderSystem)

    # noinspection PyUnusedLocal
    def print_actors():
        """Print the string form of every actor, one per line."""
        import functools

        def concat(x, y):
            return x + "\n" + y

        # NOTE(review): reduce() without an initial value raises TypeError on
        # an empty sequence - confirm actors is never empty here.
        print(str(functools.reduce(concat, map(str, actors))))

    import IPython
    IPython.embed()


# Activation
Model.py 文件源码 项目:NetworkCompress 作者: luzai 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_layers(self, name, next_layer=False, last_layer=False, type=None):
        """Map the graph nodes selected for ``name`` to their model layers.

        Args:
            name: Node name passed to ``self.graph.get_nodes``.
            next_layer: Forwarded unchanged to ``self.graph.get_nodes``.
            last_layer: Forwarded unchanged to ``self.graph.get_nodes``.
            type: Optional iterable of substrings. When given, only layers
                whose name contains one of them (case-insensitively) can be
                looked up.

        Returns:
            The result of ``map`` over the selected node names, yielding the
            layer registered under each name.
        """
        candidates = self.model.layers
        if type is not None:
            # Keep only layers whose name mentions one of the requested kinds.
            candidates = [
                lyr for lyr in candidates
                if any(kind.lower() in lyr.name.lower() for kind in type)
            ]
        by_name = {lyr.name: lyr for lyr in candidates}

        found = self.graph.get_nodes(name, next_layer, last_layer, type=type)
        # A single node comes back bare; normalise to a list.
        node_list = found if isinstance(found, list) else [found]
        wanted = [node.name for node in node_list]
        return map(by_name.__getitem__, wanted)
repl.py 文件源码 项目:iota.lib.py 作者: iotaledger 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _start_repl(api):
    # type: (Iota) -> None
    """
    Starts the REPL.

    Uses IPython when it can be imported and the standard-library
    ``InteractiveConsole`` otherwise. Both are given a banner that names the
    adapter URI and whether the client targets the testnet or the mainnet.

    :param api: Client object; ``api.testnet`` and ``api.adapter.get_uri()``
        are read to build the banner.
    """
    _banner = (
      'IOTA API client for {uri} ({testnet}) initialized as variable `api`.\n'
      'Type `help(api)` for list of API commands.'.format(
        testnet = 'testnet' if api.testnet else 'mainnet',
        uri     = api.adapter.get_uri(),
      )
    )

    try:
      # noinspection PyUnresolvedReferences
      import IPython
    except ImportError:
      # IPython not available; use regular Python REPL.
      from code import InteractiveConsole
      InteractiveConsole(locals={'api': api}).interact(_banner)
    else:
      # Launch IPython REPL.
      IPython.embed(header=_banner)
net.py 文件源码 项目:channel-pruning 作者: yihui-he 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def invBN(self, arr, Y_name):
        """Rescale ``arr`` by the std / scale factor of the matching BN layer.

        ``arr`` is returned untouched when it is an ``int`` or when no
        batch-norm or affine layers are registered on ``self``.

        Args:
            arr: Value to rescale.
            Y_name: Layer name; the part before the first ``'_'`` is the key
                into ``self.bottom_names``.

        Returns:
            ``arr * std / k`` using the ``std`` and ``k`` reported by
            ``self.getBNaff`` (the mean and bias it returns are not applied).
        """
        if isinstance(arr, int) or len(self.bns) == 0 or len(self.affines) == 0:
            return arr

        prefix = Y_name.split('_')[0]
        # First 'branch2c' bottom of the prefix layer that is a known BN layer.
        for candidate in self.bottom_names[prefix]:
            if candidate in self.bns and 'branch2c' in candidate:
                bn = candidate
                break
        # First affine layer fed by that BN layer.
        for candidate in self.affines:
            if self.layer_bottom(candidate) == bn:
                affine = candidate
                break

        print('inverted bn', bn, affine, Y_name)
        _, std, k, _ = self.getBNaff(bn, affine)
        # Forward transform is (y - mean) / std * k + b; only the scale is undone.
        return arr * std / k
decompose.py 文件源码 项目:channel-pruning 作者: yihui-he 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def YYT(Y, n_components=None, DEBUG=False):
    """
    Fit a PCA model on Y and return its components in both orientations.

    Param:
        Y: n x d; inputs with more than two dimensions are flattened to
           (Y.shape[0], -1) before fitting
        n_components: use 'mle' to guess
        DEBUG: when true, open an IPython shell right after fitting
    Returns:
        P: d x d'
        QT: d' x d
    """
    # Work on a copy so the caller's array is left untouched.
    newdata = Y.copy()
    model = PCA(n_components=n_components)

    if len(newdata.shape) != 2:
        newdata = newdata.reshape((newdata.shape[0], -1))
    #TODO center data
    model.fit(newdata)
    if DEBUG: from IPython import embed; embed()

    # The first return value is the transpose of the second.
    return model.components_.T, model.components_

#def GSVD(Z, Y):
#    NotImplementedError
#    return [U,V,X,C,S]
shell.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def shell():
    """Run an IPython shell inside the current Flask application context.

    The shell namespace is seeded by executing the file named in the
    ``PYTHONSTARTUP`` environment variable (when it exists) and then updated
    with the application's shell context.
    """
    import IPython
    from flask.globals import _app_ctx_stack

    current_app = _app_ctx_stack.top.app
    debug_tag = ' [debug]' if current_app.debug else ''
    banner = 'Python %s on %s\nIPython: %s\nApp: %s%s\nInstance: %s\n' % (
        sys.version,
        sys.platform,
        IPython.__version__,
        current_app.import_name,
        debug_tag,
        current_app.instance_path,
    )

    namespace = {}

    startup_path = os.environ.get('PYTHONSTARTUP')
    if startup_path and os.path.isfile(startup_path):
        with open(startup_path, 'rb') as handle:
            # Runs the user's startup file with `namespace` as its globals.
            eval(compile(handle.read(), startup_path, 'exec'), namespace)

    namespace.update(current_app.make_shell_context())

    IPython.embed(banner1=banner, user_ns=namespace)
test_embed.py 文件源码 项目:Repobot 作者: Desgard 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_ipython_embed():
    """Check that a script calling `IPython.embed()` runs and exits cleanly."""
    with NamedFileInTemporaryDirectory('file_with_embed.py') as script:
        script.write(_sample_embed)
        script.flush()
        # Close the handle first; otherwise Windows ("msft") cannot read the file.
        script.close()

        # Equivalent to running `python file_with_embed.py`.
        argv = [sys.executable, script.name]
        child_env = os.environ.copy()
        child_env['IPY_TEST_SIMPLE_PROMPT'] = '1'
        proc = subprocess.Popen(
            argv,
            env=child_env,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        raw_out, _ = proc.communicate(_exit)
        text = raw_out.decode('UTF-8')

        nt.assert_equal(proc.returncode, 0)
        nt.assert_in('3 . 14', text)
        if os.name != 'nt':
            # TODO: Fix up our different stdout references, see issue gh-14
            nt.assert_in('IPython', text)
        nt.assert_in('bye!', text)
dkt_tf.py 文件源码 项目:student_simulator_policy 作者: kolchinski 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main(_):
    """Load the data, build a DKT model and train it in a fresh TF session.

    Args:
        _: Ignored positional argument.
    """
    print('tf version', tf.__version__)

    topics, answers, num_topics = read_assistments_data(DATA_LOC)
    dataset = load_data(topics, answers, num_topics)

    dkt = DKTModel(num_topics, HIDDEN_SIZE, MAX_LENGTH)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        # Local variables are initialised explicitly as well; TensorFlow's AUC
        # function needs them.
        sess.run(tf.local_variables_initializer())
        train_model(dkt, sess, dataset)
actor2.py 文件源码 项目:student_simulator_policy 作者: kolchinski 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main(_):
    """Train an Actor on fake sequences, then summarise where each action occurs.

    Trains for 200 batches of 100 sequences, runs ``test_on_batch`` once,
    prints the count-weighted mean index of actions 0, 1 and 2 over the first
    50 positions of axis 1, and opens an interactive shell.

    Args:
        _: Ignored positional argument.
    """
    print "Testing actor"
    topics, answers, masks, seq_lens, rewards = fake_sequences(20000, 3)
    actor = Actor(3, HIDDEN_SIZE, MAX_LENGTH)
    #embed()


    with tf.Session() as session:
        session.run(tf.global_variables_initializer())
        #print topics
        for i in range(200):
            # Batch i covers rows [100*i, 100*(i+1)).
            s,e = 100*i, 100*(i+1)
            obj = actor.train_on_batch(session, rewards[s:e], seq_lens[s:e], masks[s:e], answers[s:e], topics[s:e])
            print obj
        # s and e still hold the bounds of the last training batch here.
        actions = actor.test_on_batch(session, rewards[s:e], seq_lens[s:e], masks[s:e], answers[s:e], topics[s:e])
        actionsArray = np.array(actions[0])
        # Count, per index along axis 1, how often each action value occurs.
        zerosByTime = np.sum(actionsArray == 0, axis=0)
        onesByTime = np.sum(actionsArray == 1, axis=0)
        twosByTime =  np.sum(actionsArray == 2, axis=0)
        # Mean index of each action over the first 50 positions, weighted by count.
        # NOTE(review): with integer sums under Python 2 (no `from __future__
        # import division`) these divisions truncate - confirm that is intended.
        avgZeroPos = np.sum(np.arange(50) * zerosByTime[:50]) / np.sum(zerosByTime[:50])
        avgOnePos = np.sum(np.arange(50) * onesByTime[:50]) / np.sum(onesByTime[:50])
        avgTwoPos = np.sum(np.arange(50) * twosByTime[:50]) / np.sum(twosByTime[:50])
        print avgZeroPos, avgOnePos, avgTwoPos
        embed()
        #print actions
main.py 文件源码 项目:tango-simlib 作者: ska-sa 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def simulator_main(sim_class, sim_control_class=TangoTestDeviceServerBase):
    """Main function for a simulator with class sim_class

    sim_class is a tango.server.Device subclass

    When ``--ipython`` is present in ``sys.argv`` the flag is removed and an
    IPython shell is started in a daemon thread before the server runs.

    Args:
        sim_class: Device class to serve.
        sim_control_class: Optional second class served alongside
            ``sim_class``; a falsy value serves ``sim_class`` alone.
    """
    run_ipython = '--ipython' in sys.argv
    if run_ipython:
        import IPython
        # Strip the flag so it is no longer in sys.argv when the server starts.
        sys.argv.remove('--ipython')
        def start_ipython(sim_class):
            IPython.embed()
        t = threading.Thread(target=start_ipython, args=(sim_class,))
        # The ``daemon`` attribute replaces the deprecated ``setDaemon()``.
        t.daemon = True
        t.start()

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(module)s - '
        '%(pathname)s : %(lineno)d - %(message)s',
        level=logging.INFO)

    classes = [sim_class]
    if sim_control_class:
        classes.append(sim_control_class)
    server_run(classes)
starting_pitcher.py 文件源码 项目:sportsball 作者: jgershen 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def parse_game_row(game_row):
  """Extract the home and away probable starters from one table row.

  Cell text is expected to look like 'NYM - N. Syndergaard (4-5, 3.05)'.

  Returns two rows, home first then away, each of the form
  [team, starter, '@' if away else None, opponent].
  """
  home_cell = game_row.find('td', {'class': 'shsNamD shsProbHome'})
  away_cell = game_row.find('td', {'class': 'shsNamD shsProbAway'})
  home_match = pitcher_re.search(home_cell.text)
  away_match = pitcher_re.search(away_cell.text)
  if not home_match or not away_match:
    print 'Problem with RE matching!'
    # Open a shell for inspection.
    # NOTE(review): execution continues after the shell exits, so the
    # groupdict() calls below would fail on a None match - confirm intended.
    import IPython
    IPython.embed()
  home_groups = home_match.groupdict()
  away_groups = away_match.groupdict()
  home_team = fix_team(home_groups['team'])
  away_team = fix_team(away_groups['team'])
  # Return teams, projected starters, @ if away, and opponents
  team_rows = [[home_team, home_groups['starter'], None, away_team],
               [away_team, away_groups['starter'], '@', home_team]]
  return team_rows
mlb_scraper.py 文件源码 项目:sportsball 作者: jgershen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_stats_tables_from_history_page(url):
  """Load all the prediction tables from a Numberfire history page

  Looks up one '<Month>-schedule' div for each month from March to October,
  parses each with load_player_history_table, and concatenates the tables
  that were found.

  Returns the concatenated table sorted by index, or None when no monthly
  table could be loaded.
  """
  soup = getSoupFromURL(url)
  #salary = load_player_salary_table(soup)
  projection_months = ['%s-schedule' % month for month in
                       ['March', 'April', 'May', 'June', 'July', 'August', 'September', 'October']]
  month_tables = []
  for month in projection_months:
    month_schedule = soup.find('div', attrs={'id': month})
    month_table = load_player_history_table(month_schedule)
    # Months for which the loader returns None are skipped.
    if month_table is not None:
      month_tables.append(month_table)
  if month_tables:
    all_predictions = pandas.concat(month_tables)
    all_predictions.sort_index(inplace=True)
    if all_predictions.index.duplicated().any():
      print 'Duplicate games scraped!'
      # Open a shell for inspection; the table is still returned afterwards.
      import IPython
      IPython.embed()
  else:
    all_predictions = None
  return all_predictions
fanduel_parsing.py 文件源码 项目:sportsball 作者: jgershen 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_nba_player_list(page_html):
  """Print position, name and injury status for every player row, then open a shell.

  Nothing is returned; per the author's note below this parser is known not
  to work reliably.
  """
  import IPython
  # This... doesn't really work. Leaving it to maybe-update in the future.
  soup = BeautifulSoup(page_html)
  player_rows = soup.findAll('tr', {'class': 'vs-repeat-repeated-element'})
  print len(player_rows)
  for i, row in enumerate(player_rows):
    print i
    player_pos = row.find('td', {'class': 'player-position'}).text.strip()
    player_name = row.find('span', {'class': 'player-first-name'}).text.strip() + ' ' + row.find('span', {'class': 'player-last-name'}).text.strip()
    # flag_span is not used below.
    flag_span = row.find('player')
    injured = row.findAll('abbr', {'class': 'player-badge'})
    if injured:
      # NOTE(review): `.attr(...)` may not be the right way to read an HTML
      # attribute from this element type - verify against the parser's API.
      injury_status = injured[0].attr('data-injury-status')
    else:
      injury_status = 'OK'
    print player_pos, player_name, injury_status
  IPython.embed()
test_embed.py 文件源码 项目:blender 作者: gastrodia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_ipython_embed():
    """Check that a script calling `IPython.embed()` runs and exits cleanly."""
    with NamedFileInTemporaryDirectory('file_with_embed.py') as script:
        script.write(_sample_embed)
        script.flush()
        # Close the handle first; otherwise Windows ("msft") cannot read the file.
        script.close()

        # Equivalent to running `python file_with_embed.py`.
        argv = [sys.executable, script.name]
        child_env = os.environ.copy()
        child_env['IPY_TEST_SIMPLE_PROMPT'] = '1'
        proc = subprocess.Popen(
            argv,
            env=child_env,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        raw_out, _ = proc.communicate(_exit)
        text = raw_out.decode('UTF-8')

        nt.assert_equal(proc.returncode, 0)
        nt.assert_in('3 . 14', text)
        if os.name != 'nt':
            # TODO: Fix up our different stdout references, see issue gh-14
            nt.assert_in('IPython', text)
        nt.assert_in('bye!', text)
test_embed.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_ipython_embed():
    """Check that a script calling `IPython.embed()` runs and exits cleanly."""
    with NamedFileInTemporaryDirectory('file_with_embed.py') as script:
        script.write(_sample_embed)
        script.flush()
        # Close the handle first; otherwise Windows ("msft") cannot read the file.
        script.close()

        # Equivalent to running `python file_with_embed.py`.
        argv = [sys.executable, script.name]
        child_env = os.environ.copy()
        child_env['IPY_TEST_SIMPLE_PROMPT'] = '1'
        proc = subprocess.Popen(
            argv,
            env=child_env,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        raw_out, _ = proc.communicate(_exit)
        text = raw_out.decode('UTF-8')

        nt.assert_equal(proc.returncode, 0)
        nt.assert_in('3 . 14', text)
        if os.name != 'nt':
            # TODO: Fix up our different stdout references, see issue gh-14
            nt.assert_in('IPython', text)
        nt.assert_in('bye!', text)
release.py 文件源码 项目:scibot 作者: SciCrunch 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, anno, annos):
        """Initialise via the parent class, then fetch the XML pickle once loading is done.

        Args:
            anno: Annotation forwarded to the parent initialiser.
            annos: Annotation collection forwarded to the parent initialiser.
        """
        super().__init__(anno, annos)
        if not self._done_loading:
            return
        if self._done_all:
            # Warn only; the fetch below still runs.
            print('WARNING you ether have a duplicate annotation or your annotations are not sorted by updated.')
        self._fetch_xmls(os.path.expanduser('~/ni/scibot/scibot_rrid_xml.pickle'))
release.py 文件源码 项目:scibot 作者: SciCrunch 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def main():
    """Fetch all annotations, wrap each in ``rrcu`` and run the sanity checks.

    If ``sanity_and_stats`` raises an AssertionError the message is printed
    and an interactive shell is opened for inspection.
    """
    # Only referenced by the commented-out profiling code below.
    from desc.prof import profile_me

    # clean updated annos
    #clean_dupes(get_annos, repr_issues=True)
    #return

    # fetching
    annos = get_annos()
    #_annos = annos
    #annos = [a for a in annos if a.updated > '2017-10-15']

    # loading
    #@profile_me
    #def load():
        #for a in annos:
            #rrcu(a, annos)
    #load()
    #rc = list(rrcu.objects.values())
    rc = [rrcu(a, annos) for a in annos]

    # id all the things
    #from joblib import Parallel, delayed
    #id_annos = []
    #for purl in rrcu._papers:
        #resp = idPaper(purl)
        #id_annos.append(resp)
    #id_annos = Parallel(n_jobs=5)(delayed(idPaper)(url)
                                  #for url in sorted(rrcu._papers))
    #embed()
    #return

    # sanity checks
    #print('repr everything')
    #_ = [repr(r) for r in rc]  # exorcise the spirits  (this is the slow bit, joblib breaks...)
    try:
        # stats is not used afterwards in this function.
        stats = sanity_and_stats(rc, annos)
    except AssertionError as e:
        print(e)
        embed()
compute_test_result.py 文件源码 项目:Video-Classification-Action-Recognition 作者: qijiezhao 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_confusion(gts,pres):
    """Build a row-normalised 101 x 101 confusion matrix.

    Args:
        gts: Sequence of ground-truth class indices.
        pres: Sequence of predicted class indices, indexed in step with ``gts``.

    Returns:
        A float array in which entry ``[g, p]`` is the count of samples with
        ground truth ``g`` and prediction ``p`` divided by the total count of
        row ``g``.
    """
    n_classes = 101
    matrix = np.zeros([n_classes, n_classes])
    for idx, truth in enumerate(gts):
        matrix[truth][pres[idx]] += 1
    # Each row is a view into `matrix`, so the in-place division updates it.
    for row in matrix:
        row /= sum(row)
    return matrix
models.py 文件源码 项目:Video-Classification-Action-Recognition 作者: qijiezhao 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def forward(self,x):
        """Run ``x`` through the LSTM and classify from its returned state.

        Args:
            x: Input passed to ``self.lstm`` together with ``self.hidden_data``.

        Returns:
            ``self.fc`` applied to index 1 (along the first axis) of the first
            element of the state returned by ``self.lstm``.
        """
        # The per-step outputs are not used; only the returned state is.
        _, state = self.lstm(x, self.hidden_data)
        selected = state[0][1, :, :]
        return self.fc(selected)
dataset.py 文件源码 项目:Video-Classification-Action-Recognition 作者: qijiezhao 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def transform_rgb_test(self,img,train_size):
        """Produce ten mean-subtracted, channel-reversed crops of ``img``.

        The four corner crops and the centre crop (each ``train_size`` square)
        fill slots 0-4; slots 5-9 hold the same crops reversed along the last
        axis.

        Args:
            img: Image exposing ``size`` (width, height) and ``crop(box)``.
            train_size: Side length of every crop.

        Returns:
            Array of shape ``(10, 3, train_size, train_size)``.
        """
        src = img  # do not rescale in the testing process
        src_w, src_h = src.size

        boxes = np.zeros([5, 4], dtype=np.int16)
        x_starts = (0, src_w - train_size)
        y_starts = (0, src_h - train_size)
        # Slots 0-3: the four corners, x-major order.
        corners = [(left, top) for left in x_starts for top in y_starts]
        for slot, (left, top) in enumerate(corners):
            boxes[slot, :] = [left, top, left + train_size, top + train_size]
        # Slot 4: the centre crop.
        mid_x = (src_w - train_size) / 2
        mid_y = (src_h - train_size) / 2
        boxes[4, :] = [mid_x, mid_y, train_size + mid_x, train_size + mid_y]
        boxes = np.tile(boxes, (2, 1))

        crops = np.zeros([10, 3, train_size, train_size])
        for slot, box in enumerate(boxes):
            # Move the channel axis of the cropped array to the front.
            crops[slot] = np.array(src.crop(box), dtype=np.float32).transpose([2, 0, 1])
        crops[5:, :, :, :] = crops[5:, :, :, ::-1]  # flipping
        crops[:, :, :, :] = crops[:, ::-1, :, :]    # transform the RGB to BGR type
        for channel, offset in enumerate((104, 116, 122)):
            crops[:, channel, :, :] -= offset
        return crops
createScriptHelper.py 文件源码 项目:r4ge 作者: gast04 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def printSolution( r2proj ):
    """Return script source that prints the solution once a path is found.

    The generated text checks ``pg.found``; when a state was found it loads
    and prints the memory of every symbolic region reported for ``r2proj``
    (or opens IPython when there are none), and otherwise opens an IPython
    shell.

    Args:
        r2proj: Passed through to getSymbolicMemoryRegions.

    Returns:
        The generated source as a string.
    """

    # 0=offset, 1=size, 2=name
    symb_variables = getSymbolicMemoryRegions( r2proj )

    # The trailing four spaces keep the next appended statement inside the
    # generated `if` body.
    content = '''
# print soltion if we found a path
if len(pg.found) > 0:
    state_found = pg.found[0].state
    print "found the target!"
    '''

    # One load/print pair per symbolic region; the name goes into a comment.
    for variable in symb_variables:
        tmp = '''
    concrete_memory = state_found.memory.load({0}, {1}) # {2}
    print state_found.se.any_str(concrete_memory)'''.format(hex(variable[0]), variable[1], variable[2])
        content += tmp

    if len(symb_variables) == 0: # -> check for static mode
        content += "IPython.embed()"

    content += '''
else:
    print "start IPython shell"
    print "Variables: state_found, start_state, pg, proj"
    IPython.embed()
    '''

    return content
worker.py 文件源码 项目:hierarchical-attention-networks 作者: ematvey 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def evaluate(dataset):
  """Restore the model, evaluate it on ``dataset``, print accuracy, open a shell.

  The printed value is the mean of ``predictions == labels`` over the frame
  returned by ``ev``.
  """
  tf.reset_default_graph()
  config = tf.ConfigProto(allow_soft_placement=True)
  with tf.Session(config=config) as s:
    # Second return value of model_fn is not needed here.
    model, _ = model_fn(s, restore_only=True)
    df = ev(s, model, dataset)
  print((df['predictions'] == df['labels']).mean())
  import IPython
  IPython.embed()
embed.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def exit_raise(self, parameter_s=''):
        """%exit_raise Make the current embedded kernel exit and raise an exception.

        This function sets an internal flag so that an embedded IPython will
        raise a `IPython.terminal.embed.KillEmbeded` exception on exit, and
        then asks the current shell to exit. This is useful to permanently
        exit a loop that creates IPython embed instances.
        """

        # Flag first, then request the exit.
        self.shell.should_raise = True
        self.shell.ask_exit()
node.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def main():
    """Start the bqueryd node named on the command line, or an RPC shell.

    Recognised ``sys.argv`` entries: ``-v`` (debug logging),
    ``--data_dir=<path>``, and one of ``controller``, ``worker``,
    ``downloader`` or ``movebcolz``. With none of those node names, an RPC
    client is created (addressed by ``sys.argv[1]`` when it starts with
    ``tcp:``) and an IPython shell is opened.

    Relies on a module-level ``redis_url`` that is not visible in this block.
    """
    if '-v' in sys.argv:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.INFO

    data_dir = bqueryd.DEFAULT_DATA_DIR
    for arg in sys.argv:
        if arg.startswith('--data_dir='):
            # 11 == len('--data_dir='); the last matching argument wins.
            data_dir = arg[11:]

    if 'controller' in sys.argv:
        bqueryd.ControllerNode(redis_url=redis_url, loglevel=loglevel).go()
    elif 'worker' in sys.argv:
        bqueryd.WorkerNode(redis_url=redis_url, loglevel=loglevel, data_dir=data_dir).go()
    elif 'downloader' in sys.argv:
        bqueryd.DownloaderNode(redis_url=redis_url, loglevel=loglevel).go()
    elif 'movebcolz' in sys.argv:
        bqueryd.MoveBcolzNode(redis_url=redis_url, loglevel=loglevel).go()
    else:
        if len(sys.argv) > 1 and sys.argv[1].startswith('tcp:'):
            rpc = bqueryd.RPC(address=sys.argv[1], redis_url=redis_url, loglevel=loglevel)
        else:
            rpc = bqueryd.RPC(redis_url=redis_url, loglevel=loglevel)
        # rpc is not used below; it is a local when the shell opens.
        import IPython
        IPython.embed()


问题


面经


文章

微信
公众号

扫码关注公众号