Lucene.Net  3.0.3
Lucene.Net is a .NET port of the Java Lucene Indexing Library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Properties
ConcurrentMergeScheduler.cs
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 using System.Collections.Generic;
19 using Lucene.Net.Support;
20 using Directory = Lucene.Net.Store.Directory;
21 
22 namespace Lucene.Net.Index
23 {
24 
34 
36  {
37 
38  private int mergeThreadPriority = - 1;
39 
40  protected internal IList<MergeThread> mergeThreads = new List<MergeThread>();
41 
42  // Max number of threads allowed to be merging at once
43  private int _maxThreadCount = 1;
44 
45  protected internal Directory dir;
46 
47  private bool closed;
48  protected internal IndexWriter writer;
49  protected internal int mergeThreadCount;
50 
52  {
53  if (allInstances != null)
54  {
55  // Only for testing
56  AddMyself();
57  }
58  }
59 
66  public virtual int MaxThreadCount
67  {
68  set
69  {
70  if (value < 1)
71  throw new System.ArgumentException("count should be at least 1");
72  _maxThreadCount = value;
73  }
74  get { return _maxThreadCount; }
75  }
76 
82  [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate")]
83  public virtual int GetMergeThreadPriority()
84  {
85  lock (this)
86  {
87  InitMergeThreadPriority();
88  return mergeThreadPriority;
89  }
90  }
91 
93  public virtual void SetMergeThreadPriority(int pri)
94  {
95  lock (this)
96  {
97  if (pri > (int) System.Threading.ThreadPriority.Highest || pri < (int) System.Threading.ThreadPriority.Lowest)
98  throw new System.ArgumentException("priority must be in range " + (int) System.Threading.ThreadPriority.Lowest + " .. " + (int) System.Threading.ThreadPriority.Highest + " inclusive");
99  mergeThreadPriority = pri;
100 
101  int numThreads = MergeThreadCount();
102  for (int i = 0; i < numThreads; i++)
103  {
104  MergeThread merge = mergeThreads[i];
105  merge.SetThreadPriority(pri);
106  }
107  }
108  }
109 
110  private bool Verbose()
111  {
112  return writer != null && writer.Verbose;
113  }
114 
115  private void Message(System.String message)
116  {
117  if (Verbose())
118  writer.Message("CMS: " + message);
119  }
120 
121  private void InitMergeThreadPriority()
122  {
123  lock (this)
124  {
125  if (mergeThreadPriority == - 1)
126  {
127  // Default to slightly higher priority than our
128  // calling thread
129  mergeThreadPriority = 1 + (System.Int32) ThreadClass.Current().Priority;
130  if (mergeThreadPriority > (int) System.Threading.ThreadPriority.Highest)
131  mergeThreadPriority = (int) System.Threading.ThreadPriority.Highest;
132  }
133  }
134  }
135 
136  protected override void Dispose(bool disposing)
137  {
138  //if (disposing)
139  //{
140  closed = true;
141  //}
142  }
143 
144  public virtual void Sync()
145  {
146  lock (this)
147  {
148  while (MergeThreadCount() > 0)
149  {
150  if (Verbose())
151  Message("now wait for threads; currently " + mergeThreads.Count + " still running");
152  int count = mergeThreads.Count;
153  if (Verbose())
154  {
155  for (int i = 0; i < count; i++)
156  Message(" " + i + ": " + mergeThreads[i]);
157  }
158 
159  System.Threading.Monitor.Wait(this);
160 
161  }
162  }
163  }
164 
165  private int MergeThreadCount()
166  {
167  lock (this)
168  {
169  int count = 0;
170  int numThreads = mergeThreads.Count;
171  for (int i = 0; i < numThreads; i++)
172  {
173  if (mergeThreads[i].IsAlive)
174  {
175  count++;
176  }
177  }
178  return count;
179  }
180  }
181 
182  public override void Merge(IndexWriter writer)
183  {
184  // TODO: .NET doesn't support this
185  // assert !Thread.holdsLock(writer);
186 
187  this.writer = writer;
188 
189  InitMergeThreadPriority();
190 
191  dir = writer.Directory;
192 
193  // First, quickly run through the newly proposed merges
194  // and add any orthogonal merges (ie a merge not
195  // involving segments already pending to be merged) to
196  // the queue. If we are way behind on merging, many of
197  // these newly proposed merges will likely already be
198  // registered.
199 
200  if (Verbose())
201  {
202  Message("now merge");
203  Message(" index: " + writer.SegString());
204  }
205 
206  // Iterate, pulling from the IndexWriter's queue of
207  // pending merges, until it's empty:
208  while (true)
209  {
210  // TODO: we could be careful about which merges to do in
211  // the BG (eg maybe the "biggest" ones) vs FG, which
212  // merges to do first (the easiest ones?), etc.
213 
214  MergePolicy.OneMerge merge = writer.GetNextMerge();
215  if (merge == null)
216  {
217  if (Verbose())
218  Message(" no more merges pending; now return");
219  return ;
220  }
221 
222  // We do this w/ the primary thread to keep
223  // deterministic assignment of segment names
224  writer.MergeInit(merge);
225 
226  bool success = false;
227  try
228  {
229  lock (this)
230  {
231  while (MergeThreadCount() >= _maxThreadCount)
232  {
233  if (Verbose())
234  Message(" too many merge threads running; stalling...");
235 
236  System.Threading.Monitor.Wait(this);
237 
238 
239  }
240 
241  if (Verbose())
242  Message(" consider merge " + merge.SegString(dir));
243 
244  System.Diagnostics.Debug.Assert(MergeThreadCount() < _maxThreadCount);
245 
246  // OK to spawn a new merge thread to handle this
247  // merge:
248  MergeThread merger = GetMergeThread(writer, merge);
249  mergeThreads.Add(merger);
250  if (Verbose())
251  Message(" launch new thread [" + merger.Name + "]");
252 
253  merger.Start();
254  success = true;
255  }
256  }
257  finally
258  {
259  if (!success)
260  {
261  writer.MergeFinish(merge);
262  }
263  }
264  }
265  }
266 
268  protected internal virtual void DoMerge(MergePolicy.OneMerge merge)
269  {
270  writer.Merge(merge);
271  }
272 
274  protected internal virtual MergeThread GetMergeThread(IndexWriter writer, MergePolicy.OneMerge merge)
275  {
276  lock (this)
277  {
278  var thread = new MergeThread(this, writer, merge);
279  thread.SetThreadPriority(mergeThreadPriority);
280  thread.IsBackground = true;
281  thread.Name = "Lucene Merge Thread #" + mergeThreadCount++;
282  return thread;
283  }
284  }
285 
286  public /*protected internal*/ class MergeThread:ThreadClass
287  {
288  private void InitBlock(ConcurrentMergeScheduler enclosingInstance)
289  {
290  this.enclosingInstance = enclosingInstance;
291  }
292  private ConcurrentMergeScheduler enclosingInstance;
293  public ConcurrentMergeScheduler Enclosing_Instance
294  {
295  get
296  {
297  return enclosingInstance;
298  }
299 
300  }
301 
302  internal IndexWriter writer;
303  internal MergePolicy.OneMerge startMerge;
304  internal MergePolicy.OneMerge runningMerge;
305 
306  public MergeThread(ConcurrentMergeScheduler enclosingInstance, IndexWriter writer, MergePolicy.OneMerge startMerge)
307  {
308  InitBlock(enclosingInstance);
309  this.writer = writer;
310  this.startMerge = startMerge;
311  }
312 
313  public virtual void SetRunningMerge(MergePolicy.OneMerge merge)
314  {
315  lock (this)
316  {
317  runningMerge = merge;
318  }
319  }
320 
321  public virtual MergePolicy.OneMerge RunningMerge
322  {
323  get
324  {
325  lock (this)
326  {
327  return runningMerge;
328  }
329  }
330  }
331 
332  public virtual void SetThreadPriority(int pri)
333  {
334  try
335  {
336  Priority = (System.Threading.ThreadPriority) pri;
337  }
338  catch (System.NullReferenceException)
339  {
340  // Strangely, Sun's JDK 1.5 on Linux sometimes
341  // throws NPE out of here...
342  }
343  catch (System.Security.SecurityException)
344  {
345  // Ignore this because we will still run fine with
346  // normal thread priority
347  }
348  }
349 
350  override public void Run()
351  {
352 
353  // First time through the while loop we do the merge
354  // that we were started with:
355  MergePolicy.OneMerge merge = this.startMerge;
356 
357  try
358  {
359 
360  if (Enclosing_Instance.Verbose())
361  Enclosing_Instance.Message(" merge thread: start");
362 
363  while (true)
364  {
365  SetRunningMerge(merge);
366  Enclosing_Instance.DoMerge(merge);
367 
368  // Subsequent times through the loop we do any new
369  // merge that writer says is necessary:
370  merge = writer.GetNextMerge();
371  if (merge != null)
372  {
373  writer.MergeInit(merge);
374  if (Enclosing_Instance.Verbose())
375  Enclosing_Instance.Message(" merge thread: do another merge " + merge.SegString(Enclosing_Instance.dir));
376  }
377  else
378  break;
379  }
380 
381  if (Enclosing_Instance.Verbose())
382  Enclosing_Instance.Message(" merge thread: done");
383  }
384  catch (System.Exception exc)
385  {
386  // Ignore the exception if it was due to abort:
387  if (!(exc is MergePolicy.MergeAbortedException))
388  {
389  if (!Enclosing_Instance.suppressExceptions)
390  {
391  // suppressExceptions is normally only set during
392  // testing.
393  Lucene.Net.Index.ConcurrentMergeScheduler.anyExceptions = true;
394  Enclosing_Instance.HandleMergeException(exc);
395  }
396  }
397  }
398  finally
399  {
400  lock (Enclosing_Instance)
401  {
402  System.Threading.Monitor.PulseAll(Enclosing_Instance);
403  Enclosing_Instance.mergeThreads.Remove(this);
404  bool removed = !Enclosing_Instance.mergeThreads.Contains(this);
405  System.Diagnostics.Debug.Assert(removed);
406  }
407  }
408  }
409 
410  public override System.String ToString()
411  {
412  MergePolicy.OneMerge merge = RunningMerge ?? startMerge;
413  return "merge thread: " + merge.SegString(Enclosing_Instance.dir);
414  }
415  }
416 
420  protected internal virtual void HandleMergeException(System.Exception exc)
421  {
422  // When an exception is hit during merge, IndexWriter
423  // removes any partial files and then allows another
424  // merge to run. If whatever caused the error is not
425  // transient then the exception will keep happening,
426  // so, we sleep here to avoid saturating CPU in such
427  // cases:
428  System.Threading.Thread.Sleep(new System.TimeSpan((System.Int64) 10000 * 250));
429 
430  throw new MergePolicy.MergeException(exc, dir);
431  }
432 
433  internal static bool anyExceptions = false;
434 
436  public static bool AnyUnhandledExceptions()
437  {
438  if (allInstances == null)
439  {
440  throw new System.SystemException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
441  }
442  lock (allInstances)
443  {
444  int count = allInstances.Count;
445  // Make sure all outstanding threads are done so we see
446  // any exceptions they may produce:
447  for (int i = 0; i < count; i++)
448  allInstances[i].Sync();
449  bool v = anyExceptions;
450  anyExceptions = false;
451  return v;
452  }
453  }
454 
455  public static void ClearUnhandledExceptions()
456  {
457  lock (allInstances)
458  {
459  anyExceptions = false;
460  }
461  }
462 
464  private void AddMyself()
465  {
466  lock (allInstances)
467  {
468  int size = allInstances.Count;
469  int upto = 0;
470  for (int i = 0; i < size; i++)
471  {
472  ConcurrentMergeScheduler other = allInstances[i];
473  if (!(other.closed && 0 == other.MergeThreadCount()))
474  // Keep this one for now: it still has threads or
475  // may spawn new threads
476  allInstances[upto++] = other;
477  }
478  allInstances.RemoveRange(upto, allInstances.Count - upto);
479  allInstances.Add(this);
480  }
481  }
482 
483  private bool suppressExceptions;
484 
486  public /*internal*/ virtual void SetSuppressExceptions()
487  {
488  suppressExceptions = true;
489  }
490 
492  public /*internal*/ virtual void ClearSuppressExceptions()
493  {
494  suppressExceptions = false;
495  }
496 
498  private static List<ConcurrentMergeScheduler> allInstances;
499  public static void SetTestMode()
500  {
501  allInstances = new List<ConcurrentMergeScheduler>();
502  }
503  }
504 }