import { QueueingSubject } from 'queueing-subject';
import { BehaviorSubject, Observable, Unsubscribable } from 'rxjs';
import { LayoutCoreMessage } from '../../models/memorypack/LayoutCoreMessage';

export enum WebSocketConnectionStatus {
  Closed = 0,
  Open = 1,
}

export interface IWebSocketConnection {
  connectionStatus: Observable<WebSocketConnectionStatus>,
  messages: Observable<LayoutCoreMessage | Uint8Array>,
}

export class WebSocketEx implements IWebSocketConnection {

  connectionStatus: BehaviorSubject<WebSocketConnectionStatus>;
  messages: Observable<LayoutCoreMessage | Uint8Array>;

  private socket: WebSocket | undefined;

  constructor(
    public url: string,
    upStream: QueueingSubject<string | ArrayBuffer | Blob>,
    protocols?: string | string[],
  ) {
    this.connectionStatus = new BehaviorSubject<WebSocketConnectionStatus>(
      WebSocketConnectionStatus.Closed
    );

    this.messages = new Observable<LayoutCoreMessage | Uint8Array>((observer) => {
      this.socket = new WebSocket(url, protocols || []);
      this.socket.binaryType = 'arraybuffer';
      let inputSubscription: Unsubscribable;

      let open = false;
      const onClose = () => {
        if (!open) return;

        this.connectionStatus.next(WebSocketConnectionStatus.Closed);
        open = false;
      }

      this.socket.onopen = () => {
        open = true;
        this.connectionStatus.next(WebSocketConnectionStatus.Open);
        inputSubscription = upStream.subscribe((data: string | ArrayBuffer | Blob) => {
          if (!this.socket) return;

          this.socket.send(data);
        });
      }

      this.socket.onmessage = (message: MessageEvent) => {
        const bytes = new Uint8Array(message.data, 0);
        observer.next(bytes);
      }

      this.socket.onerror = (error: Event) => {
        onClose();
        observer.error(error);
      }

      this.socket.onclose = (event: CloseEvent) => {
        onClose();
        // if (event && event.wasClean) {
        //   observer.complete();
        // } else {
        //   observer.error(new Error(event ? event.reason : event.reason));
        // }
        observer.error(new Error(event?.reason || 'onClose()'));
      }

      return () => {
        // this is the complete() for the observable
        if (inputSubscription) {
          inputSubscription.unsubscribe();
        }

        if (!this.socket) return;

        onClose();
        this.socket.close();
      }
    });
  }

  close(): void {
    if (!this.socket) return;

    this.socket.close();
    this.socket = undefined;
  }

}
