Project

General

Profile

1
package eu.dnetlib.actionmanager.blackboard;
2

    
3
import java.util.Date;
4

    
5
import com.google.common.collect.Iterables;
6
import eu.dnetlib.actionmanager.is.ISClient;
7
import eu.dnetlib.actionmanager.set.ActionManagerSet;
8
import eu.dnetlib.actionmanager.set.RawSet;
9
import eu.dnetlib.data.hadoop.config.ClusterName;
10
import eu.dnetlib.data.hadoop.rmi.HadoopService;
11
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
16
import eu.dnetlib.miscutils.datetime.DateUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.springframework.beans.factory.annotation.Autowired;
20

    
21
public class GarbageActionManagerActionFromHDFS extends AbstractActionManagerAction implements BlackboardServerAction<ActionManagerActions> {
22

    
23
	private static final Log log = LogFactory.getLog(GarbageActionManagerActionFromHDFS.class);
24
	private final static String UPDATE_ACTION_PROFILE = "for $x in "
25
			+ "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'ActionManagerSetDSResourceType' and .//SET/@id = '%s'] "
26
			+ "return update delete $x//RAW_SETS/EXPIRED[@id = '%s']";
27
	@Autowired
28
	private UniqueServiceLocator serviceLocator;
29
	@Autowired
30
	private ISClient isClient;
31

    
32
	@Override
33
	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
34
		try {
35
			final String basePath = isClient.getBasePathHDFS();
36
			final Integer garbageRetain = Integer.valueOf(isClient.getGarbageRetainThreshold());
37
			final Long dateLimit = DateUtils.now() - Long.valueOf(isClient.getGarbageTimeMargin());
38

    
39
			final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
40
			final ISRegistryService isRegistry = serviceLocator.getService(ISRegistryService.class);
41

    
42
			for (ActionManagerSet set : isClient.listValidSets()) {
43
				for (RawSet expired : Iterables.limit(set.getExpired(), garbageRetain)) {
44
					final Date d = org.apache.commons.lang.time.DateUtils.parseDate(expired.getLastUpdate(), ActionManagerSet.DATE_PATTERNS);
45
					if (d.getTime() < dateLimit) {
46

    
47
						log.info(String.format("removing raw action set %s/%s", set.getId(), expired.getId()));
48
						hadoopService.deleteHdfsPath(ClusterName.DM.toString(), basePath + "/" + set.getDirectory() + "/" + expired.getId());
49

    
50
						final String xUpdate = String.format(UPDATE_ACTION_PROFILE, set.getId(), expired.getId());
51
						log.info(String.format("updating ActionSet profile: %s", xUpdate));
52
						isRegistry.executeXUpdate(xUpdate);
53
					}
54
				}
55
			}
56

    
57
			handler.done(job);
58
		} catch (Throwable e) {
59
			handler.failed(job, e);
60
		}
61
	}
62
}
(5-5/7)