NewLeaf/extractors/channel.py

import cherrypy
import dateutil.parser
import requests
import xml.etree.ElementTree as ET
from tools.converters import *
from tools.extractors import extract_yt_initial_data, eu_consent_cookie
from threading import Lock
from cachetools import TTLCache

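# Scraped channel data is memoised for five minutes. cachetools caches are
# not thread-safe, so each one is guarded by its own Lock (cherrypy serves
# requests from multiple threads).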
channel_cache = TTLCache(maxsize=50, ttl=300)
channel_cache_lock = Lock()
channel_latest_cache = TTLCache(maxsize=500, ttl=300)
channel_latest_cache_lock = Lock()

def extract_channel(ucid):
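	"""Scrape a channel's videos page and return its metadata and latest uploads."""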
	with channel_cache_lock:
		if ucid in channel_cache:
			return channel_cache[ucid]

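	# Canonical channel IDs are 24 characters and start with "UC"; anything
	# else is treated as a legacy /user/ name.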
	channel_type = "channel" if len(ucid) == 24 and ucid.startswith("UC") else "user"
	with requests.get("https://www.youtube.com/{}/{}/videos?hl=en".format(channel_type, ucid), cookies=eu_consent_cookie()) as r:
		r.raise_for_status()
		yt_initial_data = extract_yt_initial_data(r.content.decode("utf8"))

		for alert in yt_initial_data.get("alerts", []):
			alert_text = combine_runs(alert["alertRenderer"]["text"])
			if alert_text == "This channel does not exist.":
				return {
					"error": alert_text,
					"identifier": "NOT_FOUND"
				}
			elif alert_text.startswith("This account has been terminated"):
				return {
					"error": alert_text,
					"identifier": "ACCOUNT_TERMINATED"
				}
			else:
				print("Seen alert text '{}'".format(alert_text))

		header = yt_initial_data["header"].get("c4TabbedHeaderRenderer", {})
		channel_metadata = yt_initial_data["metadata"]["channelMetadataRenderer"]

		if header:
			author = header["title"]
			author_id = header["channelId"]
			author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"]
		else:
			author = channel_metadata["title"]
			author_id = channel_metadata["externalId"]
			author_url = channel_metadata["channelUrl"]

		subscriber_count = combine_runs(header["subscriberCountText"]) if "subscriberCountText" in header else "Unknown subscribers"
		description = channel_metadata["description"]
		allowed_regions = channel_metadata["availableCountryCodes"]

		author_banners = []
		if "banner" in header:
			author_banners = header["banner"]["thumbnails"]
			for t in author_banners:
				t["url"] = normalise_url_protocol(t["url"])

		author_thumbnails = []
		avatar = header.get("avatar") or channel_metadata.get("avatar")
		if avatar:
			author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"])

		latest_videos = []
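		# The uploads grid lives under the "Videos" tab of the two-column
		# browse layout; fall back to an empty dict if the layout changes.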
		tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]
		try:
			videos_tab = next(tab["tabRenderer"] for tab in tabs if "tabRenderer" in tab and tab["tabRenderer"].get("title") == "Videos")
			tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]
		except (StopIteration, KeyError):
			tab_parts = {}

		# check that the channel actually has videos - this may be replaced
		# with messageRenderer.text.simpleText == "This channel has no videos."
		if "gridRenderer" in tab_parts:
			videos = (
				v["gridVideoRenderer"] for v in tab_parts["gridRenderer"]["items"] if "gridVideoRenderer" in v
			)
			for v in videos:
				live = False
				is_upcoming = False
				length_text = "UNKNOWN"
				length_seconds = -1
				for o in v["thumbnailOverlays"]:
					if "thumbnailOverlayTimeStatusRenderer" in o:
						length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"])
						length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"]
						if length_text_style == "DEFAULT":
							length_seconds = length_text_to_seconds(length_text)
						elif length_text_style == "LIVE":
							live = True
						elif length_text_style == "UPCOMING":
							is_upcoming = True
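				# Live streams have no publishedTimeText, so these defaults apply.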
				published = 0
				published_text = "Live now"
				premiere_timestamp = None
				if "publishedTimeText" in v:
					published_text = v["publishedTimeText"]["simpleText"]
					published = past_text_to_time(published_text)
				if "upcomingEventData" in v:
					premiere_timestamp = v["upcomingEventData"]["startTime"]
					published_text = time_to_past_text(int(premiere_timestamp))

				view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None
				view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None

				latest_videos.append({
					"type": "video",
					"title": combine_runs(v["title"]),
					"videoId": v["videoId"],
					"author": author,
					"authorId": author_id,
					"authorUrl": author_url,
					"videoThumbnails": generate_video_thumbnails(v["videoId"]),
					"description": "",
					"descriptionHtml": "",
					"viewCount": view_count_text_to_number(view_count_text),
					"second__viewCountText": view_count_text,
					"second__viewCountTextShort": view_count_text_short,
					"published": published,
					"publishedText": published_text,
					"lengthSeconds": length_seconds,
					"second__lengthText": length_text,
					"liveNow": live,
					"paid": None,
					"premium": None,
					"isUpcoming": is_upcoming,
					"premiereTimestamp": premiere_timestamp
				})

		channel = {
			"author": author,
			"authorId": author_id,
			"authorUrl": author_url,
			"authorBanners": author_banners,
			"authorThumbnails": author_thumbnails,
			"subCount": uncompress_counter(subscriber_count.split(" ")[0]),
			"second__subCountText": subscriber_count,
			"totalViews": None,
			"joined": None,
			"paid": None,
			"autoGenerated": None,
			"isFamilyFriendly": None,
			"description": description,
			"descriptionHtml": add_html_links(escape_html_textcontent(description)),
			"allowedRegions": allowed_regions,
			"latestVideos": latest_videos,
			"relatedChannels": []
		}

		with channel_cache_lock:
			channel_cache[ucid] = channel

		return channel

def extract_channel_videos(ucid):
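	"""Return just the latestVideos list from extract_channel, passing errors through."""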
	channel = extract_channel(ucid)
	if "error" in channel:
		return channel
	else:
		return channel["latestVideos"]

def extract_channel_latest(ucid):
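	"""Fetch a channel's latest uploads from its public Atom feed."""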
	with channel_latest_cache_lock:
		if ucid in channel_latest_cache:
			return channel_latest_cache[ucid]

	with requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) as r:
		if r.status_code == 404:
			cherrypy.response.status = 404
			return {
				"error": "This channel does not exist.",
				"identifier": "NOT_FOUND"
			}

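		# The feed is Atom with YouTube and Media RSS extensions; ElementTree
		# requires the full namespace URI in each tag lookup.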
		feed = ET.fromstring(r.content)
		author_container = feed.find("{http://www.w3.org/2005/Atom}author")
		author = author_container.find("{http://www.w3.org/2005/Atom}name").text
		author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text
		channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text
		results = []
		missing_published = False
		for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"):
			video_id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text
			media_group = entry.find("{http://search.yahoo.com/mrss/}group")
			description = media_group.find("{http://search.yahoo.com/mrss/}description").text or ""
			media_community = media_group.find("{http://search.yahoo.com/mrss/}community")
			published_entry = entry.find("{http://www.w3.org/2005/Atom}published")
			if published_entry is not None: # sometimes YouTube does not provide published dates, no idea why.
				published = int(dateutil.parser.isoparse(published_entry.text).timestamp())
				results.append({
					"type": "video",
					"title": entry.find("{http://www.w3.org/2005/Atom}title").text,
					"videoId": id,
					"author": author,
					"authorId": channel_id,
					"authorUrl": author_url,
					"videoThumbnails": generate_video_thumbnails(id),
					"description": description,
					"descriptionHtml": add_html_links(escape_html_textcontent(description)),
					"viewCount": int(media_community.find("{http://search.yahoo.com/mrss/}statistics").attrib["views"]),
					"published": published,
					"publishedText": time_to_past_text(published),
					"lengthSeconds": None,
					"liveNow": None,
					"paid": None,
					"premium": None,
					"isUpcoming": None
				})
			else:
				missing_published = True

		if len(results) == 0 and missing_published: # no results because every entry was missing a published date
			cherrypy.response.status = 503
			return {
				"error": "YouTube did not provide published dates for any feed items. This is usually temporary - refresh in a few minutes.",
				"identifier": "PUBLISHED_DATES_NOT_PROVIDED"
			}

		with channel_latest_cache_lock:
			channel_latest_cache[ucid] = results

		return results
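
# A minimal sketch of exercising the extractor directly, outside the web
# server; the UCID below is a placeholder, not a real channel ID.
if __name__ == "__main__":
	channel = extract_channel("UCxxxxxxxxxxxxxxxxxxxxxx")
	if "error" in channel:
		print("Error:", channel["error"])
	else:
		print("{}: {} recent videos".format(channel["author"], len(channel["latestVideos"])))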