Project

General

Profile

1
package eu.dnetlib.msro.workflows.hadoop;
2

    
3
import java.util.List;
4

    
5
import javax.annotation.Resource;
6

    
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.springframework.beans.factory.annotation.Value;
10

    
11
import com.google.common.base.Splitter;
12
import com.google.common.collect.Lists;
13
import com.google.gson.Gson;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

    
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
19
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
21

    
22
/**
23
 * This jobNode resolves the original datasource ids to the list of md store ids holding the relative cleaned records.
24
 *
25
 * @author claudio
26
 *
27
 */
28
public class MDStoreDatasourceResolverJobNode extends SimpleJobNode {
29

    
30
	private static final Log log = LogFactory.getLog(MDStoreDatasourceResolverJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
31

    
32
	private String originalDatasourceIdsCSV;
33

    
34
	@Value("${dnet.openaire.dataload.datasource.mdstores.xquery}")
35
	private String xqueryTemplate;
36

    
37
	@Resource
38
	private UniqueServiceLocator serviceLocator;
39

    
40
	@Override
41
	public String execute(final NodeToken token) throws Exception {
42
		log.info("resolving MDStore ids for datasources: " + getOriginalDatasourceIdsCSV());
43

    
44
		final List<String> mdIds = Lists.newArrayList();
45
		for (final String originalId : splitter().split(getOriginalDatasourceIdsCSV())) {
46
			mdIds.addAll(resolveMdIds(originalId));
47
		}
48

    
49
		log.info(String.format("adding %s mdStore ids in wf env", mdIds.size()));
50

    
51
		token.getEnv().setAttribute("mdId", new Gson().toJson(mdIds));
52

    
53
		return Arc.DEFAULT_ARC;
54
	}
55

    
56
	// @Cacheable(value = "mdIds", key = "#acronym")
57
	public List<String> resolveMdIds(final String id) throws ISLookUpException {
58
		log.info("Resolving mdID for " + id + ". Cache not used.");
59
		final String xQuery = String.format(xqueryTemplate, id);
60
		final List<String> mdIds = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xQuery);
61
		return mdIds;
62
	}
63

    
64
	private Splitter splitter() {
65
		return Splitter.on(",").trimResults().omitEmptyStrings();
66
	}
67

    
68
	public String getOriginalDatasourceIdsCSV() {
69
		return originalDatasourceIdsCSV;
70
	}
71

    
72
	public void setOriginalDatasourceIdsCSV(final String originalDatasourceIdsCSV) {
73
		this.originalDatasourceIdsCSV = originalDatasourceIdsCSV;
74
	}
75

    
76
}
(7-7/12)