From 2d3a8e6b21c7a40c03509b48195cae6e872b39c0 Mon Sep 17 00:00:00 2001 From: Graham Triggs Date: Thu, 4 Nov 2010 22:34:25 +0000 Subject: [PATCH] [DS-739] Batch processing mode extended to itemimport, filter media. Configurable option for batch size added git-svn-id: http://scm.dspace.org/svn/repo/dspace/trunk@5726 9c30dcfa-912a-0410-8fc2-9e0234be79fd --- .../org/dspace/app/itemimport/ItemImport.java | 816 +++++++++--------- .../app/mediafilter/MediaFilterManager.java | 10 +- .../java/org/dspace/search/DSIndexer.java | 370 ++++---- .../org/dspace/search/IndexingAction.java | 45 - .../java/org/dspace/search/IndexingTask.java | 84 ++ dspace/CHANGES | 1 + pom.xml | 4 +- 7 files changed, 714 insertions(+), 616 deletions(-) delete mode 100644 dspace-api/src/main/java/org/dspace/search/IndexingAction.java create mode 100644 dspace-api/src/main/java/org/dspace/search/IndexingTask.java diff --git a/dspace-api/src/main/java/org/dspace/app/itemimport/ItemImport.java b/dspace-api/src/main/java/org/dspace/app/itemimport/ItemImport.java index ee28a65749..2e06735119 100755 --- a/dspace-api/src/main/java/org/dspace/app/itemimport/ItemImport.java +++ b/dspace-api/src/main/java/org/dspace/app/itemimport/ItemImport.java @@ -75,6 +75,7 @@ import org.dspace.core.Context; import org.dspace.eperson.EPerson; import org.dspace.eperson.Group; import org.dspace.handle.HandleManager; +import org.dspace.search.DSIndexer; import org.dspace.workflow.WorkflowManager; import org.w3c.dom.Document; import org.w3c.dom.NamedNodeMap; @@ -136,435 +137,444 @@ public class ItemImport public static void main(String[] argv) throws Exception { - // create an options object and populate it - CommandLineParser parser = new PosixParser(); - - Options options = new Options(); - - options.addOption("a", "add", false, "add items to DSpace"); - options.addOption("r", "replace", false, "replace items in mapfile"); - options.addOption("d", "delete", false, - "delete items listed in mapfile"); - options.addOption("s", "source", true, "source of items (directory)"); - options.addOption("z", "zip", true, "name of zip file"); - options.addOption("c", "collection", true, - "destination collection(s) Handle or database ID"); - options.addOption("m", "mapfile", true, "mapfile items in mapfile"); - options.addOption("e", "eperson", true, - "email of eperson doing importing"); - options.addOption("w", "workflow", false, - "send submission through collection's workflow"); - options.addOption("n", "notify", false, - "if sending submissions through the workflow, send notification emails"); - options.addOption("t", "test", false, - "test run - do not actually import items"); - options.addOption("p", "template", false, "apply template"); - options.addOption("R", "resume", false, - "resume a failed import (add only)"); - - options.addOption("h", "help", false, "help"); - - CommandLine line = parser.parse(options, argv); - - String command = null; // add replace remove, etc - String sourcedir = null; - String mapfile = null; - String eperson = null; // db ID or email - String[] collections = null; // db ID or handles - int status = 0; - - if (line.hasOption('h')) - { - HelpFormatter myhelp = new HelpFormatter(); - myhelp.printHelp("ItemImport\n", options); - System.out - .println("\nadding items: ItemImport -a -e eperson -c collection -s sourcedir -m mapfile"); - System.out - .println("\nadding items from zip file: ItemImport -a -e eperson -c collection -s sourcedir -z filename.zip -m mapfile"); - System.out - .println("replacing items: ItemImport -r -e eperson -c collection -s sourcedir -m mapfile"); - System.out - .println("deleting items: ItemImport -d -e eperson -m mapfile"); - System.out - .println("If multiple collections are specified, the first collection will be the one that owns the item."); - - System.exit(0); - } - - if (line.hasOption('a')) - { - command = "add"; - } - - if (line.hasOption('r')) - { - command = "replace"; - } - - if (line.hasOption('d')) - { - command = "delete"; - } - - if (line.hasOption('w')) - { - useWorkflow = true; - if (line.hasOption('n')) - { - useWorkflowSendEmail = true; - } - } - - if (line.hasOption('t')) - { - isTest = true; - System.out.println("**Test Run** - not actually importing items."); - } - - if (line.hasOption('p')) - { - template = true; - } - - if (line.hasOption('s')) // source - { - sourcedir = line.getOptionValue('s'); - } - - if (line.hasOption('m')) // mapfile - { - mapfile = line.getOptionValue('m'); - } - - if (line.hasOption('e')) // eperson - { - eperson = line.getOptionValue('e'); - } - - if (line.hasOption('c')) // collections - { - collections = line.getOptionValues('c'); - } - - if (line.hasOption('R')) - { - isResume = true; - System.out - .println("**Resume import** - attempting to import items not already imported"); - } - - boolean zip = false; - String zipfilename = ""; - String ziptempdir = ConfigurationManager.getProperty("org.dspace.app.itemexport.work.dir"); - if (line.hasOption('z')) - { - zip = true; - zipfilename = sourcedir + System.getProperty("file.separator") + line.getOptionValue('z'); - } - - // now validate - // must have a command set - if (command == null) - { - System.out - .println("Error - must run with either add, replace, or remove (run with -h flag for details)"); - System.exit(1); - } - else if ("add".equals(command) || "replace".equals(command)) - { - if (sourcedir == null) - { - System.out - .println("Error - a source directory containing items must be set"); - System.out.println(" (run with -h flag for details)"); - System.exit(1); - } - - if (mapfile == null) - { - System.out - .println("Error - a map file to hold importing results must be specified"); - System.out.println(" (run with -h flag for details)"); - System.exit(1); - } - - if (eperson == null) - { - System.out - .println("Error - an eperson to do the importing must be specified"); - System.out.println(" (run with -h flag for details)"); - System.exit(1); - } - - if (collections == null) - { - System.out - .println("Error - at least one destination collection must be specified"); - System.out.println(" (run with -h flag for details)"); - System.exit(1); - } - } - else if ("delete".equals(command)) - { - if (eperson == null) - { - System.out - .println("Error - an eperson to do the importing must be specified"); - System.exit(1); - } - - if (mapfile == null) - { - System.out.println("Error - a map file must be specified"); - System.exit(1); - } - } - - // can only resume for adds - if (isResume && !"add".equals(command)) - { - System.out - .println("Error - resume option only works with --add command"); - System.exit(1); - } - - // do checks around mapfile - if mapfile exists and 'add' is selected, - // resume must be chosen - File myFile = new File(mapfile); - - if (!isResume && "add".equals(command) && myFile.exists()) - { - System.out.println("Error - the mapfile " + mapfile - + " already exists."); - System.out - .println("Either delete it or use --resume if attempting to resume an aborted import."); - System.exit(1); - } - - // does the zip file exist and can we write to the temp directory - if (zip) - { - File zipfile = new File(sourcedir); - if (!zipfile.canRead()) - { - System.out.println("Zip file '" + sourcedir + "' does not exist, or is not readable."); - System.exit(1); - } - - if (ziptempdir == null) - { - System.out.println("Unable to unzip import file as the key 'org.dspace.app.itemexport.work.dir' is not set in dspace.cfg"); - System.exit(1); - } - zipfile = new File(ziptempdir); - if (!zipfile.isDirectory()) - { - System.out.println("'" + ConfigurationManager.getProperty("org.dspace.app.itemexport.work.dir") + - "' as defined by the key 'org.dspace.app.itemexport.work.dir' in dspace.cfg " + - "is not a valid directory"); - System.exit(1); - } - File tempdir = new File(ziptempdir); - if (!tempdir.exists() && !tempdir.mkdirs()) - { - log.error("Unable to create temporary directory"); - } - sourcedir = ziptempdir + System.getProperty("file.separator") + line.getOptionValue("z"); - ziptempdir = ziptempdir + System.getProperty("file.separator") + - line.getOptionValue("z") + System.getProperty("file.separator"); - } - - ItemImport myloader = new ItemImport(); - - // create a context - Context c = new Context(); - - // find the EPerson, assign to context - EPerson myEPerson = null; - - if (eperson.indexOf('@') != -1) - { - // @ sign, must be an email - myEPerson = EPerson.findByEmail(c, eperson); - } - else - { - myEPerson = EPerson.find(c, Integer.parseInt(eperson)); - } - - if (myEPerson == null) - { - System.out.println("Error, eperson cannot be found: " + eperson); - System.exit(1); - } - - c.setCurrentUser(myEPerson); - - // find collections - Collection[] mycollections = null; - - // don't need to validate collections set if command is "delete" - if (!"delete".equals(command)) - { - System.out.println("Destination collections:"); - - mycollections = new Collection[collections.length]; - - // validate each collection arg to see if it's a real collection - for (int i = 0; i < collections.length; i++) - { - // is the ID a handle? - if (collections[i].indexOf('/') != -1) - { - // string has a / so it must be a handle - try and resolve - // it - mycollections[i] = (Collection) HandleManager - .resolveToObject(c, collections[i]); - - // resolved, now make sure it's a collection - if ((mycollections[i] == null) - || (mycollections[i].getType() != Constants.COLLECTION)) - { - mycollections[i] = null; - } - } - // not a handle, try and treat it as an integer collection - // database ID - else if (collections[i] != null) - { - mycollections[i] = Collection.find(c, Integer - .parseInt(collections[i])); - } - - // was the collection valid? - if (mycollections[i] == null) - { - throw new IllegalArgumentException("Cannot resolve " - + collections[i] + " to collection"); - } - - // print progress info - String owningPrefix = ""; - - if (i == 0) - { - owningPrefix = "Owning "; - } - - System.out.println(owningPrefix + " Collection: " - + mycollections[i].getMetadata("name")); - } - } // end of validating collections + DSIndexer.setBatchProcessingMode(true); try { - // If this is a zip archive, unzip it first - if (zip) + // create an options object and populate it + CommandLineParser parser = new PosixParser(); + + Options options = new Options(); + + options.addOption("a", "add", false, "add items to DSpace"); + options.addOption("r", "replace", false, "replace items in mapfile"); + options.addOption("d", "delete", false, + "delete items listed in mapfile"); + options.addOption("s", "source", true, "source of items (directory)"); + options.addOption("z", "zip", true, "name of zip file"); + options.addOption("c", "collection", true, + "destination collection(s) Handle or database ID"); + options.addOption("m", "mapfile", true, "mapfile items in mapfile"); + options.addOption("e", "eperson", true, + "email of eperson doing importing"); + options.addOption("w", "workflow", false, + "send submission through collection's workflow"); + options.addOption("n", "notify", false, + "if sending submissions through the workflow, send notification emails"); + options.addOption("t", "test", false, + "test run - do not actually import items"); + options.addOption("p", "template", false, "apply template"); + options.addOption("R", "resume", false, + "resume a failed import (add only)"); + + options.addOption("h", "help", false, "help"); + + CommandLine line = parser.parse(options, argv); + + String command = null; // add replace remove, etc + String sourcedir = null; + String mapfile = null; + String eperson = null; // db ID or email + String[] collections = null; // db ID or handles + int status = 0; + + if (line.hasOption('h')) { - ZipFile zf = new ZipFile(zipfilename); - ZipEntry entry; - Enumeration entries = zf.entries(); - while (entries.hasMoreElements()) + HelpFormatter myhelp = new HelpFormatter(); + myhelp.printHelp("ItemImport\n", options); + System.out + .println("\nadding items: ItemImport -a -e eperson -c collection -s sourcedir -m mapfile"); + System.out + .println("\nadding items from zip file: ItemImport -a -e eperson -c collection -s sourcedir -z filename.zip -m mapfile"); + System.out + .println("replacing items: ItemImport -r -e eperson -c collection -s sourcedir -m mapfile"); + System.out + .println("deleting items: ItemImport -d -e eperson -m mapfile"); + System.out + .println("If multiple collections are specified, the first collection will be the one that owns the item."); + + System.exit(0); + } + + if (line.hasOption('a')) + { + command = "add"; + } + + if (line.hasOption('r')) + { + command = "replace"; + } + + if (line.hasOption('d')) + { + command = "delete"; + } + + if (line.hasOption('w')) + { + useWorkflow = true; + if (line.hasOption('n')) { - entry = (ZipEntry)entries.nextElement(); - if (entry.isDirectory()) - { - if (!new File(ziptempdir + entry.getName()).mkdir()) - { - log.error("Unable to create contents directory"); - } - } - else - { - System.out.println("Extracting file: " + entry.getName()); - int index = entry.getName().lastIndexOf('/'); - if (index == -1) - { - // Was it created on Windows instead? - index = entry.getName().lastIndexOf('\\'); - } - if (index > 0) - { - File dir = new File(ziptempdir + entry.getName().substring(0, index)); - if (!dir.mkdirs()) - { - log.error("Unable to create directory"); - } - } - byte[] buffer = new byte[1024]; - int len; - InputStream in = zf.getInputStream(entry); - BufferedOutputStream out = new BufferedOutputStream( - new FileOutputStream(ziptempdir + entry.getName())); - while((len = in.read(buffer)) >= 0) - { - out.write(buffer, 0, len); - } - in.close(); - out.close(); - } + useWorkflowSendEmail = true; } } - c.setIgnoreAuthorization(true); - - if ("add".equals(command)) + if (line.hasOption('t')) { - myloader.addItems(c, mycollections, sourcedir, mapfile, template); + isTest = true; + System.out.println("**Test Run** - not actually importing items."); } - else if ("replace".equals(command)) + + if (line.hasOption('p')) { - myloader.replaceItems(c, mycollections, sourcedir, mapfile, template); + template = true; + } + + if (line.hasOption('s')) // source + { + sourcedir = line.getOptionValue('s'); + } + + if (line.hasOption('m')) // mapfile + { + mapfile = line.getOptionValue('m'); + } + + if (line.hasOption('e')) // eperson + { + eperson = line.getOptionValue('e'); + } + + if (line.hasOption('c')) // collections + { + collections = line.getOptionValues('c'); + } + + if (line.hasOption('R')) + { + isResume = true; + System.out + .println("**Resume import** - attempting to import items not already imported"); + } + + boolean zip = false; + String zipfilename = ""; + String ziptempdir = ConfigurationManager.getProperty("org.dspace.app.itemexport.work.dir"); + if (line.hasOption('z')) + { + zip = true; + zipfilename = sourcedir + System.getProperty("file.separator") + line.getOptionValue('z'); + } + + // now validate + // must have a command set + if (command == null) + { + System.out + .println("Error - must run with either add, replace, or remove (run with -h flag for details)"); + System.exit(1); + } + else if ("add".equals(command) || "replace".equals(command)) + { + if (sourcedir == null) + { + System.out + .println("Error - a source directory containing items must be set"); + System.out.println(" (run with -h flag for details)"); + System.exit(1); + } + + if (mapfile == null) + { + System.out + .println("Error - a map file to hold importing results must be specified"); + System.out.println(" (run with -h flag for details)"); + System.exit(1); + } + + if (eperson == null) + { + System.out + .println("Error - an eperson to do the importing must be specified"); + System.out.println(" (run with -h flag for details)"); + System.exit(1); + } + + if (collections == null) + { + System.out + .println("Error - at least one destination collection must be specified"); + System.out.println(" (run with -h flag for details)"); + System.exit(1); + } } else if ("delete".equals(command)) { - myloader.deleteItems(c, mapfile); + if (eperson == null) + { + System.out + .println("Error - an eperson to do the importing must be specified"); + System.exit(1); + } + + if (mapfile == null) + { + System.out.println("Error - a map file must be specified"); + System.exit(1); + } + } + + // can only resume for adds + if (isResume && !"add".equals(command)) + { + System.out + .println("Error - resume option only works with --add command"); + System.exit(1); + } + + // do checks around mapfile - if mapfile exists and 'add' is selected, + // resume must be chosen + File myFile = new File(mapfile); + + if (!isResume && "add".equals(command) && myFile.exists()) + { + System.out.println("Error - the mapfile " + mapfile + + " already exists."); + System.out + .println("Either delete it or use --resume if attempting to resume an aborted import."); + System.exit(1); + } + + // does the zip file exist and can we write to the temp directory + if (zip) + { + File zipfile = new File(sourcedir); + if (!zipfile.canRead()) + { + System.out.println("Zip file '" + sourcedir + "' does not exist, or is not readable."); + System.exit(1); + } + + if (ziptempdir == null) + { + System.out.println("Unable to unzip import file as the key 'org.dspace.app.itemexport.work.dir' is not set in dspace.cfg"); + System.exit(1); + } + zipfile = new File(ziptempdir); + if (!zipfile.isDirectory()) + { + System.out.println("'" + ConfigurationManager.getProperty("org.dspace.app.itemexport.work.dir") + + "' as defined by the key 'org.dspace.app.itemexport.work.dir' in dspace.cfg " + + "is not a valid directory"); + System.exit(1); + } + File tempdir = new File(ziptempdir); + if (!tempdir.exists() && !tempdir.mkdirs()) + { + log.error("Unable to create temporary directory"); + } + sourcedir = ziptempdir + System.getProperty("file.separator") + line.getOptionValue("z"); + ziptempdir = ziptempdir + System.getProperty("file.separator") + + line.getOptionValue("z") + System.getProperty("file.separator"); + } + + ItemImport myloader = new ItemImport(); + + // create a context + Context c = new Context(); + + // find the EPerson, assign to context + EPerson myEPerson = null; + + if (eperson.indexOf('@') != -1) + { + // @ sign, must be an email + myEPerson = EPerson.findByEmail(c, eperson); + } + else + { + myEPerson = EPerson.find(c, Integer.parseInt(eperson)); + } + + if (myEPerson == null) + { + System.out.println("Error, eperson cannot be found: " + eperson); + System.exit(1); + } + + c.setCurrentUser(myEPerson); + + // find collections + Collection[] mycollections = null; + + // don't need to validate collections set if command is "delete" + if (!"delete".equals(command)) + { + System.out.println("Destination collections:"); + + mycollections = new Collection[collections.length]; + + // validate each collection arg to see if it's a real collection + for (int i = 0; i < collections.length; i++) + { + // is the ID a handle? + if (collections[i].indexOf('/') != -1) + { + // string has a / so it must be a handle - try and resolve + // it + mycollections[i] = (Collection) HandleManager + .resolveToObject(c, collections[i]); + + // resolved, now make sure it's a collection + if ((mycollections[i] == null) + || (mycollections[i].getType() != Constants.COLLECTION)) + { + mycollections[i] = null; + } + } + // not a handle, try and treat it as an integer collection + // database ID + else if (collections[i] != null) + { + mycollections[i] = Collection.find(c, Integer + .parseInt(collections[i])); + } + + // was the collection valid? + if (mycollections[i] == null) + { + throw new IllegalArgumentException("Cannot resolve " + + collections[i] + " to collection"); + } + + // print progress info + String owningPrefix = ""; + + if (i == 0) + { + owningPrefix = "Owning "; + } + + System.out.println(owningPrefix + " Collection: " + + mycollections[i].getMetadata("name")); + } + } // end of validating collections + + try + { + // If this is a zip archive, unzip it first + if (zip) + { + ZipFile zf = new ZipFile(zipfilename); + ZipEntry entry; + Enumeration entries = zf.entries(); + while (entries.hasMoreElements()) + { + entry = (ZipEntry)entries.nextElement(); + if (entry.isDirectory()) + { + if (!new File(ziptempdir + entry.getName()).mkdir()) + { + log.error("Unable to create contents directory"); + } + } + else + { + System.out.println("Extracting file: " + entry.getName()); + int index = entry.getName().lastIndexOf('/'); + if (index == -1) + { + // Was it created on Windows instead? + index = entry.getName().lastIndexOf('\\'); + } + if (index > 0) + { + File dir = new File(ziptempdir + entry.getName().substring(0, index)); + if (!dir.mkdirs()) + { + log.error("Unable to create directory"); + } + } + byte[] buffer = new byte[1024]; + int len; + InputStream in = zf.getInputStream(entry); + BufferedOutputStream out = new BufferedOutputStream( + new FileOutputStream(ziptempdir + entry.getName())); + while((len = in.read(buffer)) >= 0) + { + out.write(buffer, 0, len); + } + in.close(); + out.close(); + } + } + } + + c.setIgnoreAuthorization(true); + + if ("add".equals(command)) + { + myloader.addItems(c, mycollections, sourcedir, mapfile, template); + } + else if ("replace".equals(command)) + { + myloader.replaceItems(c, mycollections, sourcedir, mapfile, template); + } + else if ("delete".equals(command)) + { + myloader.deleteItems(c, mapfile); + } + + // complete all transactions + c.complete(); + } + catch (Exception e) + { + // abort all operations + if (mapOut != null) + { + mapOut.close(); + } + + mapOut = null; + + c.abort(); + e.printStackTrace(); + System.out.println(e); + status = 1; + } + + // Delete the unzipped file + try + { + if (zip) + { + System.gc(); + System.out.println("Deleting temporary zip directory: " + ziptempdir); + ItemImport.deleteDirectory(new File(ziptempdir)); + } + } + catch (Exception ex) + { + System.out.println("Unable to delete temporary zip archive location: " + ziptempdir); } - // complete all transactions - c.complete(); - } - catch (Exception e) - { - // abort all operations if (mapOut != null) { mapOut.close(); } - mapOut = null; - - c.abort(); - e.printStackTrace(); - System.out.println(e); - status = 1; - } - - // Delete the unzipped file - try - { - if (zip) + if (isTest) { - System.gc(); - System.out.println("Deleting temporary zip directory: " + ziptempdir); - ItemImport.deleteDirectory(new File(ziptempdir)); + System.out.println("***End of Test Run***"); } + System.exit(status); } - catch (Exception ex) + finally { - System.out.println("Unable to delete temporary zip archive location: " + ziptempdir); + DSIndexer.setBatchProcessingMode(false); } - - if (mapOut != null) - { - mapOut.close(); - } - - if (isTest) - { - System.out.println("***End of Test Run***"); - } - System.exit(status); } private void addItems(Context c, Collection[] mycollections, diff --git a/dspace-api/src/main/java/org/dspace/app/mediafilter/MediaFilterManager.java b/dspace-api/src/main/java/org/dspace/app/mediafilter/MediaFilterManager.java index 81e1989391..91b41486a4 100644 --- a/dspace-api/src/main/java/org/dspace/app/mediafilter/MediaFilterManager.java +++ b/dspace-api/src/main/java/org/dspace/app/mediafilter/MediaFilterManager.java @@ -393,7 +393,15 @@ public class MediaFilterManager { System.out.println("Updating search index:"); } - DSIndexer.updateIndex(c); + DSIndexer.setBatchProcessingMode(true); + try + { + DSIndexer.updateIndex(c); + } + finally + { + DSIndexer.setBatchProcessingMode(false); + } } c.complete(); diff --git a/dspace-api/src/main/java/org/dspace/search/DSIndexer.java b/dspace-api/src/main/java/org/dspace/search/DSIndexer.java index f605dc4beb..c623caa1f2 100644 --- a/dspace-api/src/main/java/org/dspace/search/DSIndexer.java +++ b/dspace-api/src/main/java/org/dspace/search/DSIndexer.java @@ -118,6 +118,9 @@ public class DSIndexer private static final String LAST_INDEXED_FIELD = "DSIndexer.lastIndexed"; private static final long WRITE_LOCK_TIMEOUT = 30000 /* 30 sec */; + + private static int batchFlushAfterDocuments = ConfigurationManager.getIntProperty("search.batch.documents", 20); + private static boolean batchProcessingMode = false; // Class to hold the index configuration (one instance per config line) private static class IndexConfig @@ -248,6 +251,15 @@ public class DSIndexer } } + public static void setBatchProcessingMode(boolean mode) + { + batchProcessingMode = mode; + if (mode == false) + { + flushIndexingTaskQueue(); + } + } + /** * If the handle for the "dso" already exists in the index, and * the "dso" has a lastModified timestamp that is newer than @@ -277,43 +289,49 @@ public class DSIndexer */ public static void indexContent(Context context, DSpaceObject dso, boolean force) throws SQLException { - String handle = dso.getHandle(); - IndexingAction action = null; - IndexWriter writer = null; - try { - action = prepareIndexingAction(dso, force); - - if (action != null) + IndexingTask task = prepareIndexingTask(dso, force); + if (task != null) { - writer = openIndex(false); - processIndexingAction(writer, action); - } - - } catch (Exception e) - { - log.error(e.getMessage(), e); - } - finally - { - if (action != null && action.getDocument() != null) - { - closeAllReaders(action.getDocument()); - } - - if (writer != null) - { - try + if (batchProcessingMode) { - writer.close(); + addToIndexingTaskQueue(task); } - catch (IOException e) + else { - log.error("Unable to close IndexWriter", e); + IndexWriter writer = null; + try + { + writer = openIndex(false); + processIndexingTask(writer, task); + } + finally + { + if (task.getDocument() != null) + { + closeAllReaders(task.getDocument()); + } + + if (writer != null) + { + try + { + writer.close(); + } + catch (IOException e) + { + log.error("Unable to close IndexWriter", e); + } + } + } } } } + catch (IOException e) + { + log.error(e); + } } /** @@ -346,20 +364,29 @@ public class DSIndexer * @throws SQLException * @throws IOException */ - public static void unIndexContent(Context context, String handle) - throws SQLException, IOException + public static void unIndexContent(Context context, String handle) throws SQLException, IOException { if (handle != null) { - IndexWriter writer = openIndex(false); - try + IndexingTask task = new IndexingTask(IndexingTask.Action.DELETE, new Term("handle", handle), null); + if (task != null) { - // we have a handle (our unique ID, so remove) - processIndexingAction(writer, new IndexingAction(IndexingAction.Action.DELETE, new Term("handle", handle), null)); - } - finally - { - writer.close(); + if (batchProcessingMode) + { + addToIndexingTaskQueue(task); + } + else + { + IndexWriter writer = openIndex(false); + try + { + processIndexingTask(writer, task); + } + finally + { + writer.close(); + } + } } } else @@ -372,8 +399,6 @@ public class DSIndexer } } - - /** * reIndexContent removes something from the index, then re-indexes it * @@ -407,7 +432,6 @@ public class DSIndexer /* Reindex all content preemptively. */ DSIndexer.updateIndex(c, true); - } /** @@ -424,6 +448,7 @@ public class DSIndexer try { + flushIndexingTaskQueue(writer); writer.optimize(); } finally @@ -443,90 +468,102 @@ public class DSIndexer */ public static void main(String[] args) throws SQLException, IOException { - - Context context = new Context(); - context.setIgnoreAuthorization(true); - - String usage = "org.dspace.search.DSIndexer [-cbhof[r ]] or nothing to update/clean an existing index."; - Options options = new Options(); - HelpFormatter formatter = new HelpFormatter(); - CommandLine line = null; - - options.addOption(OptionBuilder - .withArgName("item handle") - .hasArg(true) - .withDescription( - "remove an Item, Collection or Community from index based on its handle") - .create("r")); - - options.addOption(OptionBuilder.isRequired(false).withDescription( - "optimize existing index").create("o")); - - options.addOption(OptionBuilder - .isRequired(false) - .withDescription( - "clean existing index removing any documents that no longer exist in the db") - .create("c")); - - options.addOption(OptionBuilder.isRequired(false).withDescription( - "(re)build index, wiping out current one if it exists").create( - "b")); - - options.addOption(OptionBuilder - .isRequired(false) - .withDescription( - "if updating existing index, force each handle to be reindexed even if uptodate") - .create("f")); - - options.addOption(OptionBuilder.isRequired(false).withDescription( - "print this help message").create("h")); - + Date startTime = new Date(); try { - line = new PosixParser().parse(options, args); - } - catch (Exception e) - { - // automatically generate the help statement - formatter.printHelp(usage, e.getMessage(), options, ""); - System.exit(1); - } + setBatchProcessingMode(true); + Context context = new Context(); + context.setIgnoreAuthorization(true); - if (line.hasOption("h")) - { - // automatically generate the help statement - formatter.printHelp(usage, options); - System.exit(1); - } + String usage = "org.dspace.search.DSIndexer [-cbhof[r ]] or nothing to update/clean an existing index."; + Options options = new Options(); + HelpFormatter formatter = new HelpFormatter(); + CommandLine line = null; - if (line.hasOption("r")) - { - log.info("Removing " + line.getOptionValue("r") + " from Index"); - unIndexContent(context, line.getOptionValue("r")); - } - else if (line.hasOption("o")) - { - log.info("Optimizing Index"); - optimizeIndex(context); - } - else if (line.hasOption("c")) - { - log.info("Cleaning Index"); - cleanIndex(context); - } - else if (line.hasOption("b")) - { - log.info("(Re)building index from scratch."); - createIndex(context); - } - else - { - log.info("Updating and Cleaning Index"); - cleanIndex(context); - updateIndex(context, line.hasOption("f")); - } + options.addOption(OptionBuilder + .withArgName("item handle") + .hasArg(true) + .withDescription( + "remove an Item, Collection or Community from index based on its handle") + .create("r")); - log.info("Done with indexing"); + options.addOption(OptionBuilder.isRequired(false).withDescription( + "optimize existing index").create("o")); + + options.addOption(OptionBuilder + .isRequired(false) + .withDescription( + "clean existing index removing any documents that no longer exist in the db") + .create("c")); + + options.addOption(OptionBuilder.isRequired(false).withDescription( + "(re)build index, wiping out current one if it exists").create( + "b")); + + options.addOption(OptionBuilder + .isRequired(false) + .withDescription( + "if updating existing index, force each handle to be reindexed even if uptodate") + .create("f")); + + options.addOption(OptionBuilder.isRequired(false).withDescription( + "print this help message").create("h")); + + try + { + line = new PosixParser().parse(options, args); + } + catch (Exception e) + { + // automatically generate the help statement + formatter.printHelp(usage, e.getMessage(), options, ""); + System.exit(1); + } + + if (line.hasOption("h")) + { + // automatically generate the help statement + formatter.printHelp(usage, options); + System.exit(1); + } + + if (line.hasOption("r")) + { + log.info("Removing " + line.getOptionValue("r") + " from Index"); + unIndexContent(context, line.getOptionValue("r")); + } + else if (line.hasOption("o")) + { + log.info("Optimizing Index"); + optimizeIndex(context); + } + else if (line.hasOption("c")) + { + log.info("Cleaning Index"); + cleanIndex(context); + } + else if (line.hasOption("b")) + { + log.info("(Re)building index from scratch."); + createIndex(context); + } + else + { + log.info("Updating and Cleaning Index"); + cleanIndex(context); + updateIndex(context, line.hasOption("f")); + } + + log.info("Done with indexing"); + } + finally + { + setBatchProcessingMode(false); + Date endTime = new Date(); + System.out.println("Started: " + startTime.getTime()); + System.out.println("Ended: " + endTime.getTime()); + System.out.println("Elapsed time: " + ((endTime.getTime() - startTime.getTime()) / 1000) + " secs (" + (endTime.getTime() - startTime.getTime()) + " msecs)"); + } } /** @@ -562,7 +599,7 @@ public class DSIndexer for(items = Item.findAll(context);items.hasNext();) { Item item = (Item) items.next(); - addToIndexingActionQueue(prepareIndexingAction(item, force)); + indexContent(context, item); item.decache(); } } @@ -574,24 +611,19 @@ public class DSIndexer } } - Collection[] collections = Collection.findAll(context); - for (int i = 0; i < collections.length; i++) + for (Collection collection : Collection.findAll(context)) + { + indexContent(context, collection); + context.removeCached(collection, collection.getID()); + } + + for (Community community : Community.findAll(context)) { - addToIndexingActionQueue(prepareIndexingAction(collections[i], force)); - context.removeCached(collections[i], collections[i].getID()); - - } - - Community[] communities = Community.findAll(context); - for (int i = 0; i < communities.length; i++) - { - addToIndexingActionQueue(prepareIndexingAction(communities[i], force)); - context.removeCached(communities[i], communities[i].getID()); + indexContent(context, community); + context.removeCached(community, community.getID()); } - flushIndexingActionQueue(); optimizeIndex(context); - } catch(Exception e) { @@ -679,11 +711,11 @@ public class DSIndexer } - static IndexingAction prepareIndexingAction(DSpaceObject dso, boolean force) throws SQLException, IOException + static IndexingTask prepareIndexingTask(DSpaceObject dso, boolean force) throws SQLException, IOException { String handle = dso.getHandle(); Term term = new Term("handle", handle); - IndexingAction action = null; + IndexingTask action = null; switch (dso.getType()) { case Constants.ITEM : @@ -694,23 +726,23 @@ public class DSIndexer if (requiresIndexing(term, ((Item)dso).getLastModified()) || force) { log.info("Writing Item: " + handle + " to Index"); - action = new IndexingAction(IndexingAction.Action.UPDATE, term, buildDocumentForItem((Item)dso)); + action = new IndexingTask(IndexingTask.Action.UPDATE, term, buildDocumentForItem((Item)dso)); } } else { - action = new IndexingAction(IndexingAction.Action.DELETE, term, null); + action = new IndexingTask(IndexingTask.Action.DELETE, term, null); } break; case Constants.COLLECTION : log.info("Writing Collection: " + handle + " to Index"); - action = new IndexingAction(IndexingAction.Action.UPDATE, term, buildDocumentForCollection((Collection)dso)); + action = new IndexingTask(IndexingTask.Action.UPDATE, term, buildDocumentForCollection((Collection)dso)); break; case Constants.COMMUNITY : log.info("Writing Community: " + handle + " to Index"); - action = new IndexingAction(IndexingAction.Action.UPDATE, term, buildDocumentForCommunity((Community)dso)); + action = new IndexingTask(IndexingTask.Action.UPDATE, term, buildDocumentForCommunity((Community)dso)); break; default : @@ -719,7 +751,7 @@ public class DSIndexer return action; } - static void processIndexingAction(IndexWriter writer, IndexingAction action) throws IOException + static void processIndexingTask(IndexWriter writer, IndexingTask action) throws IOException { if (action != null) { @@ -734,21 +766,21 @@ public class DSIndexer } } - private static List actionQueue = new ArrayList(); + private static List actionQueue = new ArrayList(); - static void addToIndexingActionQueue(IndexingAction action) + static synchronized void addToIndexingTaskQueue(IndexingTask action) { if (action != null) { actionQueue.add(action); - if (actionQueue.size() > 10) + if (actionQueue.size() >= batchFlushAfterDocuments) { - flushIndexingActionQueue(); + flushIndexingTaskQueue(); } } } - static synchronized void flushIndexingActionQueue() + static void flushIndexingTaskQueue() { if (actionQueue.size() > 0) { @@ -757,22 +789,7 @@ public class DSIndexer try { writer = openIndex(false); - for (IndexingAction action : actionQueue) - { - try - { - processIndexingAction(writer, action); - } - finally - { - if (action.getDocument() != null) - { - closeAllReaders(action.getDocument()); - } - } - } - - actionQueue.clear(); + flushIndexingTaskQueue(writer); } catch (IOException e) { @@ -792,10 +809,33 @@ public class DSIndexer } } } - } } - + + private static synchronized void flushIndexingTaskQueue(IndexWriter writer) + { + for (IndexingTask action : actionQueue) + { + try + { + processIndexingTask(writer, action); + } + catch (IOException e) + { + log.error(e); + } + finally + { + if (action.getDocument() != null) + { + closeAllReaders(action.getDocument()); + } + } + } + + actionQueue.clear(); + } + //////////////////////////////////// // Private //////////////////////////////////// diff --git a/dspace-api/src/main/java/org/dspace/search/IndexingAction.java b/dspace-api/src/main/java/org/dspace/search/IndexingAction.java deleted file mode 100644 index 9dd0be913e..0000000000 --- a/dspace-api/src/main/java/org/dspace/search/IndexingAction.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.dspace.search; - -import org.apache.lucene.document.Document; -import org.apache.lucene.index.Term; - -class IndexingAction -{ - enum Action { ADD, UPDATE, DELETE }; - - private Action action; - private Term term; - private Document doc; - - IndexingAction(Action pAction, Term pTerm, Document pDoc) - { - action = pAction; - term = pTerm; - doc = pDoc; - } - - boolean isAdd() - { - return action == Action.ADD; - } - - boolean isDelete() - { - return action == Action.DELETE; - } - - boolean isUpdate() - { - return action == Action.UPDATE; - } - - Term getTerm() - { - return term; - } - - Document getDocument() - { - return doc; - } -} diff --git a/dspace-api/src/main/java/org/dspace/search/IndexingTask.java b/dspace-api/src/main/java/org/dspace/search/IndexingTask.java new file mode 100644 index 0000000000..e97d1f557f --- /dev/null +++ b/dspace-api/src/main/java/org/dspace/search/IndexingTask.java @@ -0,0 +1,84 @@ +/* + * IndexingTask.java + * + * Version: $Revision: 5724 $ + * + * Date: $Date: 2010-11-04 17:34:12 +0000 (Thu, 04 Nov 2010) $ + * + * Copyright (c) 2002-2005, Hewlett-Packard Company and Massachusetts + * Institute of Technology. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * - Neither the name of the Hewlett-Packard Company nor the name of the + * Massachusetts Institute of Technology nor the names of their + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE + * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH + * DAMAGE. + */ +package org.dspace.search; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.Term; + +class IndexingTask +{ + enum Action { ADD, UPDATE, DELETE }; + + private Action action; + private Term term; + private Document doc; + + IndexingTask(Action pAction, Term pTerm, Document pDoc) + { + action = pAction; + term = pTerm; + doc = pDoc; + } + + boolean isAdd() + { + return action == Action.ADD; + } + + boolean isDelete() + { + return action == Action.DELETE; + } + + boolean isUpdate() + { + return action == Action.UPDATE; + } + + Term getTerm() + { + return term; + } + + Document getDocument() + { + return doc; + } +} diff --git a/dspace/CHANGES b/dspace/CHANGES index c75e1cee9c..b2757bac3e 100644 --- a/dspace/CHANGES +++ b/dspace/CHANGES @@ -79,6 +79,7 @@ - [DS-733] Load testing utilities - [DS-734] Improve database efficiency - [DS-736] ItemImport usage and efficiency improvements +- [DS-739] Lucene indexing efficiency improvements (Graham Triggs, Mark Wood) - [DS-707] General improvements to performance, bug fixes and style diff --git a/pom.xml b/pom.xml index 20dbdeccdf..cb29c4aa9e 100644 --- a/pom.xml +++ b/pom.xml @@ -360,12 +360,12 @@ org.apache.lucene lucene-core - 2.3.0 + 2.9.3 org.apache.lucene lucene-analyzers - 2.3.0 + 2.9.3 org.dspace