config-generator/firebase_tools/firebase_helper.py

446 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# coding:utf-8
import io
import json
import os
import sys
import google.cloud.storage
import requests
curr_dir = os.path.split(os.path.abspath(__file__))[0]
print(curr_dir)
sys.path.append(os.path.join(curr_dir, "../"))
import utils as utils
import firebase_admin
from firebase_admin import credentials, db
from firebase_admin import storage, firestore
from oauth2client.service_account import ServiceAccountCredentials
# Service-account key files, one per project, resolved next to this module.
# They are handed to credentials.Certificate() and to
# ServiceAccountCredentials.from_json_keyfile_dict() by FirebaseHelper.
DOF_GOOGLE_SERVER_FILE = os.path.join(curr_dir, "find-differences-65e47.json")
D2_GOOGLE_SERVER_FILE = os.path.join(curr_dir, "dof2-b9070.json")
# NOTE(review): FindOut points at the same key file as D2 - confirm this is intentional.
FindOut_GOOGLE_SERVER_FILE = os.path.join(curr_dir, "dof2-b9070.json")
# NOTE(review): the key file name ("find-master-387702") differs from
# FindMaster_PROJECT_ID ("find-master-88ffb") - confirm they belong together.
FindMaster_GOOGLE_SERVER_FILE = os.path.join(curr_dir, "find-master-387702.json")
FindIt_GOOGLE_SERVER_FILE = os.path.join(curr_dir, "find-it-a08e5.json")
# Project ids. FirebaseHelper uses them as the firebase_admin app name and to
# derive the database URL, the storage bucket name and the Remote Config URL.
DOF_PROJECT_ID = "find-differences-65e47"
D2_PROJECT_ID = "dof2-b9070"
# NOTE(review): same project id as D2 - confirm this is intentional.
FindOut_PROJECT_ID = "dof2-b9070"
FindMaster_PROJECT_ID = "find-master-88ffb"
FindIt_PROJECT_ID = "find-it-a08e5"
# Root of the Firebase Remote Config REST endpoint (see get_remote_config_url).
BASE_URL = "https://firebaseremoteconfig.googleapis.com"
def get_remote_config_file(project_id):
    """Return the path of the local Remote Config cache file for a project.

    The file lives in the directory of this module and is named
    ``remote_config_<project_id>.json``.
    """
    file_name = f"remote_config_{project_id}.json"
    return os.path.join(curr_dir, file_name)
def get_remote_config_url(project_id):
    """Return the Remote Config REST URL (under ``BASE_URL``) for a project."""
    url = f"{BASE_URL}/v1/projects/{project_id}/remoteConfig"
    return url
class Singleton:
    """Base class that hands out a single shared instance per subclass.

    Instances are stored in one dictionary keyed by the concrete class.
    Only object creation is shared: Python still invokes ``__init__`` on
    every instantiation call.
    """

    # Concrete class -> its one instance.
    _instances = {}

    def __new__(cls, *args, **kwargs):
        try:
            return cls._instances[cls]
        except KeyError:
            instance = super().__new__(cls)
            cls._instances[cls] = instance
            return instance
class FirebaseHelperInstance(Singleton):
    """Singleton registry that lazily builds one FirebaseHelper per project.

    Each ``get_firebase_*`` method creates its helper on first use and
    returns the cached object afterwards.
    """

    def __init__(self):
        # Singleton.__new__ returns the same object every time, but Python
        # still runs __init__ on each ``FirebaseHelperInstance()`` call.
        # Without this guard the cached helpers would be reset to None and
        # the next getter would construct a second FirebaseHelper, which
        # calls firebase_admin.initialize_app() again under an app name that
        # is already registered (that call raises ValueError).
        if getattr(self, "_initialized", False):
            return
        self._initialized = True
        print("初始化")
        self.firebase_dof = None
        self.firebase_d2 = None
        self.firebase_find_out = None
        self.firebase_find_master = None
        self.firebase_find_it = None

    def get_firebase_dof(self):
        """Return the helper for ``DOF_PROJECT_ID``, creating it on first use."""
        if self.firebase_dof is None:
            self.firebase_dof = FirebaseHelper(DOF_PROJECT_ID, DOF_GOOGLE_SERVER_FILE)
        return self.firebase_dof

    def get_firebase_d2(self):
        """Return the helper for ``D2_PROJECT_ID``, creating it on first use."""
        if self.firebase_d2 is None:
            self.firebase_d2 = FirebaseHelper(D2_PROJECT_ID, D2_GOOGLE_SERVER_FILE)
        return self.firebase_d2

    def get_firebase_find_out(self):
        """Return the helper for ``FindOut_PROJECT_ID``, creating it on first use."""
        if self.firebase_find_out is None:
            self.firebase_find_out = FirebaseHelper(FindOut_PROJECT_ID, FindOut_GOOGLE_SERVER_FILE)
        return self.firebase_find_out

    def get_firebase_find_master(self):
        """Return the helper for ``FindMaster_PROJECT_ID``, creating it on first use."""
        if self.firebase_find_master is None:
            self.firebase_find_master = FirebaseHelper(FindMaster_PROJECT_ID, FindMaster_GOOGLE_SERVER_FILE)
        return self.firebase_find_master

    def get_firebase_find_it(self):
        """Return the helper for ``FindIt_PROJECT_ID``, creating it on first use."""
        if self.firebase_find_it is None:
            self.firebase_find_it = FirebaseHelper(FindIt_PROJECT_ID, FindIt_GOOGLE_SERVER_FILE)
        return self.firebase_find_it
class FirebaseHelper:
    """Convenience wrapper around a single Firebase project.

    On construction it initialises (or reuses) a named ``firebase_admin``
    app, a Firestore client and a Storage bucket handle. It also offers
    helpers to download, edit locally and re-upload the project's Remote
    Config template through the REST endpoint.
    """

    # Upper bound on attempts made by the retrying upload helpers.
    _MAX_UPLOAD_ATTEMPTS = 3

    def __init__(self, project_id, google_service_file):
        """Store the project settings and immediately initialise Firebase.

        :param project_id: project id; also used as the firebase_admin app name.
        :param google_service_file: path to the service-account JSON key file.
        """
        print(f'init--{project_id}')
        self.firebase_app = None
        self.storage_instance = None
        self.config = {"project_id": project_id}
        self.GOOGLE_SERVER_FILE = google_service_file
        self.REMOTE_CONFIG_FILE = get_remote_config_file(project_id)
        self.REMOTE_CONFIG_URL = get_remote_config_url(project_id)
        self.init()

    def init(self):
        """Initialise the Firebase app first, then the storage bucket handle."""
        self.init_firebase(self.config["project_id"])
        self.init_storage(self.config["project_id"])

    def init_firebase(self, project_id):
        """Create the firebase_admin app named ``project_id`` (or reuse it).

        firebase_admin refuses to initialise two apps with the same name, and
        several helpers may legitimately target the same project, so an app
        that is already registered is reused instead of re-created.
        """
        try:
            self.firebase_app = firebase_admin.get_app(project_id)
        except ValueError:
            # No app registered under this name yet: create it.
            storage_bucket = "gs://" + project_id + ".appspot.com"
            database_url = "https://" + project_id + ".firebaseio.com/"
            cred = credentials.Certificate(self.GOOGLE_SERVER_FILE)
            self.firebase_app = firebase_admin.initialize_app(cred, {
                "databaseURL": database_url,
                "storageBucket": storage_bucket
            }, name=project_id)
        self.refFirestore = firestore.client(app=self.firebase_app)
        print("初始 firebase 成功")

    def init_storage(self, project_id):
        """Bind ``storage_instance`` to the ``<project_id>.appspot.com`` bucket."""
        bucket = project_id + ".appspot.com"
        if self.firebase_app:
            self.storage_instance = storage.bucket(name=bucket, app=self.firebase_app)
            print("初始 storage 成功")

    def get_files(self, prefix=""):
        """Return the result of ``list_blobs`` for the given name prefix."""
        print("prefix = " + prefix)
        return self.storage_instance.list_blobs(prefix=prefix)

    def get_files_match(self, prefix, match_glob):
        """Return the result of ``list_blobs`` filtered by prefix and glob."""
        return self.storage_instance.list_blobs(prefix=prefix, match_glob=match_glob)

    def get_files_all_versions(self, prefix=""):
        """Return the result of ``list_blobs`` with ``versions=True``."""
        print("prefix = " + prefix)
        return self.storage_instance.list_blobs(prefix=prefix, versions=True)

    def get_file(self, storage_file, generation=None):
        """Return the blob at ``storage_file`` (optionally a given generation)."""
        return self.storage_instance.get_blob(storage_file, generation=generation)

    def get_file_generation(self, storage_file):
        """Return the generation of ``storage_file``, or None if there is no blob."""
        blob = self.storage_instance.get_blob(storage_file)
        return None if blob is None else blob.generation

    # Uploading a single bundle first compares the hash and md5 stored in the
    # remote blob's metadata with those of the local file.
    def upload_single_bundle(self, local_file, storage_file, blob_dic, ext_meta=None, try_count=0):
        """Upload ``local_file`` unless the remote copy has the same hash and md5.

        :param local_file: path of the file to upload.
        :param storage_file: destination object name in the bucket.
        :param blob_dic: mapping of object name -> already-fetched blob; a
            missing key (or a None value) means "no remote copy known".
        :param ext_meta: extra metadata merged into the uploaded blob's metadata.
        :param try_count: number of attempts already used; at most three
            attempts are made in total.
        :return: True when the file was uploaded or was already identical,
            False when every attempt failed.
        """
        for _ in range(try_count, self._MAX_UPLOAD_ATTEMPTS):
            try:
                blob = blob_dic.get(storage_file)
                is_same = False
                if blob is not None:
                    remote_meta = blob.metadata or {}
                    is_same = (
                        remote_meta.get('hash', "") == utils.calc_hash(local_file)
                        and remote_meta.get('md5', "") == utils.calc_md5(local_file)
                    )
                if not is_same:
                    print(f"{local_file} 上传中..")
                    self.upload_file(local_file, storage_file, ext_meta)
                else:
                    print(f"{local_file} hash和md5码和storage文件一致不重复上传..")
                return True
            except Exception as e:
                # Best-effort retry: report and try again until attempts run out.
                print(local_file + " 上传失败,尝试重试,错误信息:" + repr(e))
        return False

    def upload_thumbnail_image(self, local_file, storage_file, ext_meta=None, try_count=0):
        """Upload ``local_file`` unconditionally, retrying on failure.

        :param try_count: number of attempts already used; at most three
            attempts are made in total.
        :return: True on success, False when every attempt failed.
        """
        for _ in range(try_count, self._MAX_UPLOAD_ATTEMPTS):
            try:
                self.upload_file(local_file, storage_file, ext_meta)
                return True
            except Exception as e:
                print(local_file + " 上传失败,尝试重试,错误信息:" + repr(e))
        return False

    def upload_file(self, local_file, storage_file, ext_meta=None):
        """Upload one file and return the generation of the stored blob.

        The blob metadata carries the local file's ``hash`` and ``md5`` plus
        anything in ``ext_meta``. Files ending in ``.json`` are gzipped via
        ``utils.gzip_file`` and uploaded with ``content_encoding = "gzip"``;
        the temporary ``.gz`` file is removed afterwards even if the upload
        fails. Errors from the underlying calls propagate to the caller.
        """
        upload_blob = self.storage_instance.blob(storage_file)
        meta = {
            'hash': utils.calc_hash(local_file),
            'md5': utils.calc_md5(local_file),
        }
        meta.update(ext_meta or {})
        upload_blob.metadata = meta
        if local_file.endswith(".json"):
            gz_file = local_file + '.gz'
            utils.gzip_file(local_file)
            upload_blob.content_encoding = "gzip"
            try:
                upload_blob.upload_from_filename(gz_file)
            finally:
                # Never leave the temporary archive behind, and never let a
                # missing archive mask the real upload error.
                if os.path.exists(gz_file):
                    os.unlink(gz_file)
        else:
            upload_blob.upload_from_filename(local_file)
        blob = self.storage_instance.get_blob(storage_file)
        print(local_file + " 上传成功 generation = {}".format(blob.generation))
        return blob.generation

    def update_metadata(self, storage_file, ext_meta=None):
        """Merge ``ext_meta`` into the metadata of an existing blob.

        Prints a notice and does nothing when the blob does not exist.
        Errors from the storage client propagate to the caller.
        """
        blob = self.storage_instance.get_blob(storage_file)
        if blob is None:
            print(storage_file + " 文件不存在")
            return
        # A blob without custom metadata reports None, not an empty dict.
        meta = dict(blob.metadata or {})
        meta.update(ext_meta or {})
        blob.metadata = meta
        blob.patch()
        print(storage_file + " 更新metadata成功")

    # region RemoteConfig updates
    def get_access_token(self):
        """Return an OAuth2 access token scoped to Firebase Remote Config."""
        with open(self.GOOGLE_SERVER_FILE, "r", encoding="utf-8") as f:
            key_info = json.load(f)
        scoped_credentials = ServiceAccountCredentials.from_json_keyfile_dict(
            key_info, ["https://www.googleapis.com/auth/firebase.remoteconfig"])
        return scoped_credentials.get_access_token().access_token

    def get_remote_value(self):
        """
        Fetch the Remote Config template and write it to the local cache file.

        :return: the ETag response header on HTTP 200, otherwise None
        :raises Exception: when the request or the file write raises
        """
        try:
            headers = {
                "Authorization": "Bearer " + self.get_access_token()
            }
            resp = requests.get(self.REMOTE_CONFIG_URL, headers=headers)
            if resp.status_code != 200:
                print("remote_config.json获取失败")
                print(resp.text)
                return None
            with io.open(self.REMOTE_CONFIG_FILE, "wb") as f:
                f.write(resp.text.encode("utf-8"))
            print("remote config 写入完成: remote_config.json")
            print("ETag from server: {}".format(resp.headers["ETag"]))
            return resp.headers["ETag"]
        except Exception as e:
            print("获取 RemoteConfig值失败 " + repr(e))
            raise Exception("Fail") from e

    def upload_remote_config_value(self, etag):
        """
        Push the local cache file to the Remote Config endpoint.

        :param etag: value sent in the ``If-Match`` request header
        :return: True on HTTP 200, False on any other status or on error
        """
        try:
            with open(self.REMOTE_CONFIG_FILE, "r", encoding="utf-8") as f:
                content = f.read()
            print("开始上传 remote config:>" + content + "<")
            headers = {
                "Authorization": "Bearer " + self.get_access_token(),
                "Content-Type": "application/json; UTF-8",
                "If-Match": etag
            }
            resp = requests.put(self.REMOTE_CONFIG_URL, data=content.encode("utf-8"), headers=headers)
            if resp.status_code == 200:
                print("推送成功")
                print("ETag from server: {}".format(resp.headers["ETag"]))
                return True
            print("推送失败")
            print(resp.text)
            return False
        except Exception as e:
            print("更新 RemoteConfig值失败 " + repr(e))
            return False

    @staticmethod
    def _locate_leaf(json_dict, keys):
        """Walk a ``/``-separated key path; return ``(parent_dict, last_key)``.

        Every key before the last one must already exist (KeyError otherwise).
        """
        *parents, leaf = keys.split("/")
        node = json_dict
        for key in parents:
            node = node[key]
        return node, leaf

    def check_or_create_string_field(self, json_dict, keys, value):
        """Set the entry at the ``/``-separated path ``keys`` to ``value``.

        Only the final key is created when absent; a missing intermediate
        key raises KeyError.
        """
        parent, leaf = self._locate_leaf(json_dict, keys)
        print("item = " + leaf)
        parent[leaf] = value
        print(parent[leaf])

    def check_or_create_json_value(self, json_dict, keys, value):
        """Set the entry at path ``keys`` to ``json.dumps(value)``.

        Only the final key is created when absent; a missing intermediate
        key raises KeyError.
        """
        parent, leaf = self._locate_leaf(json_dict, keys)
        print("item = " + leaf)
        parent[leaf] = json.dumps(value)
        print(parent[leaf])

    def check_or_create_json_field_value(self, json_dict, keys, sub_key, sub_value):
        """Update one field inside the JSON string stored at path ``keys``.

        The existing string is parsed, ``sub_key`` is set to ``sub_value``
        and the result is serialised back. The entry itself must already
        exist (KeyError otherwise).
        """
        parent, leaf = self._locate_leaf(json_dict, keys)
        print("item = " + leaf)
        payload = json.loads(parent[leaf])
        payload[sub_key] = sub_value
        parent[leaf] = json.dumps(payload)
        print(parent[leaf])

    @staticmethod
    def _remote_config_value_path(group, condition, remote_key):
        """Build the key path of a parameter value inside the template."""
        if group is not None and group != "":
            prefix = f"parameterGroups/{group}/parameters/"
        else:
            prefix = "parameters/"
        if condition is not None and condition != "":
            return prefix + f"{remote_key}/conditionalValues/{condition}/value"
        return prefix + f"{remote_key}/defaultValue/value"

    def _edit_remote_config(self, edit, is_upload, echo):
        """Load the cached template, apply ``edit`` to it, save, maybe upload.

        The template is re-fetched when uploading (a fresh ETag is needed)
        or when no cache file exists yet. An empty cache file is left alone.
        """
        etag = None
        if is_upload or not os.path.exists(self.REMOTE_CONFIG_FILE):
            etag = self.get_remote_value()
        # The cache is written as UTF-8 by get_remote_value, so read it as such
        # rather than with the platform's locale encoding.
        with open(self.REMOTE_CONFIG_FILE, "r", encoding="utf-8") as f:
            remote_content = f.read()
        if not remote_content:
            return
        remote_content_json = json.loads(remote_content)
        edit(remote_content_json)
        if echo:
            print("\n\n")
            print(remote_content_json)
            print("\n\n")
        # Persist the edited template back to the local cache file.
        utils.write_json(self.REMOTE_CONFIG_FILE, remote_content_json)
        if is_upload:
            self.upload_remote_config_value(etag)

    def update_remote_config_json_value(self, group, condition, remote_key, value, is_upload=False):
        """Store ``value`` (JSON-serialised) as a Remote Config parameter value.

        :param group: parameter group name, or None/"" for ungrouped parameters.
        :param condition: condition name, or None/"" for the default value.
        :param remote_key: parameter key.
        :param is_upload: when True, fetch first and push the result afterwards.
        :raises Exception: when any step fails.
        """
        try:
            keys = self._remote_config_value_path(group, condition, remote_key)
            self._edit_remote_config(
                lambda doc: self.check_or_create_json_value(doc, keys, value),
                is_upload, echo=True)
        except Exception as e:
            print(e)
            raise Exception(f"[remote_config group:{group}, condition:{condition}, remote_key:{remote_key}] 更新失败") from e

    def update_remote_config_json_field_value(self, group, condition, remote_key, json_field_key, value, is_upload=False):
        """Set one field inside a JSON-valued Remote Config parameter.

        Parameters are as for ``update_remote_config_json_value``;
        ``json_field_key`` names the field inside the stored JSON object.
        :raises Exception: when any step fails.
        """
        try:
            keys = self._remote_config_value_path(group, condition, remote_key)
            self._edit_remote_config(
                lambda doc: self.check_or_create_json_field_value(doc, keys, json_field_key, value),
                is_upload, echo=False)
        except Exception as e:
            print(e)
            raise Exception(f"[remote_config group:{group}, condition:{condition}, remote_key:{remote_key}, json_field_key:{json_field_key}] 更新失败") from e

    def update_remote_config_string_value(self, group, condition, remote_key, value, is_upload=False):
        """Store ``value`` unchanged as a Remote Config parameter value.

        Parameters are as for ``update_remote_config_json_value``.
        :raises Exception: when any step fails.
        """
        try:
            keys = self._remote_config_value_path(group, condition, remote_key)
            self._edit_remote_config(
                lambda doc: self.check_or_create_string_field(doc, keys, value),
                is_upload, echo=True)
        except Exception as e:
            print(e)
            raise Exception(f"[remote_config group:{group}, condition:{condition}, remote_key:{remote_key}] 更新失败") from e

    def GetInfor(self, targetFile: str) -> "google.cloud.storage.blob.Blob | None":
        """Return the blob stored at ``targetFile`` (None when ``get_blob`` finds nothing)."""
        return self.storage_instance.get_blob(targetFile)

    def GetGeneration(self, targetFile: str) -> str:
        """Return the generation of ``targetFile`` as a string.

        Raises AttributeError when ``GetInfor`` returns None.
        """
        return str(self.GetInfor(targetFile).generation)