Project

General

Profile

1
package eu.dnetlib.data.actionmanager.blackboard;
2

    
3
import java.util.Date;
4

    
5
import com.google.common.collect.Iterables;
6
import eu.dnetlib.data.actionmanager.is.ISClient;
7

    
8
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
9
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
10
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
11
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
12
import eu.dnetlib.rmi.data.hadoop.ClusterName;
13
import eu.dnetlib.rmi.data.hadoop.HadoopService;
14
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerSet;
15
import eu.dnetlib.rmi.data.hadoop.actionmanager.RawSet;
16
import eu.dnetlib.rmi.enabling.ISRegistryService;
17
import org.apache.commons.lang3.time.DateUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.springframework.beans.factory.annotation.Autowired;
21

    
22
public class GarbageActionManagerActionFromHDFS extends AbstractActionManagerAction implements BlackboardServerAction<ActionManagerActions> {
23

    
24
	private static final Log log = LogFactory.getLog(GarbageActionManagerActionFromHDFS.class);
25
	private final static String UPDATE_ACTION_PROFILE = "for $x in "
26
			+ "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'ActionManagerSetDSResourceType' and .//SET/@id = '%s'] "
27
			+ "return update delete $x//RAW_SETS/EXPIRED[@id = '%s']";
28
	@Autowired
29
	private UniqueServiceLocator serviceLocator;
30
	@Autowired
31
	private ISClient isClient;
32

    
33
	@Override
34
	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
35
		try {
36
			final String basePath = isClient.getBasePathHDFS();
37
			final Integer garbageRetain = Integer.valueOf(isClient.getGarbageRetainThreshold());
38
			final Long dateLimit =  new Date().getTime() - Long.valueOf(isClient.getGarbageTimeMargin());
39

    
40
			final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
41
			final ISRegistryService isRegistry = serviceLocator.getService(ISRegistryService.class);
42

    
43
			for (ActionManagerSet set : isClient.listValidSets()) {
44
				for (RawSet expired : Iterables.limit(set.getExpired(), garbageRetain)) {
45
					final Date d = DateUtils.parseDate(expired.getLastUpdate(), ActionManagerSet.DATE_PATTERNS);
46
					if (d.getTime() < dateLimit) {
47

    
48
						log.info(String.format("removing raw action set %s/%s", set.getId(), expired.getId()));
49
						hadoopService.deleteHdfsPath(ClusterName.DM.toString(), basePath + "/" + set.getDirectory() + "/" + expired.getId());
50

    
51
						final String xUpdate = String.format(UPDATE_ACTION_PROFILE, set.getId(), expired.getId());
52
						log.info(String.format("updating ActionSet profile: %s", xUpdate));
53
						isRegistry.executeXUpdate(xUpdate);
54
					}
55
				}
56
			}
57

    
58
			handler.done(job);
59
		} catch (Throwable e) {
60
			handler.failed(job, e);
61
		}
62
	}
63
}
(6-6/7)