Aug 29th, 2013

Releasing Rx.PHP 0.2.0!

I'm happy to announce the 0.2.0 release of Rx.PHP. This release focused on stabilizing the core features of the library. It adds the skip operator and several helpers for functionally testing your reactive code, and all new and existing core functionality is now covered by tests.

Skip operator

The skip operator skips a specified number of messages at the start of a sequence when observing it:

$observable = new ArrayObservable(array(1, 1, 2, 3, 5, 8, 13));
$skipsThree = $observable->skip(3);

$observable->subscribeCallback(function($elem) { echo 'x' . $elem . ' '; });
// outputs: x1 x1 x2 x3 x5 x8 x13

$skipsThree->subscribeCallback(function($elem) { echo 'y' . $elem . ' '; });
// outputs: y3 y5 y8 y13

Testing helpers

At its core, Rx is about observing sequences of events. To test a program it is useful to be in control of the event stream. To help with simulating a specific series of events, the library now ships a VirtualTimeScheduler, and a TestScheduler that extends it, so you can control "time" in your tests. Next are the ColdObservable and HotObservable objects, which let you build test streams to run through your code while keeping track of when observers subscribe and unsubscribe. There are new objects representing each of the onCompleted, onError and onNext notifications. Finally, there is a MockObserver that records emitted events.

VirtualTimeScheduler and TestScheduler

Events happen over time. Testing against events in "real" time would be quite cumbersome. Instead Rx introduces the concept of virtual time to be used in testing. The VirtualTimeScheduler keeps an internal clock that ticks after all the work for the current tick is done. In the world of virtual time, the scheduled work is assumed to take zero time. This makes it possible to assert on the scheduling of events.
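To make the idea of virtual time concrete, here is a minimal sketch of a clock that runs scheduled work in due-time order and simply jumps forward instead of waiting. It is an illustration only: the class SketchVirtualClock and its methods are hypothetical names made up for this post and are not the API of the library's VirtualTimeScheduler.

```php
<?php
// Hypothetical illustration; this is NOT Rx.PHP's VirtualTimeScheduler.
class SketchVirtualClock
{
    private $now = 0;          // current virtual time
    private $queue = array();  // pending items: array(dueTime, sequence, action)
    private $sequence = 0;     // keeps insertion order for items with equal due times

    public function now()
    {
        return $this->now;
    }

    public function scheduleAbsolute($dueTime, $action)
    {
        $this->queue[] = array($dueTime, $this->sequence++, $action);
    }

    public function start()
    {
        while (count($this->queue) > 0) {
            // Earliest due time first; ties are broken by insertion order.
            usort($this->queue, function ($a, $b) {
                if ($a[0] !== $b[0]) {
                    return $a[0] < $b[0] ? -1 : 1;
                }
                return $a[1] < $b[1] ? -1 : 1;
            });

            list($dueTime, $unused, $action) = array_shift($this->queue);

            // Jump the clock forward; the work itself takes zero virtual time.
            if ($dueTime > $this->now) {
                $this->now = $dueTime;
            }
            $action($this);
        }
    }
}

$clock = new SketchVirtualClock();
$log = array();

// Scheduled out of order on purpose.
$clock->scheduleAbsolute(250, function ($c) use (&$log) { $log[] = 'b@' . $c->now(); });
$clock->scheduleAbsolute(150, function ($c) use (&$log) { $log[] = 'a@' . $c->now(); });

$clock->start();
echo implode(' ', $log), "\n"; // outputs: a@150 b@250
```

Because nothing ever sleeps, the run finishes immediately, and the recorded times are fully deterministic. That is what makes it possible to assert on them.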

ColdObservable

A ColdObservable represents a series of notifications that starts to "stream" as soon as you subscribe to it. The notification times are therefore relative to the moment of subscription:

$xs = $this->createColdObservable(array(
    onNext(150, "foo"),
    onNext(250, "Bar"),
    onCompleted(350)
));

// will subscribe at timepoint 200 by default
$results = $this->scheduler->startWithCreate(function() use ($xs) {
    return $xs;
});

$this->assertCount(3, $results->getMessages());
$this->assertMessages(array(
    onNext(350, "foo"),
    onNext(450, "Bar"),
    onCompleted(550)
), $results->getMessages());

// subscribed at time 200 and unsubscribed at 550
$this->assertSubscriptions(array(subscribe(200, 550)), $xs->getSubscriptions());

HotObservable

A HotObservable represents a series of notifications that starts to stream as soon as the observable is created. That means that if you define it to contain an onNext() notification at timepoint 150, the notification will be scheduled at absolute time 150, not relative to the time an observer subscribes to it:

$xs = $this->createHotObservable(array(
    onNext(150, "foo"),
    onNext(250, "Bar"),
    onCompleted(350)
));

// will subscribe at timepoint 200 by default
$results = $this->scheduler->startWithCreate(function() use ($xs) {
    return $xs;
});

$this->assertCount(2, $results->getMessages());
$this->assertMessages(array(
    onNext(250, "Bar"),
    onCompleted(350)
), $results->getMessages());

$this->assertSubscriptions(array(subscribe(200, 350)), $xs->getSubscriptions());
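The timing difference between the two examples comes down to simple arithmetic, which the following sketch spells out. The two functions are hypothetical helpers written for this post, not part of Rx.PHP, and the strictly-later comparison in the hot case is an assumption of the sketch rather than a statement about how the library treats a notification at the exact subscription time.

```php
<?php
// Plain arithmetic sketch; neither function exists in Rx.PHP.

// Cold: recorded times are relative to the moment of subscription,
// so every notification is shifted by the subscription time.
function sketchColdArrivals(array $recorded, $subscribeAt)
{
    $arrivals = array();
    foreach ($recorded as $time => $label) {
        $arrivals[$time + $subscribeAt] = $label;
    }
    return $arrivals;
}

// Hot: recorded times are absolute, so anything that happened
// before the subscription is simply missed.
function sketchHotArrivals(array $recorded, $subscribeAt)
{
    $arrivals = array();
    foreach ($recorded as $time => $label) {
        if ($time > $subscribeAt) { // assumption: only strictly later notifications
            $arrivals[$time] = $label;
        }
    }
    return $arrivals;
}

// Same recorded stream as in the examples above, keyed by time.
$recorded = array(150 => 'foo', 250 => 'Bar', 350 => 'completed');

echo implode(' ', array_keys(sketchColdArrivals($recorded, 200))), "\n"; // outputs: 350 450 550
echo implode(' ', array_keys(sketchHotArrivals($recorded, 200))), "\n";  // outputs: 250 350
```

With a subscription at 200, the cold stream delivers all three notifications shifted by 200, while the hot stream has already emitted "foo" and delivers only the remaining two.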

onCompleted, onError and onNext

As can be seen above, the library ships with helper functions that create a representation of each of the three possible notifications to an observer, paired with the time at which it is recorded.

onCompleted(250);
// translates to
new Recorded(250, new OnCompletedNotification());

onError(250, new SomeException('bar'));
// translates to
new Recorded(250, new OnErrorNotification(new SomeException('bar')));

onNext(250, 'foo');
// translates to
new Recorded(250, new OnNextNotification('foo'));

MockObserver

In the previous examples the MockObserver is not used directly. Instead the TestScheduler returns a MockObserver when using the scheduler's helper methods to subscribe to a stream:

/** @var \Rx\Testing\MockObserver $results */
$results = $this->scheduler->startWithCreate(function() use ($xs) {
    return $xs;
});

The observer records all calls to its onCompleted, onError and onNext methods and exposes the recorded messages through MockObserver#getMessages().

What's next?

The next version of Rx.PHP will focus on documentation and creating more observables (0mq, sockets, async redis, http?, ...).

Want to know more about what Rx is all about? Check out the talk Rx: Curing Your Asynchronous Programming Blues, read the Introduction to Rx book or check out a small demo application that shows the capabilities of the library.

Not into PHP, but interested? Check out one of the libraries in the language of your choice: