From daa84bab599d4fb038aec741c6b9e4fd6f5705dd Mon Sep 17 00:00:00 2001
From: Pawan Paudel
Date: Tue, 20 Oct 2020 22:57:28 +0545
Subject: [PATCH] added docstrings

---
 build/lib/m3u_parser/__init__.py |   1 -
 m3u_parser/__init__.py           |   2 +-
 m3u_parser/helper.py             |  30 ++-
 m3u_parser/m3u_parser.py         | 318 ++++++++++++++++++++++---
 4 files changed, 256 insertions(+), 95 deletions(-)
 delete mode 100644 build/lib/m3u_parser/__init__.py

diff --git a/build/lib/m3u_parser/__init__.py b/build/lib/m3u_parser/__init__.py
deleted file mode 100644
index b193d5f..0000000
--- a/build/lib/m3u_parser/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from .m3u_parser import M3uParser
\ No newline at end of file
diff --git a/m3u_parser/__init__.py b/m3u_parser/__init__.py
index b193d5f..1eef023 100644
--- a/m3u_parser/__init__.py
+++ b/m3u_parser/__init__.py
@@ -1 +1 @@
-from .m3u_parser import M3uParser
\ No newline at end of file
+from .m3u_parser import M3uParser
diff --git a/m3u_parser/helper.py b/m3u_parser/helper.py
index e717a54..a074041 100644
--- a/m3u_parser/helper.py
+++ b/m3u_parser/helper.py
@@ -1,5 +1,8 @@
+import asyncio
+import csv
 import re
 
+
 # check if the regex is present or not
 def is_present(regex, content):
     match = re.search(re.compile(regex, flags=re.IGNORECASE), content)
@@ -48,13 +51,34 @@ def render_csv(header, data, out_path='output.csv'):
     for i in data:
         input.append(dict(i))
     dict_writer.writerows(input)
-    return
 
-# convert nested dictionary to csv
+
 def ndict_to_csv(obj, output_path):
+    """Convert nested dictionary to csv.
+
+    :param obj: Stream information list
+    :type obj: list
+    :param output_path: Path to save the csv file.
+    :return: None
+    """
     tree = get_tree(obj)
     if isinstance(obj, list):
         header = [i[0] for i in tree[0]]
     else:
         header = [i[0] for i in tree]
-    return render_csv(header, tree, output_path)
\ No newline at end of file
+    render_csv(header, tree, output_path)
+
+
+def run_until_completed(coros):
+    futures = [asyncio.ensure_future(c) for c in coros]
+
+    async def first_to_finish():
+        while True:
+            await asyncio.sleep(0)
+            for f in futures:
+                if f.done():
+                    futures.remove(f)
+                    return f.result()
+
+    while len(futures) > 0:
+        yield first_to_finish()
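
Note: the run_until_completed helper added above is a small scheduler that yields awaitables in completion order instead of waiting for the whole batch, which is what lets the parser consume results as each stream check finishes. A minimal standalone sketch of the behaviour, assuming helper.py is importable as in the patch; the check coroutine and its delays are illustrative only, not part of the library:

    import asyncio

    from helper import run_until_completed

    async def check(n):
        await asyncio.sleep(n * 0.1)  # simulate network I/O of varying duration
        return n

    async def main():
        # results arrive in completion order (1, 2, 3), not submission order
        for first_done in run_until_completed(check(n) for n in (3, 1, 2)):
            print(await first_done)

    asyncio.get_event_loop().run_until_complete(main())
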
diff --git a/m3u_parser/m3u_parser.py b/m3u_parser/m3u_parser.py
index b6cde0e..c954992 100644
--- a/m3u_parser/m3u_parser.py
+++ b/m3u_parser/m3u_parser.py
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 
 import asyncio
-import csv
 import json
 import logging
 import random
@@ -10,74 +9,123 @@
 import aiohttp
 import pycountry
 import requests
-from urllib.parse import urlparse
+import traceback
+import time
+from urllib.parse import urlparse, unquote
 
-from helper import is_present, ndict_to_csv
+from helper import is_present, ndict_to_csv, run_until_completed
 
 logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(levelname)s: %(message)s")
 
 
 class M3uParser:
-    def __init__(self, useragent, timeout=5):
-        self.streams_info = []
-        self.lines = []
-        self.timeout = timeout
-        self.headers = {
-            'User-Agent': useragent
+    """A parser for m3u files.
+
+    It parses the contents of an m3u file into a list of stream information, which can be saved as a JSON/CSV file.
+
+    :Example:
+
+    >>> url = "/home/pawan/Downloads/ru.m3u"
+    >>> useragent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36"
+    >>> m3u_playlist = M3uParser(timeout=5, useragent=useragent)
+    >>> m3u_playlist.parse_m3u(url)
+    INFO: Started parsing m3u file...
+    >>> m3u_playlist.remove_by_extension('mp4')
+    >>> m3u_playlist.filter_by('status', 'GOOD')
+    >>> print(len(m3u_playlist.get_list()))
+    4
+    >>> m3u_playlist.to_file('pawan.json')
+    INFO: Saving to file...
+    """
+
+    def __init__(self, useragent=None, timeout=5):
+        self.__streams_info = []
+        self.__streams_info_backup = []
+        self.__lines = []
+        self.__timeout = timeout
+        self.__loop = None
+        self.__headers = {
+            'User-Agent': useragent if useragent else "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, "
+                                                      "like Gecko) Chrome/84.0.4147.89 Safari/537.36 "
         }
-        self.check_live = False
-        self.content = ""
-        self.url_regex = re.compile(r"^(?:(?:https?|ftp)://)?(?:(?!(?:10|127)(?:\.\d{1,3}){3})(?!("
-                                    r"?:169\.254|192\.168)(?:\.\d{1,3}){2})(?!172\.(?:1[6-9]|2\d|3[0-1])(?:\.\d{1,"
-                                    r"3}){2})(?:[1-9]\d?|1\d\d|2[01]\d|22[0-3])(?:\.(?:1?\d{1,2}|2[0-4]\d|25[0-5])){"
-                                    r"2}(?:\.(?:[1-9]\d?|1\d\d|2[0-4]\d|25[0-4]))|(?:(?:[a-z\u00a1-\uffff0-9]-*)*["
-                                    r"a-z\u00a1-\uffff0-9]+)(?:\.(?:[a-z\u00a1-\uffff0-9]-*)*["
-                                    r"a-z\u00a1-\uffff0-9]+)*(?:\.(?:[a-z\u00a1-\uffff]{2,})))(?::\d{2,5})?(?:/\S*)?$")
-
-    # Download the file from the given url or use the local file path to get the content
-    def parse_m3u(self, url, check_live=False):
-        self.check_live = check_live
-        if urlparse(url).scheme != '' or re.search(self.url_regex, url):
+        self.__check_live = False
+        self.__content = ""
+        self.__url_regex = re.compile(r"^(?:(?:https?|ftp)://)?(?:(?!(?:10|127)(?:\.\d{1,3}){3})(?!("
+                                      r"?:169\.254|192\.168)(?:\.\d{1,3}){2})(?!172\.(?:1[6-9]|2\d|3[0-1])(?:\.\d{1,"
+                                      r"3}){2})(?:[1-9]\d?|1\d\d|2[01]\d|22[0-3])(?:\.(?:1?\d{1,2}|2[0-4]\d|25[0-5])){"
+                                      r"2}(?:\.(?:[1-9]\d?|1\d\d|2[0-4]\d|25[0-4]))|(?:(?:[a-z\u00a1-\uffff0-9]-*)*["
+                                      r"a-z\u00a1-\uffff0-9]+)(?:\.(?:[a-z\u00a1-\uffff0-9]-*)*["
+                                      r"a-z\u00a1-\uffff0-9]+)*(?:\.(?:[a-z\u00a1-\uffff]{2,})))(?::\d{2,5})?(?:/\S*)?$"
+                                      )
+
+    def parse_m3u(self, path, check_live=True):
+        """Parses the content of a local file or URL.
+
+        It downloads the file from the given URL, or uses the local file path to get the content, and parses it line
+        by line into a structured list of stream information.
+
+        :param path: Path can be a URL or local filepath
+        :type path: str
+        :param check_live: To check if the stream links are working or not
+        :type check_live: bool
+        :rtype: None
+
+        """
+        self.__check_live = check_live
+        if urlparse(path).scheme != '' or re.search(self.__url_regex, path):
+            logging.info("Started parsing m3u link...")
             try:
-                self.content = requests.get(url).text
+                self.__content = requests.get(path).text
             except:
                 logging.info("Cannot read anything from the url!!!")
                 exit()
         else:
+            logging.info("Started parsing m3u file...")
             try:
-            with open(url, errors='ignore') as fp:
-                self.content = fp.read()
+            with open(unquote(path), errors='ignore') as fp:
+                self.__content = fp.read()
             except FileNotFoundError:
                 logging.info("File doesn't exist!!!")
                 exit()
+
         # splitting contents into lines to parse them
-        self.lines = [line.strip('\n\r') for line in self.content.split("\n") if line.strip('\n\r') != '']
-        if len(self.lines) > 0:
+        self.__lines = [line.strip('\n\r') for line in self.__content.split("\n") if line.strip('\n\r') != '']
+        if len(self.__lines) > 0:
             self.__parse_lines()
         else:
             logging.info("No content to parse!!!")
 
-    # parse each lines and extract the streams information
+    @staticmethod
+    async def __run_until_completed(tasks):
+        for res in run_until_completed(tasks):
+            _ = await res
+
     def __parse_lines(self):
-        num_lines = len(self.lines)
+        num_lines = len(self.__lines)
+        self.__loop = asyncio.get_event_loop()
         try:
-            loop = asyncio.get_event_loop()
-            f = asyncio.wait(
-                [self.__parse_line(line_num) for line_num in range(num_lines) if "#EXTINF" in self.lines[line_num]], return_when=asyncio.ALL_COMPLETED)
-            loop.run_until_complete(f)
-        except:
-            pass
-        finally:
-            loop.close()
+            coros = (self.__parse_line(line_num) for line_num in range(num_lines) if
+                     "#EXTINF" in self.__lines[line_num])
+            self.__loop.run_until_complete(self.__run_until_completed(coros))
+        except BaseException as error:
+            print(str(error), str(traceback.format_exc()))
+        else:
+            self.__streams_info_backup = self.__streams_info.copy()
+        self.__loop.run_until_complete(asyncio.sleep(0))
+        while self.__loop.is_running():
+            time.sleep(0.3)
+            if not self.__loop.is_running():
+                self.__loop.close()
+                break
 
     async def __parse_line(self, line_num):
-        line_info = self.lines[line_num]
+        line_info = self.__lines[line_num]
         stream_link = ''
         streams_link = []
         try:
             for i in [1, 2]:
-                if self.lines[line_num + i] and re.search(self.url_regex, self.lines[line_num + i]):
-                    streams_link.append(self.lines[line_num + i])
+                if self.__lines[line_num + i] and re.search(self.__url_regex, self.__lines[line_num + i]):
+                    streams_link.append(self.__lines[line_num + i])
                     break
             stream_link = streams_link[0]
         except IndexError:
@@ -87,7 +135,7 @@ async def __parse_line(self, line_num):
             tvg_name = is_present(r"tvg-name=\"(.*?)\"", line_info)
             tvg_id = is_present(r"tvg-id=\"(.*?)\"", line_info)
             logo = is_present(r"tvg-logo=\"(.*?)\"", line_info)
-            group = is_present(r"group-title=\"(.*?)\"", line_info)
+            category = is_present(r"group-title=\"(.*?)\"", line_info)
             title = is_present("[,](?!.*[,])(.*?)$", line_info)
             country = is_present(r"tvg-country=\"(.*?)\"", line_info)
             language = is_present(r"tvg-language=\"(.*?)\"", line_info)
@@ -97,12 +145,12 @@ async def __parse_line(self, line_num):
             country_name = country_obj.name if country_obj else ''
             language_code = language_obj.alpha_3 if language_obj else ''
 
-            timeout = aiohttp.ClientTimeout(total=self.timeout)
+            timeout = aiohttp.ClientTimeout(total=self.__timeout)
             status = 'BAD'
-            if self.check_live:
+            if self.__check_live:
                 try:
                     async with aiohttp.ClientSession() as session:
-                        async with session.request('get', stream_link, headers=self.headers,
+                        async with session.request('get', stream_link, headers=self.__headers,
                                                    timeout=timeout) as response:
                             if response.status == 200:
                                 status = 'GOOD'
@@ -112,7 +160,7 @@ async def __parse_line(self, line_num):
                 "name": title,
                 "logo": logo,
                 "url": stream_link,
-                "category": group,
+                "category": category,
                 "language": {
                     "code": language_code,
                     "name": language,
@@ -127,14 +175,28 @@ async def __parse_line(self, line_num):
                     "url": tvg_url,
                 }
             }
-            if self.check_live:
+            if self.__check_live:
                 temp['status'] = status
-            self.streams_info.append(temp)
+            self.__streams_info.append(temp)
         except AttributeError:
             pass
 
     def filter_by(self, key, filters, retrieve=True, nested_key=False):
-        key_0, key_1 = ['']*2
+        """Filter streams_info.
+
+        It retrieves/removes stream information from the streams_info list by applying filter/s on the given key.
+
+        :param key: Key can be single or nested. eg. key='name', key='language-name'
+        :type key: str
+        :param filters: List of filter/s to perform the retrieve or remove operation.
+        :type filters: str or list
+        :param retrieve: True to retrieve and False to remove based on the key.
+        :type retrieve: bool
+        :param nested_key: True if the key is nested, False otherwise.
+        :type nested_key: bool
+        :rtype: None
+        """
+        key_0, key_1 = [''] * 2
         if nested_key:
             key_0, key_1 = key.split('-')
         if not filters:
@@ -143,59 +205,138 @@ def filter_by(self, key, filters, retrieve=True, nested_key=False):
         if not isinstance(filters, list):
             filters = [filters]
         if retrieve:
-            self.streams_info = list(filter(
+            self.__streams_info = list(filter(
                 lambda file: any(
-                    [re.search(re.compile(fltr, flags=re.IGNORECASE), file[key_0][key_1] if nested_key else file[key])
-                     for fltr in filters]),
-                self.streams_info))
+                    [re.search(re.compile(fltr, flags=re.IGNORECASE),
+                               file[key_0][key_1] if nested_key else file[key]) for fltr in filters]
+                ), self.__streams_info))
         else:
-            self.streams_info = list(filter(
-                lambda file: any([not re.search(re.compile(fltr, flags=re.IGNORECASE),
-                                                file[key_0][key_1] if nested_key else file[key]) for fltr in filters]),
-                self.streams_info))
+            self.__streams_info = list(filter(
+                lambda file: any(
+                    [not re.search(re.compile(fltr, flags=re.IGNORECASE),
+                                   file[key_0][key_1] if nested_key else file[key]) for fltr in filters]
+                ), self.__streams_info))
+
+    def reset_operations(self):
+        """Reset streams_info to its initial state, before any filter/sort operations were applied.
+
+        :rtype: None
+        """
+        self.__streams_info = self.__streams_info_backup.copy()
 
-    # Remove streams_info with a certain file extension
     def remove_by_extension(self, extension):
-        self.filter_by('tvg-url', extension, retrieve=False, nested_key=True)
+        """Remove stream information with a certain extension/s.
+
+        It removes stream information from the streams_info list based on the extension/s provided.
+
+        :param extension: Name of the extension like mp4, m3u8 etc. It can be a string or list of extension/s.
+        :type extension: str or list
+        :rtype: None
+        """
+        self.filter_by('url', extension, retrieve=False, nested_key=False)
 
-    # Select only streams_info with a certain file extension
     def retrieve_by_extension(self, extension):
-        self.filter_by('tvg-url', extension, retrieve=True, nested_key=True)
+        """Select only stream information with a certain extension/s.
+
+        It retrieves the stream information based on the extension/s provided.
+
+        :param extension: Name of the extension like mp4, m3u8 etc. It can be a string or list of extension/s.
+        :type extension: str or list
+        :rtype: None
+        """
+        self.filter_by('url', extension, retrieve=True, nested_key=False)
+
+    def remove_by_category(self, filter_word):
+        """Remove stream information whose category contains a certain filter word/s.
 
-    # Remove streams_info that contains a certain filter word
-    def remove_by_grpname(self, filter_word):
+        It removes stream information based on category using the filter word/s.
+
+        :param filter_word: It can be a string or list of filter word/s.
+        :type filter_word: str or list
+        :rtype: None
+        """
         self.filter_by('category', filter_word, retrieve=False)
 
-    # Retrieve only streams_info that contains a certain filter word
-    def retrieve_by_grpname(self, filter_word):
+    def retrieve_by_category(self, filter_word):
+        """Retrieve only stream information whose category contains a certain filter word/s.
+
+        It retrieves stream information based on category/categories.
+
+        :param filter_word: It can be a string or list of filter word/s.
+        :type filter_word: str or list
+        :rtype: None
+        """
         self.filter_by('category', filter_word, retrieve=True)
 
-    # sort the streams_info
     def sort_by(self, key, asc=True, nested_key=False):
-        key_0, key_1 = ['']*2
+        """Sort stream information.
+
+        It sorts the streams information list by the given key in ascending/descending order.
+
+        :param key: It can be a single or nested key.
+        :type key: str
+        :param asc: Sort in ascending (True) or descending (False) order.
+        :type asc: bool
+        :param nested_key: True if the key is nested, False otherwise.
+        :type nested_key: bool
+        :rtype: None
+        """
+        key_0, key_1 = [''] * 2
         if nested_key:
             key_0, key_1 = key.split('-')
-        self.streams_info = sorted(self.streams_info, key=lambda file: file[key_0][key_1] if nested_key else file[key],
-                                   reverse=not asc)
+        self.__streams_info = sorted(
+            self.__streams_info, key=lambda file: file[key_0][key_1] if nested_key else file[key], reverse=not asc
+        )
+
+    def get_json(self, indent=4):
+        """Get streams information as a JSON string.
+
+        :param indent: Int value for indentation.
+        :type indent: int
+        :return: JSON string of the streams_info list
+        :rtype: str
+        """
+        return json.dumps(self.__streams_info, indent=indent)
+
+    def get_list(self):
+        """Get the parsed streams information list.
 
-    # Get the streams info as json
-    def get_json(self):
-        return json.dumps(self.streams_info, indent=4)
+        It returns the streams information list.
 
-    # Get the streams info as dict
-    def get_dict(self):
-        return self.streams_info
+        :return: Streams information list
+        :rtype: list
+        """
+        return self.__streams_info
 
-    # Return a random stream information
     def get_random_stream(self, random_shuffle=True):
-        if not len(self.streams_info):
+        """Return a random stream information.
+
+        It returns a random stream information, optionally shuffling the list first.
+
+        :param random_shuffle: To shuffle the streams information list before picking the random stream information.
+        :type random_shuffle: bool
+        :return: A random stream info
+        :rtype: dict
+        """
+        if not len(self.__streams_info):
             logging.info("No streams information so could not get any random stream.")
             return None
-        if random_shuffle: random.shuffle(self.streams_info)
-        return random.choice(self.streams_info)
+        if random_shuffle:
+            random.shuffle(self.__streams_info)
+        return random.choice(self.__streams_info)
 
-    # save to file (CSV or JSON)
     def to_file(self, filename, format='json'):
+        """Save streams information to a file (CSV or JSON).
+
+        It saves the streams information as a CSV or JSON file with the given filename and format.
+
+        :param filename: Name of the file to save streams_info as.
+        :type filename: str
+        :param format: csv/json to save the streams_info.
+        :type format: str
+        :rtype: None
+        """
+        logging.info("Saving to file...")
         format = filename.split('.')[-1] if len(filename.split('.')) > 1 else format
 
         def with_extension(name, ext):
@@ -206,25 +347,22 @@ def with_extension(name, ext):
                 return name + f".{ext}"
 
         if format == 'json':
-            data = json.dumps(self.streams_info, indent=4)
+            data = json.dumps(self.__streams_info, indent=4)
             with open(with_extension(filename, format), 'w') as fp:
                 fp.write(data)
         elif format == 'csv':
-            ndict_to_csv(self.streams_info, with_extension(filename, format))
+            ndict_to_csv(self.__streams_info, with_extension(filename, format))
         else:
             logging.info("Unrecognised format!!!")
 
 
 if __name__ == "__main__":
-    url = "https://iptv-org.github.io/iptv/categories/music.m3u"
-    timeout = 5
-    useragent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
-    m3u_playlist = M3uParser(timeout=timeout, useragent=useragent)
-    m3u_playlist.parse_m3u(url, check_live=True)
-    m3u_playlist.remove_by_extension('m3u8')
-    m3u_playlist.remove_by_grpname('Zimbabwe')
-    m3u_playlist.filter_by('language-name', 'Hungarian', retrieve=False, nested_key=True)
+    url = "/home/pawan/Downloads/ru.m3u"
+    useragent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36"
+    m3u_playlist = M3uParser(timeout=5, useragent=useragent)
+    m3u_playlist.parse_m3u(url)
+    m3u_playlist.remove_by_extension('mp4')
     m3u_playlist.filter_by('status', 'GOOD')
-    print(len(m3u_playlist.get_dict()))
+    print(len(m3u_playlist.get_list()))
    m3u_playlist.to_file('pawan.json')
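
For reference, a minimal end-to-end sketch of the API as it stands after this patch, exercising the new nested-key filtering and reset_operations; the playlist path here is hypothetical and any local m3u file or URL would do:

    from m3u_parser import M3uParser

    m3u_playlist = M3uParser(timeout=5)  # no useragent given, falls back to the built-in default
    m3u_playlist.parse_m3u("/path/to/playlist.m3u", check_live=False)

    # nested keys use a dash: 'language-name' maps to stream['language']['name']
    m3u_playlist.filter_by('language-name', 'English', retrieve=True, nested_key=True)
    m3u_playlist.sort_by('name', asc=True)
    print(m3u_playlist.get_json())

    # discard all filtering/sorting and return to the freshly parsed list
    m3u_playlist.reset_operations()
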