name: full
layout: true
class: full
---
name: inverse
layout: true
class: center, middle, inverse
---
# FRP and you
---
layout: false
## What is FRP?

- The term is unfortunately overloaded
- 'Push'-based programming
- Iterable vs Observable
- Warning: I'm not an expert!

---
## So what is it?

|              | Single  | Multiple |
| ------------ | ------- | -------- |
| Synchronous  | Option  | Iterable |
| Asynchronous | Promise | ???      |

---
template: inverse
# In the beginning there was Iterator...
---
## Iterators

#### JS Array

```js
// Underscore's _.range(1, 101) gives [1, 2, ..., 100]
_.range(1, 101)
  .map(function(x) { return x + 1; })
  .filter(function(x) { return x % 2 == 0; })
  .forEach(function(x) { ... })
```

#### Java/Guava

```java
Iterable<Integer> values = chain(1...100)
    .map(x -> x + 1)
    .filter(x -> x % 2 == 0);

for (int i : values) {
    // ...
}
```

---
template: inverse
# So far so good?
---
## What about events?
Demo
---
template: full
---
## Look familiar?

#### Bacon.js

```js
events
  .map(function(x) { return x + 1; })
  .filter(function(x) { return x % 2 == 0; })
  .onValue(function(x) { ... })
```

#### RxJava

```java
events
    .map(x -> x + 1)
    .filter(x -> x % 2 == 0)
    .subscribe(x -> ...)
```

---
## Missing piece

- From [RxJava](https://github.com/Netflix/RxJava/wiki)

|              | Single  | Multiple |
| ------------ | ------- | -------- |
| Synchronous  | Option  | Iterable |
| Asynchronous | Promise | Rx       |

---
## From Scratch

#### Bacon.js

```js
Bacon.fromBinder(function(sink) {
  sink('a');
  sink('b');
  // The returned function is the unsubscribe handler; nothing to clean up here
  return $.noop;
});
```

#### RxJava

```java
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("a");
        subscriber.onNext("b");
        subscriber.onCompleted();
    }
})
```

---
## Merge

#### Bacon.js

```js
var a = Bacon.fromArray([1, 2, 3]);
var b = Bacon.fromArray(['a', 'b', 'c']);
a.merge(b)
// Values are interleaved in arrival order; one possible result:
> [1, 'a', 2, 'b', 'c', 3]
```

#### RxJava

```java
Observable.merge(
    Observable.from(new Integer[]{1, 2, 3}),
    Observable.from(new String[]{"a", "b", "c"})
)
> [1, "a", 2, "b", "c", 3]
```

---
## Merge
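
A library-free sketch of what merge does, in case the idea is easier to see without Bacon or Rx. `createStream` and `merge` below are hypothetical helpers written for this slide, not Bacon.js or RxJava API; the point is only that the merged stream forwards values from both sources in the order they arrive.

```javascript
// Hypothetical minimal push stream (an assumption for illustration only).
function createStream() {
  var listeners = [];
  return {
    // Register a listener; returns an unsubscribe function.
    subscribe: function (listener) {
      listeners.push(listener);
      return function () {
        listeners = listeners.filter(function (l) { return l !== listener; });
      };
    },
    // Push a value to every current listener.
    push: function (value) {
      listeners.slice().forEach(function (listener) { listener(value); });
    }
  };
}

// merge: forward every value from either source, in arrival order.
function merge(a, b) {
  var out = createStream();
  a.subscribe(out.push);
  b.subscribe(out.push);
  return out;
}

var numbers = createStream();
var letters = createStream();
var seen = [];
merge(numbers, letters).subscribe(function (x) { seen.push(x); });

numbers.push(1);
letters.push('a');
numbers.push(2);
letters.push('b');
letters.push('c');
numbers.push(3);
// seen is now [1, 'a', 2, 'b', 'c', 3]
```

Changing the order of the `push` calls changes the order of `seen`: merge does not sort or pair values, it only forwards them.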
---
## Errors

- Similar to promises, a result can be either a value or an error

#### Bacon.js

```js
var from = Bacon.fromArray([1, 2, 3, 4]).flatMap(function(x) {
  return (x % 2 == 0) ? new Bacon.Error("too big " + x) : x;
});
from.onError(function(e) { ... })
from.onValue(function(x) { ... })
```

#### RxJava

```java
Observable.from(new Integer[]{1...100})
    .flatMap(i -> i % 2 == 0
        ? Observable.just(i)
        : Observable.error(new RuntimeException(String.valueOf(i)))
    )
    .subscribe(new Subscriber<Integer>() {
        @Override public void onNext(Integer i) { ... }
        @Override public void onError(Throwable t) { ... }
        @Override public void onCompleted() { ... }
    });
```

---
## Destroy/Cleanup

- Want to make sure resources are cleaned up

#### Bacon.js

```js
var destroy = $('a').asEventStream('click')
  .onValue(function(e) {
    // ...
  });

// Will remove any DOM listeners
destroy();
```

#### RxJava

```java
Subscription sub = Observable.from(...)
    .subscribe(i -> ...);

sub.unsubscribe();
```

---
## FlatMap aka You-Know-What

- What happens if you want to produce _another_ stream in map?

```js
_.chain(['hello', 'world'])
  .map(function(s) { return [s, s.toUpperCase()]; })
  .flatten().value()
> [['hello', 'HELLO'], ['world', 'WORLD']]
// Flatten
> ['hello', 'HELLO', 'world', 'WORLD']
```

```js
Bacon.fromArray(['hello', 'world']).flatMap(function(s) {
  return Bacon.fromArray([s, s.toUpperCase()]);
})
> ['hello', 'HELLO', 'world', 'WORLD']
```

- Think of `then()` on promises

```js
$.Deferred().resolve('a')
  .then(function(a) { return $.Deferred().resolve(a.toUpperCase()) })
```

---
## FlatMap
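
A rough sketch of flatMap on push streams, with no library involved. It assumes a "stream" is nothing more than a function that pushes values into a sink callback, loosely in the spirit of `Bacon.fromBinder`; `fromArray` and `flatMap` here are hypothetical helpers, not the Bacon.js functions of the same name.

```javascript
// Assumption: a stream is a function that takes a sink and pushes values into it.
function fromArray(values) {
  return function (sink) {
    values.forEach(function (value) { sink(value); });
  };
}

// flatMap: for each outer value, build an inner stream with f and
// forward everything the inner stream emits to the outer sink.
function flatMap(stream, f) {
  return function (sink) {
    stream(function (value) {
      f(value)(sink);
    });
  };
}

var collected = [];
var shouted = flatMap(fromArray(['hello', 'world']), function (s) {
  return fromArray([s, s.toUpperCase()]);
});
shouted(function (x) { collected.push(x); });
// collected is now ['hello', 'HELLO', 'world', 'WORLD']
```

Using plain `map` instead would hand the sink two inner streams rather than four strings, which is the nesting that flatMap removes.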
---
template: inverse
# Gotchas
---
## Debugging

![Goddamnit](http://monosnap.com/image/ENBbXvzdnUJjB73SdwjQp8ruMqtKT4.png)

---
## Debugging

- Suggestions
  - `Bacon.log()`
  - Isolate and test the "pure" part of your code

```js
function everything() {
  return $('.test').asEventStream('click')
    .merge(...)
    .map(...)
}
```

- Test `pure()`

```js
function everything() {
  return pure($('.test').asEventStream('click'));
}

function pure(stream) {
  return stream.merge(...)
    .map(...)
}
```

---
## Side Effects

#### Bacon

```js
var from = Bacon.fromArray(['a', 'b']);
from.onValue(function(s) { console.log(s) });
from.onValue(function(s) { console.log(s) });
> "a", "b"
```

#### RxJava

```java
Observable<String> from = Observable.from(new String[]{"a", "b"});
from.subscribe(s -> println(s));
from.subscribe(s -> println(s));
> "a", "b"
> "a", "b"
```

- ["Hot" vs "Cold" Observables](https://github.com/Netflix/RxJava/wiki/Observable#hot-and-cold-observables)
- Bacon only does "hot"
- Can "fix" by adding `delay(0)` to the Bacon example

---
template: inverse
# Stash Examples
---
# ediff

- Stream of lines
- Each is either 'context', 'added' or 'removed'
- Want to diff adjacent 'removed' then 'added' sections
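
The same idea on a plain array, as a minimal sketch: group adjacent lines by type, pair each group with the next one, and keep the pairs where a 'removed' group is followed by an 'added' group. It assumes each line is represented only by its type string; `groupAdjacent` and `pairs` are hypothetical helpers, not Bacon.js or Stash code.

```javascript
// Group adjacent lines that share a type.
function groupAdjacent(lines) {
  return lines.reduce(function (groups, line) {
    var last = groups[groups.length - 1];
    if (last && last[0] === line) {
      last.push(line);
    } else {
      groups.push([line]);
    }
    return groups;
  }, []);
}

// Pair each group with the one that follows it (a sliding window of size 2).
function pairs(groups) {
  return groups.slice(1).map(function (group, i) {
    return [groups[i], group];
  });
}

var lines = ['CONTEXT', 'REMOVED', 'REMOVED', 'ADDED', 'ADDED', 'CONTEXT', 'ADDED'];

// Keep only a REMOVED group immediately followed by an ADDED group.
var changed = pairs(groupAdjacent(lines)).filter(function (pair) {
  return pair[0][0] === 'REMOVED' && pair[1][0] === 'ADDED';
});
// changed is [[['REMOVED', 'REMOVED'], ['ADDED', 'ADDED']]]
```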
---
# ediff

```js
lines
  .split(_.identity)
  .slidingWindow(2, 2)
  // Each window arrives as a single array: [previousLines, lines]
  .filter(function (pair) {
    var previousLines = pair[0], lines = pair[1];
    return previousLines[0] === 'REMOVED' && lines[0] === 'ADDED';
  });
```

```js
// Line data
> [CONTEXT, REMOVED, REMOVED, ADDED, ADDED, CONTEXT, ADDED]
// Group by type
> [[CONTEXT], [REMOVED, REMOVED], [ADDED, ADDED], [CONTEXT], [ADDED]]
// Sliding window
> [[CONTEXT], [REMOVED, REMOVED]],
  [[REMOVED, REMOVED], [ADDED, ADDED]],
  [[ADDED, ADDED], [CONTEXT]],
  [[CONTEXT], [ADDED]],
// Filter
> [ [REMOVED, REMOVED], [ADDED, ADDED] ]
```

---
## Var

#### Before

```js
var cachedSize = null;
sizeProp.onValue(function(size) {
  cachedSize = size || cachedSize;
  if (!cachedSize) return;
  ...
});
```

#### After

```js
sizeProp
  .scan(0, function(cachedSize, size) {
    return size || cachedSize;
  })
  .filter(_.identity)
  .onValue(function(size) {...});
```

---
## Git API

- Currently have two different ways of paging/streaming
- [Blog/Spike](https://extranet.atlassian.com/display/~cofarrell/Spike+-+Stash+GitRx)

```java
interface CommitService {
    Page<Changeset> getChangesetsBetween(Request request, PageRequest pageRequest);
    void streamChangesetsBetween(Request request, Callback callback);
}
```

- Can we get the best of both worlds?

---
## Git API

```java
interface CommitServiceRx {
    Observable<Changeset> getChangesetsBetween(Request request, PageRequest pageRequest);
}
```

```java
Observable<Changeset> changesets = getChangesetsBetween(...);
// Paging
changesets.buffer(10).toList().toBlockingObservable();
// Streaming
changesets.subscribe(c -> ...);
```

---
# RxJava Blockers?

- Exposing via OSGi
- Tied to a 3rd-party library
- Guava all over again...
- This is why we can't have nice things :(

---
## Libraries

- [Rx.NET](http://msdn.microsoft.com/en-au/data/gg577609.aspx)
- [RxJS](https://github.com/Reactive-Extensions/RxJS)
- [RxJava](https://github.com/Netflix/RxJava)
- [Elm](http://elm-lang.org/)
  - FP language for declaratively creating browser-based graphical user interfaces
- [Bacon.js](https://github.com/baconjs/bacon.js) ![Bacon](https://dujrsrsgsd3nh.cloudfront.net/img/emoticons/10804/bacon-1372112999.gif)
- And more...

---
## More

- [Your Mouse is a Database](http://queue.acm.org/detail.cfm?id=2169076)
- [FRP in the Netflix API](http://www.infoq.com/presentations/netflix-functional-rx)
- [Coursera - Reactive](https://www.coursera.org/course/reactive)
- [Erik Meijer - Channel 9](http://channel9.msdn.com/Shows/Going+Deep/Expert-to-Expert-Brian-Beckman-and-Erik-Meijer-Inside-the-NET-Reactive-Framework-Rx)
- [Async JS at Netflix](https://www.youtube.com/watch?v=XRYN2xt11Ek)

---
template: inverse
# Questions