config-generator/utils.py

246 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# coding:utf-8
import glob
import hashlib
import json
import os
import sys
import gzip
import shutil
import subprocess
import zipfile
# Absolute path of the directory that contains this module.
curr_dir = os.path.dirname(os.path.abspath(__file__))
def write_json_file(filename, json):
    """
    Write JSON text to a file, creating the parent directory when needed.

    :param filename: path of the file to write
    :param json: the content to write; it is passed to ``file.write`` as-is,
        so it must already be a string. NOTE: inside this function the
        parameter name hides the ``json`` module.
    :return: None
    """
    # Make sure the directory that will contain the file exists.
    # exist_ok=True avoids the check-then-create race of
    # "if not exists: makedirs" when something else creates it in between.
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)
    # The with-statement closes the file; no explicit close() is required.
    with open(filename, "w") as f:
        f.write(json)
def gzip_file(_in_file):
    """
    Gzip-compress a file, writing the result to ``<_in_file>.gz``.

    The source file is left in place.

    :param _in_file: path of the file to compress
    :return: None
    """
    gz_path = _in_file + '.gz'
    with open(_in_file, 'rb') as source, gzip.open(gz_path, 'wb') as target:
        shutil.copyfileobj(source, target)
def zip_dir(dir_path, out_fullname):
    """
    Compress the contents of a directory into a zip archive.

    Only files are added; entry names are relative to ``dir_path`` so the
    root directory itself is not part of the stored paths.

    :param dir_path: path of the directory to compress
    :param out_fullname: path where the archive is saved (``.../xxxx.zip``)
    :return: None
    """
    # NOTE(review): if out_fullname lies inside dir_path the archive being
    # written will presumably be picked up by the walk as well - confirm
    # callers always write the archive elsewhere.
    with zipfile.ZipFile(out_fullname, "w", zipfile.ZIP_DEFLATED) as archive:
        for path, _dirnames, filenames in os.walk(dir_path):
            for filename in filenames:
                full_path = os.path.join(path, filename)
                # relpath strips only the leading root. A plain
                # str.replace(dir_path, '') would also remove later
                # occurrences of the same text inside sub-paths.
                archive.write(full_path, os.path.relpath(full_path, dir_path))
def unzip_dir(zip_src, dst_dir):
    """
    Extract a zip archive into the given directory.

    If ``zip_src`` is not a valid zip file a message is printed and nothing
    is extracted.

    :param zip_src: path of the zip file
    :param dst_dir: directory to extract into
    :return: None
    """
    if not zipfile.is_zipfile(zip_src):
        print('%s This is not zip' % zip_src)
        return
    # The context manager closes the archive handle even if extraction fails.
    with zipfile.ZipFile(zip_src, 'r') as archive:
        archive.extractall(dst_dir)
def zip_file(file_path, out_fullname):
    """
    Compress a single file into a new zip archive.

    The entry name inside the archive is derived from ``file_path`` as
    given (no explicit archive name is passed to ``ZipFile.write``).

    :param file_path: path of the file to compress
    :param out_fullname: path where the archive is saved (``.../xxxx.zip``)
    :return: None
    """
    # The context manager finalizes and closes the archive even when
    # write() raises (e.g. file_path does not exist).
    with zipfile.ZipFile(out_fullname, 'w') as archive:
        archive.write(file_path, compress_type=zipfile.ZIP_DEFLATED)
def unzip_file(zip_file_path, sp_path=""):
    """
    Extract every member of a zip archive.

    :param zip_file_path: path of the zip file
    :param sp_path: directory to extract into; when empty, the directory
        that contains the zip file is used
    :return: None
    """
    if sp_path != "":
        target_dir = sp_path
    else:
        # Default: extract next to the archive. A bare file name has an
        # empty dirname, which means the current directory.
        target_dir = os.path.dirname(zip_file_path) or os.curdir
    # The context manager makes sure the archive handle is closed.
    with zipfile.ZipFile(zip_file_path) as archive:
        archive.extractall(path=target_dir)
def get_file_last_line(f_name):
    """
    Return the last line of a text file, reading only the tail of the file.

    The path, the first line and the last line are printed as a side effect.

    :param f_name: path of the text file (e.g. xx.txt) to read
    :return: the last line as a string, including its line terminator when
        the file ends with one; an empty string for an empty file
    """
    print(f_name)
    with open(f_name, 'r') as f:
        first_line = f.readline()  # first line, only used for the report below
        # Remember which encoding text mode selected so the tail read in
        # binary mode below is decoded the same way.
        encoding = f.encoding
    # Python 3 text-mode files reject negative end-relative seeks, so the
    # tail is scanned in binary mode.
    with open(f_name, 'rb') as f:
        size = f.seek(0, os.SEEK_END)
        window = 50  # number of trailing bytes to inspect; doubled as needed
        while True:
            # Never seek before the beginning of a short file.
            start = max(size - window, 0)
            f.seek(start)
            lines = f.readlines()
            # Two or more lines guarantee the final one is complete; reaching
            # the start of the file means there is nothing more to read.
            if len(lines) >= 2 or start == 0:
                break
            window *= 2
    last_line = lines[-1].decode(encoding) if lines else ''
    print('文件' + f_name + '第一行为:' + first_line)
    print('文件' + f_name + '最后一行为:' + last_line)
    return last_line
def open_json(path):
    """
    Load the JSON document stored in a file.

    :param path: path of the JSON file
    :return: the decoded JSON value
    """
    with open(path, 'r') as handle:
        return json.load(handle)
def write_json(path, content):
    """
    Serialize a value as JSON into a file, replacing any previous content.

    :param path: path of the file to write
    :param content: JSON-serializable value
    :return: None
    """
    with open(path, 'w') as handle:
        json.dump(content, fp=handle)
def calc_hash(filepath):
    """
    Compute the SHA-1 digest of a file's content.

    :param filepath: path of the file
    :return: hexadecimal SHA-1 digest string
    """
    sha1obj = hashlib.sha1()
    with open(filepath, 'rb') as f:
        # Feed the hash in fixed-size chunks so large files are never loaded
        # into memory in one piece; the digest is the same either way.
        for chunk in iter(lambda: f.read(65536), b''):
            sha1obj.update(chunk)
    return sha1obj.hexdigest()
def calc_md5(filepath):
    """
    Compute the MD5 digest of a file's content.

    :param filepath: path of the file
    :return: hexadecimal MD5 digest string
    """
    md5obj = hashlib.md5()
    with open(filepath, 'rb') as f:
        # Feed the hash in fixed-size chunks so large files are never loaded
        # into memory in one piece; the digest is the same either way.
        for chunk in iter(lambda: f.read(65536), b''):
            md5obj.update(chunk)
    return md5obj.hexdigest()
def delete_files_with_extension(folder_path, extension):
    """
    Delete the entries directly inside a folder whose name ends with the
    given extension. Sub-folders are not searched.

    :param folder_path: path of the folder
    :param extension: extension without the leading dot (e.g. ``"txt"``);
        it is inserted into the glob pattern ``*.<extension>`` unescaped
    :return: None
    """
    # Escape the folder part so glob metacharacters in a directory name
    # (e.g. "build[1]") are matched literally instead of as a pattern.
    pattern = os.path.join(glob.escape(folder_path), f"*.{extension}")
    for file_path in glob.glob(pattern):
        os.remove(file_path)
def run_cmd(str_cmd, log_path=""):
    """
    Run a shell command, echo its output and optionally append it to a log.

    The command, every stdout line, every stderr line and an ``end:`` marker
    are printed; when ``log_path`` is non-empty the same information is
    appended to that file.

    NOTE(review): the command is executed with ``shell=True``, so it must
    never be built from untrusted input.

    :param str_cmd: command line to execute through the shell
    :param log_path: path of a log file to append to; empty disables logging
    :return: tuple ``(lines_out, lines_error)``, each a list of raw ``bytes``
        lines (line endings kept) from stdout and stderr
    """
    import io  # local import: only used to split the captured output

    # One truthiness test everywhere: the mixed "> 1" / "> 0" length checks
    # could reach the final log write without the log ever being opened.
    logfile = open(log_path, "a") if log_path else None
    try:
        if logfile:
            logfile.write("-----------------------------")
            logfile.write(str(str_cmd))
        process = subprocess.Popen(
            str_cmd, shell=True, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(str_cmd)
        # communicate() drains stdout and stderr together, closes stdin and
        # waits for the child. Reading the pipes one after the other can
        # deadlock once the child fills the pipe that is not being read.
        raw_out, raw_error = process.communicate()
        # BytesIO.readlines() splits exactly like readlines() on the pipe.
        lines_out = io.BytesIO(raw_out).readlines()
        lines_error = io.BytesIO(raw_error).readlines()
        for line in lines_out:
            print(line)
            if logfile:
                logfile.write(str(line))
        if logfile and lines_error:
            logfile.write("has error:\n\n")
        for line in lines_error:
            print(line)
            if logfile:
                logfile.write(str(line))
        print("end: " + str(str_cmd))
        if logfile:
            logfile.write("end: " + str(str_cmd))
    finally:
        # Close the log even when launching or reading the command fails.
        if logfile:
            logfile.close()
    return lines_out, lines_error
def mkdirs(dir_path: str):
    """
    Create a directory, including missing parents, unless the path exists.

    :param dir_path: directory path to create
    :return: None
    """
    if os.path.exists(dir_path):
        return
    os.makedirs(dir_path)
def clear_dirs(dir_path: str):
    """
    Reset a directory to an empty state.

    An existing directory tree at the path is removed first; the directory
    is then created again.

    :param dir_path: directory path to empty / create
    :return: None
    """
    already_present = os.path.exists(dir_path)
    if already_present:
        shutil.rmtree(dir_path)
    os.makedirs(dir_path)
def copy_dir(src_dir, dst_dir):
    """
    Recursively copy the contents of one directory into another.

    Nothing happens when the source does not exist. The destination is
    created when missing; files that already exist there are overwritten.

    :param src_dir: directory to copy from
    :param dst_dir: directory to copy into
    :return: None
    """
    if not os.path.exists(src_dir):
        return
    if not os.path.exists(dst_dir):
        os.makedirs(dst_dir)
    for name in os.listdir(src_dir):
        source = os.path.join(src_dir, name)
        target = os.path.join(dst_dir, name)
        # Sub-directories recurse; everything else is copied as a file.
        copier = copy_dir if os.path.isdir(source) else shutil.copyfile
        copier(source, target)
def copy_file(src_thum_file, dst_thum_file):
    """
    Copy a file's content to a destination path if the source exists.

    A missing source is ignored silently.

    :param src_thum_file: path of the file to copy
    :param dst_thum_file: destination file path
    :return: None
    """
    if os.path.exists(src_thum_file):
        shutil.copyfile(src_thum_file, dst_thum_file)
def get_bundle_file_path(bundle_dir_path):
    """
    Find a file with the ``.bundle`` suffix directly inside a directory.

    Sub-directories are not searched. The first match in ``os.listdir``
    order is returned.

    :param bundle_dir_path: directory to look in
    :return: path of the ``.bundle`` file, or None when the directory does
        not exist or holds no such file
    """
    if not os.path.exists(bundle_dir_path):
        return None
    candidates = (
        os.path.join(bundle_dir_path, name)
        for name in os.listdir(bundle_dir_path)
        if name.endswith('.bundle')
    )
    # Only regular files count; a directory named "*.bundle" is skipped.
    return next((found for found in candidates if os.path.isfile(found)), None)