#!/usr/bin/env python # coding:utf-8 import glob import hashlib import json import os import sys import gzip import shutil import subprocess import zipfile curr_dir = os.path.split(os.path.abspath(__file__))[0] def write_json_file(filename, json): """ 将JSON内容写入指定文件 :param filename: 文件路径 :param json: 要写入的JSON内容 :return: 无 """ # 确保文件所在目录存在 directory = os.path.dirname(filename) if directory and not os.path.exists(directory): os.makedirs(directory) with open(filename, "w") as f: f.write(json) f.close() def gzip_file(_in_file): with open(_in_file, 'rb') as f_in: with gzip.open(_in_file + '.gz', 'wb') as f_out: shutil.copyfileobj(f_in, f_out) def zip_dir(dir_path, out_fullname): """ 压缩指定文件夹 :param dir_path: 目标文件夹路径 :param out_fullname: 压缩文件报错路径+xxxx.zip :return: 无 """ zip = zipfile.ZipFile(out_fullname, "w", zipfile.ZIP_DEFLATED) for path, dirnames, filenames in os.walk(dir_path): # 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩 fpath = path.replace(dir_path, '') for filename in filenames: zip.write(os.path.join(path, filename), os.path.join(fpath, filename)) zip.close() def unzip_dir(zip_src, dst_dir): """ 解压文件到指定文件夹 :param zip_src: zip文件 :param dst_dir: 解压目录 :return: 无 """ r = zipfile.is_zipfile(zip_src) if r: fz = zipfile.ZipFile(zip_src, 'r') for file in fz.namelist(): fz.extract(file, dst_dir) else: print('%s This is not zip' % zip_src) def zip_file(file_path, out_fullname): """ 压缩指定文件 :param file_path: 目标文件路径 :param out_fullname: 压缩文件保存路径+xxxx.zip :return: 无 """ file_zip = zipfile.ZipFile(out_fullname, 'w') file_zip.write(file_path, compress_type=zipfile.ZIP_DEFLATED) file_zip.close() def unzip_file(zip_file_path, sp_path=""): """ 解压文件 :param zip_file_path: zip文件路径 :param sp_path: 指定目录 :return: 无 """ file_zip = zipfile.ZipFile(zip_file_path) sp_path = sp_path if sp_path != "" else "{}/../".format(os.path.splitext(zip_file_path)[0]) # 解压 file_zip.extractall(path=sp_path) def get_file_last_line(f_name): """ :param f_name: f_name为所读xx.txt文件 :return: 文件最后一行 """ print(f_name) with open(f_name, 'r') as f: # 打开文件 first_line = f.readline() # 读第一行 off = -50 # 设置偏移量 while True: f.seek(off, 2) # seek(off, 2)表示文件指针:从文件末尾(2)开始向前50个字符(-50) lines = f.readlines() # 读取文件指针范围内所有行 if len(lines) >= 2: # 判断是否最后至少有两行,这样保证了最后一行是完整的 last_line = lines[-1] # 取最后一行 break # 如果off为50时得到的readlines只有一行内容,那么不能保证最后一行是完整的 # 所以off翻倍重新运行,直到readlines不止一行 off *= 2 print('文件' + f_name + '第一行为:' + first_line) print('文件' + f_name + '最后一行为:' + last_line) return last_line def open_json(path): dic = {} with open(path, 'r') as f: dic = json.load(f) return dic def write_json(path, content): with open(path, 'w') as f: json.dump(content, f) def calc_hash(filepath): """ 根据文件内容,生成hash值 :param filepath: 文件路径 :return: hash code """ with open(filepath, 'rb') as f: sha1obj = hashlib.sha1() sha1obj.update(f.read()) hash_code = sha1obj.hexdigest() return hash_code def calc_md5(filepath): """ 根据文件内容,生成md5码 :param filepath: 文件路径 :return: md5 code """ with open(filepath, 'rb') as f: md5obj = hashlib.md5(f.read()) md5_code = md5obj.hexdigest() return md5_code def delete_files_with_extension(folder_path, extension): """ 删除指定文件夹内指定后缀的文件 :param folder_path: 文件夹路径 :param extension: 后缀名称 :return: 无 """ # 获取指定文件夹内指定后缀的文件列表 files = glob.glob(os.path.join(folder_path, f"*.{extension}")) # 遍历文件列表并删除文件 for file_path in files: os.remove(file_path) def run_cmd(str_cmd, log_path=""): if len(log_path) > 1: logfile = open(log_path, "a") logfile.writelines("-----------------------------") logfile.writelines(str(str_cmd)) process = subprocess.Popen( str_cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print(str_cmd) lines_out = process.stdout.readlines() for line in lines_out: print(line) if len(log_path) > 1: logfile.writelines(str(line)) lines_error = process.stderr.readlines() if len(log_path) > 1 and len(lines_error) > 0: logfile.writelines("has error:\n\n") for line in lines_error: print(line) if len(log_path) > 1: logfile.writelines(str(line)) print("end: " + str_cmd) if len(log_path) > 0: logfile.writelines("end: " + str_cmd) logfile.close() return lines_out, lines_error def mkdirs(dir_path: str): if not os.path.exists(dir_path): os.makedirs(dir_path) def clear_dirs(dir_path: str): if os.path.exists(dir_path): shutil.rmtree(dir_path) os.makedirs(dir_path) def copy_dir(src_dir, dst_dir): if not os.path.exists(src_dir): return if not os.path.exists(dst_dir): os.makedirs(dst_dir) for item in os.listdir(src_dir): src_item = os.path.join(src_dir, item) dst_item = os.path.join(dst_dir, item) if os.path.isdir(src_item): copy_dir(src_item, dst_item) else: shutil.copyfile(src_item, dst_item) def copy_file(src_thum_file, dst_thum_file): if not os.path.exists(src_thum_file): return shutil.copyfile(src_thum_file, dst_thum_file) def get_bundle_file_path(bundle_dir_path): """ 在指定目录中查找.bundle后缀文件,返回文件路径,不遍历子目录 :param bundle_dir_path: 目标文件夹路径 :return: .bundle文件路径,如果没找到则返回None """ if not os.path.exists(bundle_dir_path): return None for file in os.listdir(bundle_dir_path): file_path = os.path.join(bundle_dir_path, file) if os.path.isfile(file_path) and file.endswith('.bundle'): return file_path return None