Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.actions;
2

    
3
import java.util.List;
4
import java.util.Map;
5

    
6
import com.google.common.base.Predicate;
7
import com.google.common.collect.Maps;
8
import com.google.gson.Gson;
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.NodeToken;
11
import eu.dnetlib.actionmanager.set.RawSet;
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
13
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
14
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
15
import eu.dnetlib.miscutils.datetime.DateUtils;
16
import eu.dnetlib.msro.rmi.MSROException;
17
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.apache.hadoop.fs.Path;
21
import org.springframework.beans.factory.annotation.Autowired;
22

    
23
public class PrepareActionSetsJobNode extends SimpleJobNode {
24

    
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(PrepareActionSetsJobNode.class);
29

    
30
	private String ACTIONSET_PATH_XQUERY_TEMPLATE =
31
			"for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') "
32
					+ "return $x/RESOURCE_PROFILE/BODY/SET[@id = '%s']/@directory/string()";
33

    
34
	@Autowired
35
	private UniqueServiceLocator serviceLocator;
36

    
37
	private String sets;
38

    
39
	@Override
40
	protected String execute(final NodeToken token) throws Exception {
41

    
42
		final List<Map<String, String>> setList = getSetList();
43
		final String now = DateUtils.now_ISO8601();
44

    
45
		final ISLookUpService lookUpService = serviceLocator.getService(ISLookUpService.class);
46
		final String basePath = lookUpService.getResourceProfileByQuery(
47
				"/RESOURCE_PROFILE[./HEADER/RESOURCE_TYPE/@value='ActionManagerServiceResourceType']//SERVICE_PROPERTIES/PROPERTY[@key='basePath']/@value/string()");
48

    
49
		for (Map<String, String> set : setList) {
50

    
51
			set.put("rawset", RawSet.newInstance().getId());
52
			set.put("creationDate", now);
53
			set.put("path", getFullPath(basePath, set, lookUpService));
54

    
55
			if (set.get("enabled").equals("true")) {
56
				log.info("preparing set: " + simplifySetInfo(set));
57
			}
58
			// setting the job properties needed to name the rawsets
59
			token.getEnv().setAttribute(set.get("jobProperty"), set.get("rawset"));
60
		}
61

    
62
		token.getEnv().setAttribute("sets", new Gson().toJson(setList));
63
		token.getEnv().setAttribute("actionManagerBasePath", basePath);
64

    
65
		return Arc.DEFAULT_ARC;
66
	}
67

    
68
	private String getFullPath(final String basePath, final Map<String, String> set, final ISLookUpService lookUpService) throws MSROException {
69
		return new Path(basePath + "/" + getPath(set.get("set"), lookUpService) + "/" + set.get("rawset")).toString();
70
	}
71

    
72
	private String getPath(final String setId, final ISLookUpService lookUpService) throws MSROException {
73
		try {
74
			return lookUpService.getResourceProfileByQuery(String.format(ACTIONSET_PATH_XQUERY_TEMPLATE, setId));
75
		} catch (ISLookUpException e) {
76
			throw new MSROException(String.format("Error obtaining directory from ActionSet profile %s", setId), e);
77
		}
78
	}
79

    
80
	private Map<String, String> simplifySetInfo(final Map<String, String> set) {
81
		return Maps.filterKeys(set, new Predicate<String>() {
82

    
83
			@Override
84
			public boolean apply(final String k) {
85

    
86
				return k.equals("set") || k.equals("rawset");
87
			}
88
		});
89
	}
90

    
91
	@SuppressWarnings("unchecked")
92
	protected List<Map<String, String>> getSetList() {
93
		return new Gson().fromJson(getSets(), List.class);
94
	}
95

    
96
	public String getSets() {
97
		return sets;
98
	}
99

    
100
	public void setSets(final String sets) {
101
		this.sets = sets;
102
	}
103

    
104
}
(5-5/8)