Bases: ObjectService
Service to Query and Cancel Threads in TM1
Source code in TM1py/Services/SessionService.py
| def __init__(self, rest: RestService):
super().__init__(rest)
self.users = UserService(rest)
|
users = UserService(rest)
instance-attribute
close(session_id, **kwargs)
Source code in TM1py/Services/SessionService.py
| def close(self, session_id, **kwargs) -> Response:
url = format_url(f"/Sessions('{session_id}')/tm1.Close")
return self._rest.POST(url, **kwargs)
|
close_all(**kwargs)
Source code in TM1py/Services/SessionService.py
| @require_admin
def close_all(self, **kwargs) -> list:
current_user = self.users.get_current(**kwargs)
sessions = self.get_all(**kwargs)
closed_sessions = list()
for session in sessions:
if "User" not in session:
continue
if session["User"] is None:
continue
if "Name" not in session["User"]:
continue
if case_and_space_insensitive_equals(current_user.name, session["User"]["Name"]):
continue
self.close(session["ID"], **kwargs)
closed_sessions.append(session)
return closed_sessions
|
get_all(include_user=True, include_threads=True, **kwargs)
Source code in TM1py/Services/SessionService.py
| def get_all(self, include_user: bool = True, include_threads: bool = True, **kwargs) -> List:
url = "/Sessions"
if include_user or include_threads:
expands = list()
if include_user:
expands.append("User")
if include_threads:
expands.append("Threads")
url += "?$expand=" + ",".join(expands)
response = self._rest.GET(url, **kwargs)
return response.json()["value"]
|
get_current(**kwargs)
Source code in TM1py/Services/SessionService.py
| def get_current(self, **kwargs):
url = "/ActiveSession"
response = self._rest.GET(url, **kwargs)
return response.json()["value"]
|
get_threads_for_current(exclude_idle=True, **kwargs)
Source code in TM1py/Services/SessionService.py
| def get_threads_for_current(self, exclude_idle: bool = True, **kwargs):
url = "/ActiveSession/Threads?$filter=Function ne 'GET /ActiveSession/Threads'"
if exclude_idle:
url += " and State ne 'Idle'"
response = self._rest.GET(url, **kwargs)
return response.json()["value"]
|