Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom OAuth token file path #120

Merged
merged 3 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions pytubefix/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def __init__(
on_complete_callback: Optional[Callable[[Any, Optional[str]], None]] = None,
proxies: Optional[Dict[str, str]] = None,
use_oauth: bool = False,
allow_oauth_cache: bool = True
allow_oauth_cache: bool = True,
token_file: str | None = None,
):
"""Construct a :class:`YouTube <YouTube>`.

Expand All @@ -83,6 +84,9 @@ def __init__(
:param bool allow_oauth_cache:
(Optional) Cache OAuth tokens locally on the machine. Defaults to True.
These tokens are only generated if use_oauth is set to True as well.
:param str token_file:
(Optional) Path to the file where the OAuth tokens will be stored.
Defaults to None, which means the tokens will be stored in the pytubefix/__cache__ directory.
"""
# js fetched by js_url
self._js: Optional[str] = None
Expand Down Expand Up @@ -132,6 +136,7 @@ def __init__(

self.use_oauth = use_oauth
self.allow_oauth_cache = allow_oauth_cache
self.token_file = token_file

def __repr__(self):
return f'<pytubefix.__main__.YouTube object: videoId={self.video_id}>'
Expand Down Expand Up @@ -200,17 +205,16 @@ def initial_data(self):
def streaming_data(self):
"""Return streamingData from video info."""


# List of YouTube error video IDs
invalid_id_list = ['aQvGIIdgFDM']
invalid_id_list = ['aQvGIIdgFDM']

# If my previously valid video_info doesn't have the streamingData,
# or it is an invalid video,
# or it is an invalid video,
# try to get a new video_info with a different client.
if 'streamingData' not in self.vid_info or self.vid_info['videoDetails']['videoId'] in invalid_id_list:
original_client = self.client

# for each fallback client set, revert videodata, and run check_availability, which
# for each fallback client set, revert videodata, and run check_availability, which
# will try to get a new video_info with a different client.
# if it fails try the next fallback client, and so on.
# If none of the cleints have valid streamingData, raise an exception.
Expand Down Expand Up @@ -321,7 +325,7 @@ def check_availability(self):
raise exceptions.VideoUnavailable(video_id=self.video_id)
elif reason == 'This video is no longer available because the YouTube account associated with this video has been terminated.':
raise exceptions.VideoUnavailable(video_id=self.video_id)
else:
else:
raise exceptions.UnknownVideoError(video_id=self.video_id, status=status, reason=reason, developer_message=f'Unknown reason type for Error status')
elif status == 'LIVE_STREAM':
raise exceptions.LiveStreamError(video_id=self.video_id)
Expand Down Expand Up @@ -357,7 +361,12 @@ def vid_info(self):
if self._vid_info:
return self._vid_info

innertube = InnerTube(client=self.client, use_oauth=self.use_oauth, allow_cache=self.allow_oauth_cache)
innertube = InnerTube(
client=self.client,
use_oauth=self.use_oauth,
allow_cache=self.allow_oauth_cache,
token_file=self.token_file,
)
if innertube.require_js_player:
innertube.innertube_context.update(self.signature_timestamp)

Expand All @@ -374,7 +383,8 @@ def age_check(self):
innertube = InnerTube(
client='WEB',
use_oauth=self.use_oauth,
allow_cache=self.allow_oauth_cache
allow_cache=self.allow_oauth_cache,
token_file=self.token_file,
)

if innertube.require_js_player:
Expand Down Expand Up @@ -449,7 +459,7 @@ def chapters(self) -> List[pytubefix.Chapter]:
result.append(pytubefix.Chapter(chapter_data, chapter_end - chapter_start))

return result

@property
def key_moments(self) -> List[pytubefix.KeyMoment]:
"""Get a list of :class:`KeyMoment <KeyMoment>`.
Expand All @@ -460,7 +470,8 @@ def key_moments(self) -> List[pytubefix.KeyMoment]:
mutations = self.initial_data['frameworkUpdates']['entityBatchUpdate']['mutations']
found = False
for mutation in mutations:
if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get('markerType') == "MARKER_TYPE_TIMESTAMPS":
if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get(
'markerType') == "MARKER_TYPE_TIMESTAMPS":
key_moments_data = mutation['payload']['macroMarkersListEntity']['markersList']['markers']
found = True
break
Expand All @@ -487,7 +498,7 @@ def key_moments(self) -> List[pytubefix.KeyMoment]:
result.append(pytubefix.KeyMoment(key_moment_data, key_moment_end - key_moment_start))

return result

@property
def replayed_heatmap(self) -> List[Dict[str, float]]:
"""Get a list of : `Dict<str, float>`.
Expand All @@ -498,7 +509,8 @@ def replayed_heatmap(self) -> List[Dict[str, float]]:
mutations = self.initial_data['frameworkUpdates']['entityBatchUpdate']['mutations']
found = False
for mutation in mutations:
if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get('markerType') == "MARKER_TYPE_HEATMAP":
if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get(
'markerType') == "MARKER_TYPE_HEATMAP":
heatmaps_data = mutation['payload']['macroMarkersListEntity']['markersList']['markers']
found = True
break
Expand All @@ -514,7 +526,6 @@ def replayed_heatmap(self) -> List[Dict[str, float]]:
heatmap_start = int(heatmap_data['startMillis']) / 1000
duration = int(heatmap_data['durationMillis']) / 1000


norm_intensity = float(heatmap_data['intensityScoreNormalized'])

result.append({
Expand Down Expand Up @@ -578,13 +589,13 @@ def title(self) -> str:
)

translation_table = str.maketrans({
'/': '',
':': '',
'*': '',
'"': '',
'<': '',
'>': '',
'|': '',
'/': '',
':': '',
'*': '',
'"': '',
'<': '',
'>': '',
'|': '',
})

if self._title:
Expand Down
9 changes: 5 additions & 4 deletions pytubefix/innertube.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@

class InnerTube:
"""Object for interacting with the innertube API."""
def __init__(self, client='ANDROID_TESTSUITE', use_oauth=False, allow_cache=True):
def __init__(self, client='ANDROID_TESTSUITE', use_oauth=False, allow_cache=True, token_file=None):
"""Initialize an InnerTube object.

:param str client:
Expand All @@ -369,8 +369,9 @@ def __init__(self, client='ANDROID_TESTSUITE', use_oauth=False, allow_cache=True
self.expires = None

# Try to load from file if specified
if self.use_oauth and self.allow_cache and os.path.exists(_token_file):
with open(_token_file) as f:
self.token_file = token_file or _token_file
if self.use_oauth and self.allow_cache and os.path.exists(self.token_file):
with open(self.token_file) as f:
data = json.load(f)
self.access_token = data['access_token']
self.refresh_token = data['refresh_token']
Expand All @@ -389,7 +390,7 @@ def cache_tokens(self):
}
if not os.path.exists(_cache_dir):
os.mkdir(_cache_dir)
with open(_token_file, 'w') as f:
with open(self.token_file, 'w') as f:
json.dump(data, f)

def refresh_bearer_token(self, force=False):
Expand Down