Project

General

Profile

1
package eu.dnetlib.msro.puma.nodes;
2

    
3
import java.io.BufferedReader;
4
import java.io.FileReader;
5
import javax.annotation.Resource;
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7

    
8
import com.google.common.collect.BiMap;
9
import com.google.common.collect.HashBiMap;
10
import com.googlecode.sarasvati.Arc;
11
import com.googlecode.sarasvati.NodeToken;
12
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
13
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
14
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
15
import eu.dnetlib.msro.puma.objectstore.PUMAObjectStore;
16
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.springframework.beans.factory.annotation.Autowired;
20

    
21
/**
22
 * Created by sandro on 2/3/16.
23
 */
24
public class FeedEnrichedRecordsJobNode extends SimpleJobNode {
25

    
26
	private static final Log log = LogFactory.getLog(FeedEnrichedRecordsJobNode.class);
27

    
28
	@Autowired
29
	private PUMAObjectStore pumaObjectStore;
30

    
31
	private String inputEprParam;
32

    
33
	private String deduplicatedRecordCSV;
34

    
35
	/**
36
	 * The result set client factory.
37
	 */
38
	@Autowired
39
	private ResultSetClientFactory resultSetClientFactory;
40

    
41
	/**
42
	 * The result set factory.
43
	 */
44
	@Resource(name = "iterableResultSetFactory")
45
	private IterableResultSetFactory resultSetFactory;
46

    
47
	@Override
48
	protected String execute(NodeToken nodeToken) throws Exception {
49

    
50
		BiMap<String, String> deduplicatedRecord = createDeduplicatedMap();
51
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(nodeToken.getEnv().getAttribute(getInputEprParam()));
52
		final Iterable<String> inputRecord = resultSetClientFactory.getClient(inputEpr);
53
		pumaObjectStore.initialize();
54
		pumaObjectStore.feedStore(inputRecord, deduplicatedRecord);
55
		return Arc.DEFAULT_ARC;
56
	}
57

    
58
	public String getInputEprParam() {
59
		return inputEprParam;
60
	}
61

    
62
	public void setInputEprParam(String inputEprParam) {
63
		this.inputEprParam = inputEprParam;
64
	}
65

    
66
	public String getDeduplicatedRecordCSV() {
67
		return deduplicatedRecordCSV;
68
	}
69

    
70
	public void setDeduplicatedRecordCSV(String deduplicatedRecordCSV) {
71
		this.deduplicatedRecordCSV = deduplicatedRecordCSV;
72
	}
73

    
74
	private BiMap<String, String> createDeduplicatedMap() throws Exception {
75
		BufferedReader br = null;
76
		String sCurrentLine = null;
77
		BiMap<String, String> map = HashBiMap.create();
78
		br = new BufferedReader(new FileReader(getDeduplicatedRecordCSV()));
79
		while ((sCurrentLine = br.readLine()) != null) {
80
			String[] data = sCurrentLine.trim().split(",");
81
			if (data != null && data.length == 2 && !map.containsValue(data[1]))
82
				map.put(data[0], data[1]);
83
			else
84
				log.error("Unexpected values on creating map");
85
		}
86
		return map;
87

    
88
	}
89

    
90
}
    (1-1/1)