python 操作DB

Python010

python 操作DB,第1张

//兼容2、3,使用Mysql,sqlite,gadfly

//py2版本

import os

from random import randrange as rand

COLSIZ = 10

FIELDS = ('login', 'userid', 'projid')

RDBMSs = {'s': 'sqlite', 'm': 'mysql', 'g': 'gadfly'}

DBNAME = 'test'

DBUSER = 'root'

DB_EXC = None

NAMELEN = 16

tformat = lambda s: str(s).title().ljust(COLSIZ)

cformat = lambda s: s.upper().ljust(COLSIZ)

def setup():

return RDBMSs[raw_input('''

Choose a database system:

(M)ySQL

(G)adfly

(S)QLite

Enter choice: ''').strip().lower()[0]]

def connect(db):

global DB_EXC

dbDir = '%s_%s' % (db, DBNAME)

if db == 'sqlite':

try:

import sqlite3

except ImportError:

try:

from pysqlite2 import dbapi2 as sqlite3

except ImportError:

return None

DB_EXC = sqlite3

if not os.path.isdir(dbDir):

os.mkdir(dbDir)

cxn = sqlite3.connect(os.path.join(dbDir, DBNAME))

elif db == 'mysql':

try:

import MySQLdb

import _mysql_exceptions as DB_EXC

except ImportError:

return None

try:

cxn = MySQLdb.connect(db=DBNAME)

except DB_EXC.OperationalError:

try:

cxn = MySQLdb.connect(user=DBUSER)

cxn.query('CREATE DATABASE %s' % DBNAME)

cxn.commit()

cxn.close()

cxn = MySQLdb.connect(db=DBNAME)

except DB_EXC.OperationalError:

return None

elif db == 'gadfly':

try:

from gadfly import gadfly

DB_EXC = gadfly

except ImportError:

return None

try:

cxn = gadfly(DBNAME, dbDir)

except IOError:

cxn = gadfly()

if not os.path.isdir(dbDir):

os.mkdir(dbDir)

cxn.startup(DBNAME, dbDir)

else:

return None

return cxn

def create(cur):

try:

cur.execute('''

CREATE TABLE users (

login VARCHAR(%d),

userid INTEGER,

projid INTEGER)

''' % NAMELEN)

except DB_EXC.OperationalError:

drop(cur)

create(cur)

drop = lambda cur: cur.execute('DROP TABLE users')

NAMES = (

('aaron', 8312), ('angela', 7603), ('dave', 7306),

('davina',7902), ('elliot', 7911), ('ernie', 7410),

('jess', 7912), ('jim', 7512), ('larry', 7311),

('leslie', 7808), ('melissa', 8602), ('pat', 7711),

('serena', 7003), ('stan', 7607), ('faye', 6812),

('amy', 7209), ('mona', 7404), ('jennifer', 7608),

)

def randName():

pick = set(NAMES)

while pick:

yield pick.pop()

def insert(cur, db):

if db == 'sqlite':

cur.executemany("INSERT INTO users VALUES(?, ?, ?)",

[(who, uid, rand(1,5)) for who, uid in randName()])

elif db == 'gadfly':

for who, uid in randName():

cur.execute("INSERT INTO users VALUES(?, ?, ?)",

(who, uid, rand(1,5)))

elif db == 'mysql':

cur.executemany("INSERT INTO users VALUES(%s, %s, %s)",

[(who, uid, rand(1,5)) for who, uid in randName()])

getRC = lambda cur: cur.rowcount if hasattr(cur, 'rowcount') else -1

def update(cur):

fr = rand(1,5)

to = rand(1,5)

cur.execute("UPDATE users SET projid=%d WHERE projid=%d" % (to, fr))

return fr, to, getRC(cur)

def delete(cur):

rm = rand(1,5)

cur.execute('DELETE FROM users WHERE projid=%d' % rm)

return rm, getRC(cur)

def dbDump(cur):

cur.execute('SELECT * FROM users')

print '\n%s' % ''.join(map(cformat, FIELDS))

for data in cur.fetchall():

print ''.join(map(tformat, data))

def main():

db = setup()

print '*** Connect to %r database' % db

cxn = connect(db)

if not cxn:

print 'ERROR: %r not supported or unreachable, exiting' % db

return

cur = cxn.cursor()

print '\n*** Create users table (drop old one if appl.)'

create(cur)

print '\n*** Insert names into table'

insert(cur, db)

dbDump(cur)

print '\n*** Move users to a random group'

fr, to, num = update(cur)

print '\t(%d users moved) from (%d) to (%d)' % (num, fr, to)

dbDump(cur)

print '\n*** Randomly delete group'

rm, num = delete(cur)

print '\t(group #%d%d users removed)' % (rm, num)

dbDump(cur)

print '\n*** Drop users table'

drop(cur)

print '\n*** Close cxns'

cur.close()

cxn.commit()

cxn.close()

if name == ' main ':

main()

可以给db文件添加密码。

在Python中异或操作符为,^,也可以记作XOR。按位异或的意思是。相同值异或为0,不同值异或为1.具体来讲,有四种可能,0^0=0,0^1=1,1^0=1,1^1=0。我们还可总结出规律(A为0或1),0和A异或为A本身。1和A异或为A反。

加密操作,首先将文件转换成二进制数,再生成与该二进制数等长的随机密钥,将二进制数与密钥进行异或操作,得到加密后的二进制数。解密操作,将加密后的二进制程序与密钥进行异或操作,就得到原二进制数,最后将原二进制数恢复成文本文件。

多套测试环境,如何做基线的数据库级别的同步更新?

工作中测试环境有多套时,为保证基础环境配置的一致性,就需要所有测试环境的数据库结构保持一致。

例如:A需求在 beta1 环境进行测试,且A需求提测单中有新增表的 sql,B需求在 beta2 环境进行测试,由于A需求比B需求先发布上线,此时在B需求测试过程中发布时需要将主干的代码合并到当前需求分支(集成测试的需要,可以提前检测出已上线的需求是否对当前在测的需求有影响),代码合并后对应的相关配置也得跟上,否则程序运行时会报错,所以就需要在 beta2 环境更新 beta1 环境A需求新增表的sql。

因为每一次的发布上线都会做数据库级别的同步更新,如果只是两、三个测试环境,使用人工来手动更新也是可以的,如果测试环境多且数据库更新的内容量大,依然使用人工手动更新,效率就会十分低下,同时也会造成一些人为操作的错误。这时自动化同步更新数据库就显得犹为重要了。在效率和正确率上都是完胜手工更新的。

由代码实现部分可以看出,有了这个自动同步的自动化脚本,在数据库更新时,只需要传入更新的 sql 语句就可一键自动同步多套测试环境的数据库信息了,十分高效。