I have a third party library that I am intending to integrate with RxJS. This is a messaging library called Tiger Text. According to them I can listen to an event called messages and when the stream has a message I can use it to further utilize it. The code snippet for the same is as follows:-

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' })

client.signIn('[email protected]', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session)
})

function onSignedIn(session) {
  console.log('Signed in as', session.user.displayName)

  client.messages.sendToUser(
    '[email protected]',
    'hello!'
  ).then(function (message) {
    console.log('sent', message.body, 'to', message.recipient.displayName)
  })

  client.events.connect()

  client.on('message', function (message) {
    console.log(
      'message event',
      message.sender.displayName,
      'to',
      message.recipient.displayName,
      ':',
      message.body
    )
  })
}

Now please have a look at the place where you have the below mentioned piece of code.

client.on('message', function (message) {
    console.log(
      'message event',
      message.sender.displayName,
      'to',
      message.recipient.displayName,
      ':',
      message.body
    )
  })

I wanted to know how to use RxJS so as to create an observable out of this piece of code so as to subscribe to the stream and whenever we have a change I take the new data and process it as I wish.

Please Advice.

share|improve this question
    
It seems like you're using promises to check the data. Promise will only check the data once. You need to use Observable. If you need more details I can write as an answer. Let me know. – D. Ister yesterday
    
Its actually a third party library whose code I have written or pasted. So within the client.on method I wait for the data to come in an async manner and once there I just use the data. I dont know the internal mechanism of the library but will request if you can write and Observable. I am not sure how to :( – Shiv Kumar Ganesh yesterday
    
Are you able to change "then(function (session) " part? – D. Ister yesterday
    
Yes I can do that.But creating an Observable is a pain :( – Shiv Kumar Ganesh yesterday
    
I created a basic one for you with get request, I also wrote some comments for you because I don't know the size of your application , and provided a link because there are some stuffs you need to do on the component which they show it exactly how I would. – D. Ister 23 hours ago

You can use fromEventPattern to create an observable from a custom event:

import { Observable } from 'rx/Observable';
import 'rxjs/add/observable/fromEventPattern';

const messages = Observable.fromEventPattern(
  handler => client.on('message', handler),
  handler => client.off('message', handler)
);
messages.subscribe(message => console.log(message));

You pass to fromEventPattern functions that add and remove the event handler using the custom API's add and remove mechanism. You've not included it in your question, but I've assumed the API you're using implements an off method.

share|improve this answer

For this use-cases you typically don't need to write a custom Observable and you can use just Observable.create(). Then it depends on whether you want to write a cold or a hot observable.

For cold Observables you create the producer of values when subscribing and close it when unsubscribing:

Observable.create(obs => {
  var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
  client.signIn('[email protected]', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
    onSignedIn(session);
  });

  client.on('message', function (message) {
    obs.next(...);
  });

  return () => {
    client.close(); // or whatever...
  };
});

Or if you want to write a hot Observable the producer will exist independently on any subscriptions and just add/remove the listener:

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
client.signIn('[email protected]', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session);
});

Observable.create(obs => {
  let listener = client.on('message', function (message) {
    obs.next(...);
  });

  () => {
    // remove the event listener somehow
    listener.remove();
  };
});

Sometimes you can see this solved by using a Subject but this is usually more complicated than using Observable.create() because then you need to handle the creation and tear down logic yourself and also Subjects have internal state (see Subject and its internal state).

Here's a very similar question as yours:

Articles on the topics related to your question by the lead developer of RxJS:

share|improve this answer
    
I am so glad that you replied with links. And Yes the links are really useful to understand the same. Since the third party does not give me a handle to subscribe, writing the code in Angular 2 was a pain. Thanks – Shiv Kumar Ganesh 48 mins ago

As I said in the comments you can use observable to subscribe the stream. I am not sure if you can change the code because you copied from their code but if you're able to change it. This is how observable works:

Generally in big applications you have a data.service.ts(you can check Angular 2 tutorial). In that:

@Injectable()

    export class MyMessageDataService{ 

        constructor(private http: Http) {}

        getMessageData(message: //whatever your component is called): Observable<any> {
           //I said any but it can be something like <Message[]>
          console.log("we got " + myendpoint//whatever endpoint you're listening from;
        }
    .
    . //implement your parameters and code here, this part is different for everyone but you will need a bodyString = JSON.stringify(body), a headers and a options variables. Look below for examples.
    .

     //Stringify payload
     let bodyString = JSON.stringify(body);

     // Contetn type to JSON
     let headers = new Headers({'Content-Type' : 'application/json'}) // if you have an access token new Headers({'Content-Type' : 'application/json', 'access-token : token})  //whatever your token's name is such as token

     //You need a request so you need to create a request option
     let options = new RequestOIptions({headers: headers});

    //a get call because I think you need to stream so a get should help you
    return this.http.get(this.commentsUrl)
                    .map((res:Response) => res.json())
                    .catch((error:any) => Observable.throw(error.json().error || 'Error message')

    }

That's how I created mine, but you need to do some other things in the component which you can check from: here

If you need further help for components or want to set an interval let me know, I can complete those parts too.

share|improve this answer

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.