110 lines
3.7 KiB
Python
Raw Normal View History

2021-09-08 16:17:37 +02:00
import requests
import json
import m3u8
from urllib3.exceptions import HTTPError
2021-09-08 16:17:37 +02:00
from typing import List
from .auth import login
2021-09-16 18:55:43 +02:00
from .consts import API, PLATFORM_ID, security_values
2021-09-08 16:17:37 +02:00
from .helpers import now_timestamp
from .types import Program
from .exceptions import RecordingException
2021-09-08 16:17:37 +02:00
class MatteBOX:
def __init__(
self, username: str, password: str, device_id: str, subscription_code: str
) -> None:
self.subscription_code = subscription_code
self.cookies = {
"access_token": login(username, password),
"deviceId": device_id,
}
def __get(self, endpoint: str, params: dict) -> dict:
2021-09-16 18:55:43 +02:00
res = requests.get(
API + endpoint, params=params, cookies=self.cookies, headers={
"X-NanguTv-Platform-Id": PLATFORM_ID,
"X-NanguTv-Device-size": "normal",
"X-NanguTv-Device-Name": security_values.device_name,
"X-NanguTv-App-Version": security_values.app_version,
"X-NanguTv-Device-density": security_values.device_density,
"User-Agent": security_values.user_agent,
}
)
2021-09-08 16:17:37 +02:00
if res.status_code >= 400:
raise requests.exceptions.BaseHTTPError(res.text)
return json.loads(res.text)
@property
def channels(self) -> list:
res = self.__get(
"/sws/subscription/settings/subscription-channels.json",
params={"deviceType": "STB"},
)
return list(res.keys())
def get_programs(self, channel: str) -> List[Program]:
ts_now = now_timestamp()
res = self.__get(
"/sws/server/tv/channel-programs.json",
params={
"channelKey": channel,
"fromTimestamp": ts_now - 172800000, # 48h ago
"toTimestamp": ts_now,
"language": "eng",
},
)
programs = [Program.from_channel_program(r) for r in res[::-1]]
return programs
@property
def recordings(self) -> List[Program]:
res = self.__get(
"/sws/subscription/vod/pvr-programs.json",
params={"entityCount": 0, "firstEntityOffset": 0, "language": "eng"},
)
recordings = [Program.from_recording(r) for r in res["entities"]]
return recordings
def add_recording(self, program: Program) -> None:
try:
self.__get(
"/sws/subscription/vod/pvr-add-program.json",
2021-09-16 18:17:09 +02:00
params={"epgId": program.epg_id},
)
except HTTPError as e:
raise RecordingException(e) from None
def remove_recording(self, program: Program) -> None:
try:
self.__get(
"/sws/subscription/vod/pvr-remove-program.json",
params={"pvrProgramId": program.content_id},
)
except HTTPError as e:
raise RecordingException(e) from None
def get_stream(self, program: Program, end: bool = True) -> str:
2021-09-08 16:17:37 +02:00
service_type = "TIMESHIFT_TV" if program.type == "program" else "NPVR"
if end:
ts_end = program.ts_stop
else:
ts_end = now_timestamp() - 60000 * 3 # 3 minutes ago
2021-09-08 16:17:37 +02:00
params = {
"channelKey": program.channel,
"contentId": program.content_id,
"serviceType": service_type,
"subscriptionCode": self.subscription_code,
"deviceType": "STB",
"fromTimestamp": program.ts_start,
"toTimestamp": ts_end,
2021-09-08 16:17:37 +02:00
}
res = self.__get("/sws/server/streaming/uris.json", params=params)
main_playlist_uri = res["uris"][0]["uri"]
variants = m3u8.load(main_playlist_uri)
playlist = variants.playlists[0]
return playlist.uri