使用 Play Scala 使用 Iteratees 将文件直接逐块上传到 S3

2024-03-24

我尝试使用 Iteratees 将文件直接上传到 s3,但没有成功。我对函数式编程仍然很陌生,并且发现很难拼凑一些工作代码。

我编写了一个 iteratee 来处理上传文件的块并将它们发送到 S3。最后上传失败并出现错误。

请帮我解决这个问题。

下面是我想出的代码

控制器处理程序

  def uploadFile = Action.async(BodyParser(rh => S3UploadHelper("bucket-name").s3Iteratee() ))  { implicit request =>
    Future {
      if(uploadLogger.isInfoEnabled) uploadLogger.info(s"Contents of Uploaded file :: \n " + request.body)
      Ok(views.html.index("File uploaded"))
    }
  }

助手类

case class S3UploadHelper(bucket: String, key: String = UUID.generate()) {

  private val AWS_ACCESS_KEY = ""
  private val AWS_SECRET_KEY = ""
  private val yourAWSCredentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY)
  val amazonS3Client = new AmazonS3Client(yourAWSCredentials)


  private val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  private val initResponse = amazonS3Client.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val uploadLogger = Logger("upload")

  def s3Iteratee(etags: Seq[PartETag] = Seq.empty[PartETag]): Iteratee[Array[Byte], Either[Result, CompleteMultipartUploadResult]] = Cont {
    case in: El[Array[Byte]] =>
      // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
      val uploadRequest = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(etags.length + 1)
        .withUploadId(uploadId)
        .withInputStream(new ByteArrayInputStream(in.e))
        .withPartSize(in.e.length)
      if(uploadLogger.isDebugEnabled) uploadLogger.debug(">> " + String.valueOf(in.e))
      val etag = Future { amazonS3Client.uploadPart(uploadRequest).getPartETag }
      etag.map(etags :+ _)
      Await.result(etag, 1.seconds)
      s3Iteratee(etags)
    case in @ Empty => s3Iteratee(etags)
    case in @ EOF =>
      import scala.collection.JavaConversions._
      val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toList)
      val result = amazonS3Client.completeMultipartUpload(compRequest)
      Done(Right(result), in)
    case in => s3Iteratee(etags)
  }

}

尽管 Iteratee 似乎可以工作并且我能够逐块处理文件,但上传失败并出现奇怪的错误。这是日志

[debug] upload - >> [B@1df9048d
[debug] upload - >> [B@152dcf59
[debug] upload - >> [B@7cfeb0d8
[debug] upload - >> [B@136844c5
[debug] upload - >> [B@16f41590
[debug] upload - >> [B@6dd85710
[debug] upload - >> [B@64294203
[debug] upload - >> [B@35366c2f
[debug] upload - >> [B@358a78c
[debug] upload - >> [B@2c171020
[debug] upload - >> [B@20076fb
[debug] upload - >> [B@4d13580
[debug] upload - >> [B@42738651
[debug] upload - >> [B@5671082f
[debug] upload - >> [B@57c70bb4
[debug] upload - >> [B@4154394f
[debug] upload - >> [B@4f93cf15
[debug] upload - >> [B@4bac523f
[debug] upload - >> [B@eaec52e
[debug] upload - >> [B@6ed00bf5
[debug] upload - >> [B@3f6a8a5d
[debug] upload - >> [B@16fe1a25
[debug] upload - >> [B@6e813a61
[debug] upload - >> [B@e01be7
[debug] upload - >> [B@6bb351c4
[debug] upload - >> [B@dfa51a5
[debug] upload - >> [B@6acf2049
[debug] upload - >> [B@6a7021d4
[debug] upload - >> [B@1b3c602f
[debug] upload - >> [B@44146d94
[debug] upload - >> [B@574ac037
[debug] upload - >> [B@3cdf258b
[debug] upload - >> [B@441a0727
[debug] upload - >> [B@2385aafd
[debug] upload - >> [B@224f9dc2
[debug] upload - >> [B@6779077d
[debug] upload - >> [B@734e178a
[debug] upload - >> [B@7d92895c
[debug] upload - >> [B@23edaaa1
[debug] upload - >> [B@c00134e
[debug] upload - >> [B@ff1a703
[error] play - Cannot invoke the action, eventually got an error: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 98h72s0EBA7653AD, AWS Error Code: MalformedXML, AWS Error Message: The XML you provided was not well-formed or did not validate against our published schema, S3 Extended Request ID: R7e44g8oRy5b4xd7MU++atibwrBSRFezeMxNCXE38gyzcwci5Zf
[error] application - 

! @6k2maob49 - Internal server error, for (POST) [/v1/file_upload] ->

play.api.Application$$anon$1: Execution exception[[AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema]]
        at play.api.Application$class.handleError(Application.scala:296) ~[play_2.10-2.3.2.jar:2.3.2]
        at play.api.DefaultApplication.handleError(Application.scala:402) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at scala.Option.map(Option.scala:145) [scala-library.jar:na]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:556) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:289) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:170) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:2723) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:1964) ~[aws-java-sdk-1.3.11.jar:na]

我过去曾这样做过,Amazon s3 需要 5Mb 块,我最后返回元组,您可以根据您的要求进行更改。

val client = new AmazonS3Client(new BasicAWSCredentials(access_key, secret_key))

def my_parser = BodyParser { 

val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)

multipartFormData(Multipart.handleFilePart({

  case Multipart.FileInfo(partName, file_name, content_type) => {

    val object_id = java.util.UUID.randomUUID().toString().replaceAll("-", "")
    val object_id_key = if (content_type.getOrElse("").contains("video") || content_type.getOrElse("").contains("audio")) object_id else object_id + file_name.substring(file_name.lastIndexOf('.'))
    var position = 0
    val etags = new java.util.ArrayList[PartETag]()

    val initRequest = new InitiateMultipartUploadRequest(bucket, object_id_key)
    val initResponse = client.initiateMultipartUpload(initRequest)
    println("fileName = " + file_name)
    println("contentType = " + content_type)

    (rechunkAdapter &>> Iteratee.foldM[Array[Byte], Int](1) { (c, bytes) =>
      Future {
        println("got a chunk!  :" + c + " size in KB: " + (bytes.length / 1024));
        val is = new java.io.ByteArrayInputStream(bytes)

        val uploadRequest = new UploadPartRequest()
          .withBucketName(bucket).withKey(object_id_key)
          .withUploadId(initResponse.getUploadId())
          .withPartNumber(c)
          .withFileOffset(position)
          .withInputStream(is)
          .withPartSize(bytes.length)

        etags.add(client.uploadPart(uploadRequest).getPartETag)
        position = position + bytes.length

        c + 1
      }
    }).map { v =>
      try {
        val compRequest = new CompleteMultipartUploadRequest(
          bucket,
          object_id_key,
          initResponse.getUploadId(),
          etags)
        client.completeMultipartUpload(compRequest)
        println("Finished uploading " + file_name)   
        client.setObjectAcl(bucket, object_id_key, com.amazonaws.services.s3.model.CannedAccessControlList.PublicRead)
        (object_id_key, file_name, content_type.getOrElse("application/octet-stream")) 
      } catch {
        case e: Exception => {
          println("S3 upload Ex " + e.getMessage())
          val abortMPURequest = new AbortMultipartUploadRequest("xxxxxxx", object_id, initResponse.getUploadId())
          client.abortMultipartUpload(abortMPURequest);
         ("error", file_name, content_type.getOrElse("application/octet-stream"))
        }
      }
    }
  }
}))

}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Play Scala 使用 Iteratees 将文件直接逐块上传到 S3 的相关文章

随机推荐

  • SwiftUI 选择器选择绑定未更新

    我正在尝试让选择器列出所有类型 称为Course然后让用户在添加新课程时选择适当的课程Assignment到托管对象上下文 选择器选择绑定 courseIndex 当用户点击选取器视图中的行时不会更新 我不完全确定如何解决这个问题 也不知道
  • 客户端无权调用此 JAX-RS EJB 错误

    我已经寻找这个问题的解决方案有一段时间了 这里是 我按照本教程从数据库自动生成 jax rs Web 服务 https netbeans org kb docs websvc rest html https netbeans org kb
  • 创建具有两个列表中的多个值的字典。将多个键组合为一个

    我有两个清单 lists a b c d e keys 18 18 3 4 5 我想要的是这样的字典 18 a b 3 c 4 d 5 e 我不断得到这个 18 a b c d e 3 a b c d e 4 a b c d e 5 a b
  • String.concat 比连接字符串的数组方法慢吗

    JavaScript 中的字符串是不可变的 通过网络和Stack Overflow 上也有 https stackoverflow com questions 51185 are javascript strings immutable d
  • C++ 当模板参数推导失败时

    为什么 C 不能确定我打算创建一个unique ptr a 用这个语法 a 之前已被声明为unique ptr a a unique ptr new A 必须包括在内似乎非常多余 a 这适用于我使用的大多数函数模板 为什么 unique p
  • 单个 LINQ 查询中生成的迭代次数

    我在使用LINQ的时候总是有一个疑问 以下代码生成了多少个迭代器 测试是一个列表 var result from t in test where t Length gt 0 t Length lt 5 orderby t 0 select
  • 在Rplot_ly中添加箱线图显着性指示线和星号

    用于向绘图添加水平显着性条形图 和星形 的 Rplot ly 命令是什么 的答案 如何绘制显着水平的箱线图 https stackoverflow com questions 29263046 how to draw the boxplot
  • C/C++波形快速显示

    我有兴趣在 Windows 和 Linux 上用 C 或 C 实现音频编辑器 我不知道如何在完全缩小的视图中足够快地显示波形 我不是在寻找有关快速帧缓冲区技术的信息 这是一个关于有效确定显示内容的算法和数据结构的问题 假设我希望能够编辑 2
  • Interface Builder 文件中的未知类。 Xcode 6 和 Swift

    我用 swift 启动了一个普通的主细节项目 如果我添加新的视图控制器并设置自定义类 则模块列表为空 并且无法选择模块 错误消息 Interface Builder 文件中存在未知类 如果我运行代码 就会出现在控制台中 如何设置故事板以了解
  • IE 中的下拉菜单隐藏在其他元素后面

    我已经在我的一个项目上安装了 site5 上的 Boldy 主题 但遇到了一个主要问题 在 Internet Explorer 8 或更低版本 中浏览时 顶部下拉菜单会显示在其他一些元素 主要内容滑块 H1 等 后面 我已经尝试了一切 从将
  • 在 Web 部署上应用 EF 迁移

    我正在使用 asp net core 2 我似乎找不到任何选项或任何关于如何将迁移应用到 Web 部署发布上的数据库的指南 这将在应用程序启动时迁移您的数据库 您可以从 Startup 类中调用它 using var context new
  • 即使应用程序关闭后,NotifyIcon 仍保留在托盘中,但在鼠标悬停时消失

    有很多问题都提出同样的疑问 解决这个问题的方法是设置 notifyIcon icon null并打电话Dispose在 FormClosing 事件中 在我的应用程序中 没有这样的表单 但有根据事件更新的通知图标 在创作时 我隐藏我的形式并
  • Kendo UI Grid - 在哪里寻找字体和图标

    我正处于学习 Kendo UI 的早期阶段 我已经完成网格工作并加载数据 但没有出现应出现在网格中的图像 图标 当我加载包含网格的页面时 出现以下 404 错误 错误消息没有告诉我期望在哪里找到这些文件 Kendo UI 在哪里寻找这些文件
  • 在悬停时添加 CSS 边框而不移动元素 [重复]

    这个问题在这里已经有答案了 我有一行在悬停时应用背景突出显示 jobs item hover background e1e1e1 border top 1px solid d0d0d0 但是 由于边框增加了 1px额外的到元素 它使其 移动
  • Swift 中闭包如何捕获值?

    我正在运行下面的代码 class Element var name String init name String self name name deinit print Element is deinitializing var elem
  • Get-ChildItem -force 报告“我的文档”文件夹和其他连接点上的“访问被拒绝”

    我有一个我编写的脚本来替换文件 我将参数传递给它以获取文件名以及要搜索的基本位置 工人线是 SubLocations Get ChildItem Path Startlocation Recurse include Filename For
  • 如何在基于paraview的python脚本中显示vtkUnstructedGrid?

    我在 Ubuntu 18 04 系统上安装 paraview 5 6 我想编写一个 python 脚本来显示 vtkUnstructedGrid import numpy as np from paraview simple import
  • powershell “<”运算符保留供将来在 Java 中使用

    我读过很多关于这个问题的答案 但没有发现两个文件之间的比较 实际上这是这本书的一个样本算法基于二分查找 这是源代码 import java util Arrays import edu princeton cs algs4 public c
  • Pandas 将滚动功能与 Group By 相结合

    df pd DataFrame cust id 1 1 1 1 1 1 2 2 2 2 2 2 period 1 2 3 4 5 6 1 2 3 4 5 6 volume 1 2 3 4 5 6 7 8 9 10 11 12 我有一个客户表
  • 使用 Play Scala 使用 Iteratees 将文件直接逐块上传到 S3

    我尝试使用 Iteratees 将文件直接上传到 s3 但没有成功 我对函数式编程仍然很陌生 并且发现很难拼凑一些工作代码 我编写了一个 iteratee 来处理上传文件的块并将它们发送到 S3 最后上传失败并出现错误 请帮我解决这个问题