python类Database()的实例源码

container.py 文件源码 项目:CloudKitPy 作者: Baza207 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(
        self,
        container_identifier,
        environment,
        apns_environment=None,
        api_token=None,
        server_to_server_key=None,
        cert_path=None,
        logger=None
    ):
        self.__logger = logger

        self.container_identifier = container_identifier
        self.environment = environment
        self.apns_environment = apns_environment
        self.api_token = api_token
        self.server_to_server_key = server_to_server_key
        self.cert_path = cert_path

        # Setup public and private cloud databases
        self.public_cloud_database = Database(
            self,
            'public',
            self.__logger
        )
        self.private_cloud_database = Database(
            self,
            'private',
            self.__logger
        )

    # Discovering Users
prices.py 文件源码 项目:Enibar 作者: ENIB 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_descriptor(name, category, quantity):
    """ Add price descriptor

    :param str name: Price name
    :param int category: category id
    :param int quantity: The quantity (in mL) of the product
    :return int: Return price id created or None
    """
    with Database() as database:
        database.transaction()
        cursor = QtSql.QSqlQuery(database)
        cursor.prepare("INSERT INTO price_description(label, category, quantity) \
            VALUES(:label, :category, :quantity)")
        cursor.bindValue(":label", name)
        cursor.bindValue(":category", category)
        cursor.bindValue(":quantity", quantity)
        if cursor.exec_():
            desc_id = cursor.lastInsertId()
            cursor.prepare("SELECT id FROM products WHERE category=:category")
            cursor.bindValue(":category", category)
            if cursor.exec_():
                while cursor.next():
                    pcurser = QtSql.QSqlQuery(database)
                    pcurser.prepare("INSERT INTO prices(price_description, \
                            product, value) VALUES(:desc, :product, :value)")
                    pcurser.bindValue(":desc", desc_id)
                    pcurser.bindValue(":product", cursor.value('id'))
                    pcurser.bindValue(":value", 0.00)
                    pcurser.exec_()
            database.commit()
            return desc_id

        # Something went wrong
        database.rollback()
        return None
index.py 文件源码 项目:CMS 作者: Vinlagrenouille 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_db():
    db = getattr(g, '_database', None)
    if db is None:
        g._database = Database()
    return g._database
ap.py 文件源码 项目:wifi-probe-hack 作者: kings-way 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, mon):
        self.mon = mon
        self.stations = []
        self.ssids = []
        self.db = Database()
        self.airbase_process = Process(target=self.launch_airbase)
        self.airbase_process.daemon = True
        self.airbase_process.start()

    # deduplicate the ssids
test_retreiver.py 文件源码 项目:aria 作者: mattmaynes2 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass
        self.database    = Database(self._testMethodName + ".db")
        self.testDatabase  =TestDatabase(self._testMethodName + ".db") 
        self.retriever = Retriever(self.database)
test_set_behaviour_command.py 文件源码 项目:aria 作者: mattmaynes2 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def setUp(self):
        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass
        self.database    = Database(self._testMethodName + ".db")
test_get_sessions_command.py 文件源码 项目:aria 作者: mattmaynes2 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        try:
            os.remove(self._testMethodName + ".db")
        except OSError as e:
            pass
        self.database    = Database(self._testMethodName + ".db")
simulation.py 文件源码 项目:multichain-walker-simulation 作者: pimveldhuisen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, max_time, log_dir, verbose, persistent_walking, directed_walking, block_limit, alpha,
                 teleport_probability):
        self.max_time = max_time
        self.bootstrap = Node(0, self)
        self.nodes = []
        self.event_queue = []
        self.time = 0
        self.block_file = os.path.join(log_dir, 'blocks.dat')
        self.load_balance_file = os.path.join(log_dir, 'load.dat')
        self.ranking_deviation_file = os.path.join(log_dir, 'ranking.dat')
        self.verbose = verbose
        self.persistent_walking = persistent_walking
        self.directed_walking = directed_walking
        self.block_limit = block_limit
        self.alpha = alpha
        self.teleport_probability = teleport_probability
        self.last_progress_print = None

        print "Reading multichain database.."
        database = Database("multichain.db", self.block_limit)
        public_keys = database.get_identities()
        for public_key in public_keys:
            node = Node(public_key, self, self.persistent_walking, self.directed_walking, self.alpha,
                        self.teleport_probability)
            node.add_blocks(database.get_blocks(public_key))
            node.receive_identity(self.bootstrap)
            node.send_identity(self.bootstrap)
            self.nodes.append(node)
            self.add_event(Simulation.initialisation_delay(), node.take_walk_step)
            self.add_event(0, node.update_ranking)

        print "Calculating rankings.."
        # Here rankings are calculated based on the full database, not the individual databases of the nodes
        self.rankings = {}
        for public_key in public_keys:
            self.rankings[str(public_key)] = get_ranking(database, public_key)


        print "Scheduling data gathering.."
        self.log_data_times = range(self.max_time, -60000, -60000)
        print self.log_data_times
test_ranking_similiarity.py 文件源码 项目:multichain-walker-simulation 作者: pimveldhuisen 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_ranking_integrated(self):

        global_database = Database("../multichain.db", 100)
        personal_database = Database("temp.db", None)
        public_key = random.choice(global_database.get_identities())
        personal_database.add_blocks(global_database.get_blocks(public_key))
        global_ranking = get_ranking(global_database, public_key)
        personal_ranking = get_ranking(personal_database, public_key)

        ranking_similarity = calculate_ranking_similarity(personal_ranking, global_ranking)
        assert ranking_similarity <= 1
        assert ranking_similarity >= 0
test_ranking_similiarity.py 文件源码 项目:multichain-walker-simulation 作者: pimveldhuisen 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_ranking_integrated_full_database(self):

        global_database = Database("../multichain.db", 100)
        personal_database = Database("temp.db", None)
        public_key = random.choice(global_database.get_identities())
        for public_key in global_database.get_identities():
            personal_database.add_blocks(global_database.get_blocks(public_key))
        global_ranking = get_ranking(global_database, public_key)
        personal_ranking = get_ranking(personal_database, public_key)

        ranking_similarity = calculate_ranking_similarity(personal_ranking, global_ranking)

        self.assertEqual(ranking_similarity, 1)
node.py 文件源码 项目:multichain-walker-simulation 作者: pimveldhuisen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, public_key, simulation, persistent_walking=False, directed_walking=False, alpha=0.1,
                 teleport_probability=0.5):
        self.public_key = public_key
        self.simulation = simulation
        self.live_edges = []
        self.node_directory = "nodes/" + base64.encodestring(str(self.public_key))
        if not os.path.exists(self.node_directory):
            os.makedirs(self.node_directory)
        self.block_database = Database(self.node_directory + "/multichain.db")
        self.log = open(self.node_directory + "/log.txt", 'w')

        self.directed_walking = directed_walking
        self.persistent_walking = persistent_walking

        if self.persistent_walking:
            self.teleport_probability = teleport_probability
            self.current_walk = None
            if self.directed_walking:
                self.walk_function = self.walk_statefull_directed
            else:
                self.walk_function = self.walk_statefull_undirected
        else:
            if self.directed_walking:
                self.walk_function = self.walk_stateless_directed
            else:
                self.walk_function = self.walk_stateless_undirected

        self.ranking = {}
        self.number_of_requests_received = 0
        self.alpha = alpha
statistics_maker.py 文件源码 项目:PyTaskHelper 作者: AvSinStudio 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def build_task_database(files, entry_class):
    task_db = database.Database(entry_class)
    for filename in files:
        with open(filename, encoding='utf-8') as file:
            data = json.load(file)
        for task in data:
            task_db.add_entry(task)
    if 'finalize' in dir(task_db):
        task_db.finalize()
    return task_db
relaybot.py 文件源码 项目:RelayBot2.0 作者: nukeop 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, logfilename=None):
        self.configure_logging(logfilename)
        errors.set_exception_handler()
        self.user = None

        self.database = database.Database('relaybot.db')

        self.import_plugins()
        self.plugins = []

        for plugin in plugins.plugin.Plugin.__subclasses__():
            plugininst = plugin(self)
            self.plugins.append(plugininst)
action.py 文件源码 项目:MTodo 作者: mortezaipo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self):
        self.__config = Config()
        self.__database = Database(self.__config.database_path)
main.py 文件源码 项目:MTodo 作者: mortezaipo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start():
    """Start progress."""
    config = Config()
    config.start()

    database = Database(config.database_path)
    database.start()

    interface = Interface()
    interface.start()
interface.py 文件源码 项目:MTodo 作者: mortezaipo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self):
        """Initialize UserInterface class."""
        self.__config = Config()
        self.__action = Action()
        self.__database = Database(self.__config.database_path)
events.py 文件源码 项目:BlitzcrankBotV2 作者: SuperFrosty 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def on_guild_join(self, guild):
        """Check for bot collection"""
        l = list(filter(lambda m: m.bot, guild.members))
        members = len(guild.members)
        if len(l) / members >= .55:
            bots = "{0:.0F}% bots".format(100 * (len(l) / members))
            channel_test = discord.utils.find(lambda c: c.permissions_for(
                c.guild.me).send_messages, guild.text_channels)
            await channel_test.send("To avoid bot collection servers, I auto leave any server where 55% or above of the users are bots, sorry!")
            await guild.leave()
            embed = discord.Embed(title="Left Server", colour=0x1affa7)
            embed.add_field(name="Server:", value=guild.name, inline=True)
            embed.add_field(
                name="Reason:", value="Bot collection server", inline=True)
            embed.add_field(name="Users:", value=members, inline=True)
            embed.add_field(name="Justification:", value=bots, inline=True)
            channel = self.bot.get_channel(295831639219634177)
            await channel.send('', embed=embed)
        else:
            embed = discord.Embed(title="Joined Server", colour=0x1affa7)
            embed.add_field(name="Server:", value=guild.name, inline=True)
            embed.add_field(name="Users:", value=members, inline=True)
            embed.add_field(name="Total:", value=len(
                self.bot.guilds), inline=True)
            channel = self.bot.get_channel(295831639219634177)
            await channel.send('', embed=embed)
            channel_test = discord.utils.find(lambda c: c.permissions_for(
                c.guild.me).send_messages, guild.text_channels)
            await channel_test.send("Beep, boop! To set up a default LoL region for my lookup commands, please use the `b!region set` command! (Example, `b!region set OCE`)")
            db = database.Database("guilds.db")
            db.add_table(str(guild.id))
            db.close_connection()
            await self.post_stats()
default_regions.py 文件源码 项目:BlitzcrankBotV2 作者: SuperFrosty 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def set(self, ctx, region: str):
        """Set your server's default region"""
        try:
            Summoner(name="", region=region)
        except ValueError:
            embed = discord.Embed(
                title="Error!",
                description="{0} is not a valid region!".format(region),
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
            return
        db = database.Database('guilds.db')
        try:
            region_found = db.find_entry(ctx.guild.id)
            db.close_connection()
            embed = discord.Embed(
                title="Error!",
                description="{0} is already {1}'s default region!".format(
                    region_found, ctx.guild.name),
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
        except TypeError:
            db.add_entry(ctx.guild.id, region)
            db.close_connection()
            embed = discord.Embed(
                title="Success!",
                description="{0} set as {1}'s default region!".format(
                    region, ctx.guild.name),
                colour=0x1AFFA7)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
default_regions.py 文件源码 项目:BlitzcrankBotV2 作者: SuperFrosty 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update(self, ctx, region: str):
        """Update your server's default region"""
        try:
            Summoner(name="", region=region)
        except ValueError:
            embed = discord.Embed(
                title="Error!",
                description="{0} is not a valid region!".format(region),
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
            return
        db = database.Database('guilds.db')
        try:
            db.find_entry(ctx.guild.id)
            db.update_entry(ctx.guild.id, region)
            db.close_connection()
            embed = discord.Embed(
                title='Success!',
                description="Set {0} as {1}'s default region!".format(
                    region, ctx.guild.name),
                colour=0x1AFFA7)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
        except TypeError:
            db.close_connection()
            embed = discord.Embed(
                title="Error!",
                description="A default region for this server has not been set!",
                colour=0xCA0147)
            utils.footer(ctx, embed)
            await ctx.send("", embed=embed)
db.py 文件源码 项目:wednesday 作者: wcauchois 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_pubsub():
    async with create_pool(get_db_url()) as e:
        async with e.acquire() as listen_conn:
            listener = listen_helper(listen_conn)
            db = Database()
            await db.startup()
            await asyncio.gather(listener, db.insert_post(parent_id=290, content='testing notify'))
            print("listen/notify done!")


问题


面经


文章

微信
公众号

扫码关注公众号