import { timer, map, mergeMap, merge, Observable, take, OperatorFunction, takeUntil } from 'rxjs'
import { Config } from '../../../../config'
import { CommChannelStop$$ } from '../../subject/stop/comm-channel-partial-service'

// ####################################################################################################
// ~~~~~~ Pipeline used by every message that expects an ack: adds a timeout while waiting for that ack
// ####################################################################################################

/** Marker value emitted in place of an ack when the wait runs out. */
const timeout = { type: 'ack-timeout' } as const

/** Literal type of the ack-timeout marker, so consumers can discriminate on `type`. */
export type Timeout = typeof timeout

/**
 * Emits the ack-timeout marker once the delay configured in
 * `Config.Api.Bcast.WS.AckTimout` has elapsed.
 * `timer` is cold, so each subscription starts its own countdown.
 * NOTE(review): the delay is presumably in milliseconds — confirm against Config.
 */
const timeout$ = timer(Config.Api.Bcast.WS.AckTimout).pipe(
  map((): Timeout => timeout),
)

/**
 * Builds an operator that, for every source emission, waits for either an ack
 * from `obs$` or the ack timeout, whichever comes first.
 *
 * The source value itself is ignored; it only triggers a new wait. Each wait
 * emits at most one value (the ack or the `Timeout` marker) and is cancelled
 * without emitting if `CommChannelStop$$` fires first.
 *
 * @param obs$ Stream of acks to wait on.
 * @returns An operator emitting the first ack or the `Timeout` marker per source emission.
 */
export const genMergeMapAckTimeout = (
  obs$: Observable<DN.Services.CommChannel.Messages.AckModelBase>,
): OperatorFunction<unknown, Timeout | DN.Services.CommChannel.Messages.AckModelBase> => {
  // A fresh merge per source emission, so every wait gets its own timer subscription.
  const waitForAckOrTimeout = () =>
    merge(timeout$, obs$).pipe(
      take(1), // first of ack / timeout wins
      takeUntil(CommChannelStop$$), // abandon the wait when the channel stops
    )

  return mergeMap(waitForAckOrTimeout)
}
