#!/usr/bin/env python #coding:utf-8 import datetime import os import zipfile import subprocess import json import hashlib def zip_dir(dirpath, out_fullname): """ 压缩指定文件夹 :param dirpath: 目标文件夹路径 :param outFullName: 压缩文件保存路径+xxxx.zip :return: 无 """ zip = zipfile.ZipFile(out_fullname,"w",zipfile.ZIP_DEFLATED) for path,dirnames,filenames in os.walk(dirpath): # 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩 fpath = path.replace(dirpath,'') for filename in filenames: zip.write(os.path.join(path,filename),os.path.join(fpath,filename)) zip.close() def unzip_dir(zip_src, dst_dir): """ 解压文件到指定文件夹 :param zip_src: zip文件 :param dst_dir: 解压目录 :return: 无 """ r = zipfile.is_zipfile(zip_src) if r: fz = zipfile.ZipFile(zip_src, 'r') for file in fz.namelist(): fz.extract(file, dst_dir) else: print("{}不是zip文件".format(zip_src)) def zip_file(file_path, out_fullname, is_fixedtime = True): """ 压缩指定文件 :param dirpath: 目标文件路径 :param outFullName: 压缩文件保存路径+xxxx.zip :return: 无 """ zip_file = zipfile.ZipFile(out_fullname,'w') zip_file.write(file_path, compress_type=zipfile.ZIP_DEFLATED) if is_fixedtime: # 分离文件名称 names = zip_file.namelist() for a_name in names: zipinfo_obj = zip_file.getinfo(a_name) file_stat = os.stat(file_path) zipinfo_obj.date_time = datetime.datetime.fromtimestamp(file_stat.st_mtime).timetuple()[:6] zip_file.close() def unzip_file(zip_file_path): """ 解压文件 :param zip_src: zip文件 :return: 无 """ zip_file = zipfile.ZipFile(zip_file_path) # 解压 # zip_file.extractall(path="{}/../".format(os.path.splitext(zip_file_path)[0])) # 设置要解压缩的文件和目录 extract_dir = "{}".format(os.path.split(zip_file_path)[0]) # 解压缩文件并将修改时间设置为原有时间 with zipfile.ZipFile(zip_file_path, 'r') as zip_file: for zip_info in zip_file.infolist(): zip_file.extract(zip_info.filename, path=extract_dir) file_path = os.path.join(extract_dir, zip_info.filename) mtime = int(datetime.datetime(*zip_info.date_time).timestamp()) os.utime(file_path, (mtime, mtime)) def DoCmd(strcmd, logPath=""): if len(logPath) > 1: logfile = file(logPath, "a") logfile.writelines("-----------------------------") logfile.writelines(strcmd) process = subprocess.Popen( strcmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print(strcmd) lines_out = process.stdout.readlines() for l in lines_out: print(l) if len(logPath) > 1: logfile.writelines(l) lines_error = process.stderr.readlines() if len(logPath) > 1 and len(lines_error)>0: logfile.writelines("has error:\n\n") for l in lines_error: print(l) if len(logPath) > 1: logfile.writelines(l) print("end: "+strcmd) if len(logPath) > 0: logfile.writelines("end: "+strcmd) logfile.close() # return lines_out, lines_error def get_file_last_line(fname): """ f_name为所读xx.txt文件 输出为:文件最后一行 """ print(fname) with open(fname, 'r') as f: #打开文件 first_line = f.readline() #读第一行 off = -50 #设置偏移量 while True: f.seek(off, 2) #seek(off, 2)表示文件指针:从文件末尾(2)开始向前50个字符(-50) lines = f.readlines() #读取文件指针范围内所有行 if len(lines)>=2: #判断是否最后至少有两行,这样保证了最后一行是完整的 last_line = lines[-1] #取最后一行 break #如果off为50时得到的readlines只有一行内容,那么不能保证最后一行是完整的 #所以off翻倍重新运行,直到readlines不止一行 off *= 2 print('文件' + fname + '第一行为:' + first_line) print('文件' + fname + '最后一行为:'+ last_line) return last_line def open_json(path): dic = {} with open(path, 'r') as f: dic = json.load(f) return dic def write_json(path, content): with open(path, 'w') as f: json.dump(content, f) #根据文件内容,生成hash值,filepath文件路径 def calc_hash(filepath): with open(filepath,'rb') as f: sha1obj = hashlib.sha1() sha1obj.update(f.read()) hash = sha1obj.hexdigest() return hash