python类Database()的实例源码

app.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, config):
        """Read the web settings from ``config`` and set up the runtime state.

        :param config: object exposing ``get(section, option)`` and
            ``items(section)`` (presumably a ConfigParser instance -- confirm
            against the caller). The ``web`` and ``devices`` sections are read.
        """
        self.service = None
        self.webServer = None
        self.config = config

        def web(option):
            # Every server setting below lives in the 'web' section.
            return self.config.get('web', option)

        self.httpsPort = int(web('httpsPort'))
        self.httpPort = int(web('httpPort'))
        self.adminPasswordHash = web('adminPasswordHash')
        self.apiSecret = web('apiSecret')
        self.uploadDir = web('uploadDir')
        self.dbFile = web('dbFile')
        self.httpsCertFile = web('httpsCertFile')
        self.httpsKeyFile = web('httpsKeyFile')
        self.httpsChainFile = web('httpsChainFile')
        self.localVideoPort = int(web('localVideoPort'))
        self.database = database.Database(self.dbFile)

        # Each option of the 'devices' section maps a device id to a JSON
        # document; OrderedDict keeps the keys in the order they appear in
        # that document. The (key, value) pairs are iterated directly, which
        # works on Python 2 and 3 alike (dict.iteritems() is Python 2 only).
        self.deviceConfig = dict()
        for deviceId, jsonConf in self.config.items('devices'):
            self.deviceConfig[deviceId] = json.loads(jsonConf, object_pairs_hook=OrderedDict)

        self.trends = dict()
        # Lock shared by the instance; what it guards is not visible here.
        self.lock = threading.Lock()
bandit.py 文件源码 项目:bandit-http-server 作者: evancasey 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, request):
        """Build a bandit from the JSON body of ``request`` and store it.

        :param request: object whose ``json`` attribute is a mapping holding
            ``name``, ``arm_count``, ``algo_type``, ``budget_type``,
            ``budget``, ``epsilon`` and ``reward_type``.
        """
        payload = request.json

        # Create the arms first and index their attribute dicts by arm id.
        arms = {}
        for _ in range(payload['arm_count']):
            new_arm = Arm()
            arms[new_arm.id] = new_arm.__dict__

        # Attribute order is kept as-is: the whole __dict__ is what gets
        # persisted at the end.
        self.id = self.bandit_keys()
        self.name = payload['name']
        self.arm_count = payload['arm_count']
        self.arms = arms
        self.algo_type = payload['algo_type']
        self.budget_type = payload['budget_type']
        self.budget = payload['budget']
        self.epsilon = payload['epsilon']
        self.reward_type = payload['reward_type']

        # Running statistics start from a blank slate.
        self.max_reward = 1
        self.total_reward = 0
        self.total_count = 0
        self.regret = 0

        # Persist the freshly built bandit under its id.
        Database().hset("bandits", self.id, self.__dict__)
prices.py 文件源码 项目:Enibar 作者: ENIB 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set_multiple_values(prices):
    """ Write new values for several prices inside one transaction.

    Note: prefer this function when many prices change at once; the UPDATE
    statement is prepared a single time and re-executed for every entry.

    :param list prices: Entries indexable by ``'id'`` and ``'value'``

    :return bool: Always True
    """
    with Database() as db:
        db.transaction()
        query = QtSql.QSqlQuery(db)
        query.prepare("UPDATE prices SET value=:value WHERE id=:id")
        for entry in prices:
            query.bindValue(":id", entry['id'])
            query.bindValue(":value", entry['value'])
            # The result of exec_() is not inspected.
            query.exec_()

        db.commit()
        return True
notes.py 文件源码 项目:Enibar 作者: ENIB 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def transactions(notes, diff, *, do_not=False):
    """ Add the same amount to several notes inside one transaction.

        :param notes: Iterable of nicknames whose note must change
        :param float diff: Amount added to each note
        :param bool do_not: When True, skip the "enibar-notes" redis message

        :return: Whatever the database commit returns
    """
    with Database() as db:
        db.transaction()
        query = QtSql.QSqlQuery(db)
        query.prepare("UPDATE notes SET note=note+:diff WHERE nickname=:nick")
        for nickname in notes:
            query.bindValue(':nick', nickname)
            query.bindValue(':diff', diff)
            query.exec_()
        value = db.commit()

    # Broadcast the touched nicknames unless the caller opted out.
    if do_not:
        return value
    api.redis.send_message("enibar-notes", notes)
    return value
Database.py 文件源码 项目:aria 作者: mattmaynes2 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build a fresh hub/exchange stack on a per-test database file."""
        db_path = self._testMethodName + ".db"

        # Drop any database file left over from an earlier run; a missing
        # file is fine.
        try:
            os.remove(db_path)
        except OSError:
            pass

        self.database = Database(db_path)

        self.hub = Hub()
        self.hub.mode = HubMode.Learning
        self.cli = CLI(self.hub)
        self.exchange = Exchange(self.hub, self.cli, self.database)

        # Commands under test read from the same database.
        for command in (GetEventWindowCommand(self.database),
                        GetDeviceEventsCommand(self.database)):
            self.hub.addCommand(command)

        self.testAdapter = StubDeviceAdapter()
        self.exchange.register('stub', self.testAdapter)
        self.exchange.register('hub', HubAdapter(self.hub))
        self.exchange.discovered(self.hub)
        self.registerFakeDevices()
        self.exchange.start()

        # Second handle on the same file, used by the tests themselves.
        self.db = TestDatabase(db_path)
test_Training.py 文件源码 项目:aria 作者: mattmaynes2 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build a fresh hub/exchange stack on a per-test database file."""
        db_path = self._testMethodName + ".db"

        # Drop any database file left over from an earlier run; a missing
        # file is fine.
        try:
            os.remove(db_path)
        except OSError:
            pass

        self.database = Database(db_path)

        self.hub = Hub()
        self.hub.mode = HubMode.Learning
        self.cli = CLI(self.hub)
        self.exchange = Exchange(self.hub, self.cli, self.database)

        self.registerAdapters()
        self.testAdapter = StubDeviceAdapter()
        self.exchange.register('stub', self.testAdapter)
        self.exchange.register('hub', HubAdapter(self.hub))
        self.exchange.discovered(self.hub)
        self.exchange.start()

        # Second handle on the same file, used by the tests themselves.
        self.db = TestDatabase(db_path)
        # self.device is not assigned in this method -- presumably set by
        # registerAdapters(); confirm.
        self.exchange.discovered(self.device)
__main__.py 文件源码 项目:aria 作者: mattmaynes2 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    """Parse the arguments, wire up the hub, CLI and exchange, and start them.

    ``hub``, ``cli`` and ``exchange`` are published as module globals.
    """
    global hub, cli, exchange
    argv = args.parse()
    if argv.daemonize:
        daemon.daemonize()

    # Install the SIGTERM handler before any component is created.
    signal.signal(signal.SIGTERM, signalHandler)

    database    = Database()
    hub         = Hub(argv, exit)
    cli         = CLI(hub)
    setupCommands(hub, database)
    exchange    = create_exchange(hub, cli, database)
    exchange.discovered(hub)

    cli.start()
    exchange.start()
default_regions.py 文件源码 项目:BlitzcrankBotV2 作者: SuperFrosty 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
async def view(self, ctx):
        """View your server's default region"""
        # The docstring is left untouched: it may be surfaced to users as the
        # command's help text (assumption -- confirm against the command
        # framework in use).
        #
        # Declared ``async`` because the body awaits ctx.send(); a plain
        # ``def`` containing ``await`` does not compile.
        title = "Default Region for {0}:".format(ctx.guild.name)

        db = database.Database('guilds.db')
        try:
            description = db.find_entry(ctx.guild.id)
        except TypeError:
            # The lookup signals "no entry for this guild" with a TypeError.
            description = "A default region for this server has not been set!"
        finally:
            # Close the connection on both the success and the failure path.
            db.close_connection()

        embed = discord.Embed(
            title=title, description=description, colour=0x1AFFA7)
        utils.footer(ctx, embed)
        await ctx.send("", embed=embed)
default_regions.py 文件源码 项目:BlitzcrankBotV2 作者: SuperFrosty 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def remove(self, ctx):
        """Remove your server's default region"""
        # The docstring is left untouched: it may be surfaced to users as the
        # command's help text (assumption -- confirm against the command
        # framework in use).
        #
        # Declared ``async`` because the body awaits ctx.send(); a plain
        # ``def`` containing ``await`` does not compile.
        db = database.Database('guilds.db')
        try:
            # find_entry runs first so that a guild without an entry raises
            # TypeError before anything is removed.
            db.find_entry(ctx.guild.id)
            db.remove_entry(ctx.guild.id)
        except TypeError:
            embed = discord.Embed(
                title="Error!",
                description="A default region for this server has not been set!",
                colour=0xCA0147)
        else:
            embed = discord.Embed(
                title='Success!',
                description="Default region for {0} has been cleared!".format(
                    ctx.guild.name),
                colour=0x1AFFA7)
        finally:
            # Exactly one close, whichever branch ran.
            db.close_connection()

        utils.footer(ctx, embed)
        await ctx.send("", embed=embed)
general_utilities.py 文件源码 项目:BlitzcrankBotV2 作者: SuperFrosty 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def no_region_check(self, ctx, region):
        """Returns server's default region if none is set"""
        # Declared ``async`` because the body awaits ctx.send(); a plain
        # ``def`` containing ``await`` does not compile.
        #
        # Returns ``region`` unchanged when it is given, otherwise the entry
        # stored for the guild, or None after telling the user to pick one.
        if region is not None:
            return region

        db = database.Database('guilds.db')
        try:
            return db.find_entry(ctx.guild.id)
        except TypeError:
            # No entry stored for this guild: fall through to the error
            # message below once the connection has been closed.
            pass
        finally:
            db.close_connection()

        embed = self.error_embed(
            ctx, "Please specify a region, or set a default region with `b!region set [region]`.")
        await ctx.send("", embed=embed)
        return None

# Base finished
scraper.py 文件源码 项目:porn_sieve 作者: PornSieve 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        """Scrape the configured gallery pages and queue unseen videos.

        Pages ``start_pg`` through ``start_pg + max_pgs`` are visited. Every
        video without stored feedback is saved and pushed on ``self.q`` with
        the negated prediction as priority. Progress is emitted as a
        percentage and finishes at 100.
        """
        # The progress estimate assumes this many videos per gallery page
        # (value taken from the original formula -- confirm per site).
        per_page = 20
        expected_total = (self.max_pgs + 1) * per_page

        db = Database()
        site = site_selector(self.site)
        for i in range(self.start_pg, self.start_pg + self.max_pgs + 1):
            gal_url = site.fmt_gallery(self.niche, i)
            # Count pages relative to the first one; using the absolute page
            # number pushed the percentage past 100 whenever start_pg > 0.
            pages_done = i - self.start_pg
            for j, (vid_url, img_url) in enumerate(site.scrape_gallery(gal_url)):
                # update progress bar
                progress = (pages_done * per_page + j) / expected_total * 100
                self.updateProgress.emit(progress)

                data = site.scrape_video(vid_url)
                if not data:
                    continue
                data["img"] = img_url
                if not db.has_feedback(data["url"]):
                    db.save(data)
                    with self.winlock:
                        self.q.put((-self.pred.predict(data), data["url"]))
        self.updateProgress.emit(100)
spotify.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def createPlaylist(self, token):
        """Create the 'Chorus' playlist for the token's user.

        :param token: Spotify access token (string) of the user.
        :return: two-item list ``[userID, playlist_id]``.
        """
        auth_header = {"Authorization": 'Bearer ' + token}

        # Look up the id of the user the token belongs to.
        profile = requests.get("https://api.spotify.com/v1/me", headers=auth_header)
        userID = json.loads(profile.text)['id']

        playlist_name = 'Chorus'
        sp = spotipy.Spotify(auth=token)
        sp.trace = False
        sp.user_playlist_create(userID, playlist_name)

        # Find the playlist id by name among the first 50 playlists. There is
        # no break, so the last match wins; if none matches, playlist_id is
        # never bound and the return below raises.
        listing = sp.user_playlists(userID, limit=50, offset=0)
        for entry in listing['items']:
            if entry['name'] == playlist_name:
                playlist_id = entry['id']

        # Switch shuffle off on the user's player.
        requests.put('https://api.spotify.com/v1/me/player/shuffle?state=false', headers=auth_header)

        return [userID, playlist_id]
spotify.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def addSongs(self, eventID):
        """Append the event's top-voted song to its Spotify playlist.

        When the database reports no top song (-1), a recommendation is
        requested instead. Nothing is added if no track can be determined.

        :param eventID: id of the event whose playlist is extended.
        """
        db = Database()
        trackID = db.getTopSong(eventID)
        spotify = Spotify()
        token = db.getEventSpotifyToken(eventID)
        if trackID == -1:
            trackID = spotify.recommend_fallback(eventID)
            # recommend_fallback returns an entry of the recommendations
            # 'tracks' list, i.e. a track object rather than a bare id.
            if isinstance(trackID, dict):
                trackID = trackID['id']

        username = db.getHostSpotifyUserName(eventID)
        playlist_id = db.getPlaylistID(eventID)
        if trackID is None or trackID == -1:
            # Neither a vote nor a recommendation produced a track; sending
            # the literal "None"/"-1" as a track id could never succeed.
            print("NO TOP VOTED")
            return

        sp = spotipy.Spotify(auth=token)
        sp.trace = False
        print("100%")
        # spotipy documents the tracks argument as a list.
        sp.user_playlist_add_tracks(username, str(playlist_id), [str(trackID)])
        print("NOT 100%")
spotify.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def addTwo(self, eventID):
        """Seed the event's playlist with two fixed tracks and record them.

        The first track is also stored as the event's current song.

        :param eventID: id of the event whose playlist is seeded.
        """
        first_track = '0KKkJNfGyhkQ5aFogxQAPU'
        second_track = '6b8Be6ljOzmkOmFslEb23P'

        db = Database()
        token = db.getEventSpotifyToken(eventID)
        username = db.getHostSpotifyUserName(eventID)
        playlist_id = db.getPlaylistID(eventID)

        sp = spotipy.Spotify(auth=token)
        sp.trace = False
        # playlist_id is passed as a string: an int triggered
        # "int has no attribute split".
        # A list keeps the playlist order fixed; a set literal iterates in
        # arbitrary order, so either track could end up first.
        sp.user_playlist_add_tracks(username, str(playlist_id), [first_track, second_track])

        db.updateCurrentSong(first_track, eventID)
        db.insertSong(first_track, eventID, "0", "Thats What I Like", "Bruno Mars", "0", "0", "0")
        db.insertSong(second_track, eventID, "0", "24K Magic", "Bruno Mars", "0", "0", "0")
spotify.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def recommend_fallback(self, eventID):
        """Return the first recommendation seeded by the event's current song.

        :param eventID: id of the event to recommend for.
        :return: the first entry of the response's ``'tracks'`` list, or None
            when that list is empty.
        """
        db = Database()
        token = db.getEventSpotifyToken(eventID)
        track_id = db.getCurrentPlayingSong(eventID)

        req = requests.get(
            'https://api.spotify.com/v1/recommendations?seed_tracks=' + track_id,
            headers={"Authorization": "Bearer " + str(token)})
        json_obj = json.loads(req.text)

        # Only the first recommendation was ever returned; the remainder of
        # the old loop body was unreachable.
        return next(iter(json_obj['tracks']), None)
server.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def CreateUser(currentEvent, inEvent, isHost):
    """Insert a user row and return its id.

    The three arguments are passed straight through to
    ``Database.insertUser``.

    :return: the value returned by ``insertUser``; if anything raises, the
        exception's message is returned as a string instead (callers cannot
        tell the two apart by type alone).
    """
    try:
        db = Database()
        userID = db.insertUser(currentEvent, inEvent, isHost)

        return userID

    except Exception as e:
        return str(e)

#class CreateHost(Resource):
server.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post(self):
        """Return the song queue for the posted ``userid`` / ``eventid``.

        :return: JSON string ``{"songs": ...}`` on success, or a dict
            ``{'error': message}`` when anything raises.
        """
        try:
            # Declare and parse the request arguments.
            parser = reqparse.RequestParser()
            expected = (
                ('userid', 'ID of User that is sending vote'),
                ('eventid', 'ID of event user is in'),
            )
            for arg_name, arg_help in expected:
                parser.add_argument(arg_name, type=str, help=arg_help)
            args = parser.parse_args()

            user_id = args['userid']
            event_id = args['eventid']

            queue = Database().getQueue(event_id, user_id)
            return json.dumps({'songs': queue})

        except Exception as e:
            return {'error': str(e)}
server.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post(self):
        """Return the played songs for the posted ``userid`` / ``eventid``.

        :return: JSON string ``{"songs": ...}`` on success, or a dict
            ``{'error': message}`` when anything raises.
        """
        try:
            # Declare and parse the request arguments.
            parser = reqparse.RequestParser()
            expected = (
                ('userid', 'ID of User that is sending vote'),
                ('eventid', 'ID of event user is in'),
            )
            for arg_name, arg_help in expected:
                parser.add_argument(arg_name, type=str, help=arg_help)
            args = parser.parse_args()

            user_id = args['userid']
            event_id = args['eventid']

            db = Database()
            # Both ids are echoed to stdout before the lookup.
            print(event_id)
            print(user_id)
            played = db.getPlayedSongs(event_id, user_id)

            return json.dumps({'songs': played})

        except Exception as e:
            return {'error': str(e)}
server.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post(self):
        """Resolve an event from its password and create a user for it.

        :return: JSON string with ``eventID`` and ``userID`` on success, or a
            dict ``{'error': message}`` when anything raises.
        """
        try:
            # Declare and parse the single request argument.
            parser = reqparse.RequestParser()
            parser.add_argument('password', type=str, help='Password to create user')
            event_password = parser.parse_args()['password']

            db = Database()
            eventID = db.getEventid(event_password)
            # The two "0" arguments are CreateUser's inEvent and isHost.
            userID = CreateUser(eventID, "0", "0")

            return json.dumps({'eventID': eventID, 'userID': userID})

        except Exception as e:
            return {'error': str(e)}
server.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def authtarget():
    """Poll the database for event ids forever and run the timer for each.

    Every cycle fetches all event ids, waits ten seconds, then calls
    ``Spotify.timer`` once per id. The function never returns, so it is meant
    to be the target of a background thread.
    """
    sp = Spotify()
    db = Database()
    while True:
        resultList = db.getAllEventID()
        time.sleep(10)
        for i in resultList:
            # The old loop also started a new thread running authtarget()
            # itself for every event, so each cycle multiplied the number of
            # pollers without bound. One poller does the same work.
            sp.timer(i)

#define API endpoints
#api.add_resource(CreateUser, '/CreateUser')
#api.add_resource(CreateHost, '/CreateHost')
graph_module.py 文件源码 项目:RobotNSGA 作者: LuisLaraP 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main(args):
    '''Draw every requested graph type from the selected database.

    The database location defaults to the current working directory when
    args.database is not given. Exits if that location is not a database.
    '''
    if args.database is None:
        abs_path = os.getcwd()
    else:
        abs_path = os.path.abspath(args.database)

    if not database.is_database(abs_path, verbose=True):
        sys.exit()

    db = database.Database(abs_path)

    # Graph type -> name of the plotting function in the graph module;
    # unknown types are skipped.
    plotter_names = {'fitness': 'fitness_graph', 'pareto': 'pareto_graph'}
    for graph_type in args.types:
        plotter_name = plotter_names.get(graph_type)
        if plotter_name is None:
            continue
        getattr(graph, plotter_name)(db, args.objectives)
utils.py 文件源码 项目:RobotNSGA 作者: LuisLaraP 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def initialize_database(args, default_dir):
    '''Open the database named by args, falling back to default_dir.

    args.database is overwritten with default_dir when it is None. The
    database is reset first if args.reset is set. A database whose
    'highest_population' property is 0 gets its 'population_size' set from
    args.size (the program exits if no size was given); any other database
    is selected.

    Returns the database to be used.
    '''
    if args.database is None:
        args.database = default_dir

    ret_db = Database(args.database)
    if args.reset:
        ret_db.reset()

    # Existing run: just select it.
    if ret_db.properties['highest_population'] != 0:
        ret_db.select()
        return ret_db

    # New run: the population size is mandatory.
    if args.size is None:
        print('ERROR: Population size must be specified when starting a new run.')
        sys.exit()
    ret_db.set_property('population_size', args.size)
    return ret_db
ev3_problem.py 文件源码 项目:RobotNSGA 作者: LuisLaraP 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test(args):
    '''Replay one stored individual against every goal position.

    Reads args.database (defaults to 'RobotTrainingData'), args.generation
    and args.individual. The module globals database and goal_positions are
    rebound as a side effect.
    '''
    global database
    global goal_positions

    pygame.init()
    random.seed()
    np.random.seed()

    if args.database is None:
        args.database = 'RobotTrainingData'
    database = Database(args.database)
    problem = EV3Problem(log_to_file=False)

    goals_file = pkg_resources.resource_filename('resources.ev3', 'y_test.txt')
    goal_positions = np.loadtxt(os.path.abspath(goals_file))

    # Individuals are stored under the keys 'I<number>' of the selected
    # generation and decoded from raw data with np.fromstring.
    database.select(args.generation)
    stored = database.load()
    chromosome = np.fromstring(stored['I' + str(args.individual)]).tolist()

    problem.robot.home()
    for goal in goal_positions:
        problem.run_test(goal, chromosome)
ntp.py 文件源码 项目:Mk3-Firmware 作者: emfcamp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def set_NTP_time():
    """Set the board's RTC from NTP, shifted by the stored timezone.

    Returns True once the RTC has been written, False when no NTP time
    could be obtained.
    """
    import time
    from pyb import RTC
    print("Setting time from NTP")

    ntp_seconds = get_NTP_time()
    if ntp_seconds is None:
        print("Could not set time from NTP")
        return False

    tz = 0
    with database.Database() as db:
        tz = db.get("timezone", 0)

    # The stored value is read as a signed number whose last two digits are
    # minutes and whose remaining digits are hours.
    sign = 1 if tz >= 0 else -1
    offset_minutes = int(abs(tz) % 100) * sign
    offset_hours = int(tz / 100)
    ntp_seconds += (offset_hours * 3600) + (offset_minutes * 60)

    # Rearrange (year, month, day, hour, minute, second) into the 8-tuple
    # passed to RTC.datetime(), with zeros in the fourth and last slots
    # (presumably weekday and subseconds -- confirm against the pyb docs).
    fields = time.localtime(ntp_seconds)
    rtc_fields = fields[0:3] + (0,) + fields[3:6] + (0,)

    clock = RTC()
    clock.init()
    clock.datetime(rtc_fields)

    return True
bandit.py 文件源码 项目:bandit-http-server 作者: evancasey 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def bandit_keys(self):
        """Return the result of ``hincrby`` on field "bandit" of "unique_ids"."""
        store = Database()
        return store.hincrby("unique_ids", "bandit")
bandit.py 文件源码 项目:bandit-http-server 作者: evancasey 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_bandit(self, id):
        """Load the bandit stored under ``id`` in the "bandits" hash."""
        stored = Database().hget("bandits", id)
        # NOTE(review): eval() executes whatever text is stored in the
        # database. If that store can be written by anything untrusted this
        # is arbitrary code execution -- consider ast.literal_eval or JSON.
        return eval(stored)
bandit.py 文件源码 项目:bandit-http-server 作者: evancasey 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set_bandit(self, id, bandit):
        """Store ``bandit`` under ``id`` in the "bandits" hash."""
        store = Database()
        store.hset("bandits", id, bandit)
bandit.py 文件源码 项目:bandit-http-server 作者: evancasey 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_bandit(self,id):
        """Return True when the "bandits" hash has a field ``id``, else False."""
        return True if Database().hexists("bandits", id) else False
arm.py 文件源码 项目:bandit-http-server 作者: evancasey 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def arm_keys(self):
        """Return the result of ``hincrby`` on field "arm" of "unique_ids"."""
        store = Database()
        return store.hincrby("unique_ids", "arm")
harvest_public_document.py 文件源码 项目:Belati 作者: aancw 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
        """Start with project id 0 and a database handle of its own."""
        self.project_id = 0
        self.db = Database()


问题


面经


文章

微信
公众号

扫码关注公众号