diff --git a/Sources/Database/FIRDatabaseQuery+Rx.swift b/Sources/Database/FIRDatabaseQuery+Rx.swift index 1675e19..d844a08 100644 --- a/Sources/Database/FIRDatabaseQuery+Rx.swift +++ b/Sources/Database/FIRDatabaseQuery+Rx.swift @@ -79,16 +79,15 @@ extension Reactive where Base: DatabaseQuery { * @param block The block that should be called. It is passed the data as a FIRDataSnapshot. * @param cancelBlock The block that will be called if you don't have permission to access this data */ - public func observeSingleEvent(_ eventType: DataEventType) -> Observable { - return Observable.create { observer in - self.base.observeSingleEvent(of: eventType, with: { snapshot in - observer.onNext(snapshot) - observer.onCompleted() - }, withCancel: { error in - observer.onError(error) + public func observeSingleEvent(_ eventType: DataEventType) -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.observeSingleEvent(of: eventType, with: { (snapshot) in + singleEventListener(.success(snapshot)) + }, withCancel: { (error) in + singleEventListener(.error(error)) }) return Disposables.create() - } + }) } /** @@ -101,15 +100,14 @@ extension Reactive where Base: DatabaseQuery { * @param block The block that should be called. It is passed the data as a FIRDataSnapshot and the previous child's key. * @param cancelBlock The block that will be called if you don't have permission to access this data */ - public func observeSingleEventAndPreviousSiblingKey(_ eventType: DataEventType) -> Observable { - return Observable.create { observer in - self.base.observeSingleEvent(of: eventType, andPreviousSiblingKeyWith: { snapshot, prevKey in - observer.onNext(PreviousSiblingKeyDataSnapshot(snapshot, prevKey)) - observer.onCompleted() - }, withCancel: { error in - observer.onError(error) + public func observeSingleEventAndPreviousSiblingKey(_ eventType: DataEventType) -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.observeSingleEvent(of: eventType, andPreviousSiblingKeyWith: { (snapshot, prevKey) in + singleEventListener(.success(PreviousSiblingKeyDataSnapshot(snapshot, prevKey))) + }, withCancel: { (error) in + singleEventListener(.error(error)) }) return Disposables.create() - } + }) } } diff --git a/Sources/Database/FIRDatabaseReference+Rx.swift b/Sources/Database/FIRDatabaseReference+Rx.swift index fd58adf..e2ae733 100644 --- a/Sources/Database/FIRDatabaseReference+Rx.swift +++ b/Sources/Database/FIRDatabaseReference+Rx.swift @@ -20,18 +20,18 @@ extension Reactive where Base: DatabaseReference { * @param value The value to be written. * @param priority The priority to be attached to that data. */ - public func setValue(_ value: Any?, andPriority priority: Any? = nil) -> Observable { - return Observable.create { observer in - self.base.setValue(value, andPriority: priority, withCompletionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func setValue(_ value: Any?, andPriority priority: Any? = nil) -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.setValue(value, andPriority: priority, withCompletionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -43,18 +43,18 @@ extension Reactive where Base: DatabaseReference { * * remove: is equivalent to calling setValue:nil */ - public func removeValue() -> Observable { - return Observable.create { observer in - self.base.removeValue(completionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func removeValue() -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.removeValue(completionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -80,18 +80,18 @@ extension Reactive where Base: DatabaseReference { * * @param priority The priority to set at the specified location. */ - public func setPriority(_ priority: Any?) -> Observable { - return Observable.create { observer in - self.base.setPriority(priority, withCompletionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func setPriority(_ priority: Any?) -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.setPriority(priority, withCompletionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -100,18 +100,18 @@ extension Reactive where Base: DatabaseReference { * * @param values A dictionary of the keys to change and their new values */ - public func updateChildValues(_ values: [AnyHashable: Any]) -> Observable { - return Observable.create { observer in - self.base.updateChildValues(values, withCompletionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func updateChildValues(_ values: [AnyHashable: Any]) -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.updateChildValues(values, withCompletionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -122,18 +122,18 @@ extension Reactive where Base: DatabaseReference { * @param value The value to be set after the connection is lost. * @param priority The priority to be set after the connection is lost. */ - public func onDisconnectSetValue(_ value: Any?, andPriority priority: Any? = nil) -> Observable { - return Observable.create { observer in - self.base.onDisconnectSetValue(value, andPriority: priority, withCompletionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func onDisconnectSetValue(_ value: Any?, andPriority priority: Any? = nil) -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.onDisconnectSetValue(value, andPriority: priority, withCompletionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -143,18 +143,18 @@ extension Reactive where Base: DatabaseReference { * * onDisconnectRemoveValue is especially useful for implementing "presence" systems. */ - public func onDisconnectRemoveValue() -> Observable { - return Observable.create { observer in - self.base.onDisconnectRemoveValue(completionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func onDisconnectRemoveValue() -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.onDisconnectRemoveValue(completionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -165,18 +165,18 @@ extension Reactive where Base: DatabaseReference { * * @param values A dictionary of child node keys and the values to set them to after the connection is lost. */ - public func onDisconnectUpdateChildValues(_ values: [AnyHashable: Any]) -> Observable { - return Observable.create { observer in - self.base.onDisconnectUpdateChildValues(values, withCompletionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func onDisconnectUpdateChildValues(_ values: [AnyHashable: Any]) -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.onDisconnectUpdateChildValues(values, withCompletionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -184,18 +184,18 @@ extension Reactive where Base: DatabaseReference { * onDisconnectRemoveValue:, or onDisconnectUpdateChildValues:, and no longer want the values updated when the * connection is lost, call cancelDisconnectOperations: */ - public func cancelDisconnectOperations() -> Observable { - return Observable.create { observer in - self.base.cancelDisconnectOperations(completionBlock: { error, ref in - guard let error = error else { - observer.onNext(ref) - observer.onCompleted() - return + public func cancelDisconnectOperations() -> Single { + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.cancelDisconnectOperations(completionBlock: { (error, ref) in + if let error = error { + singleEventListener(.error(error)) + } + else { + singleEventListener(.success(ref)) } - observer.onError(error) }) return Disposables.create() - } + }) } /** @@ -214,18 +214,19 @@ extension Reactive where Base: DatabaseReference { * @param completionBlock This block will be triggered once the transaction is complete, whether it was successful or not. It will indicate if there was an error, whether or not the data was committed, and what the current value of the data at this location is. * @param localEvents Set this to NO to suppress events raised for intermediate states, and only get events based on the final state of the transaction. */ - public func runTransactionBlock(_ block: @escaping (MutableData) -> TransactionResult, withLocalEvents: Bool) -> Observable { - return Observable.create { observer in - self.base.runTransactionBlock(block, andCompletionBlock: { error, committed, snapshot in - guard let error = error else { - observer.onNext(DatabaseReferenceTransactionResult(committed, snapshot)) - observer.onCompleted() - return + public func runTransactionBlock(_ block: @escaping (MutableData) -> TransactionResult, withLocalEvents: Bool) -> Single { + + return Single.create(subscribe: { (singleEventListener) -> Disposable in + self.base.runTransactionBlock(block, andCompletionBlock: { (error, committed, snapshot) in + if let error = error { + singleEventListener(.error(error)) } - observer.onError(error) - }, withLocalEvents: withLocalEvents) + else { + singleEventListener(.success(DatabaseReferenceTransactionResult(committed, snapshot))) + } + }) return Disposables.create() - } + }) } /** @@ -243,7 +244,7 @@ extension Reactive where Base: DatabaseReference { * @param block This block receives the current data at this location and must return an instance of FIRTransactionResult * @param completionBlock This block will be triggered once the transaction is complete, whether it was successful or not. It will indicate if there was an error, whether or not the data was committed, and what the current value of the data at this location is. */ - public func runTransactionBlock(_ block: @escaping (MutableData) -> TransactionResult) -> Observable { + public func runTransactionBlock(_ block: @escaping (MutableData) -> TransactionResult) -> Single { return self.runTransactionBlock(block, withLocalEvents: true) } }