[DS-739] Batch processing mode extended to ItemImport and MediaFilterManager; configurable batch size option (search.batch.documents) added

git-svn-id: http://scm.dspace.org/svn/repo/dspace/trunk@5726 9c30dcfa-912a-0410-8fc2-9e0234be79fd
This commit is contained in:
Graham Triggs
2010-11-04 22:34:25 +00:00
parent f0537cc3db
commit 2d3a8e6b21
7 changed files with 714 additions and 616 deletions

View File

@@ -75,6 +75,7 @@ import org.dspace.core.Context;
import org.dspace.eperson.EPerson; import org.dspace.eperson.EPerson;
import org.dspace.eperson.Group; import org.dspace.eperson.Group;
import org.dspace.handle.HandleManager; import org.dspace.handle.HandleManager;
import org.dspace.search.DSIndexer;
import org.dspace.workflow.WorkflowManager; import org.dspace.workflow.WorkflowManager;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.NamedNodeMap; import org.w3c.dom.NamedNodeMap;
@@ -135,6 +136,10 @@ public class ItemImport
public static void main(String[] argv) throws Exception public static void main(String[] argv) throws Exception
{
DSIndexer.setBatchProcessingMode(true);
try
{ {
// create an options object and populate it // create an options object and populate it
CommandLineParser parser = new PosixParser(); CommandLineParser parser = new PosixParser();
@@ -566,6 +571,11 @@ public class ItemImport
} }
System.exit(status); System.exit(status);
} }
finally
{
DSIndexer.setBatchProcessingMode(false);
}
}
private void addItems(Context c, Collection[] mycollections, private void addItems(Context c, Collection[] mycollections,
String sourceDir, String mapFile, boolean template) throws Exception String sourceDir, String mapFile, boolean template) throws Exception

View File

@@ -393,8 +393,16 @@ public class MediaFilterManager
{ {
System.out.println("Updating search index:"); System.out.println("Updating search index:");
} }
DSIndexer.setBatchProcessingMode(true);
try
{
DSIndexer.updateIndex(c); DSIndexer.updateIndex(c);
} }
finally
{
DSIndexer.setBatchProcessingMode(false);
}
}
c.complete(); c.complete();
c = null; c = null;

View File

@@ -119,6 +119,9 @@ public class DSIndexer
private static final long WRITE_LOCK_TIMEOUT = 30000 /* 30 sec */; private static final long WRITE_LOCK_TIMEOUT = 30000 /* 30 sec */;
private static int batchFlushAfterDocuments = ConfigurationManager.getIntProperty("search.batch.documents", 20);
private static boolean batchProcessingMode = false;
// Class to hold the index configuration (one instance per config line) // Class to hold the index configuration (one instance per config line)
private static class IndexConfig private static class IndexConfig
{ {
@@ -248,6 +251,15 @@ public class DSIndexer
} }
} }
/**
 * Turns batch processing mode on or off.
 *
 * While the mode is on, indexContent and unIndexContent place their
 * tasks on the indexing task queue instead of opening an index writer
 * for each one. Turning the mode off flushes whatever is still queued.
 *
 * @param mode true to enable batch processing, false to disable it and
 *             flush the pending tasks
 */
public static void setBatchProcessingMode(boolean mode)
{
    batchProcessingMode = mode;

    if (mode)
    {
        // Batching has just been enabled; queued work is flushed later
        return;
    }

    // Leaving batch mode: write out anything that is still queued
    flushIndexingTaskQueue();
}
/** /**
* If the handle for the "dso" already exists in the index, and * If the handle for the "dso" already exists in the index, and
* the "dso" has a lastModified timestamp that is newer than * the "dso" has a lastModified timestamp that is newer than
@@ -277,29 +289,28 @@ public class DSIndexer
*/ */
public static void indexContent(Context context, DSpaceObject dso, boolean force) throws SQLException public static void indexContent(Context context, DSpaceObject dso, boolean force) throws SQLException
{ {
String handle = dso.getHandle();
IndexingAction action = null;
IndexWriter writer = null;
try try
{ {
action = prepareIndexingAction(dso, force); IndexingTask task = prepareIndexingTask(dso, force);
if (task != null)
if (action != null) {
if (batchProcessingMode)
{
addToIndexingTaskQueue(task);
}
else
{
IndexWriter writer = null;
try
{ {
writer = openIndex(false); writer = openIndex(false);
processIndexingAction(writer, action); processIndexingTask(writer, task);
}
} catch (Exception e)
{
log.error(e.getMessage(), e);
} }
finally finally
{ {
if (action != null && action.getDocument() != null) if (task.getDocument() != null)
{ {
closeAllReaders(action.getDocument()); closeAllReaders(task.getDocument());
} }
if (writer != null) if (writer != null)
@@ -315,6 +326,13 @@ public class DSIndexer
} }
} }
} }
}
}
catch (IOException e)
{
log.error(e);
}
}
/** /**
* unIndex removes an Item, Collection, or Community only works if the * unIndex removes an Item, Collection, or Community only works if the
@@ -346,22 +364,31 @@ public class DSIndexer
* @throws SQLException * @throws SQLException
* @throws IOException * @throws IOException
*/ */
public static void unIndexContent(Context context, String handle) public static void unIndexContent(Context context, String handle) throws SQLException, IOException
throws SQLException, IOException
{ {
if (handle != null) if (handle != null)
{
IndexingTask task = new IndexingTask(IndexingTask.Action.DELETE, new Term("handle", handle), null);
if (task != null)
{
if (batchProcessingMode)
{
addToIndexingTaskQueue(task);
}
else
{ {
IndexWriter writer = openIndex(false); IndexWriter writer = openIndex(false);
try try
{ {
// we have a handle (our unique ID, so remove) processIndexingTask(writer, task);
processIndexingAction(writer, new IndexingAction(IndexingAction.Action.DELETE, new Term("handle", handle), null));
} }
finally finally
{ {
writer.close(); writer.close();
} }
} }
}
}
else else
{ {
log.warn("unindex of content with null handle attempted"); log.warn("unindex of content with null handle attempted");
@@ -372,8 +399,6 @@ public class DSIndexer
} }
} }
/** /**
* reIndexContent removes something from the index, then re-indexes it * reIndexContent removes something from the index, then re-indexes it
* *
@@ -407,7 +432,6 @@ public class DSIndexer
/* Reindex all content preemptively. */ /* Reindex all content preemptively. */
DSIndexer.updateIndex(c, true); DSIndexer.updateIndex(c, true);
} }
/** /**
@@ -424,6 +448,7 @@ public class DSIndexer
try try
{ {
flushIndexingTaskQueue(writer);
writer.optimize(); writer.optimize();
} }
finally finally
@@ -443,7 +468,10 @@ public class DSIndexer
*/ */
public static void main(String[] args) throws SQLException, IOException public static void main(String[] args) throws SQLException, IOException
{ {
Date startTime = new Date();
try
{
setBatchProcessingMode(true);
Context context = new Context(); Context context = new Context();
context.setIgnoreAuthorization(true); context.setIgnoreAuthorization(true);
@@ -528,6 +556,15 @@ public class DSIndexer
log.info("Done with indexing"); log.info("Done with indexing");
} }
finally
{
setBatchProcessingMode(false);
Date endTime = new Date();
System.out.println("Started: " + startTime.getTime());
System.out.println("Ended: " + endTime.getTime());
System.out.println("Elapsed time: " + ((endTime.getTime() - startTime.getTime()) / 1000) + " secs (" + (endTime.getTime() - startTime.getTime()) + " msecs)");
}
}
/** /**
* Iterates over all Items, Collections and Communities. And updates * Iterates over all Items, Collections and Communities. And updates
@@ -562,7 +599,7 @@ public class DSIndexer
for(items = Item.findAll(context);items.hasNext();) for(items = Item.findAll(context);items.hasNext();)
{ {
Item item = (Item) items.next(); Item item = (Item) items.next();
addToIndexingActionQueue(prepareIndexingAction(item, force)); indexContent(context, item);
item.decache(); item.decache();
} }
} }
@@ -574,24 +611,19 @@ public class DSIndexer
} }
} }
Collection[] collections = Collection.findAll(context); for (Collection collection : Collection.findAll(context))
for (int i = 0; i < collections.length; i++)
{ {
addToIndexingActionQueue(prepareIndexingAction(collections[i], force)); indexContent(context, collection);
context.removeCached(collections[i], collections[i].getID()); context.removeCached(collection, collection.getID());
} }
Community[] communities = Community.findAll(context); for (Community community : Community.findAll(context))
for (int i = 0; i < communities.length; i++)
{ {
addToIndexingActionQueue(prepareIndexingAction(communities[i], force)); indexContent(context, community);
context.removeCached(communities[i], communities[i].getID()); context.removeCached(community, community.getID());
} }
flushIndexingActionQueue();
optimizeIndex(context); optimizeIndex(context);
} }
catch(Exception e) catch(Exception e)
{ {
@@ -679,11 +711,11 @@ public class DSIndexer
} }
static IndexingAction prepareIndexingAction(DSpaceObject dso, boolean force) throws SQLException, IOException static IndexingTask prepareIndexingTask(DSpaceObject dso, boolean force) throws SQLException, IOException
{ {
String handle = dso.getHandle(); String handle = dso.getHandle();
Term term = new Term("handle", handle); Term term = new Term("handle", handle);
IndexingAction action = null; IndexingTask action = null;
switch (dso.getType()) switch (dso.getType())
{ {
case Constants.ITEM : case Constants.ITEM :
@@ -694,23 +726,23 @@ public class DSIndexer
if (requiresIndexing(term, ((Item)dso).getLastModified()) || force) if (requiresIndexing(term, ((Item)dso).getLastModified()) || force)
{ {
log.info("Writing Item: " + handle + " to Index"); log.info("Writing Item: " + handle + " to Index");
action = new IndexingAction(IndexingAction.Action.UPDATE, term, buildDocumentForItem((Item)dso)); action = new IndexingTask(IndexingTask.Action.UPDATE, term, buildDocumentForItem((Item)dso));
} }
} }
else else
{ {
action = new IndexingAction(IndexingAction.Action.DELETE, term, null); action = new IndexingTask(IndexingTask.Action.DELETE, term, null);
} }
break; break;
case Constants.COLLECTION : case Constants.COLLECTION :
log.info("Writing Collection: " + handle + " to Index"); log.info("Writing Collection: " + handle + " to Index");
action = new IndexingAction(IndexingAction.Action.UPDATE, term, buildDocumentForCollection((Collection)dso)); action = new IndexingTask(IndexingTask.Action.UPDATE, term, buildDocumentForCollection((Collection)dso));
break; break;
case Constants.COMMUNITY : case Constants.COMMUNITY :
log.info("Writing Community: " + handle + " to Index"); log.info("Writing Community: " + handle + " to Index");
action = new IndexingAction(IndexingAction.Action.UPDATE, term, buildDocumentForCommunity((Community)dso)); action = new IndexingTask(IndexingTask.Action.UPDATE, term, buildDocumentForCommunity((Community)dso));
break; break;
default : default :
@@ -719,7 +751,7 @@ public class DSIndexer
return action; return action;
} }
static void processIndexingAction(IndexWriter writer, IndexingAction action) throws IOException static void processIndexingTask(IndexWriter writer, IndexingTask action) throws IOException
{ {
if (action != null) if (action != null)
{ {
@@ -734,21 +766,21 @@ public class DSIndexer
} }
} }
private static List<IndexingAction> actionQueue = new ArrayList<IndexingAction>(); private static List<IndexingTask> actionQueue = new ArrayList<IndexingTask>();
static void addToIndexingActionQueue(IndexingAction action) static synchronized void addToIndexingTaskQueue(IndexingTask action)
{ {
if (action != null) if (action != null)
{ {
actionQueue.add(action); actionQueue.add(action);
if (actionQueue.size() > 10) if (actionQueue.size() >= batchFlushAfterDocuments)
{ {
flushIndexingActionQueue(); flushIndexingTaskQueue();
} }
} }
} }
static synchronized void flushIndexingActionQueue() static void flushIndexingTaskQueue()
{ {
if (actionQueue.size() > 0) if (actionQueue.size() > 0)
{ {
@@ -757,22 +789,7 @@ public class DSIndexer
try try
{ {
writer = openIndex(false); writer = openIndex(false);
for (IndexingAction action : actionQueue) flushIndexingTaskQueue(writer);
{
try
{
processIndexingAction(writer, action);
}
finally
{
if (action.getDocument() != null)
{
closeAllReaders(action.getDocument());
}
}
}
actionQueue.clear();
} }
catch (IOException e) catch (IOException e)
{ {
@@ -792,10 +809,33 @@ public class DSIndexer
} }
} }
} }
} }
} }
/**
 * Runs every queued indexing task against the supplied writer and then
 * empties the queue.
 *
 * An IOException raised while processing one task is logged and the loop
 * carries on with the next task. Whether or not processing succeeded,
 * closeAllReaders is called on the task's document when it has one.
 *
 * @param writer the already opened index writer the tasks are applied to;
 *               this method does not close it
 */
private static synchronized void flushIndexingTaskQueue(IndexWriter writer)
{
    for (IndexingTask action : actionQueue)
    {
        try
        {
            processIndexingTask(writer, action);
        }
        catch (IOException e)
        {
            // Hand the exception over as the throwable so the stack trace
            // is logged, not just the result of e.toString()
            log.error(e.getMessage(), e);
        }
        finally
        {
            // DELETE tasks carry no document, so there is nothing to close
            if (action.getDocument() != null)
            {
                closeAllReaders(action.getDocument());
            }
        }
    }

    actionQueue.clear();
}
//////////////////////////////////// ////////////////////////////////////
// Private // Private
//////////////////////////////////// ////////////////////////////////////

View File

@@ -1,45 +0,0 @@
package org.dspace.search;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
/**
 * Describes one pending change to the search index: the kind of
 * operation, the term that identifies the target entry, and the document
 * to write (which callers leave null for a delete).
 */
class IndexingAction
{
    /** The kinds of operation an IndexingAction can describe. */
    enum Action { ADD, UPDATE, DELETE }

    private Action action;
    private Term term;
    private Document doc;

    /**
     * @param pAction the kind of operation
     * @param pTerm   the term identifying the index entry
     * @param pDoc    the document to write, or null when there is none
     */
    IndexingAction(Action pAction, Term pTerm, Document pDoc)
    {
        this.action = pAction;
        this.term = pTerm;
        this.doc = pDoc;
    }

    /** @return the term identifying the index entry */
    Term getTerm()
    {
        return this.term;
    }

    /** @return the document to write, or null when there is none */
    Document getDocument()
    {
        return this.doc;
    }

    /** @return true when this describes an ADD operation */
    boolean isAdd()
    {
        return Action.ADD.equals(this.action);
    }

    /** @return true when this describes an UPDATE operation */
    boolean isUpdate()
    {
        return Action.UPDATE.equals(this.action);
    }

    /** @return true when this describes a DELETE operation */
    boolean isDelete()
    {
        return Action.DELETE.equals(this.action);
    }
}

View File

@@ -0,0 +1,84 @@
/*
* IndexingTask.java
*
* Version: $Revision: 5724 $
*
* Date: $Date: 2010-11-04 17:34:12 +0000 (Thu, 04 Nov 2010) $
*
* Copyright (c) 2002-2005, Hewlett-Packard Company and Massachusetts
* Institute of Technology. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* - Neither the name of the Hewlett-Packard Company nor the name of the
* Massachusetts Institute of Technology nor the names of their
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*/
package org.dspace.search;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
/**
 * Immutable description of one pending change to the search index: the
 * kind of operation, the term that identifies the target entry, and the
 * document to write (callers pass null for a delete).
 *
 * All fields are final, so an instance can be queued and processed later
 * without its contents changing in between.
 */
class IndexingTask
{
    /** The kinds of operation an IndexingTask can describe. */
    enum Action { ADD, UPDATE, DELETE }

    private final Action action;
    private final Term term;
    private final Document doc;

    /**
     * @param pAction the kind of operation
     * @param pTerm   the term identifying the index entry
     * @param pDoc    the document to write, or null when there is none
     */
    IndexingTask(Action pAction, Term pTerm, Document pDoc)
    {
        action = pAction;
        term = pTerm;
        doc = pDoc;
    }

    /** @return true when this task is an ADD operation */
    boolean isAdd()
    {
        return action == Action.ADD;
    }

    /** @return true when this task is a DELETE operation */
    boolean isDelete()
    {
        return action == Action.DELETE;
    }

    /** @return true when this task is an UPDATE operation */
    boolean isUpdate()
    {
        return action == Action.UPDATE;
    }

    /** @return the term identifying the index entry */
    Term getTerm()
    {
        return term;
    }

    /** @return the document to write, or null when there is none */
    Document getDocument()
    {
        return doc;
    }
}

View File

@@ -79,6 +79,7 @@
- [DS-733] Load testing utilities - [DS-733] Load testing utilities
- [DS-734] Improve database efficiency - [DS-734] Improve database efficiency
- [DS-736] ItemImport usage and efficiency improvements - [DS-736] ItemImport usage and efficiency improvements
- [DS-739] Lucene indexing efficiency improvements
(Graham Triggs, Mark Wood) (Graham Triggs, Mark Wood)
- [DS-707] General improvements to performance, bug fixes and style - [DS-707] General improvements to performance, bug fixes and style

View File

@@ -360,12 +360,12 @@
<dependency> <dependency>
<groupId>org.apache.lucene</groupId> <groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId> <artifactId>lucene-core</artifactId>
<version>2.3.0</version> <version>2.9.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.lucene</groupId> <groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers</artifactId> <artifactId>lucene-analyzers</artifactId>
<version>2.3.0</version> <version>2.9.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.dspace</groupId> <groupId>org.dspace</groupId>