Skip to content

SessionService

SessionService(rest)

Bases: ObjectService

Service to query and close sessions in TM1, and to query the threads of the current session

Source code in TM1py/Services/SessionService.py
def __init__(self, rest: RestService) -> None:
    """Set up the session service on top of an existing REST connection.

    :param rest: RestService instance; it is handed to the parent constructor
        and to the UserService created here
    """
    super().__init__(rest)
    # Separate user service on the same connection; close_all uses it to
    # look up the current user.
    self.users = UserService(rest)

users = UserService(rest) instance-attribute

close(session_id, **kwargs)

Source code in TM1py/Services/SessionService.py
def close(self, session_id, **kwargs) -> Response:
    """Invoke the ``tm1.Close`` action on a single session.

    :param session_id: ID of the session to close; interpolated into the
        request URL
    :param kwargs: passed through unchanged to the underlying POST request
    :return: whatever the REST service returns for the POST
    """
    close_action_url = format_url(f"/Sessions('{session_id}')/tm1.Close")
    response = self._rest.POST(close_action_url, **kwargs)
    return response

close_all(**kwargs)

Source code in TM1py/Services/SessionService.py
@require_admin
def close_all(self, **kwargs) -> list:
    """Close every session that belongs to a user other than the current one.

    Sessions that carry no user, or whose user entry has no ``Name``, are
    left untouched. Sessions of the current user are skipped as well, so the
    calling connection is not closed by this method.

    :param kwargs: passed through to every underlying service call
    :return: list of the session dictionaries that were closed
    """
    own_name = self.users.get_current(**kwargs).name
    closed = []
    for candidate in self.get_all(**kwargs):
        owner = candidate.get("User")
        # No user attached or user without a name: nothing to compare against.
        if owner is None or "Name" not in owner:
            continue
        # Never close sessions of the user issuing the request.
        if case_and_space_insensitive_equals(own_name, owner["Name"]):
            continue
        self.close(candidate["ID"], **kwargs)
        closed.append(candidate)
    return closed

get_all(include_user=True, include_threads=True, **kwargs)

Source code in TM1py/Services/SessionService.py
def get_all(self, include_user: bool = True, include_threads: bool = True, **kwargs) -> List:
    """Fetch all sessions, optionally expanded with their user and threads.

    :param include_user: when truthy, add ``User`` to the ``$expand`` clause
    :param include_threads: when truthy, add ``Threads`` to the ``$expand`` clause
    :param kwargs: passed through unchanged to the underlying GET request
    :return: content of the ``value`` entry of the JSON response
    """
    requested = (("User", include_user), ("Threads", include_threads))
    expansions = [relation for relation, wanted in requested if wanted]

    endpoint = "/Sessions"
    if expansions:
        # Only append a query string when at least one relation is requested.
        endpoint = endpoint + "?$expand=" + ",".join(expansions)

    return self._rest.GET(endpoint, **kwargs).json()["value"]

get_current(**kwargs)

Source code in TM1py/Services/SessionService.py
def get_current(self, **kwargs):
    """Fetch the session that the current connection is running in.

    :param kwargs: passed through unchanged to the underlying GET request
    :return: the parsed JSON description of the active session
    """
    url = "/ActiveSession"

    response = self._rest.GET(url, **kwargs)
    payload = response.json()
    # "/ActiveSession" addresses a single entity, and OData returns single
    # entities without the "value" wrapper used for collections, so indexing
    # ["value"] unconditionally raises KeyError. A wrapper is still honoured
    # if the server supplies one, so any previously working result is unchanged.
    return payload.get("value", payload)

get_threads_for_current(exclude_idle=True, **kwargs)

Source code in TM1py/Services/SessionService.py
def get_threads_for_current(self, exclude_idle: bool = True, **kwargs):
    """Fetch the threads of the active session.

    The thread serving this very request is always filtered out.

    :param exclude_idle: when truthy, additionally filter out threads whose
        ``State`` is ``Idle``
    :param kwargs: passed through unchanged to the underlying GET request
    :return: content of the ``value`` entry of the JSON response
    """
    base_query = "/ActiveSession/Threads?$filter=Function ne 'GET /ActiveSession/Threads'"
    idle_clause = " and State ne 'Idle'" if exclude_idle else ""

    threads_response = self._rest.GET(base_query + idle_clause, **kwargs)
    return threads_response.json()["value"]