import { literal, nonEmptyString, nonNegativeInteger, objectSchema, pureObject, validateTypesAsync } from '@twilio/declarative-type-validator';
import { Twilsock, InitRegistration, ConnectionState as TwilsockConnectionState } from 'twilsock';

import { UriBuilder } from './utils/uri';
import { SyncError } from './utils/syncerror';
import { deepClone } from './utils/sanitize';
import log from './utils/logger';

import { Configuration } from './configuration';
import { Subscriptions } from './subscriptions';

import { Services } from './interfaces/services';
import { NetworkService } from './services/network';
import { SessionStorage } from './services/storage';

import { RemovalHandler, SyncEntity } from './entity';
import { DocumentDescriptor, SyncDocument, SyncDocumentImpl } from './syncdocument';
import { SyncListDescriptor, SyncList, SyncListImpl } from './synclist';
import { SyncMapDescriptor, SyncMap, SyncMapImpl } from './syncmap';
import { SyncStream, SyncStreamImpl } from './streams/syncstream';
import { StreamDescriptor } from './streams/serverapi';
import { ClientInfo } from './clientInfo';
import { EntitiesCache } from './entitiesCache';
import { LiveQuery, InstantQuery, LiveQueryImpl, LiveQueryDescriptor } from './livequery';
import { LiveQueryCreator, queryItems } from './livequery';
import { version } from '../package.json';
import { ReplayEventEmitter } from '@twilio/replay-event-emitter';

// Product id used for the init registration and the Twilsock client when the caller's options carry no `productId`.
const SYNC_PRODUCT_ID = 'data_sync';
// SDK version string read from package.json; reported in client metadata and exposed via `Client.version`.
const SDK_VERSION = version;

/** Object with string keys and arbitrary values; used for document data and list context. */
type json = { [key: string]: any };

/**
 * How a Sync object should be opened:
 * * `'open_or_create'` - read the Sync object, creating it first if it does not exist.
 * * `'open_existing'` - read an existing Sync object. The promise is rejected if the object does not exist.
 * * `'create_new'` - create a new Sync object. If the *id* property is specified, it will be used as the unique name.
 */
type OpenMode = 'open_or_create' | 'open_existing' | 'create_new';

/**
 * Client connection state.
 * This is an alias of the Twilsock connection state type, so it directly reflects
 * the connection state of the underlying websocket transport.
 * Possible values are as follows:
 * * 'connecting' - client is offline and a connection attempt is in progress.
 * * 'connected' - client is online and ready.
 * * 'disconnecting' - client is going offline as disconnection is in progress.
 * * 'disconnected' - client is offline and no connection attempt is in progress.
 * * 'denied' - client connection is denied because of an invalid JWT access token. The user must refresh the token in order to proceed.
 * * 'error' - client connection is in a permanent erroneous state. Client re-initialization is required.
 */
type ConnectionState = TwilsockConnectionState;

/**
 * Client options.
 */
interface SyncClientOptions {
  /**
   * The level of logging to enable.
   * When the options object has no `logLevel` property, the client sets the log level to `'silent'`.
   */
  logLevel?: 'silent' | 'error' | 'warn' | 'info' | 'debug' | 'trace';

  /**
   * An existing Twilsock client to use as the transport.
   * When provided, the Sync client neither creates its own Twilsock instance nor calls `connect()` on it;
   * connecting is the responsibility of whoever created that client.
   */
  twilsockClient?: Twilsock;

  /**
   * Any additional options. The client constructor reads `productId`, `clientMetadata` and
   * `initRegistrations` from this object and passes the whole object on to `Configuration`
   * and, when it creates one, to `Twilsock`.
   */
  [key: string]: any;
}

/**
 * Options for opening a Sync object.
 *
 * @example The following example is applicable to all Sync objects
 * (i.e., `syncClient.document()`, `syncClient.list()`, `syncClient.map()`, `syncClient.stream()`)
 * ```typescript
 * // Attempts to open an existing document with a unique name of 'MyDocument'
 * // If no such document exists, the promise is rejected
 * syncClient.document({
 *   id: 'MyDocument',
 *   mode: 'open_existing'
 * })
 *   .then(...)
 *   .catch(...);
 *
 * // Attempts to create a new document with a unique name of 'MyDocument', TTL of 24 hours and initial data `{ name: 'John Smith' }`
 * // If such a document already exists, the promise is rejected
 * syncClient.document({
 *   id: 'MyDocument',
 *   mode: 'create_new',
 *   ttl: 86400,
 *   data: { name: 'John Smith' } // the `data` property is only applicable for Documents
 * })
 *   .then(...)
 *   .catch(...);
 * ```
 */
interface OpenOptions {
  /**
   * Sync object SID or unique name.
   */
  id?: string;

  /**
   * Mode for opening the Sync object.
   * When omitted, `'open_or_create'` is used if an `id` is given and `'create_new'` otherwise.
   */
  mode?: OpenMode;

  /**
   * The time-to-live of the Sync object in seconds. This is applied only if the object is created.
   */
  ttl?: number;
}

/**
 * Options for opening a document.
 */
interface OpenDocumentOptions extends OpenOptions {
  /**
   * The initial data of the Sync document.
   * It is sent only in the create request; an empty object is sent when it is omitted.
   */
  data?: json;
}

/**
 * List options.
 */
interface OpenListOptions extends OpenOptions {
  /**
   * Sent as the `purpose` field of the list create request.
   */
  purpose?: string;

  /**
   * Sent as the `context` field of the list create request.
   */
  context?: json;

  /**
   * NOTE(review): accepted by the argument validation of `Client.list()`, but `list()` in this
   * file does not read it (unlike `Client.map()`) - confirm whether that is intended.
   */
  includeItems?: boolean;
}

/**
 * Map options.
 */
interface OpenMapOptions extends OpenOptions {
  /**
   * When truthy, the request that reads the map from the service carries the `Include=items` query parameter.
   */
  includeItems?: boolean;
}

/**
 * Stream options. Streams currently add nothing to the common {@link OpenOptions}.
 */
interface OpenStreamOptions extends OpenOptions {
}

/**
 * Normalizes the argument accepted by the `document()`/`map()`/`list()`/`stream()` methods
 * into an options object that always has a `mode`.
 *
 * @param arg Nothing, an identifier string, or an options object.
 * @returns A new options object; the input object is not modified.
 */
function decompose(arg?: string | OpenOptions): OpenOptions {
  // Nothing usable was supplied (undefined, null or ''): a brand-new object is requested.
  if (!arg) {
    return { mode: 'create_new' };
  }

  // A bare identifier opens the object, creating it on demand.
  if (typeof arg === 'string') {
    return { id: arg, mode: 'open_or_create' };
  }

  // Options object: an explicit mode wins, otherwise it depends on whether an id is present.
  const mode = arg.mode || (arg.id ? 'open_or_create' : 'create_new');
  return { ...arg, mode };
}

// Notification message types the client registers for (see `Client.populateInitRegistrations`)
// and dispatches on in `Client._routeMessage`.
const SYNC_DOCUMENT_NOTIFICATION_TYPE = 'com.twilio.rtd.cds.document';
const SYNC_LIST_NOTIFICATION_TYPE = 'com.twilio.rtd.cds.list';
const SYNC_MAP_NOTIFICATION_TYPE = 'com.twilio.rtd.cds.map';
const SYNC_NOTIFICATION_TYPE = 'twilio.sync.event';

/**
 * Payload of the `connectionError` client event.
 *
 * The optional fields mirror the properties listed in the `connectionError`
 * event documentation, so typed listeners can read them without casting.
 */
type ConnectionError = {
  /** Whether connection attempts will stop. */
  terminal: boolean;
  /** Root cause of the error. */
  message: string;
  /** HTTP status code, if available. */
  httpStatusCode?: number;
  /** Twilio public error code, if available. */
  errorCode?: number;
};

/**
 * Map of event name to listener signature for the events emitted by `Client`;
 * used as the type argument of the `ReplayEventEmitter` base class.
 */
type ClientEvents = {
  tokenAboutToExpire: () => void;
  tokenExpired: () => void;
  connectionError: (error: ConnectionError) => void;
  connectionStateChanged: (state: ConnectionState) => void;
};

/**
 * Client for the Twilio Sync service.
 *
 * @example
 * ```typescript
 * // Using NPM resolution
 * const SyncClient = require('twilio-sync');
 * const syncClient = new SyncClient(token, { logLevel: 'debug' });
 *
 * // Using CDN
 * const syncClient = new Twilio.Sync.Client(token, { logLevel: 'debug' });
 * ```
 */
class Client extends ReplayEventEmitter<ClientEvents> {
  private readonly services: Services;
  private readonly entities: EntitiesCache;

  /**
   * Creates a Sync client on top of a Twilsock transport.
   *
   * The passed `options` object is modified in place: defaults for `productId`,
   * `clientMetadata`, `initRegistrations` and `twilsockClient` are written into it.
   *
   * @param fpaToken Twilio access token.
   * @param options Options to customize the client.
   * @throws Error if `fpaToken` is empty.
   */
  constructor(fpaToken: string, options: SyncClientOptions = {}) {
    super();

    if (!fpaToken) {
      throw new Error('Sync library needs a valid Twilio token to be passed');
    }

    // Logging stays silent unless the options object carries its own `logLevel` property.
    log.setLevel(options.hasOwnProperty('logLevel') ? options.logLevel : 'silent');

    const productId = options.productId || SYNC_PRODUCT_ID;
    options.productId = productId;

    // Fill ClientMetadata, keeping whatever the caller already provided.
    const metadata = options.clientMetadata || {};
    options.clientMetadata = metadata;
    if (!metadata.hasOwnProperty('type')) {
      metadata.type = 'sync';
    }
    if (!metadata.hasOwnProperty('sdk')) {
      metadata.sdk = 'JS';
      metadata.sdkv = SDK_VERSION;
    }

    // Must be decided before a locally created client is written back into the options.
    const twilsockCreatedHere = !options.twilsockClient;

    // Create default init registrations if none were provided.
    // Otherwise, the outside party has to list all the init registrations they need, including the Sync ones.
    if (!options.initRegistrations) {
      const defaultRegistration = new InitRegistration(productId);
      Client.populateInitRegistrations(defaultRegistration);
      options.initRegistrations = [defaultRegistration];
    }

    const twilsock = options.twilsockClient ?? new Twilsock(fpaToken, productId, options);
    options.twilsockClient = twilsock;

    // Re-emit transport events under the client's own event names.
    twilsock.on('tokenAboutToExpire', () => this.emit('tokenAboutToExpire'));
    twilsock.on('tokenExpired', () => this.emit('tokenExpired'));
    twilsock.on('connectionError', (error) => this.emit('connectionError', error));
    twilsock.on('stateChanged', (state) => {
      this.emit('connectionStateChanged', state);
      // Transport state changed: if there are any subscriptions, objects
      // should be checked for modifications.
      this.services.subscriptions.onConnectionStateChanged(state === 'connected');
    });
    twilsock.on('message', (messageType, payload) => this._routeMessage(messageType, payload));

    const config = new Configuration(options);
    const clientInfo = new ClientInfo(SDK_VERSION);
    const network = new NetworkService(clientInfo, config, twilsock);
    const storage = new SessionStorage(config);

    // Subscriptions need the services object itself, so it is filled in right after.
    this.services = {
      config,
      twilsock,
      network,
      storage,
      router: this,
      subscriptions: null
    };
    this.services.subscriptions = new Subscriptions(this.services);

    this.entities = new EntitiesCache();

    // Start only if we created twilsock locally,
    // otherwise it's the responsibility of whoever created the Twilsock client.
    if (twilsockCreatedHere) {
      twilsock.connect();
    }
  }

  /**
   * Fired when the connection state has changed.
   *
   * Parameters:
   * 1. {@link ConnectionState} `connectionState` - contains the current service connection state.
   * @example
   * ```typescript
   * syncClient.on('connectionStateChanged', (newState) => {
   *   console.log('Received a new connection state:', newState);
   * });
   * ```
   * @event
   */
  static readonly connectionStateChanged = 'connectionStateChanged';

  /**
   * Fired when the connection is interrupted for an unexpected reason.
   *
   * Parameters:
   * 1. object `connectionError` - connection error details. It has the following properties:
   *     * boolean `terminal` - twilsock will stop connection attempts
   *     * string `message` - root cause
   *     * number? `httpStatusCode` - HTTP status code if available
   *     * number? `errorCode` - Twilio public error code if available
   * @example
   * ```typescript
   * syncClient.on('connectionError', (connectionError) => {
   *   console.error('Connection was interrupted:', connectionError.message);
   *   console.error('Is terminal:', connectionError.terminal);
   * });
   * ```
   * @event
   */
  static readonly connectionError = 'connectionError';

  /**
   * Fired when the access token is about to expire and needs to be updated.
   * The trigger takes place three minutes before the JWT access token expiry.
   * For long living applications, you should refresh the token when either
   * {@link SyncClient.tokenAboutToExpire | tokenAboutToExpire} or
   * {@link SyncClient.tokenExpired | tokenExpired} events occur; handling just
   * one of them is sufficient.
   * @example
   * The following example illustrates access token refresh.
   * ```typescript
   * syncClient.on('tokenAboutToExpire', () => {
   *   // Obtain a JWT access token: https://www.twilio.com/docs/sync/identity-and-access-tokens
   *   const token = '<your-access-token-here>';
   *   syncClient.updateToken(token);
   * });
   * ```
   * @event
   */
  static readonly tokenAboutToExpire = 'tokenAboutToExpire';

  /**
   * Fired when the access token has expired.
   * In case the token is not refreshed, all subsequent Sync operations will fail and the client will disconnect.
   * For long living applications, you should refresh the token when either
   * {@link SyncClient.tokenAboutToExpire | tokenAboutToExpire} or
   * {@link SyncClient.tokenExpired | tokenExpired} events occur; handling just
   * one of them is sufficient.
   * @event
   */
  static readonly tokenExpired = 'tokenExpired';

  /**
   * Adds the notification message types handled by the Sync client to the given init registration.
   *
   * @param reg Init registration to populate.
   */
  static populateInitRegistrations(reg: InitRegistration) {
    const notificationTypes = [
      SYNC_NOTIFICATION_TYPE,
      SYNC_DOCUMENT_NOTIFICATION_TYPE,
      SYNC_LIST_NOTIFICATION_TYPE,
      SYNC_MAP_NOTIFICATION_TYPE
    ];
    reg.populateInitRegistrations(notificationTypes);
  }

  /**
   * Entry point for all the incoming messages (Router).
   * Messages of the known Sync notification types are handed to the subscriptions
   * service; messages of any other type are only traced.
   *
   * @param type Type of the incoming message
   * @param message Message to route
   * @internal
   */
  _routeMessage(type: string, message: any) {
    log.trace('Notification type:', type, 'content:', message);

    const isEntityTypedNotification = type === SYNC_DOCUMENT_NOTIFICATION_TYPE
      || type === SYNC_LIST_NOTIFICATION_TYPE
      || type === SYNC_MAP_NOTIFICATION_TYPE;

    // The second argument tells the subscriptions service which of the two notification families arrived.
    if (isEntityTypedNotification) {
      this.services.subscriptions.acceptMessage(message, false);
    } else if (type === SYNC_NOTIFICATION_TYPE) {
      this.services.subscriptions.acceptMessage(message, true);
    }
  }

  /**
   * Subscribe for events (Router).
   * Delegates to the subscriptions service.
   *
   * @param sid Identifier the subscription is registered under
   * @param entity Entity to register for the given sid
   * @internal
   */
  _subscribe(sid: string, entity: any) {
    this.services.subscriptions.add(sid, entity);
  }

  /**
   * Unsubscribe from events (Router).
   * Delegates to the subscriptions service.
   *
   * @param sid Identifier of the subscription to remove
   * @internal
   */
  _unsubscribe(sid: string) {
    this.services.subscriptions.remove(sid);
  }

  /**
   * Current version of the Sync client, as declared in the package's package.json.
   */
  static get version(): string {
    return SDK_VERSION;
  }

  /**
   * Current service connection state, read from the underlying Twilsock client.
   */
  get connectionState(): ConnectionState {
    return this.services.twilsock.state;
  }

  /**
   * Prepares session storage before an entity is opened.
   * When session storage is enabled, the storage id is requested from Twilsock and
   * applied to the storage service. A failure to do so is logged as a warning and
   * does not reject the returned promise.
   *
   * @internal
   */
  private async ensureReady() {
    if (this.services.config.sessionStorageEnabled) {
      try {
        const { id } = await this.services.twilsock.storageId();
        this.services.storage.updateStorageId(id);
      } catch (e) {
        log.warn('Failed to initialize storage', e);
      }
    }
  }

  /**
   * Saves a copy of an entity descriptor in session storage.
   * Nothing is stored when session storage is disabled or no id is given.
   * For lists and maps the copy has `last_event_id` reset to `null` and `items` removed.
   *
   * @param type Entity type
   * @param id Identifier to store the descriptor under
   * @param value Descriptor to store; it is deep-cloned, not modified
   */
  private storeRootInSessionCache(type: string, id: string, value: Object) {
    // Can't store without an id.
    if (!(this.services.config.sessionStorageEnabled && id)) {
      return;
    }

    const snapshot = deepClone(value);
    const isCollection = type === SyncList.type || type === SyncMap.type;
    if (isCollection) {
      snapshot['last_event_id'] = null;
      delete snapshot['items'];
    }
    this.services.storage.store(type, id, snapshot);
  }

  /**
   * Reads an entity descriptor from session storage.
   *
   * @param type Entity type
   * @param id Identifier the descriptor was stored under
   * @returns Whatever the storage service returns for the given type and id,
   * or `null` when session storage is disabled or no id is given.
   */
  private readRootFromSessionCache(type: string, id: string): Object {
    const cacheUsable = this.services.config.sessionStorageEnabled && id;
    return cacheUsable ? this.services.storage.read(type, id) : null;
  }

  /**
   * Fetches an entity descriptor from the service.
   *
   * @param baseUri Collection URI of the entity type
   * @param id Entity identifier appended to the URI as a path segment
   * @param optimistic When true, the `Include=items` query parameter is added
   * @returns Body of the network response
   * @throws SyncError with status 404 when no id is given
   */
  private async _get(baseUri: string, id: string, optimistic: boolean = false) {
    if (!id) {
      throw new SyncError(`Cannot get entity without id`, 404);
    }

    const include = optimistic ? 'items' : undefined;
    const uri = new UriBuilder(baseUri)
      .pathSegment(id)
      .queryParam('Include', include)
      .build();

    const { body } = await this.services.network.get(uri);
    return body;
  }

  /**
   * Creates a document through the service.
   *
   * @param id Unique name for the new document
   * @param data Initial data; an empty object is sent when omitted
   * @param ttl Time-to-live; sent only when defined
   * @returns Promise of the response body, with its `data` set to the data that was sent
   */
  private _createDocument(id: string, data?: Object, ttl?: number) {
    const payload: any = { unique_name: id, data: data || {} };
    if (ttl !== undefined) {
      payload.ttl = ttl;
    }

    return this.services.network
      .post(this.services.config.documentsUri, payload)
      .then(({ body }) => {
        // The descriptor handed back carries the data from the request.
        body.data = payload.data;
        return body;
      });
  }

  /**
   * Resolves a document descriptor, preferring the session cache over a network read.
   *
   * @param id Document identifier
   */
  private async _getDocument(id: string): Promise<DocumentDescriptor> {
    const cached = this.readRootFromSessionCache(SyncDocument.type, id);
    if (cached) {
      return cached as DocumentDescriptor;
    }
    return this._get(this.services.config.documentsUri, id) as Promise<DocumentDescriptor>;
  }

  /**
   * Creates a list through the service.
   *
   * @param id Unique name for the new list
   * @param purpose Sent as `purpose` in the request body
   * @param context Sent as `context` in the request body
   * @param ttl Time-to-live; sent only when defined
   * @returns Promise of the response body
   */
  private _createList(id: string, purpose?: string, context?: json, ttl?: number) {
    const payload: any = { unique_name: id, purpose, context };
    if (ttl !== undefined) {
      payload.ttl = ttl;
    }

    return this.services.network
      .post(this.services.config.listsUri, payload)
      .then(({ body }) => body);
  }

  /**
   * Resolves a list descriptor, preferring the session cache over a network read.
   *
   * @param id List identifier
   */
  private async _getList(id: string): Promise<SyncListDescriptor> {
    const cached = this.readRootFromSessionCache(SyncList.type, id);
    if (cached) {
      return cached as SyncListDescriptor;
    }
    return this._get(this.services.config.listsUri, id) as Promise<SyncListDescriptor>;
  }

  /**
   * Creates a map through the service.
   *
   * @param id Unique name for the new map
   * @param ttl Time-to-live; sent only when defined
   * @returns Promise of the response body
   */
  private _createMap(id, ttl?: number) {
    const payload: any = { unique_name: id };
    if (ttl !== undefined) {
      payload.ttl = ttl;
    }

    return this.services.network
      .post(this.services.config.mapsUri, payload)
      .then(({ body }) => body);
  }

  /**
   * Resolves a map descriptor, preferring the session cache over a network read.
   *
   * @param id Map identifier
   * @param optimistic Forwarded to the network read, where it adds `Include=items`
   */
  private async _getMap(id: string, optimistic: boolean = false): Promise<SyncMapDescriptor> {
    const cached = this.readRootFromSessionCache(SyncMap.type, id);
    if (cached) {
      return cached as SyncMapDescriptor;
    }
    return this._get(this.services.config.mapsUri, id, optimistic) as Promise<SyncMapDescriptor>;
  }

  /**
   * Resolves a stream descriptor, preferring the session cache over a network read.
   *
   * @param id Stream identifier
   */
  private async _getStream(id: string): Promise<StreamDescriptor> {
    const cached = this.readRootFromSessionCache(SyncStream.type, id);
    if (cached) {
      return cached as StreamDescriptor;
    }
    return this._get(this.services.config.streamsUri, id, false) as Promise<StreamDescriptor>;
  }

  /**
   * Creates a stream through the service.
   *
   * @param id Unique name for the new stream
   * @param ttl Time-to-live; sent only when defined
   * @returns Body of the network response
   */
  private async _createStream(id, ttl?: number): Promise<StreamDescriptor> {
    const payload: any = { unique_name: id };
    if (ttl !== undefined) {
      payload.ttl = ttl;
    }

    const { body } = await this.services.network.post(this.services.config.streamsUri, payload);
    return body;
  }

  /**
   * Reads a live query descriptor from the session cache only; no network request is made.
   *
   * @param sid Live query identifier
   * @returns The cached value, or `null` when session storage is disabled or no sid is given
   */
  private _getLiveQuery(sid: string): LiveQueryDescriptor {
    return this.readRootFromSessionCache(LiveQuery.type, sid) as LiveQueryDescriptor;
  }

  /**
   * Looks an entity up in the in-memory entities cache.
   *
   * @param id Entity identifier
   * @param type Entity type
   * @returns The cached entity, or `null` when no id is given or the lookup yields nothing
   */
  private getCached<T extends SyncEntity>(id: string, type: string): T {
    if (!id) {
      return null;
    }
    return (this.entities.get(id, type) as T) || null;
  }

  /**
   * Drops an entity from the in-memory cache and, when session storage is enabled,
   * from session storage as well.
   *
   * @param type Entity type
   * @param sid Entity SID
   * @param uniqueName Entity unique name
   */
  private removeFromCacheAndSession(type: string, sid: string, uniqueName: string) {
    this.entities.remove(sid);

    if (!this.services.config.sessionStorageEnabled) {
      return;
    }
    this.services.storage.remove(type, sid, uniqueName);
  }

  /**
   * Read or create a Sync document.
   * @param arg One of the following:
   * * Unique name or SID identifying the Sync document - opens the document with the given identifier or creates one if it does not exist.
   * * none - creates a new document with a randomly assigned SID and no unique name.
   * * {@link OpenDocumentOptions} object for more granular control.
   * @return A promise which resolves after the document is successfully read (or created).
   * This promise may reject if the document could not be created or if this endpoint lacks the necessary permissions to access it.
   * @example
   * ```typescript
   * syncClient.document('MyDocument')
   *   .then((document) => {
   *     console.log('Successfully opened a document. SID:', document.sid);
   *     document.on('updated', (event) => {
   *       console.log('Received an "updated" event: ', event);
   *     });
   *   })
   *   .catch((error) => {
   *     console.error('Unexpected error', error);
   *   });
   * ```
   */
  @validateTypesAsync([
    'undefined',
    'string',
    objectSchema('open document options', {
      id: ['string', 'undefined'],
      mode: [literal('open_or_create', 'open_existing', 'create_new'), 'undefined'],
      ttl: [nonNegativeInteger, 'undefined'],
      data: [pureObject, 'undefined', literal(null)]
    })
  ])
  public async document(arg?: string | OpenDocumentOptions): Promise<SyncDocument> {
    await this.ensureReady();
    const opts: OpenDocumentOptions = decompose(arg);
    const createOnServer = () => this._createDocument(opts.id, opts.data, opts.ttl);

    let docDescriptor: DocumentDescriptor;
    if (opts.mode === 'create_new') {
      docDescriptor = await createOnServer();
    } else {
      // An already opened document is reused without touching storage or network.
      const openedImpl = this.getCached<SyncDocumentImpl>(opts.id, SyncDocument.type);
      if (openedImpl) {
        return new SyncDocument(openedImpl);
      }

      try {
        docDescriptor = await this._getDocument(opts.id);
      } catch (readError) {
        // Only a "not found" in a creating mode falls through to creation.
        if (readError.status !== 404 || opts.mode === 'open_existing') {
          throw readError;
        }
        try {
          docDescriptor = await createOnServer();
        } catch (createError) {
          if (createError.status !== 409) {
            throw createError;
          }
          // Conflict on create: start over, so the document gets read instead.
          return this.document(arg);
        }
      }
    }

    this.storeRootInSessionCache(SyncDocument.type, opts.id, docDescriptor);
    const freshImpl = new SyncDocumentImpl(this.services, docDescriptor,
      (type, sid, uniqueName) => this.removeFromCacheAndSession(type, sid, uniqueName));
    const storedImpl: SyncDocumentImpl = this.entities.store(freshImpl);
    return new SyncDocument(storedImpl);
  }

  /**
   * Read or create a Sync map.
   * @param arg One of the following:
   * * Unique name or SID identifying the Sync map - opens the map with the given identifier or creates one if it does not exist.
   * * none - creates a new map with a randomly assigned SID and no unique name.
   * * {@link OpenMapOptions} object for more granular control.
   * @return A promise which resolves after the map is successfully read (or created).
   * This promise may reject if the map could not be created or if this endpoint lacks the necessary permissions to access it.
   * @example
   * ```typescript
   * syncClient.map('MyMap')
   *   .then((map) => {
   *     console.log('Successfully opened a map. SID:', map.sid);
   *     map.on('itemUpdated', (event) => {
   *       console.log('Received an "itemUpdated" event:', event);
   *     });
   *   })
   *   .catch((error) => {
   *     console.error('Unexpected error', error);
   *   });
   * ```
   */
  @validateTypesAsync([
    'undefined',
    'string',
    objectSchema('open map options', {
      id: ['string', 'undefined'],
      mode: [literal('open_or_create', 'open_existing', 'create_new'), 'undefined'],
      ttl: [nonNegativeInteger, 'undefined'],
      data: [pureObject, 'undefined', literal(null)],
      includeItems: ['boolean', 'undefined']
    })
  ])
  public async map(arg?: string | OpenMapOptions): Promise<SyncMap> {
    await this.ensureReady();
    const opts: OpenMapOptions = decompose(arg);
    const createOnServer = () => this._createMap(opts.id, opts.ttl);

    let mapDescriptor: SyncMapDescriptor;
    if (opts.mode === 'create_new') {
      mapDescriptor = await createOnServer();
    } else {
      // An already opened map is reused without touching storage or network.
      const openedImpl = this.getCached<SyncMapImpl>(opts.id, SyncMap.type);
      if (openedImpl) {
        return new SyncMap(openedImpl);
      }

      try {
        mapDescriptor = await this._getMap(opts.id, opts.includeItems);
      } catch (readError) {
        // Only a "not found" in a creating mode falls through to creation.
        if (readError.status !== 404 || opts.mode === 'open_existing') {
          throw readError;
        }
        try {
          mapDescriptor = await createOnServer();
        } catch (createError) {
          if (createError.status !== 409) {
            throw createError;
          }
          // Conflict on create: start over, so the map gets read instead.
          return this.map(arg);
        }
      }
    }

    this.storeRootInSessionCache(SyncMap.type, opts.id, mapDescriptor);
    const freshImpl = new SyncMapImpl(this.services, mapDescriptor,
      (type, sid, uniqueName) => this.removeFromCacheAndSession(type, sid, uniqueName));
    const storedImpl: SyncMapImpl = this.entities.store(freshImpl);
    return new SyncMap(storedImpl);
  }

  /**
   * Read or create a Sync list.
   * @param arg One of the following:
   * * Unique name or SID identifying a Sync list - opens the list with the given identifier or creates one if it does not exist.
   * * none - creates a new list with a randomly assigned SID and no unique name.
   * * {@link OpenListOptions} object for more granular control.
   * @return A promise which resolves after the list is successfully read (or created).
   * This promise may reject if the list could not be created or if this endpoint lacks the necessary permissions to access it.
   * @example
   * ```typescript
   * syncClient.list('MyList')
   *   .then((list) => {
   *     console.log('Successfully opened a List. SID:', list.sid);
   *     list.on('itemAdded', (event) => {
   *       console.log('Received an "itemAdded" event:', event);
   *     });
   *   })
   *   .catch((error) => {
   *     console.error('Unexpected error', error);
   *   });
   * ```
   */
  @validateTypesAsync([
    'undefined',
    'string',
    objectSchema('open list options', {
      id: ['string', 'undefined'],
      mode: [literal('open_or_create', 'open_existing', 'create_new'), 'undefined'],
      ttl: [nonNegativeInteger, 'undefined'],
      data: [pureObject, 'undefined', literal(null)],
      purpose: ['string', 'undefined'],
      context: [pureObject, 'undefined'],
      includeItems: ['boolean', 'undefined']
    })
  ])
  public async list(arg?: string | OpenListOptions): Promise<SyncList> {
    await this.ensureReady();
    const opts: OpenListOptions = decompose(arg);
    const createOnServer = () => this._createList(opts.id, opts.purpose, opts.context, opts.ttl);

    let listDescriptor: SyncListDescriptor;
    if (opts.mode === 'create_new') {
      listDescriptor = await createOnServer();
    } else {
      // An already opened list is reused without touching storage or network.
      const openedImpl = this.getCached<SyncListImpl>(opts.id, SyncList.type);
      if (openedImpl) {
        return new SyncList(openedImpl);
      }

      try {
        listDescriptor = await this._getList(opts.id);
      } catch (readError) {
        // Only a "not found" in a creating mode falls through to creation.
        if (readError.status !== 404 || opts.mode === 'open_existing') {
          throw readError;
        }
        try {
          listDescriptor = await createOnServer();
        } catch (createError) {
          if (createError.status !== 409) {
            throw createError;
          }
          // Conflict on create: start over, so the list gets read instead.
          return this.list(arg);
        }
      }
    }

    this.storeRootInSessionCache(SyncList.type, opts.id, listDescriptor);
    const freshImpl = new SyncListImpl(this.services, listDescriptor,
      (type, sid, uniqueName) => this.removeFromCacheAndSession(type, sid, uniqueName));
    const storedImpl: SyncListImpl = this.entities.store(freshImpl);
    return new SyncList(storedImpl);
  }

  /**
   * Read or create a Sync message stream.
   * @param arg One of the following:
   * * Unique name or SID identifying a stream - opens the stream with the given identifier or creates one if it does not exist.
   * * none - creates a new stream with a randomly assigned SID and no unique name.
   * * {@link OpenStreamOptions} object for more granular control.
   * @return A promise which resolves after the stream is successfully read (or created).
   * The flow of messages will begin imminently (but not necessarily immediately) upon resolution.
   * This promise may reject if the stream could not be created or if this endpoint lacks the necessary permissions to access it.
   * @example
   * ```typescript
   * syncClient.stream('MyStream')
   *   .then((stream) => {
   *     console.log('Successfully opened a message stream. SID:', stream.sid);
   *     stream.on('messagePublished', (event) => {
   *       console.log('Received a "messagePublished" event:', event);
   *     });
   *   })
   *   .catch((error) => {
   *     console.error('Unexpected error', error);
   *   });
   * ```
   */
  @validateTypesAsync([
    'undefined',
    'string',
    objectSchema('open stream options', {
      id: ['string', 'undefined'],
      mode: [literal('open_or_create', 'open_existing', 'create_new'), 'undefined'],
      ttl: [nonNegativeInteger, 'undefined'],
      data: [pureObject, 'undefined', literal(null)]
    })
  ])
  public async stream(arg?: string | OpenStreamOptions): Promise<SyncStream> {
    await this.ensureReady();
    const opts: OpenStreamOptions = decompose(arg);
    const createOnServer = () => this._createStream(opts.id, opts.ttl);

    let streamDescriptor: StreamDescriptor;
    if (opts.mode === 'create_new') {
      streamDescriptor = await createOnServer();
    } else {
      // An already opened stream is reused without touching storage or network.
      const openedImpl = this.getCached<SyncStreamImpl>(opts.id, SyncStream.type);
      if (openedImpl) {
        return new SyncStream(openedImpl);
      }

      try {
        streamDescriptor = await this._getStream(opts.id);
      } catch (readError) {
        // Only a "not found" in a creating mode falls through to creation.
        if (readError.status !== 404 || opts.mode === 'open_existing') {
          throw readError;
        }
        try {
          streamDescriptor = await createOnServer();
        } catch (createError) {
          if (createError.status !== 409) {
            throw createError;
          }
          // Conflict on create: start over, so the stream gets read instead.
          return this.stream(arg);
        }
      }
    }

    this.storeRootInSessionCache(SyncStream.type, opts.id, streamDescriptor);
    const onRemoved: RemovalHandler = (type, sid, uniqueName) => this.removeFromCacheAndSession(type, sid, uniqueName);
    const freshImpl = new SyncStreamImpl(this.services, streamDescriptor, onRemoved);
    const storedImpl: SyncStreamImpl = this.entities.store(freshImpl);
    return new SyncStream(storedImpl);
  }

  /**
   * Gracefully shuts the Sync client down.
   * The subscriptions service is shut down first, then the Twilsock client is disconnected.
   *
   * NOTE(review): the Twilsock client is disconnected even when it was supplied by the
   * caller through `options.twilsockClient` - confirm that this is intended.
   *
   * @returns A promise which resolves once both steps have completed.
   */
  public async shutdown() {
    await this.services.subscriptions.shutdown();
    await this.services.twilsock.disconnect();
  }

  /**
   * Set the authentication token.
   * @param token New token to set.
   * @return A promise which resolves once the transport has accepted the token.
   * It rejects with a `SyncError` (status 400, code 51130) when the server replies
   * 401 `UNAUTHORIZED`; any other rejection is passed through unchanged.
   */
  @validateTypesAsync(nonEmptyString)
  public async updateToken(token: string): Promise<void> {
    // Turns the transport's "unauthorized" reply into a Sync-level error.
    const translateRejection = (error: any): never => {
      const replyStatus = error?.reply?.status;
      const rejectedByServer = replyStatus?.code === 401 && replyStatus?.status === 'UNAUTHORIZED';

      throw rejectedByServer
        ? new SyncError('Updated token was rejected by server', 400, 51130)
        : error;
    };

    return this.services.twilsock.updateToken(token).catch(translateRejection);
  }

  /**
   * For Flex customers only. Establishes a long-running query against Flex data wherein the returned
   * result set is updated whenever new (or updated) records match the given expression. Updated results
   * are presented row-by-row according to the lifetime of the returned LiveQuery object.
   *
   * @param indexName Must specify one of the Flex data classes for which Live Queries are available.
   * @param queryExpression A query expression to be executed against the given data index.
   * Please review the [Live Query Language](https://www.twilio.com/docs/sync/live-query)
   * page for Sync client limits and a full list of operators currently supported in query expressions.
   *
   * @return A promise that resolves when the query has been successfully executed.
   * @example
   * ```typescript
   * syncClient.liveQuery('tr-worker', 'data.attributes.worker_name == "Bob"')
   *   .then((args) => {
   *      console.log('Subscribed to live data updates for worker Bob');
   *      const items = args.getItems();
   *      Object.entries(items).forEach(([key, value]) => {
   *        console.log('Search result item key:', key);
   *        console.log('Search result item value:', value);
   *      });
   *   })
   *   .catch((err) => {
   *      console.error('Error when subscribing to live updates for worker Bob', err);
   *   });
   * ```
   */
  @validateTypesAsync(nonEmptyString, 'string')
  public async liveQuery(indexName: string, queryExpression: string): Promise<LiveQuery> {
    await this.ensureReady();

    const queryUri = new UriBuilder(this.services.config.insightsUri)
      .pathSegment(indexName)
      .pathSegment('Items')
      .build();

    // Send the query to CDS; the reply carries the server-generated sid and the item list.
    const response = await queryItems({
      network: this.services.network,
      uri: queryUri,
      queryString: queryExpression,
      type: LiveQuery.type
    });
    const queryId = response.query_id;

    let liveQueryImpl: LiveQueryImpl = this.getCached<LiveQueryImpl>(queryId, LiveQuery.type);
    if (!liveQueryImpl) {
      // Prefer a descriptor kept in session storage; otherwise build one from the reply.
      const descriptor: LiveQueryDescriptor = this._getLiveQuery(queryId) || ({
        indexName,
        queryExpression,
        sid: queryId,
        queryUri,
        last_event_id: response.last_event_id
      } as LiveQueryDescriptor);

      const onRemoved: RemovalHandler = (type, sid, uniqueName) => this.removeFromCacheAndSession(type, sid, uniqueName);
      liveQueryImpl = new LiveQueryImpl(descriptor, this.services, onRemoved, response.items);
    }

    this.storeRootInSessionCache(LiveQuery.type, queryId, liveQueryImpl.liveQueryDescriptor);
    liveQueryImpl = this.entities.store(liveQueryImpl);
    return new LiveQuery(liveQueryImpl);
  }

  /**
   * For Flex customers only. Creates a query object that can be used to issue one-time queries repeatedly
   * against the target index.
   *
   * @param indexName Must specify one of the Flex data classes for which live queries are available.
   * @return A promise which resolves after the instance of InstantQuery is successfully created.
   * @example
   * ```typescript
   * syncClient.instantQuery('tr-worker')
   *   .then((q) => {
   *     q.on('searchResult', (items) => {
   *       Object.entries(items).forEach(([key, value]) => {
   *         console.log('Search result item key:', key);
   *         console.log('Search result item value:', value);
   *       });
   *     });
   *   });
   * ```
   */
  @validateTypesAsync(nonEmptyString)
  public async instantQuery(indexName: string): Promise<InstantQuery> {
    await this.ensureReady();

    // Hands the instant query a way to create live queries through this client.
    const createLiveQuery: LiveQueryCreator = (index, expression) => this.liveQuery(index, expression);

    return new InstantQuery({
      indexName,
      network: this.services.network,
      insightsUri: this.services.config.insightsUri,
      liveQueryCreator: createLiveQuery
    });
  }
}

export {
  Client,
  SyncClientOptions,
  ConnectionState,
  OpenMode,
  json,
  OpenOptions,
  OpenDocumentOptions,
  OpenMapOptions,
  OpenListOptions,
  OpenStreamOptions
};
