import { Injectable } from '@angular/core';

import { Subject, Observable, combineLatest, ReplaySubject } from 'rxjs';
import { map, delay, startWith, tap, filter } from 'rxjs/operators';

import { HubConnection, HubConnectionBuilder, HubConnectionState, LogLevel, HttpTransportType } from '@microsoft/signalr';

import * as _ from 'lodash';

import { MsalBroadcastService } from '@azure/msal-angular';

import { environment } from 'src/environments/environment';
import { ChatterMessageType } from './model/chatter-message-type';
import { ObjectDateParserService } from 'src/app/api/api-util/object-date-parser.service';
import { AuthService } from '../authentication';
import { AuthenticationResult, EventType } from '@azure/msal-browser';

/**
 * The contract of a service that wraps the WebSocket connection to the server.
 *
 * It exposes the connection state, a stream of (re)connection notifications,
 * listener registration for incoming messages and a way to send messages.
 */
export interface IChatterService {

  /**
   * Gets a value determining whether the connection is open or not.
   */
  readonly isConnectionOpen: boolean;

  /**
   * Gets the ID of the connection to the server.
   */
  readonly hubConnectionId: string;

  /**
   * Gets a stream that emits whenever the service connects or reconnects to the server.
   */
  readonly connectionOpened$: Observable<void>;

  /**
   * Adds a listener to be called whenever a particular type of message is received.
   *
   * @template T The type of the payload of the message.
   * @param type The message type to listen for.
   * @param callback The listener to be called when the message is received.
   */
  addListener<T>(type: ChatterMessageType, callback: (payload: T) => void): void;

  /**
   * Removes all listeners for a message type.
   *
   * @param type The message type to remove the listeners for.
   */
  removeListener(type: ChatterMessageType): void;

  /**
   * Sends a message to the server.
   *
   * @template T The type of the message payload.
   * @param type The message type to send.
   * @param payload The payload to send with the message.
   * @returns An observable typed with the payload type.
   * NOTE(review): what it emits and when it completes is not defined by this
   * declaration - confirm against the implementation.
   */
  send<T>(type: ChatterMessageType, payload: T): Observable<T>;
}

/**
 * A service to wrap the WebSocket connection to the server.
 *
 * The service listens for MSAL token events and (re)builds the SignalR hub
 * connection whenever a new access token becomes available, or 5 seconds
 * after the connection was closed / a connection attempt failed.
 *
 * NOTE(review): this class provides `isConnectionOpen`, `hubConnectionId`,
 * `connectionOpened$`, `addListener` and `removeListener`, but no `send`
 * method, so it does not (yet) satisfy `IChatterService` in full.
 */
@Injectable({
  providedIn: 'root'
})
export class ChatterService {

  /**
   * The connection to the server. Undefined until the first connection attempt.
   */
  private hubConnection: HubConnection;

  /**
   * The connection ID of the current connection.
   *
   * NOTE(review): nothing in this class assigns this field, so
   * {@link hubConnectionId} currently always yields `undefined` - confirm
   * where the ID is supposed to come from.
   */
  private connectionId: string;

  /**
   * A stream that emits whenever the service connects or reconnects to the server.
   */
  private readonly chatterConnected$ = new Subject<void>();

  /**
   * Gets a stream that emits whenever the service connects or reconnects to the server.
   */
  public readonly connectionOpened$: Observable<void> = this.chatterConnected$.asObservable();

  /**
   * The array of hub listeners. Kept so that the listeners can be re-attached
   * to every newly built hub connection.
   */
  private listeners: { type: ChatterMessageType; callback: (payload: any) => void }[] = [];

  /**
   * The last provided access token from MSAL.
   */
  private accessToken: string;

  /**
   * A stream that emits whenever a new MSAL access token is available.
   * The latest token is replayed to late subscribers.
   */
  private newAccessToken$ = new ReplaySubject<string>(1);

  /**
   * A stream that emits whenever the hub connection is disconnected or
   * the hub connection attempt is unsuccessful.
   */
  private connectionDisconnected$ = new Subject<void>();

  /**
   * A stream that emits after 5 seconds following a hub disconnection or failed
   * hub connection attempt. It also emits once immediately on subscription so that
   * the first access token triggers the first connection attempt.
   * A new token request is triggered in case the MSAL token has expired.
   */
  private reconnect$ = this.connectionDisconnected$
    .pipe(
      delay(5000),
      // The return value is ignored here.
      // NOTE(review): if this returns a cold observable, it is never subscribed - confirm.
      tap(() => this.authService.getAccessTokenForEndpoint(environment.chatterUri)),
      startWith(null as string)
    );

  /**
   * A stream that emits the MSAL access token whenever a new
   * access token is available or following a failed hub connection / the
   * hub connection is disconnected.
   */
  private newConnectionAttempt$ = combineLatest(
    [
      this.newAccessToken$,
      this.reconnect$
    ])
    .pipe(
      map(([accessToken]) => accessToken)
    );

  /**
   * Creates an instance of chatter service.
   *
   * @param dateParserService The {@link ObjectDateParserService} instance to use.
   * @param broadcastService The {@link MsalBroadcastService} instance to use.
   * @param authService The {@link AuthService} instance to use.
   */
  constructor(
    private dateParserService: ObjectDateParserService,
    private broadcastService: MsalBroadcastService,
    private authService: AuthService
  ) {
    // Track the access token carried by successful MSAL token/login events and
    // publish it only when it differs from the last one seen.
    this.broadcastService.msalSubject$.pipe(
      filter(msg => msg.eventType === EventType.ACQUIRE_TOKEN_SUCCESS ||
        msg.eventType === EventType.LOGIN_SUCCESS ||
        msg.eventType === EventType.SSO_SILENT_SUCCESS)
    ).subscribe({
      next: event => {
        const authResponse = event.payload as AuthenticationResult;
        if (this.accessToken !== authResponse.accessToken) {
          this.accessToken = authResponse.accessToken;

          this.newAccessToken$.next(authResponse.accessToken);
        }
      }
    });

    this.newConnectionAttempt$.subscribe(accessToken => this.startNewConnection(accessToken));
  }

  /**
   * Gets a value determining whether the connection is open or not.
   */
  public get isConnectionOpen(): boolean {
    return !!this.hubConnection && this.hubConnection.state === HubConnectionState.Connected;
  }

  /**
   * Gets the ID of the connection to the server.
   */
  public get hubConnectionId(): string {
    return this.hubConnection && this.connectionId;
  }

  /**
   * Adds a listener to be called whenever a particular type of message is received.
   *
   * The listener is remembered so that it is re-attached to every hub connection
   * built later on; if a hub connection already exists it is attached right away.
   *
   * @template T The type of the payload of the message.
   * @param type The message type to listen for.
   * @param callback The listener to be called when the message is received.
   */
  public addListener<T>(
    type: ChatterMessageType,
    callback: (payload: T) => void
  ): void {
    this.listeners.push({
      type,
      callback
    });

    if (this.hubConnection) {
      this.registerHandler(type, callback);
    }
  }

  /**
   * Removes all listeners for a message type.
   *
   * The listeners are dropped from the remembered list (so they are not re-attached
   * to future connections) and detached from the current hub connection, if any.
   *
   * @param type The message type to remove the listeners for.
   */
  public removeListener(type: ChatterMessageType): void {
    this.listeners = this.listeners.filter(x => x.type !== type);

    if (this.hubConnection) {
      this.hubConnection.off(type);
    }
  }

  /**
   * Reacts to a new connection attempt request.
   *
   * - No connection yet, or the connection is disconnected: a new connection is established.
   * - The connection is connected: it is stopped; its close handler then schedules a
   *   reconnect which picks up the latest access token.
   * - The connection is in a transitional state (connecting / disconnecting / reconnecting):
   *   nothing is done. Building another connection here would orphan the one in flight
   *   while its handlers keep firing; the outcome of the transition (connected, failed or
   *   closed) already drives the next attempt.
   *
   * @param accessToken The MSAL access token.
   */
  private startNewConnection(accessToken: string): void {
    const state = this.hubConnection
      ? this.hubConnection.state
      : HubConnectionState.Disconnected;

    switch (state) {
      case HubConnectionState.Disconnected:
        this.establishHubConnection(accessToken);
        break;
      case HubConnectionState.Connected:
        this.hubConnection.stop();
        break;
      default:
        // Transitional state: wait for the in-flight start/stop to settle.
        break;
    }
  }

  /**
   * Attempts to establish a new hub connection.
   *
   * @param accessToken The MSAL access token.
   */
  private establishHubConnection(accessToken: string): void {
    const connection = this.buildHubConnection(accessToken);
    this.hubConnection = connection;
    this.addListeners();

    // Register the close handler before starting the connection.
    connection.onclose(() => this.connectionDisconnected$.next());

    connection.start()
      .then(() => {
        this.chatterConnected$.next();
      })
      .catch(() => this.connectionDisconnected$.next());
  }

  /**
   * Adds a hub connection event handler for each hub listener.
   */
  private addListeners(): void {
    this.listeners.forEach(x => this.registerHandler(x.type, x.callback));
  }

  /**
   * Attaches a handler to the current hub connection which runs the date parser
   * over the incoming payload before handing the payload to the callback.
   *
   * @param type The message type to listen for.
   * @param callback The listener to be called when the message is received.
   */
  private registerHandler(type: ChatterMessageType, callback: (payload: any) => void): void {
    this.hubConnection.on(type, payload => {
      this.dateParserService.convertDates(payload);
      callback(payload);
    });
  }

  /**
   * Builds a new hub connection with the provided MSAL access token.
   * The connection uses the WebSockets transport directly and skips negotiation.
   *
   * @param accessToken The MSAL access token.
   * @returns A new hub connection.
   */
  private buildHubConnection(accessToken: string): HubConnection {
    return new HubConnectionBuilder()
      .withUrl(environment.chatterUri, {
        accessTokenFactory: () => accessToken,
        skipNegotiation: true,
        transport: HttpTransportType.WebSockets
      })
      .configureLogging(LogLevel.Information)
      .build();
  }
}
