config-generator/utils.py

246 lines
6.8 KiB
Python
Raw Permalink Normal View History

2025-03-24 06:34:03 +00:00
#!/usr/bin/env python
# coding:utf-8
import glob
import hashlib
import json
import os
import sys
import gzip
import shutil
import subprocess
import zipfile
curr_dir = os.path.split(os.path.abspath(__file__))[0]
def write_json_file(filename, json):
"""
将JSON内容写入指定文件
:param filename: 文件路径
:param json: 要写入的JSON内容
:return:
"""
# 确保文件所在目录存在
directory = os.path.dirname(filename)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(filename, "w") as f:
f.write(json)
f.close()
def gzip_file(_in_file):
with open(_in_file, 'rb') as f_in:
with gzip.open(_in_file + '.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
def zip_dir(dir_path, out_fullname):
"""
压缩指定文件夹
:param dir_path: 目标文件夹路径
:param out_fullname: 压缩文件报错路径+xxxx.zip
:return:
"""
zip = zipfile.ZipFile(out_fullname, "w", zipfile.ZIP_DEFLATED)
for path, dirnames, filenames in os.walk(dir_path):
# 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩
fpath = path.replace(dir_path, '')
for filename in filenames:
zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
zip.close()
def unzip_dir(zip_src, dst_dir):
"""
解压文件到指定文件夹
:param zip_src: zip文件
:param dst_dir: 解压目录
:return:
"""
r = zipfile.is_zipfile(zip_src)
if r:
fz = zipfile.ZipFile(zip_src, 'r')
for file in fz.namelist():
fz.extract(file, dst_dir)
else:
print('%s This is not zip' % zip_src)
def zip_file(file_path, out_fullname):
"""
压缩指定文件
:param file_path: 目标文件路径
:param out_fullname: 压缩文件保存路径+xxxx.zip
:return:
"""
file_zip = zipfile.ZipFile(out_fullname, 'w')
file_zip.write(file_path, compress_type=zipfile.ZIP_DEFLATED)
file_zip.close()
def unzip_file(zip_file_path, sp_path=""):
"""
解压文件
:param zip_file_path: zip文件路径
:param sp_path: 指定目录
:return:
"""
file_zip = zipfile.ZipFile(zip_file_path)
sp_path = sp_path if sp_path != "" else "{}/../".format(os.path.splitext(zip_file_path)[0])
# 解压
file_zip.extractall(path=sp_path)
def get_file_last_line(f_name):
"""
:param f_name: f_name为所读xx.txt文件
:return: 文件最后一行
"""
print(f_name)
with open(f_name, 'r') as f: # 打开文件
first_line = f.readline() # 读第一行
off = -50 # 设置偏移量
while True:
f.seek(off, 2) # seek(off, 2)表示文件指针:从文件末尾(2)开始向前50个字符(-50)
lines = f.readlines() # 读取文件指针范围内所有行
if len(lines) >= 2: # 判断是否最后至少有两行,这样保证了最后一行是完整的
last_line = lines[-1] # 取最后一行
break
# 如果off为50时得到的readlines只有一行内容那么不能保证最后一行是完整的
# 所以off翻倍重新运行直到readlines不止一行
off *= 2
print('文件' + f_name + '第一行为:' + first_line)
print('文件' + f_name + '最后一行为:' + last_line)
return last_line
def open_json(path):
dic = {}
with open(path, 'r') as f:
dic = json.load(f)
return dic
def write_json(path, content):
with open(path, 'w') as f:
json.dump(content, f)
def calc_hash(filepath):
"""
根据文件内容生成hash值
:param filepath: 文件路径
:return: hash code
"""
with open(filepath, 'rb') as f:
sha1obj = hashlib.sha1()
sha1obj.update(f.read())
hash_code = sha1obj.hexdigest()
return hash_code
def calc_md5(filepath):
"""
根据文件内容生成md5码
:param filepath: 文件路径
:return: md5 code
"""
with open(filepath, 'rb') as f:
md5obj = hashlib.md5(f.read())
md5_code = md5obj.hexdigest()
return md5_code
def delete_files_with_extension(folder_path, extension):
"""
删除指定文件夹内指定后缀的文件
:param folder_path: 文件夹路径
:param extension: 后缀名称
:return:
"""
# 获取指定文件夹内指定后缀的文件列表
files = glob.glob(os.path.join(folder_path, f"*.{extension}"))
# 遍历文件列表并删除文件
for file_path in files:
os.remove(file_path)
def run_cmd(str_cmd, log_path=""):
if len(log_path) > 1:
logfile = open(log_path, "a")
logfile.writelines("-----------------------------")
logfile.writelines(str(str_cmd))
process = subprocess.Popen(
str_cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(str_cmd)
lines_out = process.stdout.readlines()
for line in lines_out:
print(line)
if len(log_path) > 1:
logfile.writelines(str(line))
lines_error = process.stderr.readlines()
if len(log_path) > 1 and len(lines_error) > 0:
logfile.writelines("has error:\n\n")
for line in lines_error:
print(line)
if len(log_path) > 1:
logfile.writelines(str(line))
print("end: " + str_cmd)
if len(log_path) > 0:
logfile.writelines("end: " + str_cmd)
logfile.close()
return lines_out, lines_error
def mkdirs(dir_path: str):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
def clear_dirs(dir_path: str):
if os.path.exists(dir_path):
shutil.rmtree(dir_path)
os.makedirs(dir_path)
def copy_dir(src_dir, dst_dir):
if not os.path.exists(src_dir):
return
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
for item in os.listdir(src_dir):
src_item = os.path.join(src_dir, item)
dst_item = os.path.join(dst_dir, item)
if os.path.isdir(src_item):
copy_dir(src_item, dst_item)
else:
shutil.copyfile(src_item, dst_item)
def copy_file(src_thum_file, dst_thum_file):
if not os.path.exists(src_thum_file):
return
shutil.copyfile(src_thum_file, dst_thum_file)
def get_bundle_file_path(bundle_dir_path):
"""
在指定目录中查找.bundle后缀文件返回文件路径不遍历子目录
:param bundle_dir_path: 目标文件夹路径
:return: .bundle文件路径如果没找到则返回None
"""
if not os.path.exists(bundle_dir_path):
return None
for file in os.listdir(bundle_dir_path):
file_path = os.path.join(bundle_dir_path, file)
if os.path.isfile(file_path) and file.endswith('.bundle'):
return file_path
return None