跳转到内容

Python 编程/数据库

来自维基教科书,开放世界中的开放书籍


Python 通过简单的 API 支持与数据库交互。Python 包含的模块包括用于 SQLiteBerkeley DB 的模块。用于 MySQLPostgreSQLFirebirdSQL 等其他数据库的模块作为第三方模块提供。后者需要在使用前下载并安装。例如,可以使用 Debian 软件包“python-mysqldb”安装包 MySQLdb。

DBMS 特性

[编辑 | 编辑源代码]

使用 MySQL 的示例如下所示

import MySQLdb
db = MySQLdb.connect("host machine", "dbuser", "password", "dbname")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()

在第一行, 模块MySQLdb被导入。然后建立与数据库的连接,在第 4 行,我们将要执行的实际 SQL 语句保存到变量query中。在第 5 行,我们执行查询,在第 6 行,我们获取所有数据。执行完这段代码后,lines包含获取的行的数量(例如,表中的行数sampletable)。变量data包含所有实际数据,例如,sampletable的内容。最后,将再次关闭与数据库的连接。如果行的数量很大,最好使用row = cursor.fetchone()并分别处理行

  #first 5 lines are the same as above
  while True:
    row = cursor.fetchone()
    if row == None: break
    #do something with this row of data
  db.close()

显然,必须对行使用某种数据处理,否则数据将不会被存储。 的结果fetchone()命令是一个 元组

为了使连接的初始化更容易,可以使用配置文件

import MySQLdb
db = MySQLdb.connect(read_default_file="~/.my.cnf")
...

这里,主目录中的 .my.cnf 文件包含 MySQL 所需的配置信息。

使用 SQLite 的示例与上面的示例非常相似,并且游标提供了许多相同的功能。

import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()

写入数据库时,必须记住调用 db.commit(),否则更改不会保存

import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """INSERT INTO sampletable (value1, value2) VALUES (1,'test')"""
cursor.execute(query)
db.commit()
db.close()
import psycopg2
conn = psycopg2.connect("dbname=test")
cursor = conn.cursor()
cursor.execute("select * from test");
for i in cursor.next():
    print(i)
conn.close()
import firebirdsql
conn = firebirdsql.connect(dsn='localhost/3050:/var/lib/firebird/2.5/test.fdb', user='alice', password='wonderland')
cur = conn.cursor()
cur.execute("select * from baz")
for c in cur.fetchall():
    print(c)
conn.close()

通用原则

[编辑 | 编辑源代码]

参数引用

[编辑 | 编辑源代码]

您经常需要将动态数据替换到查询字符串中。确保正确完成此操作很重要。

# Do not do this!
result = db.execute("SELECT name FROM employees WHERE location = '" + location + "'")

此示例错误,因为它没有正确处理被替换字符串中的特殊字符,如撇号。如果您的代码必须处理潜在的恶意用户(例如在面向公众的 Web 服务器上),这可能会使您面临SQL 注入攻击的风险。

对于简单的情况,请使用 execute 方法提供的自动参数替换,例如

result = db.execute("SELECT name FROM employees WHERE location = ?", [location])

DBMS 接口本身会自动将您传入的值转换为正确的 SQL 语法。

对于更复杂的情况,DBMS 模块应提供一个您可以明确调用的引用函数。例如,MySQLdb 提供 escape_string 方法,而 APSW(用于 SQLite3)提供 format_sql_value。当查询结构采用更动态的形式时,这是必需的

criteria = [("company", company)] # list of tuples (fieldname, value)
if department != None :
    criteria.append(("department", department))
# ... append other optional criteria as appropriate ...

result = db.execute(
        "SELECT name FROM employees WHERE "
    +
        " and ".join(
            "%s = %s" % (criterion[0], MySQLdb.escape_string(criterion[1]))
            for criterion in criteria
          )
  )

这将动态构建查询,例如“select name from employees where company = 'some company'”或“select name from employees where company = 'some company' and department = 'some department'”,具体取决于用户填写了哪些字段。

使用迭代器

[编辑 | 编辑源代码]

Python 迭代器非常适合迭代大量数据库记录的问题。以下是一个函数的示例,它执行数据库查询并返回结果的迭代器,而不是一次返回所有结果。它依赖于这样一个事实:在 APSW(SQLite 的 Python 3 接口库)中,cursor.execute 方法本身返回结果记录的迭代器。结果是,您可以用非常简洁的代码在 Python 中执行复杂的数据库查询。

def db_iter(db, cmd, mapfn = lambda x : x) :
    "executes cmd on a new cursor from connection db and yields the results in turn."
    cu = db.cursor()
    result = cu.execute(cmd)
    while True:
        yield mapfn(next(result))

此函数的示例用法

for artist, publisher in db_iter(
        db = db,
        cmd =
                "SELECT artist, publisher FROM artists WHERE location = %s"
            %
                 apsw.format_sql_value(location)
      ):
    print(artist, publisher)

for location in db_iter(
        db = db,
        cmd = "SELECT DISTINCT location FROM artists",
        mapfn = lambda x : x[0]
      ):
    print(location)

在第一个示例中,由于 db_iter 为每个记录返回一个元组,因此可以将其直接分配给记录字段的各个变量。在第二个示例中,元组只有一个元素,因此使用自定义 mapfn 来提取此元素并返回它,而不是元组。

在脚本中永远不要使用“SELECT *”

[编辑 | 编辑源代码]

数据库表定义经常会发生变化。随着应用程序需求的演变,字段甚至整个表经常被添加,或者有时被删除。考虑以下语句

result = db.execute("select * from employees")

您可能碰巧知道employees表当前包含例如 4 个字段。但是明天有人可能会添加第五个字段。您是否记得更新您的代码以处理这种情况?如果没有,它可能会崩溃。或者更糟,产生错误的结果!

最好始终列出您感兴趣的特定字段,无论有多少字段

result = db.execute("select name, address, department, location from employees")

这样,任何添加的额外字段都将被忽略。如果删除了任何命名字段,代码至少会以运行时错误失败,这提醒您忘记更新它!

在字段断点处循环

[编辑 | 编辑源代码]

考虑以下场景:您的销售公司数据库有一个员工表,还有一个每个员工销售记录的表。您想遍历这些销售条目,并生成一些每个员工的统计数据。一种天真的方法可能是

  • 查询数据库以获取员工列表
  • 对于每个员工,查询数据库以获取每个员工的销售列表。

如果您有很多员工,那么第一个查询可能会生成一个很大的列表,而第二步将涉及相应数量的数据库查询。

事实上,整个处理循环可以从一个单个数据库查询中运行,使用标准 SQL 结构称为 join

注意
SQL 编程本身是一项专业技能。要了解更多信息,请从 维基百科文章 开始。

以下是一个此类循环的示例

rows = db_iter \
  (
    db = db,
    cmd =
        "select employees.name, sales.amount, sales.date from"
        " employees left join sales on employees.id = sales.employee_id"
        " order by employees.name, sales.date"
  )
prev_employee_name = None
while True:
    row = next(rows, None)
    if row != None :
        employee_name, amount, date = row
    if row == None or employee_name != prev_employee_name :
         if prev_employee_name != None :
              # done stats for this employee
              report(prev_employee_name, employee_stats)
         if row == None :
              break
         # start stats for a new employee
         prev_employee_name = employee_name
         employee_stats = {"total_sales" : 0, "number_of_sales" : 0}
         if date != None :
               employee_stats["earliest_sale"] = date
    # another row of stats for this employee
    if amount != None :
         employee_stats["total_sales"] += amount
         employee_stats["number_of_sales"] += 1
    if date != None :
         employee_stats["latest_sale"] = date

这里的统计信息非常简单:最早和最新的销售、销售数量和总额,并且可以直接在 SQL 查询中计算。但是,相同的循环可以计算更复杂的统计信息(例如标准偏差),这些统计信息无法直接在简单的 SQL 查询中表示。

注意每个员工的统计信息是如何在两种条件之一下写出的

  • 下一条记录的员工姓名与上一条记录不同
  • 已达到查询结果的末尾。

这两个条件都使用row == None or employee_name != prev_employee_name进行测试;在写出员工统计数据后,使用单独的检查第二个条件row == None来终止循环。如果循环没有终止,则为新员工初始化处理。

还要注意在这种情况下使用left join:如果员工没有销售额,则连接将为该员工返回一行,其中 SQL null 值(在 Python 中表示为None)用于来自销售表的字段。这就是为什么在处理这些字段之前需要检查这些None值。

或者,我们可以使用inner join,它将对没有销售额的员工返回没有结果。是否要从报表中省略此类员工,或者将他们包含在内,总计为零,这实际上取决于您的应用程序。

另请参阅

[编辑 | 编辑源代码]
[编辑 | 编辑源代码]


华夏公益教科书