find-object-bundle-builder/FindObjectBundleBuilder/Tools/server/handler/firebase_storage.py

248 lines
9.5 KiB
Python

#!/usr/bin/python
# coding:utf-8
import sys
import os
curr_dir = os.path.split(os.path.abspath(__file__))[0]
sys.path.append(os.path.join(curr_dir, "../../"))
import json
import datetime
import config as config
from tornado import gen
from tornado.escape import json_decode, json_encode, utf8
# from server.handler.base import BaseHandler
from firebase.firebase_helper import FirebaseHelper
from google_drive.google_sheet import GoogleSheetHelper
from config_convert.game_play_type import MainPlayType
class FirebaseStorageHandler(BaseHandler):
    """HTTP handler exposing level-acceptance data stored in Firebase Storage.

    GET  ``action=get_list``       -> list the test levels per gameplay type.
    POST ``action=set_pass_count`` -> adjust a level's acceptance counters
                                      and state in the blob metadata.

    NOTE(review): ``BaseHandler`` is not imported in the visible part of this
    file (its import line is commented out) — confirm it is in scope.
    Presumably it is a Tornado ``RequestHandler`` subclass, since this class
    relies on ``get_argument``, ``request`` and ``write``.
    """

    # Platforms accepted in the ``platform`` request argument.
    SUPPORTED_PLATFORMS = ("Android", "iOS")

    def initialize(self, data):
        """Store the per-route ``data`` object passed at handler construction."""
        self.data = data

    @gen.coroutine
    def prepare(self):
        """Pre-request hook; intentionally a no-op."""
        pass

    @staticmethod
    def _set_error(result, err_code, msg):
        """Record an error code and message on a response dict."""
        result["err_code"] = err_code
        result["msg"] = msg
        # Only POST responses carry a ``status`` flag; keep it truthful.
        if "status" in result:
            result["status"] = False

    @staticmethod
    def _read_counter(metadata, key):
        """Return ``metadata[key]`` as an int; 0 if absent or unparsable."""
        if key not in metadata:
            return 0
        try:
            return int(metadata[key])
        except (TypeError, ValueError) as e:
            print(e)
            return 0

    def parse_pass_count(self, metadata):
        """Parse the pass count from blob metadata.

        Returns 0 when ``metadata`` is None, is not a mapping, lacks the
        ``config.meta_pass_count`` key, or holds a non-integer value.
        """
        try:
            return int(metadata[config.meta_pass_count])
        except (KeyError, TypeError, ValueError):
            return 0

    def parse_level_state(self, metadata):
        """Parse the level state from blob metadata.

        Returns 0 when the ``config.level_state`` key is missing; a value
        that cannot be read or converted is logged and also yields 0.
        """
        level_state = 0
        try:
            if metadata is not None and config.level_state in metadata:
                level_state = int(metadata[config.level_state])
        except (TypeError, ValueError) as e:
            print(f'出错 {e}')
        return level_state

    def get_all_test_level_config(self, platform, type):
        """Build the configuration of all test levels, grouped by gameplay.

        Args:
            platform: "Android" or "iOS"; selects the sheet tab and the
                storage prefix.
            type: filter mode. "all" keeps every level, "checked" drops
                levels whose blob has a pass count below 1, "unchecked"
                drops levels whose blob has a pass count above 0. (The name
                shadows the builtin but is kept for caller compatibility.)

        Returns:
            dict mapping gameplay type -> list of per-level dicts.
            Levels listed in the sheet that have no matching blob keep only
            ``LevelId`` and ``LevelType``, whatever the filter mode.
        """
        sheet_helper = GoogleSheetHelper()
        firebase_helper = FirebaseHelper()
        known_types = MainPlayType().all_main_play_type_list

        # region Read the sheet to collect the test level ids per gameplay.
        # Each column is expected to be: [gameplay type, level id, level id, ...];
        # only the first column seen for a gameplay type is used.
        types_level_dict = {}
        sheet = sheet_helper.get_sheet_table(
            config.sheet_all_level, f"{platform}-{config.table_all_levels}")
        for column in sheet.get_all_values(major_dimension='COLUMNS'):
            if not column:
                continue
            level_type = column[0]
            if level_type in known_types and level_type not in types_level_dict:
                types_level_dict[level_type] = column[1:]
        print(types_level_dict)
        # endregion

        # region Build the configuration for every test level.
        config_json_dict = {}
        for gameplay, level_list in types_level_dict.items():
            if not level_list:
                continue
            level_blobs = firebase_helper.get_files_match(
                f"Bundles/{platform}/Level/{gameplay}/{gameplay}_", "**.version")
            # Keyed by level id so blob lookups are O(1) and a level that was
            # filtered out is simply skipped if another blob maps to it.
            entries = {
                level_id: {"LevelId": level_id, "LevelType": gameplay}
                for level_id in level_list
            }
            for level_blob in level_blobs:
                filename = os.path.splitext(os.path.split(level_blob.name)[1])[0]
                asset_id = filename.replace("PackageManifest_", "").replace(".version", "")
                entry = entries.get(asset_id)
                if entry is None:
                    continue
                entry["LevelMapName"] = ""
                entry["LevelThumName"] = ""
                blob_update_time = datetime.datetime.fromtimestamp(
                    level_blob.updated.timestamp())
                entry["LevelUpdateTime"] = blob_update_time.strftime("%m-%d %H:%M")
                # Size in MiB, two decimals.
                entry["LevelSize"] = "%.2f" % (level_blob.size / 1024 / 1024)
                pass_count = self.parse_pass_count(level_blob.metadata)
                level_state = self.parse_level_state(level_blob.metadata)
                # Any level that passed at least once is reported as state 2
                # unless it is explicitly in state 1.
                if pass_count >= 1 and level_state != 1:
                    level_state = 2
                entry["LevelState"] = level_state
                if type == "all":
                    entry["LevelPassCount"] = pass_count
                elif type == "checked":
                    if pass_count >= 1:
                        entry["LevelPassCount"] = pass_count
                    else:
                        entries.pop(asset_id)
                elif type == "unchecked":
                    if pass_count <= 0:
                        entry["LevelPassCount"] = pass_count
                    else:
                        entries.pop(asset_id)
            config_json_dict[gameplay] = list(entries.values())
        # endregion
        print(config_json_dict)
        return config_json_dict

    def set_pass_count_and_state(self, platform, filename, diff_num, state, operator):
        """Update a level's acceptance counters and state in blob metadata.

        Args:
            platform: platform segment of the storage path.
            filename: level bundle name; the gameplay type is the first known
                type that occurs as a substring of it.
            diff_num: integer added to both the total pass count and the
                operator's own count.
            state: value written under ``config.level_state``.
            operator: metadata key holding this operator's count.

        Returns:
            The new total pass count.
        """
        helper = FirebaseHelper()
        gameplay_type = next(
            (name for name in MainPlayType().all_main_play_type_list
             if name in filename),
            "")
        if not gameplay_type:
            # Best-effort, as before: report and continue with an empty type.
            print("gameplay_type is null")
        # NOTE(review): if get_file can return None for a missing file, the
        # attribute access below fails — confirm the helper's contract.
        blob = helper.get_file(
            f"Bundles/{platform}/Level/{gameplay_type}/{filename}/PackageManifest_{filename}.version")
        # A blob with no custom metadata may expose None; start from an
        # empty mapping in that case.
        metadata = dict(blob.metadata or {})
        pass_count = self._read_counter(metadata, config.meta_pass_count) + diff_num
        operator_count = self._read_counter(metadata, operator) + diff_num
        metadata[config.meta_pass_count] = pass_count
        metadata[operator] = operator_count
        print(f'设置data {state}')
        metadata[config.level_state] = state
        blob.metadata = metadata
        blob.patch()
        return pass_count

    def check_argument(self):
        """Collect the request arguments into a dict ("" when absent)."""
        argument_dic = {
            # Requested action.
            "action": self.get_argument("action", ""),
            # Target platform.
            "platform": self.get_argument("platform", ""),
            # Listing filter type.
            "op_type": self.get_argument("op_type", ""),
            # File to operate on.
            "filename": self.get_argument("filename", ""),
            # Generation matching the ``filename`` argument.
            "generation": self.get_argument("generation", ""),
            # Delta to apply to the acceptance pass count.
            "pass_count_diff_num": self.get_argument("pass_count_diff_num", ""),
            # Level state to set during acceptance.
            "level_state": self.get_argument("level_state", ""),
            # Name of the person marking the acceptance.
            "operator": self.get_argument("operator", ""),
        }
        print(f'check_argument {argument_dic}')
        return argument_dic

    def get(self):
        """Handle HTTP GET: return the acceptance level list.

        Responds with ``{"err_code", "msg", "data"}`` where ``data`` is
        itself a JSON-encoded string.
        """
        result = {
            "err_code": 0,
            "msg": "",
            "data": {}
        }
        print(f'self.request.arguments {self.request.arguments}')
        argument_dic = self.check_argument()
        if argument_dic["platform"] in self.SUPPORTED_PLATFORMS:
            if argument_dic["action"] == "get_list":
                # Fetch the whole acceptance level list.
                level_config = self.get_all_test_level_config(
                    argument_dic["platform"], argument_dic["op_type"])
                print(f"get_all_test_level_config len = {len(level_config)}")
                result['data'] = level_config
            else:
                # The key is "action"; the former 'act' lookup raised KeyError.
                self._set_error(
                    result, 2, f"NOT FOUND ACT[{argument_dic['action']}]")
        else:
            self._set_error(
                result, 1, f"NOT FOUND PLATFORM[{argument_dic['platform']}]")
        result["data"] = json.dumps(result["data"])
        # Write the encoded result into the HTTP response.
        self.write(json_encode(result))

    def post(self):
        """Handle HTTP POST: apply an acceptance mark to a level.

        Responds with ``{"status", "err_code", "msg", "data"}`` where
        ``data`` is itself a JSON-encoded string. ``err_code`` is 0 on
        success, 1 for a bad platform, 2 for a bad operator or action and
        3 for a non-integer ``pass_count_diff_num``.
        """
        result = {
            'status': True,
            'err_code': 0,
            'msg': '',
            'data': {}
        }
        print(f'self.request.arguments {self.request.arguments}')
        argument_dic = self.check_argument()
        if argument_dic["operator"] == "":
            self._set_error(result, 2, "Invalid Operator")
        elif argument_dic["platform"] in self.SUPPORTED_PLATFORMS:
            if argument_dic["action"] == "set_pass_count":
                try:
                    diff_num = int(argument_dic["pass_count_diff_num"])
                except ValueError:
                    # Missing or non-numeric delta: report it to the client
                    # instead of failing the request with an unhandled error.
                    self._set_error(result, 3, "Invalid pass_count_diff_num")
                else:
                    pass_count = self.set_pass_count_and_state(
                        argument_dic["platform"], argument_dic["filename"],
                        diff_num,
                        argument_dic["level_state"], argument_dic["operator"])
                    result["data"][config.meta_pass_count] = pass_count
            else:
                self._set_error(
                    result, 2, f"NOT FOUND ACT[{argument_dic['action']}]")
        else:
            self._set_error(result, 1, "Error Platform")
        result["data"] = json.dumps(result["data"])
        self.write(json_encode(result))
# Application entry point when run as a script (currently commented out)
# if __name__ == "__main__":
# handler = FirebaseStorageHandler()
# # config = handler.get_all_test_level_config("Android","all")
# print(handler.parse_level_state(0))