如何在 Telethon 中正确使用 iter_download 功能进行多连接下载

2024-01-26

我一直在尝试实现一个多线程电报下载客户端。对于单个下载,我们可以简单地使用 download_media 功能。

但 telethon 提供了 iter_download 函数,根据文档,它用于流媒体,还包括暂停和恢复功能。我们可以使用它来通过多个连接下载单个文件。

这就是我到目前为止所编写的脚本。没有地方可以找到任何可靠的多连接示例下载

async def multi_downloader(file, total_size, part, offset, part_size):

    f = open('output.mkv.'+str(part), 'wb')
    size = 0

    global chunk_size
    limit = 10485760#closestInteger(part_size / chunk_size, 10485760)
    print(limit)
    print(part)
    async for chunk in client.iter_download(obj, offset = offset, limit = limit, chunk_size = chunk_size, request_size = chunk_size, file_size = total_size):
        f.write(chunk)
        f.flush()
        size += (len(chunk))
        if size >= (part_size):
            print("Part "+str(part)+" completed. "+str(part_size))
            break
    f.close()

问题是,如果我更改查找的偏移量,它总是会抛出无效限制错误。如果偏移量为零,那么一切都很好。

telethon.errors.rpcerrorlist.LimitInvalidError:无效的限制 假如。看https://core.telegram.org/api/files#downloading-files https://core.telegram.org/api/files#downloading-files(由 GetFileRequest 引起)


我们已经制作了类似的东西,您可以在这里找到https://gist.github.com/painor/7e74de80ae0c819d3e9abcf9989a8dd6 https://gist.github.com/painor/7e74de80ae0c819d3e9abcf9989a8dd6。 代码 :

"""
> Based on parallel_file_transfer.py from mautrix-telegram, with permission to distribute under the MIT license
> Copyright (C) 2019 Tulir Asokan - https://github.com/tulir/mautrix-telegram
"""
import asyncio
import hashlib
import inspect
import logging
import os
from collections import defaultdict
from typing import Optional, List, AsyncGenerator, Union, Awaitable, DefaultDict, Tuple, BinaryIO

import math
from telethon import utils, helpers, TelegramClient
from telethon.crypto import AuthKey
from telethon.network import MTProtoSender
from telethon.tl.functions.auth import ExportAuthorizationRequest, ImportAuthorizationRequest
from telethon.tl.functions.upload import (GetFileRequest, SaveFilePartRequest,
                                          SaveBigFilePartRequest)
from telethon.tl.types import (Document, InputFileLocation, InputDocumentFileLocation,
                               InputPhotoFileLocation, InputPeerPhotoFileLocation, TypeInputFile,
                               InputFileBig, InputFile)

log: logging.Logger = logging.getLogger("telethon")
logging.basicConfig(level=logging.WARNING)
TypeLocation = Union[Document, InputDocumentFileLocation, InputPeerPhotoFileLocation,
                     InputFileLocation, InputPhotoFileLocation]


def stream_file(file_to_stream: BinaryIO, chunk_size=1024):
    while True:
        data_read = file_to_stream.read(chunk_size)
        if not data_read:
            break
        yield data_read


class DownloadSender:
    sender: MTProtoSender
    request: GetFileRequest
    remaining: int
    stride: int

    def __init__(self, sender: MTProtoSender, file: TypeLocation, offset: int, limit: int,
                 stride: int, count: int) -> None:
        self.sender = sender
        self.request = GetFileRequest(file, offset=offset, limit=limit)
        self.stride = stride
        self.remaining = count

    async def next(self) -> Optional[bytes]:
        if not self.remaining:
            return None
        result = await self.sender.send(self.request)
        self.remaining -= 1
        self.request.offset += self.stride
        return result.bytes

    def disconnect(self) -> Awaitable[None]:
        return self.sender.disconnect()


class UploadSender:
    sender: MTProtoSender
    request: Union[SaveFilePartRequest, SaveBigFilePartRequest]
    part_count: int
    stride: int
    previous: Optional[asyncio.Task]
    loop: asyncio.AbstractEventLoop

    def __init__(self, sender: MTProtoSender, file_id: int, part_count: int, big: bool, index: int,
                 stride: int, loop: asyncio.AbstractEventLoop) -> None:
        self.sender = sender
        self.part_count = part_count
        if big:
            self.request = SaveBigFilePartRequest(file_id, index, part_count, b"")
        else:
            self.request = SaveFilePartRequest(file_id, index, b"")
        self.stride = stride
        self.previous = None
        self.loop = loop

    async def next(self, data: bytes) -> None:
        if self.previous:
            await self.previous
        self.previous = self.loop.create_task(self._next(data))

    async def _next(self, data: bytes) -> None:
        self.request.bytes = data
        log.debug(f"Sending file part {self.request.file_part}/{self.part_count}"
                  f" with {len(data)} bytes")
        await self.sender.send(self.request)
        self.request.file_part += self.stride

    async def disconnect(self) -> None:
        if self.previous:
            await self.previous
        return await self.sender.disconnect()


class ParallelTransferrer:
    client: TelegramClient
    loop: asyncio.AbstractEventLoop
    dc_id: int
    senders: Optional[List[Union[DownloadSender, UploadSender]]]
    auth_key: AuthKey
    upload_ticker: int

    def __init__(self, client: TelegramClient, dc_id: Optional[int] = None) -> None:
        self.client = client
        self.loop = self.client.loop
        self.dc_id = dc_id or self.client.session.dc_id
        self.auth_key = (None if dc_id and self.client.session.dc_id != dc_id
                         else self.client.session.auth_key)
        self.senders = None
        self.upload_ticker = 0

    async def _cleanup(self) -> None:
        await asyncio.gather(*[sender.disconnect() for sender in self.senders])
        self.senders = None

    @staticmethod
    def _get_connection_count(file_size: int, max_count: int = 20,
                              full_size: int = 100 * 1024 * 1024) -> int:
        if file_size > full_size:
            return max_count
        return math.ceil((file_size / full_size) * max_count)

    async def _init_download(self, connections: int, file: TypeLocation, part_count: int,
                             part_size: int) -> None:
        minimum, remainder = divmod(part_count, connections)

        def get_part_count() -> int:
            nonlocal remainder
            if remainder > 0:
                remainder -= 1
                return minimum + 1
            return minimum

        # The first cross-DC sender will export+import the authorization, so we always create it
        # before creating any other senders.
        self.senders = [
            await self._create_download_sender(file, 0, part_size, connections * part_size,
                                               get_part_count()),
            *await asyncio.gather(
                *[self._create_download_sender(file, i, part_size, connections * part_size,
                                               get_part_count())
                  for i in range(1, connections)])
        ]

    async def _create_download_sender(self, file: TypeLocation, index: int, part_size: int,
                                      stride: int,
                                      part_count: int) -> DownloadSender:
        return DownloadSender(await self._create_sender(), file, index * part_size, part_size,
                              stride, part_count)

    async def _init_upload(self, connections: int, file_id: int, part_count: int, big: bool
                           ) -> None:
        self.senders = [
            await self._create_upload_sender(file_id, part_count, big, 0, connections),
            *await asyncio.gather(
                *[self._create_upload_sender(file_id, part_count, big, i, connections)
                  for i in range(1, connections)])
        ]

    async def _create_upload_sender(self, file_id: int, part_count: int, big: bool, index: int,
                                    stride: int) -> UploadSender:
        return UploadSender(await self._create_sender(), file_id, part_count, big, index, stride,
                            loop=self.loop)

    async def _create_sender(self) -> MTProtoSender:
        dc = await self.client._get_dc(self.dc_id)
        sender = MTProtoSender(self.auth_key, self.loop, loggers=self.client._log)
        await sender.connect(self.client._connection(dc.ip_address, dc.port, dc.id,
                                                     loop=self.loop, loggers=self.client._log,
                                                     proxy=self.client._proxy))
        if not self.auth_key:
            log.debug(f"Exporting auth to DC {self.dc_id}")
            auth = await self.client(ExportAuthorizationRequest(self.dc_id))
            req = self.client._init_with(ImportAuthorizationRequest(
                id=auth.id, bytes=auth.bytes
            ))
            await sender.send(req)
            self.auth_key = sender.auth_key
        return sender

    async def init_upload(self, file_id: int, file_size: int, part_size_kb: Optional[float] = None,
                          connection_count: Optional[int] = None) -> Tuple[int, int, bool]:
        connection_count = connection_count or self._get_connection_count(file_size)
        print("init_upload count is ", connection_count)
        part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024
        part_count = (file_size + part_size - 1) // part_size
        is_large = file_size > 10 * 1024 * 1024
        await self._init_upload(connection_count, file_id, part_count, is_large)
        return part_size, part_count, is_large

    async def upload(self, part: bytes) -> None:
        await self.senders[self.upload_ticker].next(part)
        self.upload_ticker = (self.upload_ticker + 1) % len(self.senders)

    async def finish_upload(self) -> None:
        await self._cleanup()

    async def download(self, file: TypeLocation, file_size: int,
                       part_size_kb: Optional[float] = None,
                       connection_count: Optional[int] = None) -> AsyncGenerator[bytes, None]:
        connection_count = connection_count or self._get_connection_count(file_size)
        print("download count is ", connection_count)

        part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024
        part_count = math.ceil(file_size / part_size)
        log.debug("Starting parallel download: "
                  f"{connection_count} {part_size} {part_count} {file!s}")
        await self._init_download(connection_count, file, part_count, part_size)

        part = 0
        while part < part_count:
            tasks = []
            for sender in self.senders:
                tasks.append(self.loop.create_task(sender.next()))
            for task in tasks:
                data = await task
                if not data:
                    break
                yield data
                part += 1
                log.debug(f"Part {part} downloaded")

        log.debug("Parallel download finished, cleaning up connections")
        await self._cleanup()


parallel_transfer_locks: DefaultDict[int, asyncio.Lock] = defaultdict(lambda: asyncio.Lock())


async def _internal_transfer_to_telegram(client: TelegramClient,
                                         response: BinaryIO,
                                         progress_callback: callable
                                         ) -> Tuple[TypeInputFile, int]:
    file_id = helpers.generate_random_long()
    file_size = os.path.getsize(response.name)

    hash_md5 = hashlib.md5()
    uploader = ParallelTransferrer(client)
    part_size, part_count, is_large = await uploader.init_upload(file_id, file_size)
    buffer = bytearray()
    for data in stream_file(response):
        if progress_callback:
            r = progress_callback(response.tell(), file_size)
            if inspect.isawaitable(r):
                await r
        if not is_large:
            hash_md5.update(data)
        if len(buffer) == 0 and len(data) == part_size:
            await uploader.upload(data)
            continue
        new_len = len(buffer) + len(data)
        if new_len >= part_size:
            cutoff = part_size - len(buffer)
            buffer.extend(data[:cutoff])
            await uploader.upload(bytes(buffer))
            buffer.clear()
            buffer.extend(data[cutoff:])
        else:
            buffer.extend(data)
    if len(buffer) > 0:
        await uploader.upload(bytes(buffer))
    await uploader.finish_upload()
    if is_large:
        return InputFileBig(file_id, part_count, "upload"), file_size
    else:
        return InputFile(file_id, part_count, "upload", hash_md5.hexdigest()), file_size


async def download_file(client: TelegramClient,
                                        location: TypeLocation,
                                        out: BinaryIO,
                                        progress_callback: callable = None
                                        ) -> BinaryIO:
    size = location.size
    dc_id, location = utils.get_input_location(location)
    # We lock the transfers because telegram has connection count limits
    downloader = ParallelTransferrer(client, dc_id)
    downloaded = downloader.download(location, size)
    async for x in downloaded:
        out.write(x)
        if progress_callback:
            r = progress_callback(out.tell(), size)
            if inspect.isawaitable(r):
                await r

    return out


async def upload_file(client: TelegramClient,
                                        file: BinaryIO,
                                        progress_callback: callable = None,

                                        ) -> TypeInputFile:
    res = (await _internal_transfer_to_telegram(client, file, progress_callback))[0]
    return res

usage :

下载文件:

await download_file(client, msg.document, file, progress_callback=prog)

上传文件:

result = await parallel_transfer_to_telegram(client, file, progress_callback=prog)
await client.send_file(event.chat_id, file=result)

已知的问题 :

如果您使用的是机器人帐户,DC ID 可能会混乱,因此您需要在调用 .start() 后立即执行此操作:

config = await client(functions.help.GetConfigRequest())
for option in config.dc_options:
    if option.ip_address == client.session.server_address:
        if client.session.dc_id != option.id:
            log.warning(f"Fixed DC ID in session from {client.session.dc_id} to {option.id}")
        client.session.set_dc(option.id, option.ip_address, option.port)
        client.session.save()
        break
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Telethon 中正确使用 iter_download 功能进行多连接下载 的相关文章

  • Telegram 机器人等待用户回复

    下面的代码是针对电报机器人这基本上需要一个人username and password并验证它以提供他的平均支出 我们看到的问题是机器人等待用户发送他的用户名和密码10 sec要么浪费时间 要么 没有给予足够的时间 我怎样才能编程 让机器人
  • 在 python 中添加与 telethon 的联系

    最近 我尝试根据本教程在 telegram 和 telethon 中添加联系人 1 在 api telegram python telethon 中添加新联系人 我用了这个代码 contact InputPhoneContact clien
  • Telegram bot webhook 将用户标识符 id 返回为 null

    我正在开发一个 telegram 机器人 并设置一个 webhook 将用户的聊天 id 存储到我的 laravel 应用程序的数据库中 我之前得到了聊天 ID 但今天 我得到了 NULL 我正在使用 ngok for https 来与 w
  • Telegram 客户端消息发送限制是什么?

    From here https core telegram org bots faq我知道 Telegram 对 BOT 消息的限制如下 gt 每个聊天 1 条消息 秒 gt 30 条消息 秒不同的聊天 碰巧我没有使用python tele
  • 如何使用 telegram bot api 在群聊中接收消息

    我的电报机器人在私人聊天中接收用户发送到我的机器人的消息 但不接收用户在群聊中发送的消息 任何用于获取群聊消息的选项 api Talk to botfather http telegram me botfather并禁用隐私模式
  • Telegram 机器人不适用于所有用户

    我创建了几个 Telegram 机器人 它们适用于我的帐户以及我测试过的其他几个帐户 但我收到一些用户的报告称机器人从未做出回应 是否有一些用户设置会阻止帐户从机器人获取消息 或者还有其他想法为什么它不适用于某些帐户 好的 找到问题了 是p
  • 使用aiogram创建后台进程

    我正在尝试在我正在开发的使用 aiogram 的电报机器人中发送加密货币的价格警报 我遇到的问题是 我不确定如何启动一个函数作为后台 非阻塞线程 然后继续启动调度程序 我知道如何使用标准同步电报机器人来做到这一点 但我对我应该用 aiogr
  • Telegram Bot API 4.5 MarkdownV2 上的转义字符给超链接带来麻烦

    电报机器人 API 4 5带有新的解析模式 MarkdownV2 同时这些 gt 字符必须与前面的字符一起转义 replace g 用作添加转义字符的解决方案 效果非常好 但不幸的是 该解决方案确实影响超链接方法 inline URL ht
  • Telegram 机器人:如何通过 ID(而不是用户名)提及用户

    我正在创建一个电报机器人并使用sendMessage发送消息的方法 很容易提及用户使用 username 但是当用户没有用户名时如何提及用户呢 如果使用电报应用程序 网络 我们可以通过以下方式提及用户 integer id name 而 t
  • 使 Python-Telegram-Bot 持久化

    我最近使用 python telegram bot 库编写了一个简单的电报机器人 并将该机器人部署在 Heroku 上 现在我正在寻找一种有效的方法来使机器人持久化 允许它在运行之间存储 Bot data 和 user data 我查看了库
  • 如何在 Telethon 中正确使用 iter_download 功能进行多连接下载

    我一直在尝试实现一个多线程电报下载客户端 对于单个下载 我们可以简单地使用 download media 功能 但 telethon 提供了 iter download 函数 根据文档 它用于流媒体 还包括暂停和恢复功能 我们可以使用它来通
  • Python Telegram Bot - run_daily 不起作用

    我有与此相同的问题thread https stackoverflow com questions 61650938 telegram bot how to send messages daily new answer newreg 4f6
  • 强制关闭电报上的弹出窗口“打开此链接?”

    当我在帖子中使用 html 格式并创建链接时 Telegram 会显示一个弹出窗口before打开链接 是否有脚本或其他东西可以强制关闭弹出窗口并立即打开链接 我不想插入链接没有 HTML 在单击链接之前 我按下 Enter 键 我使用 A
  • 电报机器人 - 保留问题和答案

    我的电报机器人是一个对话框 它需要保留问题和答案 如 TriviaBot 执行此操作的最佳 最有效 方法是什么 以用户 ID 作为键的数据库 有很多电报机器人 但是哪里有带有源代码的示例来获取想法 您的问题与 telegram bot AP
  • Telegram bot API:我可以混合使用自定义键盘和force_reply吗?

    我有一个自定义键盘 但要处理答案 我需要强制回复 以便我在下一条消息中收到问题 我已经这样做了 var opts reply markup JSON stringify keyboard OK Cancel one time keyboar
  • 使用 PHP 的 telegram API 发送图像总是失败

    我想制作一个使用 Telegram API 发送图像的函数 参考 API https github com mgp25 Telegram Bot API https github com mgp25 Telegram Bot API 但是当
  • Telegram Bot getUpdates VS setWebhook

    我想为企业开发一个机器人 我不知道使用获取更新 https core telegram org bots api getupdates开发 Windows 桌面应用程序并在 vps 上运行该应用程序的方法 通过https github co
  • 冲突:被其他getUpdates请求终止;确保只有一个机器人实例正在运行

    有人遇到这样的错误吗 我该如何修复它们 2021 11 07 08 29 38 643 telegram ext updater ERROR Error while getting Updates Conflict terminated b
  • 我正在用 python 编写一个电报机器人

    我想通过Python编写一个电报机器人 但它不起作用 import telebot bot telebot TeleBot my token bot message handler content types text def sendin
  • 自动更改 github 文件

    我制作了一个带有白名单的应用程序 withelist 位于 github 存储库上 只有一个文件 即 withelist 每次下载我的应用程序的用户想要被允许使用该应用程序时 都必须向我发送一个消息插入白名单 现在这个过程真的很慢 我想加快

随机推荐