使用ticker定期从经常变化的路径加载内存中的所有文件?

2024-06-23

我有一个应用程序需要从两个不同的路径读取文件。读取所有这些文件后,我需要将它们加载到内存中products map.

Path:

  • Full:这是内存中服务器启动期间需要加载的所有文件的路径。该路径将包含大约 50 个文件,每个文件大小约为 60MB。
  • Delta:这是包含我们需要每 1 分钟定期加载到内存中的所有增量文件的路径。这些文件仅包含与完整路径文件的差异。该路径将包含大约 60 个文件,每个文件大小约为 20MB。

下面的代码watchDeltaPath在服务器启动期间调用以监视增量更改。它将获得增量路径GetDeltaPath方法并从该路径我需要加载内存中的所有文件。该增量路径每隔几分钟就会发生变化,我不会错过任何一个增量路径以及该路径中的所有文件.

加载内存中的所有文件loadAllFiles方法可能需要一些时间(大约 5 分钟),所以我试图找到一种方法,我不应该错过任何新的增量路径(因为它每隔几分钟不断变化),并且应该能够从增量路径加载内存中的所有这些文件定期一次又一次地进行,没有任何问题,而且效率很高。

我得到下面的代码,每 1 分钟运行一次并寻找新的delta path每次然后加载内存中该路径的所有文件。它工作得很好,但我认为这不是正确的方法。如果发生什么情况loadAllFiles方法需要 10 多分钟才能加载内存中的所有文件,而我的代码每 1 分钟运行一次以查找新的增量路径,然后找到该新路径中的所有文件,然后加载到内存中?它会继续创建大量后台线程并且可能会大量增加 cpu 使用率吗?

type applicationRepository struct {
  client         customer.Client
  logger         log.Logger
  done           chan struct{}
  products       *cmap.ConcurrentMap
}

// this will be called only once
func (r *applicationRepository) watchDeltaPath() error {
    ticker := time.NewTicker(1 * time.Minute)
    go func() {
        select {
        case <-r.done:
            ticker.Stop()
            return
        case <-ticker.C:
            func() (result error) {
                trans := r.logger.StartTransaction(nil, "delta-changes", "")
                defer trans.End()
                defer func() {
                    if result != nil {
                        trans.Errorf("Recovered from error: %v")
                    } else if err := recover(); err != nil {
                        trans.Errorf("Recovered from panic: %v", err)
                    }
                }()
                // get latest delta path everytime as it keeps changing every few minutes
                path, err := r.client.GetDeltaPath("delta")
                if err != nil {
                    return err
                }
                // load all the files in memory in "products" map from that path
                err = r.loadAllFiles(path)
                if err != nil {
                    return err
                }
                return nil
            }()
        }
    }()
    return nil
}

func (r *applicationRepository) Stop() {
    r.done <- struct{}{}
}

在产品中有效地做到这一点的最佳方法是什么?

这是我对代码的执行方式的玩弄 -https://go.dev/play/p/FS4-B0FWwTe https://go.dev/play/p/FS4-B0FWwTe


根据评论,“在产品中有效地做到这一点的最佳方法”取决于很多因素,并且在像 Stack Overflow 这样的网站上可能无法回答。话虽如此,我可以提出一种方法,可以让您更容易地思考如何最好地解决问题。

下面的代码(操场 https://go.dev/play/p/HLzu0Zauu91;相当粗糙且未经测试)演示了一种具有三个 go 例程的方法:

  1. 检测新的增量路径并将它们推送到缓冲通道
  2. 处理初始负载
  3. 等待初始加载完成,然后应用增量(请注意,这会处理初始加载正在进行时发现的增量)

如上所述,问题中没有足够的细节来确定这是否是一个好方法。初始负载和增量可能可以同时运行而不会使 IO 饱和,但这需要测试(并且是一个相对较小的变化)。

// Simulation of process to perform initial load and handle deltas
package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

const deltaBuffer = 100
const initialLoadTime = time.Duration(time.Duration(1.5 * float32(time.Second)))
const deltaCheckFrequency = time.Duration(500 * time.Millisecond)

func main() {
    ar := NewApplicationRepository()
    time.Sleep(5 * time.Second)
    ar.Stop()
    fmt.Println(time.Now(), "complete")
}

type applicationRepository struct {
    deltaChan       chan string   // Could be some other type...
    initialLoadDone chan struct{} // Closed when initial load finished

    done chan struct{}
    wg   sync.WaitGroup
}

func NewApplicationRepository() *applicationRepository {
    ar := applicationRepository{
        deltaChan:       make(chan string, deltaBuffer),
        initialLoadDone: make(chan struct{}),
        done:            make(chan struct{}),
    }

    ar.wg.Add(3)
    go ar.detectNewDeltas()
    go ar.initialLoad()
    go ar.deltaLoad()

    return &ar
}

// detectNewDeltas - watch for new delta paths
func (a *applicationRepository) detectNewDeltas() {
    defer a.wg.Done()
    var previousDelta string
    for {
        select {
        case <-time.After(deltaCheckFrequency):
            dp := a.getDeltaPath()
            if dp != previousDelta {
                select {
                case a.deltaChan <- dp:
                default:
                    panic("channel full - no idea what to do here!")
                }
                previousDelta = dp
            }
        case <-a.done:
            return
        }
    }
}

// getDeltaPath in real application this will retrieve the delta path
func (a *applicationRepository) getDeltaPath() string {
    return strconv.Itoa(time.Now().Second()) // For now just return the current second..
}

// initialLoad - load the initial data
func (a *applicationRepository) initialLoad() {
    defer a.wg.Done()
    defer close(a.initialLoadDone)
    time.Sleep(initialLoadTime) // Simulate time taken for initial load
}

// deltaLoad- load deltas found by detectNewDeltas
func (a *applicationRepository) deltaLoad() {
    defer a.wg.Done()
    fmt.Println(time.Now(), "deltaLoad started")

    // Wait for initial load to complete before doing anything
    <-a.initialLoadDone
    fmt.Println(time.Now(), "Initial Load Done")

    // Wait for incoming deltas and load them
    for {
        select {
        case newDelta := <-a.deltaChan:
            fmt.Println(time.Now(), newDelta)
        case <-a.done:
            return
        }
    }
}

// Stop - signal loader to stop and wait until this is done
func (a *applicationRepository) Stop() {
    close(a.done)
    a.wg.Wait()
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用ticker定期从经常变化的路径加载内存中的所有文件? 的相关文章

随机推荐

  • Android Studio 3.0 布局编辑器中的渲染错误

    我刚刚开始学习用于 Android 开发的 Kotlin 并启动了一个空项目并添加了一个 Activity 我已经添加了所需的 gradle 依赖项 如 Kotlin 文档中所述 默认情况下 MainActivity 的 xml 文件仅包含
  • 如何制作 MatPlotLib 补丁模式的图例或将它们添加到预先存在的图例中

    如何为图表上使用的补丁模式制作图例或将有关补丁模式的信息添加到预先存在的图例中而不影响图例中已存在的信息 创建补丁时包含 kwarglabel ex bunch of code ax add patch mpl patches Rectan
  • BulkRequestBuilder 的 Elasticsearch 索引速度变慢

    大家好 elasticsearch 大师们 我有数百万数据需要由elasticsearch Java API 索引 Elasticsearch 的集群节点数量为 3 个 1 个主节点 2 个节点 我的代码片段如下 Settings sett
  • 使用 SVG 多边形元素

    我正在尝试使用 SVG 多边形和 javascript 我创建一个多边形并设置其初始点列表 如下所示 var polygon document createElementNS http www w3 org 2000 svg polygon
  • 带箭头的样式项目符号列表

    我创建了一个箭头 我想将其附加到列表而不是圆形项目符号点 我尝试过使用 after 但还没有成功 不得不承认我对伪元素非常陌生 这是我到目前为止得到的 arrow border right 2px solid black border bo
  • 通过 HTTPS/SSL 的 Java 客户端证书

    我正在使用 Java 6 并尝试创建一个HttpsURLConnection使用客户端证书针对远程服务器 服务器使用自签名根证书 并要求提供受密码保护的客户端证书 我已将服务器根证书和客户端证书添加到我在其中找到的默认 java 密钥库中
  • 父视图控制器如何通知其子视图控制器已删除自身?

    子视图控制器的实现中通过以下代码删除子视图控制器 void commandFinishVC self view removeFromSuperview self removeFromParentViewController 子视图控制器及其
  • SQLite 用于大数据集?

    我有一个相当大的数据集 并且希望将其存储在文件中而不是 RDBMS 中 数据集中的主表的 CSV 大小刚刚超过 100 万行 30 列 大小约为 600Mb 我正在考虑 SQLite 对于这种大小的数据集 SQLite 值得研究吗 SQLi
  • Android:为什么系统重启后警报通知停止

    我正在开发一个 Android 应用程序 每天应该触发五次警报 每天的时间不是固定的 警报响起后 我将安排下一个警报 我的问题是 警报通知有效 1 天 然后停止 并且当设备重新启动两次时 通知不起作用 我现在不知道是否有其他方法可以做到这一
  • Swift:延迟封装映射、过滤器、FlatMap 链

    我有一份动物清单 let animals bear dog cat 以及一些改变该列表的方法 typealias Transform String gt String let containsA Transform 0 contains a
  • 如何添加 glassfish 工具到 eclipseoxy 3a

    我下载了 Eclipse Oxygen 但我没有在此 IDE 中安装 glassfish 工具 它是在 JDK 8 Update 172 上运行的 Oxygen 3A 64 位 当我尝试通过 Marketplace 安装这些工具时 它指出这
  • SQL在单个命令中在表中添加列和注释

    我的 Web 应用程序使用 Oracle 11g 我想向现有表添加列和注释 我可以使用以下命令轻松做到这一点 ALTER TABLE product ADD product description VARCHAR2 20 and COMME
  • AngularJS - 您可以在不修改其核心源代码的情况下重命名服务吗?

    我遇到过这样的情况 我下载了书面服务angular js 并且它工作得很好 但我更愿意在我的代码中将其称为不同的名称 只是为了方便和可读性 这并不是真正的要求 只是一种愿望 我可以在服务的实际源代码中仔细检查并更改它 但这显然会导致各种问题
  • 分区交换列类型或大小不匹配 (ORA-14097)

    我正在尝试对数据库进行交换分区 但出现以下错误 ORA 14097 ALTER TABLE EXCHANGE PARTITION 中的列类型或大小不匹配 剧本这样做已经创建了并且它在 Oracle 11g 数据库上按预期运行 当我更新到 1
  • 插入到子查询中具有多个值的表中

    INSERT INTO Reference TB RequestID WaveID VALUES 2222 select tWaveID from Table2 我正在使用上面的查询插入表中 我知道 Table2有多个tWaveID这就是为
  • Swift 中的精确字符串格式说明符

    下面是我之前如何将浮点数截断到小数点后两位 NSLog 02f 02f 02f r g b 我检查了文档和电子书 但无法弄清楚 谢谢 下面的代码 import Foundation required for String format pr
  • AWS RDS:从S3存储桶中的sql文件导入数据

    我有一个数据库备份作为 sql 文件存储在 s3 存储桶中 如何将该文件直接导入到 Aurora RDS 中 而不需要将其下载到我的 PC 上并手动导入 如果您的数据是有效的 SQL 转储 您可以在创建新的 Aurora 实例时指定其 S3
  • Google Groups API - getUsers() 您无权查看该群组的成员列表:

    大家干杯 我有一个 Google 脚本 它通过使用 getUsers 函数检查电子邮件地址是否是某个组的成员 So far 我已激活 Admin SDK目录服务 我有管理员权限 对于大多数组来说 它确实很神奇 但是在某些组的情况下我会收到授
  • CSS/Javascript:如何制作具有多种状态的旋转圆形菜单?

    通常我不会自己发布内容 我通常会通过其他人的线程找到我需要的内容 因此如果其中任何内容位于错误的位置或格式不正确 我很抱歉 我以前从来没有这样做过 所以情况是这样的 我正在尝试重建我的网站 并选择使用 WordPress 的 X 主题 大多
  • 使用ticker定期从经常变化的路径加载内存中的所有文件?

    我有一个应用程序需要从两个不同的路径读取文件 读取所有这些文件后 我需要将它们加载到内存中products map Path Full 这是内存中服务器启动期间需要加载的所有文件的路径 该路径将包含大约 50 个文件 每个文件大小约为 60