Releasing Rx.PHP 0.2.0!
I'm happy to announce the 0.2.0 release of Rx.PHP. This release focussed on
stabilizing the core features of the library. It adds the skip
operator,
several helpers to functionally test your reactive code and all new and
previous core functionality is now tested.
Skip operator
The skip
operator was added to skip a specified amount of messages when
observing a sequence:
$observable = new ArrayObservable(array(1, 1, 2, 3, 5, 8, 13));
$skipsThree = $observable->skip(3);
$observable->subscribeCallback(function($elem) { echo 'x' . $elem . ' '; });
// outputs: x1 x1 x2 x3 x5 x8 x13
$skipsThree->subscribeCallback(function($elem) { echo 'y' . $elem . ' '; });
// outputs: y3 y5 y8 y13
Testing helpers
At its core Rx is about observing sequences of events. In order to test a
program it is useful to be in control of an event stream. To help with
simulating a specific series of events the library now supports a
VirtualTimeScheduler
and an extending TestScheduler
to be in control of
"time" in your test. Next are the ColdObservable
and HotObservable
objects
that allow you to build test streams to run through your code while keeping
track of when observers subscribe and unsubscribe. There are new objects
representing one of the onCompleted
, onError
and onNext
notifications.
Finally there is a MockObserver
that records emitted events.
VirtualTimeScheduler and TestScheduler
Events happen over time. Testing against events in "real" time would be quite cumbersome. Instead Rx introduces the concept of virtual time to be used in testing. The VirtualTimeScheduler keeps an internal clock that ticks after all the work for the current tick is done. In the world of virtual time, the scheduled work is assumed to take zero time. This makes it possible to assert on the scheduling of events.
ColdObservable
A ColdObservable
represents a series of notifications that start to "stream"
as soon as you subscribe to it.
$xs = $this->createColdObservable(array(
onNext(150, "foo"),
onNext(250, "Bar"),
onCompleted(350)
));
// will subscribe at timepoint 200 by default
$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs;
});
$this->assertCount(3, $results->getMessages());
$this->assertMessages(array(
onNext(350, "foo"),
onNext(450, "Bar"),
onCompleted(550)
), $results->getMessages());
// subscribed at time 200 and unsubscribed at 550
$this->assertSubscriptions(array(subscribe(200, 550)), $xs->getSubscriptions());
HotObservable
A HotObservable
represents a series of notifications that start to stream as
soon as the observable is created. That means that if you define it to contain
an onNext()
notification at timepoint 150, the notification will be scheduled
at absolute time 150. Not relative to the times the observers subscribe to it:
$xs = $this->createHotObservable(array(
onNext(150, "foo"),
onNext(250, "Bar"),
onCompleted(350)
));
// will subscribe at timepoint 200 by default
$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs;
});
$this->assertCount(2, $results->getMessages());
$this->assertMessages(array(
onNext(250, "Bar"),
onCompleted(350)
), $results->getMessages());
$this->assertSubscriptions(array(subscribe(200, 350)), $xs->getSubscriptions());
onCompleted, onError and onNext
As can be seen above, the library ships with helper functions to create a representation of the three possible notifications to an observer.
onCompleted()
// translates to
new Recorded(250, new OnCompletedNotification());
onError(new SomeException('bar'))
new Recorded(250, new OnErrorNotification($error));
onNext(250, 'foo');
new Recorded(250, new OnNextNotification('foo'));
MockObserver
In the previous examples the MockObserver
is not used directly. Instead the
TestScheduler
returns a MockObserver
when using the scheduler's helper
methods to subscribe to a stream:
/** @var Rx\Testing\MockObserver */
$results = $this->scheduler->startWithCreate(function() use ($xs) {
return $xs;
});
The observer records all calls to it's onCompleted
, onError
and onNext
methods and exposes the recorded messages through MockObserver#getMessages()
.
What's next?
The next version of Rx.PHP will focus on documentation and creating more observables (0mq, sockets, async redis, http?, ...).
Want to know more about what Rx is all about? Checkout the talk Rx: Curing Your Asynchronous Programming Blues, read the Introduction to Rx book or checkout a small demo application that shows the capabilities of the library.
Not into PHP, but interested? Checkout one of the libraries in the language of your choice: