Skip to content

Commit

Permalink
feat: Add remove duplicates method #24
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanpaudel93 committed Nov 7, 2023
1 parent 7595daf commit 86c1b30
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 1 deletion.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,21 @@ Sorts the streams information based on a key in ascending or descending order.
parser.sort_by(key, SortConfig(key_splitter="-", asc=True, nested_key=False))
```

#### remove_duplicates

`remove_duplicates(self, name: str = None, url: str = None) -> self`

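Removes duplicate stream entries that match the provided `name` pattern and `url`, or removes all duplicates if neither `name` nor `url` is provided. `name` and `url` must be passed together; passing only one of them raises `ParamNotPassedException`.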

- `name` (str, optional): The name pattern to filter duplicates. Defaults to None.
- `url` (str, optional): The URL to filter duplicates (compared case-insensitively). Defaults to None.

```python
parser.remove_duplicates()
# or
parser.remove_duplicates("Channel 1", "http://example.com/stream1")
```

### get_json

`get_json(indent: int = 4) -> str`
Expand Down
4 changes: 4 additions & 0 deletions m3u_parser/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ class NoStreamsException(Exception):
"""Raised when streams information is not available."""

pass


class ParamNotPassedException(Exception):
    """Signals that an expected parameter was not passed by the caller."""

    pass
55 changes: 55 additions & 0 deletions m3u_parser/m3u_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
SavingNotSupportedException,
UnrecognizedFormatException,
UrlReadException,
ParamNotPassedException,
)
from .helper import (
default_useragent,
Expand Down Expand Up @@ -653,6 +654,60 @@ def sort_by(self, key: str, config: SortConfig = SortConfig()):
reverse=not config.asc,
)

def remove_duplicates(self, name: str = None, url: str = None):
    """
    Remove duplicate stream entries, keeping the first occurrence.

    Called without arguments, every stream is compared on its lower-cased
    (name, url) pair and later repeats are dropped. Called with both
    arguments, only streams whose name matches the `name` regular expression
    (case-insensitive search) and whose url equals `url` (compared
    case-insensitively) are deduplicated; all other streams are kept as-is.

    Args:
        - `name` (str, optional): The name pattern to filter duplicates. Defaults to None.
        - `url` (str, optional): The URL to filter duplicates. Defaults to None.

    Returns:
        - `self`: The modified object after removing duplicate stream entries.

    Raises:
        - `ParamNotPassedException`: If only one of `name` and `url` is passed.
    """
    if name is None and url is not None:
        raise ParamNotPassedException("Param name is not passed.")

    if name is not None and url is None:
        raise ParamNotPassedException("Param url is not passed.")

    remove_all = name is None and url is None

    # An empty `name` yields no pattern, so no stream counts as a match.
    name_pattern = re.compile(name, re.IGNORECASE) if name else None
    # Loop-invariant: lower-case the target url once instead of per stream.
    target_url = url.lower() if url is not None else None

    filtered_streams = []
    seen_keys = set()
    # In targeted mode every matching stream is a duplicate of the first
    # match, so a flag is enough to remember that one copy was kept.
    target_kept = False

    for stream_info in self._streams_info:
        stream_name = stream_info.get("name")
        stream_url = stream_info.get("url")

        if remove_all:
            # Entries may lack a name or url; keep None in the key rather
            # than calling .lower() on it.
            key = (
                stream_name.lower() if stream_name is not None else None,
                stream_url.lower() if stream_url is not None else None,
            )
            if key in seen_keys:
                continue
            seen_keys.add(key)
        elif (
            name_pattern is not None
            and stream_name is not None
            and stream_url is not None
            and name_pattern.search(stream_name)
            and stream_url.lower() == target_url
        ):
            if target_kept:
                continue
            target_kept = True

        filtered_streams.append(stream_info)

    self._streams_info = filtered_streams

    return self

def get_json(self, indent: int = 4):
"""
Get the streams information as a JSON string.
Expand Down
46 changes: 45 additions & 1 deletion tests/test_m3uparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
sys.path.append(str(package_root_directory))

from m3u_parser import FilterConfig, M3uParser, ParseConfig, SortConfig
from m3u_parser.exceptions import KeyNotFoundException, NoStreamsException
from m3u_parser.exceptions import KeyNotFoundException, NoStreamsException, ParamNotPassedException

# Sample M3U content for testing
SAMPLE_M3U_CONTENT = """
Expand All @@ -26,6 +26,16 @@
rtsp://10.0.0.1:554/?avm=1&freq=514&bw=8&msys=dvbc&mtype=256qam&sr=6900&specinv=0&pids=0,16,17,18,20,800,810,850
"""

# Sample M3U content for testing duplicate removal: the "Channel 1" entry
# appears twice with the same URL, "Channel 2" appears once.
DUPLICATE_M3U_CONTENT = """
#EXTM3U
#EXTINF:-1 tvg-id="Channel 1" tvg-logo="https://i.imgur.com/AvCQYgu.png" tvg-country="NP" tvg-language="Newari" group-title="News",Channel 1
http://example.com/stream1
#EXTINF:-1 tvg-id="Channel 1" tvg-logo="https://i.imgur.com/AvCQYgu.png" tvg-country="NP" tvg-language="Newari" group-title="News",Channel 1
http://example.com/stream1
#EXTINF:-1 tvg-id="Channel 2" tvg-logo="https://i.imgur.com/AvCQYgu.png" tvg-country="CN" tvg-language="Chinesee" group-title="News",Channel 2
http://example.com/stream2
"""

SAMPLE_JSON_CONTENT = json.dumps(
[
{
Expand Down Expand Up @@ -78,6 +88,14 @@ def temp_m3u_file(tmpdir):
return str(m3u_file)


@pytest.fixture
def temp_duplicate_m3u_file(tmpdir):
    """Write DUPLICATE_M3U_CONTENT to a temporary ``test.m3u`` and return its path as a string."""
    target = tmpdir.join("test.m3u")
    with open(target, "w") as handle:
        handle.write(DUPLICATE_M3U_CONTENT)
    return str(target)


# Fixture to create a temporary JSON file for testing
@pytest.fixture
def temp_json_file(tmpdir):
Expand Down Expand Up @@ -247,3 +265,29 @@ def test_invalid_m3u_content(self, tmpdir):
with pytest.raises(NoStreamsException):
parser.parse_m3u(str(invalid_m3u_file))
parser.get_random_stream()

def test_remove_specific_duplicates(self, temp_duplicate_m3u_file):
    """Deduplicating a specific name/url pair leaves two streams in the list."""
    config = ParseConfig(check_live=False)
    m3u = M3uParser()
    m3u.parse_m3u(temp_duplicate_m3u_file, config)
    m3u.remove_duplicates("Channel 1", "http://example.com/stream1")
    assert len(m3u.get_list()) == 2

def test_remove_all_duplicates(self, temp_duplicate_m3u_file):
    """Calling remove_duplicates() without arguments leaves two streams in the list."""
    config = ParseConfig(check_live=False)
    m3u = M3uParser()
    m3u.parse_m3u(temp_duplicate_m3u_file, config)
    m3u.remove_duplicates()
    assert len(m3u.get_list()) == 2

def test_remove_duplicates_name_param_only(self, temp_duplicate_m3u_file):
    """Passing only a name must raise ParamNotPassedException."""
    config = ParseConfig(check_live=False)
    m3u = M3uParser()
    m3u.parse_m3u(temp_duplicate_m3u_file, config)
    with pytest.raises(ParamNotPassedException):
        m3u.remove_duplicates("Channel 1")

def test_remove_duplicates_url_param_only(self, temp_duplicate_m3u_file):
    """Passing only a url must raise ParamNotPassedException."""
    config = ParseConfig(check_live=False)
    m3u = M3uParser()
    m3u.parse_m3u(temp_duplicate_m3u_file, config)
    with pytest.raises(ParamNotPassedException):
        m3u.remove_duplicates(url="http://example.com/stream1")

0 comments on commit 86c1b30

Please sign in to comment.