我正在开发一个项目,在启动过程中,我需要读取某些文件并将其存储在地图的内存中,然后定期查找新文件(如果有),然后用此替换启动期间地图中内存中的所有文件新数据。基本上每次如果有一个新文件full state
然后我想将内存中的映射对象刷新到这个新对象,而不是附加到它。
下面的方法loadAtStartupAndProcessNewChanges
在服务器启动期间调用,它读取文件并将数据存储在内存中。它还启动了一个 go-routinedetectNewFiles
它定期检查是否有任何新文件并将其存储在deltaChan
稍后由另一个 go-routine 访问的通道processNewFiles
再次读取该新文件并将数据存储在同一个映射中。如果有任何错误,我们会将其存储在err
渠道。loadFiles
是读取内存中的文件并将其存储在中的函数map
.
type customerConfig struct {
deltaChan chan string
err chan error
wg sync.WaitGroup
data *cmap.ConcurrentMap
}
// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
path, err := r.GetPath("...", "....")
if err != nil {
return err
}
r.wg.Add(1)
go r.detectNewFiles(path)
err = r.loadFiles(4, path)
if err != nil {
return err
}
r.wg.Add(1)
go r.processNewFiles()
return nil
}
该方法基本上计算出是否有任何需要消耗的新文件,如果有则将其放在deltaChan
稍后将被消耗的通道processNewFiles
go-routine 并读取内存中的文件。如果有任何错误,则会将错误添加到错误通道。
func (r *customerConfig) detectNewFiles(rootPath string) {
}
这将读取全部s3
文件并将其存储在内存中并返回错误。在这种方法中,我清除了地图的先前状态,以便它可以从新文件中获得新的状态。该方法在服务器启动期间调用,并且每当我们需要处理新文件时也会调用processNewFiles
例行公事。
func (r *customerConfig) loadFiles(workers int, path string) error {
var err error
...
var files []string
files = .....
// reset the map so that it can have fresh state from new files.
r.data.Clear()
g, ctx := errgroup.WithContext(context.Background())
sem := make(chan struct{}, workers)
for _, file := range files {
select {
case <-ctx.Done():
break
case sem <- struct{}{}:
}
file := file
g.Go(func() error {
defer func() { <-sem }()
return r.read(spn, file, bucket)
})
}
if err := g.Wait(); err != nil {
return err
}
return nil
}
该方法读取文件并添加到data
并发地图。
func (r *customerConfig) read(file string, bucket string) error {
// read file and store it in "data" concurrent map
// and if there is any error then return the error
var err error
fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
if err != nil {
return errs.Wrap(err)
}
defer xio.CloseIgnoringErrors(fr)
pr, err := reader.NewParquetReader(fr, nil, 8)
if err != nil {
return errs.Wrap(err)
}
if pr.GetNumRows() == 0 {
spn.Infof("Skipping %s due to 0 rows", file)
return nil
}
for {
rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
if err != nil {
return errs.Wrap(err)
}
if len(rows) <= 0 {
break
}
byteSlice, err := json.Marshal(rows)
if err != nil {
return errs.Wrap(err)
}
var invMods []CompModel
err = json.Unmarshal(byteSlice, &invMods)
if err != nil {
return errs.Wrap(err)
}
for i := range invMods {
key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
hasInventory := false
if invMods[i].Available > 0 {
hasInventory = true
}
r.data.Set(key, hasInventory)
}
}
return nil
}
该方法将选择delta channel
如果有任何新文件,那么它将通过调用开始读取该新文件loadFiles
方法。如果有任何错误,则会将错误添加到错误通道。
// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
// find new files on delta channel
// and call "loadFiles" method to read it
// if there is any error, then it will add it to the error channel.
}
如果上面有任何错误error channel
然后它将通过以下方法记录这些错误 -
func (r *customerConfig) handleError() {
// read error from error channel if there is any
// then log it
}
问题陈述
上述逻辑对我来说没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。正如你所看到的,我有一个并发地图,我正在将其填充到我的read
方法并清除整个地图loadFiles
方法。因为每当增量通道上有新文件时,我都不想在地图中保留以前的状态,所以这就是为什么我要从地图中删除所有内容,然后从新文件中添加新状态。
现在如果有任何错误read
方法然后错误发生因为我已经清除了我的所有数据data
地图将有空地图,这不是我想要的。基本上,如果有任何错误,那么我想保留以前的状态data
地图。我如何在上述当前设计中解决这个问题。
Note:我正在使用 golangconcurrent map