掌握Python 操作 MySQL 数据库

Python013

掌握Python 操作 MySQL 数据库,第1张

本文Python 操作 MySQL 数据库需要是使用到 PyMySQL 驱动

Python 操作 MySQL 前提是要安装好 MySQL 数据库并能正常连接使用,安装步骤详见下文。

注意: 安装过程我们需要通过开启管理员权限来安装,否则会由于权限不足导致无法安装。

首先需要先下载 MySQL 安装包, 官网下载地址 下载对应版本即可,或直接在网上拉取并安装:

权限设置:

初始化 MySQL:

启动 MySQL:

查看 MySQL 运行状态:

Mysql安装成功后,默认的root用户密码为空,你可以使用以下命令来创建root用户的密码:

登陆:

创建数据库:

查看数据库:

PyMySQL 模块使用 pip命令进行安装:

假如系统不支持 pip 命令,可以使用以下方式安装:

pymysql .connect 函数:连接上数据库

输出结果显示如下:表面数据库连接成功

使用 pymysql 的 connect() 方法连接数据库,connect 参数解释如下:

conn.cursor():获取游标

如果要操作数据库,光连接数据是不够的,咱们必须拿到操作数据库的游标,才能进行后续的操作,游标的主要作用是用来接收数据库操作后的返回结果,比如读取数据、添加数据。通过获取到的数据库连接实例 conn 下的 cursor() 方法来创建游标,实例如下:

输出结果为:

cursor 返回一个游标实例对象,其中包含了很多操作数据的方法,如执行sql语句,sql 执行命令: execute() 和 executemany()

execute(query,args=None):

executemany(query,args=None):

其他游标对象如下表:

完整数据库连接操作实例如下:

以上结果输出为:

创建表代码如下:

如下所示数据库表创建成功:

插入数据实现代码:

插入数据结果:

Python查询Mysql使用 fetchone() 方法获取单条数据, 使用fetchall() 方法获取多条数据。

查询数据代码如下:

输出结果:

DB API中定义了一些数据库操作的错误及异常,下表列出了这些错误和异常:

本文给大家介绍 Python 如何连接 Mysql 进行数据的增删改查操作,文章通过简洁的代码方式进行示例演示,给使用 Python 操作 Mysql 的工程师提供支撑。

本文章是 重写 500 Lines or Less 系列的其中一篇,目标是重写 500 Lines or Less 系列的原有项目:Dagoba: an in-memory graph database。

Dagoba 是作者设计用来展示如何从零开始自己实现一个图数据库( Graph Database )。该名字似乎来源于作者喜欢的一个乐队,另一个原因是它的前缀 DAG 也正好是有向无环图 ( Directed Acyclic Graph ) 的缩写。本文也沿用了该名称。

图是一种常见的数据结构,它将信息描述为若干独立的节点( vertex ,为了和下文的边更加对称,本文中称为 node ),以及把节点关联起来的边( edge )。我们熟悉的链表以及多种树结构可以看作是符合特定规则的图。图在路径选择、推荐算法以及神经网络等方面都是重要的核心数据结构。

既然图的用途如此广泛,一个重要的问题就是如何存储它。如果在传统的关系数据库中存储图,很自然的做法就是为节点和边各自创建一张表,并用外键把它们关联起来。这样的话,要查找某人所有的子女,就可以写下类似下面的查询:

还好,不算太复杂。但是如果要查找孙辈呢?那恐怕就要使用子查询或者 CTE(Common Table Expression) 等特殊构造了。再往下想,曾孙辈又该怎么查询?孙媳妇呢?

这样我们会意识到,SQL 作为查询语言,它只是对二维数据表这种结构而设计的,用它去查询图的话非常笨拙,很快会变得极其复杂,也难以扩展。针对图而言,我们希望有一种更为自然和直观的查询语法,类似这样:

为了高效地存储和查询图这种数据结构,图数据库( Graph Database )应运而生。因为和传统的关系型数据库存在极大的差异,所以它属于新型数据库也就是 NoSql 的一个分支(其他分支包括文档数据库、列数据库等)。图数据库的主要代表包括 Neo4J 等。本文介绍的 Dagoba 则是具备图数据库核心功能、主要用于教学和演示的一个简单的图数据库。

原文代码是使用 JavaScript 编写的,在定义调用接口时大量使用了原型( prototype )这种特有的语言构造。对于其他主流语言的用户来说,原型的用法多少显得有些别扭和不自然。

考虑到本系列其他数据库示例大多是用 Python 实现的,本文也按照传统,用 Python 重写了原文的代码。同样延续之前的惯例,为了让读者更好地理解程序是如何逐步完善的,我们用迭代式的方法完成程序的各个组成部分。

原文在 500lines 系列的 Github 仓库中只包含了实现代码,并未包含测试。按照代码注释说明,测试程序位于作者的另一个代码库中,不过和 500lines 版本的实现似乎略有不同。

本文实现的代码参考了原作者的测试内容,但跳过了北欧神话这个例子——我承认确实不熟悉这些神祇之间的亲缘关系,相信中文背景的读者们多数也未必了解,虽然作者很喜欢这个例子,想了想还是不要徒增困惑吧。因此本文在编写测试用例时只参考了原文关于家族亲属的例子,放弃了神话相关的部分,尽管会减少一些趣味性,相信对于入门级的代码来说这样也够用了。

本文实现程序位于代码库的 dagoba 目录下。按照本系列程序的同意规则,要想直接执行各个已完成的步骤,读者可以在根目录下的 main.py 找到相应的代码位置,取消注释并运行即可。

本程序的所有步骤只需要 Python3 ,测试则使用内置的 unittest , 不需要额外的第三方库。原则上 Python3.6 以上版本应该都可运行,但我只在 Python3.8.3 环境下完整测试过。

本文实现的程序从最简单的案例开始,通过每个步骤逐步扩展,最终形成一个完整的程序。这些步骤包括:

接下来依次介绍各个步骤。

回想一下,图数据库就是一些点( node )和边( edge )的集合。现在我们要做出的一个重大决策是如何对节点/边进行建模。对于边来说,必须指定它的关联关系,也就是从哪个节点指向哪个节点。大多数情况下边是有方向的——父子关系不指明方向可是要乱套的!

考虑到扩展性及通用性问题,我们可以把数据保存为字典( dict ),这样可以方便地添加用户需要的任何数据。某些数据是为数据库内部管理而保留的,为了明确区分,可以这样约定:以下划线开头的特殊字段由数据库内部维护,类似于私有成员,用户不应该自己去修改它们。这也是 Python 社区普遍遵循的约定。

此外,节点和边存在互相引用的关系。目前我们知道边会引用到两端的节点,后面还会看到,为了提高效率,节点也会引用到边。如果仅仅在内存中维护它们的关系,那么使用指针访问是很直观的,但数据库必须考虑到序列化到磁盘的问题,这时指针就不再好用了。

为此,最好按照数据库的一般要求,为每个节点维护一个主键( _id ),用主键来描述它们之间的关联关系。

我们第一步要把数据库的模型建立起来。为了测试目的,我们使用一个最简单的数据库模型,它只包含两个节点和一条边,如下所示:

按照 TDD 的原则,首先编写测试:

与原文一样,我们把数据库管理接口命名为 Dagoba 。目前,能够想到的最简单的测试是确认节点和边是否已经添加到数据库中:

assert_item 是一个辅助方法,用于检查字典是否包含预期的字段。相信大家都能想到该如何实现,这里就不再列出了,读者可参考 Github 上的完整源码。

现在,测试是失败的。用最简单的办法实现数据库:

需要注意的是,不管添加节点还是查询,程序都使用了拷贝后的数据副本,而不是直接使用原始数据。为什么要这样做?因为字典是可变的,用户可以在任何时候修改其中的内容,如果数据库不知道数据已经变化,就很容易发生难以追踪的一致性问题,最糟糕的情况下会使得数据内容彻底混乱。

拷贝数据可以避免上述问题,代价则是需要占用更多内存和处理时间。对于数据库来说,通常查询次数要远远多于修改,所以这个代价是可以接受的。

现在测试应该正常通过了。为了让它更加完善,我们可以再测试一些边缘情况,看看数据库能否正确处理异常数据,比如:

例如,如果用户尝试添加重复主键,我们预期应抛出 ValueError 异常。因此编写测试如下:

为了满足以上测试,代码需要稍作修改。特别是按照 id 查找主键是个常用操作,通过遍历的方法效率太低了,最好是能够通过主键直接访问。因此在数据库中再增加一个字典:

完整代码请参考 Github 仓库。

在上个步骤,我们在初始化数据库时为节点明确指定了主键。按照数据库设计的一般原则,主键最好是不具有业务含义的代理主键( Surrogate key ),用户不应该关心它具体的值是什么,因此让数据库去管理主键通常是更为合理的。当然,在部分场景下——比如导入外部数据——明确指定主键仍然是有用的。

为了同时支持这些要求,我们这样约定:字段 _id 表示节点的主键,如果用户指定了该字段,则使用用户设置的值(当然,用户有责任保证它们不会重复);否则,由数据库自动为它分配一个主键。

如果主键是数据库生成的,事先无法预知它的值是什么,而边( edge )必须指定它所指向的节点,因此必须在主键生成后才能添加。由于这个原因,在动态生成主键的情况下,数据库的初始化会略微复杂一些。还是先写一个测试:

为支持此功能,我们在数据库中添加一个内部字段 _next_id 用于生成主键,并让 add_node 方法返回新生成的主键:

接下来,再确认一下边是否可以正常访问:

运行测试,一切正常。这个步骤很轻松地完成了,不过两个测试( DbModelTest 和 PrimaryKeyTest )出现了一些重复代码,比如 get_item 。我们可以把这些公用代码提取出来。由于 get_item 内部调用了 TestCase.assertXXX 等方法,看起来应该使用继承,但从 TestCase 派生基类容易引起一些潜在的问题,所以我转而使用另一个技巧 Mixin :

实现数据库模型之后,接下来就要考虑如何查询它了。

在设计查询时要考虑几个问题。对于图的访问来说,几乎总是由某个节点(或符合条件的某一类节点)开始,从与它相邻的边跳转到其他节点,依次类推。所以链式调用对查询来说是一种很自然的风格。举例来说,要知道 Tom 的孙子养了几只猫,可以使用类似这样的查询:

可以想象,以上每个方法都应该返回符合条件的节点集合。这种实现是很直观的,不过存在一个潜在的问题:很多时候用户只需要一小部分结果,如果它总是不计代价地给我们一个巨大的集合,会造成极大的浪费。比如以下查询:

为了避免不必要的浪费,我们需要另外一种机制,也就是通常所称的“懒式查询”或“延迟查询”。它的基本思想是,当我们调用查询方法时,它只是把查询条件记录下来,而并不立即返回结果,直到明确调用某些方法时才真正去查询数据库。

如果读者比较熟悉流行的 Python ORM,比如 SqlAlchemy 或者 Django ORM 的话,会知道它们几乎都是懒式查询的,要调用 list(result) 或者 result[0:10] 这样的方法才能得到具体的查询结果。

在 Dagoba 中把触发查询的方法定义为 run 。也就是说,以下查询执行到 run 时才真正去查找数据:

和懒式查询( Lazy Query )相对应的,直接返回结果的方法一般称作主动查询( Eager Query )。主动查询和懒式查询的内在查找逻辑基本上是相同的,区别只在于触发机制不同。由于主动查询实现起来更加简单,出错也更容易排查,因此我们先从主动查询开始实现。

还是从测试开始。前面测试所用的简单数据库数据太少,难以满足查询要求,所以这一步先来创建一个更复杂的数据模型:

此关系的复杂之处之一在于反向关联:如果 A 是 B 的哥哥,那么 B 就是 A 的弟弟/妹妹,为了查询到他们彼此之间的关系,正向关联和反向关联都需要存在,因此在初始化数据库时需要定义的边数量会很多。

当然,父子之间也存在反向关联的问题,为了让问题稍微简化一些,我们目前只需要向下(子孙辈)查找,可以稍微减少一些关联数量。

因此,我们定义数据模型如下。为了减少重复工作,我们通过 _backward 字段定义反向关联,而数据库内部为了查询方便,需要把它维护成两条边:

然后,测试一个最简单的查询,比如查找某人的所有孙辈:

这里 outcome/income 分别表示从某个节点出发、或到达它的节点集合。在原作者的代码中把上述方法称为 out/in 。当然这样看起来更加简洁,可惜的是 in 在 Python 中是个关键字,无法作为函数名。我也考虑过加个下划线比如 out_.in_ 这种形式,但看起来也有点怪异,权衡之后还是使用了稍微啰嗦一点的名称。

现在我们可以开始定义查询接口了。在前面已经说过,我们计划分别实现两种查询,包括主动查询( Eager Query )以及延迟查询( Lazy Query )。

它们的内在查询逻辑是相通的,看起来似乎可以使用继承。不过遵循 YAGNI 原则,目前先不这样做,而是只定义两个新类,在满足测试的基础上不断扩展。以后我们会看到,与继承相比,把共同的逻辑放到数据库本身其实是更为合理的。

接下来实现访问节点的方法。由于 EagerQuery 调用查询方法会立即返回结果,我们把结果记录在 _result 内部字段中。虽然 node 方法只返回单个结果,但考虑到其他查询方法几乎都是返回集合,为统一起见,让它也返回集合,这样可以避免同时支持集合与单结果的分支处理,让代码更加简洁、不容易出错。此外,如果查询对象不存在的话,我们只返回空集合,并不视为一个错误。

查询输入/输出节点的方法实现类似这样:

查找节点的核心逻辑在数据库本身定义:

以上使用了内部定义的一些辅助查询方法。用类似的逻辑再定义 income ,它们的实现都很简单,读者可以直接参考源码,此处不再赘述。

在此步骤的最后,我们再实现一个优化。当多次调用查询方法后,结果可能会返回重复的数据,很多时候这是不必要的。就像关系数据库通常支持 unique/distinct 一样,我们也希望 Dagoba 能够过滤重复的数据。

假设我们要查询某人所有孩子的祖父,显然不管有多少孩子,他们的祖父应该是同一个人。因此编写测试如下:

现在来实现 unique 。我们只要按照主键把重复数据去掉即可:

在上个步骤,初始化数据库指定了双向关联,但并未测试它们。因为我们还没有编写代码去支持它们,现在增加一个测试,它应该是失败的:

运行测试,的确失败了。我们看看要如何支持它。回想一下,当从边查找节点时,使用的是以下方法:

这里也有一个潜在的问题:调用 self.edges 意味着遍历所有边,当数据库内容较多时,这是巨大的浪费。为了提高性能,我们可以把与节点相关的边记录在节点本身,这样要查找边只要看节点本身即可。在初始化时定义出入边的集合:

在添加边时,我们要同时把它们对应的关系同时更新到节点,此外还要维护反向关联。这涉及对字典内容的部分复制,先编写一个辅助方法:

然后,将添加边的实现修改如下:

这里的代码同时添加正向关联和反向关联。有的朋友可能会注意到代码略有重复,是的,但是重复仅出现在该函数内部,本着“三则重构”的原则,暂时不去提取代码。

实现之后,前面的测试就可以正常通过了。

在这个步骤中,我们来实现延迟查询( Lazy Query )。

延迟查询的要求是,当调用查询方法时并不立即执行,而是推迟到调用特定方法,比如 run 时才执行整个查询,返回结果。

延迟查询的实现要比主动查询复杂一些。为了实现延迟查询,查询方法的实现不能直接返回结果,而是记录要执行的动作以及传入的参数,到调用 run 时再依次执行前面记录下来的内容。

如果你去看作者的实现,会发现他是用一个数据结构记录执行操作和参数,此外还有一部分逻辑用来分派对每种结构要执行的动作。这样当然是可行的,但数据处理和分派部分的实现会比较复杂,也容易出错。

本文的实现则选择了另外一种不同的方法:使用 Python 的内部函数机制,把一连串查询变换成一组函数,每个函数取上个函数的执行结果作为输入,最后一个函数的输出就是整个查询的结果。由于内部函数同时也是闭包,尽管每个查询的参数形式各不相同,但是它们都可以被闭包“捕获”而成为内部变量,所以这些内部函数可以采用统一的形式,无需再针对每种查询设计额外的数据结构,因而执行过程得到了很大程度的简化。

首先还是来编写测试。 LazyQueryTest 和 EagerQueryTest 测试用例几乎是完全相同的(是的,两种查询只在于内部实现机制不同,它们的调用接口几乎是完全一致的)。

因此我们可以把 EagerQueryTest 的测试原样不变拷贝到 LazyQueryTest 中。当然拷贝粘贴不是个好注意,对于比较冗长而固定的初始化部分,我们可以把它提取出来作为两个测试共享的公共函数。读者可参考代码中的 step04_lazy_query/tests/test_lazy_query.py 部分。

程序把查询函数的串行执行称为管道( pipeline ),用一个变量来记录它:

然后依次实现各个调用接口。每种接口的实现都是类似的:用内部函数执行真正的查询逻辑,再把这个函数添加到 pipeline 调用链中。比如 node 的实现类似下面:

其他接口的实现也与此类似。最后, run 函数负责执行所有查询,返回最终结果;

完成上述实现后执行测试,确保我们的实现是正确的。

在前面我们说过,延迟查询与主动查询相比,最大的优势是对于许多查询可以按需要访问,不需要每个步骤都返回完整结果,从而提高性能,节约查询时间。比如说,对于下面的查询:

以上查询的意思是从孙辈中找到一个符合条件的节点即可。对该查询而言,主动查询会在调用 outcome('son') 时就遍历所有节点,哪怕最后一步只需要第一个结果。而延迟查询为了提高效率,应在找到符合条件的结果后立即停止。

目前我们尚未实现 take 方法。老规矩,先添加测试:

主动查询的 take 实现比较简单,我们只要从结果中返回前 n 条记录:

延迟查询的实现要复杂一些。为了避免不必要的查找,返回结果不应该是完整的列表( list ),而应该是个按需返回的可迭代对象,我们用内置函数 next 来依次返回前 n 个结果:

写完后运行测试,确保它们是正确的。

从外部接口看,主动查询和延迟查询几乎是完全相同的,所以用单纯的数据测试很难确认后者的效率一定比前者高,用访问时间来测试也并不可靠。为了测试效率,我们引入一个节点访问次数的概念,如果延迟查询效率更高的话,那么它应该比主动查询访问节点的次数更少。

为此,编写如下测试:

我们为 Dagoba 类添加一个成员来记录总的节点访问次数,以及两个辅助方法,分别用于获取和重置访问次数:

然后浏览代码,查找修改点。增加计数主要在从边查找节点的时候,因此修改部分如下:

此外还有 income/outcome 方法,修改都很简单,这里就不再列出。

实现后再次运行测试。测试通过,表明延迟查询确实在效率上优于主动查询。

不像关系数据库的结构那样固定,图的形式可以千变万化,查询机制也必须足够灵活。从原理上讲,所有查询无非是从某个节点出发按照特定方向搜索,因此用 node/income/outcome 这三个方法几乎可以组合出任意所需的查询。

但对于复杂查询,写出的代码有时会显得较为琐碎和冗长,对于特定领域来说,往往存在更为简洁的名称,例如:母亲的兄弟可简称为舅舅。对于这些场景,如果能够类似 DSL (领域特定语言)那样允许用户根据专业要求自行扩展,从而简化查询,方便阅读,无疑会更为友好。

如果读者去看原作者的实现,会发现他是用一种特殊语法 addAlias 来定义自己想要的查询,调用方法时再进行查询以确定要执行的内容,其接口和内部实现都是相当复杂的。

而我希望有更简单的方法来实现这一点。所幸 Python 是一种高度动态的语言,允许在运行时向类中增加新的成员,因此做到这一点可能比预想的还要简单。

为了验证这一点,编写测试如下:

无需 Dagoba 的实现做任何改动,测试就可以通过了!其实我们要做的就是动态添加一个自定义的成员函数,按照 Python 对象机制的要求,成员函数的第一个成员应该是名为 self 的参数,但这里已经是在 UnitTest 的内部,为了和测试类本身的 self 相区分,新函数的参数增加了一个下划线。

此外,函数应返回其所属的对象,这是为了链式调用所要求的。我们看到,动态语言的灵活性使得添加新语法变得非常简单。

到此,一个初具规模的图数据库就形成了。

和原文相比,本文还缺少一些内容,比如如何将数据库序列化到磁盘。不过相信读者都看到了,我们的数据库内部结构基本上是简单的原生数据结构(列表+字典),因此序列化无论用 pickle 或是 JSON 之类方法都应该是相当简单的。有兴趣的读者可以自行完成它们。

我们的图数据库实现为了提高查询性能,在节点内部存储了边的指针(或者说引用)。这样做的好处是,无论数据库有多大,从一个节点到相邻节点的访问是常数时间,因此数据访问的效率非常高。

但一个潜在的问题是,如果数据库规模非常大,已经无法整个放在内存中,或者出于安全性等原因要实现分布式访问的话,那么指针就无法使用了,必须要考虑其他机制来解决这个问题。分布式数据库无论采用何种数据模型都是一个棘手的问题,在本文中我们没有涉及。有兴趣的读者也可以考虑 500lines 系列中关于分布式和集群算法的其他一些文章。

本文的实现和系列中其他数据库类似,采用 Python 作为实现语言,而原作者使用的是 JavaScript ,这应该和作者的背景有关。我相信对于大多数开发者来说, Python 的对象机制比 JavaScript 基于原型的语法应该是更容易阅读和理解的。

当然,原作者的版本比本文版本在实现上其实是更为完善的,灵活性也更好。如果想要更为优雅的实现,我们可以考虑使用 Python 元编程,那样会更接近于作者的实现,但也会让程序的复杂性大为增加。如果读者有兴趣,不妨对照着去读读原作者的版本。

对大多数软件开发者而言,术语数据库通常是指RDBMS(关系数据库管理系统), 这些系统使用表格(类似于电子表格的网格),其中行表示记录,列表示记录的字段。表格及其中存放的数据是使用SQL (结构化査询语言)编写的语句来创建并操纵的。Python提供了用于操纵SQL数据库的API(应用程序接口),通常与作为标准的SQLite 3数据库一起发布。

另一种数据库是DBM (数据库管理器),其中存放任意数量的键-值项。Python 的标准库提供了几种DBM的接口,包括某些特定于UNIX平台的。DBM的工作方式 与Python中的字典类似,区别在于DBM通常存放于磁盘上而不是内存中,并且其键与值总是bytes对象,并可能受到长度限制。本章第一节中讲解的shelve模块提供了方便的DBM接口,允许我们使用字符串作为键,使用任意(picklable)对象作为值。

如果可用的 DBM 与 SQLite 数据库不够充分,Python Package Index, pypi.python.org/pypi中提供了大量数据库相关的包,包括bsddb DBM ("Berkeley DB"),对象-关系映射器,比如SQLAlchemy (www.sqlalchemy.org),以及流行的客户端/服务器数据的接口,比如 DB2、Informix、Ingres、MySQL、ODBC 以及 PostgreSQL。

本章中,我们将实现某程序的两个版本,该程序用于维护一个DVD列表,并追踪每个DVD的标题、发行年份、时间长度以及发行者。该程序的第一版使用DBM (通过shelve模块)存放其数据,第二版则使用SQLite数据库。两个程序都可以加载与保存简单的XML格式,这使得从某个程序导出DVD数据并将其导入到其他程序成为可能。与DBM版相比,基于SQL的程序提供了更多一些的功能,并且其数据设计也稍干净一些。

12.1 DBM数据库

shelve模块为DBM提供了一个wrapper,借助于此,我们在与DBM交互时,可以将其看做一个字典,这里是假定我们只使用字符串键与picklable值,实际处理时, shelve模块会将键与值转换为bytes对象(或者反过来)。

由于shelve模块使用的是底层的DBM,因此,如果其他计算机上没有同样的DBM,那么在某台计算机上保存的DBM文件在其他机器上无法读取是可能的。为解决这一问题,常见的解决方案是对那些必须在机器之间可传输的文件提供XML导入与导出功能,这也是我们在本节的DVD程序dvds-dbm.py中所做的。

对键,我们使用DVD的标题;对值,则使用元组,其中存放发行者、发行年份以及时间。借助于shelve模块,我们不需要进行任何数据转换,并可以把DBM对象当做一个字典进行处理。

程序在结构上类似于我们前面看到的那种菜单驱动型的程序,因此,这里主要展示的是与DBM程序设计相关的那部分。下面给出的是程序main()函数中的一部分, 忽略了其中菜单处理的部分代码。

db = None

try:

db = shelve.open(filename, protocol=pickle.HIGHEST_PROTOCOL)

finally:

if db is not None:

db.dose()

这里我们已打开(如果不存在就创建)指定的DBM文件,以便于对其进行读写操作。每一项的值使用指定的pickle协议保存为一个pickle,现有的项可以被读取, 即便是使用更底层的协议保存的,因为Python可以计算出用于读取pickle的正确协议。最后,DBM被关闭——其作用是清除DBM的内部缓存,并确保磁盘文件可以反映出已作的任何改变,此外,文件也需要关闭。

该程序提供了用于添加、编辑、列出、移除、导入、导出DVD数据的相应选项。除添加外,我们将忽略大部分用户接口代码,同样是因为已经在其他上下文中进行了展示。

def add_dvd(db):

title = Console.get_string("Title", "title")

if not title:

return

director = Console.get_string("Director", "director")

if not director:

return

year = Console.get_integer("Year", "year",minimum=1896,

maximum=datetime,date.today().year)

duration = Console.get_integer("Duration (minutes)", "minutes“, minimum=0, maximum=60*48)

db[title] = (director, year, duration)

db.sync()

像程序菜单调用的所有函数一样,这一函数也以DBM对象(db)作为其唯一参数。该函数的大部分工作都是获取DVD的详细资料,在倒数第二行,我们将键-值项存储在DBM文件中,DVD的标题作为键,发行者、年份以及时间(由shelve模块pickled在一起)作为值。

为与Python通常的一致性同步,DBM提供了与字典一样的API,因此,除了 shelve.open() 函数(前面已展示)与shelve.Shelf.sync()方法(该方法用于清除shelve的内部缓存,并对磁盘上文件的数据与所做的改变进行同步——这里就是添加一个新项),我们不需要学习任何新语法。

def edit_dvd(db):

old_title = find_dvd(db, "edit")

if old_title is None:

return

title = Console.get.string("Title", "title", old_title)

if not title:

return

director, year, duration = db[old_title]

...

db[title]= (director, year, duration)

if title != old_title:

del db[old_title]

db.sync()

为对某个DVD进行编辑,用户必须首先选择要操作的DVD,也就是获取DVD 的标题,因为标题用作键,值则用于存放其他相关数据。由于必要的功能在其他场合 (比如移除DVD)也需要使用,因此我们将其实现在一个单独的find_dvd()函数中,稍后将査看该函数。如果找到了该DVD,我们就获取用户所做的改变,并使用现有值作为默认值,以便提高交互的速度。(对于这一函数,我们忽略了大部分用户接口代码, 因为其与添加DVD时几乎是相同的。)最后,我们保存数据,就像添加时所做的一样。如果标题未作改变,就重写相关联的值;如果标题已改变,就创建一个新的键-值对, 并且需要删除原始项。

def find_dvd(db, message):

message = "(Start of) title to " + message

while True:

matches =[]

start = Console.get_string(message, "title")

if not start:

return None

for title in db:

if title.lower().startswith(start.lower()):

matches.append(title)

if len(matches) == 0:

print("There are no dvds starting with", start)

continue

elif len(matches) == 1:

return matches[0]

elif len(matches) >DISPLAY_LIMIT:

print("Too many dvds start with {0}try entering more of the title".format(start)

continue

else:

matches = sorted(matches, key=str.lower)

for i, match in enumerate(matches):

print("{0}: {1}".format(i+1, match))

which = Console.get_integer("Number (or 0 to cancel)",

"number", minimum=1, maximum=len(matches))

return matches[which - 1] if which != 0 else None

为尽可能快而容易地发现某个DVD,我们需要用户只输入其标题的一个或头几个字符。在具备了标题的起始字符后,我们在DBM中迭代并创建一个匹配列表。如果只有一个匹配项,就返回该项;如果有几个匹配项(但少于DISPLAY_LIMIT, 一个在程序中其他地方设置的整数),就以大小写不敏感的顺序展示所有这些匹配项,并为每一项设置一个编号,以便用户可以只输入编号就可以选择某个标题。(Console.get_integer()函数可以接受0,即便最小值大于0,以便0可以用作一个删除值。通过使用参数allow_zero=False, 可以禁止这种行为。我们不能使用Enter键,也就是说,没有什么意味着取消,因为什么也不输入意味着接受默认值。)

def list_dvds(db):

start =”"

if len(db)>DISPLAY.LIMIT:

start = Console.get_string(“List those starting with [Enter=all]”, "start”)

print()

for title in sorted(db, key=str.lower):

if not start or title.Iower().startswith(start.lower()):

director, year, duration = db[title]

print("{title} ({year}) {duration} minute{0}, by "

"{director}".format(Util.s(duration),**locals()))

列出所有DVD (或者那些标题以某个子字符串引导)就是对DBM的所有项进行迭代。

Util.s()函数就是简单的s = lambda x: "" if x == 1 else "s",因此,如果时间长度不是1分钟,就返回"s"。

def remove_dvd(db):

title = find_dvd(db, "remove")

if title is None:

return

ans = Console.get_bool("Remove {0}?".format(title), "no")

if ans:

del db[title]

db.sync()

要移除一个DVD,首先需要找到用户要移除的DVD,并请求确认,获取后从DBM中删除该项即可。

到这里,我们展示了如何使用shelve模块打开(或创建)一个DBM文件,以及如何向其中添加项、编辑项、对其项进行迭代以及移除某个项。

遗憾的是,在我们的数据设计中存在一个瑕疵。发行者名称是重复的,这很容易导致不一致性,比如,发行者Danny DeVito可能被输入为"Danny De Vito",用于 一个电影;也可以输入为“Danny deVito",用于另一个。为解决这一问题,可以使用两个DBM文件,主DVD文件使用标题键与(年份,时间长度,发行者ID)值发行者文件使用发行者ID (整数)键与发行者名称值。下一节展示的SQL数据库 版程序将避免这一瑕疵,这是通过使用两个表格实现的,一个用于DVD,另一个用于发行者。

12.2 SQL数据库

大多数流行的SQL数据库的接口在第三方模块中是可用的,Python带有sqlite3 模块(以及SQLite 3数据库),因此,在Python中,可以直接开始数据库程序设计。SQLite是一个轻量级的SQL数据库,缺少很多诸如PostgreSQL这种数据库的功能, 但非常便于构造原型系统,并且在很多情况下也是够用的。

为使后台数据库之间的切换尽可能容易,PEP 249 (Python Database API Specification v2.0)提供了称为DB-API 2.0的API规范。数据库接口应该遵循这一规范,比如sqlite3模块就遵循这一规范,但不是所有第三方模块都遵循。API规范中指定了两种主要的对象,即连接对象与游标对象。表12-1与表12-2中分别列出了这两种对象必须支持的API。在sqlite3模块中,除DB-API 2.0规范必需的之外,其连接对象与游标对象都提供了很多附加的属性与方法。

DVD程序的SQL版本为dvds.sql.py,该程序将发行者与DVD数据分开存储,以 避免重复,并提供一个新菜单,以供用户列出发行者。该程序使用的两个表格在图12-1

def connect(filename):

create= not os.path.exists(filename)

db = sqlite3.connect(filename)

if create:

cursor = db.cursor()

cursor.execute("CREATE TABLE directors ("

"id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL, "

"name TEXT UNIQUE NOT NULL)")

cursor.execute("CREATE TABLE dvds ("

"id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL, "

"title TEXT NOT NULL, "

"year INTEGER NOT NULL,"

"duration INTEGER NOT NULL, "

"director_id INTEGER NOT NULL, ”

"FOREIGN KEY (director_id) REFERENCES directors)")

db.commit()

return db

sqlite3.connect()函数会返回一个数据库对象,并打开其指定的数据库文件。如果该文件不存在,就创建一个空的数据库文件。鉴于此,在调用sqlite3.connect()之前,我们要注意数据库是否是准备从头开始创建,如果是,就必须创建该程序要使用的表格。所有査询都是通过一个数据库游标完成的,可以从数据库对象的cursor()方法获取。

注意,两个表格都是使用一个ID字段创建的,ID字段有一个AUTOINCREMENT 约束——这意味着SQLite会自动为ID字段赋予唯一性的数值,因此,在插入新记录时,我们可以将这些字段留给SQLite处理。

SQLite支持有限的数据类型——实际上就是布尔型、数值型与字符串——但使用数据'‘适配器”可以对其进行扩展,或者是扩展到预定义的数据类型(比如那些用于日期与datetimes的类型),或者是用于表示任意数据类型的自定义类型。DVD程序并不需要这一功能,如果需要,sqlite3模块的文档提供了很多详细解释。我们使用的外部键语法可能与用于其他数据库的语法不同,并且在任何情况下,只是记录我们的意图,因为SQLite不像很多其他数据库那样需要强制关系完整性,sqlite3另一点与众不同的地方在于其默认行为是支持隐式的事务处理,因此,没有提供显式的“开始事务” 方法。

def add_dvd(db):

title = Console.get_string("Title", "title")

if not title:

return

director = Console.get_string("Director", "director")

if not director:

return

year = Console.get_integer("Year", "year”, minimum=1896,

maximum=datetime.date.today().year)

duration = Console.get_integer("Duration (minutes)", "minutes",

minimum=0,maximum=60*48)

director_id = get_and_set_director(db, director)

cursor = db.cursor()

cursor.execute("INSERT INTO dvds ”

"(title, year, duration, director_id)"

"VALUES (?, ?, ?, ?)",

(title, year, duration, director_id))

db.commit()

这一函数的开始代码与dvds-dbm.py程序中的对应函数一样,但在完成数据的收集后,与原来的函数有很大的差别。用户输入的发行者可能在也可能不在directors表格中,因此,我们有一个get_and_set_director()函数,在数据库中尚无某个发行者时, 该函数就将其插入到其中,无论哪种情况都返回就绪的发行者ID,以便在需要的时候插入到dvds表。在所有数据都可用后,我们执行一条SQL INSERT语句。我们不需要指定记录ID,因为SQLite会自动为我们提供。

在査询中,我们使用问号(?)作为占位符,每个?都由包含SQL语句的字符串后面的序列中的值替代。命名的占位符也可以使用,后面在编辑记录时我们将看到。尽管避免使用占位符(而只是简单地使用嵌入到其中的数据来格式化SQL字符串)也是可能的,我们建议总是使用占位符,并将数据项正确编码与转义的工作留给数据库模块来完成。使用占位符的另一个好处是可以提高安全性,因为这可以防止任意的SQL 被恶意地插入到一个査询中。

def get_and_set_director(db, director):

director_id = get_director_id(db, director)

if directorjd is not None:

return director_id

cursor = db.cursor()

cursor.execute("lNSERT INTO directors (name) VALUES (?)”,(director,))

db.commit()

return get_director_id(db, director)

这一函数返回给定发行者的ID,并在必要的时候插入新的发行者记录。如果某个记录被插入,我们首先尝试使用get_director_id()函数取回其ID。

def get_director_id(db, director):

cursor = db.cursor()

cursor.execute("SELECT id FROM directors WHERE name=?",(director,))

fields = cursor.fetchone()

return fields[0] if fields is not None else None

get_director_id()函数返回给定发行者的ID,如果数据库中没有指定的发行者,就返回None。我们使用fetchone()方法,因为或者有一个匹配的记录,或者没有。(我们知道,不会有重复的发行者,因为directors表格的名称字段有一个UNIQUE约束,在任何情况下,在添加一个新的发行者之前,我们总是先检査其是否存在。)这种取回方法总是返回一个字段序列(如果没有更多的记录,就返回None)。即便如此,这里我们只是请求返回一个单独的字段。

def edit_dvd(db):

title, identity = find_dvd(db, "edit")

if title is None:

return

title = Console.get_string("Title","title", title)

if not title:

return

cursor = db.cursor()

cursor.execute("SELECT dvds.year, dvds.duration, directors.name"

“FROM dvds, directors "

"WHERE dvds.director_id = directors.id AND "

"dvds.id=:id", dict(id=identity))

year, duration, director = cursor.fetchone()

director = Console.get_string("Director", "director", director)

if not director:

return

year = Console,get_integer("Year","year", year, 1896,datetime.date.today().year)

duration = Console.get_integer("Duration (minutes)", "minutes",

duration, minimum=0, maximum=60*48)

director_id = get_and_set_director(db, director)

cursor.execute("UPDATE dvds SET title=:title, year=:year,"

"duration=:duration, director_id=:directorjd "

"WHERE id=:identity", locals())

db.commit()

要编辑DVD记录,我们必须首先找到用户需要操纵的记录。如果找到了某个记录,我们就给用户修改其标题的机会,之后取回该记录的其他字段,以便将现有值作为默认值,将用户的输入工作最小化,用户只需要按Enter键就可以接受默认值。这里,我们使用了命名的占位符(形式为:name),并且必须使用映射来提供相应的值。对SELECT语句,我们使用一个新创建的字典;对UPDATE语句,我们使用的是由 locals()返回的字典。

我们可以同时为这两个语句都使用新字典,这种情况下,对UPDATE语句,我们可以传递 dict(title=title, year=year, duration=duration, director_id=director_id, id=identity)),而非 locals()。

在具备所有字段并且用户已经输入了需要做的改变之后,我们取回相应的发行者ID (如果必要就插入新的发行者记录),之后使用新数据对数据库进行更新。我们采用了一种简化的方法,对记录的所有字段进行更新,而不仅仅是那些做了修改的字段。

在使用DBM文件时,DVD标题被用作键,因此,如果标题进行了修改,我们就需要创建一个新的键-值项,并删除原始项。不过,这里每个DVD记录都有一个唯一性的ID,该ID是记录初次插入时创建的,因此,我们只需要改变任何其他字段的值, 而不需要其他操作。

def find_dvd(db, message):

message = "(Start of) title to " + message

cursor = db.cursor()

while True: .

start = Console.get_stnng(message, "title")

if not start:

return (None, None)

cursor.execute("SELECT title, id FROM dvds "

"WHERE title LIKE ? ORDER BY title”,

(start +"%",))

records = cursor.fetchall()

if len(records) == 0:

print("There are no dvds starting with", start)

continue

elif len(records) == 1:

return records[0]

elif len(records) >DISPLAY_LIMIT:

print("Too many dvds ({0}) start with {1}try entering "

"more of the title".format(len(records),start))

continue

else:

for i, record in enumerate(records):

print("{0}:{1}".format(i + 1, record[0]))

which = Console.get_integer("Number (or 0 to cancel)",

"number", minimum=1, maximum=len(records))

return records[which -1] if which != 0 else (None, None)

这一函数的功能与dvdsdbm.py程序中的find_dvd()函数相同,并返回一个二元组 (DVD标题,DVD ID)或(None, None),具体依赖于是否找到了某个记录。这里并不需要在所有数据上进行迭代,而是使用SQL通配符(%),因此只取回相关的记录。

由于我们希望匹配的记录数较小,因此我们一次性将其都取回到序列的序列中。如果有不止一个匹配的记录,但数量上又少到可以显示,我们就打印记录,并将每条记录附带一个数字编号,以便用户可以选择需要的记录,其方式与在dvds-dbm.py程序中所做的类似:

def list_dvds(db):

cursor = db.cursor()

sql = ("SELECT dvds.title, dvds.year, dvds.duration, "

"directors.name FROM dvds, directors "

"WHERE dvds.director_id = directors.id")

start = None

if dvd_count(db) >DISPLAY_LIMIT:

start = Console.get_string("List those starting with [Enter=all]", "start")

sql += " AND dvds.title LIKE ?"

sql += ” ORDER BY dvds.title"

print()

if start is None:

cursor.execute(sql)

else:

cursor.execute(sql, (start +"%",))

for record in cursor:

print("{0[0]} ({0[1]}) {0[2]} minutes, by {0[3]}".format(record))

要列出每个DVD的详细资料,我们执行一个SELECT査询。该査询连接两个表,如果记录(由dvd_count()函数返回)数量超过了显示限制值,就将第2个元素添加到WHERE 分支,之后执行该査询,并在结果上进行迭代。每个记录都是一个序列,其字段是与 SELECT査询相匹配的。

def dvd_count(db):

cursor = db.cursor()

cursor.execute("SELECT COUNT(*) FROM dvds")

return cursor.fetchone()[0]

我们将这几行代码放置在一个单独的函数中,因为我们在几个不同的函数中都需要使用这几行代码。

我们忽略了 list_directors()函数的代码,因为该函数在结构上与list_dvds()函数非常类似,只不过更简单一些,因为本函数只列出一个字段(name)。

def remove_dvd(db):

title, identity = find_dvd(db, "remove")

if title is None:

return

ans = Console.get_bool("Remove {0}?".format(title), "no")

if ans:

cursor = db.cursor()

cursor.execute("DELETE FROM dvds WHERE id=?", (identity,))

db.commit()

在用户需要删除一个记录时,将调用本函数,并且本函数与dvds-dbm.py程序中 相应的函数是非常类似的。

到此,我们完全查阅了 dvds-sql.py程序,并且了解了如何创建数据库表格、选取 记录、在选定的记录上进行迭代以及插入、更新与删除记录。使用execute()方法,我们可以执行底层数据库所支持的任意SQL语句。

SQLite提供了比我们这里使用的多得多的功能,包括自动提交模式(以及任意其他类型的事务控制),以及创建可以在SQL查询内执行的函数的能力。提供一个工厂函数并用于控制对每个取回的记录返回什么(比如,一个字典或自定义类型,而不是字段序列)也是可能的。此外,通过传递“:memory:”作为文件名,创建内存中的SQLite 数据库也是可能的。

以上内容部分摘自视频课程05后端编程Python22 数据库编程,更多实操示例请参照视频讲解。跟着张员外讲编程,学习更轻松,不花钱还能学习真本领。