Merge pull request #380 from mwoodiupui/DS-449

[DS-449] Command line utility org.dspace.app.harvest.Harvest -S throws AuthorizeException
This commit is contained in:
Mark H. Wood
2013-11-11 13:17:32 -08:00

View File

@@ -1084,39 +1084,53 @@ public class OAIHarvester {
}
}
/**
* The class responsible for scheduling harvesting cycles at regular intervals.
* @author alexey
*/
public static class HarvestScheduler implements Runnable
{
private EPerson harvestAdmin;
private Context mainContext;
/**
* The class responsible for scheduling harvesting cycles at regular intervals.
* @author alexey
*/
public static class HarvestScheduler implements Runnable
{
private static EPerson harvestAdmin;
public static final Object lock = new Object();
private Context mainContext;
private static Stack<HarvestThread> harvestThreads;
private static Integer maxActiveThreads;
protected static volatile Integer activeThreads = 0;
public static final Object lock = new Object();
public static final int HARVESTER_STATUS_RUNNING = 1;
public static final int HARVESTER_STATUS_SLEEPING = 2;
public static final int HARVESTER_STATUS_PAUSED = 3;
public static final int HARVESTER_STATUS_STOPPED = 4;
private static Stack<HarvestThread> harvestThreads;
public static final int HARVESTER_INTERRUPT_NONE = 0;
public static final int HARVESTER_INTERRUPT_PAUSE = 1;
public static final int HARVESTER_INTERRUPT_STOP = 2;
public static final int HARVESTER_INTERRUPT_RESUME = 3;
public static final int HARVESTER_INTERRUPT_INSERT_THREAD = 4;
public static final int HARVESTER_INTERRUPT_KILL_THREAD = 5;
private static Integer maxActiveThreads;
private static int status = HARVESTER_STATUS_STOPPED;
private static int interrupt = HARVESTER_INTERRUPT_NONE;
private static Integer interruptValue = 0;
protected static volatile Integer activeThreads = 0;
private static long minHeartbeat;
private static long maxHeartbeat;
public static final int HARVESTER_STATUS_RUNNING = 1;
public static final int HARVESTER_STATUS_SLEEPING = 2;
public static final int HARVESTER_STATUS_PAUSED = 3;
public static final int HARVESTER_STATUS_STOPPED = 4;
public static final int HARVESTER_INTERRUPT_NONE = 0;
public static final int HARVESTER_INTERRUPT_PAUSE = 1;
public static final int HARVESTER_INTERRUPT_STOP = 2;
public static final int HARVESTER_INTERRUPT_RESUME = 3;
public static final int HARVESTER_INTERRUPT_INSERT_THREAD = 4;
public static final int HARVESTER_INTERRUPT_KILL_THREAD = 5;
private static int status = HARVESTER_STATUS_STOPPED;
private static int interrupt = HARVESTER_INTERRUPT_NONE;
private static Integer interruptValue = 0;
private static long minHeartbeat;
private static long maxHeartbeat;
public static boolean hasStatus(int statusToCheck) {
return status == statusToCheck;
@@ -1131,58 +1145,58 @@ public class OAIHarvester {
interruptValue = newInterruptValue;
}
/**
 * Describes the scheduler's current state as a human-readable sentence.
 *
 * The message is selected from the static {@code status} field. While the
 * status is RUNNING, a pending PAUSE or STOP value in {@code interrupt}
 * takes precedence over the generic "actively harvesting" message.
 *
 * @return a status sentence (each one ends with a trailing space)
 */
public static String getStatus() {
switch(status) {
case HARVESTER_STATUS_RUNNING:
// A pending pause/stop request refines the RUNNING message; any other
// interrupt value falls out of this inner switch to the return below.
switch(interrupt) {
case HARVESTER_INTERRUPT_PAUSE: return("The scheduler is finishing active harvests before pausing. ");
case HARVESTER_INTERRUPT_STOP: return("The scheduler is shutting down. ");
}
return("The scheduler is actively harvesting collections. ");
case HARVESTER_STATUS_SLEEPING: return("The scheduler is waiting for collections to harvest. ");
case HARVESTER_STATUS_PAUSED: return("The scheduler is paused. ");
// Covers HARVESTER_STATUS_STOPPED and any unrecognized status value.
default: return("Automatic harvesting is not active. ");
}
}
/**
 * Describes the scheduler's current state as a human-readable sentence.
 *
 * The message is selected from the static {@code status} field. While the
 * status is RUNNING, a pending PAUSE or STOP value in {@code interrupt}
 * takes precedence over the generic "actively harvesting" message.
 *
 * @return a status sentence (each one ends with a trailing space)
 */
public static String getStatus() {
switch(status) {
case HARVESTER_STATUS_RUNNING:
// A pending pause/stop request refines the RUNNING message; any other
// interrupt value falls out of this inner switch to the return below.
switch(interrupt) {
case HARVESTER_INTERRUPT_PAUSE: return("The scheduler is finishing active harvests before pausing. ");
case HARVESTER_INTERRUPT_STOP: return("The scheduler is shutting down. ");
}
return("The scheduler is actively harvesting collections. ");
case HARVESTER_STATUS_SLEEPING: return("The scheduler is waiting for collections to harvest. ");
case HARVESTER_STATUS_PAUSED: return("The scheduler is paused. ");
// Covers HARVESTER_STATUS_STOPPED and any unrecognized status value.
default: return("Automatic harvesting is not active. ");
}
}
public HarvestScheduler() throws SQLException, AuthorizeException {
mainContext = new Context();
String harvestAdminParam = ConfigurationManager.getProperty("harvester.eperson");
harvestAdmin = null;
if (harvestAdminParam != null && harvestAdminParam.length() > 0)
public HarvestScheduler() throws SQLException, AuthorizeException {
mainContext = new Context();
String harvestAdminParam = ConfigurationManager.getProperty("oai", "harvester.eperson");
harvestAdmin = null;
if (harvestAdminParam != null && harvestAdminParam.length() > 0)
{
harvestAdmin = EPerson.findByEmail(mainContext, harvestAdminParam);
}
harvestThreads = new Stack<HarvestThread>();
harvestThreads = new Stack<HarvestThread>();
maxActiveThreads = ConfigurationManager.getIntProperty("oai", "harvester.maxThreads");
if (maxActiveThreads == 0)
maxActiveThreads = ConfigurationManager.getIntProperty("oai", "harvester.maxThreads");
if (maxActiveThreads == 0)
{
maxActiveThreads = 3;
}
minHeartbeat = ConfigurationManager.getIntProperty("oai", "harvester.minHeartbeat") * 1000;
if (minHeartbeat == 0)
minHeartbeat = ConfigurationManager.getIntProperty("oai", "harvester.minHeartbeat") * 1000;
if (minHeartbeat == 0)
{
minHeartbeat = 30000;
}
maxHeartbeat = ConfigurationManager.getIntProperty("oai", "harvester.maxHeartbeat") * 1000;
if (maxHeartbeat == 0)
maxHeartbeat = ConfigurationManager.getIntProperty("oai", "harvester.maxHeartbeat") * 1000;
if (maxHeartbeat == 0)
{
maxHeartbeat = 3600000;
}
}
}
/**
 * Entry point for the scheduler; delegates directly to
 * {@code scheduleLoop()}.
 */
public void run() {
scheduleLoop();
}
/**
 * Entry point for the scheduler; delegates directly to
 * {@code scheduleLoop()}.
 */
public void run() {
scheduleLoop();
}
private void scheduleLoop() {
long i=0;
while(true)
{
try
{
private void scheduleLoop() {
long i=0;
while(true)
{
try
{
synchronized (HarvestScheduler.class) {
switch (interrupt)
{
@@ -1213,156 +1227,156 @@ public class OAIHarvester {
}
}
status = HARVESTER_STATUS_RUNNING;
status = HARVESTER_STATUS_RUNNING;
// Stage #1: if something is ready for harvest, push it onto the ready stack, mark it as "queued"
mainContext = new Context();
List<Integer> cids = HarvestedCollection.findReady(mainContext);
log.info("Collections ready for immediate harvest: " + cids.toString());
// Stage #1: if something is ready for harvest, push it onto the ready stack, mark it as "queued"
mainContext = new Context();
List<Integer> cids = HarvestedCollection.findReady(mainContext);
log.info("Collections ready for immediate harvest: " + cids.toString());
for (Integer cid : cids) {
addThread(cid);
}
for (Integer cid : cids) {
addThread(cid);
}
// Stage #2: start up all the threads currently in the queue up to the maximum number
while (!harvestThreads.isEmpty()) {
synchronized(HarvestScheduler.class) {
activeThreads++;
}
Thread activeThread = new Thread(harvestThreads.pop());
activeThread.start();
log.info("Thread started: " + activeThread.toString());
// Stage #2: start up all the threads currently in the queue up to the maximum number
while (!harvestThreads.isEmpty()) {
synchronized(HarvestScheduler.class) {
activeThreads++;
}
Thread activeThread = new Thread(harvestThreads.pop());
activeThread.start();
log.info("Thread started: " + activeThread.toString());
/* Wait while the number of threads running is greater than or equal to max */
while (activeThreads >= maxActiveThreads) {
/* Wait a second */
Thread.sleep(1000);
}
}
/* Wait while the number of threads running is greater than or equal to max */
while (activeThreads >= maxActiveThreads) {
/* Wait a second */
Thread.sleep(1000);
}
}
// Finally, wait for the last few remaining threads to finish
// TODO: this step might be unnecessary. Theoretically a single very long harvest process
// could then lock out all the other ones from starting on their next iteration.
// FIXME: also, this might lead to a situation when a single thread getting stuck without
// throwing an exception would shut down the whole scheduler
while (activeThreads != 0) {
/* Wait a second */
Thread.sleep(1000);
}
// Finally, wait for the last few remaining threads to finish
// TODO: this step might be unnecessary. Theoretically a single very long harvest process
// could then lock out all the other ones from starting on their next iteration.
// FIXME: also, this might lead to a situation when a single thread getting stuck without
// throwing an exception would shut down the whole scheduler
while (activeThreads != 0) {
/* Wait a second */
Thread.sleep(1000);
}
// Commit everything
try {
mainContext.commit();
mainContext.complete();
log.info("Done with iteration " + i);
} catch (SQLException e) {
e.printStackTrace();
mainContext.abort();
}
// Commit everything
try {
mainContext.commit();
mainContext.complete();
log.info("Done with iteration " + i);
} catch (SQLException e) {
e.printStackTrace();
mainContext.abort();
}
}
catch (Exception e) {
log.error("Exception on iteration: " + i);
e.printStackTrace();
}
}
catch (Exception e) {
log.error("Exception on iteration: " + i);
e.printStackTrace();
}
// Stage #3: figure out how long until the next iteration and wait
try {
Context tempContext = new Context();
int nextCollectionId = HarvestedCollection.findOldestHarvest(tempContext);
HarvestedCollection hc = HarvestedCollection.find(tempContext, nextCollectionId);
// Stage #3: figure out how long until the next iteration and wait
try {
Context tempContext = new Context();
int nextCollectionId = HarvestedCollection.findOldestHarvest(tempContext);
HarvestedCollection hc = HarvestedCollection.find(tempContext, nextCollectionId);
int harvestInterval = ConfigurationManager.getIntProperty("oai", "harvester.harvestFrequency");
if (harvestInterval == 0)
int harvestInterval = ConfigurationManager.getIntProperty("oai", "harvester.harvestFrequency");
if (harvestInterval == 0)
{
harvestInterval = 720;
}
Date nextTime;
long nextHarvest = 0;
if (hc != null) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(hc.getHarvestDate());
calendar.add(Calendar.MINUTE, harvestInterval);
nextTime = calendar.getTime();
nextHarvest = nextTime.getTime() + - new Date().getTime();
}
Date nextTime;
long nextHarvest = 0;
if (hc != null) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(hc.getHarvestDate());
calendar.add(Calendar.MINUTE, harvestInterval);
nextTime = calendar.getTime();
nextHarvest = nextTime.getTime() + - new Date().getTime();
}
long upperBound = Math.min(nextHarvest,maxHeartbeat);
long delay = Math.max(upperBound, minHeartbeat) + 1000;
long upperBound = Math.min(nextHarvest,maxHeartbeat);
long delay = Math.max(upperBound, minHeartbeat) + 1000;
tempContext.complete();
tempContext.complete();
status = HARVESTER_STATUS_SLEEPING;
synchronized(lock) {
lock.wait(delay);
}
}
catch (InterruptedException ie) {
log.warn("Interrupt: " + ie.getMessage());
}
catch (SQLException e) {
e.printStackTrace();
}
status = HARVESTER_STATUS_SLEEPING;
synchronized(lock) {
lock.wait(delay);
}
}
catch (InterruptedException ie) {
log.warn("Interrupt: " + ie.getMessage());
}
catch (SQLException e) {
e.printStackTrace();
}
i++;
}
}
i++;
}
}
/**
* Adds a thread to the ready stack. Can also be called externally to queue up a collection
* for harvesting before it is "due" for another cycle. This allows starting a harvest process
* from the UI that still "plays nice" with these thread mechanics instead of making an
* asynchronous call to runHarvest().
*/
public static void addThread(int collecionID) throws SQLException, IOException, AuthorizeException {
log.debug("****** Entered the addThread method. Active threads: " + harvestThreads.toString());
Context subContext = new Context();
//subContext.setCurrentUser(harvestAdmin);
/**
* Adds a thread to the ready stack. Can also be called externally to queue up a collection
* for harvesting before it is "due" for another cycle. This allows starting a harvest process
* from the UI that still "plays nice" with these thread mechanics instead of making an
* asynchronous call to runHarvest().
*/
public static void addThread(int collecionID) throws SQLException, IOException, AuthorizeException {
log.debug("****** Entered the addThread method. Active threads: " + harvestThreads.toString());
Context subContext = new Context();
subContext.setCurrentUser(harvestAdmin);
HarvestedCollection hc = HarvestedCollection.find(subContext, collecionID);
hc.setHarvestStatus(HarvestedCollection.STATUS_QUEUED);
hc.update();
subContext.commit();
HarvestedCollection hc = HarvestedCollection.find(subContext, collecionID);
hc.setHarvestStatus(HarvestedCollection.STATUS_QUEUED);
hc.update();
subContext.commit();
HarvestThread ht = new HarvestThread(subContext, hc);
harvestThreads.push(ht);
HarvestThread ht = new HarvestThread(subContext, hc);
harvestThreads.push(ht);
log.debug("****** Queued up a thread. Active threads: " + harvestThreads.toString());
log.info("Thread queued up: " + ht.toString());
}
log.debug("****** Queued up a thread. Active threads: " + harvestThreads.toString());
log.info("Thread queued up: " + ht.toString());
}
}
}
/**
* A harvester thread used to execute a single harvest cycle on a collection
* @author alexey
*/
private static class HarvestThread extends Thread {
Context context;
HarvestedCollection hc;
/**
* A harvester thread used to execute a single harvest cycle on a collection
* @author alexey
*/
private static class HarvestThread extends Thread {
Context context;
HarvestedCollection hc;
/**
 * Creates a harvest thread for one collection.
 *
 * The constructor only stores the two references; nothing in this body
 * raises the declared {@code SQLException}.
 *
 * @param context the context this thread will use for its harvest
 * @param hc the harvested-collection record to process
 */
HarvestThread(Context context, HarvestedCollection hc) throws SQLException {
this.context = context;
this.hc = hc;
}
/**
 * Creates a harvest thread for one collection.
 *
 * The constructor only stores the two references; nothing in this body
 * raises the declared {@code SQLException}.
 *
 * @param context the context this thread will use for its harvest
 * @param hc the harvested-collection record to process
 */
HarvestThread(Context context, HarvestedCollection hc) throws SQLException {
this.context = context;
this.hc = hc;
}
/**
 * Logs that the thread for this collection has started, then performs
 * the harvest by calling {@code runHarvest()}.
 */
public void run() {
log.info("Thread for collection " + hc.getCollectionId() + " starts.");
runHarvest();
}
/**
 * Logs that the thread for this collection has started, then performs
 * the harvest by calling {@code runHarvest()}.
 */
public void run() {
log.info("Thread for collection " + hc.getCollectionId() + " starts.");
runHarvest();
}
private void runHarvest()
{
Collection dso = null;
try {
dso = Collection.find(context, hc.getCollectionId());
OAIHarvester harvester = new OAIHarvester(context, dso, hc);
harvester.runHarvest();
}
private void runHarvest()
{
Collection dso = null;
try {
dso = Collection.find(context, hc.getCollectionId());
OAIHarvester harvester = new OAIHarvester(context, dso, hc);
harvester.runHarvest();
}
catch (RuntimeException e) {
log.error("Runtime exception in thread: " + this.toString());
log.error(e.getMessage() + " " + e.getCause());
@@ -1370,35 +1384,34 @@ public class OAIHarvester {
hc.setHarvestStatus(HarvestedCollection.STATUS_UNKNOWN_ERROR);
}
catch (Exception ex) {
log.error("General exception in thread: " + this.toString());
log.error(ex.getMessage() + " " + ex.getCause());
hc.setHarvestMessage("Error occured while generating an OAI response");
hc.setHarvestStatus(HarvestedCollection.STATUS_UNKNOWN_ERROR);
}
finally
{
try {
hc.update();
context.restoreAuthSystemState();
context.complete();
}
log.error("General exception in thread: " + this.toString());
log.error(ex.getMessage() + " " + ex.getCause());
hc.setHarvestMessage("Error occured while generating an OAI response");
hc.setHarvestStatus(HarvestedCollection.STATUS_UNKNOWN_ERROR);
}
finally
{
try {
hc.update();
context.restoreAuthSystemState();
context.complete();
}
catch (RuntimeException e) {
log.error("Unexpected exception while recovering from a harvesting error: " + e.getMessage(), e);
context.abort();
}
catch (Exception e) {
log.error("Unexpected exception while recovering from a harvesting error: " + e.getMessage(), e);
context.abort();
}
catch (Exception e) {
log.error("Unexpected exception while recovering from a harvesting error: " + e.getMessage(), e);
context.abort();
}
synchronized (HarvestScheduler.class) {
HarvestScheduler.activeThreads--;
}
}
synchronized (HarvestScheduler.class) {
HarvestScheduler.activeThreads--;
}
}
log.info("Thread for collection " + hc.getCollectionId() + " completes.");
}
}
log.info("Thread for collection " + hc.getCollectionId() + " completes.");
}
}
}