RestService

AuthenticationMode

Bases: Enum

ACCESS_TOKEN = 9 class-attribute instance-attribute

BASIC = 1 class-attribute instance-attribute

BASIC_API_KEY = 8 class-attribute instance-attribute

CAM = 3 class-attribute instance-attribute

CAM_SSO = 4 class-attribute instance-attribute

IBM_CLOUD_API_KEY = 5 class-attribute instance-attribute

PA_PROXY = 7 class-attribute instance-attribute

SERVICE_TO_SERVICE = 6 class-attribute instance-attribute

WIA = 2 class-attribute instance-attribute

use_v12_auth property

BytesIOSocket(content)

Used in the urllib3_response_from_bytes method to construct a urllib3 response from raw bytes.

Source code in TM1py/Services/RestService.py
def __init__(self, content: bytes):
    self.handle = BytesIO(content)

handle = BytesIO(content) instance-attribute

makefile(mode)

Source code in TM1py/Services/RestService.py
def makefile(self, mode) -> BytesIO:
    return self.handle
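
The wrapper is needed because HTTP response parsers read from whatever object makefile() returns. A minimal sketch of the idea follows. It uses the standard library's http.client.HTTPResponse rather than urllib3 (an assumption for illustration, since the body of urllib3_response_from_bytes is not shown in this section), and the raw message is a hypothetical sample.

```python
from http.client import HTTPResponse
from io import BytesIO


class BytesIOSocket:
    """Minimal stand-in for a socket: exposes only makefile()."""

    def __init__(self, content: bytes):
        self.handle = BytesIO(content)

    def makefile(self, mode) -> BytesIO:
        # HTTPResponse calls makefile("rb") and reads the raw HTTP message from it
        return self.handle


# Hypothetical raw HTTP message (status line, headers, blank line, body)
raw = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: text/plain\r\n"
    b"Content-Length: 5\r\n"
    b"\r\n"
    b"hello"
)

response = HTTPResponse(BytesIOSocket(raw))
response.begin()  # parse the status line and headers
status = response.status
body = response.read()  # read exactly Content-Length bytes
print(status, body)
```

The mode argument is ignored on purpose: the same in-memory buffer is returned regardless of how the parser asks to open the "socket".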

RestService(**kwargs)

Low level communication with TM1 instance through HTTP.

Allows executing HTTP methods:

  • GET
  • POST
  • PATCH
  • DELETE

Takes care of:

  • Encodings
  • TM1 User-Login
  • HTTP Headers
  • HTTP Session Management
  • Response Handling

Based on the requests module.

Create an instance of RestService.

Supported kwargs arguments:

  • address (str): Address of the TM1 instance.
  • port (int): HTTPPortNumber as specified in the tm1s.cfg.
  • ssl (bool): Use SSL as specified in the tm1s.cfg.
  • instance (str): Planning Analytics Engine (v12) instance name.
  • database (str): Planning Analytics Engine (v12) database name.
  • base_url (str): Base URL.
  • auth_url (str): Auth URL for Planning Analytics Engine (v12).
  • user (str): Name of the user.
  • password (str): Password of the user.
  • decode_b64 (bool): Whether password argument is b64 encoded.
  • namespace (str): Optional CAM namespace.
  • cam_passport (str): The CAM passport.
  • session_id (str): TM1SessionId, e.g. q7O6e1w49AixeuLVxJ1GZg.
  • application_client_id (str): Planning Analytics Engine (v12) named application client ID created via manage service.
  • application_client_secret (str): Planning Analytics Engine (v12) named application secret created via manage service.
  • api_key (str): Planning Analytics Engine (v12) API Key from https://cloud.ibm.com/iam/apikeys.
  • iam_url (str): Planning Analytics Engine (v12) IBM Cloud IAM URL. Default: "https://iam.cloud.ibm.com".
  • pa_url (str): Planning Analytics Engine (v12) PA URL, e.g., "https://us-east-2.aws.planninganalytics.ibm.com".
  • cpd_url (str): Cloud Pack for Data URL (aka ZEN) CPD URL, e.g., "https://cpd-zen.apps.cp4dpa-test11.cp.fyre.ibm.com".
  • tenant (str): Planning Analytics Engine (v12) Tenant, e.g., YC4B2M1AG2Y6.
  • session_context (str): Name of the Application. Controls "Context" column in Arc / TM1top. If None, uses default: TM1py.
  • verify (str|bool): Path to .cer file or 'True' / True / 'False' / False (if no SSL verification is required).
  • logging (bool): Switch on/off verbose HTTP logging into sys.stdout.
  • timeout (float): Number of seconds that the client will wait to receive the first byte.
  • cancel_at_timeout (bool): Abort operation in TM1 when timeout is reached.
  • async_requests_mode (bool): Changes internal REST execution mode to avoid 60s timeout on IBM cloud.
  • connection_pool_size (int): Maximum number of connections to save in the pool (default: 10). In a multi-threaded environment, set higher.
  • pool_connections (int): Number of connection pools to cache (default: 1 for a single TM1 instance).
  • integrated_login (bool): True for IntegratedSecurityMode3.
  • integrated_login_domain (str): NT Domain name. Default: '.' for local account.
  • integrated_login_service (str): Kerberos Service type for remote Service Principal Name. Default: 'HTTP'.
  • integrated_login_host (str): Host name for Service Principal Name. Default: Extracted from request URI.
  • integrated_login_delegate (bool): Indicates that the user's credentials are to be delegated to the server. Default: False.
  • impersonate (str): Name of user to impersonate.
  • re_connect_on_session_timeout (bool): Attempt to reconnect once if session is timed out.
  • re_connect_on_remote_disconnect (bool): Attempt to reconnect once if connection is aborted by remote end.
  • remote_disconnect_max_retries (int): Maximum number of retry attempts after remote disconnect (default: 5).
  • remote_disconnect_retry_delay (float): Initial delay in seconds before first retry attempt (default: 1).
  • remote_disconnect_max_delay (float): Maximum delay cap in seconds between retry attempts (default: 30).
  • remote_disconnect_backoff_factor (float): Multiplier for exponential backoff between retry attempts (default: 2).
  • async_polling_initial_delay (float): Initial polling delay in seconds for async operations (default: 0.1).
  • async_polling_max_delay (float): Maximum polling delay cap in seconds for async operations (default: 1.0).
  • async_polling_backoff_factor (float): Multiplier for exponential backoff in async polling (default: 2).
  • proxies (dict): Dictionary with proxies, e.g. {'http': 'http://proxy.example.com:8080', 'https': 'http://secureproxy.example.com:8090'}.
  • ssl_context: User-defined SSL context.
  • cert (str|tuple): (Optional) If string, path to SSL client cert file (.pem). If tuple, ('cert', 'key') pair.
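
To make the retry settings above more concrete, here is a sketch of a conventional capped exponential backoff driven by remote_disconnect_retry_delay, remote_disconnect_backoff_factor, remote_disconnect_max_delay and remote_disconnect_max_retries. The function backoff_delays is hypothetical, and the exact formula the library applies internally is not shown in this section, so treat the schedule as an assumption about typical behavior rather than a statement of the implementation.

```python
def backoff_delays(initial_delay: float, backoff_factor: float, max_delay: float, max_retries: int) -> list:
    """Return the wait time before each retry: geometric growth, capped at max_delay.

    Hypothetical helper for illustration; not part of TM1py.
    """
    delays = []
    delay = initial_delay
    for _ in range(max_retries):
        delays.append(min(delay, max_delay))
        delay *= backoff_factor
    return delays


# Documented defaults: delay 1s, factor 2, cap 30s, 5 retries
default_schedule = backoff_delays(initial_delay=1, backoff_factor=2, max_delay=30, max_retries=5)

# With more retries the cap starts to apply
longer_schedule = backoff_delays(initial_delay=1, backoff_factor=2, max_delay=30, max_retries=7)

print(default_schedule)
print(longer_schedule)
```

Under this assumption the default settings never reach the 30 second cap; the cap only matters once the retry count or the backoff factor is raised. The async_polling_* arguments describe the same shape of schedule for polling async operations.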

Parameters:

  • kwargs: See description above for all supported arguments. Default: {}.

Source code in TM1py/Services/RestService.py
def __init__(self, **kwargs):
    """Create an instance of RESTService

    Supported kwargs arguments:

    - **address** (str): Address of the TM1 instance.
    - **port** (int): HTTPPortNumber as specified in the tm1s.cfg.
    - **ssl** (bool): Use SSL as specified in the tm1s.cfg.
    - **instance** (str): Planning Analytics Engine (v12) instance name.
    - **database** (str): Planning Analytics Engine (v12) database name.
    - **base_url** (str): Base URL.
    - **auth_url** (str): Auth URL for Planning Analytics Engine (v12).
    - **user** (str): Name of the user.
    - **password** (str): Password of the user.
    - **decode_b64** (bool): Whether password argument is b64 encoded.
    - **namespace** (str): Optional CAM namespace.
    - **cam_passport** (str): The CAM passport.
    - **session_id** (str): TM1SessionId, e.g. q7O6e1w49AixeuLVxJ1GZg.
    - **application_client_id** (str): Planning Analytics Engine (v12) named application client ID created via manage service.
    - **application_client_secret** (str): Planning Analytics Engine (v12) named application secret created via manage service.
    - **api_key** (str): Planning Analytics Engine (v12) API Key from https://cloud.ibm.com/iam/apikeys.
    - **iam_url** (str): Planning Analytics Engine (v12) IBM Cloud IAM URL. Default: "https://iam.cloud.ibm.com".
    - **pa_url** (str): Planning Analytics Engine (v12) PA URL, e.g., "https://us-east-2.aws.planninganalytics.ibm.com".
    - **cpd_url** (str): Cloud Pack for Data URL (aka ZEN) CPD URL, e.g., "https://cpd-zen.apps.cp4dpa-test11.cp.fyre.ibm.com".
    - **tenant** (str): Planning Analytics Engine (v12) Tenant, e.g., YC4B2M1AG2Y6.
    - **session_context** (str): Name of the Application. Controls "Context" column in Arc / TM1top. If None, uses default: TM1py.
    - **verify** (str|bool): Path to .cer file or 'True' / True / 'False' / False (if no SSL verification is required).
    - **logging** (bool): Switch on/off verbose HTTP logging into sys.stdout.
    - **timeout** (float): Number of seconds that the client will wait to receive the first byte.
    - **cancel_at_timeout** (bool): Abort operation in TM1 when timeout is reached.
    - **async_requests_mode** (bool): Changes internal REST execution mode to avoid 60s timeout on IBM cloud.
    - **connection_pool_size** (int): Maximum number of connections to save in the pool (default: 10). In a multi-threaded environment, set higher.
    - **pool_connections** (int): Number of connection pools to cache (default: 1 for a single TM1 instance).
    - **integrated_login** (bool): True for IntegratedSecurityMode3.
    - **integrated_login_domain** (str): NT Domain name. Default: '.' for local account.
    - **integrated_login_service** (str): Kerberos Service type for remote Service Principal Name. Default: 'HTTP'.
    - **integrated_login_host** (str): Host name for Service Principal Name. Default: Extracted from request URI.
    - **integrated_login_delegate** (bool): Indicates that the user's credentials are to be delegated to the server. Default: False.
    - **impersonate** (str): Name of user to impersonate.
    - **re_connect_on_session_timeout** (bool): Attempt to reconnect once if session is timed out.
    - **re_connect_on_remote_disconnect** (bool): Attempt to reconnect once if connection is aborted by remote end.
    - **remote_disconnect_max_retries** (int): Maximum number of retry attempts after remote disconnect (default: 5).
    - **remote_disconnect_retry_delay** (float): Initial delay in seconds before first retry attempt (default: 1).
    - **remote_disconnect_max_delay** (float): Maximum delay cap in seconds between retry attempts (default: 30).
    - **remote_disconnect_backoff_factor** (float): Multiplier for exponential backoff between retry attempts (default: 2).
    - **async_polling_initial_delay** (float): Initial polling delay in seconds for async operations (default: 0.1).
    - **async_polling_max_delay** (float): Maximum polling delay cap in seconds for async operations (default: 1.0).
    - **async_polling_backoff_factor** (float): Multiplier for exponential backoff in async polling (default: 2).
    - **proxies** (dict): Dictionary with proxies, e.g. {'http': 'http://proxy.example.com:8080', 'https': 'http://secureproxy.example.com:8090'}.
    - **ssl_context**: User-defined SSL context.
    - **cert** (str|tuple): (Optional) If string, path to SSL client cert file (.pem). If tuple, ('cert', 'key') pair.

    :param kwargs: See description above for all supported arguments	
    """
    # store kwargs for future use e.g. re_connect on 401 session timeout
    self._kwargs = kwargs

    # core arguments for connection
    self._ssl = self.translate_to_boolean(kwargs.get("ssl", True))
    self._address = kwargs.get("address", None)
    self._port = kwargs.get("port", None)
    self._base_url = kwargs.get("base_url", None)
    self._auth_url = kwargs.get("auth_url", None)
    self._instance = kwargs.get("instance", None)
    self._database = kwargs.get("database", None)
    self._api_key = kwargs.get("api_key", None)
    self._iam_url = kwargs.get("iam_url", None)
    self._pa_url = kwargs.get("pa_url", None)
    self._cpd_url = kwargs.get("cpd_url", None)
    self._tenant = kwargs.get("tenant", None)
    self._user = kwargs.get("user", kwargs.get("username", None))

    # other arguments
    self._auth_mode = self._determine_auth_mode()
    self._timeout = None if kwargs.get("timeout", None) is None else float(kwargs.get("timeout"))
    self._cancel_at_timeout = kwargs.get("cancel_at_timeout", False)
    self._async_requests_mode = self.translate_to_boolean(kwargs.get("async_requests_mode", False))
    self._connection_pool_size = int(kwargs.get("connection_pool_size", self.DEFAULT_CONNECTION_POOL_SIZE))
    self._pool_connections = int(kwargs.get("pool_connections", self.DEFAULT_POOL_CONNECTIONS))
    self._re_connect_on_session_timeout = kwargs.get("re_connect_on_session_timeout", True)
    self._re_connect_on_remote_disconnect = kwargs.get("re_connect_on_remote_disconnect", True)
    self._remote_disconnect_max_retries = int(kwargs.get("remote_disconnect_max_retries", 5))
    self._remote_disconnect_retry_delay = float(kwargs.get("remote_disconnect_retry_delay", 1))
    self._remote_disconnect_max_delay = float(kwargs.get("remote_disconnect_max_delay", 30))
    self._remote_disconnect_backoff_factor = float(kwargs.get("remote_disconnect_backoff_factor", 2))
    self._async_polling_initial_delay = float(kwargs.get("async_polling_initial_delay", 0.1))
    self._async_polling_max_delay = float(kwargs.get("async_polling_max_delay", 1.0))
    self._async_polling_backoff_factor = float(kwargs.get("async_polling_backoff_factor", 2))
    # is retrieved on demand and then cached
    self._sandboxing_disabled = None
    # optional verbose logging to stdout
    self.handle_logging(kwargs.get("logging", False))

    self._proxies = self._handle_proxies(kwargs.get("proxies", None))
    self._is_admin = None
    self._is_data_admin = None
    self._is_security_admin = None
    self._is_ops_admin = None
    self._ssl_context = kwargs.get("ssl_context", None)

    # populated later on the fly for users with the name different from 'Admin'
    if self._user and case_and_space_insensitive_equals(self._user, "ADMIN"):
        self._is_admin = True
        self._is_data_admin = True
        self._is_security_admin = True
        self._is_ops_admin = True

    self._verify = self._determine_verify(kwargs.get("verify", None))

    self._base_url, self._auth_url = self._construct_service_and_auth_root()

    self._version = None
    self._headers = self.HEADERS.copy()
    if "session_context" in kwargs:
        self._headers["TM1-SessionContext"] = kwargs["session_context"]

    self.disable_http_warnings()

    self._s = Session()
    self._manage_http_adapter()

    self._cert = kwargs.get("cert")
    self._s.cert = self._cert

    if self._proxies:
        self._s.proxies = self._proxies

    # First contact with TM1
    self.connect()
    if not self._version:
        self.set_version()
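
The constructor coerces several numeric arguments with float() and int(), so values read as strings (for example from a configuration file) are accepted. The sketch below copies three of those expressions from the listing above into a hypothetical standalone function, coerce_numeric_settings, to show the effect without connecting to TM1.

```python
DEFAULT_CONNECTION_POOL_SIZE = 10  # mirrors the class attribute documented on this page


def coerce_numeric_settings(**kwargs) -> dict:
    """Hypothetical helper: applies the same coercions as the constructor listing."""
    # timeout stays None when it is not supplied, otherwise it becomes a float
    timeout = None if kwargs.get("timeout", None) is None else float(kwargs.get("timeout"))
    pool_size = int(kwargs.get("connection_pool_size", DEFAULT_CONNECTION_POOL_SIZE))
    max_retries = int(kwargs.get("remote_disconnect_max_retries", 5))
    return {
        "timeout": timeout,
        "connection_pool_size": pool_size,
        "remote_disconnect_max_retries": max_retries,
    }


# String values, as they would arrive from an .ini file
from_strings = coerce_numeric_settings(timeout="30", connection_pool_size="20")

# Nothing supplied: defaults apply
defaults = coerce_numeric_settings()

print(from_strings)
print(defaults)
```

A value that cannot be parsed, such as timeout="fast", raises ValueError from float() before any connection attempt is made.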

DEFAULT_CONNECTION_POOL_SIZE = 10 class-attribute instance-attribute

DEFAULT_POOL_CONNECTIONS = 1 class-attribute instance-attribute

HEADERS = {'Connection': 'keep-alive', 'User-Agent': 'TM1py', 'Content-Type': 'application/json; odata.streaming=true; charset=utf-8', 'Accept': 'application/json;odata.metadata=none,text/plain', 'TM1-SessionContext': 'TM1py'} class-attribute instance-attribute

is_admin property

is_data_admin property

is_ops_admin property

is_security_admin property

sandboxing_disabled property

session_id property

version property

DELETE(url, data='', headers=None, async_requests_mode=None, return_async_id=False, timeout=None, cancel_at_timeout=False, encoding='utf-8', idempotent=False, verify_response=True, **kwargs)

Perform a DELETE request against TM1 instance

Parameters:

  • url (str): Required.
  • data (Union[str, bytes, BytesIO]): the payload. Default: ''.
  • headers (Dict): custom headers. Default: None.
  • async_requests_mode (bool): changes internal REST execution mode to avoid 60s timeout on IBM cloud. Default: None.
  • return_async_id (bool): If True function will return async_id after initiation and not await the execution. Default: False.
  • timeout (float): Number of seconds that the client will wait to receive the first byte. Default: None.
  • cancel_at_timeout (bool): Abort operation in TM1 when timeout is reached. Default: False.
  • encoding (str): Default: 'utf-8'.

Returns:

  • response object or async_id

Source code in TM1py/Services/RestService.py
def DELETE(
    self,
    url: str,
    data: Union[str, bytes, BytesIO] = "",
    headers: Dict = None,
    async_requests_mode: bool = None,
    return_async_id: bool = False,
    timeout: float = None,
    cancel_at_timeout: bool = False,
    encoding: str = "utf-8",
    idempotent: bool = False,
    verify_response: bool = True,
    **kwargs,
):
    """Perform a DELETE request against TM1 instance
    :param url:
    :param data: the payload
    :param headers: custom headers
    :param async_requests_mode: changes internal REST execution mode to avoid 60s timeout on IBM cloud
    :param return_async_id: If True function will return async_id after initiation and not await the execution
    :param timeout: Number of seconds that the client will wait to receive the first byte.
    :param cancel_at_timeout: Abort operation in TM1 when timeout is reached
    :param encoding:
    :return: response object or async_id
    """

    return self.request(
        method="delete",
        headers={**self._headers, **headers} if headers else dict(self._headers),
        url=url,
        data=data,
        async_requests_mode=async_requests_mode,
        return_async_id=return_async_id,
        timeout=timeout if timeout else self._timeout,
        cancel_at_timeout=cancel_at_timeout,
        encoding=encoding,
        idempotent=idempotent,
        verify_response=verify_response,
    )
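
The headers expression in the listing above merges per-request headers over the instance defaults without mutating them. A small standalone sketch of that expression, using the HEADERS value documented on this page and a hypothetical function name merge_headers:

```python
DEFAULT_HEADERS = {
    "Connection": "keep-alive",
    "User-Agent": "TM1py",
    "Content-Type": "application/json; odata.streaming=true; charset=utf-8",
    "Accept": "application/json;odata.metadata=none,text/plain",
    "TM1-SessionContext": "TM1py",
}


def merge_headers(defaults: dict, headers: dict = None) -> dict:
    """Same expression as in the HTTP verb methods: custom keys win, defaults are copied."""
    return {**defaults, **headers} if headers else dict(defaults)


# A custom header overrides the default with the same key
merged = merge_headers(DEFAULT_HEADERS, {"Connection": "close"})

# Without custom headers a fresh copy of the defaults is used
untouched = merge_headers(DEFAULT_HEADERS)

print(merged["Connection"], DEFAULT_HEADERS["Connection"])
```

Because a new dictionary is built on every call, a header passed to one request does not leak into later requests; add_http_header is the way to change a default persistently.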

GET(url, data='', headers=None, async_requests_mode=None, return_async_id=False, timeout=None, cancel_at_timeout=False, encoding='utf-8', idempotent=True, verify_response=True, **kwargs)

Perform a GET request against TM1 instance

Parameters:

  • url (str): Required.
  • data (Union[str, bytes, BytesIO]): the payload. Default: ''.
  • headers (Dict): custom headers. Default: None.
  • async_requests_mode (bool): changes internal REST execution mode to avoid 60s timeout on IBM cloud. Default: None.
  • return_async_id (bool): If True function will return async_id after initiation and not await the execution. Default: False.
  • timeout (float): Number of seconds that the client will wait to receive the first byte. Default: None.
  • cancel_at_timeout (bool): Abort operation in TM1 when timeout is reached. Default: False.
  • encoding (str): Default: 'utf-8'.

Returns:

  • response object or async_id

Source code in TM1py/Services/RestService.py
def GET(
    self,
    url: str,
    data: Union[str, bytes, BytesIO] = "",
    headers: Dict = None,
    async_requests_mode: bool = None,
    return_async_id: bool = False,
    timeout: float = None,
    cancel_at_timeout: bool = False,
    encoding: str = "utf-8",
    idempotent: bool = True,
    verify_response: bool = True,
    **kwargs,
):
    """Perform a GET request against TM1 instance
    :param url:
    :param data: the payload
    :param headers: custom headers
    :param async_requests_mode: changes internal REST execution mode to avoid 60s timeout on IBM cloud
    :param return_async_id: If True function will return async_id after initiation and not await the execution
    :param timeout: Number of seconds that the client will wait to receive the first byte.
    :param cancel_at_timeout: Abort operation in TM1 when timeout is reached
    :param encoding:
    :return: response object or async_id
    """

    return self.request(
        method="get",
        headers={**self._headers, **headers} if headers else dict(self._headers),
        url=url,
        data=data,
        async_requests_mode=async_requests_mode,
        return_async_id=return_async_id,
        timeout=timeout if timeout else self._timeout,
        cancel_at_timeout=cancel_at_timeout,
        encoding=encoding,
        idempotent=idempotent,
        verify_response=verify_response,
    )
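
The timeout argument falls back to the instance-level timeout through a truthiness check, which has one consequence worth knowing: a per-request timeout of 0 is treated like None. The sketch below isolates that expression in a hypothetical function, effective_timeout.

```python
from typing import Optional


def effective_timeout(timeout: Optional[float], instance_timeout: Optional[float]) -> Optional[float]:
    """Same expression as in the listing: any falsy per-request value uses the instance timeout."""
    return timeout if timeout else instance_timeout


per_request = effective_timeout(5, 60)       # explicit value wins
inherited = effective_timeout(None, 60)      # falls back to the instance timeout
zero_given = effective_timeout(0, 60)        # 0 is falsy, so it also falls back
no_timeout = effective_timeout(None, None)   # no timeout configured anywhere

print(per_request, inherited, zero_given, no_timeout)
```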

PATCH(url, data='', headers=None, async_requests_mode=None, return_async_id=False, timeout=None, cancel_at_timeout=False, encoding='utf-8', idempotent=False, verify_response=True, **kwargs)

Perform a PATCH request against TM1 instance

Parameters:

  • url (str): Required.
  • data (Union[str, bytes, BytesIO]): the payload. Default: ''.
  • headers (Dict): custom headers. Default: None.
  • async_requests_mode (bool): changes internal REST execution mode to avoid 60s timeout on IBM cloud. Default: None.
  • return_async_id (bool): If True function will return async_id after initiation and not await the execution. Default: False.
  • timeout (float): Number of seconds that the client will wait to receive the first byte. Default: None.
  • cancel_at_timeout (bool): Abort operation in TM1 when timeout is reached. Default: False.
  • encoding (str): Default: 'utf-8'.

Returns:

  • response object or async_id

Source code in TM1py/Services/RestService.py
def PATCH(
    self,
    url: str,
    data: Union[str, bytes, BytesIO] = "",
    headers: Dict = None,
    async_requests_mode: bool = None,
    return_async_id: bool = False,
    timeout: float = None,
    cancel_at_timeout: bool = False,
    encoding: str = "utf-8",
    idempotent: bool = False,
    verify_response: bool = True,
    **kwargs,
):
    """Perform a PATCH request against TM1 instance
    :param url:
    :param data: the payload
    :param headers: custom headers
    :param async_requests_mode: changes internal REST execution mode to avoid 60s timeout on IBM cloud
    :param return_async_id: If True function will return async_id after initiation and not await the execution
    :param timeout: Number of seconds that the client will wait to receive the first byte.
    :param cancel_at_timeout: Abort operation in TM1 when timeout is reached
    :param encoding:
    :return: response object or async_id
    """

    return self.request(
        method="patch",
        headers={**self._headers, **headers} if headers else dict(self._headers),
        url=url,
        data=data,
        async_requests_mode=async_requests_mode,
        return_async_id=return_async_id,
        timeout=timeout if timeout else self._timeout,
        cancel_at_timeout=cancel_at_timeout,
        encoding=encoding,
        idempotent=idempotent,
        verify_response=verify_response,
    )

POST(url, data='', headers=None, async_requests_mode=None, return_async_id=False, timeout=None, cancel_at_timeout=False, encoding='utf-8', idempotent=False, verify_response=True, **kwargs)

Perform a POST request against TM1 instance

Parameters:

  • url (str): Required.
  • data (Union[str, bytes, BytesIO]): the payload. Default: ''.
  • headers (Dict): custom headers. Default: None.
  • async_requests_mode (bool): changes internal REST execution mode to avoid 60s timeout on IBM cloud. Default: None.
  • return_async_id (bool): If True function will return async_id after initiation and not await the execution. Default: False.
  • timeout (float): Number of seconds that the client will wait to receive the first byte. Default: None.
  • cancel_at_timeout (bool): Abort operation in TM1 when timeout is reached. Default: False.
  • encoding (str): Default: 'utf-8'.

Returns:

  • response object or async_id

Source code in TM1py/Services/RestService.py
def POST(
    self,
    url: str,
    data: Union[str, bytes, BytesIO] = "",
    headers: Dict = None,
    async_requests_mode: bool = None,
    return_async_id: bool = False,
    timeout: float = None,
    cancel_at_timeout: bool = False,
    encoding: str = "utf-8",
    idempotent: bool = False,
    verify_response: bool = True,
    **kwargs,
):
    """Perform a POST request against TM1 instance
    :param url:
    :param data: the payload
    :param headers: custom headers
    :param async_requests_mode: changes internal REST execution mode to avoid 60s timeout on IBM cloud
    :param return_async_id: If True function will return async_id after initiation and not await the execution
    :param timeout: Number of seconds that the client will wait to receive the first byte.
    :param cancel_at_timeout: Abort operation in TM1 when timeout is reached
    :param encoding:
    :return: response object or async_id
    """

    response = self.request(
        method="post",
        headers={**self._headers, **headers} if headers else dict(self._headers),
        url=url,
        data=data,
        async_requests_mode=async_requests_mode,
        return_async_id=return_async_id,
        timeout=timeout if timeout else self._timeout,
        cancel_at_timeout=cancel_at_timeout,
        encoding=encoding,
        idempotent=idempotent,
        verify_response=verify_response,
    )

    return response

PUT(url, data='', headers=None, async_requests_mode=None, return_async_id=False, timeout=None, cancel_at_timeout=False, encoding='utf-8', idempotent=False, verify_response=True, **kwargs)

Perform a PUT request against TM1 instance

Parameters:

  • url (str): Required.
  • data (Union[str, bytes, BytesIO]): the payload. Default: ''.
  • headers (Dict): custom headers. Default: None.
  • async_requests_mode (bool): changes internal REST execution mode to avoid 60s timeout on IBM cloud. Default: None.
  • return_async_id (bool): If True function will return async_id after initiation and not await the execution. Default: False.
  • timeout (float): Number of seconds that the client will wait to receive the first byte. Default: None.
  • cancel_at_timeout (bool): Abort operation in TM1 when timeout is reached. Default: False.
  • encoding (str): Default: 'utf-8'.

Returns:

  • response object or async_id

Source code in TM1py/Services/RestService.py
def PUT(
    self,
    url: str,
    data: Union[str, bytes, BytesIO] = "",
    headers: Dict = None,
    async_requests_mode: bool = None,
    return_async_id: bool = False,
    timeout: float = None,
    cancel_at_timeout: bool = False,
    encoding: str = "utf-8",
    idempotent: bool = False,
    verify_response: bool = True,
    **kwargs,
):
    """Perform a PUT request against TM1 instance
    :param url:
    :param data: the payload
    :param headers: custom headers
    :param async_requests_mode: changes internal REST execution mode to avoid 60s timeout on IBM cloud
    :param return_async_id: If True function will return async_id after initiation and not await the execution
    :param timeout: Number of seconds that the client will wait to receive the first byte.
    :param cancel_at_timeout: Abort operation in TM1 when timeout is reached
    :param encoding:
    :return: response object or async_id
    """

    return self.request(
        method="put",
        headers={**self._headers, **headers} if headers else dict(self._headers),
        url=url,
        data=data,
        async_requests_mode=async_requests_mode,
        return_async_id=return_async_id,
        timeout=timeout if timeout else self._timeout,
        cancel_at_timeout=cancel_at_timeout,
        encoding=encoding,
        idempotent=idempotent,
        verify_response=verify_response,
    )

__enter__()

Source code in TM1py/Services/RestService.py
def __enter__(self):
    return self

__exit__(exception_type, exception_value, traceback)

Source code in TM1py/Services/RestService.py
def __exit__(self, exception_type, exception_value, traceback):
    try:
        self.logout()
    except Exception as e:
        warnings.warn(f"Logout Failed due to Exception: {e}")

add_compact_json_header()

Source code in TM1py/Services/RestService.py
def add_compact_json_header(self) -> str:
    original_header = self.get_http_header("Accept")
    parts = original_header.split(";")

    # Point of insertion is important. Needs to come after application/json
    parts.insert(1, "tm1.compact=v0")
    modified_header = ";".join(parts)
    self.add_http_header("Accept", modified_header)

    return original_header
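
The effect of the insertion is easiest to see on the default Accept header documented on this page. The sketch below applies the same split, insert and join steps in a hypothetical standalone function, with_compact_json.

```python
def with_compact_json(accept_header: str) -> str:
    """Insert tm1.compact=v0 directly after the first ';'-separated part."""
    parts = accept_header.split(";")
    # Position 1 places the parameter right after "application/json"
    parts.insert(1, "tm1.compact=v0")
    return ";".join(parts)


original = "application/json;odata.metadata=none,text/plain"
modified = with_compact_json(original)

print(modified)
```

Because add_compact_json_header returns the original header, a caller can restore it afterwards with add_http_header("Accept", original_header).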

add_http_header(key, value)

Source code in TM1py/Services/RestService.py
def add_http_header(self, key: str, value: str):
    self._headers[key] = value

b64_decode_password(encrypted_password) staticmethod

b64 decoding

Parameters:

  • encrypted_password (str): Password encoded with b64. Required.

Returns:

  • str: Password in plain text.

Source code in TM1py/Services/RestService.py
@staticmethod
def b64_decode_password(encrypted_password: str) -> str:
    """b64 decoding
    :param encrypted_password: encrypted password with b64
    :return: password in plain text
    """
    return b64decode(encrypted_password).decode("UTF-8")
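
A value suitable for the password argument together with decode_b64=True can be produced with the standard library. The sketch below shows the round trip; the password "apple" is a made-up example.

```python
from base64 import b64decode, b64encode

plain = "apple"  # hypothetical password, for illustration only

# Encode once, e.g. before storing the value in a config file
encoded = b64encode(plain.encode("UTF-8")).decode("ascii")

# Same call as in b64_decode_password
decoded = b64decode(encoded).decode("UTF-8")

print(encoded, decoded)
```

Note that Base64 is an encoding, not encryption: anyone who can read the encoded value can recover the password, so it only guards against casual viewing.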

build_response_from_binary_response(data) staticmethod

Source code in TM1py/Services/RestService.py
@staticmethod
def build_response_from_binary_response(data: bytes) -> Response:
    urllib_response = RestService.urllib3_response_from_bytes(data)

    adapter = HTTPAdapter()
    requests_response = adapter.build_response(requests.PreparedRequest(), urllib_response)
    # actual content of response needs to be set explicitly
    requests_response._content = urllib_response.data

    return requests_response

cancel_async_operation(async_id, **kwargs)

Source code in TM1py/Services/RestService.py
def cancel_async_operation(self, async_id: str, **kwargs):
    url = f"/_async('{async_id}')"
    # Use DELETE method which includes reconnect logic, but force sync mode
    response = self.DELETE(url, async_requests_mode=False, **kwargs)
    self.verify_response(response)

cancel_running_operation()

Source code in TM1py/Services/RestService.py
def cancel_running_operation(self):
    monitoring_service = self.get_monitoring_service()
    threads = monitoring_service.get_active_session_threads(exclude_idle=True)

    # if more than one thread is running in session, operation can not be identified unambiguously
    if not len(threads) == 1:
        return

    monitoring_service.cancel_thread(threads[0]["ID"])

connect()

Source code in TM1py/Services/RestService.py
def connect(self):
    if "session_id" in self._kwargs:
        self._set_session_id_cookie()
    else:
        self._start_session(
            user=self._kwargs.get("user", None),
            password=self._kwargs.get("password", None),
            namespace=self._kwargs.get("namespace", None),
            gateway=self._kwargs.get("gateway", None),
            cam_passport=self._kwargs.get("cam_passport", None),
            decode_b64=self.translate_to_boolean(self._kwargs.get("decode_b64", False)),
            integrated_login=self.translate_to_boolean(self._kwargs.get("integrated_login", False)),
            integrated_login_domain=self._kwargs.get("integrated_login_domain"),
            integrated_login_service=self._kwargs.get("integrated_login_service"),
            integrated_login_host=self._kwargs.get("integrated_login_host"),
            integrated_login_delegate=self._kwargs.get("integrated_login_delegate"),
            impersonate=self._kwargs.get("impersonate", None),
            application_client_id=self._kwargs.get("application_client_id", None),
            application_client_secret=self._kwargs.get("application_client_secret", None),
        )

disable_http_warnings() staticmethod

Source code in TM1py/Services/RestService.py
@staticmethod
def disable_http_warnings():
    # disable HTTP verification warnings from requests library
    requests.packages.urllib3.disable_warnings()

get_api_metadata()

Get API Metadata

Returns:

Type Description
dict

Dictionary

Source code in TM1py/Services/RestService.py
def get_api_metadata(self) -> dict:
    """Get API Metadata

    :return: Dictionary
    """
    url = "/$metadata"
    metadata = self.GET(url=url).content.decode("utf-8")
    return json.loads(metadata)

get_http_header(key)

Source code in TM1py/Services/RestService.py
def get_http_header(self, key: str) -> str:
    return self._headers[key]

get_monitoring_service()

Source code in TM1py/Services/RestService.py
def get_monitoring_service(self):
    from TM1py.Services import MonitoringService

    return MonitoringService(self)

handle_logging(logging)

Source code in TM1py/Services/RestService.py
def handle_logging(self, logging: Union[str, bool]):
    if logging:
        if self.translate_to_boolean(value=logging):
            http_client.HTTPConnection.debuglevel = 1

is_connected()

Check if Connection to TM1 Server is established.

Returns: Boolean

Source code in TM1py/Services/RestService.py
def is_connected(self) -> bool:
    """Check if Connection to TM1 Server is established.
    :Returns:
        Boolean
    """
    try:
        self.GET("/Configuration/ServerName/$value")
        return True
    except Exception:
        return False

logout(timeout=None, **kwargs)

End TM1 Session and HTTP session

Source code in TM1py/Services/RestService.py
def logout(self, timeout: float = None, **kwargs):
    """End TM1 Session and HTTP session"""

    try:
        self.POST(
            "/ActiveSession/tm1.Close",
            "",
            headers={"Connection": "close"},
            timeout=timeout,
            async_requests_mode=False,
            **kwargs,
        )
    finally:
        self._s.close()

remove_http_header(key)

Source code in TM1py/Services/RestService.py
def remove_http_header(self, key: str):
    if key in self._headers:
        self._headers.pop(key)

request(method, url, data='', encoding='utf-8', async_requests_mode=None, return_async_id=False, timeout=None, cancel_at_timeout=False, idempotent=False, verify_response=True, **kwargs)

Execute a request to TM1 REST API

Source code in TM1py/Services/RestService.py
def request(
    self,
    method: str,
    url: str,
    data: str = "",
    encoding="utf-8",
    async_requests_mode: Optional[bool] = None,
    return_async_id=False,
    timeout: float = None,
    cancel_at_timeout: bool = False,
    idempotent: bool = False,
    verify_response: bool = True,
    **kwargs,
):
    """
    Execute a request to TM1 REST API
    """
    url, data = self._url_and_body(url=url, data=data, encoding=encoding)

    timeout = timeout if timeout else self._timeout

    try:
        # Determine async mode
        if return_async_id:
            async_requests_mode = True
        elif async_requests_mode is None:
            async_requests_mode = self._async_requests_mode

        # Execute request based on mode
        if not async_requests_mode:
            response = self._execute_sync_request(method=method, url=url, data=data, timeout=timeout, **kwargs)
        else:
            response = self._execute_async_request(
                method=method,
                url=url,
                data=data,
                timeout=timeout,
                cancel_at_timeout=cancel_at_timeout,
                return_async_id=return_async_id,
                **kwargs,
            )

        # If async_id is returned as string, return it directly
        if return_async_id and isinstance(response, str):
            return response

        # Verify and encode response
        if verify_response:
            self.verify_response(response=response)
        response.encoding = encoding
        return response

    except Timeout:
        if cancel_at_timeout or (cancel_at_timeout is None and self._cancel_at_timeout):
            self.cancel_running_operation()
        raise TM1pyTimeout(method=method, url=url, timeout=timeout)

    except ConnectionError as e:
        # Handle read timeout issue in requests library
        if re.search("Read timed out", str(e), re.IGNORECASE):
            if cancel_at_timeout or (cancel_at_timeout is None and self._cancel_at_timeout):
                self.cancel_running_operation()
            raise TM1pyTimeout(method=method, url=url, timeout=timeout)

        # Handle RemoteDisconnected errors
        elif re.search("RemoteDisconnected|Connection aborted", str(e), re.IGNORECASE):
            if self._re_connect_on_remote_disconnect:
                return self._handle_remote_disconnect(
                    e,
                    method,
                    url,
                    data,
                    timeout,
                    idempotent,
                    async_requests_mode,
                    cancel_at_timeout,
                    return_async_id,
                    encoding,
                    **kwargs,
                )
            else:
                raise e

        # Other connection errors
        raise e
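
The "Determine async mode" step above gives `return_async_id` priority over `async_requests_mode`, which in turn overrides the instance-level default. The following is a minimal sketch of that precedence only, not part of TM1py: `resolve_async_mode` and its `instance_default` parameter are hypothetical names standing in for the inline logic and for `self._async_requests_mode`.

```python
from typing import Optional


def resolve_async_mode(
    return_async_id: bool,
    async_requests_mode: Optional[bool],
    instance_default: bool,
) -> bool:
    """Hypothetical helper mirroring the async-mode decision in request()."""
    # Asking for the async id forces async mode, whatever else was passed.
    if return_async_id:
        return True
    # No explicit per-call choice: fall back to the instance-level setting.
    if async_requests_mode is None:
        return instance_default
    # Otherwise the explicit per-call choice wins.
    return async_requests_mode


forced = resolve_async_mode(True, False, False)
inherited = resolve_async_mode(False, None, True)
explicit = resolve_async_mode(False, False, True)
```

Under these assumptions, passing `async_requests_mode=False` together with `return_async_id=True` still results in an asynchronous request.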

retrieve_async_response(async_id, **kwargs)

Source code in TM1py/Services/RestService.py
def retrieve_async_response(self, async_id: str, **kwargs) -> Response:
    url = f"/_async('{async_id}')"
    # Use GET method which includes reconnect logic, but force sync mode
    return self.GET(url, async_requests_mode=False, **kwargs)

set_version()

Source code in TM1py/Services/RestService.py
def set_version(self):
    url = "/Configuration/ProductVersion/$value"
    response = self.GET(url=url)
    self._version = response.text

translate_to_boolean(value) staticmethod

Takes a boolean or string (e.g. true, True, FALSE, etc.) value and returns (boolean) True or False

Parameters:

  • value: True, 'true', 'false' or 'False' ... (required)

Returns:

  • bool

Source code in TM1py/Services/RestService.py
@staticmethod
def translate_to_boolean(value) -> bool:
    """Takes a boolean or string (eg. true, True, FALSE, etc.) value and returns (boolean) True or False
    :param value: True, 'true', 'false' or 'False' ...
    :return:
    """
    if isinstance(value, bool) or isinstance(value, int):
        return bool(value)
    elif isinstance(value, str):
        return value.replace(" ", "").lower() == "true"
    else:
        # An f-string avoids a TypeError when value is not a str (e.g. None or a float)
        raise ValueError(f"Invalid argument: '{value}'. Must be of type 'bool' or 'str'")
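
To illustrate how different inputs are treated, here is a standalone sketch that mirrors the logic above so it runs without TM1py or a TM1 server. The function name `to_boolean` is hypothetical, and the error message is formatted with an f-string as an assumption of this sketch.

```python
def to_boolean(value) -> bool:
    """Standalone re-implementation of the logic shown above (illustrative only)."""
    if isinstance(value, (bool, int)):
        return bool(value)
    if isinstance(value, str):
        # Spaces are stripped and case is ignored; only "true" maps to True.
        return value.replace(" ", "").lower() == "true"
    raise ValueError(f"Invalid argument: '{value}'. Must be of type 'bool' or 'str'")


results = {
    "padded": to_boolean(" True "),
    "upper_false": to_boolean("FALSE"),
    "integer": to_boolean(1),
    "other_string": to_boolean("yes"),
}
```

Note that a string other than some spelling of "true" (such as "yes") yields False rather than raising an error.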

urllib3_response_from_bytes(data) staticmethod

Build a urllib3.HTTPResponse from a raw bytes string

Source code in TM1py/Services/RestService.py
@staticmethod
def urllib3_response_from_bytes(data: bytes) -> HTTPResponse:
    """Build urllib3.HTTPResponse based on raw bytes string"""
    sock = BytesIOSocket(data)

    response = HTTPResponse(sock)
    response.begin()

    headers = response.msg
    if not isinstance(headers, HTTPHeaderDict):
        headers = HTTPHeaderDict(headers.items())

    urllib3_http_response = urllib3.HTTPResponse(
        body=response,
        headers=headers,
        status=response.status,
        version=response.version,
        reason=response.reason,
        original_response=response,
    )
    return urllib3_http_response
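
The first half of this method (parsing raw bytes through a socket-like wrapper) can be tried with the standard library alone. The sketch below covers only that step and omits the final wrapping into `urllib3.HTTPResponse`; the sample HTTP message is made up for illustration.

```python
from http.client import HTTPResponse
from io import BytesIO


class BytesIOSocket:
    """Socket stand-in: http.client.HTTPResponse only calls makefile() on it."""

    def __init__(self, content: bytes):
        self.handle = BytesIO(content)

    def makefile(self, mode) -> BytesIO:
        return self.handle


# Made-up raw HTTP message, for illustration only.
body = b'{"value": 42}'
raw = b"".join(
    [
        b"HTTP/1.1 200 OK\r\n",
        b"Content-Type: application/json\r\n",
        b"Content-Length: " + str(len(body)).encode("ascii") + b"\r\n",
        b"\r\n",
        body,
    ]
)

response = HTTPResponse(BytesIOSocket(raw))
response.begin()  # parses the status line and headers

status = response.status
reason = response.reason
content_type = response.getheader("Content-Type")
payload = response.read()
```

After `begin()`, the status line and headers are available on the response object, and `read()` returns the body bytes.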

verify_response(response) staticmethod

Check if the status code is OK.

Parameters:

  • response (Response): The response that is returned from a method call.

Raises:

  • TM1pyRestException: Raised when the response is not OK (status code is not 200, 204 etc.).

Source code in TM1py/Services/RestService.py
@staticmethod
def verify_response(response: Response):
    """check if Status Code is OK
    :Parameters:
        `response`: Response
            the response that is returned from a method call
    :Exceptions:
        TM1pyRestException, raised when Code is not 200, 204 etc.
    """
    if not response.ok:
        raise TM1pyRestException(
            response.text, status_code=response.status_code, reason=response.reason, headers=response.headers
        )

wait_time_generator(timeout)

Generate wait times for async polling with capped exponential backoff.

Uses configurable parameters:

  • async_polling_initial_delay: Starting delay
  • async_polling_max_delay: Maximum delay cap
  • async_polling_backoff_factor: Multiplier for each iteration

Default behavior (0.1s initial, 1.0s max, 2x factor) produces: 0.1s -> 0.2s -> 0.4s -> 0.8s -> 1.0s -> 1.0s -> ...

Source code in TM1py/Services/RestService.py
def wait_time_generator(self, timeout: float):
    """
    Generate wait times for async polling with capped exponential backoff.

    Uses configurable parameters:
    - async_polling_initial_delay: Starting delay
    - async_polling_max_delay: Maximum delay cap
    - async_polling_backoff_factor: Multiplier for each iteration

    Default behavior (0.1s initial, 1.0s max, 2x factor) produces:
    0.1s -> 0.2s -> 0.4s -> 0.8s -> 1.0s -> 1.0s -> ...
    """
    delay = self._async_polling_initial_delay
    elapsed = 0.0

    if timeout:
        while elapsed < timeout:
            yield delay
            elapsed += delay
            delay = min(delay * self._async_polling_backoff_factor, self._async_polling_max_delay)
    else:
        while True:
            yield delay
            delay = min(delay * self._async_polling_backoff_factor, self._async_polling_max_delay)
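
The backoff sequence can be reproduced outside the class. This is a minimal standalone sketch: the function `wait_times` and its keyword parameters are hypothetical stand-ins for the method and the `_async_polling_*` attributes, with the default values taken from the docstring above.

```python
from itertools import islice
from typing import Iterator, Optional


def wait_times(
    timeout: Optional[float],
    initial_delay: float = 0.1,
    max_delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> Iterator[float]:
    """Yield capped, exponentially growing wait times (illustrative sketch)."""
    delay = initial_delay
    elapsed = 0.0
    # Without a timeout the generator never stops; the caller ends the loop.
    while not timeout or elapsed < timeout:
        yield delay
        elapsed += delay
        delay = min(delay * backoff_factor, max_delay)


# No timeout: take the first six delays from the infinite sequence.
unbounded = list(islice(wait_times(None), 6))

# With a 1.0s timeout the generator stops once the elapsed budget is used up.
bounded = list(wait_times(1.0))
```

Here `unbounded` is `[0.1, 0.2, 0.4, 0.8, 1.0, 1.0]` and `bounded` is `[0.1, 0.2, 0.4, 0.8]`. The budget is checked before each yield, so the sum of the yielded waits can exceed the timeout.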