// // Manager.swift // GuruAnalytics_iOS // // Created by 袁仕崇 on 16/11/22. // import Foundation import RxCocoa import RxSwift internal class Manager { // MARK: - temporary, will be removed soon @available(*, deprecated, message: "used for debug, will be removed on any future released versions") private var loggedEventsCount: Int = 0 @available(*, deprecated, message: "used for debug, will be removed on any future released versions") private func accumulateLoggedEventsCount(_ count: Int) { loggedEventsCount += count } @available(*, deprecated, message: "used for debug, will be removed on any future released versions") private var uploadedEventsCount: Int = 0 @available(*, deprecated, message: "used for debug, will be removed on any future released versions") private func accumulateUploadedEventsCount(_ count: Int) { uploadedEventsCount += count } @available(*, deprecated, message: "used for debug, will be removed on any future released versions") internal func debug_eventsStatistics(_ callback: @escaping (_ uploadedEventsCount: Int, _ loggedEventsCount: Int) -> Void) { callback(uploadedEventsCount, loggedEventsCount) } // MARK: - internal members internal static let shared = Manager() /// 时间维度,默认每1分钟后批量上传1次 private var scheduleInterval: TimeInterval = GuruAnalytics.uploadPeriodInSecond /// 数量维度,默认满25条批量上传1次 private var numberOfCountPerConsume: Int = GuruAnalytics.batchLimit /// event过期时间,默认7天 private var eventExpiredIntervel: TimeInterval = GuruAnalytics.eventExpiredSeconds private var initializeTimeout: Double = GuruAnalytics.initializeTimeout /// 根据时差计算的当前服务端时间 internal var serverNowMs: Int64 { serverInitialMs + (Date.absoluteTimeMs - serverSyncedAtAbsoluteMs)} // MARK: - private members private typealias PropertyName = GuruAnalytics.PropertyName private let bag = DisposeBag() private let db = Database() private let ntwkMgr = NetworkManager() /// 生成background 任务时,将 key 和当前任务的 disposeable 一一对应 private var taskKeyDisposableMap: [Int: Disposable] = [:] /// 从数据库中一次性拉取最多条数 private var maxEventFetchingCount: Int = 100 /// 工作队列 private let workQueue = DispatchQueue.init(label: "com.guru.analytics.manager.work.queue", qos: .userInitiated) ///网络服务队列 private lazy var rxNetworkScheduler = SerialDispatchQueueScheduler(qos: .default, internalSerialQueueName: "com.guru.analytics.manager.rx.network.queue") private lazy var rxConsumeScheduler = SerialDispatchQueueScheduler(qos: .default, internalSerialQueueName: "com.guru.analytics.manager.rx.consume.queue") private lazy var rxWorkScheduler = SerialDispatchQueueScheduler.init(queue: workQueue, internalSerialQueueName: "com.guru.analytics.manager.rx.work.queue") private let bgWorkQueue = DispatchQueue.init(label: "com.guru.analytics.manager.background.work.queue", qos: .background) private lazy var rxBgWorkScheduler = SerialDispatchQueueScheduler.init(queue: bgWorkQueue, internalSerialQueueName: "com.guru.analytics.manager.background.work.queue") /// 过期event记录已清除 private let outdatedEventsCleared = BehaviorSubject(value: false) /// 服务端时间 private var serverInitialMs = Date().msSince1970 { didSet { serverSyncedAtAbsoluteMs = Date.absoluteTimeMs } } private var serverSyncedAtAbsoluteMs = Date.absoluteTimeMs private let startAt = Date() /// 服务器时间已同步信号 private let _serverTimeSynced = BehaviorRelay(value: false) private var serverNowMsSingle: Single { guard _serverTimeSynced.value == false else { return .just(serverNowMs) } return _serverTimeSynced.observe(on: rxNetworkScheduler) .filter { $0 } .take(1).asSingle() .timeout(.seconds(10), scheduler: rxNetworkScheduler) .catchAndReturn(false) .map({ [weak self] _ in return self?.serverNowMs ?? 0 }) } /// 统计fg起始时间 private var fgStartAtAbsoluteMs = Date.absoluteTimeMs private var fgAccumulateTimer: Disposable? = nil /// 内存中user property 信息 private var userProperty: Observable<[String : String]> { let p = userPropertyUpdated.startWith(()).observe(on: rxWorkScheduler).flatMap { [weak self] _ -> Observable<[String : String]> in guard let `self` = self else { return .just([:]) } return .create({ subscriber in subscriber.onNext(self._userProperty) subscriber.onCompleted() // debugPrint("userProperty thread queueName: \(Thread.current.queueName)") return Disposables.create() }) } let latency = self.initializeTimeout - Date().timeIntervalSince(self.startAt) let intLatency = Int(latency) guard latency > 0 else { return p } return p.filter({ property in /// 需要等待以下userproperty已设置 /// PropertyName.deviceId /// PropertyName.uid /// PropertyName.firebaseId guard let deviceId = property[PropertyName.deviceId.rawValue], !deviceId.isEmpty, let uid = property[PropertyName.uid.rawValue], !uid.isEmpty, let firebaseId = property[PropertyName.firebaseId.rawValue], !firebaseId.isEmpty else { return false } return true }) .timeout(.milliseconds(intLatency), scheduler: rxNetworkScheduler) .catch { _ in return p } } private var _userProperty: [String : String] = [:] { didSet { userPropertyUpdated.onNext(()) } } private var userPropertyUpdated = PublishSubject() /// 同步服务器时间触发器 private let syncServerTrigger = PublishSubject() /// 轮询上传event任务 private var pollingUploadTask: Disposable? /// 重置轮询上传触发器 private let reschedulePollingTrigger = BehaviorSubject(value: ()) /// 记录events相关的logger private lazy var eventsLogger: LoggerManager = { let l = LoggerManager(logCategoryName: "eventLogs") return l }() /// 将错误上报给上层的 private typealias InternalEventReporter = ((_ eventCode: Int, _ info: String) -> Void) private var internalEventReporter: InternalEventReporter? private init() { // first open logFirstOpenIfNeeded() // 监听事件 setupOberving() // 检查旧数据 clearOutdatedEventsIfNeeded() // 设置轮询上传任务 setupPollingUpload() // 先打一个fg logFirstFgEvent() ntwkMgr.networkErrorReporter = self } } // MARK: - internal functions internal extension Manager { func logEvent(_ eventName: String, parameters: [String : Any]?, priority: Entity.EventRecord.Priority = .DEFAULT) { _ = _logEvent(eventName, parameters: parameters, priority: priority) .subscribe() .disposed(by: bag) } func setUserProperty(_ value: String, forName name: String) { eventsLogger.verbose(#function + "name: \(name) value: \(value)") workQueue.async { [weak self] in self?._userProperty[name] = value } } func removeUserProperties(forNames names: [String]) { eventsLogger.verbose(#function + "names: \(names)") workQueue.async { [weak self] in guard let `self` = self else { return } var temp = self._userProperty for name in names { temp.removeValue(forKey: name) } self._userProperty = temp } } func setScreen(_ name: String) { setUserProperty(name, forName: PropertyName.screen.rawValue) } private func constructEvent(_ eventName: String, parameters: [String : Any]?, timestamp: Int64, priority: Entity.EventRecord.Priority) -> Single { return userProperty.take(1).observe(on: rxWorkScheduler).asSingle().flatMap { p in .create { subscriber in do { debugPrint("userProperty thread queueName: \(Thread.current.queueName) count: \(p.count)") var userProperty = p var eventParam = parameters ?? [:] // append screen if let screen = userProperty.removeValue(forKey: PropertyName.screen.rawValue) { eventParam[PropertyName.screen.rawValue] = screen } let userInfo = Entity.UserInfo( uid: userProperty.removeValue(forKey: PropertyName.uid.rawValue), deviceId: userProperty.removeValue(forKey: PropertyName.deviceId.rawValue), adjustId: userProperty.removeValue(forKey: PropertyName.adjustId.rawValue), adId: userProperty.removeValue(forKey: PropertyName.adId.rawValue), firebaseId: userProperty.removeValue(forKey: PropertyName.firebaseId.rawValue) ) let event = try Entity.Event(timestamp: timestamp, event: eventName, userInfo: userInfo, parameters: eventParam, properties: userProperty) let eventRecord = Entity.EventRecord(eventName: event.event, event: event, priority: priority) subscriber(.success(eventRecord)) } catch { subscriber(.failure(error)) } return Disposables.create() } } } func eventsLogsArchive(_ callback: @escaping (URL?) -> Void) { eventsLogger.logFilesZipArchive() .subscribe(onSuccess: { url in callback(url) }, onFailure: { error in callback(nil) cdPrint("events logs archive error: \(error)") }) .disposed(by: bag) } func eventsLogsDirURL(_ callback: @escaping (URL?) -> Void) { eventsLogger.logFilesDirURL() .subscribe(onSuccess: { url in callback(url) }, onFailure: { error in callback(nil) cdPrint("events logs archive error: \(error)") }) .disposed(by: bag) } func registerInternalEventObserver(reportCallback: @escaping (_ eventCode: Int, _ info: String) -> Void) { self.internalEventReporter = reportCallback } func getUserProperties() -> [String : String] { return _userProperty } } // MARK: - private functions private extension Manager { func setupOberving() { syncServerTrigger .debounce(.seconds(1), scheduler: rxConsumeScheduler) .subscribe(onNext: { [weak self] _ in self?.syncServerTime() }) .disposed(by: bag) var activeNoti = NotificationCenter.default.rx.notification(UIApplication.didBecomeActiveNotification) if UIApplication.shared.applicationState == .active { activeNoti = activeNoti.startWith(.init(name: UIApplication.didBecomeActiveNotification)) } activeNoti .subscribe(onNext: { [weak self] _ in self?.syncServerTrigger.onNext(()) // fg计时器 self?.setupFgAccumulateTimer() }) .disposed(by: bag) NotificationCenter.default.rx.notification(UIApplication.didEnterBackgroundNotification) .subscribe(onNext: { [weak self] _ in guard let `self` = self else { return } //这里log fg和上传events任务并行关系改为前后依赖关系 _ = self.logForegroundDuration() .catchAndReturn(()) .map { self.consumeEvents() } .subscribe() self._serverTimeSynced.accept(false) self.invalidFgAccumulateTimer() }) .disposed(by: bag) } func syncServerTime() { //有网时立即同步,无网时等待有网后同步 ntwkMgr.reachableObservable.filter { $0 }.map { _ in }.take(1).asSingle() .flatMap { [weak self] _ -> Single in guard let `self` = self else { return Observable.empty().asSingle()} return self.ntwkMgr.syncServerTime() } .observe(on: rxNetworkScheduler) .subscribe(onSuccess: { [weak self] ms in self?.serverInitialMs = ms self?._serverTimeSynced.accept(true) }) .disposed(by: bag) } func logForegroundDuration() -> Single { return _logEvent(GuruAnalytics.fgEvent.name, parameters: [GuruAnalytics.fgEvent.paramKeyType.duration.rawValue : fgDurationMs()]) .observe(on: MainScheduler.asyncInstance) .do(onSuccess: { _ in UserDefaults.fgAccumulatedDuration = 0 }) } func clearOutdatedEventsIfNeeded() { /// 1. 删除过期的数据 serverNowMsSingle .flatMap({ [weak self] serverNowMs -> Single in guard let `self` = self else { return .just(()) } let earlierThan: Int64 = serverNowMs - self.eventExpiredIntervel.int64Ms return self.db.removeOutdatedEventRecords(earlierThan: earlierThan) }) .catch({ error in cdPrint("remove outdated records error: \(error)") return .just(()) }) .subscribe(onSuccess: { [weak self] _ in self?.outdatedEventsCleared.onNext(true) }) .disposed(by: bag) } func logFirstOpenIfNeeded() { if let t = UserDefaults.defaults?.value(forKey: UserDefaults.firstOpenTimeKey), let firstOpenTimeMs = t as? Int64 { setUserProperty("\(firstOpenTimeMs)", forName: PropertyName.firstOpenTime.rawValue) } else { /// log first open event logEvent(GuruAnalytics.firstOpenEvent.name, parameters: nil, priority: .EMERGENCE) /// save first open time /// set to userProperty let firstOpenAt = Date() let saveFirstOpenTime = { [weak self] (ms: Int64) -> Void in UserDefaults.defaults?.set(ms, forKey: UserDefaults.firstOpenTimeKey) self?.setUserProperty("\(ms)", forName: PropertyName.firstOpenTime.rawValue) } serverNowMsSingle .subscribe(onSuccess: { _ in let latency = Date().timeIntervalSince(firstOpenAt) let adjustedFirstOpenTimeMs = self.serverInitialMs - latency.int64Ms saveFirstOpenTime(adjustedFirstOpenTimeMs) }, onFailure: { error in cdPrint("waiting for server time syncing error: \(error)") saveFirstOpenTime(firstOpenAt.timeIntervalSince1970.int64Ms) }) .disposed(by: bag) } } func _logEvent(_ eventName: String, parameters: [String : Any]?, priority: Entity.EventRecord.Priority = .DEFAULT) -> Single { eventsLogger.verbose(#function + " eventName: \(eventName)" + " params: \(parameters?.jsonString() ?? "")") return { [weak self] () -> Single in guard let `self` = self else { return Observable.empty().asSingle() } return self.serverNowMsSingle .flatMap { self.constructEvent(eventName, parameters: parameters, timestamp: $0, priority: priority) } .flatMap { self.db.addEventRecords($0) } .do(onSuccess: { _ in self.accumulateLoggedEventsCount(1) self.eventsLogger.verbose("log event success") }, onError: { error in self.eventsLogger.error("log event error: \(error)") }) }() } } // MARK: - 轮询上传相关 private extension Manager { typealias TaskCallback = (() -> Void) typealias Task = ((@escaping TaskCallback, Int) -> Void) func performBackgroundTask(task: @escaping Task) -> Single { return Single.create { [weak self] subscriber in var backgroundTaskID: UIBackgroundTaskIdentifier? let stopTaskHandler = { ///结束任务时需要找到对应的 dispose 取消当前任务 guard let taskId = backgroundTaskID, let disposable = self?.taskKeyDisposableMap[taskId.rawValue] else { return } cdPrint("[performBackgroundTask] performBackgroundTask expired: \(backgroundTaskID?.rawValue ?? -1)") disposable.dispose() } // Request the task assertion and save the ID. backgroundTaskID = UIApplication.shared.beginBackgroundTask (withName: "com.guru.analytics.manager.background.task", expirationHandler: { // End the task if time expires. self?.eventsLogger.verbose("performBackgroundTask expirationHandler: \(backgroundTaskID?.rawValue ?? -1)") stopTaskHandler() }) self?.eventsLogger.verbose("performBackgroundTask start: \(backgroundTaskID?.rawValue ?? -1)") if let taskID = backgroundTaskID { task({ self?.eventsLogger.verbose("performBackgroundTask finish: \(taskID.rawValue)") subscriber(.success(())) }, taskID.rawValue) } return Disposables.create { if var taskID = backgroundTaskID { self?.eventsLogger.verbose("performBackgroundTask dispose: \(taskID.rawValue)") UIApplication.shared.endBackgroundTask(taskID) taskID = .invalid backgroundTaskID = nil } } } .subscribe(on: rxBgWorkScheduler) } /// 上传数据库中的event func consumeEvents() { guard GuruAnalytics.enableUpload else { return } self.eventsLogger.verbose("consumeEvents start") performBackgroundTask { [weak self] callback, taskId in guard let `self` = self else { return } cdPrint("consumeEvents start background task") // 等待清理过期记录完成 let disposable = outdatedEventsCleared .filter { $0 } .take(1) .observe(on: rxBgWorkScheduler) .asSingle() .flatMap { _ -> Single<[Entity.EventRecord]> in self.eventsLogger.verbose("consumeEvents fetchEventRecordsToUpload") ///step1: 拉取数据库记录 return self.db.fetchEventRecordsToUpload(limit: self.maxEventFetchingCount) } .map { records -> [[Entity.EventRecord]] in /// step2: 将event数组分割成若干批次,numberOfCountPerConsume个一批 /// self.eventsLogger.verbose("consumeEvents fetchEventRecordsToUpload") self.eventsLogger.verbose("consumeEvents fetchEventRecordsToUpload result: \(records.count)") return records.chunked(into: self.numberOfCountPerConsume) } .flatMap({ batches -> Single<[[Entity.EventRecord]]> in guard batches.count > 0 else { return .just([]) } /// 监听网络信号 return self.ntwkMgr.reachableObservable.filter { $0 } .take(1).asSingle() .map { _ in batches } }) .map { batches -> [Single<[String]>] in /// step3: 转为批次上传任务 self.eventsLogger.verbose("consumeEvents uploadEvents") return batches.map { records in return self.ntwkMgr.uploadEvents(records) .do(onSuccess: { t in self.eventsLogger.verbose("consumeEvents upload events succeed: \(t.eventsJson)") }) .catch({ error in self.eventsLogger.error("consumeEvents upload events error: \(error)") // 上传失败,移除对应的缓存ID let recordIds = records.map { $0.recordId } return self.db.resetTransitionStatus(for: recordIds) .map { _ in ([], "") } }) .map { $0.recordIDs } } } .flatMap { uploadBatches -> Single<[String]> in guard uploadBatches.count > 0 else { return .just([]) } /// 合并上传结果 return Observable.from(uploadBatches) .merge() .toArray().map { batches -> [String] in batches.flatMap { $0 } } } .flatMap { recordIDs -> Single in self.accumulateUploadedEventsCount(recordIDs.count) /// step4: 删除数据库中对应记录 return self.db.deleteEventRecords(recordIDs) .catch { error in cdPrint("consumeEvents delete events from DB error: \(error)") return .just(()) } } .observe(on: self.rxBgWorkScheduler) .subscribe(onFailure: { error in cdPrint("consumeEvents error: \(error)") }, onDisposed: { [weak self] in self?.taskKeyDisposableMap.removeValue(forKey: taskId) cdPrint("consumeEvents onDisposed") callback() }) taskKeyDisposableMap[taskId] = disposable } .subscribe() .disposed(by: bag) } func startPollingUpload() { pollingUploadTask?.dispose() pollingUploadTask = nil // 每scheduleInterval时间间隔启动一次,立即启动一次 let timer = Observable.timer(.seconds(0), period: .milliseconds(Int(scheduleInterval.int64Ms)), scheduler: rxConsumeScheduler) .do(onNext: { _ in cdPrint("consumeEvents timer") }) // 每满numberOfCountPerConsume个数启动一次,立即启动一次 let counter = db.uploadableEventRecordCountOb() .distinctUntilChanged() .compactMap({ [weak self] count -> Int? in cdPrint("consumeEvents uploadableEventRecordCountOb count: \(count) numberOfCountPerConsume: \(self?.numberOfCountPerConsume)") guard let `self` = self, count >= self.numberOfCountPerConsume else { return nil } return count }) .map { _ in } .startWith(()) pollingUploadTask = Observable.combineLatest(timer, counter) .throttle(.seconds(1), scheduler: rxConsumeScheduler) .flatMap({ [weak self] t -> Single<(Int, Void)> in guard let `self` = self else { return .just(t) } return Observable.combineLatest(self.db.hasFgEventRecord().asObservable(), self.db.uploadableEventRecordCount().asObservable()) .take(1).asSingle() .flatMap({ (hasFgEventInDb, eventsCount) -> Single<(Int, Void)> in guard !hasFgEventInDb, eventsCount > 0 else { return .just(t) } return self.logForegroundDuration().catchAndReturn(()).map({ _ in t }) }) }) .subscribe(onNext: { [weak self] (timer, counter) in self?.consumeEvents() }) } func setupPollingUpload() { reschedulePollingTrigger .debounce(.seconds(1), scheduler: rxConsumeScheduler) .subscribe(onNext: { [weak self] _ in self?.startPollingUpload() }) .disposed(by: bag) } func logFirstFgEvent() { _ = Single.just(()).delay(.milliseconds(500), scheduler: MainScheduler.asyncInstance) .flatMap({ [weak self] _ in self?.logForegroundDuration() ?? .just(()) }) .subscribe() } } // MARK: - fg相关 private extension Manager { func setupFgAccumulateTimer() { invalidFgAccumulateTimer() fgStartAtAbsoluteMs = Date.absoluteTimeMs fgAccumulateTimer = Observable.timer(.seconds(0), period: .seconds(1), scheduler: MainScheduler.asyncInstance) .subscribe(onNext: { [weak self] _ in guard let `self` = self else { return } UserDefaults.fgAccumulatedDuration = self.fgDurationMs() }, onDisposed: { cdPrint("fg accumulate timer disposed") }) } func invalidFgAccumulateTimer() { fgAccumulateTimer?.dispose() fgAccumulateTimer = nil } /// 前台停留时长 func fgDurationMs() -> Int64 { let slice = Date.absoluteTimeMs - fgStartAtAbsoluteMs fgStartAtAbsoluteMs = Date.absoluteTimeMs // cdPrint("accumulate fg duration: \(slice)") let totalDuration = UserDefaults.fgAccumulatedDuration + slice // cdPrint("total fg duration: \(totalDuration)") return totalDuration } } extension Manager: GuruAnalyticsNetworkErrorReportDelegate { func reportError(networkError: GuruAnalyticsNetworkError) { enum UserInfoKey: String, Encodable { case httpCode = "h_c" case errorCode = "e_c" case url, msg } let errorCode = networkError.internalErrorCategory.rawValue let userInfo = (networkError.originError as NSError).userInfo var info: [UserInfoKey : String] = [ .url : (userInfo[NSURLErrorFailingURLStringErrorKey] as? String) ?? "", .msg : networkError.originError.localizedDescription, ] if let httpCode = networkError.httpStatusCode { info[.httpCode] = "\(httpCode)" } else { info[.errorCode] = "\((networkError.originError as NSError).code)" } info = info.compactMapValues { $0.isEmpty ? nil : $0 } let jsonString = info.asString ?? "" DispatchQueue.main.async { [weak self] in self?.internalEventReporter?(errorCode, jsonString) } } }