使用 Python Paramiko 使用多个连接/线程将大文件上传到 SFTP 服务器

2023-12-03

我正在尝试使用线程和 python paramiko 库将文件分块 SFTP 到远程服务器。

它在不同的线程中打开本地文件和 sftp 块到远程服务器。

我基本上遵循这个解决方案,它使用相同的方法通过 SFTP 下载大文件。我想改为发送大文件。下载解决方案

然而,我正在进入write_chunks()在线为chunk in infile.readv(chunks):收到此错误时:

AttributeError:'_io.BufferedReader'对象没有属性'readv'

有人可以帮忙解决这个错误吗?我以为infile是一个文件描述符。我不明白为什么这是一个_io.BufferedReader object.

import threading, os, time, paramiko

import time, paramiko

MAX_RETRIES = 10

ftp_server = "server.com"
port = 22
remote_file = "/home/filecopy.bin"
local_file = "/home/file.bin"
ssh_conn = sftp_client = None
username = "none"
password = "none"

#you could make the number of threads relative to file size
NUM_THREADS = 2
MAX_RETRIES = 10

def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, remote_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(remote_file_part, "wb") as outfile:
                with open(local_file, "rb") as infile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            retry += 1
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(abs(retry) * 10)
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()



start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        #filesize = sftp_client.stat(remote_file).st_size
        filesize = os.stat(local_file).st_size
        sftp_client.close()
        ssh_conn.close()
        chunksize = pow(2, 12)
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            remote_file_part = make_filepart_path(remote_file, thread_num) 
            args = (thread_chunks[thread_num], thread_num, remote_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(remote_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # join file parts into one file, remove fileparts
        with sftp_client.open(remote_file_part, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
    finally:
       if hasattr(sftp_client, "close") and callable(sftp_client.close):
           sftp_client.close()
       if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
           ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))

堆栈跟踪:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "simpleNNInference.py", line 210, in write_chunks
    for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "simpleNNInference.py", line 210, in write_chunks
    for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'

有关如何对一个大文件进行并行分段上传的示例,请参阅以下示例。

请注意,大多数 SFTP 服务器(包括 OpenSSH,直到最近的 9.0)不允许远程合并文件。所以你必须恢复到 shell 命令.

import os
import threading
import paramiko

sftp_server = "example.com"
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)

def open_ssh():
    ssh = paramiko.SSHClient()
    ssh.connect(sftp_server, username=username, password=password)
    return ssh

def upload_part(num, offset, part_size, remote_path_part):
    print(f"Running thread {num}")
    try:
        ssh = open_ssh()
        sftp = ssh.open_sftp()
        with open(local_path, "rb") as fl:
            fl.seek(offset)
            with sftp.open(remote_path_part, "wb") as fr:
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")

print("Starting")
offset = 0
threads = []
part_filenames = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    remote_path_part = f"{remote_path}.{num}"
    args = (num, offset, part_size, remote_path_part)
    print(f"Starting thread {num} offset {offset} size {part_size} " +
          f"part name {remote_path_part}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    part_filenames.append(remote_path_part)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All thread done")

parts_list = " ".join(part_filenames)
merge_command =
    f"rm \"{remote_path}\" 2> /dev/null ; " + \
    f"for i in {parts_list} ; do cat \"$i\" >> {remote_path} && " + \
     "rm \"$i\" || break ; done"
print(f"Merge command: {merge_command}");

ssh = open_ssh()
stdin, stdout, stderr = ssh.exec_command(merge_command)
print(stdout.read().decode("utf-8"))
print(stderr.read().decode("utf-8"))

我不确定 SFTP 规范在多大程度上支持了这一点,但许多 SFTP 服务器(包括 OpenSSH)允许从多个连接并行写入同一文件。因此,即使不合并文件,您也可以直接上传到目标文件的各个部分:

import os
import threading
import paramiko

sftp_server = "example.com"
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)
lock = threading.Lock()
created = False

def upload_part(num, offset, part_size):
    print(f"Running thread {num}")
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(sftp_server, port=port, username=username, password=password)
        sftp = ssh.open_sftp()
        with open(local_path, "rb") as fl:
            fl.seek(offset)
            with lock:
                global created
                m = "r+" if created else "w"
                created = True
                fr = sftp.open(remote_path, m)
            with fr:
                fr.seek(offset)
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")

print("Starting")
offset = 0
threads = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    args = (num, offset, part_size)
    print(f"Starting thread {num} offset {offset} size {part_size}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All thread done")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Python Paramiko 使用多个连接/线程将大文件上传到 SFTP 服务器 的相关文章

  • 如何使用 opencv.omnidir 模块对鱼眼图像进行去扭曲

    我正在尝试使用全向模块 http docs opencv org trunk db dd2 namespacecv 1 1omnidir html用于对鱼眼图像进行扭曲处理Python 我正在尝试适应这一点C 教程 http docs op
  • Python 中的舍入浮点问题

    我遇到了 np round np around 的问题 它没有正确舍入 我无法包含代码 因为当我手动设置值 而不是使用我的数据 时 返回有效 但这是输出 In 177 a Out 177 0 0099999998 In 178 np rou
  • Python getstatusoutput 替换不返回完整输出

    我发现了这个很棒的替代品getstatusoutput Python 2 中的函数在 Unix 和 Windows 上同样有效 不过我觉得这个方法有问题output被构建 它只返回输出的最后一行 但我不明白为什么 任何帮助都是极好的 def
  • 跟踪 pypi 依赖项 - 谁在使用我的包

    无论如何 是否可以通过 pip 或 PyPi 来识别哪些项目 在 Pypi 上发布 可能正在使用我的包 也在 PyPi 上发布 我想确定每个包的用户群以及可能尝试积极与他们互动 预先感谢您的任何答案 即使我想做的事情是不可能的 这实际上是不
  • 删除flask中的一对一关系

    我目前正在使用 Flask 开发一个应用程序 并且在删除一对一关系中的项目时遇到了一个大问题 我的模型中有以下结构 class User db Model tablename user user id db Column db String
  • Pandas 日期时间格式

    是否可以用零后缀表示 pd to datetime 似乎零被删除了 print pd to datetime 2000 07 26 14 21 00 00000 format Y m d H M S f 结果是 2000 07 26 14
  • 您可以格式化 pandas 整数以进行显示,例如浮点数的“pd.options.display.float_format”?

    我见过this https stackoverflow com questions 18404946 py pandas formatdataframe and this https stackoverflow com questions
  • YOLOv8获取预测边界框

    我想将 OpenCV 与 YOLOv8 集成ultralytics 所以我想从模型预测中获取边界框坐标 我该怎么做呢 from ultralytics import YOLO import cv2 model YOLO yolov8n pt
  • datetime.datetime.now() 返回旧值

    我正在通过匹配日期查找 python 中的数据存储条目 我想要的是每天选择 今天 的条目 但由于某种原因 当我将代码上传到 gae 服务器时 它只能工作一天 第二天它仍然返回相同的值 例如当我上传代码并在 07 01 2014 执行它时 它
  • 为什么 PyYAML 花费这么多时间来解析 YAML 文件?

    我正在解析一个大约 6500 行的 YAML 文件 格式如下 foo1 bar1 blah name john age 123 metadata whatever1 whatever whatever2 whatever stuff thi
  • 如何使用 pybrain 黑盒优化训练神经网络来处理监督数据集?

    我玩了一下 pybrain 了解如何生成具有自定义架构的神经网络 并使用反向传播算法将它们训练为监督数据集 然而 我对优化算法以及任务 学习代理和环境的概念感到困惑 例如 我将如何实现一个神经网络 例如 1 以使用 pybrain 遗传算法
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • Numpy - 根据表示一维的坐标向量的条件替换数组中的值

    我有一个data多维数组 最后一个是距离 另一方面 我有距离向量r 例如 Data np ones 20 30 100 r np linspace 10 50 100 最后 我还有一个临界距离值列表 称为r0 使得 r0 shape Dat
  • 如何使用原始 SQL 查询实现搜索功能

    我正在创建一个由 CS50 的网络系列指导的应用程序 这要求我仅使用原始 SQL 查询而不是 ORM 我正在尝试创建一个搜索功能 用户可以在其中查找存储在数据库中的书籍列表 我希望他们能够查询 书籍 表中的 ISBN 标题 作者列 目前 它
  • 如何在 MacBook Pro 上的 Docker 容器内运行 tkinter?

    我正在尝试运行一个使用以下命令的 python GUI 应用程序tkinter我的 MacBook Pro 上的 docker 容器内的模块 所以我安装了XQuartz https www xquartz org 并跟随本教程 https
  • 如何在 Windows 命令行中使用参数运行 Python 脚本

    这是我的蟒蛇hello py script def hello a b print hello and that s your sum sum a b print sum import sys if name main hello sys
  • 如何在 pygtk 中创建新信号

    我创建了一个 python 对象 但我想在它上面发送信号 我让它继承自 gobject GObject 但似乎没有任何方法可以在我的对象上创建新信号 您还可以在类定义中定义信号 class MyGObjectClass gobject GO
  • 实现 XGboost 自定义目标函数

    我正在尝试使用 XGboost 实现自定义目标函数 在 R 中 但我也使用 python 所以有关 python 的任何反馈也很好 我创建了一个返回梯度和粗麻布的函数 它工作正常 但是当我尝试运行 xgb train 时它不起作用 然后 我
  • Django-tables2 列总计

    我正在尝试使用此总结列中的所有值文档 https github com bradleyayers django tables2 blob master docs pages column headers and footers rst 但页
  • 使用随机放置的 NaN 创建示例 numpy 数组

    出于测试目的 我想创建一个M by Nnumpy 数组与c随机放置的 NaN import numpy as np M 10 N 5 c 15 A np random randn M N A mask np nan 我在创建时遇到问题mas

随机推荐

  • shell_exec() 和 exec() 在 PHP 中不起作用

    像许多其他人一样 我对 PHP 中的 shell exec 函数有疑问 我已禁用安全模式并从 php ini 中删除了disabled functions 如果我从终端 php print php 运行 php 脚本 它工作正常 但如果我从
  • 在 MATLAB 中使用 i 和 j 作为变量

    i and j是非常流行的变量名称 例如 参见这个问题 and this one 例如 在循环中 for i 1 10 Do something end 作为矩阵的索引 mat i j 4 Why 不应该它们可以用作 MATLAB 中的变量
  • 教程的版本控制设置

    我正在尝试为与编程相关的教程设置版本控制 事实证明这是有问题的 因为有两种不同的历史 本教程提供了项目构建的历史记录 每个章节都提供了该项目的历史记录 读者将看到这些历史记录 如果我从未打算再次更改教程中已编写的章节 我可以将每个章节作为标
  • 不支持的绑定命名空间“”

    我有一个架构JAXB每次都能完美生成java类 我想得到hyperjaxb处理相同的模式 为此 我下载并解压了hyperjaxbMaven项目从这个链接然后使用导航到根目录cmd exe并通过运行示例数据对其进行测试mvn clean in
  • spring destroy-method + 请求范围 bean

    所以我想做这样的事情 Component Scope value request proxyMode ScopedProxyMode INTERFACES public class MyBean Autowired HttpServletR
  • Distcp - 容器运行超出物理内存限制

    我已经在 distcp 上苦苦挣扎了好几天 我发誓我已经用谷歌搜索得够多了 这是我的用例 USE CASE 我在某个位置有一个主文件夹 hdfs 根目录 有很多子目录 深度不固定 和文件 容量 200 000 个文件 30 GO 我只需要为
  • 提取每月第一个星期一

    如何提取从2010 01 01到2015 12 31每个月的第一个星期一 我们可以用lubridate wday测试这是否是星期一 并且day测试这是否是该月的第一周 library lubridate x lt seq ymd 2010
  • 将项目从 SQL 表插入 bootstrap-dropdown

    我正在开发 asp net 项目 我的代码背后的语言是 c 我有一个引导下拉列表 我想从 SQL 表中获取项目 有没有人可以在这方面帮助我 提前致谢 li class nav item dropdown a class btn btn li
  • durandal 优化器在 Visual Studio 中将其构建为构建后过程时引用了错误的路径

    我在 Visual Studio 中设置了一个构建后事件 以使用 durandal 的优化器 使用 Nodejs 来构建用于生产的 main built js 文件 收到错误消息说找不到 main built js 我相信这是因为它没有正确
  • 使用 ELMAH 配置自定义授权

    如何在没有默认 ASP NET 授权角色管理器的情况下将 ELMAH 配置为仅向某些人员显示 我 以及我认为的许多其他人 使用自己的授权逻辑并从零开始构建我的项目 而不使用提供的模板 我想记录错误 但似乎不可能配置 ELMAH 以某种方式覆
  • YouTube 嵌入上的播放按钮在 android-chrome 上不起作用

    我一直在页面上制作嵌入的 YouTube 视频 在桌面浏览器上运行良好 但是 在 android chrome 上 当您触摸中心的红色播放按钮时 嵌入的视频将不会播放 当你触摸播放按钮外面时 它确实可以正常播放 这很奇怪 我的客户也在 iP
  • 在 Mapbox 中使用 Leafletjs MarkerClusterGroup 和过滤器时出现问题

    我尝试过 Mapbox 及其 API 来创建交互式地图 目的是获取 geojson 文件中的点 并将其显示在地图上 它们必须通过标记图标进行过滤 并根据所应用的缩放进行分组 我在使用 MarkerClusterGroup 插件与 leafl
  • JavaScript 沙箱:隐藏给定范围内的全局变量

    我想创建一个 HTML JS 环境 用户可以在其中输入并运行任意 JavaScript 代码 这些代码将在给定监狱对象的上下文中执行 我已经设置了一个游乐场来说明我到目前为止所拥有的 这个做得相当不错 Basic evaluation wo
  • ASP / 获取行和计数

    为了增强性能和资源 我刚刚开始在一些脚本上使用 getRows 我刚刚遇到一个问题 想请教一下 我这样做是为了获取记录集并获取计数 If NOT rs EOF Then arrResultSet rs GetRows arrRowCount
  • java文本字段中的数据可以在没有数据库交互的情况下发送到jasper报表吗?

    我们正在使用 netbeans 用 java 开发一个桌面应用程序 我们已经安装了 Netbeans 的 jasper 报告 并且能够根据数据库中的数据设计报告 有一个表格 我们想要打印而不将数据存储在数据库中 我们可以将表单数据发送到ja
  • 如何使用 initSelection 附加 jquery select2 值

    这是我使用 ajax 进行的 select2 多重选择 最初我在 initselection 中设置一些值 如下所示 initSelection function element callback var data id 4 zipcode
  • 对引用程序集中的类进行 GetType 失败

    我有一个引用域项目的 asp net Web 项目 在 Web 项目中 我想使用反射从域项目创建类的实例 但我总是得到 null 在 VB 中什么也没有 注意 我使用的是非完全限定的类名 并希望按照 MSDN 似乎指示的那样执行搜索 在程序
  • 如何在不使用 vba 创建 Internet Explorer 对象的情况下解析 html?

    我工作的任何计算机上都没有 Internet Explorer 因此创建 Internet Explorer 对象并使用 ie navigate 解析 html 并搜索标签是不可能的 我的问题是 如何在不使用 IE 的情况下自动将带有标签的
  • 使用芬威克树或 BIT 的数组中非递减子序列的最大和

    我们如何使用芬威克树找到数组中非递减子序列的最大和 例如我们有 1 4 4 2 2 3 3 1 这里非递减子序列的最大和是 11 1 2 2 3 3 可以使用动态规划算法找到最大和 扫描数组并将每个元素的值添加到有效的最大子序列和 子序列以
  • 使用 Python Paramiko 使用多个连接/线程将大文件上传到 SFTP 服务器

    我正在尝试使用线程和 python paramiko 库将文件分块 SFTP 到远程服务器 它在不同的线程中打开本地文件和 sftp 块到远程服务器 我基本上遵循这个解决方案 它使用相同的方法通过 SFTP 下载大文件 我想改为发送大文件