python 运维常用脚本

Python010

python 运维常用脚本,第1张

Python 批量遍历目录文件,并修改访问时间

import os

path = "D:/UASM64/include/"

dirs = os.listdir(path)

temp=[]

for file in dirs:

temp.append(os.path.join(path, file))

for x in temp:

os.utime(x, (1577808000, 1577808000))

Python 实现的自动化服务器管理

import sys

import os

import paramiko

ssh = paramiko.SSHClient()

ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def ssh_cmd(user,passwd,port,userfile,cmd):

def ssh_put(user,passwd,source,target):

while True:

try:

shell=str(input("[Shell] # "))

if (shell == ""):

continue

elif (shell == "exit"):

exit()

elif (shell == "put"):

ssh_put("root","123123","./a.py","/root/a.py")

elif (shell =="cron"):

temp=input("输入一个计划任务: ")

temp1="(crontab -lecho "+ temp + ") |crontab"

ssh_cmd("root","123123","22","./user_ip.conf",temp1)

elif (shell == "uncron"):

temp=input("输入要删除的计划任务: ")

temp1="crontab -l | grep -v " "+ temp + "|crontab"

ssh_cmd("root","123123","22","./user_ip.conf",temp1)

else:

ssh_cmd("lyshark","123123","22","./user_ip.conf",shell)

遍历目录和文件

import os

def list_all_files(rootdir):

import os

_files = []

list = os.listdir(rootdir) #列出文件夹下所有的目录与文件

for i in range(0,len(list)):

path = os.path.join(rootdir,list[i])

if os.path.isdir(path):

_files.extend(list_all_files(path))

if os.path.isfile(path):

_files.append(path)

return _files

a=list_all_files("C:/Users/LyShark/Desktop/a")

print(a)

python检测指定端口状态

import socket

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

sk.settimeout(1)

for ip in range(0,254):

try:

sk.connect(("192.168.1."+str(ip),443))

print("192.168.1.%d server open \n"%ip)

except Exception:

print("192.168.1.%d server not open"%ip)

sk.close()

python实现批量执行CMD命令

import sys

import os

import paramiko

ssh = paramiko.SSHClient()

ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

print("------------------------------>\n")

print("使用说明,在当前目录创建ip.txt写入ip地址")

print("------------------------------>\n")

user=input("输入用户名:")

passwd=input("输入密码:")

port=input("输入端口:")

cmd=input("输入执行的命令:")

file = open("./ip.txt", "r")

line = file.readlines()

for i in range(len(line)):

print("对IP: %s 执行"%line[i].strip('\n'))

python3-实现钉钉报警

import requests

import sys

import json

dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token=6d11af3252812ea50410c2ccb861814a6ed11b2306606934a5d4ca9f2ec8c09'

data = {"msgtype": "markdown","markdown": {"title": "监控","text": "apche异常"}}

headers = {'Content-Type':'application/jsoncharset=UTF-8'}

send_data = json.dumps(data).encode('utf-8')

requests.post(url=dingding_url,data=send_data,headers=headers)

import psutil

import requests

import time

import os

import json

monitor_name = set(['httpd','cobblerd']) # 用户指定监控的服务进程名称

proc_dict = {}

proc_name = set() # 系统检测的进程名称

monitor_map = {

'httpd': 'systemctl restart httpd',

'cobblerd': 'systemctl restart cobblerd' # 系统在进程down掉后,自动重启

}

dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token=b5258c4335ed8ab792075013c965efcbf4f8940f92e7bd936cdc7842d3bf9405'

while True:

for proc in psutil.process_iter(attrs=['pid','name']):

proc_dict[proc.info['pid']] = proc.info['name']

proc_name.add(proc.info['name'])

判断指定端口是否开放

import socket

port_number = [135,443,80]

for index in port_number:

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

result = sock.connect_ex(('127.0.0.1', index))

if result == 0:

print("Port %d is open" % index)

else:

print("Port %d is not open" % index)

sock.close()

判断指定端口并且实现钉钉轮询报警

import requests

import sys

import json

import socket

import time

def dingding(title,text):

dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token=6d11af3252812ea50410c2ccb861814a69ed11b2306606934a5d4ca9f2c8c09'

data = {"msgtype": "markdown","markdown": {"title": title,"text": text}}

headers = {'Content-Type':'application/jsoncharset=UTF-8'}

send_data = json.dumps(data).encode('utf-8')

requests.post(url=dingding_url,data=send_data,headers=headers)

def net_scan():

port_number = [80,135,443]

for index in port_number:

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

result = sock.connect_ex(('127.0.0.1', index))

if result == 0:

print("Port %d is open" % index)

else:

return index

sock.close()

while True:

dingding("Warning",net_scan())

time.sleep(60)

python-实现SSH批量CMD执行命令

import sys

import os

import paramiko

ssh = paramiko.SSHClient()

ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def ssh_cmd(user,passwd,port,userfile,cmd):

file = open(userfile, "r")

line = file.readlines()

for i in range(len(line)):

print("对IP: %s 执行"%line[i].strip('\n'))

ssh.connect(hostname=line[i].strip('\n'),port=port,username=user,password=passwd)

cmd=cmd

stdin, stdout, stderr = ssh.exec_command(cmd)

result = stdout.read()

ssh_cmd("lyshark","123","22","./ip.txt","free -h |grep 'Mem:' |awk '{print $3}'")

用python写一个列举当前目录以及所有子目录下的文件,并打印出绝对路径

import sys

import os

for root,dirs,files in os.walk("C://"):

for name in files:

print(os.path.join(root,name))

os.walk()

按照这样的日期格式(xxxx-xx-xx)每日生成一个文件,例如今天生成的文件为2013-09-23.log, 并且把磁盘的使用情况写到到这个文件中。

import os

import sys

import time

new_time = time.strftime("%Y-%m-%d")

disk_status = os.popen("df -h").readlines()

str1 = ''.join(disk_status)

f = open(new_time+'.log','w')

f.write("%s"%str1)

f.flush()

f.close()

统计出每个IP的访问量有多少?(从日志文件中查找)

import sys

list = []

f = open("/var/log/httpd/access_log","r")

str1 = f.readlines()

f.close()

for i in str1:

ip=i.split()[0]

list.append(ip)

list_num=set(list)

for j in list_num:

num=list.count(j)

print("%s ----->%s" %(num,j))

写个程序,接受用户输入数字,并进行校验,非数字给出错误提示,然后重新等待用户输入。

import tab

import sys

while True:

try:

num=int(input("输入数字:").strip())

for x in range(2,num+1):

for y in range(2,x):

if x % y == 0:

break

else:

print(x)

except ValueError:

print("您输入的不是数字")

except KeyboardInterrupt:

sys.exit("\n")

ps 可以查看进程的内存占用大小,写一个脚本计算一下所有进程所占用内存大小的和。

import sys

import os

list=[]

sum=0

str1=os.popen("ps aux","r").readlines()

for i in str1:

str2=i.split()

new_rss=str2[5]

list.append(new_rss)

for i in list[1:-1]:

num=int(i)

sum=sum+num

print("%s --->%s"%(list[0],sum))

关于Python 命令行参数argv

import sys

if len(sys.argv) <2:

print ("没有输入任何参数")

sys.exit()

if sys.argv[1].startswith("-"):

option = sys.argv[1][1:]

利用random生成6位数字加字母随机验证码

import sys

import random

rand=[]

for x in range(6):

y=random.randrange(0,5)

if y == 2 or y == 4:

num=random.randrange(0,9)

rand.append(str(num))

else:

temp=random.randrange(65,91)

c=chr(temp)

rand.append(c)

result="".join(rand)

print(result)

自动化-使用pexpect非交互登陆系统

import pexpect

import sys

ssh = pexpect.spawn('ssh [email protected]')

fout = file('sshlog.txt', 'w')

ssh.logfile = fout

ssh.expect("[email protected]'s password:")

ssh.sendline("密码")

ssh.expect('#')

ssh.sendline('ls /home')

ssh.expect('#')

Python-取系统时间

import sys

import time

time_str = time.strftime("日期:%Y-%m-%d",time.localtime())

print(time_str)

time_str= time.strftime("时间:%H:%M",time.localtime())

print(time_str)

psutil-获取内存使用情况

import sys

import os

import psutil

memory_convent = 1024 * 1024

mem =psutil.virtual_memory()

print("内存容量为:"+str(mem.total/(memory_convent))+"MB\n")

print("已使用内存:"+str(mem.used/(memory_convent))+"MB\n")

print("可用内存:"+str(mem.total/(memory_convent)-mem.used/(1024*1024))+"MB\n")

print("buffer容量:"+str(mem.buffers/( memory_convent ))+"MB\n")

print("cache容量:"+str(mem.cached/(memory_convent))+"MB\n")

Python-通过SNMP协议监控CPU

注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*

import os

def getAllitems(host, oid):

sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid + '|grep Raw|grep Cpu|grep -v Kernel').read().split('\n')[:-1]

return sn1

def getDate(host):

items = getAllitems(host, '.1.3.6.1.4.1.2021.11')

if name == ' main ':

Python-通过SNMP协议监控系统负载

注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*

import os

import sys

def getAllitems(host, oid):

sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')

return sn1

def getload(host,loid):

load_oids = '1.3.6.1.4.1.2021.10.1.3.' + str(loid)

return getAllitems(host,load_oids)[0].split(':')[3]

if name == ' main ':

Python-通过SNMP协议监控内存

注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*

import os

def getAllitems(host, oid):

def getSwapTotal(host):

def getSwapUsed(host):

def getMemTotal(host):

def getMemUsed(host):

if name == ' main ':

Python-通过SNMP协议监控磁盘

注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*

import re

import os

def getAllitems(host,oid):

def getDate(source,newitem):

def getRealDate(item1,item2,listname):

def caculateDiskUsedRate(host):

if name == ' main ':

Python-通过SNMP协议监控网卡流量

注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*

import re

import os

def getAllitems(host,oid):

sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')[:-1]

return sn1

def getDevices(host):

device_mib = getAllitems(host,'RFC1213-MIB::ifDescr')

device_list = []

def getDate(host,oid):

date_mib = getAllitems(host,oid)[1:]

date = []

if name == ' main ':

Python-实现多级菜单

import os

import sys

ps="[None]->"

ip=["192.168.1.1","192.168.1.2","192.168.1.3"]

flage=1

while True:

ps="[None]->"

temp=input(ps)

if (temp=="test"):

print("test page !!!!")

elif(temp=="user"):

while (flage == 1):

ps="[User]->"

temp1=input(ps)

if(temp1 =="exit"):

flage=0

break

elif(temp1=="show"):

for i in range(len(ip)):

print(i)

Python实现一个没用的东西

import sys

ps="[root@localhost]# "

ip=["192.168.1.1","192.168.1.2","192.168.1.3"]

while True:

temp=input(ps)

temp1=temp.split()

检查各个进程读写的磁盘IO

import sys

import os

import time

import signal

import re

class DiskIO:

def init (self, pname=None, pid=None, reads=0, writes=0):

self.pname = pname

self.pid = pid

self.reads = 0

self.writes = 0

def main():

argc = len(sys.argv)

if argc != 1:

print ("usage: please run this script like [./lyshark.py]")

sys.exit(0)

if os.getuid() != 0:

print ("Error: This script must be run as root")

sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

os.system('echo 1 >/proc/sys/vm/block_dump')

print ("TASK PID READ WRITE")

while True:

os.system('dmesg -c >/tmp/diskio.log')

l = []

f = open('/tmp/diskio.log', 'r')

line = f.readline()

while line:

m = re.match(

'^(\S+)(\d+)(\d+): (READ|WRITE) block (\d+) on (\S+)', line)

if m != None:

if not l:

l.append(DiskIO(m.group(1), m.group(2)))

line = f.readline()

continue

found = False

for item in l:

if item.pid == m.group(2):

found = True

if m.group(3) == "READ":

item.reads = item.reads + 1

elif m.group(3) == "WRITE":

item.writes = item.writes + 1

if not found:

l.append(DiskIO(m.group(1), m.group(2)))

line = f.readline()

time.sleep(1)

for item in l:

print ("%-10s %10s %10d %10d" %

(item.pname, item.pid, item.reads, item.writes))

def signal_handler(signal, frame):

os.system('echo 0 >/proc/sys/vm/block_dump')

sys.exit(0)

if name ==" main ":

main()

利用Pexpect实现自动非交互登陆linux

import pexpect

import sys

ssh = pexpect.spawn('ssh [email protected]')

fout = file('sshlog.log', 'w')

ssh.logfile = fout

ssh.expect("[email protected]'s password:")

ssh.sendline("密码")

ssh.expect('#')

ssh.sendline('ls /home')

ssh.expect('#')

利用psutil模块获取系统的各种统计信息

import sys

import psutil

import time

import os

time_str = time.strftime( "%Y-%m-%d", time.localtime( ) )

file_name = "./" + time_str + ".log"

if os.path.exists ( file_name ) == False :

os.mknod( file_name )

handle = open ( file_name , "w" )

else :

handle = open ( file_name , "a" )

if len( sys.argv ) == 1 :

print_type = 1

else :

print_type = 2

def isset ( list_arr , name ) :

if name in list_arr :

return True

else :

return False

print_str = ""

if ( print_type == 1 ) or isset( sys.argv,"mem" ) :

memory_convent = 1024 * 1024

mem = psutil.virtual_memory()

print_str += " 内存状态如下:\n"

print_str = print_str + " 系统的内存容量为: "+str( mem.total/( memory_convent ) ) + " MB\n"

print_str = print_str + " 系统的内存以使用容量为: "+str( mem.used/( memory_convent ) ) + " MB\n"

print_str = print_str + " 系统可用的内存容量为: "+str( mem.total/( memory_convent ) - mem.used/( 1024*1024 )) + "MB\n"

print_str = print_str + " 内存的buffer容量为: "+str( mem.buffers/( memory_convent ) ) + " MB\n"

print_str = print_str + " 内存的cache容量为:" +str( mem.cached/( memory_convent ) ) + " MB\n"

if ( print_type == 1 ) or isset( sys.argv,"cpu" ) :

print_str += " CPU状态如下:\n"

cpu_status = psutil.cpu_times()

print_str = print_str + " user = " + str( cpu_status.user ) + "\n"

print_str = print_str + " nice = " + str( cpu_status.nice ) + "\n"

print_str = print_str + " system = " + str( cpu_status.system ) + "\n"

print_str = print_str + " idle = " + str ( cpu_status.idle ) + "\n"

print_str = print_str + " iowait = " + str ( cpu_status.iowait ) + "\n"

print_str = print_str + " irq = " + str( cpu_status.irq ) + "\n"

print_str = print_str + " softirq = " + str ( cpu_status.softirq ) + "\n"

print_str = print_str + " steal = " + str ( cpu_status.steal ) + "\n"

print_str = print_str + " guest = " + str ( cpu_status.guest ) + "\n"

if ( print_type == 1 ) or isset ( sys.argv,"disk" ) :

print_str += " 硬盘信息如下:\n"

disk_status = psutil.disk_partitions()

for item in disk_status :

print_str = print_str + " "+ str( item ) + "\n"

if ( print_type == 1 ) or isset ( sys.argv,"user" ) :

print_str += " 登录用户信息如下:\n "

user_status = psutil.users()

for item in user_status :

print_str = print_str + " "+ str( item ) + "\n"

print_str += "---------------------------------------------------------------\n"

print ( print_str )

handle.write( print_str )

handle.close()

import psutil

mem = psutil.virtual_memory()

print mem.total,mem.used,mem

print psutil.swap_memory() # 输出获取SWAP分区信息

cpu = psutil.cpu_stats()

printcpu.interrupts,cpu.ctx_switches

psutil.cpu_times(percpu=True) # 输出每个核心的详细CPU信息

psutil.cpu_times().user # 获取CPU的单项数据 [用户态CPU的数据]

psutil.cpu_count() # 获取CPU逻辑核心数,默认logical=True

psutil.cpu_count(logical=False) # 获取CPU物理核心数

psutil.disk_partitions() # 列出全部的分区信息

psutil.disk_usage('/') # 显示出指定的挂载点情况【字节为单位】

psutil.disk_io_counters() # 磁盘总的IO个数

psutil.disk_io_counters(perdisk=True) # 获取单个分区IO个数

psutil.net_io_counter() 获取网络总的IO,默认参数pernic=False

psutil.net_io_counter(pernic=Ture)获取网络各个网卡的IO

psutil.pids() # 列出所有进程的pid号

p = psutil.Process(2047)

p.name() 列出进程名称

p.exe()列出进程bin路径

p.cwd()列出进程工作目录的绝对路径

p.status()进程当前状态[sleep等状态]

p.create_time() 进程创建的时间 [时间戳格式]

p.uids()

p.gids()

p.cputimes() 【进程的CPU时间,包括用户态、内核态】

p.cpu_affinity() # 显示CPU亲缘关系

p.memory_percent() 进程内存利用率

p.meminfo() 进程的RSS、VMS信息

p.io_counters() 进程IO信息,包括读写IO数及字节数

p.connections() 返回打开进程socket的namedutples列表

p.num_threads() 进程打开的线程数

import psutil

from subprocess import PIPE

p =psutil.Popen(["/usr/bin/python" ,"-c","print 'helloworld'"],stdout=PIPE)

p.name()

p.username()

p.communicate()

p.cpu_times()

psutil.users()# 显示当前登录的用户,和Linux的who命令差不多

psutil.boot_time() 结果是个UNIX时间戳,下面我们来转换它为标准时间格式,如下:

datetime.datetime.fromtimestamp(psutil.boot_time()) # 得出的结果不是str格式,继续进行转换 datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d%H:%M:%S')

Python生成一个随机密码

import random, string

def GenPassword(length):

if name == ' main ':

print (GenPassword(6))

「 生命完美的答案,无非走过没有遗憾 ---《天蓝》」

「如何能够解析脚本运行命令行选项(位于 sys.argv 中)」

argparse 模块可被用来解析命令行选项

常用来定义一个脚本的说明文档,一般我们写python脚本会通过 if..else 的方式来提供一个脚本说明文档,python不支持switch。所以很麻烦,其实,我们可以通过 argparse 来编写说明文档。

我们来看看怎么执行一个python脚本

对于熟悉Linux的小伙伴下面的文档在熟悉不过了,这个一个标准Linxu软件包的说明文档,文档中定义是软件包的说明

来看看这个脚本是如何编写的

为了解析命令行选项, 首先要创建一个 ArgumentParser 实例, 使用 add_argument() 方法声明你想要支持的选项。在每个 add-argument() 调用中:

dest 参数指定解析结果被指派给属性的名字。 metavar 参数被用来生成帮助信息。

action 参数 指定跟属性对应的处理逻辑,通常的 值为 store , 被用来存储 某个值 或将 多个参数值收集到一个列表中 。

nargs 参数收集 所有剩余的命令行参数到一个列表中。在本例中它被用来构造一个文件名列表

action='store_true' 根据参数是否存在来设置一个位置 Boolean 标志:

action='store' 参数接受一个单独值并将其存储为一个字符串

如果一个都没有,会提示缺少参数 -p/--pat

choices={'slow', 'fast'}, 参数说明接受一个值,但是会将其和可能的选择值做比较,以检测其合法性:

一旦参数选项被指定,你就可以执行 parser.parse() 方法了。它会处理 sys.argv 的值并返回一个结果实例。每个参数值会被设置成该实例中 add_argument() 方法的 dest 参数指定的属性值。

还很多种其他方法解析命令行选项。可以会手动地处理 sys.argv 或者使用 getopt 模块 。但是,如果你采用本节的方式,将会减少很多冗余代码,底层细节 argparse 模块 已经帮你处理好了。你可能还会碰到使用 optparse 库解析选项的代码。尽管 optparse 和 argparse 很像 ,但是后者更先进,因此在新的程序中你应该使用它。

「你写了个脚本,运行时需要一个密码。此脚本是交互式的,因此不能将密码在脚本中硬编码,而是需要弹出一个密码输入提示,让用户自己输入。」

Python 的 getpass 模块 正是你所需要的。你可以让你很轻松地弹出密码输入提示,并且不会在用户终端显示密码。

代码中 getpass.getuser() 不会弹出用户名的输入提示。它会根据该 用户的 shell 环境 或者会依据 本地系统的密码库 (支持 pwd 模块的平台)来使用 当前用户的登录名

在bash中编写pytohn脚本接收外部数据的方式,一般情况下,对于一般变量,我们用命令行变量的方式比较多(手动的处理 sys.argv ),对于 文件内容或者bash命令输出 直接通过脚本内部获取需要的数据。

其实python 脚本也可以用其他方式来接收 传递给他的 文件数据或者bash命令输出 ,包括将 命令行的输出 通过 管道传递 给该脚本、 重定向文件到该脚本 ,或在 命令行中传递一个文件名 或 文件名列表 给该脚本。

这里通过 Python 内置的 fileinput 模块 ,可以实现重 定向,管道,以文佳输出 的方式传递数据到脚本内部

使用 fileinput.input() 方法可以获取当前输入脚本的数据,脚本里面用一个 FileInput 迭代器接收

文件直接接收

重定向接收

管道方式接收

fileinput.input() 创建并返回一个 FileInput 类的实例,该实例可以被当做一个 上下文管理器 使用。因此,整合起来,如果我们要写一个打印多个文件输出的脚本,那么我们需要在输出中包含文件名和行号

「你想执行一个外部命令并以 Python 字符串的形式获取执行结果。」

使用 subprocess.check_output() 函数。

执行下试试

如果被执行的命令以非零码返回,就会抛出异常。下面的例子捕获到错误并获取返回码:

默认情况下, check_output() 仅仅返回输入到标准输出的值。如果你需要 同时收集标准输出和错误输出 ,使用 stderr 参数:

如果你需要用一个超时机制来执行命令,使用 timeout 参数:

通常来讲,命令的执行 不需要 使用到 底层 shell 环境(比如 sh、bash) 。一个字符串列表会被传递给一个 低级系统命令 ,比如 os.execve() 。

如果你想让 命令被一个shell 执行 ,传递一个字符串参数,并设置参数 shell=True . 有时候你想要 Python 去执行一个复杂的 shell 命令 的时候这个就很有用了,比如管道流、I/O 重定向和其他特性。例如:

是在 shell 中执行命令会存在一定的安全风险,特别是当参数来自于用户输入时。这时候可以使用 shlex.quote() 函数 来将参数正确的用双引用引起来。

使用 check_output() 函数 是执行 外部命令 并获取其 返回值 的最简单方式。但是,如果你需要对 子进程做更复杂的交互 ,比如给它发送输入,你得采用另外一种方法。这时候可直接使用 subprocess.Popen 类。

关于子进程,简单来看下

也可以进程列表同协程结合的方式。你既可以在子shell中 进行繁重的处理工作,同时也不会让子shell的I/O受制于终端。

如果直接丢到后台会自动在终端输出IO

subprocess 模块对于依赖 TTY 的外部命令不合适用 。例如,你不能使用它来自动化一个用户输入密码的任务(比如一个 ssh 会话)。这时候,你需要使用到第三方模块了,比如基于著名的 expect 家族的工具(pexpect 或类似的)(pexpect可以理解为Linux下的expect的Python封装、通过pexpect可以实现对ssh、ftp、passwd、telnet等命令行进行自动交互,而无需人工干涉来达到自动化的目的。比如我们可以模拟一个FTP登录时所有交互,包括输入主机地址、用户名、密码、上传文件等,待出现异常还可以进行尝试自动处理。)

「你想向标准错误打印一条消息并返回某个非零状态码来终止程序运行」

通过 python 的 raise SystemExit(3) 命令可以主动抛出一个错误,通过 sys.stderr.write 将命令写到标准的输出端

直接将消息作为参数传给 SystemExit() ,那么你可以省略其他步骤

抛出一个 SystemExit 异常,使用错误消息作为参数,它会将消息在 sys.stderr 中打印,然后程序以状态码 1 退出

「你需要知道当前终端的大小以便正确的格式化输出。」

使用 os.get terminal size() 函数 来做到这一点。

「复制或移动文件和目录,但是又不想调用 shell 命令。」

shutil 模块 有很多便捷的函数可以复制文件和目录。使用起来非常简单

这里不多讲,熟悉Linux的小伙伴应该不陌生。

默认情况下,对于 符号链接 这些命令处理的是它指向的东西文件。例如,如果 源文件 是一个 符号链接 ,那么目标文件将会是 符号链接 指向的文件。如果你只想 复制符号链接本身 ,那么需要指定 关键字 参数 follow_symlinks

copytree() 可以让你在复制过程中选择性的忽略某些文件或目录。你可以提供一个忽略函数,接受一个目录名和文件名列表作为输入,返回一个忽略的名称列表。例如:

对于文件元数据信息, copy2() 这样的函数只能尽自己最大能力来保留它。 访问时间、创建时间和权限 这些基本信息会被保留,但是 对于所有者、ACLs、资源 fork 和其他更深层次的文件元信息就说不准了

通常不会去使用 shutil.copytree() 函数 来执行 系统备份 。当处理文件名的时候,最好使用 os.path 中的函数来确保最大的可移植性

使用 copytree() 复制文件夹的一个棘手的问题是对于错误的处理,可以使用异常块处理,或者通过 参数 ignore dangling symlinks=True 忽略掉无效符号链接。

「创建或解压常见格式的归档文件(比如.tar, .tgz 或.zip)」

shutil 模块拥有两个函数—— make archive() 和 unpack archive() 可派上用场,

make archive() 的第二个参数是期望的输出格式。可以使用 get archive formats() 获取所有支持的归档格式列表。

「你需要写一个涉及到文件查找操作的脚本,比如对日志归档文件的重命名工具,你不想在 Python 脚本中调用 shell,或者你要实现一些 shell 不能做的功能。」

查找文件,可使用 os.walk() 函数 ,传一个顶级目录名给它

os.walk() 方法 为我们 遍历目录树 ,每次进入一个目录,它会返回一个 三元组 ,包含 相对于查找目录的相对路径,一个该目录下的目录名列表,以及那个目录下面的文件名列表。

对于每个元组,只需检测一下目标文件名是否在文件列表中。如果是就使用 os.path.join() 合并路径。为了避免奇怪的路径名比如 ././foo//bar ,使用了另外两个函数来修正结果

os.walk(start) 还有跨平台的优势。并且,还能很轻松的加入其他的功能。我们再演示一个例子,下面的函数打印所有最近被修改过的文件:

打印10分钟之前被修改的数据

「怎样读取普通.ini 格式的配置文件?」

configparser 模块 能被用来读取配置文件

编写配置文件

如果有需要,你还能修改配置并使用 cfg.write() 方法将其写回到文件中

「你希望在脚本和程序中将诊断信息写入日志文件。」

python 脚本打印日志最简单方式是使用 logging 模块

五个日志调用( critical(), error(), warning(), info(), debug() )以降序方式表示不同的严重级别。 basicConfig() 的 level 参数是一个 过滤器 。所有级别低于此级别的日志消息都会被忽略掉。每个 logging 操作的参数是一个消息字符串,后面再跟一个或多个参数。构造最终的日志消息的时候我们使用了 % 操作符来格式化消息字符串。

如果你想使用配置文件,可以像下面这样修改 basicConfig() 调用:

logconfig.ini

在调用日志操作前先执行下 basicConfig() 函数方法 ,可以找标准输出或者文件中输出

basicConfig() 在程序中只能被执行一次。如果你稍后想改变日志配置,就需要先获取 root logger ,然后直接修改它。

更多见日志模块文档https://docs.python.org/3/howto/logging-cookbook.html

「你想给某个函数库增加日志功能,但是又不能影响到那些不使用日志功能的程序。」

对于想要执行日志操作的函数库,你应该创建一个专属的 logger 对象,并且像下面这样初始化配置:

使用这个配置,默认情况下不会打印日志,只有配置过日志系统,那么日志消息打印就开始生效

通常来讲,不应该在函数库代码中 自己配置日志系统 ,或者是已经有个已经存在的日志配置了。调用 getLogger( name ) 创建一个和调用模块同名的 logger 模块 。由于 模块 都是唯一的,因此创建的 logger 也将是唯一 的。所以当前进程中只有一个logging会生效。

log.addHandler(logging.NullHandler()) 操作将一个 空处理器 绑定到刚刚已经创建好的 logger 对象 上。一个空处理器默认会忽略调用所有的日志消息。因此,如果使用该函数库的时候还没有配置日志,那么将不会有消息或警告出现。

在这里,根日志被配置成仅仅 输出 ERROR 或更高级别的消息 。不过, somelib 的日志级别被单独配置成可以输出 debug 级别的消息, 它的优先级比全局配置高。像这样更改单独模块的日志配置对于调试来讲是很方便的,因为你无需去更改任何的全局日志配置——只需要修改你想要更多输出的模块的日志等级。(这个还有待研究)

「你想记录程序执行多个任务所花费的时间」

time 模块 包含很多函数来执行跟时间有关的函数。尽管如此,通常我们会在此基础之上构造一个更高级的接口来模拟一个计时器。

这个类定义了一个可以被用户根据需要启动、停止和重置的计时器。它会在elapsed 属性中记录整个消耗时间。下面是一个例子来演示怎样使用它:

这里通过 __enter__,__exit__ ,使用 with 语句 以及上下文管理器协议可以省略计时器打开和关闭操作。(关于上下文管理协议,即with语句,为了让一个对象兼容with语句,必须在这个对象的类中声明 __enter__和__exit__方法, , __enter__ 在出现with语句被调用, __exit__ 在代码执行完毕被调用,可以参考open()方法)

在计时中要考虑一个 底层的时间函数问题 。 一般来说, 使用 time.time() 或 time.clock() 计算的时间精度因操作系统的不同会有所不同。而使用 time.perf_counter() 函数可以确保使用系统上面 最精确的计时器 。

「你想对在 Unix 系统上面运行的程序设置内存或 CPU 的使用限制。」

resource 模块 能同时执行这两个任务。例如,要限制 CPU 时间,下面的代码在windows平台执行不了,但是Linux是可以的。

程序运行时, SIGXCPU 信号 在时间过期时被生成,然后执行清理并退出。

这暂时没有好的Demo...

程序运行到没有多余内存时会抛出 MemoryError 异常。

setrlimit() 函数 被用来设置特定资源上面的 软限制和硬限制 。

setrlimit() 函数 还能被用来设置 子进程数量、打开文件数以及类似系统资源的限制(cgroup) 。

「通过脚本启动浏览器并打开指定的 URL 网页」

webbrowser 模块 能被用来启动一个浏览器,并且与平台无关

新窗口打卡网站

当前窗口打开一个tab页

指定浏览器类型,可以使用 webbrowser.get() 函数