import { Injectable } from '@angular/core';
import { Observable, Subject, Subscriber } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
import { AppConfigService } from './app-config.service';
import { PaymentStatisticsModel } from '../models/api/payment-statistics.model';

/** Path of the statistics websocket; appended to the configured socket base URL. */
const SOCKET_ENDPOINT = '/websocket/statistics';
/** Milliseconds without any incoming message after which the socket is treated as dead. */
const SOCKET_CHECK_INTERVAL = 12.5 * 1000;
/** Milliseconds to wait after a failed connection before opening a new one. */
const SOCKET_RECONNECT_INTERVAL = 3 * 1000;

/**
 * Streams payment statistics pushed by the backend over a websocket.
 *
 * The service keeps at most one open socket: opening a new connection first
 * disposes the previous one, which completes the stream that was reading
 * from it.
 */
@Injectable()
export class StatisticsSocketService {
  /** Completes the stream of the currently open socket, if there is one. */
  private disposeOpenSocket?: () => void;

  constructor(private appConfig: AppConfigService) {}

  /**
   * Opens the statistics socket and emits every model received from it.
   *
   * When the connection fails (socket error or missing heartbeat) a new
   * connection is attempted after `SOCKET_RECONNECT_INTERVAL` milliseconds;
   * such failures are never forwarded to the caller. Unsubscribing closes the
   * socket and cancels any pending reconnect attempt.
   *
   * @returns a cold observable; each subscription opens its own connection.
   */
  receiveStatistics(): Observable<PaymentStatisticsModel> {
    return new Observable<PaymentStatisticsModel>((subscriber) => {
      const socketUrl = `${this.appConfig.config.apiSocketUrl}${SOCKET_ENDPOINT}`;
      let connection: { unsubscribe(): void } | undefined;
      let reconnectTimer: ReturnType<typeof setTimeout> | undefined;

      const connect = (): void => {
        reconnectTimer = undefined;
        connection = this.initSocket(socketUrl).subscribe({
          next: (statistics) => subscriber.next(statistics),
          // a broken connection is retried instead of being surfaced to the caller
          error: () => {
            reconnectTimer = setTimeout(connect, SOCKET_RECONNECT_INTERVAL);
          },
          complete: () => subscriber.complete(),
        });
      };
      connect();

      // teardown: without it the socket and the reconnect loop would outlive the consumer
      return () => {
        if (reconnectTimer !== undefined) {
          clearTimeout(reconnectTimer);
        }
        if (connection != null) {
          connection.unsubscribe();
        }
      };
    });
  }

  /**
   * Creates a single socket connection to `url`.
   *
   * The returned observable emits every parsed message, errors when the
   * socket reports an error or stays silent for `SOCKET_CHECK_INTERVAL`
   * milliseconds, and completes when a later connection replaces this one.
   * All resources are released by the teardown, however the stream ends.
   *
   * @param url full websocket URL to connect to
   */
  private initSocket(url: string): Observable<PaymentStatisticsModel> {
    return new Observable<PaymentStatisticsModel>((subscriber) => {
      // dispose last socket if there was one
      if (this.disposeOpenSocket != null) {
        this.disposeOpenSocket();
      }

      // create socket and heartbeat-nexting subject
      const socket = new WebSocket(url);
      const heartbeat = new Subject<void>();

      // error-break the returned observable when no heartbeat was received for too long
      const heartbeatSub = heartbeat.pipe(debounceTime(SOCKET_CHECK_INTERVAL)).subscribe(() => {
        subscriber.error(new Error('Statistics socket heartbeat timed out'));
      });

      // start the timeout window right away so a socket that never delivers anything is detected too
      heartbeat.next();

      // disposal function to be used before the next initialization;
      // completing the subscriber runs the teardown below
      const dispose = (): void => subscriber.complete();
      this.disposeOpenSocket = dispose;

      this.addMessageReceiveListener(socket, heartbeat, subscriber);

      return () => {
        // unsubscribe first: completing the subject would otherwise flush the
        // pending debounced heartbeat and fire the timeout handler
        heartbeatSub.unsubscribe();
        heartbeat.complete();
        socket.close();
        if (this.disposeOpenSocket === dispose) {
          this.disposeOpenSocket = undefined;
        }
      };
    });
  }

  /**
   * Wires the socket's `message` and `error` events to the given subscriber.
   *
   * @param socket socket to listen on
   * @param heartbeat subject nexted on every received message to restart the silence timeout
   * @param subscriber receiver of the parsed messages and of socket errors
   */
  private addMessageReceiveListener(
    socket: WebSocket,
    heartbeat: Subject<void>,
    subscriber: Subscriber<PaymentStatisticsModel>
  ): void {
    socket.addEventListener('message', (event) => {
      // NOTE(review): the payload is not validated; a frame that is not valid JSON
      // makes JSON.parse throw inside this listener without erroring the stream
      const model: PaymentStatisticsModel = JSON.parse(event.data);
      heartbeat.next();
      subscriber.next(model);
    });

    // error-break returned observable chain on error event emission;
    // the observable's teardown closes the socket and stops the heartbeat
    socket.addEventListener('error', () => {
      subscriber.error(new Error('Statistics socket reported an error'));
    });
  }
}
