我正在尝试使用多个 Goroutine 读取同一个文件,其中每个 Goroutine 都被分配一个字节来开始读取,并指定要读取的行数lineLimit
.
当文件适合内存时,我成功地通过设置csv.ChunkSize
的选项chunkSize
多变的。但是,当文件大于内存时,我需要减少csv.ChunkSize
选项。我正在尝试这样的事情
package main
import (
"io"
"log"
"os"
"sync"
"github.com/apache/arrow/go/v11/arrow"
"github.com/apache/arrow/go/v11/arrow/csv"
)
// A reader to read lines from the file starting from the byteOffset. The number
// of lines is specified by linesLimit.
func produce(
id int,
ch chan<- arrow.Record,
byteOffset int64,
linesLimit int64,
filename string,
wg *sync.WaitGroup,
) {
defer wg.Done()
fd, _ := os.Open(filename)
fd.Seek(byteOffset, io.SeekStart)
var remainder int64 = linesLimit % 10
limit := linesLimit - remainder
chunkSize := limit / 10
reader := csv.NewInferringReader(fd,
csv.WithChunk(int(chunkSize)),
csv.WithNullReader(true, ""),
csv.WithComma(','),
csv.WithHeader(true),
csv.WithColumnTypes(map[string]arrow.DataType{
"Start_Time": arrow.FixedWidthTypes.Timestamp_ns,
"End_Time": arrow.FixedWidthTypes.Timestamp_ns,
"Weather_Timestamp": arrow.FixedWidthTypes.Timestamp_ns,
}))
reader.Retain()
defer reader.Release()
var count int64
for reader.Next() {
rec := reader.Record()
rec.Retain() // released at the other end of the channel
ch <- rec
count += rec.NumRows()
if count == limit {
if remainder != 0 {
flush(id, ch, fd, remainder)
}
break
} else if count > limit {
log.Panicf("Reader %d read more than it should, expected=%d, read=%d", id, linesLimit, count)
}
}
if reader.Err() != nil {
log.Panicf("error: %s in line %d,%d", reader.Err().Error(), count, id)
}
}
func flush(id int,
ch chan<- arrow.Record,
fd *os.File,
limit int64,
) {
reader := csv.NewInferringReader(fd,
csv.WithChunk(int(limit)),
csv.WithNullReader(true, ""),
csv.WithComma(','),
csv.WithHeader(false),
)
reader.Retain()
defer reader.Release()
record := reader.Record()
record.Retain() // nil pointer dereference error here
ch <- record
}
我尝试了先前代码的多个版本,包括:
- 复制文件描述符
- 复制文件描述符的偏移量,打开同一个文件
并寻求这种抵消。
- 调用前关闭第一个阅读器
flush
或关闭第一个fd
.
无论我如何更改代码,错误似乎都是相同的。请注意,任何调用flush
的读者提出了一个错误。包括reader.Next
, and reader.Err()
.
我使用 csv 阅读器是否错误?这是重复使用同一文件的问题吗?
编辑:我不知道这是否有帮助,但是在中打开一个新的 fdflush
没有任何Seek
避免错误(不知何故任何Seek
导致出现原始错误)。但是,如果没有Seek
(即删除Seek
导致任何 Goroutine 根本无法读取文件的一部分)。