Mirror of https://github.com/DSpace/DSpace.git
DS-4440 GDPR - Anonymize statistics feature - remove shard support
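The commit title is terse, so a small standalone sketch may help: with shard support gone, anonymization runs against a single statistics core: select old hits, mask the ip and dns fields, re-add the documents, and issue one commit. The sketch below is written for this note with plain SolrJ, not taken from DSpace; the core URL, batch size, and mask values are assumptions, while the ip, dns, uid and time fields, the TIME_LIMIT/DNS_MASK filter, and the single-commit pattern come from the diff that follows.

import java.io.IOException;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;

// Standalone illustration only -- not DSpace code.
public class AnonymizeSingleCoreSketch {
    public static void main(String[] args) throws SolrServerException, IOException {
        HttpSolrClient solr =
            new HttpSolrClient.Builder("http://localhost:8983/solr/statistics").build();  // assumed URL

        SolrQuery select = new SolrQuery("ip:*")
            .addFilterQuery("time:[* TO NOW-90DAYS]")   // stand-in for the TIME_LIMIT filter
            .setRows(100);                              // stand-in for batchSize

        for (SolrDocument hit : solr.query(select).getResults()) {
            SolrInputDocument update = new SolrInputDocument();
            // Copy the stored fields (simplified: assumes single-valued fields).
            for (String field : hit.getFieldNames()) {
                update.setField(field, hit.getFieldValue(field));
            }
            update.setField("ip", "255.255.255.255");   // placeholder mask
            update.setField("dns", "anonymized");       // stand-in for DNS_MASK
            update.removeField("_version_");            // avoid optimistic-locking conflicts on re-add

            solr.deleteByQuery("uid:" + hit.getFieldValue("uid"));
            solr.add(update);
        }

        // One commit on the single statistics core; no per-shard commits anymore.
        solr.commit();
        solr.close();
    }
}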
AnonymizeStatistics.java
@@ -14,7 +14,6 @@ import static java.util.Arrays.asList;
 import static java.util.Calendar.DAY_OF_YEAR;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.cli.Option.builder;
-import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.time.DateFormatUtils.format;
 import static org.apache.log4j.Logger.getLogger;
 import static org.dspace.core.LogManager.getHeader;
@@ -25,8 +24,6 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -197,31 +194,18 @@ public class AnonymizeStatistics {
 
                 // list of the processing callables to execute
                 Collection<DoProcessing> callables = new ArrayList<>();
-                // list of the shards to commit
-                Set<String> shards = new HashSet<>();
 
                 for (SolrDocument document : documents.getResults()) {
                     updated++;
-
                     callables.add(new DoProcessing(document, updated));
-                    String shard = (String) document.getFieldValue("[shard]");
-
-                    if (isNotBlank(shard)) {
-                        shards.add(shard);
-                    }
                 }
 
                 // execute the processing callables
                 executorService.invokeAll(callables);
 
-                // Commit the main core
+                // Commit the solr core
                 solrLoggerService.commit();
 
-                // Commit all relevant solr shards
-                for (String shard : shards) {
-                    solrLoggerService.commitShard(shard);
-                }
-
                 System.out.println("processed " + updated + " records");
             } while (documents.getResults().getNumFound() > 0);
 
@@ -231,7 +215,6 @@ public class AnonymizeStatistics {
             } else {
                 printWarning("not all relevant documents were updated, check the DSpace logs for more details");
             }
-
         } catch (Exception e) {
             printError(e);
         }
@@ -259,7 +242,7 @@ public class AnonymizeStatistics {
             "ip:*",
             "time:[* TO " + TIME_LIMIT + "] AND -dns:" + DNS_MASK,
             null, batchSize, -1, null, null, null, null,
-            null, false, -1, false, true
+            null, false, -1, false
         );
     }
 
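The last hunk above drops the trailing true argument (the includeShardField flag) from the query(...) call in AnonymizeStatistics. For readability, here is that call annotated with the parameter names of the overload that remains, taken from the SolrLoggerService declaration later in this commit; the solrLoggerService receiver is an assumption based on the rest of the class.

solrLoggerService.query(
    "ip:*",                                                  // query
    "time:[* TO " + TIME_LIMIT + "] AND -dns:" + DNS_MASK,   // filterQuery
    null,                                                     // facetField
    batchSize,                                                // rows
    -1,                                                       // max
    null,                                                     // dateType
    null,                                                     // dateStart
    null,                                                     // dateEnd
    null,                                                     // facetQueries
    null,                                                     // sort
    false,                                                    // ascending
    -1,                                                       // facetMinCount
    false                                                     // defaultFilterQueries (includeShardField is gone)
);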
SolrLoggerServiceImpl.java
@@ -7,8 +7,6 @@
  */
 package org.dspace.statistics;
 
-import static org.apache.commons.lang.StringUtils.substringAfterLast;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
@@ -129,7 +127,6 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
     protected boolean useProxies;
 
     private static final List<String> statisticYearCores = new ArrayList<>();
-    private static final Map<String, HttpSolrClient> statisticYearCoreServers = new HashMap<>();
     private static boolean statisticYearCoresInit = false;
 
     private static final String IP_V4_REGEX = "^((?:\\d{1,3}\\.){3})\\d{1,3}$";
@@ -658,7 +655,6 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
         Map<String, String> params = new HashMap<>();
         params.put("q", query);
         params.put("rows", "10");
-        params.put("fl","[shard],*");
         if (0 < statisticYearCores.size()) {
             params.put(ShardParams.SHARDS, StringUtils.join(statisticYearCores.iterator(), ','));
         }
@@ -828,12 +824,8 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
         for (int i = 0; i < docsToUpdate.size(); i++) {
             SolrInputDocument solrDocument = docsToUpdate.get(i);
 
-            // Get the relevant shard client
-            // For a non-sharded core, the shard variable will reference the main core
-            HttpSolrClient shard = getSolrServer(solrDocument.getFieldValue("[shard]").toString());
-
-            // Delete the document from the shard client
-            shard.deleteByQuery("uid:" + solrDocument.getFieldValue("uid"));
+            // Delete the document from the solr client
+            solr.deleteByQuery("uid:" + solrDocument.getFieldValue("uid"));
 
             // Now loop over our fieldname actions
             for (int j = 0; j < fieldNames.size(); j++) {
@@ -864,15 +856,11 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
 
             // see https://stackoverflow.com/questions/26941260/normalizing-solr-records-for-sharding-version-issues
             solrDocument.removeField("_version_");
-            // this field will not work with a non-sharded core
-            solrDocument.removeField("[shard]");
 
-            // Add the updated document to the shard client
-            shard.add(solrDocument);
+            solr.add(solrDocument);
 
             if (commit) {
-                shard.commit();
-                solr.commit();
+                commit();
             }
         }
         // System.out.println("SolrLogger.update(\""+query+"\"):"+(new
@@ -1040,16 +1028,6 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
                                String dateStart, String dateEnd, List<String> facetQueries, String sort,
                                boolean ascending, int facetMinCount, boolean defaultFilterQueries)
         throws SolrServerException, IOException {
-        return query(query, filterQuery, facetField, rows, max, dateType, dateStart, dateEnd, facetQueries, sort,
-                ascending, facetMinCount, defaultFilterQueries, false);
-    }
-
-    @Override
-    public QueryResponse query(String query, String filterQuery, String facetField, int rows, int max, String dateType,
-                               String dateStart, String dateEnd, List<String> facetQueries, String sort,
-                               boolean ascending, int facetMinCount, boolean defaultFilterQueries,
-                               boolean includeShardField)
-        throws SolrServerException, IOException {
 
         if (solr == null) {
             return null;
@@ -1060,10 +1038,6 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
             .setFacetMinCount(facetMinCount);
         addAdditionalSolrYearCores(solrQuery);
 
-        if (includeShardField) {
-            solrQuery.setParam("fl", "[shard],*");
-        }
-
         // Set the date facet if present
         if (dateType != null) {
             solrQuery.setParam("facet.range", "time")
@@ -1344,7 +1318,6 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
             SolrPingResponse ping = returnServer.ping();
             log.debug("Ping of Solr Core {} returned with Status {}",
                       coreName, ping.getStatus());
-            statisticYearCoreServers.put(coreName, returnServer);
             return returnServer;
         } catch (IOException | RemoteSolrException | SolrServerException e) {
             log.debug("Ping of Solr Core {} failed with {}. New Core Will be Created",
@@ -1566,19 +1539,10 @@ public class SolrLoggerServiceImpl implements SolrLoggerService, InitializingBean
     }
 
     @Override
-    public void commit() throws Exception {
+    public void commit() throws IOException, SolrServerException {
         solr.commit();
     }
 
-    @Override
-    public void commitShard(String shard) throws IOException, SolrServerException {
-        getSolrServer(shard).commit();
-    }
-
-    private HttpSolrClient getSolrServer(String shard) {
-        return statisticYearCoreServers.get(substringAfterLast(shard, "/"));
-    }
-
     protected void addDocumentsToFile(Context context, SolrDocumentList docs, File exportOutput)
         throws SQLException, ParseException, IOException {
         for (SolrDocument doc : docs) {
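In the update(...) changes above, the per-document shard lookup is replaced by operations on the single solr client: delete the stored copy by uid, drop the _version_ field, re-add the modified document, and optionally commit. A standalone sketch of that pattern in plain SolrJ (the class and method names are invented for illustration):

import java.io.IOException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

// Standalone sketch of the single-core rewrite pattern; not DSpace code.
public class SingleCoreRewriteSketch {
    static void rewrite(HttpSolrClient solr, SolrInputDocument doc, boolean commit)
            throws SolrServerException, IOException {
        // Remove the stored copy first, keyed on uid, so the re-add cannot duplicate it.
        solr.deleteByQuery("uid:" + doc.getFieldValue("uid"));
        // A stale _version_ value would trip Solr's optimistic concurrency check on add.
        doc.removeField("_version_");
        solr.add(doc);
        if (commit) {
            solr.commit();
        }
    }
}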
SolrLoggerService.java
@@ -245,36 +245,6 @@ public interface SolrLoggerService {
                                int facetMinCount, boolean defaultFilterQueries)
         throws SolrServerException, IOException;
 
-    /**
-     * Perform a solr query.
-     *
-     * @param query                the query to be used
-     * @param filterQuery          filter query
-     * @param facetField           field to facet the results by
-     * @param rows                 the max number of results to return
-     * @param max                  the max number of facets to return
-     * @param dateType             the type to be used (example: DAY, MONTH, YEAR)
-     * @param dateStart            the start date Format:(-3, -2, ..) the date is calculated
-     *                             relatively on today
-     * @param dateEnd              the end date stop Format (-2, +1, ..) the date is calculated
-     *                             relatively on today
-     * @param facetQueries         list of facet queries
-     * @param sort                 the sort field
-     * @param ascending            the sort direction (true: ascending)
-     * @param facetMinCount        Minimum count of results facet must have to return a result
-     * @param defaultFilterQueries
-     *                             use the default filter queries
-     * @param includeShardField
-     *                             include the shard field in the result documents
-     * @throws SolrServerException Exception from the Solr server to the solrj Java client.
-     * @throws java.io.IOException passed through.
-     */
-    public QueryResponse query(String query, String filterQuery,
-                               String facetField, int rows, int max, String dateType, String dateStart,
-                               String dateEnd, List<String> facetQueries, String sort, boolean ascending,
-                               int facetMinCount, boolean defaultFilterQueries, boolean includeShardField)
-        throws SolrServerException, IOException;
-
     /**
      * Returns in a filterQuery string all the ip addresses that should be ignored
      *
@@ -303,14 +273,7 @@ public interface SolrLoggerService {
     /**
      * Commit the solr core.
      */
-    public void commit() throws Exception;
-
-    /**
-     * Commit a solr shard.
-     * @param shard
-     *            The shard to commit.
-     */
-    public void commitShard(String shard) throws Exception;
+    public void commit() throws IOException, SolrServerException;
 
     /**
      * Anonymize a given ip
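With commitShard(String) removed and commit() now declaring IOException and SolrServerException instead of the blanket Exception, callers can catch the two checked exceptions directly. A sketch of such a caller; the wrapping class and rethrow policy are assumptions, only the commit() signature comes from this commit, and the interface is assumed to live in org.dspace.statistics like the implementation above.

import java.io.IOException;
import org.apache.solr.client.solrj.SolrServerException;
import org.dspace.statistics.SolrLoggerService;

// Sketch of a caller adapting to the narrowed throws clause; not DSpace code.
public class StatisticsCommitter {
    private final SolrLoggerService solrLoggerService;

    public StatisticsCommitter(SolrLoggerService solrLoggerService) {
        this.solrLoggerService = solrLoggerService;
    }

    public void commitStatistics() {
        try {
            // Single commit on the statistics core; commitShard(...) no longer exists.
            solrLoggerService.commit();
        } catch (SolrServerException | IOException e) {
            throw new IllegalStateException("Could not commit the statistics core", e);
        }
    }
}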