import { captureException } from '@sentry/react';
import { differenceInHours, format, parseISO } from 'date-fns';
import {
  EventSourceMessage,
  EventStreamContentType,
  fetchEventSource,
} from '../../../3rd-party/fetch-event-source';
import { GetChatConversationUpdatesVariables } from '../../Chat/Chat.queries';
import {
  MERCURE_RETRY_INTERVAL,
  MERCURE_UNAUTHORIZED_ERROR,
} from '../Mercure.constants';
import { MercureClientType, MercureStatus } from '../Mercure.types';
import { MercureClient, MercureClientArgs } from '../MercureClient';
import type { PingEvent } from '../Ping/PingMercure.types';
import type { GenericMercureEvent } from '../WorkerQueue/WorkerQueue.types';
import {
  MERCURE_DELTA_THRESHOLD,
  MERCURE_WORKER_QUEUE_TAG,
} from './GeneralMercure.constants';
import { StreamEvent } from './GeneralMercure.types';
import { getCurrentAccountMercureInfoApi } from '../../Account/data/CurrentAccount/dataSources/CurrentAccountApi.dataSources';

/**
 * Constructor / update arguments for {@link GeneralMercureClient}.
 *
 * Extends the base `MercureClientArgs` (declared elsewhere). The fields
 * `mercureClientType`, `mercureUrl` and `initialAuthToken` are not read by
 * `GeneralMercureClient` directly; they are passed through to the base class
 * via `super(props)` — presumably consumed there (confirm in `MercureClient`).
 */
export interface GeneralMercureClientArgs extends MercureClientArgs {
  mercureClientType: MercureClientType;
  mercureUrl: string;
  initialAuthToken: string;
  /** Workspace identifiers; each one is passed as `workspaceIri` when fetching the delta. */
  workspaceIds: string[];
  /**
   * Called once per workspace after the stream (re)opens, with the timestamp
   * of the last received event, to catch up on missed notifications.
   */
  fetchNotificationsDelta: (
    variables: GetChatConversationUpdatesVariables,
  ) => Promise<void>;
  /** Called when the delta fetch fails or is skipped, to reset client-side state. */
  resetState: () => void;
  /** Receives every parsed stream event together with the queue tag. */
  addToQueue: (tag: string, event: GenericMercureEvent | PingEvent) => void;
}

/**
 * Client for the "general" Mercure event stream.
 *
 * Opens the stream with `fetchEventSource`, forwards each parsed event to the
 * worker queue through `addToQueue`, and after the stream (re)opens either
 * fetches a notifications delta for every workspace or resets state.
 *
 * Relies on members inherited from `MercureClient` that are not visible in
 * this file (`esContoller`, `lastEventId`, `lastEventReceivedAt`, `isOnline`,
 * `getMercureUrl`, `getInitialAuthToken`, `setInitialAuthToken`,
 * `updateAccount`).
 */
export class GeneralMercureClient extends MercureClient {
  private workspaceIds: string[] = [];
  private fetchNotificationsDelta: (
    variables: GetChatConversationUpdatesVariables,
  ) => Promise<void>;
  private resetState: () => void;

  private addToQueue: (
    tag: string,
    event: GenericMercureEvent | PingEvent,
  ) => void;

  /**
   * ISO timestamp recorded right after a notifications delta was applied.
   * Not read inside this class — presumably consumers drop events older than
   * this value (confirm against callers).
   */
  skipEventsBefore: string | null = null;

  /** Current connection status as tracked by this client. */
  mercureStatus: MercureStatus = MercureStatus.disconnected;

  constructor(props: GeneralMercureClientArgs) {
    super(props);
    this.workspaceIds = props.workspaceIds;
    this.fetchNotificationsDelta = props.fetchNotificationsDelta;
    this.resetState = props.resetState;
    this.addToQueue = props.addToQueue;
  }

  /**
   * Disconnects the stream, fetches fresh Mercure info for the current
   * account and, if it contains an authorization token for the general
   * client, stores it and starts a new stream.
   *
   * API failures are reported to Sentry and logged; they are not rethrown.
   * When the response carries no token, the client is left disconnected.
   */
  refreshAccountMercureInfo = async () => {
    console.log(
      '[General Mercure (client)] refreshAccountMercureInfo - disconneting client',
    );
    this.dissconnectMercureClient();

    try {
      const { data: mercureInfo } = await getCurrentAccountMercureInfoApi();

      const initialAuthToken =
        mercureInfo?.[MercureClientType.general]?.authorization;

      if (mercureInfo && initialAuthToken) {
        this.updateAccount(account => ({
          ...account,
          mercureInfo,
        }));

        this.setInitialAuthToken(initialAuthToken);

        console.log(
          '[General Mercure (client)] refreshAccountMercureInfo - new init',
        );

        this.init();
      }
    } catch (err) {
      captureException(err);
      console.error(err);
    }
  };

  /**
   * `onerror` callback for the event source.
   *
   * @returns the retry interval for any error other than the unauthorized one.
   * @throws Error with the unauthorized message, after kicking off a token
   *   refresh, so that the stream's own retry loop stops.
   */
  handleMercureStreamError = (err: Error) => {
    /**
     * If it's a special error triggered by 401 response, then we stop the stream re-connection loop.
     */
    if (err.message === MERCURE_UNAUTHORIZED_ERROR) {
      // Fire-and-forget: the refresh handles its own API errors.
      // NOTE(review): handleMercureStreamOpen also starts a refresh before
      // throwing this error; if the vendored fetchEventSource routes onopen
      // failures through onerror, the refresh runs twice — confirm.
      void this.refreshAccountMercureInfo();
      this.mercureStatus = MercureStatus.reconnecting;

      throw new Error(err.message);
    }

    /**
     * In case of any other error we'll just try to re-connect.
     */
    return MERCURE_RETRY_INTERVAL;
  };

  /**
   * `onclose` callback for the event source. Marks the client disconnected
   * and always throws.
   */
  handleMercureStreamClose = () => {
    /**
     * If stream was closed by the server, we're throwing an error.
     * This way it will automatically re-connect.
     */
    this.mercureStatus = MercureStatus.disconnected;
    throw new Error();
  };

  /**
   * `onopen` callback for the event source.
   *
   * Validates the response, then — if an event was received before — either
   * fetches the notifications delta for every workspace or resets state, and
   * finally marks the client as connected.
   *
   * @throws Error with the unauthorized message on a 401 response.
   * @throws Error when the response content type is not an event stream.
   */
  handleMercureStreamOpen = async (response: Response) => {
    /**
     * On 401 response we refetch account to get new token and throw special error.
     */
    if (response.status === 401) {
      this.mercureStatus = MercureStatus.reconnecting;
      // Fire-and-forget: the refresh handles its own API errors.
      void this.refreshAccountMercureInfo();
      throw new Error(MERCURE_UNAUTHORIZED_ERROR);
    }
    /**
     * This is a default content type check.
     * Copied from the original source.
     */
    const contentType = response.headers.get('content-type');
    if (contentType !== EventStreamContentType) {
      throw new Error(
        `Expected content-type to be ${EventStreamContentType}, Actual: ${contentType}`,
      );
    }

    if (this.lastEventReceivedAt) {
      // NOTE(review): differenceInHours(earlier, later) is zero or negative,
      // so with (lastEventReceivedAt, now) this branch can only be taken when
      // MERCURE_DELTA_THRESHOLD is negative (e.g. -N for "within N hours").
      // Confirm the constant's sign; with a positive value the delta is
      // never fetched and state is always reset.
      if (
        differenceInHours(parseISO(this.lastEventReceivedAt), new Date()) >
        MERCURE_DELTA_THRESHOLD
      ) {
        this.logMercureEvent('Fetching notifications delta...');

        const date = this.lastEventReceivedAt;
        try {
          await Promise.all(
            this.workspaceIds.map(workspaceIri =>
              this.fetchNotificationsDelta({
                workspaceIri,
                date,
              }),
            ),
          );
          this.logMercureEvent('Notifications delta applied successfully.');
          this.skipEventsBefore = new Date().toISOString();
        } catch {
          // Any failed workspace fetch invalidates the delta; fall back to a
          // full state reset.
          this.logMercureEvent(
            'Fetching notifications delta failed, resetting state...',
          );
          this.resetState();
        }
      } else {
        this.logMercureEvent(
          'Too long for notifications delta, resetting state instead...',
        );
        this.resetState();
      }
    }

    this.mercureStatus = MercureStatus.connected;
  };

  /**
   * Writes a timestamped debug line to the console, but only when
   * `window.DESKTOPCOM_LOG_CHAT_MERCURE_EVENTS` is truthy.
   *
   * @param message - text to log.
   * @param event - optional stream event; its `@type` and `@request` are
   *   printed alongside the full object.
   */
  logMercureEvent = (message: string, event?: StreamEvent) => {
    if (window.DESKTOPCOM_LOG_CHAT_MERCURE_EVENTS) {
      // 24-hour clock ("HH"): "hh" is the 12-hour form and is ambiguous
      // without an AM/PM marker.
      const text = `[${format(new Date(), 'HH:mm:ss.SSS')}] ${message}`;
      if (event) {
        console.log(
          `${text}: `,
          event['@type'] + '.' + event['@request'],
          event,
        );
      } else {
        console.log(text);
      }
    }
  };

  /**
   * `onmessage` callback for the event source. Remembers the event id,
   * parses the payload and pushes the event onto the worker queue.
   *
   * Messages without data are ignored. A payload that is not valid JSON is
   * reported and skipped instead of throwing out of the callback.
   */
  onMessageHandler = (e: EventSourceMessage) => {
    if (!e.data) {
      return;
    }

    // Recorded before parsing so a malformed event is not requested again
    // when the stream resumes from `lastEventId`.
    if (e.id) {
      this.lastEventId = e.id;
    }

    let event: StreamEvent;
    try {
      event = JSON.parse(e.data);
    } catch (err) {
      captureException(err);
      console.error(
        '[General Mercure (client)] failed to parse event data',
        err,
      );
      return;
    }

    this.logMercureEvent('Mercure event received', event);
    window.desktop_MercureInterceptor?.onEventReceived('regular', event);

    this.addToQueue(MERCURE_WORKER_QUEUE_TAG, event);
  };

  /**
   * Aborts the current request controller and marks the client disconnected.
   * (The misspelled name is kept because it is part of the public surface.)
   */
  dissconnectMercureClient = () => {
    this.esContoller.abort();

    this.mercureStatus = MercureStatus.disconnected;

    this.logMercureEvent('Chat Mercure stream stopped.');
  };

  /**
   * Aborts the current stream and starts a new one.
   *
   * NOTE(review): `init` reuses `this.esContoller.signal` right after the
   * abort — this assumes the base class hands out a fresh controller; confirm.
   */
  reconnect = () => {
    console.log('[General Mercure (client)] reconnect');
    this.mercureStatus = MercureStatus.reconnecting;
    this.logMercureEvent('General Mercure stream reconnecting.');

    this.esContoller.abort();
    this.init();
  };

  /**
   * Starts the event stream.
   *
   * The status is set to `connected` synchronously, before the stream has
   * actually opened. The rejection handler below depends on this: it only
   * schedules a reconnect when no other stream has been started in the
   * meantime.
   */
  init = () => {
    console.log('[General Mercure (client)] init');
    fetchEventSource(this.getMercureUrl(), {
      // NOTE(review): this class defines `dissconnectMercureClient` (double
      // "s"); the property read here must come from the base class — confirm
      // it is the intended handler.
      disconnectMercureClient: this.disconnectMercureClient,
      mercureClientRequestController: this.esContoller,
      headers: {
        Authorization: `Bearer ${this.getInitialAuthToken()}`,
      },
      onmessage: this.onMessageHandler,
      onopen: this.handleMercureStreamOpen,
      onerror: this.handleMercureStreamError,
      onclose: this.handleMercureStreamClose,
      openWhenHidden: true,
      signal: this.esContoller.signal,
      lastEventId: this.lastEventId,
    }).catch(() => {
      this.esContoller.abort();

      // Retry after the usual interval, unless another stream was started
      // meanwhile or the client is offline.
      setTimeout(() => {
        if (this.mercureStatus !== MercureStatus.connected && this.isOnline) {
          this.reconnect();
        }
      }, MERCURE_RETRY_INTERVAL);
    });

    this.logMercureEvent('General Mercure stream started.');
    this.mercureStatus = MercureStatus.connected;
  };

  /**
   * Replaces the workspace list and callbacks with the ones in `args`.
   * `mercureClientType`, `mercureUrl` and `initialAuthToken` from `args` are
   * not applied here.
   */
  updateVariables = (args: GeneralMercureClientArgs) => {
    console.log('[General Mercure (client)] updateVariables', args);
    this.workspaceIds = args.workspaceIds;
    this.fetchNotificationsDelta = args.fetchNotificationsDelta;
    this.resetState = args.resetState;
    this.addToQueue = args.addToQueue;
    this.updateAccount = args.updateAccount;
  };
}
