[DS-728] Add core curation code & workflow support

git-svn-id: http://scm.dspace.org/svn/repo/dspace/trunk@5674 9c30dcfa-912a-0410-8fc2-9e0234be79fd
This commit is contained in:
Richard Rodgers
2010-10-28 00:09:31 +00:00
parent 708bd70f31
commit 5262078026
15 changed files with 2298 additions and 14 deletions

View File

@@ -0,0 +1,154 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.IOException;
import java.sql.SQLException;
import org.dspace.content.Collection;
import org.dspace.content.Community;
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.content.ItemIterator;
import org.dspace.core.Constants;
import org.dspace.core.Context;
import org.dspace.handle.HandleManager;
/**
* AbstractCurationTask encapsulates a few common patterns of task use,
* resources, and convenience methods.
*
* @author richardrodgers
*/
public abstract class AbstractCurationTask implements CurationTask
{
// invoking curator
protected Curator curator = null;
// curator-assigned taskId
protected String taskId = null;
@Override
public void init(Curator curator, String taskId) throws IOException
{
this.curator = curator;
this.taskId = taskId;
}
@Override
public abstract int perform(DSpaceObject dso) throws IOException;
/**
* Distributes a task through a DSpace container - a convenience method
* for tasks declaring the <code>@Distributive</code> property. Users must
* override the 'performItem' invoked by this method.
*
* @param dso
* @throws IOException
*/
protected void distribute(DSpaceObject dso) throws IOException
{
try
{
int type = dso.getType();
if (Constants.ITEM == type)
{
performItem((Item)dso);
}
else if (Constants.COLLECTION == type)
{
ItemIterator iter = ((Collection)dso).getItems();
while (iter.hasNext())
{
performItem(iter.next());
}
}
else if (Constants.COMMUNITY == type)
{
Community comm = (Community)dso;
for (Community subcomm : comm.getSubcommunities())
{
distribute(subcomm);
}
for (Collection coll : comm.getCollections())
{
distribute(coll);
}
}
}
catch (SQLException sqlE)
{
throw new IOException(sqlE.getMessage(), sqlE);
}
}
/**
* Performs task upon an Item. Must be overridden if <code>distribute</code>
* method is used.
*
* @param item
* @throws SQLException
* @throws IOException
*/
protected void performItem(Item item) throws SQLException, IOException
{
// no-op - override when using 'distribute' method
}
@Override
public int perform(Context ctx, String id) throws IOException
{
DSpaceObject dso = dereference(ctx, id);
return (dso != null) ? perform(dso) : Curator.CURATE_FAIL;
}
/**
* Returns a DSpaceObject for passed identifier, if it exists
*
* @param ctx
* DSpace context
* @param id
* canonical id of object
* @return dso
* DSpace object, or null if no object with id exists
* @throws IOException
*/
protected DSpaceObject dereference(Context ctx, String id) throws IOException
{
try
{
return HandleManager.resolveToObject(ctx, id);
}
catch (SQLException sqlE)
{
throw new IOException(sqlE.getMessage(), sqlE);
}
}
/**
* Sends message to the reporting stream
*
* @param message
* the message to stream
*/
protected void report(String message)
{
curator.report(message);
}
/**
* Assigns the result of the task performance
*
* @param result
* the result string
*/
protected void setResult(String result)
{
curator.setResult(taskId, result);
}
}

View File

@@ -0,0 +1,265 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Iterator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.dspace.content.Community;
import org.dspace.core.Context;
import org.dspace.core.PluginManager;
import org.dspace.eperson.EPerson;
/**
* CurationCli provides command-line access to Curation tools and processes.
*
* @author richardrodgers
*/
public class CurationCli
{
public static void main(String[] args) throws Exception
{
// create an options object and populate it
CommandLineParser parser = new PosixParser();
Options options = new Options();
options.addOption("t", "task", true,
"curation task name");
options.addOption("T", "taskfile", true,
"file containing curation task names");
options.addOption("i", "id", true,
"Id (handle) of object to perform task on, or 'all' to perform on whole repository");
options.addOption("q", "queue", true,
"name of task queue to process");
options.addOption("e", "eperson", true,
"email address of curating eperson");
options.addOption("r", "reporter", true,
"reporter to manage results - use '-' to report to console. If absent, no reporting");
options.addOption("v", "verbose", false,
"report activity to stdout");
options.addOption("h", "help", false, "help");
CommandLine line = parser.parse(options, args);
String taskName = null;
String taskFileName = null;
String idName = null;
String taskQueueName = null;
String ePersonName = null;
String reporterName = null;
boolean verbose = false;
if (line.hasOption('h'))
{
HelpFormatter help = new HelpFormatter();
help.printHelp("CurationCli\n", options);
System.out
.println("\nwhole repo: CurationCli -t estimate -i all");
System.out
.println("single item: CurationCli -t generate -i itemId");
System.out
.println("task queue: CurationCli -q monthly");
System.exit(0);
}
if (line.hasOption('t'))
{ // task
taskName = line.getOptionValue('t');
}
if (line.hasOption('T'))
{ // task file
taskFileName = line.getOptionValue('T');
}
if (line.hasOption('i'))
{ // id
idName = line.getOptionValue('i');
}
if (line.hasOption('q'))
{ // task queue
taskQueueName = line.getOptionValue('q');
}
if (line.hasOption('e'))
{ // eperson
ePersonName = line.getOptionValue('e');
}
if (line.hasOption('r'))
{ // report file
reporterName = line.getOptionValue('r');
}
if (line.hasOption('v'))
{ // verbose
verbose = true;
}
// now validate the args
if (idName == null && taskQueueName == null)
{
System.out.println("Id must be specified: a handle, 'all', or a task queue (-h for help)");
System.exit(1);
}
if (taskName == null && taskFileName == null && taskQueueName == null)
{
System.out.println("A curation task or queue must be specified (-h for help)");
System.exit(1);
}
Context c = new Context();
if (ePersonName != null)
{
EPerson ePerson = EPerson.findByEmail(c, ePersonName);
if (ePerson == null)
{
System.out.println("EPerson not found: " + ePersonName);
System.exit(1);
}
c.setCurrentUser(ePerson);
}
else
{
c.setIgnoreAuthorization(true);
}
Curator curator = new Curator();
if (reporterName != null)
{
curator.setReporter(reporterName);
}
// we are operating in batch mode, if anyone cares.
curator.setInvoked(Curator.Invoked.BATCH);
// load curation tasks
if (taskName != null)
{
if (verbose)
{
System.out.println("Adding task: " + taskName);
}
curator.addTask(taskName);
if (verbose && ! curator.hasTask(taskName))
{
System.out.println("Task: " + taskName + " not resolved");
}
}
else if (taskQueueName == null)
{
// load taskFile
BufferedReader reader = null;
try
{
reader = new BufferedReader(new FileReader(taskFileName));
while ((taskName = reader.readLine()) != null)
{
if (verbose)
{
System.out.println("Adding task: " + taskName);
}
curator.addTask(taskName);
}
}
finally
{
if (reader != null)
{
reader.close();
}
}
}
// run tasks against object
long start = System.currentTimeMillis();
if (verbose)
{
System.out.println("Starting curation");
}
if (idName != null)
{
if (verbose)
{
System.out.println("Curating id: " + idName);
}
if ("all".equals(idName))
{
// run on all top-level communities
for (Community comm : Community.findAllTop(c))
{
if (verbose)
{
System.out.println("Curating community: " + comm.getHandle());
}
curator.curate(comm);
}
}
else
{
curator.curate(c, idName);
}
}
else
{
// process the task queue
TaskQueue queue = (TaskQueue)PluginManager.getSinglePlugin("curate", TaskQueue.class);
if (queue == null)
{
System.out.println("No implementation configured for queue");
throw new UnsupportedOperationException("No queue service available");
}
// use current time as our reader 'ticket'
long ticket = System.currentTimeMillis();
Iterator<TaskQueueEntry> entryIter = queue.dequeue(taskQueueName, ticket).iterator();
while (entryIter.hasNext())
{
TaskQueueEntry entry = entryIter.next();
if (verbose)
{
System.out.println("Curating id: " + entry.getObjectId());
}
// does entry relate to a DSO or workflow object?
if (entry.getObjectId().indexOf("/") > 0)
{
curator.clear();
for (String task : entry.getTaskNames())
{
curator.addTask(task);
}
curator.curate(c, entry.getObjectId());
}
else
{
// make eperson who queued task the effective user
EPerson agent = EPerson.findByEmail(c, entry.getEpersonId());
if (agent != null)
{
c.setCurrentUser(agent);
}
WorkflowCurator.curate(c, entry.getObjectId());
}
}
queue.release(taskQueueName, ticket, true);
}
c.complete();
if (verbose)
{
long elapsed = System.currentTimeMillis() - start;
System.out.println("Ending curation. Elapsed time: " + elapsed);
}
}
}

View File

@@ -0,0 +1,53 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.IOException;
import org.dspace.content.DSpaceObject;
import org.dspace.core.Context;
/**
* CurationTask describes a rather generic ability to perform an operation
* upon a DSpace object.
*
* @author richardrodgers
*/
public interface CurationTask
{
/**
* Initialize task - parameters inform the task of it's invoking curator.
* Since the curator can provide services to the task, this represents
* curation DI.
*
* @param curator the Curator controlling this task
* @param taskId identifier task should use in invoking services
* @throws IOException
*/
void init(Curator curator, String taskId) throws IOException;
/**
* Perform the curation task upon passed DSO
*
* @param dso the DSpace object
* @return status code
* @throws IOException
*/
int perform(DSpaceObject dso) throws IOException;
/**
* Perform the curation task for passed id
*
* @param ctx DSpace context object
* @param id persistent ID for DSpace object
* @return status code
* @throws Exception
*/
int perform(Context ctx, String id) throws IOException;
}

View File

@@ -0,0 +1,441 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.dspace.content.Collection;
import org.dspace.content.Community;
import org.dspace.content.DSpaceObject;
import org.dspace.content.ItemIterator;
import org.dspace.core.Constants;
import org.dspace.core.Context;
import org.dspace.core.PluginManager;
import org.dspace.handle.HandleManager;
/**
* Curator orchestrates and manages the application of a one or more curation
* tasks to a DSpace object. It provides common services and runtime
* environment to the tasks.
*
* @author richardrodgers
*/
public class Curator
{
// status code values
/** Curator unable to find requested task */
public static final int CURATE_NOTASK = -3;
/** no assigned status code - typically because task not yet performed */
public static final int CURATE_UNSET = -2;
/** task encountered a error in processing */
public static final int CURATE_ERROR = -1;
/** task completed successfully */
public static final int CURATE_SUCCESS = 0;
/** task failed */
public static final int CURATE_FAIL = 1;
/** task was not applicable to passed object */
public static final int CURATE_SKIP = 2;
// invocation modes - used by Suspendable tasks
public static enum Invoked { INTERACTIVE, BATCH, ANY };
private static Logger log = Logger.getLogger(Curator.class);
private Map<String, TaskRunner> trMap = new HashMap<String, TaskRunner>();
private List<String> perfList = new ArrayList<String>();
private TaskQueue taskQ = null;
private String reporter = null;
private Invoked iMode = null;
/**
* No-arg constructor
*/
public Curator()
{
}
/**
* Add a task to the set to be performed. Caller should make no assumptions
* on execution ordering.
*
* @param taskName - logical name of task
* @return this curator - to support concatenating invocation style
*/
public Curator addTask(String taskName)
{
CurationTask task = (CurationTask)PluginManager.getNamedPlugin("curate", CurationTask.class, taskName);
if (task != null)
{
try
{
task.init(this, taskName);
trMap.put(taskName, new TaskRunner(task, taskName));
// performance order currently FIFO - to be revisited
perfList.add(taskName);
}
catch (IOException ioE)
{
log.error("Task: '" + taskName + "' initialization failure: " + ioE.getMessage());
}
}
else
{
log.error("Task: '" + taskName + "' does not resolve");
}
return this;
}
/**
* Returns whether this curator has the specified task
*
* @param taskName - logical name of the task
* @return true if task has been configured, else false
*/
public boolean hasTask(String taskName)
{
return perfList.contains(taskName);
}
/**
* Removes a task from the set to be performed.
*
* @param taskName - logical name of the task
* @return this curator - to support concatenating invocation style
*/
public Curator removeTask(String taskName)
{
trMap.remove(taskName);
perfList.remove(taskName);
return this;
}
/**
* Assigns invocation mode.
*
* @param mode one of INTERACTIVE, BATCH, ANY
* @return
*/
public Curator setInvoked(Invoked mode)
{
iMode = mode;
return this;
}
/**
* Sets the reporting stream for this curator.
*
* @param reporter name of reporting stream. The name '-'
* causes reporting to standard out.
* @return the Curator instance
*/
public Curator setReporter(String reporter)
{
this.reporter = reporter;
return this;
}
/**
* Performs all configured tasks upon object identified by id. If
* the object can be resolved as a handle, the DSO will be the
* target object.
*
* @param c a Dpace context
* @param id an object identifier
* @throws IOException
*/
public void curate(Context c, String id) throws IOException
{
if (id == null)
{
log.error("curate - null id");
return;
}
try
{
DSpaceObject dso = HandleManager.resolveToObject(c, id);
if (dso != null)
{
curate(dso);
}
else
{
for (String taskName : perfList)
{
trMap.get(taskName).run(dso);
}
}
}
catch (SQLException sqlE)
{
throw new IOException(sqlE.getMessage(), sqlE);
}
}
/**
* Performs all configured tasks upon DSpace object.
* @param dso the DSpace object
* @throws IOException
*/
public void curate(DSpaceObject dso) throws IOException
{
if (dso == null)
{
log.error("curate - null dso");
return;
}
int type = dso.getType();
for (String taskName : perfList)
{
TaskRunner tr = trMap.get(taskName);
// do we need to iterate over the object ?
if (type == Constants.ITEM ||
tr.task.getClass().isAnnotationPresent(Distributive.class))
{
tr.run(dso);
}
else if (type == Constants.COLLECTION)
{
doCollection(tr, (Collection)dso);
}
else if (type == Constants.COMMUNITY)
{
doCommunity(tr, (Community)dso);
}
}
}
/**
* Places a curation request for the object identified by id on a
* managed queue named by the queueId.
*
* @param c A DSpace context
* @param id an object Id
* @param queueId name of a queue. If queue does not exist, it will
* be created automatically.
* @throws IOException
*/
public void queue(Context c, String id, String queueId) throws IOException
{
if (taskQ == null)
{
taskQ = (TaskQueue)PluginManager.getSinglePlugin("curate", TaskQueue.class);
}
if (taskQ != null)
{
taskQ.enqueue(queueId, new TaskQueueEntry(c.getCurrentUser().getName(),
System.currentTimeMillis(), perfList, id));
}
else
{
log.error("curate - no TaskQueue implemented");
}
}
/**
* Removes all configured tasks from the Curator.
*/
public void clear()
{
trMap.clear();
perfList.clear();
}
/**
* Adds a message to the configured reporting stream.
*
* @param message the message to output to the reporting stream.
*/
public void report(String message)
{
// Stub for now
if ("-".equals(reporter))
{
System.out.println(message);
}
}
/**
* Returns the status code for the latest performance of the named task.
*
* @param taskName the task name
* @return the status code - one of CURATE_ values
*/
public int getStatus(String taskName)
{
TaskRunner tr = trMap.get(taskName);
return (tr != null) ? tr.statusCode : CURATE_NOTASK;
}
/**
* Returns the result string for the latest performance of the named task.
*
* @param taskName the task name
* @return the result string, or <code>null</code> if task has not set it.
*/
public String getResult(String taskName)
{
TaskRunner tr = trMap.get(taskName);
return (tr != null) ? tr.result : null;
}
/**
* Assigns a result to the performance of the named task.
*
* @param taskName the task name
* @param result a string indicating results of performing task.
*/
public void setResult(String taskName, String result)
{
TaskRunner tr = trMap.get(taskName);
if (tr != null)
{
tr.setResult(result);
}
}
/**
* Returns whether a given DSO is a 'container' - collection or community
* @param dso a DSpace object
* @return true if a container, false otherwise
*/
public static boolean isContainer(DSpaceObject dso)
{
return (dso.getType() == Constants.COMMUNITY ||
dso.getType() == Constants.COLLECTION);
}
private boolean doCommunity(TaskRunner tr, Community comm) throws IOException
{
try
{
if (! tr.run(comm))
{
return false;
}
for (Community subcomm : comm.getSubcommunities())
{
if (! doCommunity(tr, subcomm))
{
return false;
}
}
for (Collection coll : comm.getCollections())
{
if (! doCollection(tr, coll))
{
return false;
}
}
}
catch (SQLException sqlE)
{
throw new IOException(sqlE.getMessage(), sqlE);
}
return true;
}
private boolean doCollection(TaskRunner tr, Collection coll) throws IOException
{
try
{
if (! tr.run(coll))
{
return false;
}
ItemIterator iter = coll.getItems();
while (iter.hasNext())
{
if (! tr.run(iter.next()))
{
return false;
}
}
}
catch (SQLException sqlE)
{
throw new IOException(sqlE.getMessage());
}
return true;
}
private class TaskRunner
{
CurationTask task = null;
String taskName = null;
int statusCode = CURATE_UNSET;
String result = null;
Invoked mode = null;
int[] codes = null;
public TaskRunner(CurationTask task, String name)
{
this.task = task;
taskName = name;
parseAnnotations(task.getClass());
}
public boolean run(DSpaceObject dso) throws IOException
{
if (dso == null)
{
throw new IOException("DSpaceObject is null");
}
statusCode = task.perform(dso);
return ! suspend(statusCode);
}
public boolean run(Context c, String id) throws IOException
{
if (c == null || id == null)
{
throw new IOException("Context or identifier is null");
}
statusCode = task.perform(c, id);
return ! suspend(statusCode);
}
public void setResult(String result)
{
this.result = result;
}
private void parseAnnotations(Class tClass)
{
Suspendable suspendAnn = (Suspendable)tClass.getAnnotation(Suspendable.class);
if (suspendAnn != null)
{
mode = suspendAnn.invoked();
codes = suspendAnn.statusCodes();
}
}
private boolean suspend(int code)
{
if (mode != null && (mode.equals(Invoked.ANY) || mode.equals(iMode)))
{
for (int i : codes)
{
if (code == i)
{
return true;
}
}
}
return false;
}
}
}

View File

@@ -0,0 +1,26 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Annotation type for CurationTasks. A task is distributive if it
* distributes its performance to the component parts of it's target object.
* This usually implies container iteration.
*
* @author richardrodgers
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
public @interface Distributive
{
}

View File

@@ -0,0 +1,191 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import org.dspace.core.ConfigurationManager;
/**
* FileTaskQueue provides a TaskQueue implementation based on flat files
* for the queues and semaphores.
*
* @author richardrodgers
*/
public class FileTaskQueue implements TaskQueue
{
private static Logger log = Logger.getLogger(TaskQueue.class);
// base directory for curation task queues
private String tqDir = ConfigurationManager.getProperty("curate", "taskqueue.dir");
// ticket for queue readers
private long readTicket = -1L;
// list of queues owned by reader
private List<Integer> readList = new ArrayList<Integer>();
public FileTaskQueue()
{
}
@Override
public String[] queueNames()
{
return new File(tqDir).list();
}
@Override
public synchronized void enqueue(String queueName, TaskQueueEntry entry)
throws IOException
{
Set entrySet = new HashSet<TaskQueueEntry>();
entrySet.add(entry);
enqueue(queueName, entrySet);
}
@Override
public synchronized void enqueue(String queueName, Set<TaskQueueEntry> entrySet)
throws IOException
{
// don't block or fail - iterate until an unlocked queue found/created
int queueIdx = 0;
File qDir = ensureQueue(queueName);
while (true)
{
File lock = new File(qDir, "lock" + Integer.toString(queueIdx));
if (! lock.exists())
{
// no lock - create one
lock.createNewFile();
// append set contents to queue
BufferedWriter writer = null;
try
{
File queue = new File(qDir, "queue" + Integer.toString(queueIdx));
writer = new BufferedWriter(new FileWriter(queue, true));
Iterator<TaskQueueEntry> iter = entrySet.iterator();
while (iter.hasNext())
{
writer.write(iter.next().toString());
writer.newLine();
}
}
finally
{
if (writer != null)
{
writer.close();
}
}
// remove lock
lock.delete();
break;
}
queueIdx++;
}
}
@Override
public synchronized Set<TaskQueueEntry> dequeue(String queueName, long ticket)
throws IOException
{
Set<TaskQueueEntry> entrySet = new HashSet<TaskQueueEntry>();
if (readTicket == -1L)
{
// hold the ticket & copy all Ids available, locking queues
// stop when no more queues or one found locked
File qDir = ensureQueue(queueName);
readTicket = ticket;
int queueIdx = 0;
while (true)
{
File queue = new File(qDir, "queue" + Integer.toString(queueIdx));
File lock = new File(qDir, "lock" + Integer.toString(queueIdx));
if (queue.exists() && ! lock.exists())
{
// no lock - create one
lock.createNewFile();
// read contents from file
BufferedReader reader = null;
try
{
reader = new BufferedReader(new FileReader(queue));
String entryStr = null;
while ((entryStr = reader.readLine()) != null)
{
entryStr = entryStr.trim();
if (entryStr.length() > 0)
{
entrySet.add(new TaskQueueEntry(entryStr));
}
}
}
finally
{
if (reader != null)
{
reader.close();
}
}
readList.add(queueIdx);
}
else
{
break;
}
queueIdx++;
}
}
return entrySet;
}
@Override
public synchronized void release(String queueName, long ticket, boolean remove)
{
if (ticket == readTicket)
{
readTicket = -1L;
File qDir = ensureQueue(queueName);
// remove locks & queues (if flag true)
for (Integer queueIdx : readList)
{
File lock = new File(qDir, "lock" + Integer.toString(queueIdx));
if (remove)
{
File queue = new File(qDir, "queue" + Integer.toString(queueIdx));
queue.delete();
}
lock.delete();
}
readList.clear();
}
}
private File ensureQueue(String queueName)
{
// create directory structures as needed
File baseDir = new File(tqDir, queueName);
if (! baseDir.exists())
{
baseDir.mkdirs();
}
return baseDir;
}
}

View File

@@ -0,0 +1,25 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Annotation type for CurationTasks. A task is mutative if it
* alters (transforms, mutates) it's target object.
*
* @author richardrodgers
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
public @interface Mutative
{
}

View File

@@ -0,0 +1,94 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.dspace.content.Bitstream;
import org.dspace.content.BitstreamFormat;
import org.dspace.content.Bundle;
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.core.Context;
/**
* ProfileFormats is a task that creates a distribution table of Bitstream
* formats for it's passed object. Primarily a curation task demonstrator.
*
* @author richardrodgers
*/
@Distributive
public class ProfileFormats extends AbstractCurationTask
{
// map of formats to occurrences
private Map<String, Integer> fmtTable = new HashMap<String, Integer>();
/**
* Perform the curation task upon passed DSO
*
* @param dso the DSpace object
* @throws IOException
*/
@Override
public int perform(DSpaceObject dso) throws IOException
{
fmtTable.clear();
distribute(dso);
formatResults();
return Curator.CURATE_SUCCESS;
}
@Override
protected void performItem(Item item) throws SQLException, IOException
{
for (Bundle bundle : item.getBundles())
{
for (Bitstream bs : bundle.getBitstreams())
{
String fmt = bs.getFormat().getShortDescription();
Integer count = fmtTable.get(fmt);
if (count == null)
{
count = 1;
}
else
{
count += 1;
}
fmtTable.put(fmt, count);
}
}
}
private void formatResults() throws IOException
{
try
{
Context c = new Context();
StringBuilder sb = new StringBuilder();
for (String fmt : fmtTable.keySet())
{
BitstreamFormat bsf = BitstreamFormat.findByShortDescription(c, fmt);
sb.append(String.format("%6d", fmtTable.get(fmt))).append(" (").
append(bsf.getSupportLevelText().charAt(0)).append(") ").
append(bsf.getDescription()).append("\n");
}
report(sb.toString());
setResult(sb.toString());
c.complete();
}
catch (SQLException sqlE)
{
throw new IOException(sqlE.getMessage(), sqlE);
}
}
}

View File

@@ -0,0 +1,142 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.dspace.app.util.DCInput;
import org.dspace.app.util.DCInputSet;
import org.dspace.app.util.DCInputsReader;
import org.dspace.app.util.DCInputsReaderException;
import org.dspace.content.DCValue;
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.core.Constants;
/**
* RequiredMetadata task compares item metadata with fields
* marked as required in input-forms.xml. The task succeeds if all
* required fields are present in the item metadata, otherwise it fails.
* Primarily a curation task demonstrator.
*
* @author richardrodgers
*/
@Suspendable
public class RequiredMetadata extends AbstractCurationTask
{
// map of DCInputSets
private DCInputsReader reader = null;
// map of required fields
private Map<String, List<String>> reqMap = new HashMap<String, List<String>>();
@Override public void init(Curator curator, String taskId) throws IOException
{
super.init(curator, taskId);
try
{
reader = new DCInputsReader();
}
catch (DCInputsReaderException dcrE)
{
throw new IOException(dcrE.getMessage(), dcrE);
}
}
/**
* Perform the curation task upon passed DSO
*
* @param dso the DSpace object
* @throws IOException
*/
@Override
public int perform(DSpaceObject dso) throws IOException
{
if (dso.getType() == Constants.ITEM)
{
Item item = (Item)dso;
int count = 0;
try
{
StringBuilder sb = new StringBuilder();
String handle = item.getHandle();
if (handle == null)
{
// we are still in workflow - no handle assigned
handle = "in workflow";
}
sb.append("Item: ").append(handle);
for (String req : getReqList(item.getOwningCollection().getHandle()))
{
DCValue[] vals = item.getMetadata(req);
if (vals.length == 0)
{
sb.append(" missing required field: ").append(req);
count++;
}
}
report(sb.toString());
setResult(sb.toString());
}
catch (DCInputsReaderException dcrE)
{
throw new IOException(dcrE.getMessage(), dcrE);
}
catch (SQLException sqlE)
{
throw new IOException(sqlE.getMessage(), sqlE);
}
return (count == 0) ? Curator.CURATE_SUCCESS : Curator.CURATE_FAIL;
}
else
{
setResult("OK");
return Curator.CURATE_SKIP;
}
}
private List<String> getReqList(String handle) throws DCInputsReaderException
{
List<String> reqList = reqMap.get(handle);
if (reqList == null)
{
reqList = reqMap.get("default");
}
if (reqList == null)
{
reqList = new ArrayList<String>();
DCInputSet inputs = reader.getInputs(handle);
for (int i = 0; i < inputs.getNumberPages(); i++)
{
for (DCInput input : inputs.getPageRows(i, true, true))
{
if (input.isRequired())
{
StringBuilder sb = new StringBuilder();
sb.append(input.getSchema()).append(".");
sb.append(input.getElement()).append(".");
String qual = input.getQualifier();
if (qual == null)
{
qual = "";
}
sb.append(qual);
reqList.add(sb.toString());
}
}
}
reqMap.put(inputs.getFormName(), reqList);
}
return reqList;
}
}

View File

@@ -0,0 +1,35 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Annotation type for CurationTasks. A task is suspendable if it may
* be suspended (halted) when a condition detected by the curation framework
* occurs. The current implementation monitors and uses the status code
* returned from the task to determine suspension, together with the
* 'invocation mode' - optionally set by the caller on the curation object.
* Thus, it effectively means that if a task is iterating over a collection,
* the first error, or failure will halt the process.
* This insures that the status code and result of the failure are preserved.
*
* @author richardrodgers
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
public @interface Suspendable
{
// by default, suspension occurs however task is invoked
Curator.Invoked invoked() default Curator.Invoked.ANY;
// by default, either ERROR or FAILURE status codes trigger suspension
int[] statusCodes() default {-1, 1};
}

View File

@@ -0,0 +1,84 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.IOException;
import java.util.Set;
/**
* TaskQueue objects manage access to named queues of task entries.
* Entries represent curation task requests that have been deferred.
* The queue supports concurrent non-blocking writers, but controls
* read access to a single reader possessing a ticket (first come,
* first serve). After the read, the queue remains locked until
* released by the reader, after which it is typically purged.
*
* @author richardrodgers
*/
public interface TaskQueue {
/**
* Returns list of queue names.
*
* @return queues
* the list of names of active queues
*/
String[] queueNames();
/**
* Queues a single entry to a named queue.
*
* @param queueName
* the name of the queue on which to write
* @param entry
* the task entry
* @throws IOException
*/
void enqueue(String queueName, TaskQueueEntry entry) throws IOException;
/**
* Queues a set of task entries to a named queue.
*
* @param queueName
* the name of the queue on which to write
* @param entrySet
* the set of task entries
* @throws IOException
*/
void enqueue(String queueName, Set<TaskQueueEntry> entrySet) throws IOException;
/**
* Returns the set of task entries from the named queue. The operation locks
* the queue from any further enqueue or dequeue operations until a
* <code>release</code> is called. The ticket may be any number, but a
* timestamp should guarantee sufficient uniqueness.
*
* @param queueName
* the name of the queue to read
* @param ticket
* a token which must be presented to release the queue
* @return set
* the current set of queued task entries
* @throws IOException
*/
Set<TaskQueueEntry> dequeue(String queueName, long ticket) throws IOException;
/**
* Releases the lock upon the named queue, deleting it if <code>removeEntries</code>
* is set to true.
*
* @param queueName
* the name of the queue to release
* @param ticket
* a token that was presented when queue was dequeued.
* @param removeEntries
* flag to indicate whether entries may be deleted
*/
void release(String queueName, long ticket, boolean removeEntries);
}

View File

@@ -0,0 +1,116 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.util.Arrays;
import java.util.List;
/**
* TaskQueueEntry defines the record or entry in the named task queues.
* Regular immutable value object class.
*
* @author richardrodgers
*/
public final class TaskQueueEntry
{
private final String epersonId;
private final String submitTime;
private final String tasks;
private final String objId;
/**
* TaskQueueEntry constructor with enumerated field values.
*
* @param epersonId
* @param submitTime
* @param taskNames
* @param objId
*/
public TaskQueueEntry(String epersonId, long submitTime,
List<String> taskNames, String objId)
{
this.epersonId = epersonId;
this.submitTime = Long.toString(submitTime);
StringBuilder sb = new StringBuilder();
for (String tName : taskNames)
{
sb.append(tName).append(",");
}
this.tasks = sb.substring(0, sb.length() - 1);
this.objId = objId;
}
/**
* Constructor with a pipe-separated list of field values.
*
* @param entry
* list of field values separated by '|'s
*/
public TaskQueueEntry(String entry)
{
String[] tokens = entry.split("\\|");
epersonId = tokens[0];
submitTime = tokens[1];
tasks = tokens[2];
objId = tokens[3];
}
/**
* Returns the epersonId (email) of the agent who enqueued this task entry.
*
* @return epersonId
* name of EPerson (email) or 'unknown' if none recorded.
*/
public String getEpersonId()
{
return epersonId;
}
/**
* Returns the timestamp of when this entry was enqueued.
*
* @return time
* Submission timestamp
*/
public long getSubmitTime()
{
return Long.valueOf(submitTime);
}
/**
* Return the list of tasks associated with this entry.
*
* @return tasks
* the list of task names (Plugin names)
*/
public List<String> getTaskNames()
{
return Arrays.asList(tasks.split(","));
}
/**
* Returns the object identifier.
* @return objId
* usually a handle or workflow id
*/
public String getObjectId()
{
return objId;
}
/**
* Returns a string representation of the entry
* @return string
* pipe-separated field values
*/
@Override
public String toString()
{
return epersonId + "|" + submitTime + "|" + tasks + "|" + objId;
}
}

View File

@@ -0,0 +1,180 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* Utils contains a few commonly occurring methods.
*
* @author richardrodgers
*/
public class Utils
{
private static final int BUFF_SIZE = 4096;
// we can live with 4k preallocation
private static final byte[] buffer = new byte[BUFF_SIZE];
/**
* Calculates and returns a checksum for the passed file using the passed
* algorithm.
*
* @param file
* file on which to calculate checksum
* @param algorithm
* string for algorithm: 'MD5', 'SHA1', etc
* @return checksum
* string of the calculated checksum
*
* @throws IOException
*/
public static String checksum(File file, String algorithm) throws IOException
{
InputStream in = null;
String chkSum = null;
try
{
in = new FileInputStream(file);
chkSum = checksum(in, algorithm);
}
finally
{
if (in != null)
{
in.close();
}
}
return chkSum;
}
/**
* Calculates and returns a checksum for the passed IO stream using the passed
* algorithm.
*
* @param in
* input stream on which to calculate checksum
* @param algorithm
* string for algorithm: 'MD5', 'SHA1', etc
* @return checksum
* string of the calculated checksum
*
* @throws IOException
*/
public static String checksum(InputStream in, String algorithm) throws IOException
{
try
{
DigestInputStream din = new DigestInputStream(in,
MessageDigest.getInstance(algorithm));
while (true)
{
synchronized (buffer)
{
if (din.read(buffer) == -1)
{
break;
}
// otherwise, a no-op
}
}
return toHex(din.getMessageDigest().digest());
} catch (NoSuchAlgorithmException nsaE) {
throw new IOException(nsaE.getMessage());
}
}
/**
* Reasonably efficient Hex checksum converter
*
* @param data
* byte array
* @return hexString
* checksum
*/
static final char[] HEX_CHARS = "0123456789abcdef".toCharArray();
public static String toHex(byte[] data) {
if ((data == null) || (data.length == 0)) {
return null;
}
char[] chars = new char[2 * data.length];
for (int i = 0; i < data.length; ++i) {
chars[2 * i] = HEX_CHARS[(data[i] & 0xF0) >>> 4];
chars[2 * i + 1] = HEX_CHARS[data[i] & 0x0F];
}
return new String(chars);
}
/**
* Performs a buffered copy from one file into another.
*
* @param inFile
* @param outFile
* @throws IOException
*/
public static void copy(File inFile, File outFile) throws IOException
{
FileInputStream in = null;
FileOutputStream out = null;
try
{
in = new FileInputStream(inFile);
out = new FileOutputStream(outFile);
copy(in, out);
}
finally
{
if (in != null)
{
in.close();
}
if (out != null)
{
out.close();
}
}
}
/**
* Performs a buffered copy from one IO stream into another. Note that stream
* closure is responsibility of caller.
*
* @param in
* input stream
* @param out
* output stream
* @throws IOException
*/
public static void copy(InputStream in, OutputStream out) throws IOException
{
while (true)
{
synchronized (buffer)
{
int count = in.read(buffer);
if (-1 == count)
{
break;
}
// write out those same bytes
out.write(buffer, 0, count);
}
}
// needed to flush cache
out.flush();
}
}

View File

@@ -0,0 +1,378 @@
/*
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://dspace.org/license/
*/
package org.dspace.curate;
import org.dspace.content.Item;
import java.util.Arrays;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.apache.log4j.Logger;
import org.dspace.authorize.AuthorizeException;
import org.dspace.content.Collection;
import org.dspace.core.ConfigurationManager;
import org.dspace.core.Context;
import org.dspace.core.LogManager;
import org.dspace.eperson.EPerson;
import org.dspace.eperson.Group;
import org.dspace.workflow.WorkflowItem;
import org.dspace.workflow.WorkflowManager;
// Warning - static import ahead!
import static javax.xml.stream.XMLStreamConstants.*;
/**
* WorkflowCurator manages interactions between curation and workflow.
* Specifically, it is invoked in WorkflowManager to allow the
* performance of curation tasks during workflow.
*
* @author richardrodgers
*/
public class WorkflowCurator {
/** log4j logger */
private static Logger log = Logger.getLogger(WorkflowCurator.class);
private static File cfgFile = new File(ConfigurationManager.getProperty("dspace.dir") +
File.separator + "config" + File.separator +
"workflow-curation.xml");
private static Map<String, TaskSet> tsMap = new HashMap<String, TaskSet>();
private static final String[] flowSteps = { "step1", "step2", "step3", "archive" };
static {
try {
loadTaskConfig();
} catch (IOException e) {
// debug e.printStackTrace();
log.fatal("Unable to load config: " + cfgFile.getAbsolutePath());
}
}
public static boolean needsCuration(WorkflowItem wfi) {
return getFlowStep(wfi) != null;
}
/**
* Determines and executes curation on a Workflow item.
*
* @param c the context
* @param wfi the workflow item
* @return true if curation was completed or not required,
* false if tasks were queued for later completion,
* or item was rejected
* @throws AuthorizeException
* @throws IOException
* @throws SQLException
*/
public static boolean doCuration(Context c, WorkflowItem wfi)
throws AuthorizeException, IOException, SQLException {
FlowStep step = getFlowStep(wfi);
if (step != null) {
Curator curator = new Curator();
// are we going to perform, or just put on queue?
if (step.queue != null) {
for (Task task : step.tasks) {
curator.addTask(task.name);
}
curator.queue(c, String.valueOf(wfi.getID()), step.queue);
wfi.update();
return false;
} else {
return curate(c, wfi);
}
}
return true;
}
/**
* Determines and executes curation of a Workflow item.
*
* @param c the user context
* @param wfId the workflow id
* @throws AuthorizeException
* @throws IOException
* @throws SQLException
*/
public static boolean curate(Context c, String wfId)
throws AuthorizeException, IOException, SQLException {
WorkflowItem wfi = WorkflowItem.find(c, Integer.parseInt(wfId));
if (wfi != null) {
if (curate(c, wfi)) {
WorkflowManager.advance(c, wfi, c.getCurrentUser(), false, true);
return true;
}
} else {
log.warn(LogManager.getHeader(c, "No workflow item found for id: " + wfId, null));
}
return false;
}
public static boolean curate(Context c, WorkflowItem wfi)
throws AuthorizeException, IOException, SQLException {
FlowStep step = getFlowStep(wfi);
if (step != null) {
// assign collection to item in case task needs it
Item item = wfi.getItem();
item.setOwningCollection(wfi.getCollection());
Curator curator = new Curator();
for (Task task : step.tasks) {
curator.addTask(task.name);
curator.curate(item);
int status = curator.getStatus(task.name);
String result = curator.getResult(task.name);
String action = "none";
if (status == Curator.CURATE_FAIL) {
// task failed - notify any contacts the task has assigned
if (task.powers.contains("reject")) {
action = "reject";
}
notifyContacts(c, wfi, task, "fail", action, result);
// if task so empowered, reject submission and terminate
if ("reject".equals(action)) {
WorkflowManager.reject(c, wfi, c.getCurrentUser(),
task.name + ": " + result);
return false;
}
} else if (status == Curator.CURATE_SUCCESS) {
if (task.powers.contains("approve")) {
action = "approve";
}
notifyContacts(c, wfi, task, "success", action, result);
if ("approve".equals(action)) {
// cease further task processing and advance submission
return true;
}
} else if (status == Curator.CURATE_ERROR) {
notifyContacts(c, wfi, task, "error", action, result);
}
curator.clear();
}
}
return true;
}
private static void notifyContacts(Context c, WorkflowItem wfi, Task task,
String status, String action, String message)
throws AuthorizeException, IOException, SQLException {
EPerson[] epa = resolveContacts(c, task.getContacts(status), wfi);
if (epa.length > 0) {
WorkflowManager.notifyOfCuration(c, wfi, epa, task.name, action, message);
}
}
private static EPerson[] resolveContacts(Context c, List<String> contacts,
WorkflowItem wfi)
throws AuthorizeException, IOException, SQLException {
List<EPerson> epList = new ArrayList<EPerson>();
for (String contact : contacts) {
// decode contacts
if ("$flowgroup".equals(contact)) {
// special literal for current flowgoup
int step = state2step(wfi.getState());
// make sure this step exists
if (step < 4) {
Group wfGroup = wfi.getCollection().getWorkflowGroup(step);
if (wfGroup != null) {
epList.addAll(Arrays.asList(Group.allMembers(c, wfGroup)));
}
}
} else if ("$colladmin".equals(contact)) {
Group adGroup = wfi.getCollection().getAdministrators();
if (adGroup != null) {
epList.addAll(Arrays.asList(Group.allMembers(c, adGroup)));
}
} else if ("$siteadmin".equals(contact)) {
EPerson siteEp = EPerson.findByEmail(c,
ConfigurationManager.getProperty("mail.admin"));
if (siteEp != null) {
epList.add(siteEp);
}
} else if (contact.indexOf("@") > 0) {
// little shaky heuristic here - assume an eperson email name
EPerson ep = EPerson.findByEmail(c, contact);
if (ep != null) {
epList.add(ep);
}
} else {
// assume it is an arbitrary group name
Group group = Group.findByName(c, contact);
if (group != null) {
epList.addAll(Arrays.asList(Group.allMembers(c, group)));
}
}
}
return epList.toArray(new EPerson[epList.size()]);
}
private static FlowStep getFlowStep(WorkflowItem wfi) {
Collection coll = wfi.getCollection();
String key = tsMap.containsKey(coll.getHandle()) ? coll.getHandle() : "default";
TaskSet ts = tsMap.get(key);
if (ts != null) {
int myStep = state2step(wfi.getState());
for (FlowStep fstep : ts.steps) {
if (fstep.step == myStep) {
return fstep;
}
}
}
return null;
}
private static int state2step(int state) {
if (state <= WorkflowManager.WFSTATE_STEP1POOL) return 1;
if (state <= WorkflowManager.WFSTATE_STEP2POOL) return 2;
if (state <= WorkflowManager.WFSTATE_STEP3POOL) return 3;
return 4;
}
private static int stepName2step(String name) {
for (int i = 0; i < flowSteps.length; i++) {
if (flowSteps[i].equals(name)) {
return i + 1;
}
}
// invalid stepName - log
log.warn("Invalid step: '" + name + "' provided");
return -1;
}
private static void loadTaskConfig() throws IOException {
Map<String, String> collMap = new HashMap<String, String>();
Map<String, TaskSet> setMap = new HashMap<String, TaskSet>();
TaskSet taskSet = null;
FlowStep flowStep = null;
Task task = null;
String type = null;
try {
XMLInputFactory factory = XMLInputFactory.newInstance();
XMLStreamReader reader = factory.createXMLStreamReader(
new FileInputStream(cfgFile), "UTF-8");
while (reader.hasNext()) {
int event = reader.next();
if (event == START_ELEMENT) {
String eName = reader.getLocalName();
if ("mapping".equals(eName)) {
collMap.put(reader.getAttributeValue(0),
reader.getAttributeValue(1));
} else if ("taskset".equals(eName)) {
taskSet = new TaskSet(reader.getAttributeValue(0));
} else if ("flowstep".equals(eName)) {
flowStep = new FlowStep(reader.getAttributeValue(0),
reader.getAttributeValue(1));
} else if ("task".equals(eName)) {
task = new Task(reader.getAttributeValue(0));
} else if ("workflow".equals(eName)) {
type = "power";
} else if ("notify".equals(eName)) {
type = reader.getAttributeValue(0);
}
} else if (event == CHARACTERS) {
if (task != null) {
if ("power".equals(type)) {
task.addPower(reader.getText());
} else {
task.addContact(type, reader.getText());
}
}
} else if (event == END_ELEMENT) {
String eName = reader.getLocalName();
if ("task".equals(eName)) {
flowStep.addTask(task);
task = null;
} else if ("flowstep".equals(eName)) {
taskSet.addStep(flowStep);
} else if ("taskset".equals(eName)) {
setMap.put(taskSet.setName, taskSet);
}
}
}
reader.close();
// stitch maps together
for (String key: collMap.keySet()) {
String value = collMap.get(key);
if (! "none".equals(value) && setMap.containsKey(value)) {
tsMap.put(key, setMap.get(value));
}
}
} catch (XMLStreamException xsE) {
throw new IOException(xsE.getMessage());
}
}
private static class TaskSet {
public String setName = null;
public List<FlowStep> steps = null;
public TaskSet(String setName) {
this.setName = setName;
steps = new ArrayList<FlowStep>();
}
public void addStep(FlowStep step) {
steps.add(step);
}
}
private static class FlowStep {
public int step = -1;
public String queue = null;
public List<Task> tasks = null;
public FlowStep(String stepStr, String queueStr) {
this.step = stepName2step(stepStr);
this.queue = queueStr;
tasks = new ArrayList<Task>();
}
public void addTask(Task task) {
tasks.add(task);
}
}
private static class Task {
public String name = null;
public List<String> powers = new ArrayList<String>();
public Map<String, List<String>> contacts = new HashMap<String, List<String>>();
public Task(String name) {
this.name = name;
}
public void addPower(String power) {
powers.add(power);
}
public void addContact(String status, String contact) {
List<String> sContacts = contacts.get(status);
if (sContacts == null) {
sContacts = new ArrayList<String>();
contacts.put(status, sContacts);
}
sContacts.add(contact);
}
public List<String> getContacts(String status) {
List<String> ret = contacts.get(status);
return (ret != null) ? ret : new ArrayList<String>();
}
}
}

View File

@@ -48,10 +48,11 @@ import java.util.MissingResourceException;
import java.util.ResourceBundle;
import javax.mail.MessagingException;
import org.apache.log4j.Logger;
import org.dspace.authorize.AuthorizeException;
import org.dspace.authorize.AuthorizeManager;
import org.dspace.content.Bitstream;
import org.dspace.content.Collection;
import org.dspace.content.DCDate;
import org.dspace.content.DCValue;
@@ -63,6 +64,7 @@ import org.dspace.core.Context;
import org.dspace.core.Email;
import org.dspace.core.I18nUtil;
import org.dspace.core.LogManager;
import org.dspace.curate.WorkflowCurator;
import org.dspace.eperson.EPerson;
import org.dspace.eperson.Group;
import org.dspace.handle.HandleManager;
@@ -199,8 +201,9 @@ public class WorkflowManager
// remove the WorkspaceItem
wsi.deleteWrapper();
// now get the worflow started
doState(c, wfi, WFSTATE_STEP1POOL, null);
// now get the workflow started
wfi.setState(WFSTATE_SUBMIT);
advance(c, wfi, null);
// Return the workflow item
return wfi;
@@ -341,7 +344,7 @@ public class WorkflowManager
}
/**
* approveAction() sends an item forward in the workflow (reviewers,
* advance() sends an item forward in the workflow (reviewers,
* approvers, and editors all do an 'approve' to move the item forward) if
* the item arrives at the submit state, then remove the WorkflowItem and
* call the archive() method to put it in the archive, and email notify the
@@ -356,17 +359,67 @@ public class WorkflowManager
*/
public static void advance(Context c, WorkflowItem wi, EPerson e)
throws SQLException, IOException, AuthorizeException
{
advance(c, wi, e, true, true);
}
/**
* advance() sends an item forward in the workflow (reviewers,
* approvers, and editors all do an 'approve' to move the item forward) if
* the item arrives at the submit state, then remove the WorkflowItem and
* call the archive() method to put it in the archive, and email notify the
* submitter of a successful submission
*
* @param c
* Context
* @param wi
* WorkflowItem do do the approval on
* @param e
* EPerson doing the approval
*
* @param curate
* boolean indicating whether curation tasks should be done
*
* @param record
* boolean indicating whether to record action
*/
public static boolean advance(Context c, WorkflowItem wi, EPerson e,
boolean curate, boolean record)
throws SQLException, IOException, AuthorizeException
{
int taskstate = wi.getState();
boolean archived = false;
// perform curation tasks if needed
if (curate && WorkflowCurator.needsCuration(wi))
{
if (! WorkflowCurator.doCuration(c, wi)) {
// don't proceed - either curation tasks queued, or item rejected
log.info(LogManager.getHeader(c, "advance_workflow",
"workflow_item_id=" + wi.getID() + ",item_id="
+ wi.getItem().getID() + ",collection_id="
+ wi.getCollection().getID() + ",old_state="
+ taskstate + ",doCuration=false"));
return archived;
}
}
switch (taskstate)
{
case WFSTATE_SUBMIT:
archived = doState(c, wi, WFSTATE_STEP1POOL, e);
break;
case WFSTATE_STEP1:
// authorize DSpaceActions.SUBMIT_REVIEW
// Record provenance
recordApproval(c, wi, e);
doState(c, wi, WFSTATE_STEP2POOL, e);
if (record)
{
recordApproval(c, wi, e);
}
archived = doState(c, wi, WFSTATE_STEP2POOL, e);
break;
@@ -374,8 +427,11 @@ public class WorkflowManager
// authorize DSpaceActions.SUBMIT_STEP2
// Record provenance
recordApproval(c, wi, e);
doState(c, wi, WFSTATE_STEP3POOL, e);
if (record)
{
recordApproval(c, wi, e);
}
archived = doState(c, wi, WFSTATE_STEP3POOL, e);
break;
@@ -384,7 +440,7 @@ public class WorkflowManager
// authorize DSpaceActions.SUBMIT_STEP3
// We don't record approval for editors, since they can't reject,
// and thus didn't actually make a decision
doState(c, wi, WFSTATE_ARCHIVE, e);
archived = doState(c, wi, WFSTATE_ARCHIVE, e);
break;
@@ -396,6 +452,7 @@ public class WorkflowManager
+ wi.getItem().getID() + ",collection_id="
+ wi.getCollection().getID() + ",old_state="
+ taskstate + ",new_state=" + wi.getState()));
return archived;
}
/**
@@ -519,7 +576,8 @@ public class WorkflowManager
else
{
// no reviewers, skip ahead
archived = doState(c, wi, WFSTATE_STEP2POOL, null);
wi.setState(WFSTATE_STEP1);
archived = advance(c, wi, null, true, false);
}
break;
@@ -559,7 +617,8 @@ public class WorkflowManager
else
{
// no reviewers, skip ahead
archived = doState(c, wi, WFSTATE_STEP3POOL, null);
wi.setState(WFSTATE_STEP2);
archived = advance(c, wi, null, true, false);
}
break;
@@ -595,7 +654,8 @@ public class WorkflowManager
else
{
// no editors, skip ahead
archived = doState(c, wi, WFSTATE_ARCHIVE, newowner);
wi.setState(WFSTATE_STEP3);
archived = advance(c, wi, null, true, false);
}
break;
@@ -640,7 +700,8 @@ public class WorkflowManager
* @param state the workflow state
* @return the text representation
*/
public static String getWorkflowText(int state) {
public static String getWorkflowText(int state)
{
if (state > -1 && state < workflowText.length) {
return workflowText[state];
}
@@ -843,6 +904,43 @@ public class WorkflowManager
DatabaseManager.updateQuery(c, myrequest, wi.getID());
}
// send notices of curation activity
public static void notifyOfCuration(Context c, WorkflowItem wi, EPerson[] epa,
String taskName, String action, String message) throws SQLException, IOException
{
try
{
// Get the item title
String title = getItemTitle(wi);
// Get the submitter's name
String submitter = getSubmitterName(wi);
// Get the collection
Collection coll = wi.getCollection();
for (int i = 0; i < epa.length; i++)
{
Locale supportedLocale = I18nUtil.getEPersonLocale(epa[i]);
Email email = ConfigurationManager.getEmail(I18nUtil.getEmailFilename(supportedLocale,
"flowtask_notify"));
email.addArgument(title);
email.addArgument(coll.getMetadata("name"));
email.addArgument(submitter);
email.addArgument(taskName);
email.addArgument(message);
email.addArgument(action);
email.addRecipient(epa[i].getEmail());
email.send();
}
}
catch (MessagingException e)
{
log.warn(LogManager.getHeader(c, "notifyOfCuration", "cannot email users" +
" of workflow_item_id" + wi.getID()));
}
}
private static void notifyGroupOfTask(Context c, WorkflowItem wi,
Group mygroup, EPerson[] epa) throws SQLException, IOException
@@ -906,8 +1004,10 @@ public class WorkflowManager
}
catch (MessagingException e)
{
String gid = (mygroup != null) ?
String.valueOf(mygroup.getID()) : "none";
log.warn(LogManager.getHeader(c, "notifyGroupofTask",
"cannot email user" + " group_id" + mygroup.getID()
"cannot email user" + " group_id" + gid
+ " workflow_item_id" + wi.getID()));
}
}