1 |
26600
|
sandro.lab
|
package eu.dnetlib.actionmanager.blackboard;
|
2 |
|
|
|
3 |
42243
|
claudio.at
|
import java.util.Date;
|
4 |
40089
|
claudio.at
|
|
5 |
42243
|
claudio.at
|
import com.google.common.collect.Iterables;
|
6 |
|
|
import eu.dnetlib.actionmanager.is.ISClient;
|
7 |
|
|
import eu.dnetlib.actionmanager.set.ActionManagerSet;
|
8 |
|
|
import eu.dnetlib.actionmanager.set.RawSet;
|
9 |
|
|
import eu.dnetlib.data.hadoop.config.ClusterName;
|
10 |
|
|
import eu.dnetlib.data.hadoop.rmi.HadoopService;
|
11 |
|
|
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
|
12 |
|
|
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
|
13 |
26600
|
sandro.lab
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
|
14 |
|
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
|
15 |
|
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
|
16 |
42243
|
claudio.at
|
import eu.dnetlib.miscutils.datetime.DateUtils;
|
17 |
|
|
import org.apache.commons.logging.Log;
|
18 |
|
|
import org.apache.commons.logging.LogFactory;
|
19 |
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
20 |
26600
|
sandro.lab
|
|
21 |
42243
|
claudio.at
|
public class GarbageActionManagerActionFromHDFS extends AbstractActionManagerAction implements BlackboardServerAction<ActionManagerActions> {
|
22 |
26600
|
sandro.lab
|
|
23 |
42243
|
claudio.at
|
private static final Log log = LogFactory.getLog(GarbageActionManagerActionFromHDFS.class);
|
24 |
|
|
private final static String UPDATE_ACTION_PROFILE = "for $x in "
|
25 |
|
|
+ "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'ActionManagerSetDSResourceType' and .//SET/@id = '%s'] "
|
26 |
|
|
+ "return update delete $x//RAW_SETS/EXPIRED[@id = '%s']";
|
27 |
|
|
@Autowired
|
28 |
|
|
private UniqueServiceLocator serviceLocator;
|
29 |
|
|
@Autowired
|
30 |
|
|
private ISClient isClient;
|
31 |
|
|
|
32 |
26600
|
sandro.lab
|
@Override
|
33 |
|
|
public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
|
34 |
|
|
try {
|
35 |
42243
|
claudio.at
|
final String basePath = isClient.getBasePathHDFS();
|
36 |
|
|
final Integer garbageRetain = Integer.valueOf(isClient.getGarbageRetainThreshold());
|
37 |
|
|
final Long dateLimit = DateUtils.now() - Long.valueOf(isClient.getGarbageTimeMargin());
|
38 |
26600
|
sandro.lab
|
|
39 |
42243
|
claudio.at
|
final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
|
40 |
|
|
final ISRegistryService isRegistry = serviceLocator.getService(ISRegistryService.class);
|
41 |
|
|
|
42 |
|
|
for (ActionManagerSet set : isClient.listValidSets()) {
|
43 |
|
|
for (RawSet expired : Iterables.limit(set.getExpired(), garbageRetain)) {
|
44 |
|
|
final Date d = org.apache.commons.lang.time.DateUtils.parseDate(expired.getLastUpdate(), ActionManagerSet.DATE_PATTERNS);
|
45 |
|
|
if (d.getTime() < dateLimit) {
|
46 |
|
|
|
47 |
|
|
log.info(String.format("removing raw action set %s/%s", set.getId(), expired.getId()));
|
48 |
|
|
hadoopService.deleteHdfsPath(ClusterName.DM.toString(), basePath + "/" + set.getDirectory() + "/" + expired.getId());
|
49 |
|
|
|
50 |
|
|
final String xUpdate = String.format(UPDATE_ACTION_PROFILE, set.getId(), expired.getId());
|
51 |
|
|
log.info(String.format("updating ActionSet profile: %s", xUpdate));
|
52 |
|
|
isRegistry.executeXUpdate(xUpdate);
|
53 |
|
|
}
|
54 |
26600
|
sandro.lab
|
}
|
55 |
42243
|
claudio.at
|
}
|
56 |
26600
|
sandro.lab
|
|
57 |
42243
|
claudio.at
|
handler.done(job);
|
58 |
|
|
} catch (Throwable e) {
|
59 |
26600
|
sandro.lab
|
handler.failed(job, e);
|
60 |
|
|
}
|
61 |
|
|
}
|
62 |
|
|
}
|