Python 编程/数据库
Python 通过简单的 API 支持与数据库交互。Python 包含的模块包括用于 SQLite 和 Berkeley DB 的模块。用于 MySQL 、PostgreSQL 、FirebirdSQL 等其他数据库的模块作为第三方模块提供。后者需要在使用前下载并安装。例如,可以使用 Debian 软件包“python-mysqldb”安装包 MySQLdb。
使用 MySQL 的示例如下所示
import MySQLdb
db = MySQLdb.connect("host machine", "dbuser", "password", "dbname")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()
在第一行, 模块MySQLdb被导入。然后建立与数据库的连接,在第 4 行,我们将要执行的实际 SQL 语句保存到变量query中。在第 5 行,我们执行查询,在第 6 行,我们获取所有数据。执行完这段代码后,lines包含获取的行的数量(例如,表中的行数sampletable)。变量data包含所有实际数据,例如,sampletable的内容。最后,将再次关闭与数据库的连接。如果行的数量很大,最好使用row = cursor.fetchone()并分别处理行
#first 5 lines are the same as above
while True:
row = cursor.fetchone()
if row == None: break
#do something with this row of data
db.close()
显然,必须对行使用某种数据处理,否则数据将不会被存储。 的结果fetchone()命令是一个 元组。
为了使连接的初始化更容易,可以使用配置文件
import MySQLdb
db = MySQLdb.connect(read_default_file="~/.my.cnf")
...
这里,主目录中的 .my.cnf 文件包含 MySQL 所需的配置信息。
使用 SQLite 的示例与上面的示例非常相似,并且游标提供了许多相同的功能。
import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()
写入数据库时,必须记住调用 db.commit(),否则更改不会保存
import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """INSERT INTO sampletable (value1, value2) VALUES (1,'test')"""
cursor.execute(query)
db.commit()
db.close()
import psycopg2
conn = psycopg2.connect("dbname=test")
cursor = conn.cursor()
cursor.execute("select * from test");
for i in cursor.next():
print(i)
conn.close()
import firebirdsql
conn = firebirdsql.connect(dsn='localhost/3050:/var/lib/firebird/2.5/test.fdb', user='alice', password='wonderland')
cur = conn.cursor()
cur.execute("select * from baz")
for c in cur.fetchall():
print(c)
conn.close()
您经常需要将动态数据替换到查询字符串中。确保正确完成此操作很重要。
# Do not do this!
result = db.execute("SELECT name FROM employees WHERE location = '" + location + "'")
此示例错误,因为它没有正确处理被替换字符串中的特殊字符,如撇号。如果您的代码必须处理潜在的恶意用户(例如在面向公众的 Web 服务器上),这可能会使您面临SQL 注入攻击的风险。
对于简单的情况,请使用 execute
方法提供的自动参数替换,例如
result = db.execute("SELECT name FROM employees WHERE location = ?", [location])
DBMS 接口本身会自动将您传入的值转换为正确的 SQL 语法。
对于更复杂的情况,DBMS 模块应提供一个您可以明确调用的引用函数。例如,MySQLdb 提供 escape_string
方法,而 APSW(用于 SQLite3)提供 format_sql_value
。当查询结构采用更动态的形式时,这是必需的
criteria = [("company", company)] # list of tuples (fieldname, value)
if department != None :
criteria.append(("department", department))
# ... append other optional criteria as appropriate ...
result = db.execute(
"SELECT name FROM employees WHERE "
+
" and ".join(
"%s = %s" % (criterion[0], MySQLdb.escape_string(criterion[1]))
for criterion in criteria
)
)
这将动态构建查询,例如“select name from employees where company = 'some company'”或“select name from employees where company = 'some company' and department = 'some department'”,具体取决于用户填写了哪些字段。
Python 迭代器非常适合迭代大量数据库记录的问题。以下是一个函数的示例,它执行数据库查询并返回结果的迭代器,而不是一次返回所有结果。它依赖于这样一个事实:在 APSW(SQLite 的 Python 3 接口库)中,cursor.execute
方法本身返回结果记录的迭代器。结果是,您可以用非常简洁的代码在 Python 中执行复杂的数据库查询。
def db_iter(db, cmd, mapfn = lambda x : x) :
"executes cmd on a new cursor from connection db and yields the results in turn."
cu = db.cursor()
result = cu.execute(cmd)
while True:
yield mapfn(next(result))
此函数的示例用法
for artist, publisher in db_iter(
db = db,
cmd =
"SELECT artist, publisher FROM artists WHERE location = %s"
%
apsw.format_sql_value(location)
):
print(artist, publisher)
和
for location in db_iter(
db = db,
cmd = "SELECT DISTINCT location FROM artists",
mapfn = lambda x : x[0]
):
print(location)
在第一个示例中,由于 db_iter
为每个记录返回一个元组,因此可以将其直接分配给记录字段的各个变量。在第二个示例中,元组只有一个元素,因此使用自定义 mapfn
来提取此元素并返回它,而不是元组。
数据库表定义经常会发生变化。随着应用程序需求的演变,字段甚至整个表经常被添加,或者有时被删除。考虑以下语句
result = db.execute("select * from employees")
您可能碰巧知道employees表当前包含例如 4 个字段。但是明天有人可能会添加第五个字段。您是否记得更新您的代码以处理这种情况?如果没有,它可能会崩溃。或者更糟,产生错误的结果!
最好始终列出您感兴趣的特定字段,无论有多少字段
result = db.execute("select name, address, department, location from employees")
这样,任何添加的额外字段都将被忽略。如果删除了任何命名字段,代码至少会以运行时错误失败,这提醒您忘记更新它!
考虑以下场景:您的销售公司数据库有一个员工表,还有一个每个员工销售记录的表。您想遍历这些销售条目,并生成一些每个员工的统计数据。一种天真的方法可能是
- 查询数据库以获取员工列表
- 对于每个员工,查询数据库以获取每个员工的销售列表。
如果您有很多员工,那么第一个查询可能会生成一个很大的列表,而第二步将涉及相应数量的数据库查询。
事实上,整个处理循环可以从一个单个数据库查询中运行,使用标准 SQL 结构称为 join
。
以下是一个此类循环的示例
rows = db_iter \
(
db = db,
cmd =
"select employees.name, sales.amount, sales.date from"
" employees left join sales on employees.id = sales.employee_id"
" order by employees.name, sales.date"
)
prev_employee_name = None
while True:
row = next(rows, None)
if row != None :
employee_name, amount, date = row
if row == None or employee_name != prev_employee_name :
if prev_employee_name != None :
# done stats for this employee
report(prev_employee_name, employee_stats)
if row == None :
break
# start stats for a new employee
prev_employee_name = employee_name
employee_stats = {"total_sales" : 0, "number_of_sales" : 0}
if date != None :
employee_stats["earliest_sale"] = date
# another row of stats for this employee
if amount != None :
employee_stats["total_sales"] += amount
employee_stats["number_of_sales"] += 1
if date != None :
employee_stats["latest_sale"] = date
这里的统计信息非常简单:最早和最新的销售、销售数量和总额,并且可以直接在 SQL 查询中计算。但是,相同的循环可以计算更复杂的统计信息(例如标准偏差),这些统计信息无法直接在简单的 SQL 查询中表示。
注意每个员工的统计信息是如何在两种条件之一下写出的
- 下一条记录的员工姓名与上一条记录不同
- 已达到查询结果的末尾。
这两个条件都使用row == None or employee_name != prev_employee_name
进行测试;在写出员工统计数据后,使用单独的检查第二个条件row == None
来终止循环。如果循环没有终止,则为新员工初始化处理。
还要注意在这种情况下使用left join
:如果员工没有销售额,则连接将为该员工返回一行,其中 SQL null
值(在 Python 中表示为None
)用于来自销售表的字段。这就是为什么在处理这些字段之前需要检查这些None
值。
或者,我们可以使用inner join
,它将对没有销售额的员工返回没有结果。是否要从报表中省略此类员工,或者将他们包含在内,总计为零,这实际上取决于您的应用程序。
- APSW 模块,code.google.com — Python 2.x 和 3.x 的 SQLite3
- SQLite 文档
- Psycopg2(PostgreSQL 模块 - 更新版),initd.org
- PyGreSQL(PostgreSQL 模块 - 旧版),pygresql.org
- MySQL 模块,sourceforge.net
- FirebirdSQL 模块,github.com