YouTube Data API nextPageToken loop in Python

2024-03-19

I pieced this together from many different examples I found online.

The goals are:

  1. Search the YouTube API
  2. Write multiple pages of search results to a CSV file

Edit: thanks to one of the answers provided, here is a working example of the search loop. It now loops the expected maximum number of times (10), but the remaining problem at runtime is the CSV file:

the script seems to fail after the last `response`, even though `results` and `writeCSV` are called afterwards.

Any further help would be greatly appreciated!

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

DEVELOPER_KEY = "dev-key"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=DEVELOPER_KEY)


# -------------Build YouTube Search------------#
def youtubeSearch(query, order="relevance"):
    # search 50 results per page
    request = youtube.search().list(
        q=query,
        type="video",
        order=order,
        part="id,snippet",
        maxResults="50",
        relevanceLanguage='en',
        videoDuration='long',
        # include pageInfo in the fields filter so the result summary can report totals
        fields='nextPageToken,pageInfo,items(id,snippet)'
    )

    title = []
    channelId = []
    channelTitle = []
    categoryId = []
    videoId = []
    viewCount = []
    likeCount = []
    dislikeCount = []
    commentCount = []
    favoriteCount = []
    tags = []
    category = []
    videos = []

    while request:
        response = request.execute()
        for search_result in response.get("items", []):
            if search_result["id"]["kind"] == "youtube#video":

                # append title and video for each item
                title.append(search_result['snippet']['title'])
                videoId.append(search_result['id']['videoId'])

                # then collect stats on each video using videoId
                stats = youtube.videos().list(
                    part='statistics,snippet',
                    id=search_result['id']['videoId']).execute()

                channelId.append(stats['items'][0]['snippet']['channelId'])
                channelTitle.append(stats['items'][0]['snippet']['channelTitle'])
                categoryId.append(stats['items'][0]['snippet']['categoryId'])
                favoriteCount.append(stats['items'][0]['statistics']['favoriteCount'])
                viewCount.append(stats['items'][0]['statistics']['viewCount'])

                # Not every video has likes/dislikes enabled so they won't appear in JSON response
                try:
                    likeCount.append(stats['items'][0]['statistics']['likeCount'])
                except KeyError:
                    # Good to be aware of channels that turn off their likes
                    print("Video titled {0}, on Channel {1} Likes Count is not available".format(
                        stats['items'][0]['snippet']['title'],
                        stats['items'][0]['snippet']['channelTitle']))
                    print(stats['items'][0]['statistics'].keys())
                    # Append "Not available" to keep dictionary values aligned
                    likeCount.append("Not available")

                try:
                    dislikeCount.append(stats['items'][0]['statistics']['dislikeCount'])
                except KeyError:
                    # Good to be aware of channels that turn off their dislikes
                    print("Video titled {0}, on Channel {1} Dislikes Count is not available".format(
                        stats['items'][0]['snippet']['title'],
                        stats['items'][0]['snippet']['channelTitle']))
                    print(stats['items'][0]['statistics'].keys())
                    dislikeCount.append("Not available")

                # Sometimes comments are disabled so if they exist append, if not append nothing...
                # It's not uncommon to disable comments, so no need to wrap in try and except
                if 'commentCount' in stats['items'][0]['statistics'].keys():
                    commentCount.append(stats['items'][0]['statistics']['commentCount'])
                else:
                    commentCount.append(0)

                if 'tags' in stats['items'][0]['snippet'].keys():
                    tags.append(stats['items'][0]['snippet']['tags'])
                else:
                    # I'm not a fan of empty fields
                    tags.append("No Tags")
        request = youtube.search().list_next(
            request, response)
    # After the pagination loop ends, store the collected lists in a dictionary
    youtube_dict = {'tags': tags, 'channelId': channelId, 'channelTitle': channelTitle,
                    'categoryId': categoryId, 'title': title, 'videoId': videoId,
                    'viewCount': viewCount, 'likeCount': likeCount, 'dislikeCount': dislikeCount,
                    'commentCount': commentCount, 'favoriteCount': favoriteCount}


    print("Search completed...")
    # After the loop, list_next() has returned None, so `request` is None here;
    # the summary must be read from the last `response` instead.
    page_info = response.get('pageInfo', {})
    print("Total results: {0} \nResults per page: {1}".format(
        page_info.get('totalResults', 'n/a'),
        page_info.get('resultsPerPage', 'n/a')))
    print("Example output per item, snippet")
    print(response['items'][0]['snippet'].keys())

    # Print the first result of the last page. Use fresh names so the
    # `title` and `channelId` lists collected above are not overwritten.
    first = response['items'][0]['snippet']
    print("First result is: \n Title: {0} \n Channel ID: {1} \n Published on: {2}".format(
        first['title'], first['channelId'], first['publishedAt']))
    return youtube_dict
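A side note on quota: the function above issues one `videos().list` call per video, but that endpoint's `id` parameter accepts up to 50 comma-separated video IDs, so the stats could be fetched once per search page instead. A minimal batching helper (the name `chunk_ids` is my own, not from the original code):

```python
def chunk_ids(ids, size=50):
    """Join video IDs into comma-separated batches of at most `size` IDs,
    the maximum the videos().list `id` parameter accepts per call."""
    return [','.join(ids[i:i + size]) for i in range(0, len(ids), size)]
```

Each batch can then be passed as `youtube.videos().list(part='statistics,snippet', id=batch)`, cutting the per-page stats calls from 50 to 1.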


# Input query
print("Please input your search query")
q = input()
# Run YouTube Search
results = youtubeSearch(q)
# Display result titles
print("Top 3 results are: \n {0}, ({1}), \n {2}, ({3}),\n {4}, ({5})".format(results['title'][0],
                                                                             results['channelTitle'][0],
                                                                             results['title'][1],
                                                                             results['channelTitle'][1],
                                                                             results['title'][2],
                                                                             results['channelTitle'][2]))

# -------------------------Save results------------------------------#
print("Input filename to store csv file")
# NOTE: "\\YouTube\\" resolves to a YouTube folder at the root of the current
# drive on Windows; that directory must already exist or open() will raise.
file = "\\YouTube\\" + input() + ".csv"


def writeCSV(results, filename):
    import csv
    keys = sorted(results.keys())
    with open(filename, "w", newline="", encoding="utf-8") as output:
        writer = csv.writer(output, delimiter=",")
        writer.writerow(keys)
        writer.writerows(zip(*[results[key] for key in keys]))


writeCSV(results, file)
print("CSV file has been saved at: " + str(file))
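The `zip(*[results[key] for key in keys])` line in `writeCSV` transposes the per-column lists into per-video rows. A minimal illustration of that transpose, using made-up data:

```python
# Two parallel column lists, keyed like the results dictionary above
cols = {'title': ['a', 'b'], 'viewCount': [10, 20]}
keys = sorted(cols.keys())  # ['title', 'viewCount']

# zip(*columns) pairs the i-th element of every column: one tuple per row
rows = list(zip(*[cols[k] for k in keys]))
# rows == [('a', 10), ('b', 20)] -> one CSV row per video
```

This only lines up correctly because every list in the dictionary has the same length, which is why the question's code appends "Not available" placeholders instead of skipping values.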


Since you are using Google's API client library for Python (https://github.com/googleapis/google-api-python-client), the Pythonic way (https://googleapis.github.io/google-api-python-client/docs/pagination.html) to implement result-set pagination (https://developers.google.com/youtube/v3/guides/implementation/pagination) on the Search.list API endpoint (https://developers.google.com/youtube/v3/docs/search/list) looks like this:

request = youtube.search().list(
    q = 'A query',
    part = 'id,snippet',
    type = 'video',
    maxResults = 50,
    relevanceLanguage = 'en',
    videoDuration = 'long'
)

while request:
    response = request.execute()

    for item in response['items']:
        ...

    request = youtube.search().list_next(
        request, response)

Because of the way the Python client library is implemented, it really is that simple: there is no need to handle the API response's nextPageToken property (https://developers.google.com/youtube/v3/docs/search/list#nextPageToken) or the request's pageToken parameter explicitly at all.
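For reference, a manual loop doing what `list_next()` automates would pass each response's `nextPageToken` back in as `pageToken`. A sketch with a hypothetical `search_all_pages` helper, including the page cap (10) mentioned in the question:

```python
def search_all_pages(youtube, query, max_pages=10):
    # Manually thread nextPageToken -> pageToken; this is exactly
    # what youtube.search().list_next(request, response) automates.
    page_token = None
    items = []
    for _ in range(max_pages):
        response = youtube.search().list(
            q=query,
            part='id,snippet',
            type='video',
            maxResults=50,
            pageToken=page_token,
        ).execute()
        items.extend(response.get('items', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            # Last page reached: the API omits nextPageToken
            break
    return items
```

The `max_pages` guard is also a cheap way to bound quota usage, since each search page costs API units regardless of how the pagination is driven.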
