RxSwift minimal Observable.create example

I am trying to get RxSwift working and want to create a custom Observable, but I think I am doing something wrong.

I have distilled what I am doing into this minimal sample:

import Foundation
import RxSwift

class Example
{

    let exampleObservable : Observable<String> = Observable.create { (observer) in
        observer.on(.Next("hello"))
        observer.on(.Completed)

        return AnonymousDisposable { }
    }

    let exampleObserver : AnyObserver<String>?

    func run()
    {
        self.exampleObserver = exampleObservable.subscribeNext({ (text) -> Void in
            print(text)
        })  
    }

}

let ex = Example()
ex.run()

Is this correct? In the run method, subscribeNext is autocompleted that way by Xcode.

But when I run it I get the following compilation error:

Cannot Invoke 'subscribeNext' with an argument list of type ((String) -> Void)

You can use RxExamples to better understand RxSwift. I found it in the RxSwift repo, and it helped me understand the library.

OK, let's try to send a simple request using Alamofire and RxSwift. First, we write the request function:

func getApi() -> Observable<AnyObject?> {
    return create { observer in
        let request = Alamofire.request(.GET, "http://someapiurl.com", parameters: nil)
            .response(completionHandler: { request, response, data, error in
                if let error = error {
                    // Forward the failure; an error event terminates the sequence
                    observer.on(.Error(error))
                } else {
                    // Emit the raw response data, then complete
                    observer.on(.Next(data))
                    observer.on(.Completed)
                }
            })
        // Cancel the in-flight request when the subscription is disposed
        return AnonymousDisposable {
            request.cancel()
        }
    }
}

The getApi() method sends the request and receives the server's response using Alamofire. I used the RxSwift observer to forward the success or error events. Second, we must call this function. Here it is called from viewDidLoad, but you could also trigger it from a button's rx_tap:

class ViewController: UIViewController {

        var disposeBag = DisposeBag()

        override func viewDidLoad() {
            super.viewDidLoad()


            getApi()
                // Set 3 attempts to get response
                .retry(3)
                // Set 2 seconds timeout
                .timeout(2, MainScheduler.sharedInstance)
                // Subscribe in background thread
                .subscribeOn(Dependencies.sharedDependencies.backgroundWorkScheduler)
                // Observe in main thread
                .observeOn(Dependencies.sharedDependencies.mainScheduler)
                // Subscribe on observer
                .subscribe(
                    onNext: { data in
                        do {
                            let post = try NSJSONSerialization.JSONObjectWithData(data as! NSData, options: []) as! NSDictionary
                            print(post)
                        } catch  {
                            print(NSString(data: data as! NSData, encoding: NSUTF8StringEncoding))
                            return
                        }
                    },
                    onError: { error in
                        print(error)
                    },
                    onCompleted: {
                        print("Completed")
                    },
                    onDisposed: {
                        print("Disposed")
                    }
                )
                .addDisposableTo(disposeBag)
        }
    }

This is my simple example; I hope it helps. ReactiveX offers a huge range of possibilities. Good luck learning RxSwift!


This implementation has changed slightly with Swift 3:

    func observableFunc() -> Observable<Bool> {
        return Observable.create { observer in

            self.apiClient.fetchData(callback: { results, error in

                if let error = error {
                    // An error event terminates the sequence, so stop here
                    observer.onError(error)
                    return
                }

                if results != nil {
                    observer.onNext(true)
                    observer.onCompleted()
                }
            })
            return Disposables.create()
        }
    }
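
Applying the same Swift 3 syntax to the minimal example from the question gives roughly the following. This is a sketch, not a definitive fix: it assumes a RxSwift version that provides disposed(by:), and the received array is a hypothetical addition used only to make the result observable. The original compile error most likely came from storing the result of subscribeNext in an AnyObserver<String>? property, whereas subscribing returns a Disposable:

```swift
import RxSwift

final class Example {
    // Emits a single "hello" and then completes.
    let exampleObservable: Observable<String> = Observable.create { observer in
        observer.onNext("hello")
        observer.onCompleted()
        return Disposables.create()
    }

    // Subscribing returns a Disposable, so keep it in a DisposeBag
    // instead of an AnyObserver property.
    private let disposeBag = DisposeBag()

    // Hypothetical helper: records every value the subscription receives.
    private(set) var received: [String] = []

    func run() {
        exampleObservable
            .subscribe(onNext: { [weak self] text in
                self?.received.append(text)
                print(text)
            })
            .disposed(by: disposeBag)
    }
}

let ex = Example()
ex.run() // prints "hello"
```

Because the closure passed to Observable.create emits synchronously, the value has already been delivered by the time run() returns.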

It is a good idea to use traits whenever you can. I suggest you take a look at the RxSwift documentation and the Traits documentation.

For example, a method that makes an API call usually returns a Single trait.

Then you can do something like this:

func getSomething() -> Single<YourType> {
    return Single<YourType>.create { single in
        // Perform the API call,
        // then emit a success event with the resulting value
        single(.success(YourType))

        // Or emit an error event
        // (RxSwift 6 and later spell this case .failure)
        single(.error(Error))
        return Disposables.create()
    }
}

There are many other traits you can use in different cases with different approaches.
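
As one illustration of another trait, a Completable fits an operation that produces no value and only succeeds or fails. The following is a minimal sketch; saveName, SaveError, and the log array are hypothetical names invented for this example and are not part of RxSwift:

```swift
import RxSwift

// Hypothetical error type for the example.
enum SaveError: Error {
    case emptyName
}

// Hypothetical operation with no result value: it either completes or fails.
func saveName(_ name: String) -> Completable {
    return Completable.create { completable in
        if name.isEmpty {
            completable(.error(SaveError.emptyName))
        } else {
            completable(.completed)
        }
        return Disposables.create()
    }
}

// Records which callback fired for each subscription.
var log: [String] = []
let bag = DisposeBag()

saveName("Rx")
    .subscribe(onCompleted: { log.append("completed") },
               onError: { _ in log.append("error") })
    .disposed(by: bag)

saveName("")
    .subscribe(onCompleted: { log.append("completed") },
               onError: { _ in log.append("error") })
    .disposed(by: bag)
```

A Completable has no onNext or onSuccess callback at all, which makes the "no value" contract visible in the method signature.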


Swift 5 and above (the code uses the Result type): a simple example of Observable.create with URLSession

func createObservableExample() -> Observable<[ToDo]> {
    return Observable.create { observer -> Disposable in
        // Request the whole collection: ".../todos/1" returns a single object,
        // which cannot be decoded as [ToDo]
        let dataTask = self.getTodos(from: "https://jsonplaceholder.typicode.com/todos") { result in
            switch result {
            case .success(let todos):
                observer.onNext(todos)
                observer.onCompleted()
            case .failure(let error):
                observer.onError(error)
            }
        }
        // Cancel the network request when the subscription is disposed
        return Disposables.create {
            dataTask.cancel()
        }
    }
}

func getTodos(from url: String, completion: @escaping ((Result<[ToDo], Error>) -> Void)) -> URLSessionDataTask {
    let task = URLSession.shared.dataTask(with: URL(string: url)!) { (data, response, error) in
        if let error = error {
            completion(.failure(error)); return
        }
        guard let data = data else {
            let error = NSError(domain: "dataNilError", code: -10001, userInfo: nil)
            completion(.failure(error)); return
        }
        do {
            let todos = try JSONDecoder().decode([ToDo].self, from: data)
            completion(.success(todos))
        } catch {
            completion(.failure(error))
        }
    }
    task.resume()
    return task
}
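
The code above decodes the response as [ToDo] but does not show that type. A minimal sketch of what it might look like, assuming the field names of the jsonplaceholder todos resource (userId, id, title, completed); the sample JSON is hand-written for illustration rather than fetched from the network:

```swift
import Foundation

// Assumed model: property names must match the JSON keys for the
// synthesized Codable conformance to work.
struct ToDo: Codable, Equatable {
    let userId: Int
    let id: Int
    let title: String
    let completed: Bool
}

// Hand-written sample shaped like an array response.
let sampleJSON = Data("""
[
  {"userId": 1, "id": 1, "title": "delectus aut autem", "completed": false}
]
""".utf8)

// The same decode call that getTodos performs on the downloaded data.
let decoded = try! JSONDecoder().decode([ToDo].self, from: sampleJSON)
print(decoded[0].title) // prints "delectus aut autem"
```

Decoding an array type requires the top-level JSON value to be an array; a single JSON object would make decode throw and end up in the .failure branch.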