# Source code for controllers.utilities

from re import compile as recompile, sub
try:
	from Components.Converter.genre import getGenreStringLong
except ImportError:
	# Fallback genre: used when Components.Converter.genre cannot be imported.
	def getGenreStringLong(hn, ln):
		"""Return an empty genre string regardless of *hn* and *ln*."""
		return ""

try:  # this is only for the testsuite
	from .defaults import DEBUG_ENABLED
except Exception:
	DEBUG_ENABLED = False

# One or more consecutive slashes (the backslash merely escapes '/').
MANY_SLASHES_PATTERN = r'[\/]+'
MANY_SLASHES_REGEX = recompile(MANY_SLASHES_PATTERN)

# A whole path portion of the form ``name[123]`` (group "index") or
# ``name['key']`` / ``name["key"]`` (group "key"); group "attr_name" captures
# the identifier in front of the bracket. The opening and closing quote
# characters are matched independently of each other, and a key may only
# consist of whitespace and word characters.
PATTERN_ITEM_OR_KEY_ACCESS = r'^(?P<attr_name>[a-zA-Z][\w\d]*)' \
							r'\[((?P<index>\d+)|' \
							r'[\'\"](?P<key>[\s\w\d]+)[\'\"])\]$'
REGEX_ITEM_OR_KEY_ACCESS = recompile(PATTERN_ITEM_OR_KEY_ACCESS)

# Adapted from enigma2_http_api ...
# Format description:
# https://wiki.neutrino-hd.de/wiki/Enigma:Services:Formatbeschreibung
# Decimal value: 1=TV, 2=Radio, 4=NVOD, anything else=data

# Known service types (decimal):
#   1 = digital television service
#   2 = digital radio sound service
#   4 = nvod reference service (NYI)
#  10 = advanced codec digital radio sound service
#  17 = MPEG-2 HD digital television service
#  22 = advanced codec SD digital television
#  24 = advanced codec SD NVOD reference service (NYI)
#  25 = advanced codec HD digital television
#  27 = advanced codec HD NVOD reference service (NYI)

SERVICE_TYPE_TV = 0x01  # 1
SERVICE_TYPE_RADIO = 0x02  # 2
SERVICE_TYPE_SD4 = 0x16  # 22
SERVICE_TYPE_HDTV = 0x19  # 25
SERVICE_TYPE_UHD = 0x1f  # 31
SERVICE_TYPE_OPT = 0xd3  # 211
SERVICE_TYPE_RADIOA = 0x0a  # 10

#: Service type value -> label map
SERVICE_TYPE = {
	SERVICE_TYPE_TV: 'TV',
	SERVICE_TYPE_HDTV: 'HDTV',
	SERVICE_TYPE_RADIO: 'RADIO',
	SERVICE_TYPE_RADIOA: 'RADIO',
	SERVICE_TYPE_UHD: 'UHD',
	SERVICE_TYPE_SD4: 'SD4',
	SERVICE_TYPE_OPT: 'OPT',
}

#: Independent copy of SERVICE_TYPE (same value -> label direction)
SERVICE_TYPE_LOOKUP = dict(SERVICE_TYPE)

#: Namespace - DVB-C services
NS_DVB_C = 0xffff0000

#: Namespace - DVB-S services (currently disabled)
# NS_DVB_S = 0x00c00000

#: Namespace - DVB-T services
NS_DVB_T = 0xeeee0000

#: Label:Namespace map
NS = {
	'DVB-C': NS_DVB_C,
	# 'DVB-S': NS_DVB_S,
	'DVB-T': NS_DVB_T,
}

#: Namespace:Label lookup map (inverse of NS)
NS_LOOKUP = {namespace: label for label, namespace in NS.items()}


def lenient_decode(value, encoding=None):
	"""
	Decode an encoded byte string and return it as a (unicode) ``str``.

	``str`` input is returned unchanged. Any other input is decoded through
	its ``decode`` method using the ``'ignore'`` error handler, so byte
	sequences that are invalid in *encoding* are silently dropped.

	Args:
		value: input value (``str`` or an object offering ``decode``)
		encoding: string encoding, defaults to utf-8
	Returns:
		(str) decoded value

	>>> lenient_decode("Hallo")
	'Hallo'
	>>> lenient_decode(b"Hallo")
	'Hallo'
	>>> lenient_decode("HällöÜ".encode("utf_8"))
	'HällöÜ'
	"""
	if isinstance(value, str):
		return value
	if encoding is None:
		encoding = 'utf_8'
	return value.decode(encoding, 'ignore')
def lenient_force_utf_8(value):
	"""
	Return *value* as text or as utf-8 encoded bytes.

	``str`` input is returned unchanged. Any other input is passed through
	:func:`lenient_decode` and the result is encoded as utf-8, which yields
	``bytes``.

	Args:
		value: input value
	Returns:
		(str or bytes) the unchanged ``str``, or utf-8 encoded ``bytes``

	>>> isinstance(lenient_force_utf_8(''), str)
	True
	>>> lenient_force_utf_8("Hallo")
	'Hallo'
	>>> lenient_force_utf_8(b"Hallo")
	b'Hallo'
	>>> lenient_force_utf_8("HällöÜ".encode("utf_8")) == "HällöÜ".encode("utf_8")
	True
	"""
	if isinstance(value, str):
		return value
	return lenient_decode(value).encode('utf_8')
def sanitise_filename_slashes(value):
	"""
	Collapse every run of consecutive slashes in *value* into a single one.

	Args:
		value: input value
	Returns:
		value w/o multiple slashes

	>>> in_value = "///tmp/x/y/z"
	>>> expected = sub("^/+", "/", "///tmp/x/y/z")
	>>> sanitise_filename_slashes(in_value) == expected
	True
	>>> sanitise_filename_slashes("/a//b///c")
	'/a/b/c'
	"""
	# The pattern is compiled once at module level; use its own method.
	return MANY_SLASHES_REGEX.sub('/', value)
def get_config_attribute(path, root_obj, head=None):
	"""
	Determine attribute of *root_obj* to be accessed by *path* in a
	(somewhat) safe manner.
	This implementation will allow key and index based accessing too
	(e.g. ``config.some_list[0]`` or ``config.some_dict['some_key']``)
	The *path* value needs to start with *head* (default='config').

	Args:
		path: character string specifying which attribute is to be accessed
		root_obj: An object whose attributes are to be accessed.
		head: Value of the first portion of *path*

	Returns:
		Attribute of *root_obj*

	Raises:
		ValueError: If *path* is invalid (too short, wrong head, an empty
			portion, or a portion starting with an underscore).
		AttributeError: If attribute cannot be accessed
		IndexError, KeyError, TypeError: Propagated unchanged from the
			``obj[index]`` / ``obj[key]`` access of a bracketed portion.
	"""
	if head is None:
		head = 'config'

	portions = path.split('.')
	if len(portions) < 2:
		raise ValueError('Invalid path length')
	if portions[0] != head:
		raise ValueError(
			f'Head is {portions[0]!r}, expected {head!r}')

	current_obj = root_obj
	for portion in portions[1:]:
		if not portion:
			raise ValueError("empty attr_name")
		# Never hand out private or dunder members.
		if portion.startswith('_'):
			raise ValueError('private member')

		matcher = REGEX_ITEM_OR_KEY_ACCESS.match(portion)
		if matcher is None:
			# Plain attribute access.
			current_obj = getattr(current_obj, portion)
			continue

		# ``name[index]`` or ``name['key']``: resolve the attribute first,
		# then subscript it.
		container = getattr(current_obj, matcher.group('attr_name'))
		index = matcher.group('index')
		if index is not None:
			current_obj = container[int(index)]
		else:
			current_obj = container[matcher.group('key')]

	return current_obj
def parse_servicereference(serviceref):
	"""
	Parse a Enigma2 style service reference string representation.

	The reference is split on ``:`` and the fields at positions 2 to 6 are
	read as hexadecimal integers. Anything after position 6 (e.g. a path
	or a service name) is ignored.

	:param serviceref: Enigma2 style service reference
	:type serviceref: string
	:return: mapping with the integer values ``service_type``, ``sid``,
		``tsid``, ``oid`` and ``ns``
	:rtype: dict
	:raises IndexError: if *serviceref* has fewer than seven fields
	:raises ValueError: if one of the parsed fields is not hexadecimal

	>>> sref = '1:0:1:300:7:85:00c00000:0:0:0:'
	>>> result = parse_servicereference(sref)
	>>> result == {'service_type': 1, 'sid': 768, 'tsid': 7, 'oid': 133, 'ns': 12582912}
	True
	>>> sref_g = create_servicereference(**result)
	>>> sref_g
	'1:0:1:300:7:85:00c00000:0:0:0:'
	>>> sref_g2 = create_servicereference(result)
	>>> sref_g2
	'1:0:1:300:7:85:00c00000:0:0:0:'
	>>> sref == sref_g
	True
	>>> sref2 = '1:64:A:0:0:0:0:0:0:0::SKY Sport'
	>>> result2 = parse_servicereference(sref2)
	>>> result2 == {'service_type': 10, 'sid': 0, 'tsid': 0, 'oid': 0, 'ns': 0}
	True
	>>> sref3 = '1:0:0:0:0:0:0:0:0:0:/media/hdd/movie/20170921 2055 - DASDING - DASDING Sprechstunde - .ts'
	>>> result3 = parse_servicereference(sref3)
	>>> result3 == {'service_type': 0, 'sid': 0, 'tsid': 0, 'oid': 0, 'ns': 0}
	True
	"""
	parts = serviceref.split(":")
	field_names = ('service_type', 'sid', 'tsid', 'oid', 'ns')
	# The five values live at positions 2..6 of the reference, in this order.
	return {
		name: int(parts[position], 16)
		for position, name in enumerate(field_names, start=2)
	}
def create_servicereference(*args, **kwargs):
	"""
	Generate a (Enigma2 style) service reference string representation.

	The values may be given either as keyword arguments or as one
	positional :class:`dict`. When a single dict is passed positionally it
	replaces the keyword arguments entirely. Missing values default to 0.

	:param args[0]: Service Reference Parameter as dict
	:type args[0]: :class:`dict`
	:param service_type: Service Type
	:type service_type: int
	:param sid: SID
	:type sid: int
	:param tsid: TSID
	:type tsid: int
	:param oid: OID
	:type oid: int
	:param ns: Enigma2 Namespace
	:type ns: int
	:return: service reference with all values rendered as lowercase hex
		(the namespace zero padded to eight digits)
	:rtype: string

	>>> create_servicereference(service_type=1, sid=0x300, tsid=7, oid=0x85, ns=0xc00000)
	'1:0:1:300:7:85:00c00000:0:0:0:'
	>>> create_servicereference({'service_type': 10})
	'1:0:a:0:0:0:00000000:0:0:0:'
	"""
	if len(args) == 1 and isinstance(args[0], dict):
		kwargs = args[0]

	service_type = kwargs.get('service_type', 0)
	sid = kwargs.get('sid', 0)
	tsid = kwargs.get('tsid', 0)
	oid = kwargs.get('oid', 0)
	ns = kwargs.get('ns', 0)

	return f'1:0:{service_type:x}:{sid:x}:{tsid:x}:{oid:x}:{ns:08x}:0:0:0:'
def toBinary(s):
	"""Return *s* as ``bytes``.

	``bytes`` input is returned unchanged; anything else is encoded through
	its ``encode`` method as strict UTF-8.
	"""
	if isinstance(s, bytes):
		return s
	return s.encode(encoding='utf-8', errors='strict')


def toString(s):
	"""Return *s* as text.

	``bytes`` are decoded as strict UTF-8; when that fails they are decoded
	as strict cp1252 instead (a ``UnicodeDecodeError`` from that second
	attempt propagates). Any non-``bytes`` input is returned unchanged.
	"""
	if not isinstance(s, bytes):
		return s
	try:
		return s.decode(encoding='utf-8', errors='strict')
	except UnicodeDecodeError:
		return s.decode(encoding='cp1252', errors='strict')


def getUrlArg(request, key, default=None):
	"""Return the first value stored for *key* in ``request.args`` as text.

	Same lookup as :func:`getUrlArg2`, applied to ``request.args``.
	"""
	return getUrlArg2(request.args, key, default)


def getUrlArg2(args, key, default=None):
	"""Return the first value stored for *key* in the mapping *args* as text.

	*key* is converted with :func:`toBinary` before the lookup, and the
	first element of the stored sequence is converted with
	:func:`toString`. *default* is returned when the key is absent.
	"""
	k = toBinary(key)
	# Direct mapping membership; no need to materialise a list of keys.
	if k in args:
		return toString(args[k][0])
	return default


def removeBad(val):
	"""Strip the characters ``\\x86`` and ``\\x87`` from *val*.

	``None`` is passed through unchanged.
	"""
	if val is None:
		return val
	return val.replace('\x86', '').replace('\x87', '')


def removeBad2(val):
	"""Like :func:`removeBad`, and turn ``\\xc2\\x8a`` into a newline.

	``None`` is passed through unchanged.
	"""
	if val is None:
		return val
	# NOTE(review): '\xc2\x8a' is the two character sequence U+00C2 U+008A,
	# which looks like the UTF-8 bytes of U+008A kept from a byte-string
	# era - confirm this still matches what callers pass in.
	return val.replace('\x86', '').replace('\x87', '').replace('\xc2\x8a', '\n')


def getEventInfoProvider(moviedb):
	"""Return the description of the external event info provider *moviedb*.

	The lookup is case-insensitive (``moviedb.lower()``). The returned dict
	carries the keys ``id``, ``name`` and ``url`` and is freshly built on
	every call. ``None`` is returned for an unknown provider.
	"""
	exteventinfoproviders = {
		'kinopoisk': {
			'id': 'kinopoisk',
			'name': 'KinoPoisk',  # КиноПоиск
			'url': 'https://www.kinopoisk.ru/index.php?kp_query='
		},
		'csfd': {
			'name': 'CSfd',  # Česko-Slovenská filmová databáze
			'url': 'https://www.csfd.cz/hledat/?q='
		},
		'tvguideuk': {
			'name': 'TV Guide UK',
			'url': 'https://www.tvguide.co.uk/search.asp?title='
		},
		'imdb': {
			'name': 'IMDb',
			'url': 'https://www.imdb.com/find?s=tt&q='
		}
	}

	providerId = moviedb.lower()
	providerData = exteventinfoproviders.get(providerId)
	if providerData is not None:
		# Not every entry above spells out its own id.
		providerData['id'] = providerId
	return providerData


def error(text, context=""):
	"""Print *text* as an OpenWebif error message, optionally tagged with *context*."""
	if context:
		print(f"[OpenWebif] [{context}] Error: {text}")
	else:
		print(f"[OpenWebif] Error: {text}")


def debug(text, context=""):
	"""Print *text* as an OpenWebif debug message, optionally tagged with *context*.

	Nothing is printed unless the module-level ``DEBUG_ENABLED`` is truthy.
	"""
	if not DEBUG_ENABLED:
		return
	if context:
		print(f"[OpenWebif] [{context}] {text}")
	else:
		print(f"[OpenWebif] {text}")


def e2simplexmlresult(result, resulttext):
	"""Build the ``e2simplexmlresult`` XML document as ``bytes``.

	``e2state`` becomes ``true`` or ``false`` depending on the truthiness
	of *result*; *resulttext* (encoded with ``str.encode()`` unless it
	already is ``bytes``) becomes ``e2statetext``.
	"""
	if not isinstance(resulttext, bytes):
		resulttext = resulttext.encode()
	state = b"true" if result else b"false"
	# NOTE(review): resulttext is inserted verbatim, without XML escaping -
	# confirm callers never pass text containing '<' or '&'.
	return b'<?xml version="1.0" encoding="UTF-8" ?><e2simplexmlresult><e2state>%s</e2state><e2statetext>%s</e2statetext></e2simplexmlresult>' % (state, resulttext)


if __name__ == '__main__':
	import doctest

	# testmod() returns (failed, attempted); derive the success count from it.
	(FAILED, ATTEMPTED) = doctest.testmod()
	SUCCEEDED = ATTEMPTED - FAILED
	print(f"[doctest] SUCCEEDED/FAILED: {SUCCEEDED:d}/{FAILED:d}")