import { Injectable, Injector } from '@angular/core';
import { QueueingSubject } from 'queueing-subject';
import { EMPTY, Observable, Observer, Subject, of, timer } from 'rxjs';
import { filter, map, mergeMap, retry, share } from 'rxjs/operators';
import { LayoutCoreMessage } from '../../models/memorypack/LayoutCoreMessage';
import { LogUtils } from '../../utils';
import { AppService } from '../app/app.service';
import { OpsStatsService, OpsStatsType } from '../app/ops-stats.service';
import { WebSocketConnectionStatus, WebSocketEx } from './web-socket';




/**
 * Client-side transport for `LayoutCoreMessage` traffic.
 *
 * Two transports are supported:
 * - a regular `WebSocketEx` connection (default), with automatic reconnects
 *   using an incremental back-off, and
 * - a "hybrid" mode, selected when `window.mobileEngine.operationalMode === 'hybrid'`,
 *   where messages are exchanged through callbacks on `window.mobileEngine`
 *   instead of a socket.
 */
@Injectable({
  providedIn: 'root'
})
export class WebSocketClientService {

  private upStream: QueueingSubject<string | ArrayBuffer | Blob>;
  private downStream: Observable<LayoutCoreMessage | Uint8Array>;
  private webSocket: WebSocketEx;
  private webSocketConnectionStatus: WebSocketConnectionStatus;

  // Handle of the connection-status subscription created in connect().
  // Typed structurally so that no additional rxjs import is required.
  private connectionStatusSubscription: { unsubscribe(): void };

  private sequenceNumber: number;

  // Current reconnect delay in seconds; grows by one per retry (capped at 30)
  // and is reset to 0 whenever the socket reports Open.
  private backOffRetryInSeconds: number;

  private appService: AppService;
  private opsStatsService: OpsStatsService;

  // Send timestamps (performance.now()) keyed by request sequence number.
  // Entries are removed once the matching response has been measured.
  private requestTimes: { [key: number]: number } = {};

  constructor(
    private injector: Injector,
  ) {
    this.sequenceNumber = 1;
    this.backOffRetryInSeconds = 0;

    // The services are resolved lazily, one second after construction.
    // NOTE(review): presumably this avoids a circular DI dependency - confirm.
    // Until then both fields are undefined, which every usage below tolerates.
    setTimeout(() => {
      this.appService = this.injector.get(AppService);
      this.opsStatsService = this.injector.get(OpsStatsService);
    }, 1000);
  }

  /**
   * Returns the next message sequence number and advances the counter.
   *
   * `send()` associates its timing entry with the most recently issued number,
   * so callers are expected to call this right before sending the request.
   */
  getSequenceNumber(): bigint {
    return BigInt(this.sequenceNumber++);
  }

  /**
   * Opens the transport.
   *
   * In hybrid mode fresh up/down streams are created and
   * `window.mobileEngine.layoutProcessMessage` is installed as the inbound hook.
   * Otherwise a `WebSocketEx` is created for `wsUrl`; calling `connect` again
   * with the same URL is a no-op, a different URL replaces the connection.
   *
   * @param wsUrl URL of the websocket endpoint (unused in hybrid mode).
   */
  connect(wsUrl: string) {
    const windowAny: any = window;
    if (windowAny.mobileEngine?.operationalMode === 'hybrid') {
      this.disconnect(); // probably was never connected but just in case...
      this.upStream = new QueueingSubject<string | ArrayBuffer | Blob>();
      this.downStream = new Subject<LayoutCoreMessage | Uint8Array>();

      windowAny.mobileEngine.layoutProcessMessage = (data: Uint8Array) => {
        if (data.length === 1) { // pong message
          (this.downStream as Subject<LayoutCoreMessage | Uint8Array>).next(data);
        } else { // normal message
          (this.downStream as Subject<LayoutCoreMessage | Uint8Array>).next(LayoutCoreMessage.deserialize(data.buffer));
        }
      }
      return;
    }

    if (this.webSocket) {
      if (this.webSocket.url === wsUrl) {
        return;
      } else {
        this.disconnect();
      }
    }

    this.webSocket = new WebSocketEx(
      wsUrl,
      this.upStream = new QueueingSubject<string | ArrayBuffer | Blob>(),
      // If the websocket is not connected then the QueueingSubject will ensure
      // that messages are queued and delivered when the websocket reconnects.
      // A regular Subject can be used to discard messages sent when the websocket
      // is disconnected.
    );

    // Using share() causes a single websocket to be created when the first
    // observer subscribes. This socket is shared with subsequent observers
    // and closed when the observer count falls to zero.
    this.downStream = this.webSocket.messages
    .pipe(
      map((buffer: Uint8Array) => {
        if (buffer.length === 1) { // pong message
          return buffer;
        } else { // normal message
          return LayoutCoreMessage.deserialize(buffer.buffer);
        }
      }),
      retry({ delay: this.incrementalBackOffRetry.bind(this) }),
      share(),
    );

    // Keep the subscription so disconnect() can release it; otherwise status
    // events of a replaced socket would keep overwriting the current status.
    this.connectionStatusSubscription = this.getConnectionStatusChanges()
    .subscribe((status: WebSocketConnectionStatus) => {
      this.webSocketConnectionStatus = status;
      if (status === WebSocketConnectionStatus.Open) this.backOffRetryInSeconds = 0;
    });
  }

  /**
   * Closes the current websocket, if any, and releases the connection-status
   * subscription that belongs to it. Does nothing when no socket exists.
   */
  disconnect() {
    if (!this.webSocket) return;

    this.webSocket.close();
    this.webSocket = undefined;

    if (this.connectionStatusSubscription) {
      this.connectionStatusSubscription.unsubscribe();
      this.connectionStatusSubscription = undefined;
    }
    // Back to the "never connected" state so a stale Open status cannot let
    // ping messages through after the socket is gone.
    this.webSocketConnectionStatus = undefined;
  }

  /**
   * Sends a serialized message.
   *
   * Hybrid mode: forwards the bytes to `mobileEngine.instance` via
   * `invokeMethodAsync('LayoutProcessMessageAsync', ...)`, retrying every
   * 100 ms until `instance` is available.
   *
   * Websocket mode: pushes the bytes onto the up-stream. One-byte (ping)
   * messages are only sent while the connection is Open; longer messages are
   * always pushed (the QueueingSubject queues them while disconnected).
   *
   * @param msgContent Serialized message bytes; a length of 1 denotes a ping.
   */
  send(msgContent: Uint8Array) {
    const windowAny: any = window;
    if (windowAny.mobileEngine?.operationalMode === 'hybrid') {
      if (!windowAny.mobileEngine.instance) {
        setTimeout(() => {
          this.send(msgContent);
        }, 100);
        return;
      }

      windowAny.mobileEngine.instance.invokeMethodAsync('LayoutProcessMessageAsync', msgContent);
      return;
    }

    if (
      this.upStream &&
      (this.webSocketConnectionStatus === WebSocketConnectionStatus.Open || msgContent.length > 1)
    ) {
      // try sending if connection is OPEN and / or if not ping message
      if (msgContent.length > 1) {
        // Only real requests are timed. A ping must not overwrite the start
        // time of the most recently issued request.
        this.requestTimes[this.sequenceNumber - 1] = performance.now();
      }
      this.upStream.next(msgContent);
    }
  }

  /**
   * Returns the stream of inbound messages.
   *
   * @param sequenceNumber When given, only messages with `messageOrigin === 0n`
   *   and a matching `messageSequenceNr` are emitted, and the request/response
   *   time of the first such message is reported to `OpsStatsService`.
   *   When omitted, every non-empty inbound message (including pongs) is emitted.
   * @returns The filtered down-stream, or `of(null)` when no websocket exists
   *   and hybrid mode is not active.
   */
  getMessages$(sequenceNumber?: bigint): Observable<LayoutCoreMessage | Uint8Array> {
    const windowAny: any = window;
    if (!this.webSocket && windowAny.mobileEngine?.operationalMode !== 'hybrid') return of(null);

    if (sequenceNumber) {
      return this.downStream
      .pipe(
        filter((msg: LayoutCoreMessage) => {
          return !!msg && msg.messageOrigin === 0n && msg.messageSequenceNr === sequenceNumber;
        }),
        map((msg: LayoutCoreMessage) => {
          const requestKey = Number(sequenceNumber);
          const sentAt = this.requestTimes[requestKey];
          // Without a recorded send time (e.g. hybrid mode, or the response was
          // already measured) there is nothing meaningful to report.
          if (sentAt !== undefined) {
            // Drop the entry so the timing map does not grow without bound.
            delete this.requestTimes[requestKey];
            if (this.opsStatsService) this.opsStatsService.addValue(OpsStatsType.WsRequestResponseTimeInMs, ~~(performance.now() - sentAt));
          }
          return msg;
        }),
      );
    } else {
      return this.downStream
      .pipe(
        filter((msg: LayoutCoreMessage) => {
          return !!msg;
        }),
      );
    }
  }

  /**
   * Returns the connection-status stream.
   *
   * In hybrid mode this is a single `Open` value. Otherwise it is the status
   * stream of the current websocket, so `connect()` must have been called
   * first (there is no socket to read from before that).
   */
  getConnectionStatusChanges(): Observable<WebSocketConnectionStatus> {
    const windowAny: any = window;
    if (windowAny.mobileEngine?.operationalMode === 'hybrid') return of(WebSocketConnectionStatus.Open);

    return this.webSocket.connectionStatus as Observable<WebSocketConnectionStatus>;
  }

  /**
   * Delay notifier for `retry`: emits once when the next reconnect attempt
   * should start.
   *
   * The delay grows by one second per call up to 30 seconds. Once the delay
   * has elapsed, the attempt proceeds immediately if the app is in the
   * foreground; otherwise it is held back and re-checked every second until
   * the app returns to the foreground.
   *
   * @param error The error that terminated the stream (currently unused).
   * @param retryCount Retry counter supplied by `retry` (currently unused).
   */
  private incrementalBackOffRetry(error: any, retryCount: number): Observable<any> {
    // if (error?.message === 'Closed by the WebSocketManager') return EMPTY;

    if (this.backOffRetryInSeconds < 30) {
      this.backOffRetryInSeconds++;
    }

    LogUtils.log(`Attempting to reconnect WS in ${this.backOffRetryInSeconds}s...`);
    return timer(this.backOffRetryInSeconds * 1000)
    .pipe(
      mergeMap((result: any) => {
        if (this.appService?.isAppInForeground()) {
          if (this.opsStatsService) this.opsStatsService.addValue(OpsStatsType.WsReconnectRetriesCount, 1);
          return of(result);
        } else {
          return new Observable((observer: Observer<any>) => {
            const interval = setInterval(() => {
              if (!this.appService?.isAppInForeground()) return;

              clearInterval(interval);
              observer.next(result);
              observer.complete();
            }, 1000);

            // Teardown: stop polling if the subscriber goes away before the
            // app returns to the foreground (e.g. the stream is unsubscribed).
            return () => clearInterval(interval);
          });
        }
      })
    );
  }

}
