import { channel, buffers } from 'redux-saga';
import { call, put, fork, join, race, take, cancel, select, delay } from 'redux-saga/effects';

import log from '../../shared/logging';
import {
  syncConnected,
  syncInProgress,
  syncDisconnected,
  syncError,
  longPollingMessage,
  longPollingProcessed,
  ApiAction,
} from '../actions';
import { longPollingMessagesPausedSelector } from '../reducers/api';
import { serverUrlSelector } from '../reducers/appConfig';
import { createSession, poll } from '../longPolling';
import { AppType, GetLongPollingResponse } from '../types';

// Configuration accepted by syncSaga.
export type SyncConfig = {
  // Forwarded to createSession() when the long polling session is created.
  appType: AppType;
  // Forwarded to createSession() together with appType; may be null/undefined.
  deviceId: string | null | undefined;
  // Application-specific initial data fetch. Delegated to (yield*) once per sync cycle,
  // after the long polling task has been forked and before syncConnected is dispatched.
  performInitialFetch: () => Generator<any, any, any>;
  // Optional. Forked after the initial fetch has completed. An error thrown by it ends
  // the current sync cycle and is reported through syncError.
  performBackgroundWork?: () => Generator<any, any, any>;
  // Optional. Used as the pattern of a take effect while connected; a matching action
  // ends the current sync cycle so that syncSaga starts over.
  checkResyncTrigger?: (action: ApiAction) => boolean;
};

// How long syncSaga waits before starting over after a sync cycle ended with an error.
export const RECONNECT_DELAY = 30; // seconds
// Size argument given to buffers.expanding() for the channel that queues received
// long polling messages.
const BUF_SIZE = 10;

// Sync saga. Never returns on its own; every pass of the outer loop is one sync cycle:
// 1. Create a long polling session and fork the long polling loop for it.
// 2. Run the application-dependent initial data fetch, then report "connected".
// 3. Optionally fork background work, then wait until the long polling task ends,
//    the background task ends, or a resync-triggering action is dispatched.
// 4. Cancel the forked tasks and start over: after RECONNECT_DELAY seconds when the
//    cycle ended with an error, after 1 second otherwise.
export default function* syncSaga(config: SyncConfig): Generator<any, any, any> {
  const appType = config.appType;
  const deviceId = config.deviceId;
  const performInitialFetch = config.performInitialFetch;
  const performBackgroundWork = config.performBackgroundWork;
  const checkResyncTrigger = config.checkResyncTrigger;

  for (;;) {
    let failure = null;
    let pollTask = null;
    let workerTask = null;

    try {
      log.info('Sync start');

      const baseUrl: string = yield select(serverUrlSelector);
      log.info(`serverUrl: ${baseUrl}`);

      yield put(syncInProgress());

      const session: string = yield* createSession(appType, deviceId);
      pollTask = yield fork(longPollingLoop, session);

      // Effects that are raced against each other once the initial fetch is done.
      // The long polling task is already running while the initial fetch executes.
      const contenders: {
        [key: string]: unknown;
      } = { longPollingError: join(pollTask!) };

      yield* performInitialFetch();
      yield put(syncConnected());

      if (performBackgroundWork) {
        workerTask = yield fork(wrapWithErrorHandler, performBackgroundWork);
        contenders.backgroundWorkError = join(workerTask!);
      }

      if (checkResyncTrigger) {
        contenders.resyncAction = take(checkResyncTrigger);
      }

      // Both forked tasks hand back a caught error as their result instead of throwing,
      // so a truthy join result here means "that task failed".
      // NOTE(review): a background task that finishes without an error also resolves
      // this race (with null) and falls through to the "disconnected" path below —
      // confirm that restarting the sync in that case is intended.
      const outcome = yield race(contenders);

      if (outcome.resyncAction) {
        log.info('Resync triggered');
      } else if (outcome.longPollingError) {
        failure = outcome.longPollingError;
        log.error('Long polling error', failure);
      } else if (outcome.backgroundWorkError) {
        failure = outcome.backgroundWorkError;
        log.error('Background work error', failure);
      }
    } catch (caught) {
      failure = caught;
      log.error('Exception in syncSaga', failure);
    } finally {
      // Always tear down whatever was forked during this cycle, long polling first.
      for (const task of [pollTask, workerTask]) {
        if (task) {
          yield cancel(task);
        }
      }
    }

    if (!failure) {
      yield put(syncDisconnected());

      // Even without an error, pause briefly to avoid "reconnect DoS attacks".
      log.info('Will reconnect in 1 seconds');
      yield delay(1 * 1000);
    } else {
      yield put(syncError(failure));

      log.info(`Will reconnect in ${RECONNECT_DELAY} seconds`);
      yield delay(RECONNECT_DELAY * 1000);
    }
  }
}

// Long polling loop for one session: polls the server over and over and queues every
// received message, in order, on a buffered channel that a forked consumer
// (processLongPollingMessages) reads from.
// An error raised inside the loop is not rethrown; it becomes the task's result so that
// whoever joins the task can inspect it. The channel is closed on every exit path.
function* longPollingLoop(sessionId: string) {
  const inbox = channel(buffers.expanding(BUF_SIZE));

  yield fork(processLongPollingMessages, inbox);

  try {
    for (;;) {
      // The server URL is re-read from the store before every poll.
      const baseUrl: string = yield select(serverUrlSelector);
      const reply: GetLongPollingResponse = yield call(poll, baseUrl, sessionId);
      log.debug('[LP]', reply);

      // Hand the batch over one message at a time, preserving the received order.
      const batch = reply.messages;
      const count = batch.length;
      let index = 0;
      while (index < count) {
        yield put(inbox, batch[index]);
        index += 1;
      }

      // Dispatched once the whole batch has been queued on the channel.
      yield put(longPollingProcessed());
    }
  } catch (failure) {
    // Report the failure through the task's result rather than by throwing.
    return failure;
  } finally {
    inbox.close();
  }
}

// Consumer side of the long polling channel: takes queued messages from receiveChannel
// one at a time and dispatches each as a longPollingMessage action.
// While the store says long polling messages are paused, the current message is held
// back until an API_CALL_SUCCESS/API_CALL_ERROR action arrives and the pause flag clears.
function* processLongPollingMessages(receiveChannel) {
  for (;;) {
    const incoming = yield take(receiveChannel);

    // Re-check the pause flag after every API call outcome; only continue once it is off.
    let paused = yield select(longPollingMessagesPausedSelector);
    while (paused) {
      yield take(['API_CALL_SUCCESS', 'API_CALL_ERROR']);
      paused = yield select(longPollingMessagesPausedSelector);
    }

    try {
      yield put(longPollingMessage(incoming));
    } catch (failure) {
      // A message that fails to process is logged and skipped; the loop keeps going.
      log.error('Error processing LP message', failure);
    }
  }
}

// Runs the given background work generator to completion and converts its outcome into
// a return value instead of an exception: null when the work finished normally, the
// thrown value when it failed. Everything the work yields is passed through unchanged.
function* wrapWithErrorHandler(performBackgroundWork) {
  let outcome = null;

  try {
    yield* performBackgroundWork();
  } catch (failure) {
    // Report the failure through the task's result rather than by throwing.
    outcome = failure;
  }

  return outcome;
}
