import { EMPTY, Observable, of, timer, mergeMap } from 'rxjs'

import { Config } from '../../../config'
import { UtilsLog } from '../../../utils/logs'
import { BroadcastServiceConst } from '../constants/broadcast-partial-service'
import { genTakeUntilStop$ } from '../subject/stop/take-until/broadcast-partial-service'

// Subscriber event constants, read off the broadcast-service constants object.
const OurSubscriberEvents = BroadcastServiceConst.OurSubscriberEvents

/** Stream whose values are the `ClosedByConnFail` subscriber event constant. */
type ClosedByConnFail$ = Observable<typeof OurSubscriberEvents.ClosedByConnFail>

/**
 * Starts the given subscriber and exposes the outcome as an observable.
 *
 * When `sub.subscribe()` resolves, the returned observable emits a single
 * tuple `[subscription, closed$]`:
 * - `subscription` is the value the promise resolved with.
 * - `closed$` polls the subscription's private `_connectionClosed` flag
 *   (first check after 1000 ms, then every 500 ms) and emits
 *   `OurSubscriberEvents.ClosedByConnFail` on every tick where the flag is
 *   truthy. It does not stop after the first emission.
 *
 * The observable never completes by itself after emitting; both it and
 * `closed$` are piped through `genTakeUntilStop$(id)`, which presumably ends
 * them when the stop signal for `id` fires (verify against that helper).
 *
 * If `sub.subscribe()` rejects, the rejection reason is forwarded as the
 * observable's error notification.
 *
 * @param sub - Subscriber to start.
 * @param id - Identifier used for logging and passed to `genTakeUntilStop$`.
 * @returns Observable emitting `[subscription, closed$]` once subscribed.
 */
export const startSub$ = (sub: R5P.Subscription, id: string) => {
  Config.Logs.BroadcastService &&
    UtilsLog.devLog('BCastService', 'startSub$', '\n- id\n', id, '\n- subscriber\n', sub)

  const obs$ = new Observable<[R5P.Subscription, ClosedByConnFail$]>((observer) => {
    sub
      .subscribe()
      .then((subscription) => {
        // Workaround because the red5pro library doesn't emit a close event
        // in this scenario:
        // - You are connected to a router (a) that is connected to another
        //   router (b) which has the internet connection.
        // - Router (b) loses its connection to the internet.
        // If red5pro fixes the problem, the event should be handled
        // in startListeningSubscription$ instead.

        const closed$ = timer(1000, 500).pipe(
          mergeMap(() => {
            // `_connectionClosed` is a private SDK field, hence the `any` cast.
            // An undefined flag is treated the same as "not closed".
            const connClosed = (subscription as any)._connectionClosed

            return connClosed ? of(OurSubscriberEvents.ClosedByConnFail) : EMPTY
          }),

          genTakeUntilStop$(id),
        )

        observer.next([subscription, closed$])
      })
      .catch((err: unknown) => {
        // Forward the failure reason instead of erroring with `undefined`,
        // so downstream handlers can tell why the subscription failed.
        observer.error(err)
      })
  })

  return obs$.pipe(genTakeUntilStop$(id))
}
