Project

General

Profile

1
package eu.dnetlib.msro.workflows.dedup;
2

    
3
import java.io.StringReader;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

import javax.annotation.Resource;
import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity;
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
31

    
32
public class BuildSimilarityMeshJobNode extends AsyncJobNode {
33

    
34
	private static final Log log = LogFactory.getLog(BuildSimilarityMeshJobNode.class);
35

    
36
	/** The result set factory. */
37
	@Resource(name = "iterableResultSetFactory")
38
	private IterableResultSetFactory resultSetFactory;
39

    
40
	/** The result set client factory. */
41
	@Resource(name = "resultSetClientFactory")
42
	private ResultSetClientFactory resultSetClientFactory;
43

    
44
	private StringTemplate similarity;
45

    
46
	private String inputEprParam;
47

    
48
	private String outputEprParam;
49

    
50
	@Override
51
	protected String execute(final NodeToken token) throws Exception {
52

    
53
		final String inputEpr = token.getEnv().getAttribute(getInputEprParam());
54

    
55
		final Iterator<String> rsClient = resultSetClientFactory.getClient(inputEpr).iterator();
56
		final Queue<Object> queue = Queues.newLinkedBlockingQueue();
57
		final SAXReader reader = new SAXReader();
58

    
59
		if (rsClient.hasNext()) {
60
			populateQueue(queue, reader, rsClient.next());
61
		}
62

    
63
		final W3CEndpointReference eprOut = resultSetFactory.createIterableResultSet(new Iterable<String>() {
64

    
65
			@Override
66
			public Iterator<String> iterator() {
67
				return new Iterator<String>() {
68

    
69
					@Override
70
					public boolean hasNext() {
71
						synchronized (queue) {
72
							return !queue.isEmpty();
73
						}
74
					}
75

    
76
					@Override
77
					public String next() {
78
						synchronized (queue) {
79
							final Object o = queue.poll();
80
							while (queue.isEmpty() && rsClient.hasNext()) {
81
								populateQueue(queue, reader, rsClient.next());
82
							}
83
							return buildSimilarity((Similarity) o);
84
						}
85
					}
86

    
87
					@Override
88
					public void remove() {
89
						throw new UnsupportedOperationException();
90
					}
91
				};
92
			}
93
		});
94

    
95
		token.getEnv().setAttribute(getOutputEprParam(), eprOut.toString());
96

    
97
		return Arc.DEFAULT_ARC;
98
	}
99

    
100
	private void populateQueue(final Queue<Object> q, final SAXReader r, final String xml) {
101
		try {
102
			final Document d = r.read(new StringReader(xml));
103
			final String groupid = d.valueOf("//FIELD[@name='id']");
104
			final List<?> items = d.selectNodes("//FIELD[@name='group']/ITEM");
105
			final String entitytype = d.valueOf("//FIELD[@name='entitytype']");
106
			final List<String> group = Lists.newArrayList();
107
			for (final Object id : items) {
108
				group.add(((Node) id).getText());
109
			}
110
			// compute the full mesh
111
			final Type type = Type.valueOf(entitytype);
112

    
113
			final List<Similarity> mesh = SimilarityMeshBuilder.build(type, group);
114
			// total += mesh.size();
115
			if (log.isDebugEnabled()) {
116
				log.debug(String.format("built mesh for group '%s', size %d", groupid, mesh.size()));
117
			}
118
			for (final Similarity s : mesh) {
119
				if (log.isDebugEnabled()) {
120
					log.debug(String.format("adding to queue: %s", s.toString()));
121
				}
122
				q.add(s);
123
			}
124
		} catch (final DocumentException e) {
125
			log.error("invalid document: " + xml);
126
		}
127
	}
128

    
129
	private String buildSimilarity(final Similarity s) {
130
		final StringTemplate template = new StringTemplate(getSimilarity().getTemplate());
131

    
132
		template.setAttribute("source", s.getPair().getKey());
133
		template.setAttribute("target", s.getPair().getValue());
134
		template.setAttribute("type", s.getType().toString());
135

    
136
		final String res = template.toString();
137
		return res;
138
	}
139

    
140
	public String getInputEprParam() {
141
		return inputEprParam;
142
	}
143

    
144
	public void setInputEprParam(final String inputEprParam) {
145
		this.inputEprParam = inputEprParam;
146
	}
147

    
148
	public String getOutputEprParam() {
149
		return outputEprParam;
150
	}
151

    
152
	public void setOutputEprParam(final String outputEprParam) {
153
		this.outputEprParam = outputEprParam;
154
	}
155

    
156
	public StringTemplate getSimilarity() {
157
		return similarity;
158
	}
159

    
160
	@Required
161
	public void setSimilarity(final StringTemplate similarity) {
162
		this.similarity = similarity;
163
	}
164

    
165
}
(1-1/15)