Project

General

Profile

1
package eu.dnetlib.data.transform;
2

    
3
import java.io.*;
4
import java.util.*;
5
import java.util.Map.Entry;
6
import java.util.zip.GZIPInputStream;
7
import javax.xml.transform.TransformerConfigurationException;
8
import javax.xml.transform.TransformerFactoryConfigurationError;
9

    
10
import com.google.common.base.Function;
11
import com.google.common.collect.Iterables;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Maps;
14
import com.google.common.collect.Sets;
15
import com.google.protobuf.InvalidProtocolBufferException;
16
import com.googlecode.protobuf.format.JsonFormat;
17
import eu.dnetlib.actionmanager.actions.ActionFactory;
18
import eu.dnetlib.actionmanager.actions.XsltInfoPackageAction;
19
import eu.dnetlib.actionmanager.common.Agent;
20
import eu.dnetlib.actionmanager.common.Operation;
21
import eu.dnetlib.actionmanager.common.Provenance;
22
import eu.dnetlib.data.mapreduce.hbase.index.config.*;
23
import eu.dnetlib.data.mapreduce.util.*;
24
import eu.dnetlib.data.proto.KindProtos.Kind;
25
import eu.dnetlib.data.proto.OafProtos.Oaf;
26
import eu.dnetlib.data.proto.TypeProtos.Type;
27
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
28
import org.apache.commons.io.IOUtils;
29
import org.apache.commons.lang.StringUtils;
30
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.LogFactory;
32
import org.dom4j.Document;
33
import org.dom4j.DocumentException;
34
import org.dom4j.io.SAXReader;
35
import org.json.JSONObject;
36
import org.junit.Before;
37
import org.junit.Ignore;
38
import org.junit.Test;
39
import org.springframework.core.io.ByteArrayResource;
40
import org.springframework.core.io.Resource;
41

    
42
import static org.junit.Assert.*;
43

    
44
public class XsltRowTransformerFactoryTest {
45

    
46
	private static final Log log = LogFactory.getLog(XsltRowTransformerFactoryTest.class);
47
	private static String basePathProfiles = "/eu/dnetlib/test/profiles/TransformationRuleDSResources/TransformationRuleDSResourceType/2hbase/";
48
	private XsltRowTransformerFactory factory;
49
	private EntityConfigTable entityConfigTable;
50

    
51
	@Before
52
	public void setUp() throws Exception {
53
		factory = new XsltRowTransformerFactory();
54
		entityConfigTable = IndexConfig.load(IndexConfigTest.config).getConfigMap();
55
	}
56

    
57
	@Test
58
	@Ignore // need to reimplement because claimUpdates_2_hbase.xsl was removed
59
	public void testParseOafClaimUpdate() throws Exception {
60
		doTest(loadFromTransformationProfile("claimUpdates_2_hbase.xsl"), load("recordClaimUpdate.xml"));
61
	}
62

    
63
	@Test
64
	@Ignore // need to reimplement because claimUpdates_2_hbase.xsl was removed
65
	public void testParseClaimUpdate() throws Exception {
66

    
67
		final List<Row> rows = Lists.newArrayList();
68
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordClaim.xml")));
69
		rows.addAll(asRows(loadFromTransformationProfile("claimUpdates_2_hbase.xsl"), load("recordClaimUpdate.xml")));
70

    
71
		printAll(mapAll(buildTable(rows)));
72
	}
73

    
74
	@Test
75
	public void testParseClaimRel() throws Exception {
76

    
77
		doTest(loadFromTransformationProfile("claimRels_2_hbase.xml"), load("recordClaimRel.xml"));
78
	}
79

    
80

    
81
	@Test
82
	public void testParseFp7IctPUB() throws Exception {
83

    
84
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("ec_fp7_ict.xml"));
85
	}
86

    
87
	@Test
88
	public void testParseRecordCrossref() throws Exception {
89

    
90
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordCrossref.xml"));
91
	}
92

    
93
	@Test
94
	public void testParseDatasetPUB() throws Exception {
95

    
96
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatasetPUB.xml"));
97
	}
98

    
99
	@Test
100
	public void testParseSoftwareEgiApp() throws Exception {
101

    
102
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("softwareEgiApp.xml"));
103
	}
104

    
105
	@Test
106
	public void testParseSoftwareEgiApp2() throws Exception {
107

    
108
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("softwareEgiApp2.xml"));
109
	}
110

    
111
	@Test
112
	public void testParseOrpEgiApp() throws Exception {
113

    
114
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("orpEgiApp.xml"));
115
	}
116

    
117
	@Test
118
	public void testParseDatasetLindat() throws Exception {
119

    
120
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("datasetLindat.xml"));
121
	}
122

    
123
	@Test
124
	public void testParseDatasetNeuroVault() throws Exception {
125

    
126
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordNeuroVault.xml"));
127
	}
128

    
129
	@Test
130
	public void testParseClaim() throws Exception {
131

    
132
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordClaim.xml"));
133
	}
134

    
135
	@Test
136
	public void testParseClaimDataset() throws Exception {
137

    
138
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordClaimDataset.xml"));
139
	}
140

    
141

    
142
	@Test
143
	public void testParseACM() throws Exception {
144

    
145
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordACM.xml"));
146
	}
147

    
148
	@Test
149
	public void testParseASB() throws Exception {
150

    
151
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordASB.xml"));
152
	}
153

    
154
	@Test
155
	public void testParseProjectCorda() throws Exception {
156

    
157
		doTest(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml"));
158
	}
159

    
160
	@Test
161
	public void testParseProjectFCT() throws Exception {
162

    
163
		doTest(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordFCT.xml"));
164
	}
165

    
166

    
167
	@Test
168
	public void testParseOaf() throws Exception {
169

    
170
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("record.xml"));
171
	}
172

    
173
	@Test
174
	public void testParseOafPublication() throws Exception {
175

    
176
		doTest(loadFromTransformationProfile("oaf_entity2hbase.xml"), load("record.xml"));
177
	}
178

    
179
	@Test
180
	public void testParseLindat() throws Exception {
181

    
182
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordLindat.xml"));
183
	}
184

    
185
	@Test
186
	public void testParseDatacite() throws Exception {
187

    
188
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatacite.xml"));
189
	}
190

    
191
	@Test
192
	public void testParseDatacite2() throws Exception {
193

    
194
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatacite2.xml"));
195
	}
196

    
197
	@Test
198
	public void testParseOpenTrials() throws Exception {
199

    
200
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("opentrials_datacite1.xml"));
201
	}
202

    
203
	@Test
204
	public void testLinkPangaea() throws Exception {
205

    
206
		final List<Row> rows = Lists.newArrayList();
207
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("pangaeODF1.xml")));
208
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("pangaeODF2.xml")));
209
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("pangaeOAF.xml")));
210
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCordaPangaea.xml")));
211

    
212
		printAll(mapAll(buildTable(rows)));
213
	}
214

    
215
	@Test
216
	public void testPangaea() throws Exception {
217

    
218
		final List<Row> rows = Lists.newArrayList();
219
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("pangaeOAF2.xml")));
220
		printAll(mapAll(buildTable(rows)));
221
	}
222
	@Test
223
	public void testZenodo() throws Exception {
224

    
225
		final List<Row> rows = Lists.newArrayList();
226
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("zenodoData.xml")));
227
		printAll(mapAll(buildTable(rows)));
228
	}
229

    
230
	@Test
231
	public void testZenodoSoftware() throws Exception {
232

    
233
		final List<Row> rows = Lists.newArrayList();
234
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("softwareZenodo_odf.xml")));
235
		printAll(mapAll(buildTable(rows)));
236
	}
237

    
238
	@Test
239
	public void testLinkCorda() throws Exception {
240

    
241
		final List<Row> rows = Lists.newArrayList();
242
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml")));
243
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordCorda.xml")));
244

    
245
		printAll(mapAll(buildTable(rows)));
246
	}
247

    
248
	@Test
249
	public void testLinkFCT() throws Exception {
250

    
251
		final List<Row> rows = Lists.newArrayList();
252
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordFCT.xml")));
253
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordFCT.xml")));
254

    
255
		printAll(mapAll(buildTable(rows)));
256
	}
257

    
258
	@Test
259
	public void testLinkARC() throws Exception {
260

    
261
		final List<Row> rows = Lists.newArrayList();
262
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordARC.xml")));
263
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordARC.xml")));
264

    
265
		printAll(mapAll(buildTable(rows)));
266
	}
267

    
268
	@Test
269
	public void testLinkWT() throws Exception {
270

    
271
		final List<Row> rows = Lists.newArrayList();
272
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordWT.xml")));
273
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordWT.xml")));
274

    
275
		printAll(mapAll(buildTable(rows)));
276
	}
277

    
278

    
279

    
280
	@Test
281
	public void testLinkOrganization() throws Exception {
282

    
283
		final List<Row> rows = Lists.newArrayList();
284
		rows.addAll(asRows(loadFromTransformationProfile("organizations_2_hbase.xsl"), load("organization.xml")));
285
		rows.addAll(asRows(loadFromTransformationProfile("projectorganization_2_hbase.xsl"), load("project_organization.xml")));
286
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml")));
287

    
288
		printAll(mapAll(buildTable(rows)));
289
	}
290

    
291
	@Test
292
	public void testLinkOrganizationAffiliation() throws Exception {
293

    
294
		final List<Row> rows = Lists.newArrayList();
295
		rows.addAll(asRows(loadFromTransformationProfile("organizations_2_hbase.xsl"), load("organization.xml")));
296
		rows.addAll(asRows(loadFromTransformationProfile("resultorganization_2_hbase.xsl"), load("result_organization.xml")));
297
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("record.xml")));
298

    
299
		printAll(mapAll(buildTable(rows)));
300
	}
301

    
302
	@Test
303
	public void testDuplicates() throws Exception {
304
		final String mergeId = "50|dedup_wf_001::08ed625d07e5738b794ff14d6773fd9f";
305
		final List<Row> rows = Lists.newArrayList();
306

    
307
		final Function<Row, Row> f = rowIn -> {
308

    
309
			final List<Column<String,byte[]>> cols = Lists.newArrayList();
310
			for(Column<String,byte[]> col : rowIn.getColumns()) {
311
				if (col.getName().equals("body")) {
312
					cols.add(new Column(col.getName(), col.getValue()));
313

    
314
				}
315
			}
316
			return new Row("result", rowIn.getKey(), cols);
317
		};
318

    
319
		final List<Row> puma1 = asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordPuma1.xml"), f);
320
		puma1.add(new Row("resultResult_dedup_isMergedIn", mergeId));
321

    
322
		final List<Row> puma2 = asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordPuma2.xml"), f);
323
		puma2.add(new Row("resultResult_dedup_isMergedIn", mergeId));
324

    
325
		rows.addAll(puma1);
326
		rows.addAll(puma2);
327

    
328
		List<Oaf> duplicates = Lists.newArrayList();
329
		duplicates.add(getOafBody(puma1));
330
		duplicates.add(getOafBody(puma2));
331
		final Oaf.Builder oafMerge = OafEntityMerger.merge(mergeId, duplicates);
332

    
333
		final Row mergeRow = new Row("result", mergeId, Lists.newArrayList(new Column("body", oafMerge.build().toByteArray())));
334

    
335
		rows.add(mergeRow);
336

    
337
		printAll(mapAll(buildTable(rows)));
338
	}
339

    
340
	private Oaf getOafBody(final List<Row> rows) throws InvalidProtocolBufferException {
341
		for(Row row : rows) {
342
			if(StringUtils.startsWith(row.getKey(), "50")) {
343
				return Oaf.parseFrom(row.getColumn("body").getValue());
344

    
345
			}
346
		}
347
		return null;
348
	}
349

    
350
	@Test
351
	public void testParseDoajOAF() throws Exception {
352

    
353
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("doajUniversityRecord.xml"));
354
	}
355

    
356
	@Test
357
	public void testParseDatasource() throws Exception {
358

    
359
		doTest(loadFromTransformationProfile("datasources_2_hbase.xsl"), load("datasourceNative.xml"));
360
	}
361
	@Test
362
	public void testParseDatasourcePiwik() throws Exception {
363

    
364
		doTest(loadFromTransformationProfile("datasources_2_hbase.xsl"), load("datasourcePiwik.xml"));
365
	}
366

    
367
	@Test
368
	public void testParseDataDatasource() throws Exception {
369

    
370
		doTest(loadFromTransformationProfile("datasources_2_hbase.xsl"), load("dataDatasource.xml"));
371
	}
372

    
373
	@Test
374
	public void testFromMongodbCompressedDump() throws Exception {
375
		doTestJsonGz(loadFromTransformationProfile("oaf2hbase.xml"), load("mdstore_cleaned.json.gz"));
376
	}
377

    
378
	@Test
379
	public void testLoadFromTransformationProfile() throws IOException {
380
		InputStream in = loadFromTransformationProfile("oaf2hbase.xml");
381
		log.info(IOUtils.toString(in));
382
	}
383

    
384
	@Test
385
	public void test_template() throws Exception {
386
		final String xslt = IOUtils.toString(loadFromTransformationProfile("oaf2hbase.xml"));
387
		final XsltRowTransformer transformer = factory.getTransformer(xslt);
388
		assertNotNull(transformer);
389

    
390
		final String record = IOUtils.toString(load("record.xml"));
391
		final List<Row> rows = transformer.apply(record);
392

    
393
		System.out.println(rows);
394
	}
395

    
396
	@Test
397
	public void testWrongCharsOrganization() throws Exception {
398
		final List<Row> rows = Lists.newArrayList();
399
		rows.addAll(asRows(loadFromTransformationProfile("organizations_2_hbase.xsl"), load("organizationWrongChars.xml")));
400
		printAll(mapAll(buildTable(rows)));
401
	}
402

    
403
	@Test
404
	public void testParseProjectWithFunderOriginalName() throws Exception {
405

    
406
		doTest(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectWithFunderOriginalName.xml"));
407
	}
408
	@Test
409
	public void testLinkFunderOriginalName() throws Exception {
410

    
411
		final List<Row> rows = Lists.newArrayList();
412
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectWithFunderOriginalName.xml")));
413
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordFunderOriginalName.xml")));
414

    
415
		printAll(mapAll(buildTable(rows)));
416
	}
417

    
418
	@Test
419
	public void testProjectExtraInfo() throws Exception {
420
		final List<Row> rows = Lists.newArrayList();
421
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordExtraInfo.xml")));
422
		printAll(mapAll(buildTable(rows)));
423
	}
424

    
425
	@Test
426
	public void testParseSoftwareFromODF() throws Exception {
427
		final List<Row> rows = Lists.newArrayList();
428
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("softwareODF.xml")));
429
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml")));
430
		printAll(mapAll(buildTable(rows)));
431
	}
432

    
433
	@Test
434
	public void testParseSoftwareFromOAF() throws Exception {
435
		final List<Row> rows = Lists.newArrayList();
436
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordOAFsoftwareCLOSED.xml")));
437
		printAll(mapAll(buildTable(rows)));
438
	}
439

    
440
	@Test
441
	public void testParseSoftwareFromOAFOpen() throws Exception {
442
		final List<Row> rows = Lists.newArrayList();
443
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordOAFsoftwareOPEN.xml")));
444
		printAll(mapAll(buildTable(rows)));
445
	}
446

    
447
	@Test
448
	public void testParseSoftwareBiotool() throws Exception {
449
		final List<Row> rows = Lists.newArrayList();
450
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("biotoolSw.xml")));
451
		printAll(mapAll(buildTable(rows)));
452
	}
453

    
454
	@Test
455
	public void testParseOafWithExternalRef() throws Exception {
456
		final List<Row> rows = Lists.newArrayList();
457
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("oafWithExternalReference.xml")));
458
		printAll(mapAll(buildTable(rows)));
459
	}
460

    
461
	@Test
462
	public void testParseOafWithCommunity() throws Exception {
463
		final List<Row> rows = Lists.newArrayList();
464
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("oafWithCommunity.xml")));
465
		printAll(mapAll(buildTable(rows)));
466
	}
467

    
468
	@Test
469
	public void testParseOafWithUpdates() throws Exception {
470
		final List<Row> rows = Lists.newArrayList();
471
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("oafWithCommunity.xml")));
472

    
473
		ActionFactory actionFactory = new ActionFactory();
474

    
475
		Map<String, Resource> xslts = Maps.newHashMap();
476

    
477
		xslts.put("oaf2hbase", new ByteArrayResource(IOUtils.toString(loadFromTransformationProfile("oaf2hbase.xml")).getBytes()));
478
		actionFactory.setXslts(xslts);
479

    
480
		XsltInfoPackageAction pa = actionFactory.generateInfoPackageAction(
481
				"oaf2hbase",
482
				"rawset-id",
483
				new Agent("agent-id", "agent-name", Agent.AGENT_TYPE.algo),
484
				Operation.UPDATE,
485
				IOUtils.toString(load("oafUpdateWithCommunity.xml")),
486
				Provenance.sysimport_mining_aggregator,
487
				"0.9");
488

    
489
		final String qualifier = "update_" + System.nanoTime();
490

    
491
		IOUtils.readLines(load("country_updates.json")).forEach(line -> {
492

    
493
			Oaf.Builder oaf = Oaf.newBuilder();
494

    
495
			try {
496
				JsonFormat.merge(line, oaf);
497
			} catch (JsonFormat.ParseException e) {
498
				throw new IllegalArgumentException(e);
499
			}
500

    
501
			Column<String, byte[]> col = new Column<>("update_" + System.nanoTime(), oaf.build().toByteArray());
502
			rows.add(new Row("result", oaf.getEntity().getId(), Lists.newArrayList(col)));
503
		});
504

    
505
		pa.asAtomicActions().forEach(a -> {
506
			Column<String, byte[]> col = new Column<>("update_" + System.nanoTime(), a.getTargetValue());
507
			rows.add(new Row(a.getTargetColumnFamily(), a.getTargetRowKey(), Lists.newArrayList(col)));
508
		});
509

    
510

    
511
		/*
512
		rows.forEach(r -> {
513
			log.info(r);
514
		});
515
		*/
516

    
517
		mapAll(buildTable(rows)).entrySet().forEach(b -> {
518
			log.info(b.getKey());
519
			log.info(b.getValue());
520
		});
521
	}
522

    
523
	private void doTest(final InputStream xsltStream, final InputStream recordStream) throws Exception {
524
		try {
525
			final List<Row> rows = asRows(xsltStream, recordStream);
526

    
527
			log.info(rows);
528

    
529
			final Map<String, Map<String, Map<String, byte[]>>> table = buildTable(rows);
530

    
531
			// System.out.println("\n" + table.toString());
532

    
533
			final Map<String, XmlRecordFactory> builders = mapAll(table);
534

    
535
			printAll(builders);
536
		} catch (final InvalidProtocolBufferException e) {
537
			throw new Exception(e);
538
		} catch (final TransformerConfigurationException e) {
539
			throw new Exception(e);
540
		} catch (final TransformerFactoryConfigurationError e) {
541
			throw new Exception(e);
542
		} catch (final DocumentException e) {
543
			throw new Exception(e);
544
		}
545
	}
546

    
547
	private void doTestJsonGz(final InputStream xsltStream, final InputStream recordStream) throws Exception {
548

    
549
		final Iterator<List<Row>> rowsIterator = asRowsJsonGzip(xsltStream, recordStream);
550

    
551
		int i = 0;
552
		while (rowsIterator.hasNext()) {
553
			final List<Row> rows = rowsIterator.next();
554
			i++;
555

    
556
			if ((i % 10000) == 0) {
557
				System.out.println(i);
558
			}
559

    
560
			final Map<String, Map<String, Map<String, byte[]>>> table = buildTableDoaj(rows);
561

    
562
			for (final Map<String, Map<String, byte[]>> m : table.values()) {
563
				for (final Map<String, byte[]> mv : m.values()) {
564
					for (final byte[] v : mv.values()) {
565
						final OafDecoder d = OafDecoder.decode(v);
566
						assertNotNull(d);
567
						assertNotNull(d.getOaf());
568

    
569
						switch (d.getKind()) {
570
						case entity:
571
							assertNotNull(d.getMetadata());
572
							if (d.getOaf().getEntity().getType().equals(Type.result)) {
573
								System.out.println(d.getOaf());
574
							}
575
							break;
576
						case relation:
577
							assertNotNull(d.getRel());
578
							break;
579
						default:
580
							break;
581
						}
582
					}
583
				}
584
			}
585
		}
586
	}
587

    
588
	private List<Row> asRows(final InputStream xsltStream, final InputStream recordStream, final Function<Row, Row> p) throws Exception {
589
		return asRows(xsltStream, new HashMap<>(), recordStream, p);
590
	}
591

    
592
	private List<Row> asRows(final InputStream xsltStream, final InputStream recordStream) throws Exception {
593
		return asRows(xsltStream, new HashMap<>(), recordStream);
594
	}
595

    
596
	private List<Row> asRows(final InputStream xsltStream, final Map<String, Object> params, final InputStream recordStream) throws Exception {
597
		return asRows(xsltStream, params, recordStream, null);
598
	}
599

    
600
	private List<Row> asRows(final InputStream xsltStream, final Map<String, Object> params, final InputStream recordStream, final Function<Row, Row> p) throws Exception {
601
		final String xslt = IOUtils.toString(xsltStream);
602
		final XsltRowTransformer transformer = factory.getTransformer(xslt, params);
603
		assertNotNull(transformer);
604

    
605
		final String record = IOUtils.toString(recordStream);
606
		final List<Row> rows = transformer.apply(record);
607

    
608
		assertNotNull(rows);
609
		assertFalse(rows.isEmpty());
610
		return p == null ? rows : Lists.newArrayList(Iterables.transform(rows, p));
611
	}
612

    
613
	private Iterator<List<Row>> asRowsJsonGzip(final InputStream xsltStream, final InputStream recordStreamJsonGzip) throws Exception {
614
		final String xslt = IOUtils.toString(xsltStream);
615
		final XsltRowTransformer transformer = factory.getTransformer(xslt);
616
		assertNotNull(transformer);
617
		assertNotNull(recordStreamJsonGzip);
618

    
619
		final GZIPInputStream stream = new GZIPInputStream(recordStreamJsonGzip);
620
		assertNotNull(stream);
621
		final BufferedReader inStream = new BufferedReader(new InputStreamReader(stream));
622
		assertNotNull(inStream);
623
		return new Iterator<List<Row>>() {
624

    
625
			String jsonRecord = null;
626

    
627
			@Override
628
			public boolean hasNext() {
629
				try {
630
					return (jsonRecord = inStream.readLine()) != null;
631
				} catch (final IOException e) {
632
					throw new RuntimeException(e);
633
				}
634
			}
635

    
636
			@Override
637
			public List<Row> next() {
638

    
639
				final JSONObject jsonObj = new JSONObject(jsonRecord);
640
				final String body = jsonObj.getString("body");
641
				try {
642
					assertTrue(StringUtils.isNotBlank(body));
643
					// System.out.println(body);
644
					final List<Row> rows = transformer.apply(body);
645
					assertNotNull(rows);
646
					assertFalse(rows.isEmpty());
647
					return rows;
648
				} catch (final Throwable e) {
649
					System.err.println("error transforming document: " + body);
650
					throw new RuntimeException(e);
651
				}
652
			}
653

    
654
			@Override
655
			public void remove() {
656
				throw new UnsupportedOperationException();
657
			}
658

    
659
		};
660

    
661
	}
662

    
663
	private Map<String, Map<String, Map<String, byte[]>>> buildTableDoaj(final List<Row> rows) throws UnsupportedEncodingException {
664
		final Map<String, Map<String, Map<String, byte[]>>> table = Maps.newHashMap();
665

    
666
		for (final Row row : rows) {
667
			final String rowKey = row.getKey();
668
			final String cf = row.getColumnFamily();
669
			if (!table.containsKey(rowKey)) {
670
				table.put(rowKey, new HashMap<>());
671
			}
672
			if (!table.get(rowKey).containsKey(cf)) {
673
				table.get(rowKey).put(row.getColumnFamily(), new HashMap<>());
674
			}
675
			for (final Column<String, byte[]> c : row.getColumns()) {
676
				// System.out.println(String.format("ADDING K:%s CF:%s Q:%s", rowKey, cf, c.getName()));
677
				table.get(rowKey).get(cf).put(c.getName(), c.getValue());
678
				if (cf.equals("result") && c.getName().equals("body")) {
679
					// System.out.println(String.format("ADDING K:%s CF:%s Q:%s", rowKey, cf, c.getName()));
680
					assertTrue(StringUtils.isNotBlank(new String(c.getValue(), "UTF-8")));
681
				}
682
			}
683
		}
684
		return table;
685

    
686
	}
687

    
688
	protected Map<String, Map<String, Map<String, byte[]>>> buildTable(final List<Row> rows) throws UnsupportedEncodingException {
689
		final Map<String, Map<String, Map<String, byte[]>>> table = Maps.newHashMap();
690

    
691
		for (final Row row : rows) {
692
			final String rowKey = row.getKey();
693
			final String cf = row.getColumnFamily();
694
			if (!table.containsKey(rowKey)) {
695
				table.put(rowKey, new HashMap<>());
696
			}
697
			if (!table.get(rowKey).containsKey(cf)) {
698
				table.get(rowKey).put(row.getColumnFamily(), new HashMap<>());
699
			}
700
			for (final Column<String, byte[]> c : row.getColumns()) {
701
				System.out.println(String.format("ADDING K:%s CF:%s Q:%s", rowKey, cf, c.getName()));
702
				table.get(rowKey).get(cf).put(c.getName(), c.getValue());
703
				if (c.getName().equals("body")) {
704
					final String theBody = new String(c.getValue(), "UTF-8");
705
					assertTrue(StringUtils.isNotBlank(theBody));
706
					//System.out.println(theBody);
707
				}
708
			}
709
		}
710
		return table;
711

    
712
	}
713

    
714
	protected Map<String, XmlRecordFactory> mapAll(final Map<String, Map<String, Map<String, byte[]>>> table) throws Exception {
715

    
716
		final Map<String, XmlRecordFactory> builders = Maps.newHashMap();
717
		for (final Entry<String, Map<String, Map<String, byte[]>>> e : table.entrySet()) {
718
			map(builders, e.getKey(), e.getValue());
719
		}
720
		return builders;
721
	}
722

    
723
	// private Map<String, XmlRecordFactory> mapResultsOnly(final Map<String, Map<String, Map<String, byte[]>>> table) throws Exception {
724
	//
725
	// final Map<String, XmlRecordFactory> builders = Maps.newHashMap();
726
	// for (final Entry<String, Map<String, Map<String, byte[]>>> e : table.entrySet()) {
727
	// final Type type = OafRowKeyDecoder.decode(e.getKey()).getType();
728
	// if (type == Type.result) {
729
	// map(builders, e.getKey(), e.getValue());
730
	// }
731
	// }
732
	// return builders;
733
	// }
734

    
735
	private void map(final Map<String, XmlRecordFactory> builders, final String rowKey, final Map<String, Map<String, byte[]>> row) throws Exception {
736

    
737
		final Type type = OafRowKeyDecoder.decode(rowKey).getType();
738

    
739
		final Map<String, byte[]> familyMap = row.get(type.toString());
740

    
741
		if (familyMap == null) return;
742

    
743
		final byte[] bodyB = familyMap.get("body");
744

    
745
		if (bodyB != null) {
746
			ensureBuilder(builders, rowKey);
747

    
748
			final Oaf oaf = UpdateMerger.mergeBodyUpdates(familyMap);
749

    
750
			final OafDecoder mainEntity = OafDecoder.decode(oaf);
751

    
752
			builders.get(rowKey).setMainEntity(mainEntity);
753

    
754
			for (final LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
755

    
756
				final String it = ld.getRelDescriptor().getIt();
757
				final Map<String, byte[]> cols = row.get(it);
758

    
759
				if ((cols != null) && !cols.isEmpty()) {
760

    
761
					for (final byte[] oafB : cols.values()) {
762

    
763
						final Oaf.Builder relBuilder = Oaf.newBuilder(Oaf.parseFrom(oafB));
764

    
765
						if (ld.isSymmetric()) {
766
							final RelDescriptor rd = ld.getRelDescriptor();
767

    
768
							relBuilder.getRelBuilder().setCachedTarget(mainEntity.getEntity()).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
769
						}
770

    
771
						relBuilder.getRelBuilder().setChild(ld.isChild());
772

    
773
						final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(System.currentTimeMillis());
774
						oafBuilder.mergeFrom(relBuilder.build());
775

    
776
						final String targetId = ld.isSymmetric() ? oafBuilder.getRel().getTarget() : oafBuilder.getRel().getSource();
777
						ensureBuilder(builders, targetId);
778
						final OafDecoder decoder = OafDecoder.decode(oafBuilder.build());
779

    
780
						if (ld.isChild()) {
781
							builders.get(targetId).addChild(type, decoder);
782
						} else {
783
							builders.get(targetId).addRelation(type, decoder);
784
						}
785
					}
786

    
787
				}
788
			}
789
		}
790

    
791
	}
792

    
793
	private void printAll(final Map<String, XmlRecordFactory> builders) throws DocumentException {
794
		print(Sets.newHashSet(Type.values()), builders, null);
795
	}
796

    
797
	private void print(final Set<Type> types, final Map<String, XmlRecordFactory> builders, final Map<Type, Set<String>> xpaths) throws DocumentException {
798
		final SAXReader r = new SAXReader();
799

    
800
		for (final Entry<String, XmlRecordFactory> e : builders.entrySet()) {
801
			final OafRowKeyDecoder kd = OafRowKeyDecoder.decode(e.getKey());
802

    
803
			if (!e.getValue().isValid()) throw new IllegalArgumentException("invalid builder: " + e.getKey());
804
			if (types.contains(kd.getType())) {
805
				final String val = IndentXmlString.apply(e.getValue().build());
806

    
807
				if ((xpaths != null) && !xpaths.isEmpty() && (xpaths.get(kd.getType()) != null)) {
808
					final Document doc = r.read(new StringReader(val));
809

    
810
					log.debug("\n" + e.getKey());
811
					for (final String xpath : xpaths.get(kd.getType())) {
812
						log.debug(doc.valueOf(xpath));
813
					}
814
				} else {
815
					log.info(val);
816
				}
817
			}
818
		}
819
	}
820

    
821
	private void printNoIndent(final Map<String, XmlRecordFactory> builders) {
822
		for (final Entry<String, XmlRecordFactory> e : builders.entrySet()) {
823
			if (e.getValue().isValid()) {
824
				log.debug(e.getValue().build());
825
			} else {
826
				log.debug("invalid builder: " + e.getKey());
827
			}
828
		}
829
	}
830

    
831
	private void ensureBuilder(final Map<String, XmlRecordFactory> builders, final String rowKey) throws Exception {
832
		if (!builders.containsKey(rowKey)) {
833
			builders.put(rowKey, newBuilder());
834
		}
835
	}
836

    
837
	private XmlRecordFactory newBuilder() throws TransformerConfigurationException, TransformerFactoryConfigurationError, DocumentException {
838
		return new XmlRecordFactory(entityConfigTable, ContextMapper.fromXml(Context.xml),
839
				RelClasses.fromJSon(RelClassesTest.relClassesJson), XmlRecordFactoryTest.SCHEMA_LOCATION, true, false, false, XmlRecordFactoryTest.specialDatasourceTypes);
840
	}
841

    
842
	private InputStream load(final String fileName) {
843
		return getClass().getResourceAsStream(fileName);
844
	}
845

    
846
	private InputStream loadFromTransformationProfile(final String profilePath) {
847
		log.info("Loading xslt from: " + basePathProfiles + profilePath);
848
		InputStream profile = getClass().getResourceAsStream(basePathProfiles + profilePath);
849
		SAXReader saxReader = new SAXReader();
850
		Document doc = null;
851
		try {
852
			doc = saxReader.read(profile);
853
		} catch (DocumentException e) {
854
			e.printStackTrace();
855
			throw new RuntimeException(e);
856
		}
857
		String xslt = doc.selectSingleNode("//SCRIPT/CODE/*[local-name()='stylesheet']").asXML();
858
		//log.info(xslt);
859
		return IOUtils.toInputStream(xslt);
860
	}
861

    
862
}
    (1-1/1)