.stream()
データベースからレコードをストリームし、結果セット全体をメモリにバッファリングすることなく、一度に1つずつまたはバッチで消費できます。
await Something.stream(criteria)
.eachRecord(async (record)=>{
});
引数 | 型 | 詳細 | |
---|---|---|---|
1 | criteria | データベース内のレコードを照合するために使用するWaterlineの条件。 |
次のいずれかを使用してください
.eachRecord(async (record)=>{ ... })
.eachBatch(async (records)=>{ ... })
eachRecord()
またはeachBatch()
に提供するカスタム関数は、次の引数を受け取ります
引数 | 型 | 詳細 | |
---|---|---|---|
1 | record または records | 現在のレコード、または現在のレコードのバッチ。バッチ配列には常に少なくとも1つのレコードが含まれ、バッチサイズ(デフォルトでは30)を超えるレコードが含まれることはありません。 |
名前 | 型 | いつ? |
---|---|---|
UsageError | 無効なものが渡された場合にスローされます。 | |
AdapterError | データベースアダプタで何か問題が発生した場合にスローされます。 | |
Error | 予期しない事態が発生した場合にスローされます。 |
SailsとWaterlineでのエラー処理の例については、概念 > モデルとORM > エラーを参照してください。
.stream()
メソッドは、.find()
とほぼ同じですが、一度に1バッチずつレコードをフェッチする点が異なります。レコードのバッチがロードされるたびに、提供したイテレータ関数が1回以上呼び出されます。.eachRecord()
を使用した場合、バッチ内の各レコードに対して、レコードごとの関数が1回呼び出されます。それ以外の場合、.eachBatch()
を使用すると、バッチ全体の関数が1回呼び出されます。
これは、結果セット全体をメモリに同時に保持しようとするとサーバーの利用可能なRAMがオーバーフローする可能性のある、非常に大きな結果セットを扱う場合に便利です。Waterlineの.stream()
メソッドを使用すると、Mongoカーソルで既におなじみである可能性のあることを実行できます。レポートの作成、シェルスクリプトでのデータベースレコードのループ処理と変更、大量のデータをある場所から別の場所への移動、複雑な変換の実行、さらにはマップ/リデュースジョブの編成などです。
以下に4つの例の状況を探ります
データベース内でFinnという名前のユーザーを1つずつ反復処理するアクション
await User.stream({name:'Finn'})
.eachRecord(async (user)=>{
if (Math.random() > 0.5) {
throw new Error('Oops! This is a simulated error.');
}
sails.log(`Found a user ${user.id} named Finn.`);
});
動的に生成されたサイトマップで応答するアクション
// e.g. in an action that handles `GET /sitemap.xml`:
var sitemapXml = '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">';
await BlogPost.stream()
.limit(50000)
.sort('title ASC')
.eachRecord((blogPost)=>{
sitemapXml += (
'<url>\n'+
' <loc>https://blog.example.com/' + _.escape(encodeURIComponent(blogPost.slug))+'</loc>\n'+
' <lastmod>'+_.escape(blogPost.updatedAt)+'</lastmod>\n'+
'<changefreq>monthly</changefreq>\n'+
'</url>'
);
});
sitemapXml += '</urlset>';
.populate()
を使用"Bailey Bitterbumps"という名前の人物からの不気味なコメントを検索し、当局に報告するコマンドラインスクリプトのスニペット
// e.g. in a shell script
var numReported = 0;
await Comment.stream({ author: 'Bailey Bitterbumps' })
.limit(1000)
.skip(40)
.sort('title ASC')
.populate('attachedFiles', {
limit: 3,
sort: 'updatedAt'
})
.populate('fromBlogPost')
.eachRecord(async (comment)=>{
var isCreepyEnoughToWorryAbout = comment.rawMessage.match(/creepy/) && comment.attachedFiles.length > 1;
if (!isCreepyEnoughToWorryAbout) {
return;
}
await sails.helpers.sendTemplateEmail.with({
template: 'email-creepy-comment-notification',
templateData: {
url: `https://blog.example.com/${comment.fromBlogPost.slug}/comments/${comment.slug}.`
},
to: '[email protected]',
subject: 'Creepy comment alert'
});
numReported++;
});
sails.log(`Successfully reported ${numReported} creepy comments.`);
前の例のコードを実行した場合、不気味なコメントごとに1通のメールを送信することになります... これは、Bailey Bitterbumpsを知っていると、大量になる可能性があります。これは遅くなるだけでなく、トランザクションメールプロバイダーに数千の個別のAPIリクエストを送信することを意味し、APIレート制限をすぐに圧倒する可能性があります。
この場合、.eachBatch()
を使用して、個々のレコードを1つずつ処理するのではなく、フェッチされているレコードのバッチ全体を取得することで、必要なAPIリクエストの数を大幅に減らすことができます。
デフォルトでは、.stream()
はバッチサイズ30を使用します。つまり、バッチあたり最大30件のレコードをロードします。したがって、.eachBatch()
を使用している場合、カスタム関数は呼び出されるたびに1〜30件のレコードを受け取ります。
バッチサイズを増減するには、.eachBatch()
に追加の引数を渡します
.eachBatch(100, async (records)=>{
console.log(`Got ${records.length} records.`);
})
コードで
.eachBatch()
を使用することは、.eachRecord()
を使用するよりも効率的とは限りません。これは、使用するイテレータに関係なく、Waterlineが一度に複数のレコード(デフォルトでは30)をデータベースに要求するためです。.eachBatch()
を使用すると、上記で説明した追加の引数を使用して、このバッチサイズを簡単に構成できます。.eachRecord
を使用しながらバッチサイズをカスタマイズすることもできます(たとえば、使用しているサードパーティAPIによるレート制限を回避するため)。.meta()
を使用してください。たとえば、.meta({batchSize: 100})
です。
- このメソッドは、
await
、プロミスチェーン、または従来のNodeコールバックで使用できます。.stream()
は、いずれかのイテレータから最初のエラーを受け取ると、直ちに中断してエラーをスローします。.stream()
は、提供されたイテレータ関数を各レコードまたはバッチに対して、一度に1つずつ、順番に実行します。Sails 1.1.0より前のバージョンでは、.stream()
の推奨される使用法では、イテレータがコールバック(next
)を呼び出すことを想定していました。これは、2番目の引数として提供されます。関数のシグネチャに2番目の引数を実際には含めない限り、これは不要になりました。- Sails v1.0 / Waterline 0.13より前のバージョンでは、このメソッドはより低レベルのインターフェイスを持ち、Readable "object stream"を公開していました。これは強力でしたが、エラーが発生しやすい傾向がありました。新しいアダプタに依存しない
.stream()
は、エミッタや特定の種類のNodeストリームに依存していません。(以前の方法で機能させる必要がありますか?ご心配なく。少しのコードを使用すると、新しいインターフェイスを使用して、streams2/streams3互換のReadable「オブジェクトストリーム」を簡単に構築できます。)- 追加の例、背景情報、実装の詳細を含む、
.stream()
を作成するきっかけとなった背景の詳細については、こちらをご覧ください。