162 lines
4.8 KiB
Python
162 lines
4.8 KiB
Python
|
|
#!/usr/bin/env python
|
|||
|
|
# coding:utf-8
|
|||
|
|
|
|||
|
|
import datetime
|
|||
|
|
import os
|
|||
|
|
import zipfile
|
|||
|
|
import subprocess
|
|||
|
|
import json
|
|||
|
|
import hashlib
|
|||
|
|
|
|||
|
|
|
|||
|
|
def zip_dir(dirpath, out_fullname):
|
|||
|
|
"""
|
|||
|
|
压缩指定文件夹
|
|||
|
|
:param dirpath: 目标文件夹路径
|
|||
|
|
:param outFullName: 压缩文件保存路径+xxxx.zip
|
|||
|
|
:return: 无
|
|||
|
|
"""
|
|||
|
|
zip = zipfile.ZipFile(out_fullname, "w", zipfile.ZIP_DEFLATED)
|
|||
|
|
for path, dirnames, filenames in os.walk(dirpath):
|
|||
|
|
# 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩
|
|||
|
|
fpath = path.replace(dirpath, '')
|
|||
|
|
|
|||
|
|
for filename in filenames:
|
|||
|
|
zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
|
|||
|
|
zip.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def unzip_dir(zip_src, dst_dir):
|
|||
|
|
"""
|
|||
|
|
解压文件到指定文件夹
|
|||
|
|
:param zip_src: zip文件
|
|||
|
|
:param dst_dir: 解压目录
|
|||
|
|
:return: 无
|
|||
|
|
"""
|
|||
|
|
r = zipfile.is_zipfile(zip_src)
|
|||
|
|
if r:
|
|||
|
|
fz = zipfile.ZipFile(zip_src, 'r')
|
|||
|
|
for file in fz.namelist():
|
|||
|
|
fz.extract(file, dst_dir)
|
|||
|
|
else:
|
|||
|
|
print("{}不是zip文件".format(zip_src))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def zip_file(file_path, out_fullname, is_fixedtime=True):
|
|||
|
|
"""
|
|||
|
|
压缩指定文件
|
|||
|
|
:param dirpath: 目标文件路径
|
|||
|
|
:param outFullName: 压缩文件保存路径+xxxx.zip
|
|||
|
|
:return: 无
|
|||
|
|
"""
|
|||
|
|
zip_file = zipfile.ZipFile(out_fullname, 'w')
|
|||
|
|
zip_file.write(file_path, compress_type=zipfile.ZIP_DEFLATED)
|
|||
|
|
if is_fixedtime:
|
|||
|
|
# 分离文件名称
|
|||
|
|
names = zip_file.namelist()
|
|||
|
|
for a_name in names:
|
|||
|
|
zipinfo_obj = zip_file.getinfo(a_name)
|
|||
|
|
file_stat = os.stat(file_path)
|
|||
|
|
zipinfo_obj.date_time = datetime.datetime.fromtimestamp(file_stat.st_mtime).timetuple()[:6]
|
|||
|
|
|
|||
|
|
zip_file.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def unzip_file(zip_file_path):
|
|||
|
|
"""
|
|||
|
|
解压文件
|
|||
|
|
:param zip_src: zip文件
|
|||
|
|
:return: 无
|
|||
|
|
"""
|
|||
|
|
zip_file = zipfile.ZipFile(zip_file_path)
|
|||
|
|
|
|||
|
|
# 解压
|
|||
|
|
# zip_file.extractall(path="{}/../".format(os.path.splitext(zip_file_path)[0]))
|
|||
|
|
|
|||
|
|
# 设置要解压缩的文件和目录
|
|||
|
|
extract_dir = "{}".format(os.path.split(zip_file_path)[0])
|
|||
|
|
|
|||
|
|
# 解压缩文件并将修改时间设置为原有时间
|
|||
|
|
with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
|
|||
|
|
for zip_info in zip_file.infolist():
|
|||
|
|
zip_file.extract(zip_info.filename, path=extract_dir)
|
|||
|
|
file_path = os.path.join(extract_dir, zip_info.filename)
|
|||
|
|
mtime = int(datetime.datetime(*zip_info.date_time).timestamp())
|
|||
|
|
os.utime(file_path, (mtime, mtime))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def DoCmd(strcmd, logPath=""):
|
|||
|
|
if len(logPath) > 1:
|
|||
|
|
logfile = file(logPath, "a")
|
|||
|
|
logfile.writelines("-----------------------------")
|
|||
|
|
logfile.writelines(strcmd)
|
|||
|
|
|
|||
|
|
process = subprocess.Popen(
|
|||
|
|
strcmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|||
|
|
print(strcmd)
|
|||
|
|
lines_out = process.stdout.readlines()
|
|||
|
|
for l in lines_out:
|
|||
|
|
print(l)
|
|||
|
|
if len(logPath) > 1:
|
|||
|
|
logfile.writelines(l)
|
|||
|
|
|
|||
|
|
lines_error = process.stderr.readlines()
|
|||
|
|
if len(logPath) > 1 and len(lines_error) > 0:
|
|||
|
|
logfile.writelines("has error:\n\n")
|
|||
|
|
for l in lines_error:
|
|||
|
|
print(l)
|
|||
|
|
if len(logPath) > 1:
|
|||
|
|
logfile.writelines(l)
|
|||
|
|
|
|||
|
|
print("end: " + strcmd)
|
|||
|
|
if len(logPath) > 0:
|
|||
|
|
logfile.writelines("end: " + strcmd)
|
|||
|
|
logfile.close()
|
|||
|
|
#
|
|||
|
|
return lines_out, lines_error
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_file_last_line(fname):
|
|||
|
|
"""
|
|||
|
|
f_name为所读xx.txt文件
|
|||
|
|
输出为:文件最后一行
|
|||
|
|
"""
|
|||
|
|
print(fname)
|
|||
|
|
with open(fname, 'r') as f: # 打开文件
|
|||
|
|
first_line = f.readline() # 读第一行
|
|||
|
|
off = -50 # 设置偏移量
|
|||
|
|
while True:
|
|||
|
|
f.seek(off, 2) # seek(off, 2)表示文件指针:从文件末尾(2)开始向前50个字符(-50)
|
|||
|
|
lines = f.readlines() # 读取文件指针范围内所有行
|
|||
|
|
if len(lines) >= 2: # 判断是否最后至少有两行,这样保证了最后一行是完整的
|
|||
|
|
last_line = lines[-1] # 取最后一行
|
|||
|
|
break
|
|||
|
|
# 如果off为50时得到的readlines只有一行内容,那么不能保证最后一行是完整的
|
|||
|
|
# 所以off翻倍重新运行,直到readlines不止一行
|
|||
|
|
off *= 2
|
|||
|
|
|
|||
|
|
print('文件' + fname + '第一行为:' + first_line)
|
|||
|
|
print('文件' + fname + '最后一行为:' + last_line)
|
|||
|
|
return last_line
|
|||
|
|
|
|||
|
|
|
|||
|
|
def open_json(path):
|
|||
|
|
dic = {}
|
|||
|
|
with open(path, 'r') as f:
|
|||
|
|
dic = json.load(f)
|
|||
|
|
|
|||
|
|
return dic
|
|||
|
|
|
|||
|
|
|
|||
|
|
def write_json(path, content):
|
|||
|
|
with open(path, 'w') as f:
|
|||
|
|
json.dump(content, f)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 根据文件内容,生成hash值,filepath文件路径
|
|||
|
|
def calc_hash(filepath):
|
|||
|
|
with open(filepath, 'rb') as f:
|
|||
|
|
sha1obj = hashlib.sha1()
|
|||
|
|
sha1obj.update(f.read())
|
|||
|
|
hash = sha1obj.hexdigest()
|
|||
|
|
return hash
|