import { Observable, Subscription, Subject } from 'rxjs';
import { finalize } from 'rxjs/operators';
import { isMap as _isMap, isNumber as _isNumber } from 'lodash';

import { HttpError } from '../http-error/http-error.model';
import { ObservableResponse } from './observable-response.interface';


/**
 * Runs a keyed queue of observables, keeping at most `limit` of them
 * subscribed at the same time.
 *
 * Every observable that terminates (completes or errors) produces exactly one
 * `ObservableResponse` on the executor listener, after which the next pending
 * observable of the queue is started. When an observable emits several values
 * only the last one is reported.
 */
export class ObservableExecutor {
  /** Observables that are currently subscribed, indexed by their queue key. */
  private activeObservables: Map<string, Observable<any>>;
  private executorListener: Subject<ObservableResponse>;
  /**
   * Incremented on every cancellation. Each running observable remembers the
   * value it was started with, so finalizers that fire because of a
   * cancellation can be told apart from natural terminations.
   */
  private generation: number;
  private limit: number;
  /** Pending AND running observables; an entry is removed once it terminates. */
  private queue: Map<string, Observable<any>>;
  private subscriptions: Subscription;

  /**
   * @param limit Maximum number of observables subscribed at the same time.
   * @param queue Optional initial queue; when it is a Map it is started immediately.
   * @throws Error when `limit` is not a number greater than zero.
   */
  constructor(limit: number, queue?: Map<string, Observable<any>>) {
    this.activeObservables = new Map();
    this.executorListener = new Subject();
    this.generation = 0;
    this.queue = new Map();
    this.subscriptions = new Subscription();
    this.setLimit(limit);

    if (_isMap(queue)) {
      this.setQueue(queue);
    }
  }

  /**
   * Adds the given observable to the current queue with the given key and
   * starts it right away when a concurrency slot is free.
   *
   * Setting a key that is already queued replaces the queued observable. If
   * that key is currently running, the replacement is started once the
   * running observable has terminated.
   *
   * @param key Identifier reported back in the `ObservableResponse`.
   * @param obs Observable to execute.
   * @returns void
   */
  add(key: string, obs: Observable<any>): void {
    this.queue.set(key, obs);
    this.start();
  }

  /**
   * Cancels the current execution: running observables are unsubscribed and
   * the queue is emptied. Nothing is emitted on the executor listener for the
   * cancelled observables and no further queued observable is started.
   *
   * @returns void
   */
  cancelExecution(): void {
    const cancelled: Subscription = this.subscriptions;

    // Invalidate the finalizers of everything in flight BEFORE unsubscribing,
    // otherwise each of them would report a response and start the next entry.
    this.generation++;
    this.queue = new Map();
    this.activeObservables = new Map();
    this.subscriptions = new Subscription();
    cancelled.unsubscribe();
  }

  /**
   * Get the current executor listener.
   *
   * @returns The subject on which one response per terminated observable is emitted.
   */
  getExecutorListener(): Subject<ObservableResponse> {
    return this.executorListener;
  }

  /**
   * Get the queue length. Running observables are counted as well, since an
   * entry only leaves the queue when its observable terminates.
   *
   * @returns number
   */
  getQueueLength(): number {
    return this.queue.size;
  }

  /**
   * Cancels the current execution, then sets the given queue as the current
   * one and starts it. The given Map is used as is (not copied): entries are
   * deleted from it as their observables terminate.
   *
   * @param queue Map of observables to execute, indexed by key.
   * @returns void
   */
  setQueue(queue: Map<string, Observable<any>>): void {
    this.cancelExecution();
    this.queue = queue;
    this.start();
  }

  /**
   * Subscribes to the given observable and reports its outcome on the
   * executor listener once it terminates.
   *
   * @param key Queue key of the observable.
   * @param obs Observable to subscribe to.
   * @returns void
   */
  private execute(key: string, obs: Observable<any>): void {
    const generation: number = this.generation;
    // Default outcome, reported as is when the observable completes without
    // emitting any value.
    let observableResponse: ObservableResponse = {
      key: key,
      response: undefined,
      error: undefined,
      isError: false
    };

    this.activeObservables.set(key, obs);

    this.subscriptions.add(
      obs.pipe(
        finalize(
          () => {
            // finalize also runs on unsubscribe; a different generation means
            // this observable was cancelled, so it must not touch the state.
            if (generation !== this.generation) {
              return;
            }

            // Keep a replacement that was queued under the same key while
            // this observable was running.
            if (this.queue.get(key) === obs) {
              this.queue.delete(key);
            }

            this.activeObservables.delete(key);
            this.executorListener.next(observableResponse);
            this.start();
          }
        )
      ).subscribe({
        next: (response: any) => {
          observableResponse = {
            key: key,
            response: response,
            error: undefined,
            isError: false
          };
        },
        error: (error: HttpError) => {
          observableResponse = {
            key: key,
            response: undefined,
            error: error,
            isError: true
          };
        }
      })
    );
  }

  /**
   * Starts queued observables that are not running yet, in queue order, until
   * the concurrency limit is reached or the queue has no pending entry left.
   *
   * @returns void
   */
  private start(): void {
    const generation: number = this.generation;
    const entries: IterableIterator<[string, Observable<any>]> = this.queue.entries();
    let entry: IteratorResult<[string, Observable<any>]> = entries.next();

    // The generation check stops the scan when a listener cancelled or
    // replaced the queue while a synchronous observable was being executed.
    while (
      !entry.done &&
      generation === this.generation &&
      this.activeObservables.size < this.limit
    ) {
      const key: string = entry.value[0];
      const obs: Observable<any> = entry.value[1];

      if (!this.activeObservables.has(key)) {
        this.execute(key, obs);
      }

      entry = entries.next();
    }
  }

  /**
   * Set the given value as the current limit.
   *
   * @param limit Maximum number of concurrently subscribed observables.
   * @returns void
   * @throws Error when `limit` is not a number greater than zero.
   */
  private setLimit(limit: number): void {
    if (_isNumber(limit) && limit > 0) {
      this.limit = limit;
    } else {
      throw new Error('Invalid limit given for ObservableExecutor');
    }
  }
}
