Lucene.Net 3.0.3
Lucene.Net is a port of the Lucene search engine library, written in C# and targeted at .NET runtime users.
DocumentsWriter.cs
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 using System;
19 using System.Collections.Generic;
20 using System.Linq;
21 using System.Threading;
22 using Lucene.Net.Support;
23 using Analyzer = Lucene.Net.Analysis.Analyzer;
24 using Document = Lucene.Net.Documents.Document;
25 using AlreadyClosedException = Lucene.Net.Store.AlreadyClosedException;
26 using Directory = Lucene.Net.Store.Directory;
27 using ArrayUtil = Lucene.Net.Util.ArrayUtil;
28 using Constants = Lucene.Net.Util.Constants;
29 using IndexSearcher = Lucene.Net.Search.IndexSearcher;
30 using Query = Lucene.Net.Search.Query;
31 using Scorer = Lucene.Net.Search.Scorer;
32 using Similarity = Lucene.Net.Search.Similarity;
33 using Weight = Lucene.Net.Search.Weight;
34 
35 namespace Lucene.Net.Index
36 {
37 
38  /// <summary> This class accepts multiple added documents and directly
39  /// writes a single segment file. It does this more
40  /// efficiently than creating a single segment per document
41  /// (with DocumentWriter) and doing standard merges on those
42  /// segments.
43  ///
44  /// Each added document is passed to the <see cref="DocConsumer" />,
45  /// which in turn processes the document and interacts with
46  /// other consumers in the indexing chain. Certain
47  /// consumers, like <see cref="StoredFieldsWriter" /> and <see cref="TermVectorsTermsWriter" />,
48  /// digest a document and
49  /// immediately write bytes to the "doc store" files (ie,
50  /// they do not consume RAM per document, except while they
51  /// are processing the document).
52  ///
53  /// Other consumers, eg <see cref="FreqProxTermsWriter" /> and
54  /// <see cref="NormsWriter" />, buffer bytes in RAM and flush only
55  /// when a new segment is produced.
56  /// Once we have used our allowed RAM buffer, or the number
57  /// of added docs is large enough (in the case we are
58  /// flushing by doc count instead of RAM usage), we create a
59  /// real segment and flush it to the Directory.
60  ///
61  /// Threads:
62  ///
63  /// Multiple threads are allowed into addDocument at once.
64  /// There is an initial synchronized call to getThreadState
65  /// which allocates a ThreadState for this thread. The same
66  /// thread will get the same ThreadState over time (thread
67  /// affinity) so that if there are consistent patterns (for
68  /// example each thread is indexing a different content
69  /// source) then we make better use of RAM. Then
70  /// processDocument is called on that ThreadState without
71  /// synchronization (most of the "heavy lifting" is in this
72  /// call). Finally the synchronized "finishDocument" is
73  /// called to flush changes to the directory.
74  ///
75  /// When flush is called by IndexWriter we forcefully idle
76  /// all threads and flush only once they are all idle. This
77  /// means you can call flush with a given thread even while
78  /// other threads are actively adding/deleting documents.
79  ///
80  ///
81  /// Exceptions:
82  ///
83  /// Because this class directly updates in-memory posting
84  /// lists, and flushes stored fields and term vectors
85  /// directly to files in the directory, there are certain
86  /// limited times when an exception can corrupt this state.
87  /// For example, a disk full while flushing stored fields
88  /// leaves this file in a corrupt state. Or, an OOM
89  /// exception while appending to the in-memory posting lists
90  /// can corrupt that posting list. We call such exceptions
91  /// "aborting exceptions". In these cases we must call
92  /// abort() to discard all docs added since the last flush.
93  ///
94  /// All other exceptions ("non-aborting exceptions") can
95  /// still partially update the index structures. These
96  /// updates are consistent, but they represent only a part
97  /// of the document seen up until the exception was hit.
98  /// When this happens, we immediately mark the document as
99  /// deleted so that the document is always atomically ("all
100  /// or none") added to the index.
101  /// </summary>
102 
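 // A rough usage sketch (an illustration, not part of this file): applications
 // never call DocumentsWriter directly; IndexWriter owns one and routes every
 // add/update/delete through it, roughly like so:
 //
 //   var dir = new Lucene.Net.Store.RAMDirectory();
 //   var analyzer = new Lucene.Net.Analysis.Standard.StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30);
 //   using (var writer = new IndexWriter(dir, analyzer, IndexWriter.MaxFieldLength.UNLIMITED))
 //   {
 //       var doc = new Document();
 //       doc.Add(new Field("body", "hello world", Field.Store.YES, Field.Index.ANALYZED));
 //       writer.AddDocument(doc);   // buffered by DocumentsWriter until a flush triggers
 //       writer.Commit();           // drives DocumentsWriter.Flush to write a segment
 //   }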
103  public sealed class DocumentsWriter : IDisposable
104  {
105  internal class AnonymousClassIndexingChain : IndexingChain
106  {
107 
108  internal override DocConsumer GetChain(DocumentsWriter documentsWriter)
109  {
110  /*
111  This is the current indexing chain:
112 
113  DocConsumer / DocConsumerPerThread
114  --> code: DocFieldProcessor / DocFieldProcessorPerThread
115  --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
116  --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
117  --> code: DocInverter / DocInverterPerThread / DocInverterPerField
118  --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
119  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
120  --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
121  --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
122  --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
123  --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
124  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
125  --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
126  */
127 
128  // Build up indexing chain:
129 
130  TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
131  TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
132 
133  InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter, new TermsHash(documentsWriter, false, termVectorsWriter, null));
134  NormsWriter normsWriter = new NormsWriter();
135  DocInverter docInverter = new DocInverter(termsHash, normsWriter);
136  return new DocFieldProcessor(documentsWriter, docInverter);
137  }
138  }
139  private void InitBlock()
140  {
141  maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
142  maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
143  ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024);
144  waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
145  waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
146  freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 1.05);
147  freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB * 1024 * 1024 * 0.95);
148  maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
149  skipDocWriter = new SkipDocWriter();
150  byteBlockAllocator = new ByteBlockAllocator(this, DocumentsWriter.BYTE_BLOCK_SIZE);
151  perDocAllocator = new ByteBlockAllocator(this, DocumentsWriter.PER_DOC_BLOCK_SIZE);
152  waitQueue = new WaitQueue(this);
153  }
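 // Assuming IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB is 16.0 (the Lucene 3.0
 // default), these defaults work out to roughly: ramBufferSize = 16 MB,
 // waitQueuePauseBytes ~ 1.6 MB (10%), waitQueueResumeBytes ~ 0.8 MB (5%),
 // freeTrigger ~ 16.8 MB (105%) and freeLevel ~ 15.2 MB (95%).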
154 
155  internal IndexWriter writer;
156  internal Directory directory;
157 
158  internal System.String segment; // Current segment we are working on
159  private System.String docStoreSegment; // Current doc-store segment we are writing
160  private int docStoreOffset; // Current starting doc-store offset of current segment
161 
162  private int nextDocID; // Next docID to be added
163  private int numDocsInRAM; // # docs buffered in RAM
164  internal int numDocsInStore; // # docs written to doc stores
165 
166  // Max # ThreadState instances; if there are more threads
167  // than this they share ThreadStates
168  private const int MAX_THREAD_STATE = 5;
169  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
170  private HashMap<ThreadClass, DocumentsWriterThreadState> threadBindings = new HashMap<ThreadClass, DocumentsWriterThreadState>();
171 
172  private int pauseThreads; // Non-zero when we need all threads to
173  // pause (eg to flush)
174  internal bool flushPending; // True when a thread has decided to flush
175  internal bool bufferIsFull; // True when it's time to write segment
176  private bool aborting; // True if an abort is pending
177 
178  private DocFieldProcessor docFieldProcessor;
179 
180  internal System.IO.StreamWriter infoStream;
181  internal int maxFieldLength;
182  internal Similarity similarity;
183 
184  internal IList<string> newFiles;
185 
186  internal class DocState
187  {
188  internal DocumentsWriter docWriter;
189  internal Analyzer analyzer;
190  internal int maxFieldLength;
191  internal System.IO.StreamWriter infoStream;
192  internal Similarity similarity;
193  internal int docID;
194  internal Document doc;
195  internal System.String maxTermPrefix;
196 
197  // Only called by asserts
198  public bool TestPoint(System.String name)
199  {
200  return docWriter.writer.TestPoint(name);
201  }
202 
203  public void Clear()
204  {
205  // don't hold onto doc nor analyzer, in case it is
206  // largish:
207  doc = null;
208  analyzer = null;
209  }
210  }
211 
212  /// <summary>Consumer returns this on each doc. This holds any
213  /// state that must be flushed synchronized "in docID
214  /// order". We gather these and flush them in order.
215  /// </summary>
216  internal abstract class DocWriter
217  {
218  internal DocWriter next;
219  internal int docID;
220  public abstract void Finish();
221  public abstract void Abort();
222  public abstract long SizeInBytes();
223 
224  internal void SetNext(DocWriter next)
225  {
226  this.next = next;
227  }
228  }
229 
230  /*
231  * Create and return a new PerDocBuffer.
232  */
233  internal PerDocBuffer NewPerDocBuffer()
234  {
235  return new PerDocBuffer(this);
236  }
237 
238  /*
239  * RAMFile buffer for DocWriters.
240  */
241  internal class PerDocBuffer : Lucene.Net.Store.RAMFile
242  {
243  DocumentsWriter enclosingInstance;
244  public PerDocBuffer(DocumentsWriter enclosingInstance)
245  {
246  this.enclosingInstance = enclosingInstance;
247  }
248  /*
249  * Allocate bytes used from shared pool.
250  */
251  public override byte[] NewBuffer(int size)
252  {
253  System.Diagnostics.Debug.Assert(size == PER_DOC_BLOCK_SIZE);
254  return enclosingInstance.perDocAllocator.GetByteBlock(false);
255  }
256 
257  /*
258  * Recycle the bytes used.
259  */
260  internal void Recycle()
261  {
262  lock (this)
263  {
264  if (buffers.Count > 0)
265  {
266  Length = 0;
267 
268  // Recycle the blocks
269  enclosingInstance.perDocAllocator.RecycleByteBlocks(buffers);
270  buffers.Clear();
271  sizeInBytes = 0;
272 
273  System.Diagnostics.Debug.Assert(NumBuffers() == 0);
274  }
275  }
276  }
277  }
278 
279  /// <summary> The IndexingChain must define the <see cref="GetChain(DocumentsWriter)" /> method
280  /// which returns the DocConsumer that the DocumentsWriter calls to process the
281  /// documents.
282  /// </summary>
283  internal abstract class IndexingChain
284  {
285  internal abstract DocConsumer GetChain(DocumentsWriter documentsWriter);
286  }
287 
288  internal static readonly IndexingChain DefaultIndexingChain;
289 
290  internal DocConsumer consumer;
291 
292  // Deletes done after the last flush; these are discarded
293  // on abort
294  private BufferedDeletes deletesInRAM = new BufferedDeletes(false);
295 
296  // Deletes done before the last flush; these are still
297  // kept on abort
298  private BufferedDeletes deletesFlushed = new BufferedDeletes(true);
299 
300  // The max number of delete terms that can be buffered before
301  // they must be flushed to disk.
302  private int maxBufferedDeleteTerms;
303 
304  // How much RAM we can use before flushing. This is 0 if
305  // we are flushing by doc count instead.
306  private long ramBufferSize;
307  private long waitQueuePauseBytes;
308  private long waitQueueResumeBytes;
309 
310  // If we've allocated 5% over our RAM budget, we then
311  // free down to 95%
312  private long freeTrigger;
313  private long freeLevel;
314 
315  // Flush @ this number of docs. If ramBufferSize is
316  // non-zero we will flush by RAM usage instead.
317  private int maxBufferedDocs;
318 
319  private int flushedDocCount; // How many docs already flushed to index
320 
321  internal void UpdateFlushedDocCount(int n)
322  {
323  lock (this)
324  {
325  flushedDocCount += n;
326  }
327  }
328  internal int GetFlushedDocCount()
329  {
330  lock (this)
331  {
332  return flushedDocCount;
333  }
334  }
335  internal void SetFlushedDocCount(int n)
336  {
337  lock (this)
338  {
339  flushedDocCount = n;
340  }
341  }
342 
343  private bool closed;
344 
345  internal DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain indexingChain)
346  {
347  InitBlock();
348  this.directory = directory;
349  this.writer = writer;
350  this.similarity = writer.Similarity;
351  flushedDocCount = writer.MaxDoc();
352 
353  consumer = indexingChain.GetChain(this);
354  if (consumer is DocFieldProcessor)
355  {
356  docFieldProcessor = (DocFieldProcessor) consumer;
357  }
358  }
359 
360  /// <summary>Returns true if any of the fields in the current
361  /// buffered docs have omitTermFreqAndPositions==false
362  /// </summary>
363  internal bool HasProx()
364  {
365  return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.HasProx() : true;
366  }
367 
368  /// <summary>If non-null, various details of indexing are printed
369  /// here.
370  /// </summary>
371  internal void SetInfoStream(System.IO.StreamWriter infoStream)
372  {
373  lock (this)
374  {
375  this.infoStream = infoStream;
376  for (int i = 0; i < threadStates.Length; i++)
377  threadStates[i].docState.infoStream = infoStream;
378  }
379  }
380 
381  internal void SetMaxFieldLength(int maxFieldLength)
382  {
383  lock (this)
384  {
385  this.maxFieldLength = maxFieldLength;
386  for (int i = 0; i < threadStates.Length; i++)
387  threadStates[i].docState.maxFieldLength = maxFieldLength;
388  }
389  }
390 
391  internal void SetSimilarity(Similarity similarity)
392  {
393  lock (this)
394  {
395  this.similarity = similarity;
396  for (int i = 0; i < threadStates.Length; i++)
397  threadStates[i].docState.similarity = similarity;
398  }
399  }
400 
401  /// <summary>Set how much RAM we can use before flushing. </summary>
402  internal void SetRAMBufferSizeMB(double mb)
403  {
404  lock (this)
405  {
406  if (mb == IndexWriter.DISABLE_AUTO_FLUSH)
407  {
408  ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
409  waitQueuePauseBytes = 4 * 1024 * 1024;
410  waitQueueResumeBytes = 2 * 1024 * 1024;
411  }
412  else
413  {
414  ramBufferSize = (long) (mb * 1024 * 1024);
415  waitQueuePauseBytes = (long) (ramBufferSize * 0.1);
416  waitQueueResumeBytes = (long) (ramBufferSize * 0.05);
417  freeTrigger = (long) (1.05 * ramBufferSize);
418  freeLevel = (long) (0.95 * ramBufferSize);
419  }
420  }
421  }
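 // Worked example: SetRAMBufferSizeMB(32.0) gives ramBufferSize = 33,554,432
 // bytes, waitQueuePauseBytes = 3,355,443 (10%), waitQueueResumeBytes =
 // 1,677,721 (5%), freeTrigger = 35,232,153 (105%) and freeLevel =
 // 31,876,710 (95%), each truncated by the (long) cast.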
422 
423  internal double GetRAMBufferSizeMB()
424  {
425  lock (this)
426  {
427  if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH)
428  {
429  return ramBufferSize;
430  }
431  else
432  {
433  return ramBufferSize / 1024.0 / 1024.0;
434  }
435  }
436  }
437 
438  /// <summary>Gets or sets max buffered docs, which means we will flush by
439  /// doc count instead of by RAM usage.
440  /// </summary>
441  internal int MaxBufferedDocs
442  {
443  get { return maxBufferedDocs; }
444  set { maxBufferedDocs = value; }
445  }
446 
447  /// <summary>Get current segment name we are writing. </summary>
448  internal string Segment
449  {
450  get { return segment; }
451  }
452 
453  /// <summary>Returns how many docs are currently buffered in RAM. </summary>
454  internal int NumDocsInRAM
455  {
456  get { return numDocsInRAM; }
457  }
458 
459  /// <summary>Returns the current doc store segment we are writing
460  /// to.
461  /// </summary>
462  internal string DocStoreSegment
463  {
464  get
465  {
466  lock (this)
467  {
468  return docStoreSegment;
469  }
470  }
471  }
472 
473  /// <summary>Returns the doc offset into the shared doc store for
474  /// the current buffered docs.
475  /// </summary>
476  internal int DocStoreOffset
477  {
478  get { return docStoreOffset; }
479  }
480 
481  /// <summary>Closes the currently open doc stores and returns the doc
482  /// store segment name. This returns null if there are
483  /// no buffered documents.
484  /// </summary>
485  internal System.String CloseDocStore()
486  {
487  lock (this)
488  {
489 
490  System.Diagnostics.Debug.Assert(AllThreadsIdle());
491 
492  if (infoStream != null)
493  Message("closeDocStore: " + openFiles.Count + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
494 
495  bool success = false;
496 
497  try
498  {
499  InitFlushState(true);
500  closedFiles.Clear();
501 
502  consumer.CloseDocStore(flushState);
503  System.Diagnostics.Debug.Assert(0 == openFiles.Count);
504 
505  System.String s = docStoreSegment;
506  docStoreSegment = null;
507  docStoreOffset = 0;
508  numDocsInStore = 0;
509  success = true;
510  return s;
511  }
512  finally
513  {
514  if (!success)
515  {
516  Abort();
517  }
518  }
519  }
520  }
521 
522  private ICollection<string> abortedFiles; // List of files that were written before last abort()
523 
524  private SegmentWriteState flushState;
525 
526  internal ICollection<string> AbortedFiles()
527  {
528  return abortedFiles;
529  }
530 
531  internal void Message(System.String message)
532  {
533  if (infoStream != null)
534  writer.Message("DW: " + message);
535  }
536 
537  internal IList<string> openFiles = new List<string>();
538  internal IList<string> closedFiles = new List<string>();
539 
540  /* Returns Collection of files in use by this instance,
541  * including any flushed segments. */
542  internal IList<string> OpenFiles()
543  {
544  lock (this)
545  {
546  // ToArray returns a copy
547  return openFiles.ToArray();
548  }
549  }
550 
551  internal IList<string> ClosedFiles()
552  {
553  lock (this)
554  {
555  // ToArray returns a copy
556  return closedFiles.ToArray();
557  }
558  }
559 
560  internal void AddOpenFile(System.String name)
561  {
562  lock (this)
563  {
564  System.Diagnostics.Debug.Assert(!openFiles.Contains(name));
565  openFiles.Add(name);
566  }
567  }
568 
569  internal void RemoveOpenFile(System.String name)
570  {
571  lock (this)
572  {
573  System.Diagnostics.Debug.Assert(openFiles.Contains(name));
574  openFiles.Remove(name);
575  closedFiles.Add(name);
576  }
577  }
578 
579  internal void SetAborting()
580  {
581  lock (this)
582  {
583  aborting = true;
584  }
585  }
586 
587  /// <summary>Called if we hit an exception at a bad time (when
588  /// updating the index files) and must discard all
589  /// currently buffered docs. This resets our state,
590  /// discarding any docs added since last flush.
591  /// </summary>
592  internal void Abort()
593  {
594  lock (this)
595  {
596  try
597  {
598  if (infoStream != null)
599  {
600  Message("docWriter: now abort");
601  }
602 
603  // Forcefully remove waiting ThreadStates from line
604  waitQueue.Abort();
605 
606  // Wait for all other threads to finish with
607  // DocumentsWriter:
608  PauseAllThreads();
609 
610  try
611  {
612 
613  System.Diagnostics.Debug.Assert(0 == waitQueue.numWaiting);
614 
615  waitQueue.waitingBytes = 0;
616 
617  try
618  {
619  abortedFiles = OpenFiles();
620  }
621  catch (System.Exception)
622  {
623  abortedFiles = null;
624  }
625 
626  deletesInRAM.Clear();
627  deletesFlushed.Clear();
628  openFiles.Clear();
629 
630  for (int i = 0; i < threadStates.Length; i++)
631  try
632  {
633  threadStates[i].consumer.Abort();
634  }
635  catch (System.Exception)
636  {
637  }
638 
639  try
640  {
641  consumer.Abort();
642  }
643  catch (System.Exception)
644  {
645  }
646 
647  docStoreSegment = null;
648  numDocsInStore = 0;
649  docStoreOffset = 0;
650 
651  // Reset all postings data
652  DoAfterFlush();
653  }
654  finally
655  {
656  ResumeAllThreads();
657  }
658  }
659  finally
660  {
661  aborting = false;
662  System.Threading.Monitor.PulseAll(this);
663  if (infoStream != null)
664  {
665  Message("docWriter: done abort; abortedFiles=" + abortedFiles);
666  }
667  }
668  }
669  }
670 
671  /// <summary>Reset after a flush </summary>
672  private void DoAfterFlush()
673  {
674  // All ThreadStates should be idle when we are called
675  System.Diagnostics.Debug.Assert(AllThreadsIdle());
676  threadBindings.Clear();
677  waitQueue.Reset();
678  segment = null;
679  numDocsInRAM = 0;
680  nextDocID = 0;
681  bufferIsFull = false;
682  flushPending = false;
683  for (int i = 0; i < threadStates.Length; i++)
684  threadStates[i].DoAfterFlush();
685  numBytesUsed = 0;
686  }
687 
688  // Returns true if an abort is in progress
689  internal bool PauseAllThreads()
690  {
691  lock (this)
692  {
693  pauseThreads++;
694  while (!AllThreadsIdle())
695  {
696  System.Threading.Monitor.Wait(this);
697  }
698 
699  return aborting;
700  }
701  }
702 
703  internal void ResumeAllThreads()
704  {
705  lock (this)
706  {
707  pauseThreads--;
708  System.Diagnostics.Debug.Assert(pauseThreads >= 0);
709  if (0 == pauseThreads)
710  System.Threading.Monitor.PulseAll(this);
711  }
712  }
713 
714  private bool AllThreadsIdle()
715  {
716  lock (this)
717  {
718  for (int i = 0; i < threadStates.Length; i++)
719  if (!threadStates[i].isIdle)
720  return false;
721  return true;
722  }
723  }
724 
725  internal bool AnyChanges
726  {
727  get
728  {
729  lock (this)
730  {
731  return numDocsInRAM != 0 || deletesInRAM.numTerms != 0 || deletesInRAM.docIDs.Count != 0 ||
732  deletesInRAM.queries.Count != 0;
733  }
734  }
735  }
736 
737  private void InitFlushState(bool onlyDocStore)
738  {
739  lock (this)
740  {
741  InitSegmentName(onlyDocStore);
742  flushState = new SegmentWriteState(this, directory, segment, docStoreSegment, numDocsInRAM, numDocsInStore, writer.TermIndexInterval);
743  }
744  }
745 
746  /// <summary>Flush all pending docs to a new segment </summary>
747  internal int Flush(bool closeDocStore)
748  {
749  lock (this)
750  {
751 
752  System.Diagnostics.Debug.Assert(AllThreadsIdle());
753 
754  System.Diagnostics.Debug.Assert(numDocsInRAM > 0);
755 
756  System.Diagnostics.Debug.Assert(nextDocID == numDocsInRAM);
757  System.Diagnostics.Debug.Assert(waitQueue.numWaiting == 0);
758  System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0);
759 
760  InitFlushState(false);
761 
762  docStoreOffset = numDocsInStore;
763 
764  if (infoStream != null)
765  Message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
766 
767  bool success = false;
768 
769  try
770  {
771 
772  if (closeDocStore)
773  {
774  System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName != null);
775  System.Diagnostics.Debug.Assert(flushState.docStoreSegmentName.Equals(flushState.segmentName));
776  CloseDocStore();
777  flushState.numDocsInStore = 0;
778  }
779 
780  ICollection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
781  for (int i = 0; i < threadStates.Length; i++)
782  threads.Add(threadStates[i].consumer);
783  consumer.Flush(threads, flushState);
784 
785  if (infoStream != null)
786  {
787  SegmentInfo si = new SegmentInfo(flushState.segmentName, flushState.numDocs, directory);
788  long newSegmentSize = si.SizeInBytes();
789  System.String message = System.String.Format(nf, " oldRAMSize={0:d} newFlushedSize={1:d} docs/MB={2:f} new/old={3:f}%",
790  new System.Object[] { numBytesUsed, newSegmentSize, (numDocsInRAM / (newSegmentSize / 1024.0 / 1024.0)), (100.0 * newSegmentSize / numBytesUsed) });
791  Message(message);
792  }
793 
794  flushedDocCount += flushState.numDocs;
795 
796  DoAfterFlush();
797 
798  success = true;
799  }
800  finally
801  {
802  if (!success)
803  {
804  Abort();
805  }
806  }
807 
808  System.Diagnostics.Debug.Assert(waitQueue.waitingBytes == 0);
809 
810  return flushState.numDocs;
811  }
812  }
813 
814  internal ICollection<string> GetFlushedFiles()
815  {
816  return flushState.flushedFiles;
817  }
818 
819  /// <summary>Build compound file for the segment we just flushed </summary>
820  internal void CreateCompoundFile(System.String segment)
821  {
822 
823  CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);
824  foreach (string flushedFile in flushState.flushedFiles)
825  {
826  cfsWriter.AddFile(flushedFile);
827  }
828 
829  // Perform the merge
830  cfsWriter.Close();
831  }
832 
833  /// <summary>Set flushPending if it is not already set and returns
834  /// whether it was set. This is used by IndexWriter to
835  /// trigger a single flush even when multiple threads are
836  /// trying to do so.
837  /// </summary>
838  internal bool SetFlushPending()
839  {
840  lock (this)
841  {
842  if (flushPending)
843  return false;
844  else
845  {
846  flushPending = true;
847  return true;
848  }
849  }
850  }
851 
852  internal void ClearFlushPending()
853  {
854  lock (this)
855  {
856  flushPending = false;
857  }
858  }
859 
860  internal void PushDeletes()
861  {
862  lock (this)
863  {
864  deletesFlushed.Update(deletesInRAM);
865  }
866  }
867 
868  public void Dispose()
869  {
870  // Move to protected method if class becomes unsealed
871  lock (this)
872  {
873  closed = true;
874  System.Threading.Monitor.PulseAll(this);
875  }
876  }
877 
878  internal void InitSegmentName(bool onlyDocStore)
879  {
880  lock (this)
881  {
882  if (segment == null && (!onlyDocStore || docStoreSegment == null))
883  {
884  segment = writer.NewSegmentName();
885  System.Diagnostics.Debug.Assert(numDocsInRAM == 0);
886  }
887  if (docStoreSegment == null)
888  {
889  docStoreSegment = segment;
890  System.Diagnostics.Debug.Assert(numDocsInStore == 0);
891  }
892  }
893  }
894 
895  /// <summary>Returns a free (idle) ThreadState that may be used for
896  /// indexing this one document. This call also pauses if a
897  /// flush is pending. If delTerm is non-null then we
898  /// buffer this deleted term after the thread state has
899  /// been acquired.
900  /// </summary>
901  internal DocumentsWriterThreadState GetThreadState(Document doc, Term delTerm)
902  {
903  lock (this)
904  {
905 
906  // First, find a thread state. If this thread already
907  // has affinity to a specific ThreadState, use that one
908  // again.
909  DocumentsWriterThreadState state = threadBindings[ThreadClass.Current()];
910  if (state == null)
911  {
912 
913  // First time this thread has called us since last
914  // flush. Find the least loaded thread state:
915  DocumentsWriterThreadState minThreadState = null;
916  for (int i = 0; i < threadStates.Length; i++)
917  {
918  DocumentsWriterThreadState ts = threadStates[i];
919  if (minThreadState == null || ts.numThreads < minThreadState.numThreads)
920  minThreadState = ts;
921  }
922  if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.Length >= MAX_THREAD_STATE))
923  {
924  state = minThreadState;
925  state.numThreads++;
926  }
927  else
928  {
929  // Just create a new "private" thread state
930  DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1 + threadStates.Length];
931  if (threadStates.Length > 0)
932  Array.Copy(threadStates, 0, newArray, 0, threadStates.Length);
933  state = newArray[threadStates.Length] = new DocumentsWriterThreadState(this);
934  threadStates = newArray;
935  }
936  threadBindings[ThreadClass.Current()] = state;
937  }
938 
939  // Next, wait until my thread state is idle (in case
940  // it's shared with other threads) and for threads to
941  // not be paused nor a flush pending:
942  WaitReady(state);
943 
944  // Allocate segment name if this is the first doc since
945  // last flush:
946  InitSegmentName(false);
947 
948  state.isIdle = false;
949 
950  bool success = false;
951  try
952  {
953  state.docState.docID = nextDocID;
954 
955  System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init start"));
956 
957  if (delTerm != null)
958  {
959  AddDeleteTerm(delTerm, state.docState.docID);
960  state.doFlushAfter = TimeToFlushDeletes();
961  }
962 
963  System.Diagnostics.Debug.Assert(writer.TestPoint("DocumentsWriter.ThreadState.init after delTerm"));
964 
965  nextDocID++;
966  numDocsInRAM++;
967 
968  // We must at this point commit to flushing to ensure we
969  // always get N docs when we flush by doc count, even if
970  // > 1 thread is adding documents:
971  if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH && numDocsInRAM >= maxBufferedDocs)
972  {
973  flushPending = true;
974  state.doFlushAfter = true;
975  }
976 
977  success = true;
978  }
979  finally
980  {
981  if (!success)
982  {
983  // Forcefully idle this ThreadState:
984  state.isIdle = true;
985  System.Threading.Monitor.PulseAll(this);
986  if (state.doFlushAfter)
987  {
988  state.doFlushAfter = false;
989  flushPending = false;
990  }
991  }
992  }
993 
994  return state;
995  }
996  }
997 
998  /// <summary>Returns true if the caller (IndexWriter) should now
999  /// flush.
1000  /// </summary>
1001  internal bool AddDocument(Document doc, Analyzer analyzer)
1002  {
1003  return UpdateDocument(doc, analyzer, null);
1004  }
1005 
1006  internal bool UpdateDocument(Term t, Document doc, Analyzer analyzer)
1007  {
1008  return UpdateDocument(doc, analyzer, t);
1009  }
1010 
1011  internal bool UpdateDocument(Document doc, Analyzer analyzer, Term delTerm)
1012  {
1013 
1014  // This call is synchronized but fast
1015  DocumentsWriterThreadState state = GetThreadState(doc, delTerm);
1016 
1017  DocState docState = state.docState;
1018  docState.doc = doc;
1019  docState.analyzer = analyzer;
1020 
1021  bool doReturnFalse = false; // {{Aroush-2.9}} to handle return from finally clause
1022 
1023  bool success = false;
1024  try
1025  {
1026  // This call is not synchronized and does all the
1027  // work
1028  DocWriter perDoc;
1029  try
1030  {
1031  perDoc = state.consumer.ProcessDocument();
1032  }
1033  finally
1034  {
1035  docState.Clear();
1036  }
1037  // This call is synchronized but fast
1038  FinishDocument(state, perDoc);
1039  success = true;
1040  }
1041  finally
1042  {
1043  if (!success)
1044  {
1045  lock (this)
1046  {
1047 
1048  if (aborting)
1049  {
1050  state.isIdle = true;
1051  System.Threading.Monitor.PulseAll(this);
1052  Abort();
1053  }
1054  else
1055  {
1056  skipDocWriter.docID = docState.docID;
1057  bool success2 = false;
1058  try
1059  {
1060  waitQueue.Add(skipDocWriter);
1061  success2 = true;
1062  }
1063  finally
1064  {
1065  if (!success2)
1066  {
1067  state.isIdle = true;
1068  System.Threading.Monitor.PulseAll(this);
1069  Abort();
1070  // return false; // {{Aroush-2.9}} this 'return false' was moved outside the finally clause
1071  doReturnFalse = true;
1072  }
1073  }
1074 
1075  if (!doReturnFalse) // {{Aroush-2.9}} added because of the above 'return false' removal
1076  {
1077  state.isIdle = true;
1078  System.Threading.Monitor.PulseAll(this);
1079 
1080  // If this thread state had decided to flush, we
1081  // must clear it so another thread can flush
1082  if (state.doFlushAfter)
1083  {
1084  state.doFlushAfter = false;
1085  flushPending = false;
1086  System.Threading.Monitor.PulseAll(this);
1087  }
1088 
1089  // Immediately mark this document as deleted
1090  // since likely it was partially added. This
1091  // keeps indexing as "all or none" (atomic) when
1092  // adding a document:
1093  AddDeleteDocID(state.docState.docID);
1094  }
1095  }
1096  }
1097  }
1098  }
1099 
1100  if (doReturnFalse) // {{Aroush-2.9}} see comment above
1101  {
1102  return false;
1103  }
1104 
1105  return state.doFlushAfter || TimeToFlushDeletes();
1106  }
1107 
1108  // for testing
1109  internal int GetNumBufferedDeleteTerms()
1110  {
1111  lock (this)
1112  {
1113  return deletesInRAM.numTerms;
1114  }
1115  }
1116 
1117  // for testing
1118  internal IDictionary<Term, BufferedDeletes.Num> GetBufferedDeleteTerms()
1119  {
1120  lock (this)
1121  {
1122  return deletesInRAM.terms;
1123  }
1124  }
1125 
1126  /// <summary>Called whenever a merge has completed and the merged segments had deletions </summary>
1127  internal void RemapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount)
1128  {
1129  lock (this)
1130  {
1131  if (docMaps == null)
1132  // The merged segments had no deletes so docIDs did not change and we have nothing to do
1133  return;
1134  MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount);
1135  deletesInRAM.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
1136  deletesFlushed.Remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount);
1137  flushedDocCount -= mapper.docShift;
1138  }
1139  }
1140 
1141  private void WaitReady(DocumentsWriterThreadState state)
1142  {
1143  lock (this)
1144  {
1145 
1146  while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting))
1147  {
1148  System.Threading.Monitor.Wait(this);
1149  }
1150 
1151  if (closed)
1152  throw new AlreadyClosedException("this IndexWriter is closed");
1153  }
1154  }
1155 
1156  internal bool BufferDeleteTerms(Term[] terms)
1157  {
1158  lock (this)
1159  {
1160  WaitReady(null);
1161  for (int i = 0; i < terms.Length; i++)
1162  AddDeleteTerm(terms[i], numDocsInRAM);
1163  return TimeToFlushDeletes();
1164  }
1165  }
1166 
1167  internal bool BufferDeleteTerm(Term term)
1168  {
1169  lock (this)
1170  {
1171  WaitReady(null);
1172  AddDeleteTerm(term, numDocsInRAM);
1173  return TimeToFlushDeletes();
1174  }
1175  }
1176 
1177  internal bool BufferDeleteQueries(Query[] queries)
1178  {
1179  lock (this)
1180  {
1181  WaitReady(null);
1182  for (int i = 0; i < queries.Length; i++)
1183  AddDeleteQuery(queries[i], numDocsInRAM);
1184  return TimeToFlushDeletes();
1185  }
1186  }
1187 
1188  internal bool BufferDeleteQuery(Query query)
1189  {
1190  lock (this)
1191  {
1192  WaitReady(null);
1193  AddDeleteQuery(query, numDocsInRAM);
1194  return TimeToFlushDeletes();
1195  }
1196  }
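 // These Buffer* methods back IndexWriter's public delete APIs; a rough
 // caller-side sketch (not in this file):
 //
 //   writer.DeleteDocuments(new Term("id", "42"));                 // -> BufferDeleteTerm
 //   writer.DeleteDocuments(new TermQuery(new Term("id", "42")));  // -> BufferDeleteQuery
 //
 // The buffered deletes are later applied to flushed segments in ApplyDeletes.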
1197 
1198  internal bool DeletesFull()
1199  {
1200  lock (this)
1201  {
1202  return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms));
1203  }
1204  }
1205 
1206  internal bool DoApplyDeletes()
1207  {
1208  lock (this)
1209  {
1210  // Very similar to deletesFull(), except we don't count
1211  // numBytesAlloc, because we are checking whether
1212  // deletes (alone) are consuming too many resources now
1213  // and thus should be applied. We apply deletes if RAM
1214  // usage is > 1/2 of our allowed RAM buffer, to prevent
1215  // too-frequent flushing of a long tail of tiny segments
1216  // when merges (which always apply deletes) are
1217  // infrequent.
1218  return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed) >= ramBufferSize / 2) || (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && ((deletesInRAM.Size() + deletesFlushed.Size()) >= maxBufferedDeleteTerms));
1219  }
1220  }
1221 
1222  private bool TimeToFlushDeletes()
1223  {
1224  lock (this)
1225  {
1226  return (bufferIsFull || DeletesFull()) && SetFlushPending();
1227  }
1228  }
1229 
1230  internal int MaxBufferedDeleteTerms
1231  {
1232  set { this.maxBufferedDeleteTerms = value; }
1233  get { return maxBufferedDeleteTerms; }
1234  }
1235 
1236  internal bool HasDeletes()
1237  {
1238  lock (this)
1239  {
1240  return deletesFlushed.Any();
1241  }
1242  }
1243 
1244  internal bool ApplyDeletes(SegmentInfos infos)
1245  {
1246  lock (this)
1247  {
1248  if (!HasDeletes())
1249  return false;
1250 
1251  if (infoStream != null)
1252  Message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + deletesFlushed.docIDs.Count + " deleted docIDs and " + deletesFlushed.queries.Count + " deleted queries on " + (+ infos.Count) + " segments.");
1253 
1254  int infosEnd = infos.Count;
1255 
1256  int docStart = 0;
1257  bool any = false;
1258  for (int i = 0; i < infosEnd; i++)
1259  {
1260 
1261  // Make sure we never attempt to apply deletes to
1262  // segment in external dir
1263  System.Diagnostics.Debug.Assert(infos.Info(i).dir == directory);
1264 
1265  SegmentReader reader = writer.readerPool.Get(infos.Info(i), false);
1266  try
1267  {
1268  any |= ApplyDeletes(reader, docStart);
1269  docStart += reader.MaxDoc;
1270  }
1271  finally
1272  {
1273  writer.readerPool.Release(reader);
1274  }
1275  }
1276 
1277  deletesFlushed.Clear();
1278 
1279  return any;
1280  }
1281  }
1282 
1283  // used only by assert
1284  private Term lastDeleteTerm;
1285 
1286  // used only by assert
1287  private bool CheckDeleteTerm(Term term)
1288  {
1289  if (term != null) {
1290  System.Diagnostics.Debug.Assert(lastDeleteTerm == null || term.CompareTo(lastDeleteTerm) > 0, "lastTerm=" + lastDeleteTerm + " vs term=" + term);
1291  }
1292  lastDeleteTerm = term;
1293  return true;
1294  }
1295 
1296  // Apply buffered delete terms, queries and docIDs to the
1297  // provided reader
1298  private bool ApplyDeletes(IndexReader reader, int docIDStart)
1299  {
1300  lock (this)
1301  {
1302  int docEnd = docIDStart + reader.MaxDoc;
1303  bool any = false;
1304 
1305  System.Diagnostics.Debug.Assert(CheckDeleteTerm(null));
1306 
1307  // Delete by term
1308  TermDocs docs = reader.TermDocs();
1309  try
1310  {
1311  foreach (KeyValuePair<Term, BufferedDeletes.Num> entry in deletesFlushed.terms)
1312  {
1313  Term term = entry.Key;
1314  // LUCENE-2086: we should be iterating a TreeMap,
1315  // here, so terms better be in order:
1316  System.Diagnostics.Debug.Assert(CheckDeleteTerm(term));
1317  docs.Seek(term);
1318  int limit = entry.Value.GetNum();
1319  while (docs.Next())
1320  {
1321  int docID = docs.Doc;
1322  if (docIDStart + docID >= limit)
1323  break;
1324  reader.DeleteDocument(docID);
1325  any = true;
1326  }
1327  }
1328  }
1329  finally
1330  {
1331  docs.Close();
1332  }
1333 
1334  // Delete by docID
1335  foreach (int docIdInt in deletesFlushed.docIDs)
1336  {
1337  int docID = docIdInt;
1338  if (docID >= docIDStart && docID < docEnd)
1339  {
1340  reader.DeleteDocument(docID - docIDStart);
1341  any = true;
1342  }
1343  }
1344 
1345  // Delete by query
1346  IndexSearcher searcher = new IndexSearcher(reader);
1347  foreach (KeyValuePair<Query, int> entry in deletesFlushed.queries)
1348  {
1349  Query query = entry.Key;
1350  int limit = entry.Value;
1351  Weight weight = query.Weight(searcher);
1352  Scorer scorer = weight.Scorer(reader, true, false);
1353  if (scorer != null)
1354  {
1355  while (true)
1356  {
1357  int doc = scorer.NextDoc();
1358  if (((long) docIDStart) + doc >= limit)
1359  break;
1360  reader.DeleteDocument(doc);
1361  any = true;
1362  }
1363  }
1364  }
1365  searcher.Close();
1366  return any;
1367  }
1368  }
1369 
1370  // Buffer a term in bufferedDeleteTerms, which records the
1371  // current number of documents buffered in ram so that the
1372  // delete term will be applied to those documents as well
1373  // as the disk segments.
1374  private void AddDeleteTerm(Term term, int docCount)
1375  {
1376  lock (this)
1377  {
1378  BufferedDeletes.Num num = deletesInRAM.terms[term];
1379  int docIDUpto = flushedDocCount + docCount;
1380  if (num == null)
1381  deletesInRAM.terms[term] = new BufferedDeletes.Num(docIDUpto);
1382  else
1383  num.SetNum(docIDUpto);
1384  deletesInRAM.numTerms++;
1385 
1386  deletesInRAM.AddBytesUsed(BYTES_PER_DEL_TERM + term.Text.Length * CHAR_NUM_BYTE);
1387  }
1388  }
1389 
1390  // Buffer a specific docID for deletion. Currently only
1391  // used when we hit an exception while adding a document
1392  private void AddDeleteDocID(int docID)
1393  {
1394  lock (this)
1395  {
1396  deletesInRAM.docIDs.Add(flushedDocCount + docID);
1397  deletesInRAM.AddBytesUsed(BYTES_PER_DEL_DOCID);
1398  }
1399  }
1400 
1401  private void AddDeleteQuery(Query query, int docID)
1402  {
1403  lock (this)
1404  {
1405  deletesInRAM.queries[query] = flushedDocCount + docID;
1406  deletesInRAM.AddBytesUsed(BYTES_PER_DEL_QUERY);
1407  }
1408  }
1409 
1410  internal bool DoBalanceRAM()
1411  {
1412  lock (this)
1413  {
1414  return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger);
1415  }
1416  }
1417 
1418  /// <summary>Does the synchronized work to finish/flush the
1419  /// inverted document.
1420  /// </summary>
1421  private void FinishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)
1422  {
1423 
1424  if (DoBalanceRAM())
1425  // Must call this w/o holding synchronized(this) else
1426  // we'll hit deadlock:
1427  BalanceRAM();
1428 
1429  lock (this)
1430  {
1431 
1432  System.Diagnostics.Debug.Assert(docWriter == null || docWriter.docID == perThread.docState.docID);
1433 
1434  if (aborting)
1435  {
1436 
1437  // We are currently aborting, and another thread is
1438  // waiting for me to become idle. We just forcefully
1439  // idle this threadState; it will be fully reset by
1440  // abort()
1441  if (docWriter != null)
1442  try
1443  {
1444  docWriter.Abort();
1445  }
1446  catch (System.Exception)
1447  {
1448  }
1449 
1450  perThread.isIdle = true;
1451  System.Threading.Monitor.PulseAll(this);
1452  return;
1453  }
1454 
1455  bool doPause;
1456 
1457  if (docWriter != null)
1458  doPause = waitQueue.Add(docWriter);
1459  else
1460  {
1461  skipDocWriter.docID = perThread.docState.docID;
1462  doPause = waitQueue.Add(skipDocWriter);
1463  }
1464 
1465  if (doPause)
1466  WaitForWaitQueue();
1467 
1468  if (bufferIsFull && !flushPending)
1469  {
1470  flushPending = true;
1471  perThread.doFlushAfter = true;
1472  }
1473 
1474  perThread.isIdle = true;
1475  System.Threading.Monitor.PulseAll(this);
1476  }
1477  }
1478 
1479  internal void WaitForWaitQueue()
1480  {
1481  lock (this)
1482  {
1483  do
1484  {
1485  System.Threading.Monitor.Wait(this);
1486  }
1487  while (!waitQueue.DoResume());
1488  }
1489  }
1490 
1491  internal class SkipDocWriter : DocWriter
1492  {
1493  public override void Finish()
1494  {
1495  }
1496  public override void Abort()
1497  {
1498  }
1499  public override long SizeInBytes()
1500  {
1501  return 0;
1502  }
1503  }
1504  internal SkipDocWriter skipDocWriter;
1505 
1506  internal long GetRAMUsed()
1507  {
1508  return numBytesUsed + deletesInRAM.bytesUsed + deletesFlushed.bytesUsed;
1509  }
1510 
1511  internal long numBytesAlloc;
1512  internal long numBytesUsed;
1513 
1514  internal System.Globalization.NumberFormatInfo nf = System.Globalization.CultureInfo.CurrentCulture.NumberFormat;
1515 
1516  // Coarse estimates used to measure RAM usage of buffered deletes
1517  internal const int OBJECT_HEADER_BYTES = 8;
1518  internal static readonly int POINTER_NUM_BYTE;
1519  internal const int INT_NUM_BYTE = 4;
1520  internal const int CHAR_NUM_BYTE = 2;
1521 
1522  /* Rough logic: HashMap has an array[Entry] w/ varying
1523  load factor (say 2 * POINTER). Entry is object w/ Term
1524  key, BufferedDeletes.Num val, int hash, Entry next
1525  (OBJ_HEADER + 3*POINTER + INT). Term is object w/
1526  String field and String text (OBJ_HEADER + 2*POINTER).
1527  We don't count Term's field since it's interned.
1528  Term's text is String (OBJ_HEADER + 4*INT + POINTER +
1529  OBJ_HEADER + string.length*CHAR). BufferedDeletes.num is
1530  OBJ_HEADER + INT. */
1531 
1532  internal static readonly int BYTES_PER_DEL_TERM = 8 * POINTER_NUM_BYTE + 5 * OBJECT_HEADER_BYTES + 6 * INT_NUM_BYTE;
1533 
1534  /* Rough logic: del docIDs are List<Integer>. Say list
1535  allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
1536  + int */
1537  internal static readonly int BYTES_PER_DEL_DOCID = 2 * POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
1538 
1539  /* Rough logic: HashMap has an array[Entry] w/ varying
1540  load factor (say 2 * POINTER). Entry is object w/
1541  Query key, Integer val, int hash, Entry next
1542  (OBJ_HEADER + 3*POINTER + INT). Query we often
1543  undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
1544  internal static readonly int BYTES_PER_DEL_QUERY = 5 * POINTER_NUM_BYTE + 2 * OBJECT_HEADER_BYTES + 2 * INT_NUM_BYTE + 24;
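 // Plugging in the sizes: on a 64-bit runtime (POINTER_NUM_BYTE = 8),
 // BYTES_PER_DEL_TERM = 8*8 + 5*8 + 6*4 = 128 bytes (plus CHAR_NUM_BYTE per
 // character of term text, added at the call site in AddDeleteTerm),
 // BYTES_PER_DEL_DOCID = 2*8 + 8 + 4 = 28 bytes, and
 // BYTES_PER_DEL_QUERY = 5*8 + 2*8 + 2*4 + 24 = 88 bytes. On a 32-bit
 // runtime these become 96, 20 and 68 bytes respectively.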
1545 
1546  /* Initial chunk size of the shared byte[] blocks used to
1547  store postings data */
1548  internal const int BYTE_BLOCK_SHIFT = 15;
1549  internal static readonly int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
1550  internal static readonly int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
1551  internal static readonly int BYTE_BLOCK_NOT_MASK = ~ BYTE_BLOCK_MASK;
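 // So BYTE_BLOCK_SIZE = 1 << 15 = 32768 and BYTE_BLOCK_MASK = 0x7FFF. A flat
 // address into the shared pool splits as block = addr >> BYTE_BLOCK_SHIFT,
 // offset = addr & BYTE_BLOCK_MASK; e.g. address 100000 lands in block 3 at
 // offset 1696 (3 * 32768 + 1696 = 100000).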
1552 
1553  internal class ByteBlockAllocator : ByteBlockPool.Allocator
1554  {
1555  public ByteBlockAllocator(DocumentsWriter enclosingInstance, int blockSize)
1556  {
1557  this.blockSize = blockSize;
1558  InitBlock(enclosingInstance);
1559  }
1560  private void InitBlock(DocumentsWriter enclosingInstance)
1561  {
1562  this.enclosingInstance = enclosingInstance;
1563  }
1564  private DocumentsWriter enclosingInstance;
1565  public DocumentsWriter Enclosing_Instance
1566  {
1567  get
1568  {
1569  return enclosingInstance;
1570  }
1571 
1572  }
1573 
1574  int blockSize;
1575  internal List<byte[]> freeByteBlocks = new List<byte[]>();
1576 
1577  /* Allocate another byte[] from the shared pool */
1578  public /*internal*/ override byte[] GetByteBlock(bool trackAllocations)
1579  {
1580  lock (Enclosing_Instance)
1581  {
1582  int size = freeByteBlocks.Count;
1583  byte[] b;
1584  if (0 == size)
1585  {
1586  // Always record a block allocated, even if
1587  // trackAllocations is false. This is necessary
1588  // because this block will be shared between
1589  // things that don't track allocations (term
1590  // vectors) and things that do (freq/prox
1591  // postings).
1592  Enclosing_Instance.numBytesAlloc += blockSize;
1593  b = new byte[blockSize];
1594  }
1595  else
1596  {
1597  b = freeByteBlocks[size - 1];
1598  freeByteBlocks.RemoveAt(size - 1);
1599  }
1600  if (trackAllocations)
1601  Enclosing_Instance.numBytesUsed += blockSize;
1602  System.Diagnostics.Debug.Assert(Enclosing_Instance.numBytesUsed <= Enclosing_Instance.numBytesAlloc);
1603  return b;
1604  }
1605  }
1606 
1607  /* Return byte[]'s to the pool */
1608  public /*internal*/ override void RecycleByteBlocks(byte[][] blocks, int start, int end)
1609  {
1610  lock (Enclosing_Instance)
1611  {
1612  for (int i = start; i < end; i++)
1613  {
1614  freeByteBlocks.Add(blocks[i]);
1615  blocks[i] = null;
1616  }
1617  }
1618  }
1619 
1620  public /*internal*/ override void RecycleByteBlocks(IList<byte[]> blocks)
1621  {
1622  lock (Enclosing_Instance)
1623  {
1624  int size = blocks.Count;
1625  for (int i = 0; i < size; i++)
1626  freeByteBlocks.Add(blocks[i]);
1627  }
1628  }
1629  }
1630 
1631  /* Initial chunk size of the shared int[] blocks used to
1632  store postings data */
1633  internal const int INT_BLOCK_SHIFT = 13;
1634  internal static readonly int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
1635  internal static readonly int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
1636 
1637  private List<int[]> freeIntBlocks = new List<int[]>();
1638 
1639  /* Allocate another int[] from the shared pool */
1640  internal int[] GetIntBlock(bool trackAllocations)
1641  {
1642  lock (this)
1643  {
1644  int size = freeIntBlocks.Count;
1645  int[] b;
1646  if (0 == size)
1647  {
1648  // Always record a block allocated, even if
1649  // trackAllocations is false. This is necessary
1650  // because this block will be shared between
1651  // things that don't track allocations (term
1652  // vectors) and things that do (freq/prox
1653  // postings).
1654  numBytesAlloc += INT_BLOCK_SIZE * INT_NUM_BYTE;
1655  b = new int[INT_BLOCK_SIZE];
1656  }
1657  else
1658  {
1659  b = freeIntBlocks[size - 1];
1660  freeIntBlocks.RemoveAt(size - 1);
1661  }
1662  if (trackAllocations)
1663  numBytesUsed += INT_BLOCK_SIZE * INT_NUM_BYTE;
1664  System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
1665  return b;
1666  }
1667  }
1668 
1669  internal void BytesAllocated(long numBytes)
1670  {
1671  lock (this)
1672  {
1673  numBytesAlloc += numBytes;
1674  }
1675  }
1676 
1677  internal void BytesUsed(long numBytes)
1678  {
1679  lock (this)
1680  {
1681  numBytesUsed += numBytes;
1682  System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
1683  }
1684  }
1685 
1686  /* Return int[]s to the pool */
1687  internal void RecycleIntBlocks(int[][] blocks, int start, int end)
1688  {
1689  lock (this)
1690  {
1691  for (int i = start; i < end; i++)
1692  {
1693  freeIntBlocks.Add(blocks[i]);
1694  blocks[i] = null;
1695  }
1696  }
1697  }
1698 
1699  internal ByteBlockAllocator byteBlockAllocator;
1700 
1701  internal static int PER_DOC_BLOCK_SIZE = 1024;
1702 
1703  ByteBlockAllocator perDocAllocator;
1704 
1705  /* Initial chunk size of the shared char[] blocks used to
1706  store term text */
1707  internal const int CHAR_BLOCK_SHIFT = 14;
1708  internal static readonly int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
1709  internal static readonly int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
1710 
1711  internal static readonly int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE - 1;
1712 
1713  private List<char[]> freeCharBlocks = new List<char[]>();
1714 
1715  /* Allocate another char[] from the shared pool */
1716  internal char[] GetCharBlock()
1717  {
1718  lock (this)
1719  {
1720  int size = freeCharBlocks.Count;
1721  char[] c;
1722  if (0 == size)
1723  {
1724  numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
1725  c = new char[CHAR_BLOCK_SIZE];
1726  }
1727  else
1728  {
1729  c = freeCharBlocks[size - 1];
1730  freeCharBlocks.RemoveAt(size - 1);
1731  }
1732  // We always track allocations of char blocks, for now,
1733  // because nothing that skips allocation tracking
1734  // (currently only term vectors) uses its own char
1735  // blocks.
1736  numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
1737  System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
1738  return c;
1739  }
1740  }
1741 
1742  /* Return char[]s to the pool */
1743  internal void RecycleCharBlocks(char[][] blocks, int numBlocks)
1744  {
1745  lock (this)
1746  {
1747  for (int i = 0; i < numBlocks; i++)
1748  {
1749  freeCharBlocks.Add(blocks[i]);
1750  blocks[i] = null;
1751  }
1752  }
1753  }
1754 
1755  internal System.String ToMB(long v)
1756  {
1757  return System.String.Format(nf, "{0:f}", new System.Object[] { (v / 1024F / 1024F) });
1758  }
1759 
1760 
1761  /* We have four pools of RAM: Postings, byte blocks
1762  * (holds freq/prox posting data), char blocks (holds
1763  * characters in the term) and per-doc buffers (stored fields/term vectors).
1764  * Different docs require varying amounts of storage from
1765  * these four pools.
1766  *
1767  * For example, docs with many unique single-occurrence
1768  * short terms will use up the Postings RAM and hardly any
1769  * of the other pools, whereas docs with very large terms
1770  * will use a lot of char blocks RAM and relatively little of
1771  * the others. This method just frees allocations from
1772  * the pools once we are over-budget, which balances the
1773  * pools to match the current docs. */
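 // The freeing loop below rotates through the pools on iter % 5, dropping
 // ~32 KB per pass: 0 frees one byte block (32 KB), 1 one char block
 // (16384 chars * 2 bytes), 2 one int block (8192 ints * 4 bytes), 3 up to
 // 32 per-doc blocks (32 * 1 KB), and 4 asks consumer.FreeRAM() to release
 // recycled state.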
1774  internal void BalanceRAM()
1775  {
1776 
1777  // We flush when we've used our target usage
1778  long flushTrigger = ramBufferSize;
1779 
1780  long deletesRAMUsed = deletesInRAM.bytesUsed + deletesFlushed.bytesUsed;
1781 
1782  if (numBytesAlloc + deletesRAMUsed > freeTrigger)
1783  {
1784 
1785  if (infoStream != null)
1786  Message(
1787  " RAM: now balance allocations: usedMB=" + ToMB(numBytesUsed) +
1788  " vs trigger=" + ToMB(flushTrigger) +
1789  " allocMB=" + ToMB(numBytesAlloc) +
1790  " deletesMB=" + ToMB(deletesRAMUsed) +
1791  " vs trigger=" + ToMB(freeTrigger) +
1792  " byteBlockFree=" + ToMB(byteBlockAllocator.freeByteBlocks.Count * BYTE_BLOCK_SIZE) +
1793  " perDocFree=" + ToMB(perDocAllocator.freeByteBlocks.Count * PER_DOC_BLOCK_SIZE) +
1794  " charBlockFree=" + ToMB(freeCharBlocks.Count * CHAR_BLOCK_SIZE * CHAR_NUM_BYTE));
1795 
1796  long startBytesAlloc = numBytesAlloc + deletesRAMUsed;
1797 
1798  int iter = 0;
1799 
1800  // We free equally from each pool in 32 KB
1801  // chunks until we are below our threshold
1802  // (freeLevel)
1803 
1804  bool any = true;
1805 
1806  while (numBytesAlloc + deletesRAMUsed > freeLevel)
1807  {
1808 
1809  lock (this)
1810  {
1811  if (0 == perDocAllocator.freeByteBlocks.Count
1812  && 0 == byteBlockAllocator.freeByteBlocks.Count
1813  && 0 == freeCharBlocks.Count
1814  && 0 == freeIntBlocks.Count
1815  && !any)
1816  {
1817  // Nothing else to free -- must flush now.
1818  bufferIsFull = numBytesUsed + deletesRAMUsed > flushTrigger;
1819  if (infoStream != null)
1820  {
1821  if (bufferIsFull)
1822  Message(" nothing to free; now set bufferIsFull");
1823  else
1824  Message(" nothing to free");
1825  }
1826  System.Diagnostics.Debug.Assert(numBytesUsed <= numBytesAlloc);
1827  break;
1828  }
1829 
1830  if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.Count > 0)
1831  {
1832  byteBlockAllocator.freeByteBlocks.RemoveAt(byteBlockAllocator.freeByteBlocks.Count - 1);
1833  numBytesAlloc -= BYTE_BLOCK_SIZE;
1834  }
1835 
1836  if ((1 == iter % 5) && freeCharBlocks.Count > 0)
1837  {
1838  freeCharBlocks.RemoveAt(freeCharBlocks.Count - 1);
1839  numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
1840  }
1841 
1842  if ((2 == iter % 5) && freeIntBlocks.Count > 0)
1843  {
1844  freeIntBlocks.RemoveAt(freeIntBlocks.Count - 1);
1845  numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
1846  }
1847 
1848  if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.Count > 0)
1849  {
1850  // Remove upwards of 32 blocks (each block is 1K)
1851  for (int i = 0; i < 32; ++i)
1852  {
1853  perDocAllocator.freeByteBlocks.RemoveAt(perDocAllocator.freeByteBlocks.Count - 1);
1854  numBytesAlloc -= PER_DOC_BLOCK_SIZE;
1855  if (perDocAllocator.freeByteBlocks.Count == 0)
1856  {
1857  break;
1858  }
1859  }
1860  }
1861  }
1862 
1863  if ((4 == iter % 5) && any)
1864  // Ask consumer to free any recycled state
1865  any = consumer.FreeRAM();
1866 
1867  iter++;
1868  }
1869 
1870  if (infoStream != null)
1871  Message(System.String.Format(nf, " after free: freedMB={0:f} usedMB={1:f} allocMB={2:f}",
1872  new System.Object[] { ((startBytesAlloc - numBytesAlloc) / 1024.0 / 1024.0), (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0) }));
1873  }
1874  else
1875  {
1876  // If we have not crossed the 100% mark, but have
1877  // crossed the 95% mark of RAM we are actually
1878  // using, go ahead and flush. This prevents
1879  // over-allocating and then freeing, with every
1880  // flush.
1881  lock (this)
1882  {
1883 
1884  if (numBytesUsed + deletesRAMUsed > flushTrigger)
1885  {
1886  if (infoStream != null)
1887  Message(System.String.Format(nf, " RAM: now flush @ usedMB={0:f} allocMB={1:f} triggerMB={2:f}",
1888  new object[] { (numBytesUsed / 1024.0 / 1024.0), (numBytesAlloc / 1024.0 / 1024.0), (flushTrigger / 1024.0 / 1024.0) }));
1889 
1890  bufferIsFull = true;
1891  }
1892  }
1893  }
1894  }
1895 
1896  internal WaitQueue waitQueue;
1897 
1898  internal class WaitQueue
1899  {
1900  private void InitBlock(DocumentsWriter enclosingInstance)
1901  {
1902  this.enclosingInstance = enclosingInstance;
1903  }
1904  private DocumentsWriter enclosingInstance;
1905  public DocumentsWriter Enclosing_Instance
1906  {
1907  get
1908  {
1909  return enclosingInstance;
1910  }
1911 
1912  }
1913  internal DocWriter[] waiting;
1914  internal int nextWriteDocID;
1915  internal int nextWriteLoc;
1916  internal int numWaiting;
1917  internal long waitingBytes;
1918 
1919  public WaitQueue(DocumentsWriter enclosingInstance)
1920  {
1921  InitBlock(enclosingInstance);
1922  waiting = new DocWriter[10];
1923  }
1924 
1925  internal void Reset()
1926  {
1927  lock (this)
1928  {
1929  // NOTE: nextWriteLoc doesn't need to be reset
1930  System.Diagnostics.Debug.Assert(numWaiting == 0);
1931  System.Diagnostics.Debug.Assert(waitingBytes == 0);
1932  nextWriteDocID = 0;
1933  }
1934  }
1935 
1936  internal bool DoResume()
1937  {
1938  lock (this)
1939  {
1940  return waitingBytes <= Enclosing_Instance.waitQueueResumeBytes;
1941  }
1942  }
1943 
1944  internal bool DoPause()
1945  {
1946  lock (this)
1947  {
1948  return waitingBytes > Enclosing_Instance.waitQueuePauseBytes;
1949  }
1950  }
1951 
1952  internal void Abort()
1953  {
1954  lock (this)
1955  {
1956  int count = 0;
1957  for (int i = 0; i < waiting.Length; i++)
1958  {
1959  DocWriter doc = waiting[i];
1960  if (doc != null)
1961  {
1962  doc.Abort();
1963  waiting[i] = null;
1964  count++;
1965  }
1966  }
1967  waitingBytes = 0;
1968  System.Diagnostics.Debug.Assert(count == numWaiting);
1969  numWaiting = 0;
1970  }
1971  }
1972 
1973  private void WriteDocument(DocWriter doc)
1974  {
1975  System.Diagnostics.Debug.Assert(doc == Enclosing_Instance.skipDocWriter || nextWriteDocID == doc.docID);
1976  bool success = false;
1977  try
1978  {
1979  doc.Finish();
1980  nextWriteDocID++;
1981  Enclosing_Instance.numDocsInStore++;
1982  nextWriteLoc++;
1983  System.Diagnostics.Debug.Assert(nextWriteLoc <= waiting.Length);
1984  if (nextWriteLoc == waiting.Length)
1985  nextWriteLoc = 0;
1986  success = true;
1987  }
1988  finally
1989  {
1990  if (!success)
1991  Enclosing_Instance.SetAborting();
1992  }
1993  }
1994 
1995  public bool Add(DocWriter doc)
1996  {
1997  lock (this)
1998  {
1999 
2000  System.Diagnostics.Debug.Assert(doc.docID >= nextWriteDocID);
2001 
2002  if (doc.docID == nextWriteDocID)
2003  {
2004  WriteDocument(doc);
2005  while (true)
2006  {
2007  doc = waiting[nextWriteLoc];
2008  if (doc != null)
2009  {
2010  numWaiting--;
2011  waiting[nextWriteLoc] = null;
2012  waitingBytes -= doc.SizeInBytes();
2013  WriteDocument(doc);
2014  }
2015  else
2016  break;
2017  }
2018  }
2019  else
2020  {
2021 
2022  // I finished before documents that were added
2023  // before me. This can easily happen when I am a
2024  // small doc and the docs before me were large, or,
2025  // just due to luck in the thread scheduling. Just
2026  // add myself to the queue and when that large doc
2027  // finishes, it will flush me:
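 // For example, with nextWriteDocID = 5, nextWriteLoc = 2 and
 // waiting.Length = 10, a doc with docID = 9 has gap = 4 and parks at
 // waiting[(2 + 4) % 10] = waiting[6] until docs 5..8 finish and
 // WriteDocument walks nextWriteLoc forward to it.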
2028  int gap = doc.docID - nextWriteDocID;
2029  if (gap >= waiting.Length)
2030  {
2031  // Grow queue
2032  DocWriter[] newArray = new DocWriter[ArrayUtil.GetNextSize(gap)];
2033  System.Diagnostics.Debug.Assert(nextWriteLoc >= 0);
2034  Array.Copy(waiting, nextWriteLoc, newArray, 0, waiting.Length - nextWriteLoc);
2035  Array.Copy(waiting, 0, newArray, waiting.Length - nextWriteLoc, nextWriteLoc);
2036  nextWriteLoc = 0;
2037  waiting = newArray;
2038  gap = doc.docID - nextWriteDocID;
2039  }
2040 
2041  int loc = nextWriteLoc + gap;
2042  if (loc >= waiting.Length)
2043  loc -= waiting.Length;
2044 
2045  // We should only wrap one time
2046  System.Diagnostics.Debug.Assert(loc < waiting.Length);
2047 
2048  // Nobody should be in my spot!
2049  System.Diagnostics.Debug.Assert(waiting[loc] == null);
2050  waiting[loc] = doc;
2051  numWaiting++;
2052  waitingBytes += doc.SizeInBytes();
2053  }
2054 
2055  return DoPause();
2056  }
2057  }
2058  }
2059  static DocumentsWriter()
2060  {
2061  DefaultIndexingChain = new AnonymousClassIndexingChain();
2062  POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
2063  }
2064 
2065  public static int BYTE_BLOCK_SIZE_ForNUnit
2066  {
2067  get { return BYTE_BLOCK_SIZE; }
2068  }
2069 
2070  public static int CHAR_BLOCK_SIZE_ForNUnit
2071  {
2072  get { return CHAR_BLOCK_SIZE; }
2073  }
2074  }
2075 }