Project

General

Profile

1 35869 claudio.at
package eu.dnetlib.msro.workflows.dedup;
2 35866 claudio.at
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.List;
6
import java.util.Queue;
7
8
import javax.annotation.Resource;
9
import javax.xml.ws.wsaddressing.W3CEndpointReference;
10
11
import org.antlr.stringtemplate.StringTemplate;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.dom4j.Document;
15
import org.dom4j.DocumentException;
16
import org.dom4j.Node;
17
import org.dom4j.io.SAXReader;
18
import org.springframework.beans.factory.annotation.Required;
19
20
import com.google.common.collect.Lists;
21
import com.google.common.collect.Queues;
22
import com.googlecode.sarasvati.Arc;
23
import com.googlecode.sarasvati.NodeToken;
24
25
import eu.dnetlib.data.proto.TypeProtos.Type;
26
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
27
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
28 37183 claudio.at
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity;
29 35869 claudio.at
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder;
30 35866 claudio.at
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
31
32
public class BuildSimilarityMeshJobNode extends AsyncJobNode {
33
34
	private static final Log log = LogFactory.getLog(BuildSimilarityMeshJobNode.class);
35
36
	/** The result set factory. */
37
	@Resource(name = "iterableResultSetFactory")
38
	private IterableResultSetFactory resultSetFactory;
39
40
	/** The result set client factory. */
41
	@Resource(name = "resultSetClientFactory")
42
	private ResultSetClientFactory resultSetClientFactory;
43
44
	private StringTemplate similarity;
45
46
	private String inputEprParam;
47
48
	private String outputEprParam;
49
50
	@Override
51
	protected String execute(final NodeToken token) throws Exception {
52
53
		final String inputEpr = token.getEnv().getAttribute(getInputEprParam());
54
55
		final Iterator<String> rsClient = resultSetClientFactory.getClient(inputEpr).iterator();
56
		final Queue<Object> queue = Queues.newLinkedBlockingQueue();
57
		final SAXReader reader = new SAXReader();
58
59
		if (rsClient.hasNext()) {
60
			populateQueue(queue, reader, rsClient.next());
61
		}
62
63
		final W3CEndpointReference eprOut = resultSetFactory.createIterableResultSet(new Iterable<String>() {
64
65
			@Override
66
			public Iterator<String> iterator() {
67
				return new Iterator<String>() {
68
69
					@Override
70
					public boolean hasNext() {
71
						synchronized (queue) {
72
							return !queue.isEmpty();
73
						}
74
					}
75
76
					@Override
77
					public String next() {
78
						synchronized (queue) {
79
							final Object o = queue.poll();
80
							while (queue.isEmpty() && rsClient.hasNext()) {
81
								populateQueue(queue, reader, rsClient.next());
82
							}
83 37183 claudio.at
							return buildSimilarity((Similarity) o);
84 35866 claudio.at
						}
85
					}
86
87
					@Override
88
					public void remove() {
89
						throw new UnsupportedOperationException();
90
					}
91
				};
92
			}
93
		});
94
95
		token.getEnv().setAttribute(getOutputEprParam(), eprOut.toString());
96
97
		return Arc.DEFAULT_ARC;
98
	}
99
100
	private void populateQueue(final Queue<Object> q, final SAXReader r, final String xml) {
101
		try {
102
			final Document d = r.read(new StringReader(xml));
103
			final String groupid = d.valueOf("//FIELD[@name='id']");
104
			final List<?> items = d.selectNodes("//FIELD[@name='group']/ITEM");
105
			final String entitytype = d.valueOf("//FIELD[@name='entitytype']");
106
			final List<String> group = Lists.newArrayList();
107
			for (final Object id : items) {
108
				group.add(((Node) id).getText());
109
			}
110
			// compute the full mesh
111 37183 claudio.at
			final Type type = Type.valueOf(entitytype);
112
113
			final List<Similarity> mesh = SimilarityMeshBuilder.build(type, group);
114 35866 claudio.at
			// total += mesh.size();
115
			if (log.isDebugEnabled()) {
116
				log.debug(String.format("built mesh for group '%s', size %d", groupid, mesh.size()));
117
			}
118 37183 claudio.at
			for (final Similarity s : mesh) {
119 35866 claudio.at
				if (log.isDebugEnabled()) {
120 37183 claudio.at
					log.debug(String.format("adding to queue: %s", s.toString()));
121 35866 claudio.at
				}
122 37183 claudio.at
				q.add(s);
123 35866 claudio.at
			}
124
		} catch (final DocumentException e) {
125
			log.error("invalid document: " + xml);
126
		}
127
	}
128
129 37183 claudio.at
	private String buildSimilarity(final Similarity s) {
130 35866 claudio.at
		final StringTemplate template = new StringTemplate(getSimilarity().getTemplate());
131
132 37183 claudio.at
		template.setAttribute("source", s.getPair().getKey());
133
		template.setAttribute("target", s.getPair().getValue());
134
		template.setAttribute("type", s.getType().toString());
135 35866 claudio.at
136
		final String res = template.toString();
137
		return res;
138
	}
139
140
	public String getInputEprParam() {
141
		return inputEprParam;
142
	}
143
144
	public void setInputEprParam(final String inputEprParam) {
145
		this.inputEprParam = inputEprParam;
146
	}
147
148
	public String getOutputEprParam() {
149
		return outputEprParam;
150
	}
151
152
	public void setOutputEprParam(final String outputEprParam) {
153
		this.outputEprParam = outputEprParam;
154
	}
155
156
	public StringTemplate getSimilarity() {
157
		return similarity;
158
	}
159
160
	@Required
161
	public void setSimilarity(final StringTemplate similarity) {
162
		this.similarity = similarity;
163
	}
164
165
}