基本上放了一个$addToSet运算符无法为您工作,因为您的数据不是真实的"set"根据定义,它是“完全不同”对象的集合。
这里的另一个逻辑意义是,您将在数据到达时对其进行处理,无论是作为单个对象还是提要。我假设它是某种形式的许多项目的提要,并且您可以使用某种流处理器来获得每个收到的文档的结构:
{
"date": new Date("2015-03-09 13:23:00.000Z"),
"symbol": "AAPL",
"open": 127.14
"high": 127.17,
"low": 127.12
"close": 127.15,
"volume": 19734
}
转换为标准十进制格式以及 UTC 日期,因为当然,一旦从数据存储中检索数据,任何区域设置实际上都应该是应用程序的域。
我还至少会通过删除对其他集合的引用并将数据放在那里来稍微展平您的“intraDayQuoteSchema”。您仍然需要在插入时进行查找,但是读取时额外填充的开销似乎比存储开销更高:
intradayQuotesSchema = Schema({
symbol:{
name: String,
code: String
},
day:Date,
quotes:[quotesSchema]
});
这取决于您的使用模式,但这种方式可能会更有效。
剩下的实际上取决于什么是可以接受的
stream.on(function(data) {
var symbol = data.symbol,
myDay = new Date(
data.date.valueOf() -
( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
delete data.symbol;
symbol.findOne({ "code": symbol },function(err,stock) {
intraDayQuote.findOneAndUpdate(
{ "symbol.code": symbol , "day": myDay },
{ "$setOnInsert": {
"symbol.name": stock.name
"quotes": [data]
}},
{ "upsert": true }
function(err,doc) {
intraDayQuote.findOneAndUpdate(
{
"symbol.code": symbol,
"day": myDay,
"quotes.date": data.date
},
{ "$set": { "quotes.$": data } },
function(err,doc) {
intraDayQuote.findOneAndUpdate(
{
"symbol.code": symbol,
"day": myDay,
"quotes.date": { "$ne": data.date }
},
{ "$push": { "quotes": data } },
function(err,doc) {
}
);
}
);
}
);
});
});
如果您实际上不需要响应中修改的文档,那么通过在此处实现批量操作 API 并在单个数据库请求中发送此包中的所有更新,您将获得一些好处:
stream.on("data",function(data) {
var symbol = data.symbol,
myDay = new Date(
data.date.valueOf() -
( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
delete data.symbol;
symbol.findOne({ "code": symbol },function(err,stock) {
var bulk = intraDayQuote.collection.initializeOrderedBulkOp();
bulk.find({ "symbol.code": symbol , "day": myDay })
.upsert().updateOne({
"$setOnInsert": {
"symbol.name": stock.name
"quotes": [data]
}
});
bulk.find({
"symbol.code": symbol,
"day": myDay,
"quotes.date": data.date
}).updateOne({
"$set": { "quotes.$": data }
});
bulk.find({
"symbol.code": symbol,
"day": myDay,
"quotes.date": { "$ne": data.date }
}).updateOne({
"$push": { "quotes": data }
});
bulk.execute(function(err,result) {
// maybe do something with the response
});
});
});
关键是只有其中一个语句会实际修改数据,并且由于所有这些都是在同一个请求中发送的,因此应用程序和服务器之间的来回次数较少。
另一种情况是,在这种情况下,在另一个集合中引用实际数据可能会更简单。然后,这就变成了处理更新插入的简单问题:
intradayQuotesSchema = Schema({
symbol:{
name: String,
code: String
},
day:Date,
quotes:[{ type: Schema.Types.ObjectId, ref: "quote" }]
});
// and in the steam processor
stream.on("data",function(data) {
var symbol = data.symbol,
myDay = new Date(
data.date.valueOf() -
( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
delete data.symbol;
symbol.findOne({ "code": symbol },function(err,stock) {
quote.update(
{ "date": data.date },
{ "$setOnInsert": data },
{ "upsert": true },
function(err,num,raw) {
if ( !raw.updatedExisting ) {
intraDayQuote.update(
{ "symbol.code": symbol , "day": myDay },
{
"$setOnInsert": {
"symbol.name": stock.name
},
"$addToSet": { "quotes": data }
},
{ "upsert": true },
function(err,num,raw) {
}
);
}
}
);
});
});
这实际上取决于将报价数据嵌套在“日”文档中对您来说有多重要。主要区别是,如果您想根据其中一些“引用”字段的数据查询这些文档,或者以其他方式承受使用的开销.populate()
从其他集合中提取“引用”。
当然,如果引用并且报价数据对您的查询过滤很重要,那么您始终可以只查询该集合以获取_id
匹配并使用的值$in对“day”文档的查询仅匹配包含那些匹配的“quote”文档的日期。
这是一个重大决定,最重要的是根据应用程序使用数据的方式采取哪条路径。希望这可以指导您了解实现您想要实现的目标背后的一般概念。
P.S 除非您“确定”源数据始终是四舍五入到精确“分钟”的日期,否则您可能希望采用与获取离散“日”相同的日期四舍五入数学。