import {
  EMPTY,
  fromEvent,
  map,
  merge,
  mergeMap,
  of,
  Subject,
  takeUntil,
  take,
  tap,
  timer,
  scan,
  filter,
} from 'rxjs'
import { ConstEventsWebSocket, ConstWebSocketReadyStateValue } from '@dn/constants'
import { UtilsLog } from '../../../utils/logs'
import { Guards } from '../guards'
import { CommChannelRes$$ } from '../subject/response/comm-channel-partial-service'
import { CommChannelStop$$ } from '../subject/stop/comm-channel-partial-service'
import { RelayMessageBuffer } from './relay-message-buffer/comm-channel-partial-service'

// ~~~~~~ Constants

// Emits (with no value) every time a 'PING!' frame is received on a socket.
// `pingTimeout$` uses it as a `takeUntil` notifier, so each received ping
// cancels the ping-timeout countdowns that are running at that moment.
const ping$$ = new Subject<void>()

// Period of one ping-timeout tick, in milliseconds.
const PING_TIMEOUT_TICK_MS = 2500

// Number of consecutive ticks without a ping that counts as a timeout.
const PING_TIMEOUT_TICKS = 3

/**
 * Cold countdown that emits a single `ws-ping-timeout` message and completes.
 *
 * Every subscription starts its own timer ticking each `PING_TIMEOUT_TICK_MS`.
 * When `PING_TIMEOUT_TICKS` ticks have elapsed, the timeout message is emitted.
 * The countdown is dropped, without emitting anything, as soon as `ping$$` or
 * `CommChannelStop$$` emits.
 */
const pingTimeout$ = timer(PING_TIMEOUT_TICK_MS, PING_TIMEOUT_TICK_MS).pipe(
  // A period is required here: `timer(dueTime)` alone emits once and completes,
  // so the tick counter below could never reach `PING_TIMEOUT_TICKS`.
  take(PING_TIMEOUT_TICKS),
  takeUntil(ping$$),
  takeUntil(CommChannelStop$$),
  // Count the ticks seen so far (the emitted tick value itself is ignored)
  scan((acc, _cur) => acc + 1, 0),
  // Only the last expected tick yields a message
  filter((v) => v === PING_TIMEOUT_TICKS),
  // Ping timeout message
  map(() => {
    const message: DN.Services.CommChannel.Messages.Internal.WSPingTimeoutMessage = {
      type: 'ws-ping-timeout',
      payload: {
        pn: performance.now(),
      },
    }

    return message
  }),
)

// Receives the ping-timeout messages produced by the countdowns that are
// re-subscribed after each received ping, and is merged into the stream
// returned by `commChannelStartListening$`. Declared at module level, so the
// same Subject is shared by every call of that function.
const pingTimeout$$ = new Subject<DN.Services.CommChannel.Messages.Internal.WSPingTimeoutMessage>()

// ####################################################################################################
// ~~~~~~ Start Listening
// ####################################################################################################

/**
 * Builds the stream of internal comm-channel messages for a WebSocket.
 *
 * The returned observable merges:
 * - `ws-open` (followed by a `pingTimeout$` countdown), `ws-close` and
 *   `ws-error` messages derived from the socket events,
 * - `ws-message` messages derived from incoming JSON frames,
 * - the ping-timeout messages pushed into `pingTimeout$$`,
 * and it completes when `CommChannelStop$$` emits.
 *
 * Incoming frames are handled as follows:
 * - Blob frames are treated as pings and never produce a message,
 * - frames that cannot be parsed as JSON, or that fail `Guards.isMessage`,
 *   are dropped,
 * - acks are forwarded to `CommChannelRes$$` and not emitted,
 * - relay messages are replaced by whatever `RelayMessageBuffer.getMessages`
 *   returns for them (possibly nothing).
 *
 * @param webSocket Socket to listen to.
 * @returns Observable of internal comm-channel messages.
 */
export const commChannelStartListening$ = (webSocket: WebSocket) => {
  // Ping-Pong + timeout ping: handles a Blob frame asynchronously
  const answerPing = (blob: Blob) => {
    // Old websockets implementation hack
    if (!blob.text) {
      blob.text = () => new Promise((resolve) => resolve('PING!'))
    }

    blob
      .text()
      .then((text) => {
        if (text !== 'PING!') return

        // Cancel the running countdowns...
        ping$$.next()

        // ...and start a new one, forwarding its timeout message
        pingTimeout$
          .pipe(
            tap((timeoutMessage) => pingTimeout$$.next(timeoutMessage)),
            tap(() => UtilsLog.devLog('pingTimeout', new Date())), // only log 'next'
          )
          .subscribe()

        // Reply only while the socket is still open
        if (webSocket.readyState !== ConstWebSocketReadyStateValue.Open) return

        webSocket.send(new TextEncoder().encode('PONG!').buffer)
      })
      .catch(() => UtilsLog.devLog('ERROR: blob message is not PING!'))
  }

  // Open: emit the open message and start the first ping countdown

  const opened$ = fromEvent(webSocket, ConstEventsWebSocket.Open).pipe(
    mergeMap(() => {
      const openMessage: DN.Services.CommChannel.Messages.Internal.WSOpenMessage = {
        type: 'ws-open',
        payload: { pn: performance.now() },
      }

      return merge(of(openMessage), pingTimeout$)
    }),
  )

  // Close

  const closed$ = fromEvent(webSocket, ConstEventsWebSocket.Close).pipe(
    map(
      (): DN.Services.CommChannel.Messages.Internal.WSCloseMessage => ({
        type: 'ws-close',
        payload: { pn: performance.now() },
      }),
    ),
  )

  // Error

  const failed$ = fromEvent(webSocket, ConstEventsWebSocket.Error).pipe(
    map(
      (errorEvent): DN.Services.CommChannel.Messages.Internal.WSErrorMessage => ({
        type: 'ws-error',
        payload: { pn: performance.now(), error: errorEvent },
      }),
    ),
  )

  // Message

  const incoming$ = fromEvent<MessageEvent>(webSocket, ConstEventsWebSocket.Message).pipe(
    mergeMap((evt) => {
      let body = evt.data

      if (body instanceof Blob) {
        answerPing(body)

        return EMPTY
      }

      try {
        body = JSON.parse(body)
      } catch (err) {
        UtilsLog.devWarn('Cannot parse message', body)
        return EMPTY
      }

      // By default a frame carries exactly one message
      let payloads = [body]

      if (!Guards.isMessage(body)) return EMPTY

      // Acks go to the response subject, not to the listeners
      if (Guards.isAck(body)) {
        CommChannelRes$$.next(body)

        return EMPTY
      }

      // Get if buffered
      if (Guards.FromServer.isRelay(body)) {
        payloads = RelayMessageBuffer.getMessages(body)
      }

      // Comm messages
      const wrapped = payloads.map(
        (data): DN.Services.CommChannel.Messages.Internal.WSMessage => ({
          type: 'ws-message',
          payload: { pn: performance.now(), data },
        }),
      )

      return wrapped.length ? of(...wrapped) : EMPTY
    }),
  )

  return merge(opened$, closed$, failed$, incoming$, pingTimeout$$).pipe(
    takeUntil(CommChannelStop$$),
  )
}
