config-generator/config_generator.py

414 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# coding:utf-8
import json
import os
import sys
import config
import utils
class CDNConfig:
    """CDN upload settings parsed from the sheet's CDN header row.

    Attributes:
        enable: Whether uploading to the CDN is switched on.
        dir: Remote directory the generated file is stored under.
        filename: Base file name (may contain a ``#platform#`` placeholder).
        version: Version string appended to the generated file name.
    """

    # Class-level defaults, shadowed by the instance attributes set in __init__.
    enable = False
    dir = ''
    filename = ''
    version = ''

    def __init__(self, enable: bool, dir: str, filename: str, version: str):
        """Store the four CDN settings on the instance unchanged."""
        self.enable, self.dir = enable, dir
        self.filename, self.version = filename, version
class RemoteConfig:
    """Remote-config settings parsed from the sheet's remote-config header row.

    Attributes:
        enable: Whether updating the remote config is switched on.
        key: Remote-config key to update.
        group: Remote-config group name ('' when not set).
        condition: Remote-config condition name ('' when not set).
    """

    # Class-level defaults, shadowed by the instance attributes set in __init__.
    enable = False
    key = ''
    group = ''
    condition = ''

    def __init__(self, enable, key, group, condition):
        """Store the four remote-config settings on the instance unchanged."""
        self.enable, self.key = enable, key
        self.group, self.condition = group, condition
class ConfigGenerator:
    """Generate a JSON config from a Google Sheet table and publish it.

    The first five sheet rows are read as a header:
    row 0 holds the CDN settings, row 1 the remote-config settings, row 2 is
    skipped, row 3 holds the field names and row 4 the field types.  Data rows
    start at the first row whose first cell contains ``data`` (case-insensitive).
    """

    google_sheet_file_name = None
    sheet_table_name = None
    project = None
    platform = None
    env = None
    cdn_config = None
    remote_config = None
    sheet_helper = None
    firebase_helper = None
    # Class-level placeholders only: __init__ gives every instance its own
    # dictionaries so columns parsed for one sheet never leak into another.
    field_dict = {}  # column index -> field name
    field_type_dict = {}  # column index -> field type

    # region generator initialisation
    def __init__(self, google_sheet_file_name, sheet_table_name, project, platform, env):
        """Record the build parameters and parse the sheet header rows.

        Args:
            google_sheet_file_name: Name of the Google Sheet file.
            sheet_table_name: Name of the table (worksheet) inside that file.
            project: Project identifier passed on to the ``config`` helpers.
            platform: Platform name; stored lower-cased.
            env: Environment name, used for row filtering and file naming.
        """
        self.sheet_helper = config.sheet_helper
        self.google_sheet_file_name = google_sheet_file_name
        self.sheet_table_name = sheet_table_name
        self.project = project
        self.platform = platform.lower()
        self.env = env
        # Per-instance dictionaries (the class-level ones would be shared).
        self.field_dict = {}
        self.field_type_dict = {}
        self.init_notification_param()
        self.init_config_generator_param()

    def init_notification_param(self):
        """Queue two notification lines describing this build."""
        config.notification.append_msg(f'生成配置文件[文件名: {self.google_sheet_file_name}, 表名: {self.sheet_table_name}]')
        config.notification.append_msg(f'配置构建参数[项目: {self.project}, 平台: {self.platform}, 环境: {self.env}]')

    def init_config_generator_param(self):
        """Read the sheet and parse its five header rows."""
        sheet = self.sheet_helper.get_sheet_table(self.google_sheet_file_name, self.sheet_table_name)
        if sheet is None:
            # NOTE(review): presumably wechat_alert_exception aborts the run;
            # if it returns, the next line fails on None — confirm.
            config.wechat_alert_exception(f'获取表格失败[文件名: {self.google_sheet_file_name}, 表名: {self.sheet_table_name}]')
        sheet_all_row_datas = sheet.get_all_values(major_dimension='ROWS')
        for i, row_values in enumerate(sheet_all_row_datas):
            if i == 0:
                self.parse_cdn_config(row_values)
            elif i == 1:
                self.parse_remote_config(row_values)
            elif i == 2:
                continue
            elif i == 3:
                self.parse_field(row_values, self.field_dict)
            elif i == 4:
                self.parse_field_type(row_values, self.field_type_dict)
            else:
                break

    @staticmethod
    def _labelled_values(row_values, labels):
        """Map each label found in the row to the cell immediately right of it.

        When a label occurs more than once the last occurrence wins; a label
        sitting in the final cell has no value and is omitted from the result.
        """
        value_index = {}
        for i, cell_value in enumerate(row_values):
            if cell_value in labels:
                value_index[cell_value] = i + 1
        return {
            label: row_values[index]
            for label, index in value_index.items()
            if index < len(row_values)
        }

    def _replace_platform_placeholder(self, text):
        """Substitute ``#platform#`` in *text* with the platform name.

        The placeholder is removed entirely when the platform equals the
        lower-cased ``config.platform.No`` value.
        """
        if '#platform#' not in text:
            return text
        if self.platform == str(config.platform.No.value).lower():
            return text.replace('#platform#', '')
        return text.replace('#platform#', self.platform)

    def parse_cdn_config(self, row_values):
        """Build ``self.cdn_config`` from the label/value pairs of the CDN row.

        ``enable`` is True only when its cell reads ``true`` (any case) and
        False otherwise; the other settings are None when their label is
        missing.
        """
        cells = self._labelled_values(row_values, ('enable', '目录', '文件名', '版本'))
        # Default to False (not None) so a disabled CDN is reliably detected.
        enable_value = str(cells.get('enable', '')).lower() == 'true'
        self.cdn_config = CDNConfig(
            enable_value,
            cells.get('目录'),
            cells.get('文件名'),
            cells.get('版本'),
        )

    def parse_remote_config(self, row_values):
        """Build ``self.remote_config`` from the remote-config row.

        The key has its ``#platform#`` placeholder resolved with the same
        rule that ``get_remote_config_key`` applies.
        """
        cells = self._labelled_values(row_values, ('enable', 'key', 'group', 'condition'))
        enable_value = str(cells.get('enable', '')).lower() == 'true'
        key_value = cells.get('key')
        if key_value is not None:
            key_value = self._replace_platform_placeholder(key_value)
        self.remote_config = RemoteConfig(
            enable_value,
            key_value,
            cells.get('group'),
            cells.get('condition'),
        )

    def parse_field(self, row_values, field_dict):
        """Fill *field_dict* with ``column index -> field name``.

        Column 0 and empty cells are skipped.
        """
        for i, field_name in enumerate(row_values):
            if i == 0:
                continue
            if field_name != '':
                field_dict[i] = field_name
        print(str(field_dict))

    def parse_field_type(self, row_values, field_type_dict):
        """Fill *field_type_dict* with ``column index -> field type``.

        Column 0 and empty cells are skipped.
        """
        for i, field_type in enumerate(row_values):
            if i == 0:
                continue
            if field_type != '':
                field_type_dict[i] = field_type
        print(str(field_type_dict))
    # endregion

    def gen_config_json(self):
        """Convert the sheet's data rows to JSON and write it to the local file.

        Returns:
            The JSON string on success, or None when any row failed validation
            (the collected errors are queued as a notification and nothing is
            written).
        """
        sheet = self.sheet_helper.get_sheet_table(self.google_sheet_file_name, self.sheet_table_name)
        sheet_all_row_datas = sheet.get_all_values(major_dimension='ROWS')
        config_json = {'datas': []}
        error_lines = []
        data_started = False
        for i, row_values in enumerate(sheet_all_row_datas):
            if not data_started:
                # Skip everything above the first row marked with "data".
                if not row_values or 'data' not in str(row_values[0]).lower():
                    continue
                data_started = True
            if not self.is_row_env_valid(row_values):
                continue
            # Sheet rows are reported 1-based in error messages.
            item_data, error = self.parse_row_data(i + 1, row_values, self.field_dict, self.field_type_dict)
            if error != '':
                error_lines.append(error)
            elif item_data:
                config_json['datas'].append(item_data)
        if error_lines:
            config.notification.append_msg(str(error_lines))
            return None
        _json = json.dumps(config_json)
        filename = self.get_config_filename()
        local_file_path = self.get_local_config_file_path()
        utils.write_json_file(local_file_path, _json)
        config.notification.append_msg(f"{filename} 关卡配置生成成功!当前总关卡数:{len(config_json['datas'])}")
        print(f"{filename}:{_json}")
        return _json

    # region row parsing
    def is_row_env_valid(self, row_values):
        """Return True when the row should be included for the current env.

        Every row is included in the debug environment; otherwise only rows
        whose second cell is exactly ``TRUE``.
        """
        if self.env == config.env.debug.value:
            return True
        return len(row_values) > 1 and row_values[1] == 'TRUE'

    def parse_row_data(self, row, row_values, field_dict, field_type_dict):
        """Convert one sheet row into a ``field name -> typed value`` dict.

        Args:
            row: 1-based row number, used only in the error message.
            row_values: Cell values of the row.
            field_dict: Column index -> field name.
            field_type_dict: Column index -> field type.

        Returns:
            ``(item_data, error)``; *error* is '' when every cell is valid.
        """
        item_data = {}
        problems = []
        for i, value in enumerate(row_values):
            if i not in field_dict or i not in field_type_dict:
                continue
            field_type = field_type_dict[i]
            is_valid, check_error = self.check_field_type_valid(field_type, value)
            if is_valid:
                item_data[field_dict[i]] = self.get_type_value(field_type, value)
            else:
                problems.append(f'|{check_error}')
        error = ''.join(problems)
        if error != '':
            # The id may be missing (no such column, or the id cell itself is
            # the invalid one), so it must not be looked up with [].
            row_id = item_data.get('id', '?')
            error = f"{row}行->id:{row_id}关卡配置错误: {error}"
        return item_data, error

    def check_field_type_valid(self, field_type, value):
        """Check that *value* can be converted to *field_type*.

        Supported types are ``int``, ``float``/``double``, ``bool``,
        ``string`` and ``List<type>`` (items separated by ``#``).  An empty
        string is always valid and stands for the type's default value.

        Returns:
            ``(is_valid, error_message)``; the message is '' when valid.
        """
        if value is None:
            return False, '值为空'
        if value == '':  # default value
            return True, ''
        try:
            if field_type == 'int':
                int(value)
            elif field_type == 'float' or field_type == 'double':
                float(value)
            elif field_type == 'bool':
                if value.lower() not in ['true', 'false']:
                    return False, f'布尔值格式错误: {value}'
            elif field_type == 'string':
                # Strings need no validation.
                pass
            elif field_type.startswith('List<'):
                inner_type = field_type[5:-1]  # the "type" in List<type>
                for item in value.split('#'):
                    if item == '':
                        continue
                    valid, error = self.check_field_type_valid(inner_type, item)
                    if not valid:
                        return False, f'列表元素 {item} {error}'
            else:
                return False, f'未知类型: {field_type}'
            return True, ''
        except ValueError:
            return False, f'类型转换错误: 无法将 {value} 转换为 {field_type}'
        except Exception as e:
            return False, f'验证错误: {str(e)}'

    def get_type_value(self, field_type, value):
        """Convert *value* to *field_type*.

        An empty string becomes the type's default (0, 0.0, False, '' or []).
        Values of an unrecognised type are returned unchanged.
        """
        if value is None:
            return None
        if field_type == 'int':
            return 0 if value == '' else int(value)
        if field_type == 'float' or field_type == 'double':
            return 0.0 if value == '' else float(value)
        if field_type == 'bool':
            return False if value == '' else value.lower() == 'true'
        if field_type == 'string':
            return value
        if field_type.startswith('List<'):
            if value == '':
                return []
            inner_type = field_type[5:-1]
            # Empty items (e.g. from a trailing '#') are dropped.
            return [
                self.get_type_value(inner_type, item)
                for item in value.split('#')
                if item != ''
            ]
        # Unknown type: return the raw value.
        return value
    # endregion

    def upload_cdn(self):
        """Upload the generated local file to the CDN.

        Returns:
            The value returned by the firebase ``upload_file`` call, or None
            when the upload is skipped (no CDN config, CDN disabled, or the
            local file does not exist).
        """
        if self.cdn_config is None:
            config.notification.append_msg('cdn配置为空不上传cdn')
            return None
        if not self.cdn_config.enable:
            config.notification.append_msg('cdn配置未开启, 不上传cdn')
            return None
        filename = self.get_config_filename()
        local_file_path = self.get_local_config_file_path()
        if not os.path.exists(local_file_path):
            # Report the skip instead of returning silently.
            config.notification.append_msg(f'本地配置文件不存在, 不上传cdn[路径: {local_file_path}]')
            return None
        storage_level_db_path = f"{self.cdn_config.dir}/{filename}"
        generation = config.get_firebase_instance(self.project).upload_file(local_file_path, storage_level_db_path)
        config.notification.append_msg(f"{filename}配置上传到cdn路径:{storage_level_db_path}成功")
        cdn_url = f'{config.get_project_cdn(self.project)}/{storage_level_db_path}?generation={generation}'
        config.notification.append_msg(f"配置最新下载链接url:{cdn_url}")
        return generation

    def update_remote_config(self, generation=None):
        """Point the remote-config key at the CDN URL of the generated file.

        Args:
            generation: Generation of the uploaded file.  When None it is
                looked up from storage; if that lookup also yields None the
                URL is written without a ``generation`` query parameter.
        """
        if self.remote_config is None:
            config.notification.append_msg('remote配置为空不更新RemoteConfig')
            return
        if not self.remote_config.enable:
            config.notification.append_msg('remote配置未开启不更新RemoteConfig')
            return
        filename = self.get_config_filename()
        storage_level_db_path = f"{self.cdn_config.dir}/{filename}"
        cdn = config.get_project_cdn(self.project)
        firebase = config.get_firebase_instance(self.project)
        if generation is None:
            # self.firebase_helper is never assigned in this class, so fall
            # back to the project's firebase instance.
            # NOTE(review): assumes that instance exposes get_file_generation — confirm.
            helper = self.firebase_helper if self.firebase_helper is not None else firebase
            generation = helper.get_file_generation(storage_level_db_path)
        level_db_cdn_url = f'{cdn}/{storage_level_db_path}'
        if generation is not None:
            level_db_cdn_url += f'?generation={generation}'
        key = self.get_remote_config_key()
        # An empty group/condition cell is passed on as None.
        group = None if self.remote_config.group == '' else self.remote_config.group
        condition = None if self.remote_config.condition == '' else self.remote_config.condition
        firebase.update_remote_config_json_field_value(group, condition, key, self.env, level_db_cdn_url, True)
        config.notification.append_msg(f"[group:{group}, condition:{condition}, key:{key} env:{self.env}] 云控更新成功")

    def get_local_config_file_path(self):
        """Return the local path the generated file is written to."""
        return f'temp_config/{self.get_config_filename()}'

    def get_config_filename(self):
        """Return ``<filename>-<version>-<env>.json`` with ``#platform#`` resolved."""
        cdn_filename = self._replace_platform_placeholder(self.cdn_config.filename)
        return f'{cdn_filename}-{self.cdn_config.version}-{self.env}.json'

    def get_remote_config_key(self):
        """Return the remote-config key with ``#platform#`` resolved."""
        return self._replace_platform_placeholder(self.remote_config.key)
# Positional command-line arguments, in this order.  Any argument that is not
# supplied stays None.
(
    project_id,
    platform,
    env,
    google_sheet_file_name,
    sheet_table_name,
    param_enable_upload_cdn,
    param_enable_upload_remote_config,
) = (sys.argv[1:8] + [None] * 7)[:7]

if __name__ == "__main__":
    # Validate the required arguments.  sys.exit is used rather than the
    # interactive-only exit() helper injected by the site module.
    # NOTE(review): config.wechat_alert() is not called on these early-exit
    # paths, so the queued message may never be sent — confirm intended.
    if project_id is None:
        config.notification.append_msg('参数错误[project_id is None]')
        sys.exit(1)
    if platform is None:
        config.notification.append_msg('参数错误[platform is None]')
        sys.exit(1)
    if env is None:
        config.notification.append_msg('参数错误[env is None]')
        sys.exit(1)
    if google_sheet_file_name is None or sheet_table_name is None:
        config.notification.append_msg('参数错误[google_sheet_file_name or sheet_table_name is None]')
        sys.exit(1)

    config_generator = ConfigGenerator(google_sheet_file_name, sheet_table_name, project_id, platform, env)
    config_json = config_generator.gen_config_json()
    generation = None
    # gen_config_json returns None when row validation failed and nothing was
    # written.  Publishing in that case would push whatever file an earlier
    # run left at the same path, so only upload/update after a successful run.
    if config_json is not None:
        if str(param_enable_upload_cdn).lower() == 'true':
            generation = config_generator.upload_cdn()
        if str(param_enable_upload_remote_config).lower() == 'true':
            config_generator.update_remote_config(generation)
    config.wechat_alert()