import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { WebSocketService } from '@platform/shared';
import { Observable, ReplaySubject, Subject, interval, merge, of, throwError, EMPTY } from 'rxjs';
import { catchError, distinctUntilChanged, filter, shareReplay, switchMap, take, takeUntil } from 'rxjs/operators';
import ApiWs = require('typings/generated/websocket');
import { getTimestamp } from '../helpers/date';
import { TmOptionsService } from '../options';
import { WsTablespaceMessage } from '../../typings/generated/tablespace';

// Channel names that TmChannelService's constructor publishes through the
// options service under `availableSocketChannels`, keyed by the constant's
// own identifier.
const BROADCAST_CHANNEL_NAME = 'broadcast';
const SERVICE_CONFIG_CHANNEL_NAME = 'service_config';

/** Lookup table of channel message streams, indexed by channel name. */
type RmChannelCache = Record<string, Observable<ApiWs.WsMessage>>;

/**
 * Manages named message channels backed by a WebSocket connection, with an
 * optional long-polling fallback when the options service provides a
 * `longPollingHost`.
 *
 * Channel observables are cached by name. A dedicated "user channel" can be
 * switched on and off with `start()` / `stop()` and is exposed per module via
 * `getUserChannel()`.
 */
@Injectable({
  providedIn: 'root',
})
export class TmChannelService {
  /** Cache of channel observables, keyed by channel name. */
  private _channels: RmChannelCache = {};

  /** Emits a channel name to terminate every stream opened for that channel. */
  private _stopChannel$: Subject<string> = new Subject();

  /** Name of the active user channel (`null` = none); replays the latest value to late subscribers. */
  private _userChannelName$: ReplaySubject<string | null> = new ReplaySubject(1);

  /**
   * Messages of the active user channel. Switches to the new channel whenever
   * the name changes and to an empty stream when the name is cleared; the
   * latest message is replayed to late subscribers.
   */
  private _userChannel$: Observable<ApiWs.WsMessage | WsTablespaceMessage> = this._userChannelName$.pipe(
    distinctUntilChanged(),
    switchMap((key) => (key ? this.getChannel(key) : EMPTY)),
    shareReplay(1)
  );

  /**
   * Publishes the well-known channel names through the options service under
   * `availableSocketChannels`.
   */
  constructor(private _http: HttpClient, private _options: TmOptionsService) {
    _options.set({
      availableSocketChannels: {
        BROADCAST_CHANNEL_NAME,
        SERVICE_CONFIG_CHANNEL_NAME,
      },
    });
  }

  /**
   * Returns the user-channel messages addressed to the given module.
   *
   * @param moduleName value compared against `message.meta.module`
   * @returns stream of user-channel messages whose `meta.module` equals `moduleName`;
   *          messages without `meta` are dropped
   */
  public getUserChannel<T extends ApiWs.WsMessage>(moduleName: string): Observable<T> {
    return this._userChannel$.pipe(
      filter((message: T) => message && message.meta && message.meta.module === moduleName)
    );
  }

  /**
   * Opens the user channel with the given name. Calling it again with the same
   * name is a no-op because consecutive duplicates are filtered out.
   *
   * @param key name of the user channel
   */
  public start(key: string) {
    this._userChannelName$.next(key);
  }

  /**
   * Returns the cached observable for a channel, creating it on first use.
   *
   * @param name channel name
   * @param forceReopen when `true`, an existing channel is closed (its current
   *        subscribers complete) and a fresh one is created
   * @returns the channel's message stream
   */
  public getChannel(name: string, forceReopen = false): Observable<ApiWs.WsMessage | WsTablespaceMessage> {
    const cached = this._channels[name];

    if (cached && !forceReopen) {
      return cached;
    }

    if (cached) {
      // Terminate the old streams before replacing the cache entry.
      this._closeChannel(name);
    }

    const channel = this._openChannel(name);
    this._channels[name] = channel;

    return channel;
  }

  /**
   * Closes all channels: clears the user channel and sends the stop signal for
   * every cached channel, so that their subscribers complete and the
   * underlying connections are released instead of being merely forgotten.
   */
  public stop() {
    this._userChannelName$.next(null);

    // Object.keys() returns a snapshot, so deleting entries while iterating is safe.
    for (const name of Object.keys(this._channels)) {
      this._closeChannel(name);
    }

    this._channels = {};
  }

  /**
   * Sends the stop signal for a channel and removes it from the cache.
   * Safe to call for a channel that is not cached.
   */
  private _closeChannel(name: string): void {
    this._stopChannel$.next(name);
    delete this._channels[name];
  }

  /**
   * Builds the message stream of a channel: a WebSocket connection that is
   * re-created whenever the `websocketHost` option emits, falling back to long
   * polling on error if `longPollingHost` is configured (otherwise the error is
   * rethrown). The stream ends when the channel's stop signal is emitted.
   *
   * NOTE(review): the returned observable is not shared, so every subscriber
   * runs its own `switchMap` and creates its own `WebSocketService`. Whether
   * that results in one physical connection per subscriber depends on
   * `WebSocketService` - confirm before relying on the cache for deduplication.
   */
  private _openChannel(channelName: string): Observable<ApiWs.WsMessage> {
    const stopByChannelName$ = this._stopChannel$.pipe(filter((name) => name === channelName));

    return this._options.getWithUpdates('websocketHost').pipe(
      switchMap((host) => this._openWsConnection(host, channelName)),
      catchError((e) => (this._options.all.longPollingHost ? this._longPollingConnection(channelName) : throwError(e))),
      takeUntil(stopByChannelName$)
    );
  }

  /**
   * Opens a WebSocket connection for the channel on the current page host,
   * using `wss` when the page is served over https and `ws` otherwise.
   *
   * @param websocketHost path prefix placed before the channel name
   * @param channelName channel to connect to
   */
  private _openWsConnection(websocketHost: string, channelName: string): Observable<ApiWs.WsMessage> {
    const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
    const origin = `${protocol}://${window.location.host}`;
    const url = `${origin}/${websocketHost}${channelName}?_=${getTimestamp('s')}`;
    const wsConnection = new WebSocketService(url);

    // WebSocketService does not work without this subscription
    wsConnection.connectionStatus.pipe(take(1)).subscribe();

    return wsConnection.connect();
  }

  /**
   * Long-polling fallback: requests the channel URL immediately and then every
   * `longPollingInterval`, until the channel's stop signal is emitted.
   *
   * The `_=` timestamp is computed for each request so that consecutive polls
   * do not reuse the same URL, and a failed request is dropped on its own so
   * that one error does not end the whole polling loop.
   */
  private _longPollingConnection(channelName: string): Observable<ApiWs.WsMessage> {
    const stopPollingByChannelName$ = this._stopChannel$.pipe(filter((name) => name === channelName));
    const origin = `${window.location.protocol}//${window.location.host}`;
    const baseUrl = `${origin}/${this._options.all.longPollingHost}${channelName}`;

    return merge(of(null), interval(this._options.all.longPollingInterval)).pipe(
      switchMap(() =>
        (this._http.get(`${baseUrl}?_=${getTimestamp('s')}`) as Observable<ApiWs.WsMessage>).pipe(
          // Best effort: skip this poll and wait for the next tick.
          catchError(() => EMPTY)
        )
      ),
      takeUntil(stopPollingByChannelName$)
    );
  }
}
