同一文件上的多个 Arrow CSV 读取器返回 null

2023-12-23

我正在尝试使用多个 Goroutine 读取同一个文件,其中每个 Goroutine 都被分配一个字节来开始读取,并指定要读取的行数lineLimit.

当文件适合内存时,我成功地通过设置csv.ChunkSize的选项chunkSize多变的。但是,当文件大于内存时,我需要减少csv.ChunkSize选项。我正在尝试这样的事情

package main

import (
    "io"
    "log"
    "os"
    "sync"

    "github.com/apache/arrow/go/v11/arrow"
    "github.com/apache/arrow/go/v11/arrow/csv"
)

// A reader to read lines from the file starting from the byteOffset. The number
// of lines is specified by linesLimit.
func produce(
    id int,
    ch chan<- arrow.Record,
    byteOffset int64,
    linesLimit int64,
    filename string,
    wg *sync.WaitGroup,
) {
    defer wg.Done()

    fd, _ := os.Open(filename)
    fd.Seek(byteOffset, io.SeekStart)

    var remainder int64 = linesLimit % 10
    limit := linesLimit - remainder
    chunkSize := limit / 10

    reader := csv.NewInferringReader(fd,
        csv.WithChunk(int(chunkSize)),
        csv.WithNullReader(true, ""),
        csv.WithComma(','),
        csv.WithHeader(true),
        csv.WithColumnTypes(map[string]arrow.DataType{
            "Start_Time":        arrow.FixedWidthTypes.Timestamp_ns,
            "End_Time":          arrow.FixedWidthTypes.Timestamp_ns,
            "Weather_Timestamp": arrow.FixedWidthTypes.Timestamp_ns,
        }))
    reader.Retain()
    defer reader.Release()

    var count int64
    for reader.Next() {
        rec := reader.Record()
        rec.Retain() // released at the other end of the channel
        ch <- rec
        count += rec.NumRows()
        if count == limit {
            if remainder != 0 {
                flush(id, ch, fd, remainder)
            }
            break
        } else if count > limit {
            log.Panicf("Reader %d read more than it should, expected=%d, read=%d", id, linesLimit, count)
        }
    }

    if reader.Err() != nil {
        log.Panicf("error: %s in line %d,%d", reader.Err().Error(), count, id)
    }
}

func flush(id int,
    ch chan<- arrow.Record,
    fd *os.File,
    limit int64,
) {
    reader := csv.NewInferringReader(fd,
        csv.WithChunk(int(limit)),
        csv.WithNullReader(true, ""),
        csv.WithComma(','),
        csv.WithHeader(false),
    )

    reader.Retain()
    defer reader.Release()

    record := reader.Record()
    record.Retain() // nil pointer dereference error here
    ch <- record
}

我尝试了先前代码的多个版本,包括:

  1. 复制文件描述符
  2. 复制文件描述符的偏移量,打开同一个文件 并寻求这种抵消。
  3. 调用前关闭第一个阅读器flush或关闭第一个fd.

无论我如何更改代码,错误似乎都是相同的。请注意,任何调用flush的读者提出了一个错误。包括reader.Next, and reader.Err().

我使用 csv 阅读器是否错误?这是重复使用同一文件的问题吗?

编辑:我不知道这是否有帮助,但是在中打开一个新的 fdflush没有任何Seek避免错误(不知何故任何Seek导致出现原始错误)。但是,如果没有Seek(即删除Seek导致任何 Goroutine 根本无法读取文件的一部分)。


主要问题是,csv 阅读器使用bufio.Reader下面,它的默认缓冲区大小为 4096。这意味着reader.Next()将读取超出需要的字节,并缓存额外的字节。如果之后直接从文件中读取reader.Next(),您将错过缓存的字节。

下面的演示展示了这种行为:

package main

import (
    "bytes"
    "fmt"
    "io"
    "os"

    "github.com/apache/arrow/go/v11/arrow"
    "github.com/apache/arrow/go/v11/arrow/csv"
)

func main() {
    // Create a two-column csv file with this content (the second column has 1024 bytes):
    // 0,000000....
    // 1,111111....
    // 2,222222....
    // 3,333333....
    temp := createTempFile()

    schema := arrow.NewSchema(
        []arrow.Field{
            {Name: "i64", Type: arrow.PrimitiveTypes.Int64},
            {Name: "str", Type: arrow.BinaryTypes.String},
        },
        nil,
    )
    r := csv.NewReader(
        temp, schema,
        csv.WithComma(','),
        csv.WithChunk(3),
    )
    defer r.Release()

    r.Next()

    // To check what's left after the first chunk is read.
    // If the reader stop at the end of the chunk, the content left will be:
    // 3,333333....
    // But in fact, the content left is:
    // 33333333333
    buf, err := io.ReadAll(temp)
    if err != nil {
        panic(err)
    }

    fmt.Printf("%s\n", buf)
}

func createTempFile() *os.File {
    temp, err := os.CreateTemp("", "test*.csv")
    if err != nil {
        panic(err)
    }
    for i := 0; i < 4; i++ {
        fmt.Fprintf(temp, "%d,", i)
        if _, err := temp.Write(bytes.Repeat([]byte{byte('0' + i)}, 1024)); err != nil {
            panic(err)
        }
        if _, err := temp.Write([]byte("\n")); err != nil {
            panic(err)
        }
    }

    if _, err := temp.Seek(0, io.SeekStart); err != nil {
        panic(err)
    }

    return temp
}

看来第二个读取器的目的是防止它读取另一个 csv 数据块。如果您提前知道下一个 csv 数据块的偏移量,则可以将文件包装在io.SectionReader使其仅读取当前的 csv 数据块。当前的问题没有提供有关这部分的足够信息,也许我们应该把它留给另一个问题。

Notes:

  1. fd, _ := os.Open(filename):永远不要忽略错误。至少记录它们。
  2. fd大多数时候表示文件描述符。不要将其用于类型变量*os.File,特别是当*os.File有一个方法Fd.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

同一文件上的多个 Arrow CSV 读取器返回 null 的相关文章

  • Go客户端程序生成大量TIME_WAIT状态的socket

    我有一个 Go 程序 它从多个 goroutine 生成大量 HTTP 请求 运行一段时间后 程序报错 connect cannot allocaterequestedaddress 当检查时netstat 我得到大量 28229 个连接T
  • 在复杂的文件夹结构中进行测试

    我正在 golang 中构建一个设计模式存储库 为了运行所有测试 我使用这个 bash 脚本 有用 bin bash go test creational abstract factory go go test creational bui
  • 有没有办法间歇性地执行重复性任务?

    有没有办法在 Go 中执行重复的后台任务 我在想类似的事情Timer schedule task delay period 在爪哇 我知道我可以用 goroutine 来做到这一点Time sleep 但我想要一些容易停止的东西 这是我得到
  • 我们如何在 Go 中使用通道来代替互斥锁?

    通道将通信 值的交换 与同步相结合 保证两个计算 goroutine 处于已知状态 如何使用 Google Go 中的通道来执行互斥量的功能 package main import sync var global int 0 var m s
  • pprof 和 ps 之间的内存使用差异

    我一直在尝试分析用 cobra 构建的 cli 工具的堆使用情况 这pprof工具显示如下 Flat Flat Sum Cum Cum Name Inlined 1 58GB 49 98 49 98 1 58GB 49 98 os Read
  • 如何拥有在标准输出上更新的就地字符串

    我想输出到标准输出并让输出 覆盖 以前的输出 例如 如果我输出On 1 10 我想要下一个输出On 2 10覆盖On 1 10 我怎样才能做到这一点 stdout是一个流 io Writer 您无法修改已写入其中的内容 什么can更改的是该
  • 云存储 API 的错误导入“系统调用”

    我正在按照以下说明进行操作https cloud google com appengine docs go googlecloudstorageclient download开始将一些代码从现已弃用的文件 API 迁移到新的 Cloud S
  • 如何将长 Go 模板函数拆分为多行?

    我有一个很长的printf调用 Go 模板 例子 printf mongodb s s s s authSource admin replicaSet s readPreference nearest w majority Values r
  • 所有可能的 GOOS 价值?

    如果我做对了 GOOS在编译源代码时确定 为了更好地支持多个操作系统 我感兴趣的是GOOS可能 当然 Go 是开源的 所以它可能有无限的可能性 所以我真正想要的是一个 通用列表 已知值为 windows linux darwin or fr
  • golang中如何将相对路径解析为绝对路径?

    节点中是否有类似 path resolve 的API 或者有什么东西可以做同样的事情 例如 nodejs代码 path resolve sample sh 应该得到 home currentuser sample sh 解决 表示用户主目录
  • 结构体到磁盘的高效 Go 序列化

    我的任务是将 C 代码替换为 Go 而且我对 Go API 还很陌生 我正在使用 gob 将数百个键 值条目编码到磁盘页面 但 gob 编码有太多不需要的膨胀 package main import bytes encoding gob f
  • Golang:如何在HTTP客户端的TLS配置中指定证书

    我有一个证书文件 该位置是 usr abc my crt我想将该证书用于我的 tls 配置 以便我的 http 客户端在与其他服务器通信时使用该证书 我当前的代码如下 mTLSConfig tls Config CipherSuites u
  • 如何在Google AppEngine上设置环境变量?

    我正在尝试在谷歌应用程序引擎上设置和使用环境变量 我的 app yaml 文件如下所示 但是 当我使用 os Getenv mytoken 时 我得到一个空字符串 而不是我设置的实际值 是GAE的bug吗 api version go1 h
  • 不支持的 Perl 语法:`(?<`

    我想解析 cmd gpg list keys 的结果以将其显示在浏览器上 cmd输出是这样的 pub rsa3072 2021 08 03 SC expires 2023 08 03 07C47E284765D5593171C18F00B1
  • 在 OSX 上交叉编译 Go?

    我正在尝试在 OSX 上交叉编译 go 应用程序以构建适用于 Windows 和 Linux 的二进制文件 我已经阅读了网上能找到的所有内容 我发现的最接近的例子已经发布在 除了疯狂邮件列表上许多未完成的讨论之外 http solovyov
  • golang中默认的HTTP拨号超时值

    我正在运行 golang http 客户端来对服务器进行压力测试 有时我会收到错误 拨号 tcp 161 170 xx xxx 80 操作超时 错误 我认为这是 HTTP 客户端超时 我正在考虑增加超时值https stackoverflo
  • 带有导出字段的私有类型

    在 Go 教程的第二天有这样的练习 为什么拥有带有导出字段的私有类型会很有用 例如 package geometry type point struct X Y int name string 请注意point是小写的 因此不会导出 而字段
  • 在 Go 中解析 RFC-3339 / ISO-8601 日期时间字符串

    我尝试解析日期字符串 2014 09 12T11 45 26 371Z 在围棋中 该时间格式定义为 RFC 3339 日期时间 https datatracker ietf org doc html rfc3339 section 5 6
  • 模块路径格式错误...第一个路径元素中缺少点

    我有一个包含 2 个不同可执行文件的项目 每个可执行文件都有自己的依赖项以及对根的共享依赖项 如下所示 Root gt server gt main go gt someOtherFiles go gt go mod gt go sum g
  • 在 Go 中,如何将函数的 stdout 捕获到字符串中?

    例如 在 Python 中 我可以执行以下操作 realout sys stdout sys stdout StringIO StringIO some function prints to stdout get captured in t

随机推荐