On this page
Create
create an Observable from scratch by means of a function
   You can create an Observable from scratch by using the Create operator. You pass this operator a function that accepts the observer as its parameter. Write this function so that it behaves as an Observable — by calling the observer’s onNext, onError, and onCompleted methods appropriately.
A well-formed finite Observable must attempt to call either the observer’s onCompleted method exactly once or its onError method exactly once, and must not thereafter attempt to call any of the observer’s other methods.
See Also
- Introduction to Rx: Create
 - Introduction to Rx: Generate
 - 101 Rx Samples: Generate
 - RxJava Tutorial 03: Observable from, just, & create methods
 
Language-Specific Information
RxGroovy create
    
       RxGroovy implements this operator as create.
Sample Code
def myObservable = Observable.create({ aSubscriber ->
  try {
    for (int i = 1; i < 1000000; i++) {
      if (aSubscriber.isUnsubscribed()) {
        return;
      }
      aSubscriber.onNext(i);
    }
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onCompleted();
    }
  } catch(Throwable t) {
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onError(t);
    }
  }
})
        It is good practice to check the observer’s isUnsubscribed state so that your Observable can stop emitting items or doing expensive calculations when there is no longer an interested observer.
create does not by default operate on any particular Scheduler.
- Javadoc: 
create(OnSubscribe) 
RxJava 1․x create
    
       RxJava implements this operator as create.
It is good practice to check the observer’s isUnsubscribed state from within the function you pass to create so that your Observable can stop emitting items or doing expensive calculations when there is no longer an interested observer.
Sample Code
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
 } ).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }
        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
        Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
        create does not by default operate on any particular Scheduler.
- Javadoc: 
create(OnSubscribe) 
RxJS create createWithDisposable generate generateWithAbsoluteTime generateWithRelativeTime
    
       RxJS implements this operator as create (there is also an alternate name for the same operator: createWithDisposable).
Sample Code
/* Using a function */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();
    // Note that this is optional, you do not have to return this if you require no cleanup
    return function () { console.log('disposed'); };
});
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
        Next: 42
Completed
        /* Using a disposable */
var source = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();
    // Note that this is optional, you do not have to return this if you require no cleanup
    return Rx.Disposable.create(function () {
        console.log('disposed');
    });
});
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
        Next: 42
Completed
        create is found in the following distributions:
rx.jsrx.all.jsrx.all.compat.jsrx.compat.jsrx.lite.jsrx.lite.compat.js
       You can use the generate operator to create simple Observables that can generate their next emissions, and can determine when to terminate, based on the value of the previous emission. The basic form of generate takes four parameters:
- the first item to emit
 - a function to test an item to determine whether to emit it (
true) or terminate the Observable (false) - a function to generate the next item to test and emit based on the value of the previous item
 - a function to transform items before emitting them
 
You can also pass in as an optional fifth parameter a Scheduler that generate will use to create and emit its sequence (it uses currentThread by default).
Sample Code
var source = Rx.Observable.generate(
    0,
    function (x) { return x < 3; },
    function (x) { return x + 1; },
    function (x) { return x; }
);
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
        Next: 0
Next: 1
Next: 2
Completed
        generate is found in the following distributions:
rx.jsrx.compat.jsrx.lite.jsrx.lite.compat.js
       You can use the generateWithRelativeTime operator to create simple Observables that can generate their next emissions, and can determine when to terminate, based on the value of the previous emission. The basic form of generateWithRelativeTime takes five parameters:
- the first item to emit
 - a function to test an item to determine whether to emit it (
true) or terminate the Observable (false) - a function to generate the next item to test and emit based on the value of the previous item
 - a function to transform items before emitting them
 - a function to indicate how long, in milliseconds, the generator should wait after the emission of the previous item before emitting this item
 
You can also pass in as an optional sixth parameter a Scheduler that generate will use to create and emit its sequence (it uses currentThread by default).
Sample Code
var source = Rx.Observable.generateWithRelativeTime(
    1,
    function (x) { return x < 4; },
    function (x) { return x + 1; },
    function (x) { return x; },
    function (x) { return 100 * x; }
).timeInterval();
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
        Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed
        generateWithRelativeTime is found in the following distributions:
rx.lite.jsrx.lite.compat.jsrx.time.js(requiresrx.jsorrx.compat.js)
       You can use the generateWithAbsoluteTime operator to create simple Observables that can generate their next emissions, and can determine when to terminate, based on the value of the previous emission. The basic form of generateWithAbsoluteTime takes five parameters:
- the first item to emit
 - a function to test an item to determine whether to emit it (
true) or terminate the Observable (false) - a function to generate the next item to test and emit based on the value of the previous item
 - a function to transform items before emitting them
 - a function to indicate at what time (expressed as a 
Date) the generator should emit the the new item 
You can also pass in as an optional sixth parameter a Scheduler that generate will use to create and emit its sequence (it uses currentThread by default).
Sample Code
var source = Rx.Observable.generate(
    1,
    function (x) { return x < 4; },
    function (x) { return x + 1; },
    function (x) { return x; },
    function (x) { return Date.now() + (100 * x); }
).timeInterval();
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
        Next: {value: 1, interval: 100}
Next: {value: 2, interval: 200}
Next: {value: 3, interval: 300}
Completed
        generateWithAbsoluteTime is found in the following distribution:
rx.time.js(requiresrx.jsorrx.compat.js)
RxPHP create
    RxPHP implements this operator as create.
Creates an observable sequence from a specified subscribeAction callable implementation.
Sample Code
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/create/create.php
//With static method
$source = \Rx\Observable::create(function (\Rx\ObserverInterface $observer) {
    $observer->onNext(42);
    $observer->onCompleted();
    return new CallbackDisposable(function () {
        echo "Disposed\n";
    });
});
$subscription = $source->subscribe($createStdoutObserver());
        Next value: 42
Complete!
Disposed
        RxSwift create generate
    
       RxSwift implements this operator as create.
Sample Code
let source : Observable = Observable.create { observer in
    for i in 1...5 {
        observer.on(.next(i))
    }
    observer.on(.completed)
    // Note that this is optional. If you require no cleanup you can return
    // `Disposables.create()` (which returns the `NopDisposable` singleton)
    return Disposables.create {
        print("disposed")
    }
}
source.subscribe {
    print($0)
}
        next(1)
next(2)
next(3)
next(4)
next(5)
completed
disposed
        
       You can use the generate operator to create simple Observables that can generate their next emissions, and can determine when to terminate, based on the value of the previous emission. The basic form of generate takes three parameters:
- the first item to emit
 - a function to test an item to determine whether to emit it (
true) or terminate the Observable (false) - a function to generate the next item to test and emit based on the value of the previous item
 
You can also pass in as an optional fourth parameter a Scheduler that generate will use to create and emit its sequence (it uses CurrentThreadScheduler by default).
Sample Code
let source = Observable.generate(
   initialState: 0,
   condition: { $0 < 3 },
   iterate: { $0 + 1 }
)
source.subscribe {
   print($0)
}
        next(0)
next(1)
next(2)
completed
        © ReactiveX contributors
Licensed under the Apache License 2.0.
 http://reactivex.io/documentation/operators/create.html