18 using System.Collections.Generic;
19 using Lucene.Net.Support;
22 namespace Lucene.Net.Index
// Priority handed to merge threads. -1 is the "not set yet" sentinel that
// InitMergeThreadPriority tests for.
38 private int mergeThreadPriority = - 1;
// Merge threads registered with this scheduler: an entry is added when a thread
// is launched and removed by the thread itself at the end of MergeThread.Run.
40 protected internal IList<MergeThread> mergeThreads =
new List<MergeThread>();
// Limit on simultaneously running merge threads; read and written through MaxThreadCount.
43 private int _maxThreadCount = 1;
// Counter used by GetMergeThread to number thread names; incremented per created thread.
49 protected internal int mergeThreadCount;
// NOTE(review): the member that owns this check (and the statement it guards) is
// not visible in this excerpt -- presumably the constructor; confirm.
// allInstances is only non-null after SetTestMode() has been called.
53 if (allInstances != null)
/// <summary>
/// Gets or sets the merge-thread limit stored in _maxThreadCount. New merges
/// stall while the number of live merge threads is at or above this value.
/// </summary>
/// <exception cref="System.ArgumentException">
/// Thrown by the setter with the message "count should be at least 1".
/// </exception>
66 public virtual int MaxThreadCount
// NOTE(review): the condition guarding this throw is elided from the excerpt;
// going by the message it presumably rejects values below 1 -- confirm.
71 throw new System.ArgumentException(
"count should be at least 1");
72 _maxThreadCount = value;
74 get {
return _maxThreadCount; }
/// <summary>
/// Returns the priority value used for merge threads. InitMergeThreadPriority is
/// called first so the -1 "unset" sentinel is handled before the field is read.
/// </summary>
/// <returns>The current value of mergeThreadPriority.</returns>
// The attribute silences analyzer rule CA1024, i.e. this deliberately stays a
// Get method rather than a property.
82 [System.Diagnostics.CodeAnalysis.SuppressMessage(
"Microsoft.Design",
"CA1024:UsePropertiesWhereAppropriate")]
83 public virtual int GetMergeThreadPriority()
87 InitMergeThreadPriority();
88 return mergeThreadPriority;
/// <summary>
/// Sets the priority used for merge threads.
/// </summary>
/// <param name="pri">
/// Priority as an integer; must lie between the numeric values of
/// ThreadPriority.Lowest and ThreadPriority.Highest, inclusive.
/// </param>
/// <exception cref="System.ArgumentException">
/// Thrown when <paramref name="pri"/> is outside that range.
/// </exception>
93 public virtual void SetMergeThreadPriority(
int pri)
// Range check against the integer values of the ThreadPriority enum bounds.
97 if (pri > (
int) System.Threading.ThreadPriority.Highest || pri < (
int) System.Threading.ThreadPriority.Lowest)
98 throw new System.ArgumentException(
"priority must be in range " + (
int) System.Threading.ThreadPriority.Lowest +
" .. " + (
int) System.Threading.ThreadPriority.Highest +
" inclusive");
99 mergeThreadPriority = pri;
// NOTE(review): the loop body is elided from this excerpt -- presumably it pushes
// the new priority onto the already-running merge threads; confirm.
101 int numThreads = MergeThreadCount();
102 for (
int i = 0; i < numThreads; i++)
/// <summary>
/// Tells whether diagnostic messages should be emitted: true only when a writer
/// has been attached and that writer reports Verbose.
/// </summary>
110 private bool Verbose()
// The null test comes first so a scheduler without a writer simply reports false.
112 return writer != null && writer.Verbose;
/// <summary>
/// Forwards a diagnostic line to the writer's Message method, prefixed with "CMS: ".
/// </summary>
/// <param name="message">Text to append after the prefix.</param>
// NOTE(review): no null check on writer is visible here; callers appear to be
// expected to gate on Verbose() first -- confirm.
115 private void Message(System.String message)
118 writer.Message(
"CMS: " + message);
/// <summary>
/// Resolves the merge-thread priority when it still holds the -1 sentinel, and
/// caps the result at ThreadPriority.Highest.
/// </summary>
121 private void InitMergeThreadPriority()
// NOTE(review): the statement that computes the default priority inside this
// branch is elided from the excerpt; confirm what value it derives.
125 if (mergeThreadPriority == - 1)
// Clamp: never go above the numeric value of ThreadPriority.Highest.
130 if (mergeThreadPriority > (
int) System.Threading.ThreadPriority.Highest)
131 mergeThreadPriority = (int) System.Threading.ThreadPriority.Highest;
/// <summary>
/// Override of the base class's Dispose(bool).
/// </summary>
/// <param name="disposing">Flag passed through the Dispose(bool) override.</param>
// NOTE(review): the body is not visible in this excerpt, so what is released
// (and whether it waits for merge threads) cannot be stated here.
136 protected override void Dispose(
bool disposing)
/// <summary>
/// Blocks the caller until MergeThreadCount() reports no remaining merge threads.
/// </summary>
144 public virtual void Sync()
// Re-check after every wake-up: a pulse only means "something changed".
148 while (MergeThreadCount() > 0)
// NOTE(review): in this excerpt the Message calls appear without a Verbose()
// guard; the guard lines may simply be elided -- confirm.
151 Message(
"now wait for threads; currently " + mergeThreads.Count +
" still running");
152 int count = mergeThreads.Count;
// Dump one line per registered thread (uses MergeThread.ToString via concatenation).
155 for (
int i = 0; i < count; i++)
156 Message(
" " + i +
": " + mergeThreads[i]);
// Waits on this scheduler's monitor; MergeThread.Run calls PulseAll on the same
// object when a thread finishes. Monitor.Wait requires the lock to be held --
// the enclosing lock statement is not visible in this excerpt.
159 System.Threading.Monitor.Wait(
this);
/// <summary>
/// Walks mergeThreads and tests IsAlive on each entry.
/// </summary>
// NOTE(review): the counter increment and the return statement are elided from
// this excerpt -- presumably the result is the number of live threads; confirm.
165 private int MergeThreadCount()
// Snapshot the list size once instead of re-reading it per iteration.
170 int numThreads = mergeThreads.Count;
171 for (
int i = 0; i < numThreads; i++)
173 if (mergeThreads[i].IsAlive)
// NOTE(review): the header of the member containing these statements is elided
// from this excerpt -- presumably the scheduler's merge entry point taking an
// IndexWriter; confirm. Loop/try/lock scaffolding is elided as well.
// Remember the writer so Verbose()/Message() can use it, and make sure the
// thread priority has been resolved before any thread is created.
187 this.writer = writer;
189 InitMergeThreadPriority();
202 Message(
"now merge");
203 Message(
" index: " + writer.
SegString());
// Pull the next pending merge from the writer.
214 MergePolicy.OneMerge merge = writer.GetNextMerge();
// Logged on the "nothing left" path (the null test itself is not visible here).
218 Message(
" no more merges pending; now return");
224 writer.MergeInit(merge);
// Tracks whether the thread launch below completed.
226 bool success =
false;
// Throttle: block on this scheduler's monitor while the number of merge threads
// is at the configured limit; finishing threads pulse the monitor.
231 while (MergeThreadCount() >= _maxThreadCount)
234 Message(
" too many merge threads running; stalling...");
236 System.Threading.Monitor.Wait(
this);
242 Message(
" consider merge " + merge.SegString(dir));
// Debug-only check that the stall loop above really freed a slot.
244 System.Diagnostics.Debug.Assert(MergeThreadCount() < _maxThreadCount);
// Create the worker through the overridable factory and register it so
// MergeThreadCount() and Sync() can see it.
248 MergeThread merger = GetMergeThread(writer, merge);
249 mergeThreads.Add(merger);
251 Message(
" launch new thread [" + merger.
Name +
"]");
// NOTE(review): presumably runs only when success is still false (cleanup path);
// the guarding condition is elided -- confirm.
261 writer.MergeFinish(merge);
/// <summary>
/// Overridable hook that performs a single merge; MergeThread.Run invokes it for
/// each merge it processes.
/// </summary>
/// <param name="merge">The merge to carry out.</param>
// NOTE(review): the body is not visible in this excerpt.
268 protected internal virtual void DoMerge(
MergePolicy.OneMerge merge)
/// <summary>
/// Factory for merge worker threads: builds a MergeThread bound to this scheduler,
/// the given writer and the given merge, then configures it.
/// </summary>
/// <param name="writer">Writer the thread will pull further merges from.</param>
/// <param name="merge">First merge the thread is created for.</param>
// NOTE(review): the return statement is elided -- presumably returns the
// configured thread; confirm.
274 protected internal virtual MergeThread GetMergeThread(
IndexWriter writer, MergePolicy.OneMerge merge)
278 var thread =
new MergeThread(
this, writer, merge);
// Apply the scheduler-wide priority currently stored in mergeThreadPriority.
279 thread.SetThreadPriority(mergeThreadPriority);
// Background threads do not keep the process alive on their own.
280 thread.IsBackground =
true;
// Names are numbered with a post-incremented counter, so the first thread is #0.
281 thread.Name =
"Lucene Merge Thread #" + mergeThreadCount++;
// NOTE(review): the headers of the members these statements belong to are elided
// from this excerpt; the grouping below is inferred from the calls and should be
// confirmed against the full file.
// Stores the owning scheduler reference (target of the InitBlock call further down).
290 this.enclosingInstance = enclosingInstance;
// Hands the stored owning-scheduler reference back to the caller.
297 return enclosingInstance;
// Wires the new thread to its scheduler, writer and initial merge.
308 InitBlock(enclosingInstance);
309 this.writer = writer;
310 this.startMerge = startMerge;
// Records the merge currently being worked on (ToString falls back to
// startMerge when RunningMerge is null).
317 runningMerge = merge;
/// <summary>
/// Applies a priority to this thread by casting the integer to ThreadPriority
/// and assigning it to Priority.
/// </summary>
/// <param name="pri">Integer value of the desired ThreadPriority.</param>
332 public virtual void SetThreadPriority(
int pri)
336 Priority = (System.Threading.ThreadPriority) pri;
// NOTE(review): both handler bodies are elided from this excerpt -- presumably
// the failures are tolerated on purpose (best-effort priority change); confirm.
338 catch (System.NullReferenceException)
343 catch (System.Security.SecurityException)
/// <summary>
/// Thread body. Runs a merge through the owning scheduler's DoMerge, then keeps
/// asking the writer for further merges; on exit it deregisters itself from the
/// scheduler and wakes any threads waiting on the scheduler's monitor.
/// </summary>
// NOTE(review): the try/loop/finally scaffolding, the declaration of the local
// 'merge' and the null test after GetNextMerge are elided from this excerpt.
350 override public void Run()
360 if (Enclosing_Instance.Verbose())
361 Enclosing_Instance.Message(
" merge thread: start");
// Publish the merge being worked on, then let the scheduler perform it.
365 SetRunningMerge(merge);
366 Enclosing_Instance.DoMerge(merge);
// Ask the writer for follow-up work and prepare it before the next round.
370 merge = writer.GetNextMerge();
373 writer.MergeInit(merge);
374 if (Enclosing_Instance.Verbose())
375 Enclosing_Instance.Message(
" merge thread: do another merge " + merge.SegString(Enclosing_Instance.dir));
381 if (Enclosing_Instance.Verbose())
382 Enclosing_Instance.Message(
" merge thread: done");
384 catch (System.Exception exc)
// Unless suppression is switched on, raise the static failure flag (later read
// by AnyUnhandledExceptions) and delegate to the overridable handler.
389 if (!Enclosing_Instance.suppressExceptions)
393 Lucene.Net.Index.ConcurrentMergeScheduler.anyExceptions =
true;
394 Enclosing_Instance.HandleMergeException(exc);
// Under the scheduler's monitor: wake every waiter (Sync and the stall loop both
// Wait on this object), then remove this thread from the registry. Pulsing before
// the Remove is fine -- waiters cannot resume until this lock is released.
400 lock (Enclosing_Instance)
402 System.Threading.Monitor.PulseAll(Enclosing_Instance);
403 Enclosing_Instance.mergeThreads.Remove(
this);
// Debug-only sanity check that no entry for this thread is left in the list.
404 bool removed = !Enclosing_Instance.mergeThreads.Contains(
this);
405 System.Diagnostics.Debug.Assert(removed);
/// <summary>
/// Describes this thread as "merge thread: " followed by the segment string of
/// its merge, resolved against the owning scheduler's dir.
/// </summary>
410 public override System.String ToString()
// Prefer the merge currently running; fall back to the one the thread was created with.
412 MergePolicy.OneMerge merge = RunningMerge ?? startMerge;
413 return "merge thread: " + merge.SegString(Enclosing_Instance.dir);
/// <summary>
/// Overridable hook called by MergeThread.Run when a merge throws and exceptions
/// are not being suppressed.
/// </summary>
/// <param name="exc">The exception caught by the merge thread.</param>
// NOTE(review): only the sleep is visible in this excerpt; what happens to 'exc'
// afterwards (e.g. whether it is rethrown or wrapped) is elided -- confirm.
420 protected internal virtual void HandleMergeException(System.Exception exc)
// TimeSpan(long) takes 100-ns ticks: 10000 ticks = 1 ms, so this pauses for 250 ms.
428 System.Threading.Thread.Sleep(
new System.TimeSpan((System.Int64) 10000 * 250));
// Process-wide flag: set to true by MergeThread.Run when a merge throws and
// suppression is off; read and reset by AnyUnhandledExceptions, reset by
// ClearUnhandledExceptions.
433 internal static bool anyExceptions =
false;
/// <summary>
/// Test helper: waits (via Sync) for every scheduler recorded in allInstances,
/// then captures the static anyExceptions flag and resets it to false.
/// </summary>
/// <exception cref="System.SystemException">
/// Thrown when allInstances is null, i.e. SetTestMode() was never called.
/// </exception>
// NOTE(review): the return statement is elided -- presumably the captured value
// 'v' is returned; confirm.
436 public static bool AnyUnhandledExceptions()
438 if (allInstances == null)
440 throw new System.SystemException(
"setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
444 int count = allInstances.Count;
// Let all outstanding merge threads finish so the flag reflects every merge.
447 for (
int i = 0; i < count; i++)
448 allInstances[i].Sync();
// Read-then-clear so the next call starts from a clean flag.
449 bool v = anyExceptions;
450 anyExceptions =
false;
/// <summary>
/// Resets the static anyExceptions flag to false.
/// </summary>
455 public static void ClearUnhandledExceptions()
459 anyExceptions =
false;
/// <summary>
/// Registers this scheduler in the static allInstances list, first compacting the
/// list so that schedulers which are closed and have no merge threads are dropped.
/// </summary>
// NOTE(review): the declarations of 'upto' and 'other' (and any surrounding lock)
// are elided from this excerpt -- presumably upto starts at 0 and other is
// allInstances[i]; confirm.
464 private void AddMyself()
468 int size = allInstances.Count;
470 for (
int i = 0; i < size; i++)
// Keep an entry unless it is both closed and has zero merge threads.
473 if (!(other.closed && 0 == other.MergeThreadCount()))
// In-place compaction: survivors are copied down to the front of the list.
476 allInstances[upto++] = other;
// Cut off the stale tail left behind by the compaction, then append this instance.
478 allInstances.RemoveRange(upto, allInstances.Count - upto);
479 allInstances.Add(
this);
// When true, MergeThread.Run neither sets anyExceptions nor calls
// HandleMergeException for a failing merge. Toggled by SetSuppressExceptions /
// ClearSuppressExceptions.
483 private bool suppressExceptions;
/// <summary>
/// Turns exception suppression on by setting suppressExceptions to true.
/// </summary>
486 public virtual void SetSuppressExceptions()
488 suppressExceptions =
true;
/// <summary>
/// Turns exception suppression off by setting suppressExceptions to false.
/// </summary>
492 public virtual void ClearSuppressExceptions()
494 suppressExceptions =
false;
// Static registry of schedulers. It stays null until SetTestMode() creates it;
// AnyUnhandledExceptions throws when it is still null.
498 private static List<ConcurrentMergeScheduler> allInstances;
/// <summary>
/// Enables instance tracking by assigning a fresh, empty list to allInstances.
/// Any previously tracked list is replaced.
/// </summary>
499 public static void SetTestMode()
501 allInstances =
new List<ConcurrentMergeScheduler>();