在启动期间从读取文件加载数据,然后处理新文件并从映射中清除旧状态

2023-12-01

我正在开发一个项目,在启动过程中,我需要读取某些文件并将其存储在地图的内存中,然后定期查找新文件(如果有),然后用此替换启动期间地图中内存中的所有文件新数据。基本上每次如果有一个新文件full state然后我想将内存中的映射对象刷新到这个新对象,而不是附加到它。

下面的方法loadAtStartupAndProcessNewChanges在服务器启动期间调用,它读取文件并将数据存储在内存中。它还启动了一个 go-routinedetectNewFiles它定期检查是否有任何新文件并将其存储在deltaChan稍后由另一个 go-routine 访问的通道processNewFiles再次读取该新文件并将数据存储在同一个映射中。如果有任何错误,我们会将其存储在err渠道。loadFiles是读取内存中的文件并将其存储在中的函数map.

type customerConfig struct {
  deltaChan   chan string
  err         chan error
  wg          sync.WaitGroup
  data        *cmap.ConcurrentMap
}

// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
  path, err := r.GetPath("...", "....")
  if err != nil {
    return err
  }

  r.wg.Add(1)
  go r.detectNewFiles(path)
  err = r.loadFiles(4, path)
  if err != nil {
    return err
  }
  r.wg.Add(1)
  go r.processNewFiles()
  return nil
}

该方法基本上计算出是否有任何需要消耗的新文件,如果有则将其放在deltaChan稍后将被消耗的通道processNewFilesgo-routine 并读取内存中的文件。如果有任何错误,则会将错误添加到错误通道。

func (r *customerConfig) detectNewFiles(rootPath string) {

}

这将读取全部s3文件并将其存储在内存中并返回错误。在这种方法中,我清除了地图的先前状态,以便它可以从新文件中获得新的状态。该方法在服务器启动期间调用,并且每当我们需要处理新文件时也会调用processNewFiles例行公事。

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  r.data.Clear()
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()
      return r.read(spn, file, bucket)
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

该方法读取文件并添加到data并发地图。

func (r *customerConfig) read(file string, bucket string) error {
  // read file and store it in "data" concurrent map 
  // and if there is any error then return the error
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil
  }

  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      r.data.Set(key, hasInventory)
    }
  }
  return nil
}

该方法将选择delta channel如果有任何新文件,那么它将通过调用开始读取该新文件loadFiles方法。如果有任何错误,则会将错误添加到错误通道。

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
  // find new files on delta channel
  // and call "loadFiles" method to read it
  // if there is any error, then it will add it to the error channel.
}

如果上面有任何错误error channel然后它将通过以下方法记录这些错误 -

func (r *customerConfig) handleError() {
  // read error from error channel if there is any
  // then log it
}

问题陈述

上述逻辑对我来说没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。正如你所看到的,我有一个并发地图,我正在将其填充到我的read方法并清除整个地图loadFiles方法。因为每当增量通道上有新文件时,我都不想在地图中保留以前的状态,所以这就是为什么我要从地图中删除所有内容,然后从新文件中添加新状态。

现在如果有任何错误read方法然后错误发生因为我已经清除了我的所有数据data地图将有空地图,这不是我想要的。基本上,如果有任何错误,那么我想保留以前的状态data地图。我如何在上述当前设计中解决这个问题。

Note:我正在使用 golangconcurrent map


我认为你的设计过于复杂。它可以更简单地解决,这提供了您想要的所有好处:

  • 并发访问安全
  • 重新加载检测到的更改
  • 访问配置将为您提供最新的、成功加载的配置
  • 即使由于检测到的更改而加载新配置需要很长时间,最新的配置始终可以立即访问
  • 如果加载新配置失败,则保留先前的“快照”并保持当前状态
  • 作为奖励,它更简单,甚至不使用第 3 方库

让我们看看如何实现这一目标:


Have a CustomerConfig保存您想要缓存的所有内容的结构(这是“快照”):

type CustomerConfig struct {
    Data map[string]bool

    // Add other props if you need:
    LoadedAt time.Time
}

提供一个加载您想要缓存的配置的函数。注意:该函数是无状态的,它不会访问/操作包级变量:

func loadConfig() (*CustomerConfig, error) {
    cfg := &CustomerConfig{
        Data:     map[string]bool{},
        LoadedAt: time.Now(),
    }

    // Logic to load files, and populate cfg.Data
    // If an error occurs, return it

    // If loading succeeds, return the config
    return cfg, nil
}

现在让我们创建我们的“缓存管理器”。缓存管理器存储实际/当前配置(快照),并提供对其的访问。为了安全的并发访问(和更新),我们使用sync.RWMutex。还有停止管理器的方法(停止并发刷新):

type ConfigCache struct {
    configMu sync.RWMutex
    config   *CustomerConfig
    closeCh  chan struct{}
}

创建缓存会加载初始配置。还启动一个 goroutine 负责定期检查更改。

func NewConfigCache() (*ConfigCache, error) {
    cfg, err := loadConfig()
    if err != nil {
        return nil, fmt.Errorf("loading initial config failed: %w", err)
    }

    cc := &ConfigCache{
        config:  cfg,
        closeCh: make(chan struct{}),
    }

    // launch goroutine to periodically check for changes, and load new configs
    go cc.refresher()

    return cc, nil
}

The refresher()定期检查更改,如果检测到更改,则调用loadConfig()加载要缓存的新数据,并将其存储为当前/实际配置(同时锁定configMu)。它还监视closeCh如果有要求则停止:

func (cc *ConfigCache) refresher() {
    ticker := time.NewTicker(1 * time.Minute) // Every minute
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // Check if there are changes
            changes := false // logic to detect changes
            if !changes {
                continue // No changes, continue
            }

            // Changes! load new config:
            cfg, err := loadConfig()
            if err != nil {
                log.Printf("Failed to load config: %v", err)
                continue // Keep the previous config
            }

            // Apply / store new config
            cc.configMu.Lock()
            cc.config = cfg
            cc.configMu.Unlock()

        case <-cc.closeCh:
            return
        }
    }
}

关闭缓存管理器(刷新 goroutine)非常简单:

func (cc *ConfigCache) Stop() {
    close(cc.closeCh)
}

最后缺少的部分是如何访问当前配置。这是一个简单的GetConfig()方法(也使用configMu,但处于只读模式):

func (cc *ConfigCache) GetConfig() *CustomerConfig {
    cc.configMu.RLock()
    defer cc.configMu.RUnlock()
    return cc.config
}

您可以这样使用它:

cc, err := NewConfigCache()
if err != nil {
    // Decide what to do: retry, terminate etc.
}

// Where ever, whenever you need the actual (most recent) config in your app:

cfg := cc.GetConfig()
// Use cfg

在您关闭应用程序之前(或者您想停止刷新),您可以调用cc.Stop().

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在启动期间从读取文件加载数据,然后处理新文件并从映射中清除旧状态 的相关文章

  • std::map 只读操作的线程安全

    我有一个 std map 用于将值 字段 ID 映射到人类可读的字符串 当我的程序在任何其他线程启动之前启动时 该映射会被初始化一次 之后就不会再被修改 现在 我为每个线程提供了这个 相当大的 映射的自己的副本 但这显然是内存使用效率低下
  • 使用 Thread.Sleep() 时,异步编程如何与线程一起工作?

    假设 前言 在之前的问题中 我们注意到Thread Sleep阻塞线程参见 什么时候使用Task Delay 什么时候使用Thread Sleep https stackoverflow com questions 20082221 whe
  • 展平嵌套结构会导致切片的切片

    所以我有一个像这样的结构 type Bus struct Number string Name string DirectStations Station Station is another struct ReverseStations
  • Openresty 中的并发模型是什么?

    我很难理解 openresty 或 nginx 的并发模型 我读了Lua变量作用域 http wiki nginx org HttpLuaModule Lua Variable Scope 它解释了变量的生命周期 但它没有说明对它们的并发访
  • 我怎么知道我的所有 goroutine 确实正在使用 golang 的同步包等待一个条件

    我有一个应用程序 我正在创建多个 goroutine 来同时执行某个任务 所有工作协程都会等待条件 事件发生 一旦事件被触发 它们就会开始执行 创建完所有goroutines后 主线程在发送广播信号之前应该知道所有goroutines确实处
  • 在不支持 CAS 操作的处理器上进行 CompareAndSet

    今天 我在一次采访中被问到下一个问题 如果您在具有不支持 CAS 操作的处理器的机器上调用 AtomicLong 的compareAndSet 方法 会发生什么情况 您能否帮我解决这个问题 并在可能的情况下提供一些全面描述的链接 From
  • 为什么在谈论线程和进程时,“不要同时格式化软盘”的评论很有趣?

    我正在阅读之间的区别线程和进程 https stackoverflow com questions 200469 what is the difference between a process and a thread并在第二个答案中发现
  • 我对线程失去了理智

    我想要这个类的对象 public class Chromosome implements Runnable Comparable
  • 如何将UTC时间转换为unix时间戳

    我正在寻找将 UTC 时间字符串转换为 unix 时间戳的选项 我的字符串变量是02 28 2016 10 03 46 PM并且需要将其转换为 unix 时间戳 例如1456693426 知道该怎么做吗 首先 unix时间戳14566934
  • 线程池,C++

    我正在使用 C 开发一个网络程序 我想实现一个 pthread 池 每当我从接收套接字接收到一个事件时 我都会将数据放入线程池中的队列中 我正在考虑创建 5 个独立的线程 并将持续检查队列以查看是否有任何传入数据需要完成 这是一个非常简单的
  • C++11 动态线程池

    最近 我一直在尝试寻找一个用于线程并发任务的库 理想情况下 是一个在线程上调用函数的简单接口 任何时候都有 n 个线程 有些线程比其他线程完成得更快 并且到达的时间不同 首先我尝试了 Rx 它在 C 中非常棒 我还研究了 Blocks 和
  • 创建具有特定权限的线程C++

    我有一个多线程应用程序 我想创建一个具有不同用户权限的线程 例如 多域管理员权限 但我找不到任何 Win32 APICreateThread要做到这一点 如何创建具有特定用户权限的线程 thanks 调用 CreateThread CREA
  • Python 中的错误? threading.Thread.start() 并不总是返回

    我有一个很小的 Python 脚本 在我看来 threading Thread start 表现出意外 因为它不会立即返回 在线程内我想调用一个方法boost python基于对象 不会立即返回 为此 我将对象 方法包装如下 import
  • 断点会停止所有线程吗?

    如果我的程序中有两个线程同时运行 并在其中一个线程上设置了断点 那么当遇到此断点时 另一个线程也会停止 还是会继续执行 我用 Java 编写并使用 NetBeans 断点可以选择它们的行为方式 挂起单个线程或所有线程
  • 使用 boost::thread 特定的 ptr<>::get() 是否会很慢?有什么解决方法吗?

    我目前正在使用 Valgrind 的 Callgrind 分析一个存在性能问题的应用程序 在查看分析数据时 似乎有 25 的处理时间花费在boost detail get tss data在主要目的是物理模拟和可视化的应用程序中 get t
  • 信号处理程序有单独的堆栈吗?

    信号处理程序是否有单独的堆栈 就像每个线程都有单独的堆栈一样 这是在 Linux C 环境中 来自 Linux 手册页signal 7 http kernel org doc man pages online pages man7 sign
  • Log4j2 ThreadContext 映射不适用于parallelStream()

    我有以下示例代码 public class Test static System setProperty isThreadContextMapInheritable true private static final Logger LOGG
  • C# WinForms:使用一个或多个附加线程进行绘图。如何?

    如果我有一张包含各种几何形式 直线 矩形 圆形等 的大图 线程需要花费大量时间来绘制所有内容 但在现实生活中 一栋建筑是由不止一名工人建造的 因此 如果绘图是建筑物而线程是构建者 则绘制速度会快得多 但我想知道怎么做 你能告诉我怎么做吗 有
  • 为什么我不能将左大括号放在下一行?

    当我尝试编译以下代码时遇到奇怪的错误 package main import fmt fmt func main var arr 3 int for i 0 i lt 3 i fmt Printf d arr i 错误如下 unexpect
  • 如何停止提交给 ExecutorService 的 Callable?

    我正在尝试实现一个示例应用程序来测试Callable and ExecutorService接口 在我的应用程序中我已经声明 ExecutorService exSvc Executors newSingleThreadExecutor T

随机推荐