import { RTCPublisher } from '@dn/patched-red5pro-sdk'
import { Observable, filter, takeUntil } from 'rxjs'

import { Config } from '../../../config'
import { UtilsLog } from '../../../utils/logs'
import { startListeningPublication$ } from '../start-listening/publisher/broadcast-partial-service'
import { BroadcastStop$$ } from '../subject/stop/broadcast-partial-service'
import { SubjectStopGuards } from '../subject/stop/guards'

// ~~~~~ Types

/** Return type of `startListeningPublication$`; its elements are picked out by index in `Emit`. */
type Event$ = ReturnType<typeof startListeningPublication$>

/** Value emitted by `genPublisher$` once the publisher has finished initialising. */
type Emit = {
  // Publisher instance resolved by `initWithStream`.
  red5pub: R5P.Publisher
  // First element of the `startListeningPublication$` result.
  close$: Event$['0']
  // Second element of the `startListeningPublication$` result.
  allEvents$: Event$['1']
}

// ~~~~~

/**
 * Creates a cold observable that initialises an `RTCPublisher` with the given
 * media stream and, on success, emits the publisher together with the event
 * streams produced by `startListeningPublication$`, then completes.
 *
 * The observable errors when `initWithStream` throws synchronously or its
 * promise rejects. It completes without emitting when `BroadcastStop$$` emits
 * a "stop all" signal, or a "stop id" signal whose id equals
 * `config.broadcastId`, before initialisation has finished.
 *
 * @param config - Publication settings (host, server address, broadcast id,
 *   media element id and bitrates are read from it).
 * @param stream - Media stream handed to `initWithStream`.
 * @param userPassToken - Spread into the publisher's `connectionParams`.
 * @param iceServers - ICE servers placed in the publisher's `rtcConfiguration`.
 * @returns An observable emitting at most one `Emit` value.
 */
export const genPublisher$ = (
  config: DN.Services.Broadcast.PubConfig,
  stream: MediaStream,
  userPassToken: DN.Services.Broadcast.UserPassToken,
  iceServers: RTCIceServer[],
) => {
  // NOTE(review): this logs `userPassToken`; confirm `devLog` is inert outside development builds.
  if (Config.Logs.BroadcastService) {
    UtilsLog.devLog(
      '\nBCastService',
      'genPublisher$',
      '\n- config\n',
      config,
      '\n- stream\n',
      stream,
      '\n- userPassToken\n',
      userPassToken,
      '\n- iceServers\n',
      iceServers,
    )
  }

  const publisherInitConfig: R5P.PublisherInitConfig = {
    protocol: 'wss',
    port: 443,
    app: 'streammanager',
    mediaElementId: config.mediaElementId,

    host: config.host,

    connectionParams: {
      host: config.serverAddress,
      app: 'live',
      ...userPassToken,
    },

    streamName: config.broadcastId,
    streamMode: 'live',

    rtcConfiguration: {
      iceServers,
      iceCandidatePoolSize: 2,
      bundlePolicy: 'max-bundle',
    },

    bandwidth: {
      video: config.videoBitrate,
      audio: config.audioBitrate,
    },

    keyFrameRate: 3000,
  }

  if (Config.Logs.BroadcastEvents) {
    UtilsLog.devLog(
      'BCastService',
      'genPublisher$',
      '\n- publisherInitConfig\n',
      publisherInitConfig,
    )
  }

  const obs$ = new Observable<Emit>((observer) => {
    // Set by the teardown below. The init promise cannot be cancelled, so the
    // flag is the only way for its continuation to learn that nobody is
    // listening any more (e.g. a stop signal arrived while still connecting).
    let unsubscribed = false

    const publisher = new RTCPublisher()

    try {
      publisher
        .initWithStream(publisherInitConfig, stream)

        .then((red5pub) => {
          // Do not wire up publication listeners for a consumer that is gone.
          // NOTE(review): the initialised publisher is not released here;
          // confirm which SDK call should be used to dispose of it.
          if (unsubscribed) return

          const [close$, allEvents$] = startListeningPublication$(red5pub, config.broadcastId)

          observer.next({
            red5pub,
            close$,
            allEvents$,
          })

          observer.complete()
        })

        .catch((err) => observer.error(err))
    } catch (err) {
      // Synchronous failure inside the SDK call itself.
      UtilsLog.devLog('Lib err', err)

      observer.error(err)
    }

    return () => {
      unsubscribed = true
    }
  })

  return obs$.pipe(
    takeUntil(
      BroadcastStop$$.pipe(
        // Only a global stop or a stop addressed to this broadcast ends the attempt.
        filter((stop) => {
          if (SubjectStopGuards.isStopAll(stop)) return true

          if (SubjectStopGuards.isStopId(stop) && stop.id === config.broadcastId) return true

          return false
        }),
      ),
    ),
  )
}
