Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map.Entry;
6

    
7
import org.apache.commons.codec.binary.Base64;
8
import org.apache.hadoop.conf.Configuration;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.mapreduce.Mapper;
11
import org.apache.solr.client.solrj.SolrServerException;
12
import org.apache.solr.client.solrj.impl.CloudSolrServer;
13
import org.apache.solr.client.solrj.response.SolrPingResponse;
14
import org.apache.solr.client.solrj.response.UpdateResponse;
15
import org.apache.solr.common.SolrInputDocument;
16

    
17
import com.google.common.collect.Lists;
18

    
19
import eu.dnetlib.data.mapreduce.JobParams;
20
import eu.dnetlib.data.proto.TypeProtos.Type;
21
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
22
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory;
23
import eu.dnetlib.miscutils.datetime.HumanTime;
24
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
25

    
26
public class IndexFeedMapper extends Mapper<Text, Text, Text, Text> {
27

    
28
	private InputDocumentFactory documentFactory;
29

    
30
	private CloudSolrServer solrServer;
31

    
32
	private String version;
33

    
34
	private String dsId;
35

    
36
	private int shutdownWaitTime = 10000;
37

    
38
	private int bufferFlushThreshold = 100;
39

    
40
	private ApplyXslt dmfToRecord;
41

    
42
	private List<SolrInputDocument> buffer;
43

    
44
	private int backoffTimeMs = 5000;
45

    
46
	private boolean simulation = false;
47

    
48
	@Override
49
	protected void setup(final Context context) throws IOException, InterruptedException {
50

    
51
		logConfiguration(context.getConfiguration());
52

    
53
		dsId = context.getConfiguration().get(JobParams.INDEX_DSID);
54
		shutdownWaitTime = Integer.parseInt(context.getConfiguration().get(JobParams.INDEX_SHUTDOWN_WAIT));
55
		bufferFlushThreshold = Integer.parseInt(context.getConfiguration().get(JobParams.INDEX_BUFFER_FLUSH_TRESHOLD));
56
		documentFactory = new StreamingInputDocumentFactory();
57
		version = InputDocumentFactory.getParsedDateField(context.getConfiguration().get(JobParams.INDEX_FEED_TIME));
58
		buffer = Lists.newArrayList();
59
		simulation = Boolean.parseBoolean(context.getConfiguration().get(JobParams.INDEX_FEED_SIMULATION_MODE));
60

    
61
		final String xslt = new String(Base64.decodeBase64(context.getConfiguration().get(JobParams.INDEX_XSLT)));
62

    
63
		System.out.println("got xslt: \n" + xslt);
64
		System.out.println("got version: " + version);
65
		System.out.println("simulation: " + simulation);
66
		System.out.println("buffer size: " + bufferFlushThreshold);
67

    
68
		dmfToRecord = new ApplyXslt(xslt);
69

    
70
		final String baseURL = context.getConfiguration().get(JobParams.INDEX_SOLR_URL);
71
		System.out.println("solr server baseURL: " + baseURL);
72

    
73
		final String collection = context.getConfiguration().get(JobParams.INDEX_SOLR_COLLECTION);
74
		System.out.println("solr server collection: " + collection);
75

    
76
		while (true) {
77
			try {
78
				System.out.println("initializing solr server...");
79
				solrServer = new CloudSolrServer(baseURL);
80

    
81
				solrServer.connect();
82

    
83
				solrServer.setParallelUpdates(true);
84
				solrServer.setDefaultCollection(collection);
85

    
86
				final SolrPingResponse rsp = solrServer.ping();
87

    
88
				if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus());
89
				else {
90
					break;
91
				}
92

    
93
			} catch (final Throwable e) {
94
				if (solrServer != null) {
95
					solrServer.shutdown();
96
				}
97
				context.getCounter("index init", e.getMessage()).increment(1);
98
				System.out.println(String.format("failed to init solr client wait %dms", backoffTimeMs));
99
				Thread.sleep(backoffTimeMs);
100
			}
101
		}
102
	}
103

    
104
	@Override
105
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
106

    
107
		String indexRecord = "";
108
		SolrInputDocument doc = null;
109

    
110
		try {
111
			indexRecord = dmfToRecord.evaluate(value.toString());
112
			doc = documentFactory.parseDocument(version, indexRecord, dsId, "dnetResult");
113
			if ((doc == null) || doc.isEmpty()) throw new EmptySolrDocumentException();
114

    
115
		} catch (final Throwable e) {
116
			context.getCounter("index feed", "skipped records").increment(1);
117
			handleError(key, value, context, indexRecord, doc, e);
118
			return;
119
		}
120

    
121
		while (true) {
122
			try {
123
				addDocument(context, doc);
124
				return;
125
			} catch (final Throwable e) {
126
				context.getCounter("index feed", "retries").increment(1);
127
				handleError(key, value, context, indexRecord, doc, e);
128
				System.out.println(String.format("failed to feed documents, waiting %dms", backoffTimeMs));
129
				Thread.sleep(backoffTimeMs);
130
			}
131
		}
132
	}
133

    
134
	private void addDocument(final Context context, final SolrInputDocument doc) throws SolrServerException, IOException, EmptySolrDocumentException {
135
		buffer.add(doc);
136
		if (buffer.size() >= bufferFlushThreshold) {
137
			doAdd(buffer, context);
138
		}
139
	}
140

    
141
	private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException {
142
		if (!simulation) {
143
			final long start = System.currentTimeMillis();
144
			final UpdateResponse rsp = solrServer.add(buffer);
145
			final long stop = System.currentTimeMillis() - start;
146
			System.out.println("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
147

    
148
			final int status = rsp.getStatus();
149

    
150
			context.getCounter("index feed", "status code: " + status).increment(buffer.size());
151

    
152
			if (status != 0) throw new SolrServerException("bad status: " + status);
153

    
154
			for (final SolrInputDocument doc : buffer) {
155
				context.getCounter("index entity", getEntityType(doc)).increment(1);
156
			}
157
		}
158
		buffer.clear();
159
	}
160

    
161
	@Override
162
	protected void cleanup(final Context context) throws IOException, InterruptedException {
163
		super.cleanup(context);
164
		try {
165
			if (!buffer.isEmpty()) {
166
				doAdd(buffer, context);
167
			}
168
			System.out.println("\nwaiting " + shutdownWaitTime + "ms before shutdown");
169
			Thread.sleep(shutdownWaitTime);
170
			solrServer.shutdown();
171
		} catch (final SolrServerException e) {
172
			System.err.println("couldn't shutdown server " + e.getMessage());
173
		}
174
	}
175

    
176
	private void handleError(final Text key, final Text value, final Context context, final String indexRecord, final SolrInputDocument doc, final Throwable e)
177
			throws IOException, InterruptedException {
178
		context.getCounter("index feed", e.getClass().getName()).increment(1);
179
		context.write(key, printRottenRecord(context.getTaskAttemptID().toString(), value, indexRecord, doc));
180
		// e.printStackTrace(System.err);
181
	}
182

    
183
	private Text printRottenRecord(final String taskid, final Text value, final String indexRecord, final SolrInputDocument doc) {
184
		return new Text("\n**********************************\n" + "task: " + taskid + "\n"
185
				+ check("original", value.toString() + check("indexRecord", indexRecord) + check("solrDoc", doc)));
186
	}
187

    
188
	private String check(final String label, final Object value) {
189
		if ((value != null) && !value.toString().isEmpty()) return "\n " + label + ":\n" + value + "\n";
190
		return "\n";
191
	}
192

    
193
	private void logConfiguration(final Configuration conf) {
194
		System.out.println("job configutation #################");
195
		for (final Entry<String, String> e : conf) {
196
			System.out.println("'" + e.getKey() + "' : '" + e.getValue() + "'");
197
		}
198
		System.out.println("end of job configutation #################\n\n");
199
	}
200

    
201
	private String getEntityType(final SolrInputDocument doc) {
202
		if (!doc.containsKey("oaftype")) return "unknown";
203

    
204
		final Type type = Type.valueOf(doc.getFieldValue("oaftype").toString());
205
		switch (type) {
206
		case result:
207
			if (!doc.containsKey("resulttypeid")) return "result";
208
			return doc.getFieldValue("resulttypeid").toString();
209
		default:
210
			return type.toString();
211
		}
212
	}
213

    
214
}
(5-5/7)