246 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
		
		
			
		
	
	
			246 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
| 
								 | 
							
								#!/usr/bin/env python
							 | 
						|||
| 
								 | 
							
								# coding:utf-8
							 | 
						|||
| 
								 | 
							
								import glob
							 | 
						|||
| 
								 | 
							
								import hashlib
							 | 
						|||
| 
								 | 
							
								import json
							 | 
						|||
| 
								 | 
							
								import os
							 | 
						|||
| 
								 | 
							
								import sys
							 | 
						|||
| 
								 | 
							
								import gzip
							 | 
						|||
| 
								 | 
							
								import shutil
							 | 
						|||
| 
								 | 
							
								import subprocess
							 | 
						|||
| 
								 | 
							
								import zipfile
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								curr_dir = os.path.split(os.path.abspath(__file__))[0]
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def write_json_file(filename, json):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    将JSON内容写入指定文件
							 | 
						|||
| 
								 | 
							
								    :param filename: 文件路径
							 | 
						|||
| 
								 | 
							
								    :param json: 要写入的JSON内容
							 | 
						|||
| 
								 | 
							
								    :return: 无
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    # 确保文件所在目录存在
							 | 
						|||
| 
								 | 
							
								    directory = os.path.dirname(filename)
							 | 
						|||
| 
								 | 
							
								    if directory and not os.path.exists(directory):
							 | 
						|||
| 
								 | 
							
								        os.makedirs(directory)
							 | 
						|||
| 
								 | 
							
								    
							 | 
						|||
| 
								 | 
							
								    with open(filename, "w") as f:
							 | 
						|||
| 
								 | 
							
								        f.write(json)
							 | 
						|||
| 
								 | 
							
								        f.close()
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def gzip_file(_in_file):
							 | 
						|||
| 
								 | 
							
								    with open(_in_file, 'rb') as f_in:
							 | 
						|||
| 
								 | 
							
								        with gzip.open(_in_file + '.gz', 'wb') as f_out:
							 | 
						|||
| 
								 | 
							
								            shutil.copyfileobj(f_in, f_out)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def zip_dir(dir_path, out_fullname):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    压缩指定文件夹
							 | 
						|||
| 
								 | 
							
								    :param dir_path: 目标文件夹路径
							 | 
						|||
| 
								 | 
							
								    :param out_fullname: 压缩文件报错路径+xxxx.zip
							 | 
						|||
| 
								 | 
							
								    :return: 无
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    zip = zipfile.ZipFile(out_fullname, "w", zipfile.ZIP_DEFLATED)
							 | 
						|||
| 
								 | 
							
								    for path, dirnames, filenames in os.walk(dir_path):
							 | 
						|||
| 
								 | 
							
								        # 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩
							 | 
						|||
| 
								 | 
							
								        fpath = path.replace(dir_path, '')
							 | 
						|||
| 
								 | 
							
								        for filename in filenames:
							 | 
						|||
| 
								 | 
							
								            zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
							 | 
						|||
| 
								 | 
							
								    zip.close()
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def unzip_dir(zip_src, dst_dir):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    解压文件到指定文件夹
							 | 
						|||
| 
								 | 
							
								    :param zip_src: zip文件
							 | 
						|||
| 
								 | 
							
								    :param dst_dir: 解压目录
							 | 
						|||
| 
								 | 
							
								    :return: 无
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    r = zipfile.is_zipfile(zip_src)
							 | 
						|||
| 
								 | 
							
								    if r:
							 | 
						|||
| 
								 | 
							
								        fz = zipfile.ZipFile(zip_src, 'r')
							 | 
						|||
| 
								 | 
							
								        for file in fz.namelist():
							 | 
						|||
| 
								 | 
							
								            fz.extract(file, dst_dir)
							 | 
						|||
| 
								 | 
							
								    else:
							 | 
						|||
| 
								 | 
							
								        print('%s This is not zip' % zip_src)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def zip_file(file_path, out_fullname):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    压缩指定文件
							 | 
						|||
| 
								 | 
							
								    :param file_path: 目标文件路径
							 | 
						|||
| 
								 | 
							
								    :param out_fullname: 压缩文件保存路径+xxxx.zip
							 | 
						|||
| 
								 | 
							
								    :return: 无
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    file_zip = zipfile.ZipFile(out_fullname, 'w')
							 | 
						|||
| 
								 | 
							
								    file_zip.write(file_path, compress_type=zipfile.ZIP_DEFLATED)
							 | 
						|||
| 
								 | 
							
								    file_zip.close()
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def unzip_file(zip_file_path, sp_path=""):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    解压文件
							 | 
						|||
| 
								 | 
							
								    :param zip_file_path: zip文件路径
							 | 
						|||
| 
								 | 
							
								    :param sp_path: 指定目录
							 | 
						|||
| 
								 | 
							
								    :return: 无
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    file_zip = zipfile.ZipFile(zip_file_path)
							 | 
						|||
| 
								 | 
							
								    sp_path = sp_path if sp_path != "" else "{}/../".format(os.path.splitext(zip_file_path)[0])
							 | 
						|||
| 
								 | 
							
								    # 解压
							 | 
						|||
| 
								 | 
							
								    file_zip.extractall(path=sp_path)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def get_file_last_line(f_name):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    :param f_name: f_name为所读xx.txt文件
							 | 
						|||
| 
								 | 
							
								    :return: 文件最后一行
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    print(f_name)
							 | 
						|||
| 
								 | 
							
								    with open(f_name, 'r') as f:  # 打开文件
							 | 
						|||
| 
								 | 
							
								        first_line = f.readline()  # 读第一行
							 | 
						|||
| 
								 | 
							
								        off = -50  # 设置偏移量
							 | 
						|||
| 
								 | 
							
								        while True:
							 | 
						|||
| 
								 | 
							
								            f.seek(off, 2)  # seek(off, 2)表示文件指针:从文件末尾(2)开始向前50个字符(-50)
							 | 
						|||
| 
								 | 
							
								            lines = f.readlines()  # 读取文件指针范围内所有行
							 | 
						|||
| 
								 | 
							
								            if len(lines) >= 2:  # 判断是否最后至少有两行,这样保证了最后一行是完整的
							 | 
						|||
| 
								 | 
							
								                last_line = lines[-1]  # 取最后一行
							 | 
						|||
| 
								 | 
							
								                break
							 | 
						|||
| 
								 | 
							
								            # 如果off为50时得到的readlines只有一行内容,那么不能保证最后一行是完整的
							 | 
						|||
| 
								 | 
							
								            # 所以off翻倍重新运行,直到readlines不止一行
							 | 
						|||
| 
								 | 
							
								            off *= 2
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    print('文件' + f_name + '第一行为:' + first_line)
							 | 
						|||
| 
								 | 
							
								    print('文件' + f_name + '最后一行为:' + last_line)
							 | 
						|||
| 
								 | 
							
								    return last_line
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def open_json(path):
							 | 
						|||
| 
								 | 
							
								    dic = {}
							 | 
						|||
| 
								 | 
							
								    with open(path, 'r') as f:
							 | 
						|||
| 
								 | 
							
								        dic = json.load(f)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    return dic
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def write_json(path, content):
							 | 
						|||
| 
								 | 
							
								    with open(path, 'w') as f:
							 | 
						|||
| 
								 | 
							
								        json.dump(content, f)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def calc_hash(filepath):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    根据文件内容,生成hash值
							 | 
						|||
| 
								 | 
							
								    :param filepath: 文件路径
							 | 
						|||
| 
								 | 
							
								    :return: hash code
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    with open(filepath, 'rb') as f:
							 | 
						|||
| 
								 | 
							
								        sha1obj = hashlib.sha1()
							 | 
						|||
| 
								 | 
							
								        sha1obj.update(f.read())
							 | 
						|||
| 
								 | 
							
								        hash_code = sha1obj.hexdigest()
							 | 
						|||
| 
								 | 
							
								        return hash_code
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def calc_md5(filepath):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    根据文件内容,生成md5码
							 | 
						|||
| 
								 | 
							
								    :param filepath: 文件路径
							 | 
						|||
| 
								 | 
							
								    :return: md5 code
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    with open(filepath, 'rb') as f:
							 | 
						|||
| 
								 | 
							
								        md5obj = hashlib.md5(f.read())
							 | 
						|||
| 
								 | 
							
								        md5_code = md5obj.hexdigest()
							 | 
						|||
| 
								 | 
							
								        return md5_code
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def delete_files_with_extension(folder_path, extension):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    删除指定文件夹内指定后缀的文件
							 | 
						|||
| 
								 | 
							
								    :param folder_path: 文件夹路径
							 | 
						|||
| 
								 | 
							
								    :param extension: 后缀名称
							 | 
						|||
| 
								 | 
							
								    :return: 无
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    # 获取指定文件夹内指定后缀的文件列表
							 | 
						|||
| 
								 | 
							
								    files = glob.glob(os.path.join(folder_path, f"*.{extension}"))
							 | 
						|||
| 
								 | 
							
								    # 遍历文件列表并删除文件
							 | 
						|||
| 
								 | 
							
								    for file_path in files:
							 | 
						|||
| 
								 | 
							
								        os.remove(file_path)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def run_cmd(str_cmd, log_path=""):
							 | 
						|||
| 
								 | 
							
								    if len(log_path) > 1:
							 | 
						|||
| 
								 | 
							
								        logfile = open(log_path, "a")
							 | 
						|||
| 
								 | 
							
								        logfile.writelines("-----------------------------")
							 | 
						|||
| 
								 | 
							
								        logfile.writelines(str(str_cmd))
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    process = subprocess.Popen(
							 | 
						|||
| 
								 | 
							
								        str_cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
							 | 
						|||
| 
								 | 
							
								    print(str_cmd)
							 | 
						|||
| 
								 | 
							
								    lines_out = process.stdout.readlines()
							 | 
						|||
| 
								 | 
							
								    for line in lines_out:
							 | 
						|||
| 
								 | 
							
								        print(line)
							 | 
						|||
| 
								 | 
							
								        if len(log_path) > 1:
							 | 
						|||
| 
								 | 
							
								            logfile.writelines(str(line))
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    lines_error = process.stderr.readlines()
							 | 
						|||
| 
								 | 
							
								    if len(log_path) > 1 and len(lines_error) > 0:
							 | 
						|||
| 
								 | 
							
								        logfile.writelines("has error:\n\n")
							 | 
						|||
| 
								 | 
							
								    for line in lines_error:
							 | 
						|||
| 
								 | 
							
								        print(line)
							 | 
						|||
| 
								 | 
							
								        if len(log_path) > 1:
							 | 
						|||
| 
								 | 
							
								            logfile.writelines(str(line))
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    print("end: " + str_cmd)
							 | 
						|||
| 
								 | 
							
								    if len(log_path) > 0:
							 | 
						|||
| 
								 | 
							
								        logfile.writelines("end: " + str_cmd)
							 | 
						|||
| 
								 | 
							
								        logfile.close()
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    return lines_out, lines_error
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def mkdirs(dir_path: str):
							 | 
						|||
| 
								 | 
							
								    if not os.path.exists(dir_path):
							 | 
						|||
| 
								 | 
							
								        os.makedirs(dir_path)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def clear_dirs(dir_path: str):
							 | 
						|||
| 
								 | 
							
								    if os.path.exists(dir_path):
							 | 
						|||
| 
								 | 
							
								        shutil.rmtree(dir_path)
							 | 
						|||
| 
								 | 
							
								    os.makedirs(dir_path)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def copy_dir(src_dir, dst_dir):
							 | 
						|||
| 
								 | 
							
								    if not os.path.exists(src_dir):
							 | 
						|||
| 
								 | 
							
								        return
							 | 
						|||
| 
								 | 
							
								    if not os.path.exists(dst_dir):
							 | 
						|||
| 
								 | 
							
								        os.makedirs(dst_dir)
							 | 
						|||
| 
								 | 
							
								    for item in os.listdir(src_dir):
							 | 
						|||
| 
								 | 
							
								        src_item = os.path.join(src_dir, item)
							 | 
						|||
| 
								 | 
							
								        dst_item = os.path.join(dst_dir, item)
							 | 
						|||
| 
								 | 
							
								        if os.path.isdir(src_item):
							 | 
						|||
| 
								 | 
							
								            copy_dir(src_item, dst_item)
							 | 
						|||
| 
								 | 
							
								        else:
							 | 
						|||
| 
								 | 
							
								            shutil.copyfile(src_item, dst_item)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def copy_file(src_thum_file, dst_thum_file):
							 | 
						|||
| 
								 | 
							
								    if not os.path.exists(src_thum_file):
							 | 
						|||
| 
								 | 
							
								        return
							 | 
						|||
| 
								 | 
							
								    shutil.copyfile(src_thum_file, dst_thum_file)
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								def get_bundle_file_path(bundle_dir_path):
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    在指定目录中查找.bundle后缀文件,返回文件路径,不遍历子目录
							 | 
						|||
| 
								 | 
							
								    :param bundle_dir_path: 目标文件夹路径
							 | 
						|||
| 
								 | 
							
								    :return: .bundle文件路径,如果没找到则返回None
							 | 
						|||
| 
								 | 
							
								    """
							 | 
						|||
| 
								 | 
							
								    if not os.path.exists(bundle_dir_path):
							 | 
						|||
| 
								 | 
							
								        return None
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    for file in os.listdir(bundle_dir_path):
							 | 
						|||
| 
								 | 
							
								        file_path = os.path.join(bundle_dir_path, file)
							 | 
						|||
| 
								 | 
							
								        if os.path.isfile(file_path) and file.endswith('.bundle'):
							 | 
						|||
| 
								 | 
							
								            return file_path
							 | 
						|||
| 
								 | 
							
								
							 | 
						|||
| 
								 | 
							
								    return None
							 |