find-object-bundle-builder/pipeline_Profile/code_file/build_profile.py

436 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import datetime
import json
import os
import shutil
import sys
import time
curr_dir = os.path.split(os.path.abspath(__file__))[0]
print("curr_dir = " + curr_dir)
# 添加工具路径到系统路径
tools_path = os.path.abspath(os.path.join(curr_dir, "../../FindObjectBundleBuilder/Tools"))
print("tools_path = " + tools_path)
# 检查路径是否存在
# if os.path.exists(tools_path):
# print("Tools path exists")
# if os.path.exists(os.path.join(tools_path, "utils.py")):
# print("utils.py found")
# else:
# print("utils.py not found")
# else:
# print("Tools path does not exist")
sys.path.insert(0, tools_path)
sys.path.insert(0, os.path.join(tools_path, "ipm"))
sys.path.insert(0, os.path.join(tools_path, "google_drive"))
sys.path.insert(0, os.path.join(tools_path, "config_convert"))
sys.path.insert(0, os.path.join(tools_path, "build_package"))
platform = "Android"
env = "Release"
if len(sys.argv) > 1:
platform = sys.argv[1]
if len(sys.argv) > 1:
env = sys.argv[2]
from google_drive.google_sheet import GoogleSheetHelper
from firebase.firebase_helper import FirebaseHelper
from notification_helper import NotificationHelper
import upload_firebase_storage
import config as config
notification_helper = NotificationHelper()
# 尝试导入 utils 模块
try:
import utils
print("utils module imported successfully")
except ImportError as e:
print(f"Failed to import utils: {e}")
# 如果导入失败,直接定义 mkdirs 函数
def mkdirs(dir_path: str):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
# 创建一个简单的 utils 对象来模拟模块
class Utils:
@staticmethod
def mkdirs(dir_path: str):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
utils = Utils()
PROJECT_PATH = os.path.abspath(os.path.join(curr_dir, "../build_profile_package"))
ASSET_PATH = os.path.abspath(os.path.join(curr_dir, "../profile_asset"))
# Unity执行文件路径
UNITY_PATH = "/Applications/Unity/Hub/Editor/2021.3.45f1/Unity.app/Contents/MacOS/Unity"
RESOURCE_ROOT = os.path.abspath(os.path.join(curr_dir, "../build_profile_package/Bundles"))
REMOTE_CONFIG_FILE = os.path.join(curr_dir, "../../FindObjectBundleBuilder/Tools/firebase/remote_config.json")
json_path = os.path.abspath(os.path.join(curr_dir, 'temp_config/profile.json'))
google_sheet_file_name = 'FIndObject装饰配置表'
sheet_table_name = 'default'
def get_json():
''' 访问配置的谷歌表格生成json文件 '''
sheet_helper = GoogleSheetHelper()
sheet = sheet_helper.get_sheet_table(google_sheet_file_name, sheet_table_name)
sheet_all_row_datas = sheet.get_all_values(major_dimension='ROWS')
config_json = {}
error_lines = []
list_field = ''
if sheet is None:
notification_helper.wechat_alert_exception(
f'获取表格失败[文件名: {google_sheet_file_name}, 表名: {sheet_table_name}]')
sheet_all_row_datas = sheet.get_all_values(major_dimension='ROWS')
field_dict = {}
field_type_dict = {}
for i, row_values in enumerate(sheet_all_row_datas):
if i == 0:
continue
# parse_cdn_config(row_values)
elif i == 1:
continue
# parse_remote_config(row_values)
elif i == 2:
continue
elif i == 3:
parse_field(row_values, field_dict)
elif i == 4:
parse_field_type(row_values, field_type_dict)
else:
break
for i, row_values in enumerate(sheet_all_row_datas):
if i <= 4:
continue
if row_values[0] != '':
list_field = row_values[0]
config_json[list_field] = []
# if is_row_env_valid(row_values) is False:
# continue
item_data, error = parse_row_data(i + 1, row_values, field_dict, field_type_dict)
if error != '':
error_lines.append(error)
elif len(item_data) == 0:
continue
else:
config_json[list_field].append(item_data)
if len(error_lines) > 0:
notification_helper.append_msg(str(error_lines))
return
_json = json.dumps(config_json)
local_file_path = get_local_config_file_path()
utils.write_json_file(local_file_path, "profile.json", _json)
print(f"json配置生成成功{len(config_json['datas'])}")
notification_helper.append_msg(f"json配置生成成功{len(config_json['datas'])}")
return _json
def parse_field(row_values, field_dict):
for i, field_name in enumerate(row_values):
if i == 0:
continue
if field_name != '':
field_dict[i] = field_name
print(str(field_dict))
def parse_field_type(row_values, field_type_dict):
for i, field_type in enumerate(row_values):
if i == 0:
continue
if field_type != '':
field_type_dict[i] = field_type
print(str(field_type_dict))
def get_local_config_file_path():
local_file_path = f'temp_config/'
return local_file_path
def is_row_env_valid(row_values):
if row_values[1] == 'TRUE':
return True
return False
def parse_row_data(row, row_values, field_dict, field_type_dict):
item_data = {}
error = ''
for i, value in enumerate(row_values):
if i not in field_dict or i not in field_type_dict:
continue
field = field_dict[i]
field_type = field_type_dict[i]
isvalid, check_error = check_field_type_valid(field_type, value)
if isvalid:
item_data[field] = get_type_value(field_type, value)
else:
error += f'|{check_error}'
if error != '':
error = f"{row}行->id:{item_data['id']}关卡配置错误: {error}"
return item_data, error
def check_field_type_valid(field_type, value):
if value is None:
return False, f'值为空'
if value == '': # 默认值
return True, ''
try:
if field_type == 'int':
int(value)
elif field_type == 'float' or field_type == 'double':
float(value)
elif field_type == 'bool':
if value.lower() not in ['true', 'false']:
return False, f'布尔值格式错误: {value}'
elif field_type == 'string':
# 字符串类型不需要特殊验证
pass
elif field_type.startswith('List<'):
inner_type = field_type[5:-1] # 提取 List<type> 中的 type
if value == '':
return True, '' # 空列表是有效的
items = value.split('#')
for item in items:
if item == '':
continue
valid, error = check_field_type_valid(inner_type, item)
if not valid:
return False, f'列表元素 {item} {error}'
else:
return False, f'未知类型: {field_type}'
return True, ''
except ValueError:
return False, f'类型转换错误: 无法将 {value} 转换为 {field_type}'
except Exception as e:
return False, f'验证错误: {str(e)}'
def get_type_value(field_type, value):
if value is None:
return None
if field_type == 'int':
if value == '':
return 0
return int(value)
elif field_type == 'float' or field_type == 'double':
if value == '':
return 0.0
return float(value)
elif field_type == 'bool':
if value == '':
return False
return value.lower() == 'true'
elif field_type == 'string':
return value
elif field_type.startswith('List<'):
if value == '':
return []
inner_type = field_type[5:-1]
items = value.split('#')
result = []
for item in items:
if item == '':
continue
result.append(get_type_value(inner_type, item))
return result
else:
# 未知类型,返回原始值
return value
def copy_file_to_unity():
''' 软链接资源到unity里面 '''
uiraw_path = os.path.join(PROJECT_PATH, "Assets/AssetRaw/UIRaw/")
# 确保目录存在
utils.mkdirs(uiraw_path)
# 删除现有的符号链接和文件
try:
if os.path.exists(uiraw_path):
# 删除目录中的所有内容
for item in os.listdir(uiraw_path):
item_path = os.path.join(uiraw_path, item)
if os.path.islink(item_path):
# 如果是符号链接,直接删除
os.unlink(item_path)
elif os.path.isfile(item_path):
# 如果是普通文件,删除
os.remove(item_path)
elif os.path.isdir(item_path):
# 如果是目录,递归删除
shutil.rmtree(item_path)
except Exception as e:
print(f"清理现有文件时出错: {e}")
# 创建新的符号链接
utils.mkdirs(os.path.dirname(ASSET_PATH))
for root, dirs, files in os.walk(ASSET_PATH):
for dir in dirs:
source_path = os.path.join(root, dir)
target_path = os.path.join(PROJECT_PATH, f"Assets/AssetRaw/UIRaw/{dir}")
# 确保目标路径不存在
if os.path.exists(target_path) or os.path.islink(target_path):
if os.path.islink(target_path):
os.unlink(target_path)
elif os.path.isdir(target_path):
shutil.rmtree(target_path)
else:
os.remove(target_path)
os.symlink(source_path, target_path)
# 处理JSON文件
json_target_path = os.path.join(PROJECT_PATH, f"Assets/AssetRaw/UIRaw/profile.json")
if os.path.exists(json_target_path) or os.path.islink(json_target_path):
if os.path.islink(json_target_path):
os.unlink(json_target_path)
else:
os.remove(json_target_path)
if os.path.exists(json_path):
os.symlink(json_path, json_target_path)
else:
print("json文件不存在有问题")
raise Exception("json文件不存在有问题")
print("软链接拷贝成功")
def build_package():
''' 运行unity打资源包 '''
time_build_package_start = time.time()
params = "AppVersion=" + "1.0.0"
now = datetime.datetime.now()
build_number = "{:2d}{:02d}{:02d}{:02d}{:02d}".format(int(now.year - 2000), int(now.month), int(now.day),
int(now.hour), int(now.minute))
params = params + " BuildVersion=" + build_number
# params = params + " ResVersion=" + opts.resversion
params = params + " Platform=" + platform
# params = params + " BuildType=" + opts.buildtype
# params = params + " Mode=" + opts.mode
params = params + " ResourceRoot=" + RESOURCE_ROOT
print(params)
cmd = UNITY_PATH + " -quit " + " -batchmode " + " -projectPath " + PROJECT_PATH + " -executeMethod BuildBundlesHelper.BuildBundles " + params + " -logFile " + PROJECT_PATH + "/Logs/build_log.txt"
print(cmd)
# 调用Unity打包
os.system(cmd)
error = ''
is_build_success = False
# 热更Bundles文件输出文件夹Unity输出的源文件
output_dir = f"{RESOURCE_ROOT}/{platform}"
bundle_file_path = output_dir + "/ProfilesFeature/" + build_number
if (platform == "Android" or platform == "iOS") and os.path.exists(output_dir):
isAppendNewLine = False
if not os.path.exists(bundle_file_path):
error = error + f"资源打包失败 "
isAppendNewLine = True
if isAppendNewLine:
error = error + "\n"
if error == '':
is_build_success = True
time_build_package_end = time.time()
if is_build_success:
notification_helper.append_msg(
f"{platform}平台 YooAsset资源构建成功, cost time:{(time_build_package_end - time_build_package_start):.0f}s")
else:
notification_helper.append_msg(f"{platform}平台 YooAsset资源构建失败, error:{error}, 请前往Jenkins查看具体日志")
# notification_helper.append_at_people(notification_helper.at_zhouzhuo)
return is_build_success, bundle_file_path
def upload_package(is_build_success, ab_dir):
''' 上传资源包到storage '''
print(f"ab_path {ab_dir}")
if is_build_success:
print(f"开始上传资源包到firebase")
# storage_path = "Bundles/{}".format(platform) + f'/Profile/profile-{env}.bundle'
# # 从ab_dir文件夹下获取所有.bundle文件
# bundle_file = ""
# if os.path.exists(ab_dir):
# for root, dirs, files in os.walk(ab_dir):
# for file in files:
# if file.endswith('.bundle'):
# bundle_file_path = os.path.join(root, file)
# bundle_file = bundle_file_path
# print(f"找到bundle文件: {bundle_file_path}")
# break
#
# if bundle_file == "":
# print(f"没有找到资源包")
# raise Exception("没有找到资源包")
storage_path = "Bundles/{}".format(platform) + f'/ProfilesFeature/{env}'
tmUploadPackage1 = time.time()
# generation = helper.upload_file(storage_path, bundle_file)
upload_firebase_storage.upload_directory(helper, storage_path, ab_dir, ".version")
tmUploadPackage2 = time.time()
print(f"firebase上传耗时{tmUploadPackage2 - tmUploadPackage1}")
def refresh_remote_config(time_stamp):
''' 刷新云控 '''
remote_key = "profile_asset_config"
# with open(REMOTE_CONFIG_FILE, "r") as f:
# online_txt = f.read()
#
# if online_txt != None and online_txt != "":
# online_json = json.loads(online_txt)
#
# value = online_json["parameters"][remote_key]["defaultValue"]["value"]
# value_json = json.loads(value)
# value_json[env] = url
helper.update_remote_config(None, None, remote_key, env, time_stamp)
def get_time_stamp():
"""返回当前时间的时间戳"""
return int(time.time())
if __name__ == "__main__":
# platform = "Android"
# env = 'Debug'
print(1)
helper = FirebaseHelper()
get_json()
copy_file_to_unity()
tmBuildPackage1 = time.time()
is_build_success, ab_dir = build_package()
tmBuildPackage2 = time.time()
print(f" unity打包耗时{tmBuildPackage2 - tmBuildPackage1}")
time.sleep(1)
upload_package(is_build_success, ab_dir)
time.sleep(1)
refresh_remote_config(get_time_stamp())
print(2)