Ionic infinite loop on changing observable value

I want to use a presence channel in my Ionic app so that I can check at any moment whether a user is connected or not.

Inside my service I have:

import { BehaviorSubject, Observable } from 'rxjs'
import { PusherServicesService } from 'src/services/pusher-services/pusher-services.service'
export class AuthService {
  connectedUser = new BehaviorSubject<any>([])

  constructor(
    private pusherService: PusherServicesService
  ) {}

  connectedUserObservable(): Observable<any>{
    return this.connectedUser.asObservable()
  }

  connected(){
    this.pusherService.connected().here(data => {
        console.log('here', data)
        this.connectedUser.next(data)
    }).joining(data => {
        console.log('joining', data)
        this.connectedUserObservable().subscribe(res => {
            let tmp = res
            const i = tmp.findIndex(item => item.id == data.id)
            if(i == -1) {
                tmp.push(data)
                this.connectedUser.next(tmp)
            }
        })

    }).leaving(data => {
        console.log('leaving', data)
        this.connectedUserObservable().subscribe(res => {
            var tmp = res
            const i = tmp.findIndex(item => item.id == data.id)
            console.log('tmp', tmp)
            console.log('index', i)
            if(i > -1) {
                tmp.splice(i, 1)
                console.log('tmp sliced', tmp)
                this.connectedUser.next(tmp)
                return ;
            }
        })
    })
  }
}

Joining works fine: I get every user connected across my application. But when a user leaves the channel (leaving), my console shows an infinite loop of my console.log output:

tmp (2) [{…}, {…}]

index 1

tmp sliced [{…}]

and in the end it returns an error:

tmp (2) [empty × 2]

index 1

tmp sliced [empty]

tmp (2) [empty × 2]

index 1

tmp sliced [empty]

core.js:6479 ERROR RangeError: Maximum call stack size exceeded

at SafeSubscriber.__tryOrUnsub (VM96219 vendor.js:228930)

at SafeSubscriber.next (VM96219 vendor.js:228861)

at Subscriber._next (VM96219 vendor.js:228811)

at Subscriber.next (VM96219 vendor.js:228788)

at BehaviorSubject.next (VM96219 vendor.js:228572)

at BehaviorSubject.next (VM96219 vendor.js:228041)

at SafeSubscriber._next (VM96220 main.js:2555)

at SafeSubscriber.__tryOrUnsub (VM96219 vendor.js:228922)

at SafeSubscriber.next (VM96219 vendor.js:228861)

at Subscriber._next (VM96219 vendor.js:228811)

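Each subscribe() inside joining/leaving creates a subscription that stays active, and each one calls next() from inside its own callback. When a user leaves, the leaving subscription removes the user and emits; the still-active joining subscription for that user then sees the user missing, re-adds it and emits again, and so on until the call stack overflows. Use the take(1) operator so that each subscription completes (and unsubscribes) after the first value it receives; this avoids both the memory leak and the infinite loop. Your new observable: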

import { take } from 'rxjs/operators';
.....
  connectedUserObservable(): Observable<any>{
    return this.connectedUser.asObservable().pipe(take(1));
  }
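
As an alternative to take(1), you could avoid subscribing inside the callbacks altogether: a BehaviorSubject exposes its current value synchronously through `.value` (or `getValue()`), so the handlers can read the list, build a new one, and emit it once. The sketch below is only a suggestion, not part of the original code: `PresenceUser`, `addUser` and `removeUser` are hypothetical names, and it assumes each member object carries an `id` as in the question. Note that it compares ids with `===`, while the question uses `==`.

```typescript
// Hypothetical shape of a presence member; only `id` is taken from the question.
interface PresenceUser {
  id: number | string;
}

// Returns a new array with `user` appended, unless a member with the same id
// is already present. The input array is never mutated.
function addUser<T extends PresenceUser>(users: readonly T[], user: T): T[] {
  return users.some(u => u.id === user.id) ? [...users] : [...users, user];
}

// Returns a new array without the member whose id matches `user.id`.
function removeUser<T extends PresenceUser>(users: readonly T[], user: PresenceUser): T[] {
  return users.filter(u => u.id !== user.id);
}

// Intended use inside AuthService.connected() (assumes `connectedUser` is the
// BehaviorSubject from the question):
//   .joining(data => this.connectedUser.next(addUser(this.connectedUser.value, data)))
//   .leaving(data => this.connectedUser.next(removeUser(this.connectedUser.value, data)))

// Small standalone demonstration of the helpers.
const online = addUser([{ id: 1 }], { id: 2 });
const afterLeave = removeUser(online, { id: 2 });
console.log(online.length, afterLeave.length);
```

Because nothing subscribes inside the handlers, there is no subscription left behind to react to the next emission, and emitting a fresh array each time avoids mutating the array other subscribers already hold.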