248 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			248 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
#!/usr/bin/python
 | 
						|
# coding:utf-8
 | 
						|
 | 
						|
import sys
 | 
						|
import os
 | 
						|
 | 
						|
curr_dir = os.path.split(os.path.abspath(__file__))[0]
 | 
						|
sys.path.append(os.path.join(curr_dir, "../../"))
 | 
						|
 | 
						|
import json
 | 
						|
import datetime
 | 
						|
import config as config
 | 
						|
from tornado import gen
 | 
						|
from tornado.escape import json_decode, json_encode, utf8
 | 
						|
from server.handler.base import BaseHandler
 | 
						|
from firebase.firebase_helper import FirebaseHelper
 | 
						|
from google_drive.google_sheet import GoogleSheetHelper
 | 
						|
from config_convert.game_play_type import MainPlayType
 | 
						|
 | 
						|
class FirebaseStorageHandler(BaseHandler):
 | 
						|
# class FirebaseStorageHandler():
 | 
						|
    def initialize(self, data):
 | 
						|
        self.data = data
 | 
						|
 | 
						|
    @gen.coroutine
 | 
						|
    def prepare(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def parse_pass_count(self, metadata):
 | 
						|
        """解析pass_count"""
 | 
						|
        pass_count = 0
 | 
						|
        if metadata != None and config.meta_pass_count in metadata:
 | 
						|
            try:
 | 
						|
                pass_count = int(metadata[config.meta_pass_count])
 | 
						|
            except:
 | 
						|
                pass
 | 
						|
 | 
						|
        return pass_count
 | 
						|
 | 
						|
    def parse_level_state(self, metadata):
 | 
						|
        level_state = 0
 | 
						|
        try:
 | 
						|
            if metadata != None and config.level_state in metadata:
 | 
						|
                level_state = int(metadata[config.level_state])
 | 
						|
        except Exception as e:
 | 
						|
            print(f'出错 {e}')
 | 
						|
            pass
 | 
						|
        return level_state
 | 
						|
 | 
						|
    def get_all_test_level_config(self, platform, type):
 | 
						|
        """
 | 
						|
            生成所有测试关卡的配置文件
 | 
						|
            mode: config_type
 | 
						|
            platform: Android or iOS
 | 
						|
            type: 类型
 | 
						|
            """
 | 
						|
        sheet_helper = GoogleSheetHelper()
 | 
						|
        firebase_helper = FirebaseHelper()
 | 
						|
        mainPlayType = MainPlayType()
 | 
						|
 | 
						|
        # region 读取表获取所有测试关卡ID
 | 
						|
        types_level_dict = {}
 | 
						|
        sheet = sheet_helper.get_sheet_table(config.sheet_all_level, f"{platform}-{config.table_all_levels}")
 | 
						|
        sheet_datas = sheet.get_all_values(major_dimension='COLUMNS')
 | 
						|
        for i, j in enumerate(sheet_datas):
 | 
						|
            if len(j) <= 0:
 | 
						|
                continue
 | 
						|
            level_type = j[0]
 | 
						|
            if level_type in mainPlayType.all_main_play_type_list and level_type not in types_level_dict:
 | 
						|
                types_level_dict[level_type] = j[1:]
 | 
						|
        print(types_level_dict)
 | 
						|
        # endregion
 | 
						|
 | 
						|
        # region 生成所有测试关卡的配置文件
 | 
						|
        config_json_dict = {}
 | 
						|
        # 遍历types_level_dict
 | 
						|
        for gameplay in types_level_dict:
 | 
						|
            level_list = types_level_dict[gameplay]
 | 
						|
            if len(level_list) <= 0:
 | 
						|
                continue
 | 
						|
 | 
						|
            datas_list = []
 | 
						|
            dict = {}
 | 
						|
            # 遍历每个gameplay下的所有关卡
 | 
						|
            level_blobs = firebase_helper.get_files_match(f"Bundles/{platform}/Level/{gameplay}/{gameplay}_", "**.version")
 | 
						|
            for level_id in level_list:
 | 
						|
                dict[level_id] = {}
 | 
						|
                dict[level_id]["LevelId"] = level_id
 | 
						|
                dict[level_id]["LevelType"] = gameplay
 | 
						|
            for level_blob in level_blobs:
 | 
						|
                filename = os.path.splitext(os.path.split(level_blob.name)[1])[0]
 | 
						|
                asset_id = filename.replace("PackageManifest_", "").replace(".version", "")
 | 
						|
                if asset_id in level_list:
 | 
						|
                    dict[asset_id]["LevelMapName"] = ""
 | 
						|
                    dict[asset_id]["LevelThumName"] = ""
 | 
						|
                    blob_create_time = datetime.datetime.fromtimestamp(level_blob.updated.timestamp())
 | 
						|
                    dict[asset_id]["LevelUpdateTime"] = blob_create_time.strftime("%m-%d %H:%M")
 | 
						|
                    dict[asset_id]["LevelSize"] = "%.2f" % (level_blob.size / 1024 / 1024)
 | 
						|
                    passCount = self.parse_pass_count(level_blob.metadata)
 | 
						|
                    level_state = self.parse_level_state(level_blob.metadata)
 | 
						|
                    if passCount >= 1 and level_state != 1:
 | 
						|
                        level_state = 2
 | 
						|
                    dict[asset_id]["LevelState"] = level_state
 | 
						|
                    if type == "all":
 | 
						|
                        dict[asset_id]["LevelPassCount"] = passCount
 | 
						|
                    elif type == "checked":
 | 
						|
                        if passCount >= 1:
 | 
						|
                            dict[asset_id]["LevelPassCount"] = passCount
 | 
						|
                        else:
 | 
						|
                            dict.pop(asset_id)
 | 
						|
                    elif type == "unchecked":
 | 
						|
                        if passCount <= 0:
 | 
						|
                            dict[asset_id]["LevelPassCount"] = passCount
 | 
						|
                        else:
 | 
						|
                            dict.pop(asset_id)
 | 
						|
            for item in dict:
 | 
						|
                datas_list.append(dict[item])
 | 
						|
            config_json_dict[gameplay] = datas_list
 | 
						|
        print(config_json_dict)
 | 
						|
        return config_json_dict
 | 
						|
 | 
						|
    def set_pass_count_and_state(self, platform, filename, diff_num, state, operator):
 | 
						|
        """设置验收通过次数"""
 | 
						|
        helper = FirebaseHelper()
 | 
						|
        mainPlayType = MainPlayType()
 | 
						|
        gameplay_type = ""
 | 
						|
        try:
 | 
						|
            for type in mainPlayType.all_main_play_type_list:
 | 
						|
                if type in filename:
 | 
						|
                    gameplay_type = type
 | 
						|
                    break
 | 
						|
            if gameplay_type == "":
 | 
						|
                raise Exception("gameplay_type is null")
 | 
						|
        except Exception as e:
 | 
						|
            print(e)
 | 
						|
 | 
						|
        blob = helper.get_file(f"Bundles/{platform}/Level/{gameplay_type}/{filename}/PackageManifest_{filename}.version")
 | 
						|
        pass_count = 0
 | 
						|
        operator_count = 0
 | 
						|
        if config.meta_pass_count in blob.metadata:
 | 
						|
            try:
 | 
						|
                pass_count = int(blob.metadata[config.meta_pass_count])
 | 
						|
            except Exception as e:
 | 
						|
                print(e)
 | 
						|
 | 
						|
        if operator in blob.metadata:
 | 
						|
            try:
 | 
						|
                operator_count = int(blob.metadata[operator])
 | 
						|
            except Exception as e:
 | 
						|
                print(e)
 | 
						|
 | 
						|
        pass_count = pass_count + diff_num
 | 
						|
        operator_count = operator_count + diff_num
 | 
						|
 | 
						|
        metadata = blob.metadata
 | 
						|
        metadata[config.meta_pass_count] = pass_count
 | 
						|
        metadata[operator] = operator_count
 | 
						|
 | 
						|
        print(f'设置data {state}')
 | 
						|
        metadata[config.level_state] = state
 | 
						|
 | 
						|
        blob.metadata = metadata
 | 
						|
        blob.patch()
 | 
						|
        return pass_count
 | 
						|
 | 
						|
    def check_argument(self):
 | 
						|
        argument_dic = {}
 | 
						|
        # 动作
 | 
						|
        argument_dic["action"] = self.get_argument("action", "")
 | 
						|
        # 平台
 | 
						|
        argument_dic["platform"] = self.get_argument("platform", "")
 | 
						|
        # 类型
 | 
						|
        argument_dic["op_type"] = self.get_argument("op_type", "")
 | 
						|
        # 需要操作的文件
 | 
						|
        argument_dic["filename"] = self.get_argument("filename", "")
 | 
						|
        # 与filename参数对应的generation
 | 
						|
        argument_dic["generation"] = self.get_argument("generation", "")
 | 
						|
        # 验收标记
 | 
						|
        argument_dic["pass_count_diff_num"] = self.get_argument("pass_count_diff_num", "")
 | 
						|
        # 验收设置关卡状态
 | 
						|
        argument_dic["level_state"] = self.get_argument("level_state", "")
 | 
						|
        # 验收标记的姓名
 | 
						|
        argument_dic["operator"] = self.get_argument("operator", "")
 | 
						|
        print(f'check_argument {argument_dic}')
 | 
						|
        return argument_dic
 | 
						|
 | 
						|
    # 定义get方法对HTTP的GET请求做出响应
 | 
						|
    def get(self):
 | 
						|
        result = {
 | 
						|
            "err_code": 0,
 | 
						|
            "msg": "",
 | 
						|
            "data": {}
 | 
						|
        }
 | 
						|
 | 
						|
        print(f'self.request.arguments {self.request.arguments}')
 | 
						|
        argument_dic = self.check_argument()
 | 
						|
        if argument_dic["platform"] == "Android" or argument_dic["platform"] == "iOS":
 | 
						|
            # 获取验收关卡列表
 | 
						|
            if argument_dic["action"] == "get_list":
 | 
						|
                """获取整个列表信息"""
 | 
						|
                dict = self.get_all_test_level_config(argument_dic["platform"], argument_dic["op_type"])
 | 
						|
                print(f"get_all_test_level_config len = {len(dict)}")
 | 
						|
                result['data'] = dict
 | 
						|
            else:
 | 
						|
                result["err_code"] = 2
 | 
						|
                result["msg"] = f"NOT FOUND ACT[{argument_dic['act']}]"
 | 
						|
        else:
 | 
						|
            result["err_code"] = 1
 | 
						|
            result["msg"] = f"NOT FOUND PLATFORM[{argument_dic['platform']}]"
 | 
						|
 | 
						|
        result["data"] = json.dumps(result["data"])
 | 
						|
        # write方法将字符串写入HTTP响应
 | 
						|
        self.write(json_encode(result))
 | 
						|
 | 
						|
    def post(self):
 | 
						|
        result = {
 | 
						|
            'status': True,
 | 
						|
            'msg': '',
 | 
						|
            'data': {}
 | 
						|
        }
 | 
						|
 | 
						|
        print(f'self.request.arguments {self.request.arguments}')
 | 
						|
        argument_dic = self.check_argument()
 | 
						|
        if argument_dic["operator"] == "":
 | 
						|
            result["err_code"] = 2
 | 
						|
            result["msg"] = "Invalid Operator"
 | 
						|
        elif argument_dic["platform"] == "Android" or argument_dic["platform"] == "iOS":
 | 
						|
            # 设置验收标记
 | 
						|
            if argument_dic["action"] == "set_pass_count":
 | 
						|
                pass_count = self.set_pass_count_and_state(argument_dic["platform"], argument_dic["filename"],
 | 
						|
                                                           int(argument_dic["pass_count_diff_num"]),
 | 
						|
                                                           argument_dic["level_state"], argument_dic["operator"])
 | 
						|
                result["data"][config.meta_pass_count] = pass_count
 | 
						|
            else:
 | 
						|
                result["err_code"] = 2
 | 
						|
                result["msg"] = f"NOT FOUND ACT[{argument_dic['action']}]"
 | 
						|
        else:
 | 
						|
            result["err_code"] = 1
 | 
						|
            result["msg"] = "Error Platform"
 | 
						|
 | 
						|
        result["data"] = json.dumps(result["data"])
 | 
						|
        self.write(json_encode(result))
 | 
						|
 | 
						|
# 应用运行入口,解析命令行参数
 | 
						|
# if __name__ == "__main__":
 | 
						|
#     handler = FirebaseStorageHandler()
 | 
						|
#     # config =  handler.get_all_test_level_config("Android","all")
 | 
						|
#     print(handler.parse_level_state(0)) |