import datetime import json import os import shutil import sys import time curr_dir = os.path.split(os.path.abspath(__file__))[0] print("curr_dir = " + curr_dir) # 添加工具路径到系统路径 tools_path = os.path.abspath(os.path.join(curr_dir, "../../FindObjectBundleBuilder/Tools")) print("tools_path = " + tools_path) # 检查路径是否存在 # if os.path.exists(tools_path): # print("Tools path exists") # if os.path.exists(os.path.join(tools_path, "utils.py")): # print("utils.py found") # else: # print("utils.py not found") # else: # print("Tools path does not exist") sys.path.insert(0, tools_path) sys.path.insert(0, os.path.join(tools_path, "ipm")) sys.path.insert(0, os.path.join(tools_path, "google_drive")) sys.path.insert(0, os.path.join(tools_path, "config_convert")) sys.path.insert(0, os.path.join(tools_path, "build_package")) platform = "Android" env = "Release" if len(sys.argv) > 1: platform = sys.argv[1] if len(sys.argv) > 1: env = sys.argv[2] from google_drive.google_sheet import GoogleSheetHelper from firebase.firebase_helper import FirebaseHelper from notification_helper import NotificationHelper import upload_firebase_storage import config as config notification_helper = NotificationHelper() # 尝试导入 utils 模块 try: import utils print("utils module imported successfully") except ImportError as e: print(f"Failed to import utils: {e}") # 如果导入失败,直接定义 mkdirs 函数 def mkdirs(dir_path: str): if not os.path.exists(dir_path): os.makedirs(dir_path) # 创建一个简单的 utils 对象来模拟模块 class Utils: @staticmethod def mkdirs(dir_path: str): if not os.path.exists(dir_path): os.makedirs(dir_path) utils = Utils() PROJECT_PATH = os.path.abspath(os.path.join(curr_dir, "../build_profile_package")) ASSET_PATH = os.path.abspath(os.path.join(curr_dir, "../profile_asset")) # Unity执行文件路径 UNITY_PATH = "/Applications/Unity/Hub/Editor/2021.3.45f1/Unity.app/Contents/MacOS/Unity" RESOURCE_ROOT = os.path.abspath(os.path.join(curr_dir, "../build_profile_package/Bundles")) REMOTE_CONFIG_FILE = os.path.join(curr_dir, "../../FindObjectBundleBuilder/Tools/firebase/remote_config.json") json_path = os.path.abspath(os.path.join(curr_dir, 'temp_config/profile.json')) google_sheet_file_name = 'FIndObject装饰配置表' sheet_table_name = 'default' def get_json(): ''' 访问配置的谷歌表格生成json文件 ''' sheet_helper = GoogleSheetHelper() sheet = sheet_helper.get_sheet_table(google_sheet_file_name, sheet_table_name) sheet_all_row_datas = sheet.get_all_values(major_dimension='ROWS') config_json = {} error_lines = [] list_field = '' if sheet is None: notification_helper.wechat_alert_exception( f'获取表格失败[文件名: {google_sheet_file_name}, 表名: {sheet_table_name}]') sheet_all_row_datas = sheet.get_all_values(major_dimension='ROWS') field_dict = {} field_type_dict = {} for i, row_values in enumerate(sheet_all_row_datas): if i == 0: continue # parse_cdn_config(row_values) elif i == 1: continue # parse_remote_config(row_values) elif i == 2: continue elif i == 3: parse_field(row_values, field_dict) elif i == 4: parse_field_type(row_values, field_type_dict) else: break for i, row_values in enumerate(sheet_all_row_datas): if i <= 4: continue if row_values[0] != '': list_field = row_values[0] config_json[list_field] = [] # if is_row_env_valid(row_values) is False: # continue item_data, error = parse_row_data(i + 1, row_values, field_dict, field_type_dict) if error != '': error_lines.append(error) elif len(item_data) == 0: continue else: config_json[list_field].append(item_data) if len(error_lines) > 0: notification_helper.append_msg(str(error_lines)) return _json = json.dumps(config_json) local_file_path = get_local_config_file_path() utils.write_json_file(local_file_path, "profile.json", _json) print(f"json配置生成成功:{len(config_json['datas'])}") notification_helper.append_msg(f"json配置生成成功:{len(config_json['datas'])}") return _json def parse_field(row_values, field_dict): for i, field_name in enumerate(row_values): if i == 0: continue if field_name != '': field_dict[i] = field_name print(str(field_dict)) def parse_field_type(row_values, field_type_dict): for i, field_type in enumerate(row_values): if i == 0: continue if field_type != '': field_type_dict[i] = field_type print(str(field_type_dict)) def get_local_config_file_path(): local_file_path = f'temp_config/' return local_file_path def is_row_env_valid(row_values): if row_values[1] == 'TRUE': return True return False def parse_row_data(row, row_values, field_dict, field_type_dict): item_data = {} error = '' for i, value in enumerate(row_values): if i not in field_dict or i not in field_type_dict: continue field = field_dict[i] field_type = field_type_dict[i] isvalid, check_error = check_field_type_valid(field_type, value) if isvalid: item_data[field] = get_type_value(field_type, value) else: error += f'|{check_error}' if error != '': error = f"第{row}行->id:{item_data['id']}关卡配置错误: {error}" return item_data, error def check_field_type_valid(field_type, value): if value is None: return False, f'值为空' if value == '': # 默认值 return True, '' try: if field_type == 'int': int(value) elif field_type == 'float' or field_type == 'double': float(value) elif field_type == 'bool': if value.lower() not in ['true', 'false']: return False, f'布尔值格式错误: {value}' elif field_type == 'string': # 字符串类型不需要特殊验证 pass elif field_type.startswith('List<'): inner_type = field_type[5:-1] # 提取 List 中的 type if value == '': return True, '' # 空列表是有效的 items = value.split('#') for item in items: if item == '': continue valid, error = check_field_type_valid(inner_type, item) if not valid: return False, f'列表元素 {item} {error}' else: return False, f'未知类型: {field_type}' return True, '' except ValueError: return False, f'类型转换错误: 无法将 {value} 转换为 {field_type}' except Exception as e: return False, f'验证错误: {str(e)}' def get_type_value(field_type, value): if value is None: return None if field_type == 'int': if value == '': return 0 return int(value) elif field_type == 'float' or field_type == 'double': if value == '': return 0.0 return float(value) elif field_type == 'bool': if value == '': return False return value.lower() == 'true' elif field_type == 'string': return value elif field_type.startswith('List<'): if value == '': return [] inner_type = field_type[5:-1] items = value.split('#') result = [] for item in items: if item == '': continue result.append(get_type_value(inner_type, item)) return result else: # 未知类型,返回原始值 return value def copy_file_to_unity(): ''' 软链接资源到unity里面 ''' uiraw_path = os.path.join(PROJECT_PATH, "Assets/AssetRaw/UIRaw/") # 确保目录存在 utils.mkdirs(uiraw_path) # 删除现有的符号链接和文件 try: if os.path.exists(uiraw_path): # 删除目录中的所有内容 for item in os.listdir(uiraw_path): # 如果是name_font文件夹,跳过不删除 if item == "name_font": continue item_path = os.path.join(uiraw_path, item) if os.path.islink(item_path): # 如果是符号链接,直接删除 os.unlink(item_path) elif os.path.isfile(item_path): # 如果是普通文件,删除 os.remove(item_path) elif os.path.isdir(item_path): # 如果是目录,递归删除 shutil.rmtree(item_path) except Exception as e: print(f"清理现有文件时出错: {e}") # 创建新的符号链接 utils.mkdirs(os.path.dirname(ASSET_PATH)) for root, dirs, files in os.walk(ASSET_PATH): for dir in dirs: source_path = os.path.join(root, dir) target_path = os.path.join(PROJECT_PATH, f"Assets/AssetRaw/UIRaw/{dir}") # 确保目标路径不存在 if os.path.exists(target_path) or os.path.islink(target_path): if os.path.islink(target_path): os.unlink(target_path) elif os.path.isdir(target_path): shutil.rmtree(target_path) else: os.remove(target_path) os.symlink(source_path, target_path) # 处理JSON文件 json_target_path = os.path.join(PROJECT_PATH, f"Assets/AssetRaw/UIRaw/profile.json") if os.path.exists(json_target_path) or os.path.islink(json_target_path): if os.path.islink(json_target_path): os.unlink(json_target_path) else: os.remove(json_target_path) if os.path.exists(json_path): os.symlink(json_path, json_target_path) else: print("json文件不存在,有问题") raise Exception("json文件不存在,有问题") print("软链接拷贝成功") def build_package(): ''' 运行unity打资源包 ''' time_build_package_start = time.time() params = "AppVersion=" + "1.0.0" now = datetime.datetime.now() build_number = "{:2d}{:02d}{:02d}{:02d}{:02d}".format(int(now.year - 2000), int(now.month), int(now.day), int(now.hour), int(now.minute)) params = params + " BuildVersion=" + build_number # params = params + " ResVersion=" + opts.resversion params = params + " Platform=" + platform # params = params + " BuildType=" + opts.buildtype # params = params + " Mode=" + opts.mode params = params + " ResourceRoot=" + RESOURCE_ROOT print(params) cmd = UNITY_PATH + " -quit " + " -batchmode " + " -projectPath " + PROJECT_PATH + " -executeMethod BuildBundlesHelper.BuildBundles " + params + " -logFile " + PROJECT_PATH + "/Logs/build_log.txt" print(cmd) # 调用Unity打包 os.system(cmd) error = '' is_build_success = False # 热更Bundles文件输出文件夹(Unity输出的源文件) output_dir = f"{RESOURCE_ROOT}/{platform}" bundle_file_path = output_dir + "/ProfilesFeatureV2/" + build_number if (platform == "Android" or platform == "iOS") and os.path.exists(output_dir): isAppendNewLine = False if not os.path.exists(bundle_file_path): error = error + f"资源打包失败 " isAppendNewLine = True if isAppendNewLine: error = error + "\n" if error == '': is_build_success = True time_build_package_end = time.time() if is_build_success: notification_helper.append_msg( f"{platform}平台 YooAsset资源构建成功, cost time:{(time_build_package_end - time_build_package_start):.0f}s") else: notification_helper.append_msg(f"{platform}平台 YooAsset资源构建失败, error:{error}, 请前往Jenkins查看具体日志") # notification_helper.append_at_people(notification_helper.at_zhouzhuo) return is_build_success, bundle_file_path def upload_package(is_build_success, ab_dir): ''' 上传资源包到storage ''' print(f"ab_path {ab_dir}") if is_build_success: print(f"开始上传资源包到firebase") # storage_path = "Bundles/{}".format(platform) + f'/Profile/profile-{env}.bundle' # # 从ab_dir文件夹下获取所有.bundle文件 # bundle_file = "" # if os.path.exists(ab_dir): # for root, dirs, files in os.walk(ab_dir): # for file in files: # if file.endswith('.bundle'): # bundle_file_path = os.path.join(root, file) # bundle_file = bundle_file_path # print(f"找到bundle文件: {bundle_file_path}") # break # # if bundle_file == "": # print(f"没有找到资源包") # raise Exception("没有找到资源包") storage_path = "Bundles/{}".format(platform) + f'/ProfilesFeatureV2/{env}' tmUploadPackage1 = time.time() # generation = helper.upload_file(storage_path, bundle_file) upload_firebase_storage.upload_directory(helper, storage_path, ab_dir, ".version") tmUploadPackage2 = time.time() print(f"firebase上传耗时:{tmUploadPackage2 - tmUploadPackage1}") def refresh_remote_config(time_stamp): ''' 刷新云控 ''' remote_key = "profile_asset_config" # with open(REMOTE_CONFIG_FILE, "r") as f: # online_txt = f.read() # # if online_txt != None and online_txt != "": # online_json = json.loads(online_txt) # # value = online_json["parameters"][remote_key]["defaultValue"]["value"] # value_json = json.loads(value) # value_json[env] = url helper.update_remote_config(None, None, remote_key, env, time_stamp) def get_time_stamp(): """返回当前时间的时间戳""" return int(time.time()) if __name__ == "__main__": # platform = "iOS" # env = 'Debug' print(1) helper = FirebaseHelper() get_json() copy_file_to_unity() tmBuildPackage1 = time.time() is_build_success, ab_dir = build_package() tmBuildPackage2 = time.time() print(f" unity打包耗时:{tmBuildPackage2 - tmBuildPackage1}") time.sleep(1) upload_package(is_build_success, ab_dir) time.sleep(1) refresh_remote_config(get_time_stamp()) print(2)