~cadence/NewLeaf

68cfbb809f212ffb5bb6a270ae67618d84219293 — Cadence Ember 4 months ago 73b4fba
Remove `with requests` when it is unnecessary
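
For context on the pattern being removed: requests.Response supports the context-manager protocol, but for a non-streamed request the body has already been read in full by the time get() returns and the connection is back in the pool, so the with block's implicit close() accomplishes nothing. A minimal before/after sketch of the idea (the URL is illustrative, not taken from this repository):

	import requests

	# Before: the with block only guarantees r.close() on exit, which is
	# effectively a no-op once the body has been read eagerly.
	with requests.get("https://example.com/feed.xml") as r:
		r.raise_for_status()
		data = r.content

	# After: identical behaviour, one less level of indentation.
	r = requests.get("https://example.com/feed.xml")
	r.raise_for_status()
	data = r.content
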
M extractors/captions.py => extractors/captions.py +6 -6
@@ -16,12 +16,12 @@ def extract_captions_from_dict(captions, *, lang=None, label=None):
 		return captions
 
 	url = next(caption["second__remoteUrl"] for caption in captions["captions"] if caption["languageCode"] == lang or caption["label"] == label)
-	with requests.get(url) as r:
-		r.raise_for_status()
-		# remove extraneous " align:start position:0%" on timestamps lines on auto-generated captions
-		if (lang and "auto-generated" in lang) or (label and "auto-generated" in label):
-			return re.sub(r"^([0-9:.]+ --> [0-9:.]+).*$", r"\1", r.content.decode("utf8"), flags=re.MULTILINE)
-		return r
+	r = requests.get(url)
+	r.raise_for_status()
+	# remove extraneous " align:start position:0%" on timestamps lines on auto-generated captions
+	if (lang and "auto-generated" in lang) or (label and "auto-generated" in label):
+		return re.sub(r"^([0-9:.]+ --> [0-9:.]+).*$", r"\1", r.content.decode("utf8"), flags=re.MULTILINE)
+	return r
 
 def extract_captions_from_video(id):
 	return {

M extractors/channel.py => extractors/channel.py +186 -186
@@ -18,142 +18,142 @@ def extract_channel(ucid):
 			return channel_cache[ucid]
 
 	channel_type = "channel" if len(ucid) == 24 and ucid[:2] == "UC" else "user"
-	with requests.get("https://www.youtube.com/{}/{}/videos?hl=en".format(channel_type, ucid), cookies=eu_consent_cookie()) as r:
-		r.raise_for_status()
-		yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))
-
-		for alert in yt_initial_data.get("alerts", []):
-			alert_text = combine_runs(alert["alertRenderer"]["text"])
-			if alert_text == "This channel does not exist.":
-				return {
-					"error": alert_text,
-					"identifier": "NOT_FOUND"
-				}
-			elif alert_text.startswith("This account has been terminated"):
-				return {
-					"error": alert_text,
-					"identifier": "ACCOUNT_TERMINATED"
-				}
-			else:
-				print("Seen alert text '{}'".format(alert_text))
-
-		header = yt_initial_data["header"]["c4TabbedHeaderRenderer"] if "c4TabbedHeaderRenderer" in yt_initial_data["header"] else {}
-		channel_metadata = yt_initial_data["metadata"]["channelMetadataRenderer"]
-
-		if header:
-			author = header["title"]
-			author_id = header["channelId"]
-			author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"]
+	r = requests.get("https://www.youtube.com/{}/{}/videos?hl=en".format(channel_type, ucid), cookies=eu_consent_cookie())
+	r.raise_for_status()
+	yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))
+
+	for alert in yt_initial_data.get("alerts", []):
+		alert_text = combine_runs(alert["alertRenderer"]["text"])
+		if alert_text == "This channel does not exist.":
+			return {
+				"error": alert_text,
+				"identifier": "NOT_FOUND"
+			}
+		elif alert_text.startswith("This account has been terminated"):
+			return {
+				"error": alert_text,
+				"identifier": "ACCOUNT_TERMINATED"
+			}
 		else:
-			author = channel_metadata["title"]
-			author_id = channel_metadata["externalId"]
-			author_url = channel_metadata["channelUrl"]
-
-		subscriber_count = combine_runs(header["subscriberCountText"]) if "subscriberCountText" in header else "Unknown subscribers"
-		description = channel_metadata["description"]
-		allowed_regions = channel_metadata["availableCountryCodes"]
-
-		author_banners = []
-		if "banner" in header:
-			author_banners = header["banner"]["thumbnails"]
-			for t in author_banners:
-				t["url"] = normalise_url_protocol(t["url"])
-
-		author_thumbnails = []
-		avatar = header.get("avatar") or channel_metadata.get("avatar")
-		if avatar:
-			author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"])
-
-		latest_videos = []
-		tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
-		try:
-			videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos")
-			tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]
-		except StopIteration:
-			tab_parts = {}
-
-		# check that the channel actually has videos - this may be replaced
-		# with messageRenderer.text.simpleText == "This channel has no videos."
-		if "gridRenderer" in tab_parts:
-			videos = (
-				v["gridVideoRenderer"] for v in tab_parts["gridRenderer"]["items"] if "gridVideoRenderer" in v
-			)
-			for v in videos:
-				live = False
-				is_upcoming = False
-				length_text = "UNKNOWN"
-				length_seconds = -1
-				for o in v["thumbnailOverlays"]:
-					if "thumbnailOverlayTimeStatusRenderer" in o:
-						length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
-						length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
-						if length_text_style == "DEFAULT":
-							length_seconds = length_text_to_seconds(length_text)
-						elif length_text_style == "LIVE":
-							live = True
-						elif length_text_style == "UPCOMING":
-							is_upcoming = True
-				published = 0
-				published_text = "Live now"
-				premiere_timestamp = None
-				if "publishedTimeText" in v:
-					published_text = v["publishedTimeText"]["simpleText"]
-					published = past_text_to_time(published_text)
-				if "upcomingEventData" in v:
-					premiere_timestamp = v["upcomingEventData"]["startTime"]
-					published_text = time_to_past_text(int(premiere_timestamp))
-
-				view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None
-				view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None
-
-				latest_videos.append({
-					"type": "video",
-					"title": combine_runs(v["title"]),
-					"videoId": v["videoId"],
-					"author": author,
-					"authorId": author_id,
-					"authorUrl": author_url,
-					"videoThumbnails": generate_video_thumbnails(v["videoId"]),
-					"description": "",
-					"descriptionHtml": "",
-					"viewCount": view_count_text_to_number(view_count_text),
-					"second__viewCountText": view_count_text,
-					"second__viewCountTextShort": view_count_text_short,
-					"published": published,
-					"publishedText": published_text,
-					"lengthSeconds": length_seconds,
-					"second__lengthText": length_text,
-					"liveNow": live,
-					"paid": None,
-					"premium": None,
-					"isUpcoming": is_upcoming,
-					"premiereTimestamp": premiere_timestamp
-				})
-
-		channel = {
-			"author": author,
-			"authorId": author_id,
-			"authorUrl": author_url,
-			"authorBanners": author_banners,
-			"authorThumbnails": author_thumbnails,
-			"subCount": uncompress_counter(subscriber_count.split(" ")[0]),
-			"second__subCountText": subscriber_count,
-			"totalViews": None,
-			"joined": None,
-			"paid": None,
-			"autoGenerated": None,
-			"isFamilyFriendly": None,
-			"description": description,
-			"descriptionHtml": add_html_links(escape_html_textcontent(description)),
-			"allowedRegions": allowed_regions,
-			"latestVideos": latest_videos,
-			"relatedChannels": []
-		}
+			print("Seen alert text '{}'".format(alert_text))
 
-		with channel_cache_lock:
-			channel_cache[ucid] = channel
+	header = yt_initial_data["header"]["c4TabbedHeaderRenderer"] if "c4TabbedHeaderRenderer" in yt_initial_data["header"] else {}
+	channel_metadata = yt_initial_data["metadata"]["channelMetadataRenderer"]
 
-		return channel
+	if header:
+		author = header["title"]
+		author_id = header["channelId"]
+		author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"]
+	else:
+		author = channel_metadata["title"]
+		author_id = channel_metadata["externalId"]
+		author_url = channel_metadata["channelUrl"]
+
+	subscriber_count = combine_runs(header["subscriberCountText"]) if "subscriberCountText" in header else "Unknown subscribers"
+	description = channel_metadata["description"]
+	allowed_regions = channel_metadata["availableCountryCodes"]
+
+	author_banners = []
+	if "banner" in header:
+		author_banners = header["banner"]["thumbnails"]
+		for t in author_banners:
+			t["url"] = normalise_url_protocol(t["url"])
+
+	author_thumbnails = []
+	avatar = header.get("avatar") or channel_metadata.get("avatar")
+	if avatar:
+		author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"])
+
+	latest_videos = []
+	tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
+	try:
+		videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos")
+		tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]
+	except StopIteration:
+		tab_parts = {}
+
+	# check that the channel actually has videos - this may be replaced
+	# with messageRenderer.text.simpleText == "This channel has no videos."
+	if "gridRenderer" in tab_parts:
+		videos = (
+			v["gridVideoRenderer"] for v in tab_parts["gridRenderer"]["items"] if "gridVideoRenderer" in v
+		)
+		for v in videos:
+			live = False
+			is_upcoming = False
+			length_text = "UNKNOWN"
+			length_seconds = -1
+			for o in v["thumbnailOverlays"]:
+				if "thumbnailOverlayTimeStatusRenderer" in o:
+					length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
+					length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
+					if length_text_style == "DEFAULT":
+						length_seconds = length_text_to_seconds(length_text)
+					elif length_text_style == "LIVE":
+						live = True
+					elif length_text_style == "UPCOMING":
+						is_upcoming = True
+			published = 0
+			published_text = "Live now"
+			premiere_timestamp = None
+			if "publishedTimeText" in v:
+				published_text = v["publishedTimeText"]["simpleText"]
+				published = past_text_to_time(published_text)
+			if "upcomingEventData" in v:
+				premiere_timestamp = v["upcomingEventData"]["startTime"]
+				published_text = time_to_past_text(int(premiere_timestamp))
+
+			view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None
+			view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None
+
+			latest_videos.append({
+				"type": "video",
+				"title": combine_runs(v["title"]),
+				"videoId": v["videoId"],
+				"author": author,
+				"authorId": author_id,
+				"authorUrl": author_url,
+				"videoThumbnails": generate_video_thumbnails(v["videoId"]),
+				"description": "",
+				"descriptionHtml": "",
+				"viewCount": view_count_text_to_number(view_count_text),
+				"second__viewCountText": view_count_text,
+				"second__viewCountTextShort": view_count_text_short,
+				"published": published,
+				"publishedText": published_text,
+				"lengthSeconds": length_seconds,
+				"second__lengthText": length_text,
+				"liveNow": live,
+				"paid": None,
+				"premium": None,
+				"isUpcoming": is_upcoming,
+				"premiereTimestamp": premiere_timestamp
+			})
+
+	channel = {
+		"author": author,
+		"authorId": author_id,
+		"authorUrl": author_url,
+		"authorBanners": author_banners,
+		"authorThumbnails": author_thumbnails,
+		"subCount": uncompress_counter(subscriber_count.split(" ")[0]),
+		"second__subCountText": subscriber_count,
+		"totalViews": None,
+		"joined": None,
+		"paid": None,
+		"autoGenerated": None,
+		"isFamilyFriendly": None,
+		"description": description,
+		"descriptionHtml": add_html_links(escape_html_textcontent(description)),
+		"allowedRegions": allowed_regions,
+		"latestVideos": latest_videos,
+		"relatedChannels": []
+	}
+
+	with channel_cache_lock:
+		channel_cache[ucid] = channel
+
+	return channel
 
 def extract_channel_videos(ucid):
 	channel = extract_channel(ucid)


@@ -167,59 +167,59 @@ def extract_channel_latest(ucid):
 		if ucid in channel_latest_cache:
 			return channel_latest_cache[ucid]
 
-	with requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) as r:
-		if r.status_code == 404:
-			cherrypy.response.status = 404
-			return {
-				"error": "This channel does not exist.",
-				"identifier": "NOT_FOUND"
-			}
+	r = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid))
+	if r.status_code == 404:
+		cherrypy.response.status = 404
+		return {
+			"error": "This channel does not exist.",
+			"identifier": "NOT_FOUND"
+		}
 
-		feed = ET.fromstring(r.content)
-		author_container = feed.find("{http://www.w3.org/2005/Atom}author")
-		author = author_container.find("{http://www.w3.org/2005/Atom}name").text
-		author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text
-		channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
-		results = []
-		missing_published = False
-		for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"):
-			id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text
-			media_group = entry.find("{http://search.yahoo.com/mrss/}group")
-			description = media_group.find("{http://search.yahoo.com/mrss/}description").text or ""
-			media_community = media_group.find("{http://search.yahoo.com/mrss/}community")
-			published_entry = entry.find("{http://www.w3.org/2005/Atom}published")
-			if published_entry is not None: # sometimes youtube does not provide published dates, no idea why.
-				published = int(dateutil.parser.isoparse(published_entry.text).timestamp())
-				results.append({
-					"type": "video",
-					"title": entry.find("{http://www.w3.org/2005/Atom}title").text,
-					"videoId": id,
-					"author": author,
-					"authorId": channel_id,
-					"authorUrl": author_url,
-					"videoThumbnails": generate_video_thumbnails(id),
-					"description": description,
-					"descriptionHtml": add_html_links(escape_html_textcontent(description)),
-					"viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]),
-					"published": published,
-					"publishedText": time_to_past_text(published),
-					"lengthSeconds": None,
-					"liveNow": None,
-					"paid": None,
-					"premium": None,
-					"isUpcoming": None
-				})
-			else:
-				missing_published = True
-
-		if len(results) == 0 and missing_published: # no results due to all missing published
-			cherrypy.response.status = 503
-			return {
-				"error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.",
-				"identifier": "PUBLISHED_DATES_NOT_PROVIDED"
-			}
+	feed = ET.fromstring(r.content)
+	author_container = feed.find("{http://www.w3.org/2005/Atom}author")
+	author = author_container.find("{http://www.w3.org/2005/Atom}name").text
+	author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text
+	channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
+	results = []
+	missing_published = False
+	for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"):
+		id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text
+		media_group = entry.find("{http://search.yahoo.com/mrss/}group")
+		description = media_group.find("{http://search.yahoo.com/mrss/}description").text or ""
+		media_community = media_group.find("{http://search.yahoo.com/mrss/}community")
+		published_entry = entry.find("{http://www.w3.org/2005/Atom}published")
+		if published_entry is not None: # sometimes youtube does not provide published dates, no idea why.
+			published = int(dateutil.parser.isoparse(published_entry.text).timestamp())
+			results.append({
+				"type": "video",
+				"title": entry.find("{http://www.w3.org/2005/Atom}title").text,
+				"videoId": id,
+				"author": author,
+				"authorId": channel_id,
+				"authorUrl": author_url,
+				"videoThumbnails": generate_video_thumbnails(id),
+				"description": description,
+				"descriptionHtml": add_html_links(escape_html_textcontent(description)),
+				"viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]),
+				"published": published,
+				"publishedText": time_to_past_text(published),
+				"lengthSeconds": None,
+				"liveNow": None,
+				"paid": None,
+				"premium": None,
+				"isUpcoming": None
+			})
+		else:
+			missing_published = True
 
-		with channel_latest_cache_lock:
-			channel_latest_cache[ucid] = results
+	if len(results) == 0 and missing_published: # no results due to all missing published
+		cherrypy.response.status = 503
+		return {
+			"error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.",
+			"identifier": "PUBLISHED_DATES_NOT_PROVIDED"
+		}
+
+	with channel_latest_cache_lock:
+		channel_latest_cache[ucid] = results
 
-		return results
+	return results

M extractors/manifest.py => extractors/manifest.py +3 -3
@@ -11,9 +11,9 @@ def extract_manifest(id):
 		return video
 
 	if video["second__providedDashUrl"]:
-		with requests.get(video["second__providedDashUrl"]) as r:
-			r.raise_for_status()
-			return r
+		r = requests.get(video["second__providedDashUrl"])
+		r.raise_for_status()
+		return r
 
 	adaptation_sets_dict = {}
 	for f in video["adaptiveFormats"]:

M extractors/search.py => extractors/search.py +42 -42
@@ -17,51 +17,51 @@ ytdl = yt_dlp.YoutubeDL(ytdl_opts)
 
 def extract_search(q):
 	try:
-		with requests.get("https://www.youtube.com/results", params={"q": q, "hl": "en"}, cookies=eu_consent_cookie()) as r:
-			r.raise_for_status()
-			content = r.content.decode("utf8")
-			yt_initial_data = extract_yt_initial_data(content)
+		r = requests.get("https://www.youtube.com/results", params={"q": q, "hl": "en"}, cookies=eu_consent_cookie())
+		r.raise_for_status()
+		content = r.content.decode("utf8")
+		yt_initial_data = extract_yt_initial_data(content)
 
-			sections = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"]
-			# youtube searches contain a lot of random stuff, just grab it all for now, then filter to `videoRenderer` later
-			itemSections = [s for s in sections if "itemSectionRenderer" in s]
+		sections = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"]
+		# youtube searches contain a lot of random stuff, just grab it all for now, then filter to `videoRenderer` later
+		itemSections = [s for s in sections if "itemSectionRenderer" in s]
 
-			items = []
-			for section in itemSections:
-				items += section["itemSectionRenderer"]["contents"]
+		items = []
+		for section in itemSections:
+			items += section["itemSectionRenderer"]["contents"]
 
-			results = []
-			for item in items:
-				if "videoRenderer" in item:
-					video = item["videoRenderer"]
-					published = 0
-					published_text = "Live now"
-					if "publishedTimeText" in video:
-						published_text = video["publishedTimeText"]["simpleText"]
-						published = past_text_to_time(published_text)
-					results.append({
-						"type": "video",
-						"title": combine_runs(video["title"]),
-						"videoId": video["videoId"],
-						"author": combine_runs(video["longBylineText"]),
-						"authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
-						"authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
-						"videoThumbnails": generate_video_thumbnails(video["videoId"]),
-						"description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
-						"descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
-						"viewCount": get_view_count_or_recommended(video),
-						"second__viewCountText": get_view_count_text_or_recommended(video),
-						"published": published,
-						"publishedText": published_text,
-						"lengthSeconds": get_length_or_live_now(video),
-						"second__lengthText": get_length_text_or_live_now(video),
-						"liveNow": is_live(video),
-						"paid": None,
-						"premium": None,
-						"isUpcoming": None
-					})
-			search_cache[q] = results # only cache full extraction
-			return results
+		results = []
+		for item in items:
+			if "videoRenderer" in item:
+				video = item["videoRenderer"]
+				published = 0
+				published_text = "Live now"
+				if "publishedTimeText" in video:
+					published_text = video["publishedTimeText"]["simpleText"]
+					published = past_text_to_time(published_text)
+				results.append({
+					"type": "video",
+					"title": combine_runs(video["title"]),
+					"videoId": video["videoId"],
+					"author": combine_runs(video["longBylineText"]),
+					"authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
+					"authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
+					"videoThumbnails": generate_video_thumbnails(video["videoId"]),
+					"description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+					"descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+					"viewCount": get_view_count_or_recommended(video),
+					"second__viewCountText": get_view_count_text_or_recommended(video),
+					"published": published,
+					"publishedText": published_text,
+					"lengthSeconds": get_length_or_live_now(video),
+					"second__lengthText": get_length_text_or_live_now(video),
+					"liveNow": is_live(video),
+					"paid": None,
+					"premium": None,
+					"isUpcoming": None
+				})
+		search_cache[q] = results # only cache full extraction
+		return results
 
 	except Exception:
 		print("messed up extracting search, using youtube-dl instead")

M extractors/suggestions.py => extractors/suggestions.py +9 -9
@@ -20,12 +20,12 @@ def extract_search_suggestions(q):
 		"xhr": "t",
 		# "xssi": "t"
 	}
-	with requests.get("https://clients1.google.com/complete/search", params=params) as r:
-		r.raise_for_status()
-		response = r.json()
-		result = {
-			"query": q,
-			"suggestions": [s[0] for s in response[1]]
-		}
-		suggestions_cache[q] = result
-		return result
+	r = requests.get("https://clients1.google.com/complete/search", params=params)
+	r.raise_for_status()
+	response = r.json()
+	result = {
+		"query": q,
+		"suggestions": [s[0] for s in response[1]]
+	}
+	suggestions_cache[q] = result
+	return result

M index.py => index.py +8 -8
@@ -123,17 +123,17 @@ class NewLeaf(object):
 
 	@cherrypy.expose
 	def vi(self, id, file):
-		with requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True) as r:
-			r.raise_for_status()
-			cherrypy.response.headers["content-type"] = r.headers["content-type"]
-			return next(r.iter_content(chunk_size=None))
+		r = requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True)
+		r.raise_for_status()
+		cherrypy.response.headers["content-type"] = r.headers["content-type"]
+		return next(r.iter_content(chunk_size=None))
 
 	@cherrypy.expose
 	def ggpht(self, *path):
-		with requests.get("https://yt3.ggpht.com/{}".format("/".join(path)), stream=True) as r:
-			r.raise_for_status()
-			cherrypy.response.headers["content-type"] = r.headers["content-type"]
-			return next(r.iter_content(chunk_size=None))
+		r = requests.get("https://yt3.ggpht.com/{}".format("/".join(path)), stream=True)
+		r.raise_for_status()
+		cherrypy.response.headers["content-type"] = r.headers["content-type"]
+		return next(r.iter_content(chunk_size=None))
 
 bind_port = getattr(configuration, "bind_port", 3000)
 bind_host = getattr(configuration, "bind_host", "0.0.0.0")
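
An aside on the two proxy handlers in index.py, which pass stream=True: per the requests documentation, a streamed response whose body is not fully consumed holds its pooled connection until Response.close() is called, which is exactly what the removed with form did implicitly. A hedged sketch of the more conservative shape for such a handler (mirroring the vi handler above; only the first chunk is read before the connection is released):

	# Keeping the context manager for streamed responses releases the
	# pooled connection even though the body is never fully consumed.
	with requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True) as r:
		r.raise_for_status()
		cherrypy.response.headers["content-type"] = r.headers["content-type"]
		return next(r.iter_content(chunk_size=None))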