Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue with amb operator #2538

Closed
pradhakrishnan opened this issue Jan 24, 2015 · 15 comments
Closed

Issue with amb operator #2538

pradhakrishnan opened this issue Jan 24, 2015 · 15 comments
Labels

Comments

@pradhakrishnan
Copy link

This issue is the continuation of my old issue #2537 . I couldn't reopen my old issue so i just created new one.

I am using amb operator and to get the value i am using firstOrDefault(), some times it works fine, but most of the time it is giving default value. Is there anything wrong in my code?

private Observable<RecordStatus> linearStatus = Observable.empty();
private Observable<RecordStatus> nlvStatus = Observable.empty();

if(LINEAR.equals(message)){
    linearStatus = getVideoStatusOnSubscribe(message);
}
if(NONLINEAR.equals(message)){
    nlvStatus = getVideoStatusOnSubscribe(message);
}

Observable<RecordStatus> obStatus = Observable.amb(linearStatus.subscribeOn(Schedulers.io()).filter(new Func1<RecordStatus, Boolean>() {

    @Override
    public Boolean call(RecordStatus t1) {
        return t1.equals(RecordStatus.CURRENT);
    }
}), nlvStatus.subscribeOn(Schedulers.io()).filter(new Func1<RecordStatus, Boolean>() {

    @Override
    public Boolean call(RecordStatus t1) {
        return t1.equals(RecordStatus.CURRENT);
    }
})
RecordStatus status = obStatus.toBlocking().firstOrDefalut(RecordStatus.DEFAULT);
System.out.println("Final status Status === "+status);

 private Observable<RecordStatus> getVideoStatusOnSubscribe(
    final String message) {
    return Observable.create(new OnSubscribe<RecordStatus>() {

    @Override
    public void call(Subscriber<? super RecordStatus> subscriber) {
        RecordStatus status = (RecordStatus) restTemplate.postForObject(url, message, String.class);
        logger.info("Thread "+ Thread.currentThread().getId() + " for "+ message
                        + " giving status "+status);
        subscriber.onNext(status);
        subscriber.onCompleted();
    }
});
}

Output trace:

157 [2015-01-24 18:08:49,917] [main] WARN com.umapranesh.sample.observable.asynceventhandler.GetVideoStatusEventHandler - *_Status Event_*
com.umapranesh.sample.observable.GetVideoStatusMessage
Final status Status === DEFAULT
Thread 10 for LINEAR giving status INVALID
Thread 11 for NONLINEAR giving status CURRENT

Thread 11 is giving the value what i expected, but before that observable returns the default value.

Can someone help on this.

@zsxwing
Copy link
Member

zsxwing commented Jan 25, 2015

amb will use the Observable that first emits an item (onNext, onCompleted, or onError). So if linearStatus emits INVALID first, the final status will be DEFAULT. Because linearStatus and nlvStatus will run concurrently, the result will be non-deterministic.

@zsxwing
Copy link
Member

zsxwing commented Jan 25, 2015

Looks getVideoStatusOnSubscribe will emit only one item. So you can use

Observable.merge(linearStatus.subscribeOn(Schedulers.io()), nlvStatus.subscribeOn(Schedulers.io()))
.takeWhile(t ->  t.equals(RecordStatus.CURRENT)).firstOrDefalut(RecordStatus.DEFAULT)

@pradhakrishnan
Copy link
Author

Thanks Zsxwing, I have tried using merge and takewhile, but still it is giving the same issue.
If I run single request it works fine ( Amb operator also works fine when i execute single request), but it failed if i test it through some load.

@pradhakrishnan
Copy link
Author

I have tried some sample Junit test, both my tests are failed. It should pass, don't know what is wrong in this code, Could some one help me on that?

package com.umapranesh.hystrix;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;

import org.junit.Before;
import org.junit.Test;

import rx.Observable;
import rx.Subscriber;
import rx.Observable.OnSubscribe;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class ObservableMergeTest {


    private Observable<String> obs1 = Observable.empty();
    private Observable<String> obs2 = Observable.empty();
    private Observable<String> obs3 = Observable.empty();
    private Observable<String> obs4 = Observable.empty();
    private Observable<String> obs5 = Observable.empty();
    List<String> nameList = null;

    @Before
    public void setUp(){
        nameList = new ArrayList<String>();
    }

    @Test
    public void testObservableWithCompleteList(){
        nameList.add("person1");
        nameList.add("person2");
        nameList.add("person3");
        nameList.add("person4");
        nameList.add("person5");
        for(String str: nameList){
            if("person1".equalsIgnoreCase(str))
                obs1 = getObservableName(str);
            if("person2".equalsIgnoreCase(str))
                obs2 = getObservableName(str);
            if("person3".equalsIgnoreCase(str))
                obs3 = getObservableName(str);
            if("person4".equalsIgnoreCase(str))
                obs4 = getObservableName(str);
            if("person5".equalsIgnoreCase(str)){
                obs5 = getObservableName(str);
            }
        }
        String finalStaus = getStatus(obs1, obs2, obs3, obs4, obs5);
        assertEquals("single", finalStaus);
    }

    @Test
    public void testObservableWithInCompleteList(){

        nameList.add("person1");
        nameList.add("person2");
        nameList.add("person3");
        nameList.add("person5");

        for(String str: nameList){
            if("person1".equalsIgnoreCase(str))
                obs1 = getObservableName(str);
            if("person2".equalsIgnoreCase(str))
                obs2 = getObservableName(str);
            if("person3".equalsIgnoreCase(str))
                obs3 = getObservableName(str);
            if("person5".equalsIgnoreCase(str))
                obs5 = getObservableName(str);

        }
        String finalStaus = getStatus(obs1, obs2, obs3, obs4, obs5);
        assertEquals("single", finalStaus);

    }

    private String getStatus(Observable<String> obs1, Observable<String> obs2, Observable<String> obs3
            ,Observable<String> obs4, Observable<String> obs5){
        String finalStaus = Observable.merge(obs1.subscribeOn(Schedulers.computation())
                , obs2.subscribeOn(Schedulers.computation())
                , obs3.subscribeOn(Schedulers.computation())
                , obs4.subscribeOn(Schedulers.computation())
                , obs5.subscribeOn(Schedulers.computation()))
                .takeWhile(new Func1<String, Boolean>() {
                    public Boolean call(String t1) {
                        return Boolean.valueOf(t1.equalsIgnoreCase("single"));
                    }
                }).toBlocking().firstOrDefault(" all are married ");    
        return finalStaus;
    }

    private Observable<String> getObservableName(final String str) {
        return Observable.create(new OnSubscribe<String>() {

            public void call(Subscriber<? super String> subscriber) {

                    if("person1".equalsIgnoreCase(str)){
                        subscriber.onNext("married");
                    }
                    if("person2".equalsIgnoreCase(str)){
                        subscriber.onNext("married");
                    }
                    if("person3".equalsIgnoreCase(str)){
                        subscriber.onNext("married");
                    }
                    if("person4".equalsIgnoreCase(str)){
                        subscriber.onNext("married");
                    }
                    if("person5".equalsIgnoreCase(str)){
                        subscriber.onNext("single");
                    }
                    subscriber.onCompleted();
            }
        });
    }

}

@akarnokd
Copy link
Member

testObservableWithInCompleteList will never see single because you exclude that value with takeUntil. So if "single" is the first one to fire, the code will ignore any further value and the default value is returned.

@pradhakrishnan
Copy link
Author

Thanks Akarnokd, I have one scenario please let me know whether can we do it using observable or not.

Scenario:

I am executing 5 observables in parallel, I am expecting status should be CURRENT from any of the observable. if any of the observable emits the value CURRENT then my method should return the status CURRENT and if none of the observables emits CURRENT status then my method should return the value which emits by the last observable.

Please let me know can we do this by Observable?

@akarnokd
Copy link
Member

You'd need takeUntil(predicate) from #1649 / #2493 followed by a lastOrDefault().

@pradhakrishnan
Copy link
Author

Hi Akarnokd, Is takeUntil(predicate) available in any of the released jar? I found only takeUntil(Observable) .

@akarnokd
Copy link
Member

You'd need to check out #2493 and build it for yourself. I'm still waiting for response regarding the naming and exact behavior.

@pradhakrishnan
Copy link
Author

Got it!! Thanks Akarnokd.

@pradhakrishnan
Copy link
Author

I am getting an error when build the project.

IST-pradhakr-6351:RxJava pradhakr$ ./gradlew clean build
Inferred version: 1.1.0-SNAPSHOT

FAILURE: Build failed with an exception.

  • Where:
    Build file '/Users/pradhakr/git/RxJava/build.gradle' line: 9

  • What went wrong:
    A problem occurred evaluating root project 'rxjava'.

    Failed to apply plugin [id 'java']
    Could not generate a proxy class for class nebula.core.NamedContainerProperOrder.

  • Try:
    Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

@DavidMGross
Copy link
Collaborator

This doesn't match the current documentation for amb (wiki or javadoc)
which says that amb mirrors the first of the source observables to emit an
item
(in the terminology of our documentation, only onNext calls emit
items; onCompleted/onError calls are described as termination notifications
rather than item emissions).

Is the documentation misleading here, or is the implementation incorrect?

On Sat, Jan 24, 2015 at 7:47 PM, Shixiong Zhu [email protected]
wrote:

amb will use the Observable that first emits an item (onNext,
onCompleted, or onError). So if linearStatus emits INVALID first, the
final status will be DEFAULT. Because linearStatus and nlvStatus will run
concurrently, the result will be non-deterministic.


Reply to this email directly or view it on GitHub
#2538 (comment).

David M. Gross
PLP Consulting

@zsxwing
Copy link
Member

zsxwing commented Jan 28, 2015

@pradhakrishnan See here for the build problem: #2336 (comment)

@zsxwing
Copy link
Member

zsxwing commented Jan 28, 2015

Is the documentation misleading here, or is the implementation incorrect?

I think the doc is misleading. amb selects the first Observable which emits any onNext/onCompleted/onError, and ignore other Observables.

@pradhakrishnan
Copy link
Author

Thanks @zsxwing , It works.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants