Python 编程/数据库
Python 通过简单的 API 支持与数据库交互。Python 自带的模块包括用于 SQLite 和 Berkeley DB 的模块。用于 MySQL、PostgreSQL、FirebirdSQL 等其他数据库的模块作为第三方模块提供。后者需要在使用前下载并安装。例如,可以使用 Debian 包“python-mysqldb”安装 MySQLdb 包。
使用 MySQL 的示例如下所示
import MySQLdb
db = MySQLdb.connect("host machine", "dbuser", "password", "dbname")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()
在第一行,模块MySQLdb被导入。然后建立与数据库的连接,在第 4 行,我们将要执行的实际 SQL 语句保存在变量query中。在第 5 行,我们执行查询,在第 6 行,我们获取所有数据。在这段代码执行之后,lines包含获取的行数(例如,表中的行数sampletable)。变量data包含所有实际数据,例如内容sampletable。最后,连接到数据库将再次关闭。如果行数很大,最好使用row = cursor.fetchone()并分别处理这些行
#first 5 lines are the same as above
while True:
row = cursor.fetchone()
if row == None: break
#do something with this row of data
db.close()
显然,必须对行使用某种数据处理,否则数据将不会被存储。的输出结果fetchone()命令是 元组。
为了使连接的初始化更容易,可以使用配置文件
import MySQLdb
db = MySQLdb.connect(read_default_file="~/.my.cnf")
...
在这里,主目录中的 .my.cnf 文件包含 MySQL 所需的配置信息。
SQLite 的示例与上面的示例非常相似,并且游标提供了许多相同的功能。
import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """SELECT * FROM sampletable"""
lines = cursor.execute(query)
data = cursor.fetchall()
db.close()
写入数据库时,必须记住调用 db.commit(),否则更改不会保存
import sqlite3
db = sqlite3.connect("/path/to/file")
cursor = db.cursor()
query = """INSERT INTO sampletable (value1, value2) VALUES (1,'test')"""
cursor.execute(query)
db.commit()
db.close()
import psycopg2
conn = psycopg2.connect("dbname=test")
cursor = conn.cursor()
cursor.execute("select * from test");
for i in cursor.next():
print(i)
conn.close()
import firebirdsql
conn = firebirdsql.connect(dsn='localhost/3050:/var/lib/firebird/2.5/test.fdb', user='alice', password='wonderland')
cur = conn.cursor()
cur.execute("select * from baz")
for c in cur.fetchall():
print(c)
conn.close()
您经常需要将动态数据替换到查询字符串中。务必确保正确执行此操作。
# Do not do this!
result = db.execute("SELECT name FROM employees WHERE location = '" + location + "'")
此示例错误,因为它没有正确处理要替换的字符串中的特殊字符(如撇号)。如果您的代码必须处理潜在的恶意用户(例如在面向公众的 Web 服务器上),这可能会让您面临SQL 注入攻击的风险。
对于简单情况,请使用 execute
方法提供的自动参数替换,例如
result = db.execute("SELECT name FROM employees WHERE location = ?", [location])
DBMS 接口本身会自动将您传递的值转换为正确的 SQL 语法。
对于更复杂的情况,DBMS 模块应提供一个您可以显式调用的引用函数。例如,MySQLdb 提供 escape_string
方法,而 APSW(用于 SQLite3)提供 format_sql_value
。这在查询结构采用更动态形式时是必要的
criteria = [("company", company)] # list of tuples (fieldname, value)
if department != None :
criteria.append(("department", department))
# ... append other optional criteria as appropriate ...
result = db.execute(
"SELECT name FROM employees WHERE "
+
" and ".join(
"%s = %s" % (criterion[0], MySQLdb.escape_string(criterion[1]))
for criterion in criteria
)
)
这将动态构建查询,例如“select name from employees where company = 'some company'”或“select name from employees where company = 'some company' and department = 'some department'”,具体取决于用户填写了哪些字段。
Python 迭代器非常适合迭代大量数据库记录的问题。以下是一个函数示例,该函数执行数据库查询并返回结果的迭代器,而不是一次返回所有结果。它依赖于以下事实:在 APSW(SQLite 的 Python 3 接口库)中,cursor.execute
方法本身返回结果记录的迭代器。结果是您可以编写非常简洁的代码来执行 Python 中复杂的数据库查询。
def db_iter(db, cmd, mapfn = lambda x : x) :
"executes cmd on a new cursor from connection db and yields the results in turn."
cu = db.cursor()
result = cu.execute(cmd)
while True:
yield mapfn(next(result))
此函数的示例用法
for artist, publisher in db_iter(
db = db,
cmd =
"SELECT artist, publisher FROM artists WHERE location = %s"
%
apsw.format_sql_value(location)
):
print(artist, publisher)
和
for location in db_iter(
db = db,
cmd = "SELECT DISTINCT location FROM artists",
mapfn = lambda x : x[0]
):
print(location)
在第一个示例中,由于 db_iter
为每个记录返回一个元组,因此可以直接将其分配给记录字段的各个变量。在第二个示例中,元组只有一个元素,因此使用自定义的 mapfn
来提取此元素并返回它,而不是返回元组。
数据库表定义经常会发生变化。随着应用程序需求的发展,字段甚至整个表经常会被添加,或者有时会被删除。考虑以下语句
result = db.execute("select * from employees")
您可能碰巧知道employees表当前包含,比如说,4 个字段。但是明天有人可能会添加第五个字段。您是否记得更新代码以处理这种情况?如果没有,它可能会崩溃。或者更糟糕的是,产生错误的结果!
最好始终列出您感兴趣的特定字段,无论有多少个
result = db.execute("select name, address, department, location from employees")
这样,添加的任何额外字段都将被简单地忽略。如果删除了任何命名的字段,代码至少会因运行时错误而失败,这提醒您忘记更新它!
考虑以下场景:您的销售公司数据库有一个员工表,还有一个每个员工进行的销售记录表。您希望遍历这些销售条目,并生成一些每个员工的统计信息。一种天真的方法可能是
- 查询数据库以获取员工列表
- 对于每个员工,执行数据库查询以获取每个员工的销售列表。
如果您有很多员工,那么第一个查询可能会产生一个很大的列表,并且第二步将涉及相应数量的数据库查询。
事实上,整个处理循环可以通过单个数据库查询运行,使用称为 join
的标准 SQL 结构。
以下是此类循环的示例
rows = db_iter \
(
db = db,
cmd =
"select employees.name, sales.amount, sales.date from"
" employees left join sales on employees.id = sales.employee_id"
" order by employees.name, sales.date"
)
prev_employee_name = None
while True:
row = next(rows, None)
if row != None :
employee_name, amount, date = row
if row == None or employee_name != prev_employee_name :
if prev_employee_name != None :
# done stats for this employee
report(prev_employee_name, employee_stats)
if row == None :
break
# start stats for a new employee
prev_employee_name = employee_name
employee_stats = {"total_sales" : 0, "number_of_sales" : 0}
if date != None :
employee_stats["earliest_sale"] = date
# another row of stats for this employee
if amount != None :
employee_stats["total_sales"] += amount
employee_stats["number_of_sales"] += 1
if date != None :
employee_stats["latest_sale"] = date
这里的统计信息非常简单:最早和最晚的销售、销售数量和总金额,并且可以直接在 SQL 查询中计算。但是,相同的循环可以计算更复杂的统计信息(如标准差),这些统计信息不能直接在简单的 SQL 查询中表示。
请注意,每个员工的统计信息是如何在以下两种情况之一下输出的
- 下一条记录的员工姓名与上一条记录不同
- 查询结果已结束。
这两个条件使用`row == None or employee_name != prev_employee_name`进行测试;在输出员工统计信息后,使用单独的第二个条件检查`row == None`来终止循环。如果循环没有终止,则为新员工初始化处理过程。
还要注意在此处使用了`left join`:如果某个员工没有销售记录,则联接将返回该员工的单行记录,其中来自销售表的字段将为SQL `null`值(在Python中表示为`None`)。这就是为什么在处理这些字段之前需要检查此类`None`值的原因。
或者,我们可以使用`inner join`,它对于没有销售记录的员工将不返回任何结果。是否要从报表中省略此类员工,或者将他们包含在内并显示为零总计,这完全取决于您的应用程序。
- APSW 模块,code.google.com — Python 2.x 和 3.x 的 SQLite3
- SQLite 文档
- Psycopg2(PostgreSQL 模块 - 更新),initd.org
- PyGreSQL(PostgreSQL 模块 - 旧版),pygresql.org
- MySQL 模块,sourceforge.net
- FirebirdSQL 模块,github.com