Project

General

Profile

1
package eu.dnetlib.data.transform;
2

    
3
import java.io.*;
4
import java.util.*;
5
import java.util.Map.Entry;
6
import java.util.zip.GZIPInputStream;
7
import javax.xml.transform.TransformerConfigurationException;
8
import javax.xml.transform.TransformerFactoryConfigurationError;
9

    
10
import com.google.common.base.Function;
11
import com.google.common.collect.Iterables;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Maps;
14
import com.google.common.collect.Sets;
15
import com.google.protobuf.InvalidProtocolBufferException;
16
import com.googlecode.protobuf.format.JsonFormat;
17
import eu.dnetlib.actionmanager.actions.ActionFactory;
18
import eu.dnetlib.actionmanager.actions.XsltInfoPackageAction;
19
import eu.dnetlib.actionmanager.common.Agent;
20
import eu.dnetlib.actionmanager.common.Operation;
21
import eu.dnetlib.actionmanager.common.Provenance;
22
import eu.dnetlib.data.mapreduce.hbase.index.config.*;
23
import eu.dnetlib.data.mapreduce.util.*;
24
import eu.dnetlib.data.proto.KindProtos.Kind;
25
import eu.dnetlib.data.proto.OafProtos.Oaf;
26
import eu.dnetlib.data.proto.TypeProtos.Type;
27
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
28
import org.apache.commons.io.IOUtils;
29
import org.apache.commons.lang.StringUtils;
30
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.LogFactory;
32
import org.dom4j.Document;
33
import org.dom4j.DocumentException;
34
import org.dom4j.io.SAXReader;
35
import org.json.JSONObject;
36
import org.junit.Before;
37
import org.junit.Ignore;
38
import org.junit.Test;
39
import org.springframework.core.io.ByteArrayResource;
40
import org.springframework.core.io.Resource;
41

    
42
import static org.junit.Assert.*;
43

    
44
public class XsltRowTransformerFactoryTest {
45

    
46
	private static final Log log = LogFactory.getLog(XsltRowTransformerFactoryTest.class);
47
	private static String basePathProfiles = "/eu/dnetlib/test/profiles/TransformationRuleDSResources/TransformationRuleDSResourceType/2hbase/";
48
	private XsltRowTransformerFactory factory;
49
	private EntityConfigTable entityConfigTable;
50

    
51
	@Before
52
	public void setUp() throws Exception {
53
		factory = new XsltRowTransformerFactory();
54
		entityConfigTable = IndexConfig.load(IndexConfigTest.config).getConfigMap();
55
	}
56

    
57
	@Test
58
	@Ignore // need to reimplement because claimUpdates_2_hbase.xsl was removed
59
	public void testParseOafClaimUpdate() throws Exception {
60
		doTest(loadFromTransformationProfile("claimUpdates_2_hbase.xsl"), load("recordClaimUpdate.xml"));
61
	}
62

    
63
	@Test
64
	@Ignore // need to reimplement because claimUpdates_2_hbase.xsl was removed
65
	public void testParseClaimUpdate() throws Exception {
66

    
67
		final List<Row> rows = Lists.newArrayList();
68
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordClaim.xml")));
69
		rows.addAll(asRows(loadFromTransformationProfile("claimUpdates_2_hbase.xsl"), load("recordClaimUpdate.xml")));
70

    
71
		printAll(mapAll(buildTable(rows)));
72
	}
73

    
74
	@Test
75
	public void testParseClaimRel() throws Exception {
76

    
77
		doTest(loadFromTransformationProfile("claimRels_2_hbase.xml"), load("recordClaimRel.xml"));
78
	}
79

    
80

    
81
	@Test
82
	public void testParseFp7IctPUB() throws Exception {
83

    
84
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("ec_fp7_ict.xml"));
85
	}
86

    
87
	@Test
88
	public void testParseRecordCrossref() throws Exception {
89

    
90
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordCrossref.xml"));
91
	}
92

    
93
	@Test
94
	public void testParseDatasetPUB() throws Exception {
95

    
96
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatasetPUB.xml"));
97
	}
98

    
99
	@Test
100
	public void testParseSoftwareEgiApp() throws Exception {
101

    
102
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("softwareEgiApp.xml"));
103
	}
104

    
105
	@Test
106
	public void testParseSoftwareEgiApp2() throws Exception {
107

    
108
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("softwareEgiApp2.xml"));
109
	}
110

    
111
	@Test
112
	public void testParseOrpEgiApp() throws Exception {
113

    
114
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("orpEgiApp.xml"));
115
	}
116

    
117
	@Test
118
	public void testParseSoftwareDOECODE() throws Exception {
119

    
120
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("doecode.xml"));
121
	}
122

    
123

    
124
	@Test
125
	public void testParseDatasetLindat() throws Exception {
126

    
127
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("datasetLindat.xml"));
128
	}
129

    
130
	@Test
131
	public void testParseDatasetNeuroVault() throws Exception {
132

    
133
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordNeuroVault.xml"));
134
	}
135

    
136
	@Test
137
	public void testParseClaim() throws Exception {
138

    
139
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordClaim.xml"));
140
	}
141

    
142
	@Test
143
	public void testParseClaimDedup() throws Exception {
144

    
145
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordClaimedDedup.xml"));
146
	}
147

    
148

    
149
	@Test
150
	public void testParseClaimDataset() throws Exception {
151

    
152
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordClaimDataset.xml"));
153
	}
154

    
155

    
156
	@Test
157
	public void testParseACM() throws Exception {
158

    
159
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordACM.xml"));
160
	}
161

    
162
	@Test
163
	public void testParseASB() throws Exception {
164

    
165
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("recordASB.xml"));
166
	}
167

    
168
	@Test
169
	public void testParseProjectCorda() throws Exception {
170

    
171
		doTest(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml"));
172
	}
173

    
174
	@Test
175
	public void testParseProjectFCT() throws Exception {
176

    
177
		doTest(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordFCT.xml"));
178
	}
179

    
180

    
181
	@Test
182
	public void testParseOaf() throws Exception {
183

    
184
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("record.xml"));
185
	}
186

    
187
	@Test
188
	public void testParseOafPublication() throws Exception {
189

    
190
		doTest(loadFromTransformationProfile("oaf_entity2hbase.xml"), load("record.xml"));
191
	}
192

    
193
	@Test
194
	public void testParseLindat() throws Exception {
195

    
196
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordLindat.xml"));
197
	}
198

    
199
	@Test
200
	public void testParseDatacite() throws Exception {
201

    
202
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatacite.xml"));
203
	}
204

    
205
	@Test
206
	public void testParseDatacite2() throws Exception {
207

    
208
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("recordDatacite2.xml"));
209
	}
210

    
211
	@Test
212
	public void testParseOpenTrials() throws Exception {
213

    
214
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("opentrials_datacite1.xml"));
215
	}
216

    
217
	@Test
218
	public void testLinkPangaea() throws Exception {
219

    
220
		final List<Row> rows = Lists.newArrayList();
221
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("pangaeODF1.xml")));
222
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("pangaeODF2.xml")));
223
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("pangaeOAF.xml")));
224
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCordaPangaea.xml")));
225

    
226
		printAll(mapAll(buildTable(rows)));
227
	}
228

    
229
	@Test
230
	public void testPangaea() throws Exception {
231

    
232
		final List<Row> rows = Lists.newArrayList();
233
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("pangaeOAF2.xml")));
234
		printAll(mapAll(buildTable(rows)));
235
	}
236
	@Test
237
	public void testZenodo() throws Exception {
238

    
239
		final List<Row> rows = Lists.newArrayList();
240
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("zenodoData.xml")));
241
		printAll(mapAll(buildTable(rows)));
242
	}
243

    
244
	@Test
245
	public void testZenodoSoftware() throws Exception {
246

    
247
		final List<Row> rows = Lists.newArrayList();
248
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("softwareZenodo_odf.xml")));
249
		printAll(mapAll(buildTable(rows)));
250
	}
251

    
252
	@Test
253
	public void testLinkCorda() throws Exception {
254

    
255
		final List<Row> rows = Lists.newArrayList();
256
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml")));
257
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordCorda.xml")));
258

    
259
		printAll(mapAll(buildTable(rows)));
260
	}
261

    
262
	@Test
263
	public void testLinkFCT() throws Exception {
264

    
265
		final List<Row> rows = Lists.newArrayList();
266
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordFCT.xml")));
267
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordFCT.xml")));
268

    
269
		printAll(mapAll(buildTable(rows)));
270
	}
271

    
272
	@Test
273
	public void testLinkARC() throws Exception {
274

    
275
		final List<Row> rows = Lists.newArrayList();
276
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordARC.xml")));
277
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordARC.xml")));
278

    
279
		printAll(mapAll(buildTable(rows)));
280
	}
281

    
282
	@Test
283
	public void testLinkWT() throws Exception {
284

    
285
		final List<Row> rows = Lists.newArrayList();
286
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordWT.xml")));
287
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordWT.xml")));
288

    
289
		printAll(mapAll(buildTable(rows)));
290
	}
291

    
292

    
293

    
294
	@Test
295
	public void testLinkOrganization() throws Exception {
296

    
297
		final List<Row> rows = Lists.newArrayList();
298
		rows.addAll(asRows(loadFromTransformationProfile("organizations_2_hbase.xsl"), load("organization.xml")));
299
		rows.addAll(asRows(loadFromTransformationProfile("projectorganization_2_hbase.xsl"), load("project_organization.xml")));
300
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml")));
301

    
302
		printAll(mapAll(buildTable(rows)));
303
	}
304

    
305
	@Test
306
	public void testLinkOrganizationAffiliation() throws Exception {
307

    
308
		final List<Row> rows = Lists.newArrayList();
309
		rows.addAll(asRows(loadFromTransformationProfile("organizations_2_hbase.xsl"), load("organization.xml")));
310
		rows.addAll(asRows(loadFromTransformationProfile("resultorganization_2_hbase.xsl"), load("result_organization.xml")));
311
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("record.xml")));
312

    
313
		printAll(mapAll(buildTable(rows)));
314
	}
315

    
316
	@Test
317
	public void testDuplicates() throws Exception {
318
		final String mergeId = "50|dedup_wf_001::08ed625d07e5738b794ff14d6773fd9f";
319
		final List<Row> rows = Lists.newArrayList();
320

    
321
		final Function<Row, Row> f = rowIn -> {
322

    
323
			final List<Column<String,byte[]>> cols = Lists.newArrayList();
324
			for(Column<String,byte[]> col : rowIn.getColumns()) {
325
				if (col.getName().equals("body")) {
326
					cols.add(new Column(col.getName(), col.getValue()));
327

    
328
				}
329
			}
330
			return new Row("result", rowIn.getKey(), cols);
331
		};
332

    
333
		final List<Row> puma1 = asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordPuma1.xml"), f);
334
		puma1.add(new Row("resultResult_dedup_isMergedIn", mergeId));
335

    
336
		final List<Row> puma2 = asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordPuma2.xml"), f);
337
		puma2.add(new Row("resultResult_dedup_isMergedIn", mergeId));
338

    
339
		rows.addAll(puma1);
340
		rows.addAll(puma2);
341

    
342
		List<Oaf> duplicates = Lists.newArrayList();
343
		duplicates.add(getOafBody(puma1));
344
		duplicates.add(getOafBody(puma2));
345
		final Oaf.Builder oafMerge = OafEntityMerger.merge(mergeId, duplicates);
346

    
347
		final Row mergeRow = new Row("result", mergeId, Lists.newArrayList(new Column("body", oafMerge.build().toByteArray())));
348

    
349
		rows.add(mergeRow);
350

    
351
		printAll(mapAll(buildTable(rows)));
352
	}
353

    
354
	private Oaf getOafBody(final List<Row> rows) throws InvalidProtocolBufferException {
355
		for(Row row : rows) {
356
			if(StringUtils.startsWith(row.getKey(), "50")) {
357
				return Oaf.parseFrom(row.getColumn("body").getValue());
358

    
359
			}
360
		}
361
		return null;
362
	}
363

    
364
	@Test
365
	public void testParseDoajOAF() throws Exception {
366

    
367
		doTest(loadFromTransformationProfile("oaf2hbase.xml"), load("doajUniversityRecord.xml"));
368
	}
369

    
370
	@Test
371
	public void testParseDatasource() throws Exception {
372

    
373
		doTest(loadFromTransformationProfile("datasources_2_hbase.xsl"), load("datasourceNative.xml"));
374
	}
375
	@Test
376
	public void testParseDatasourcePiwik() throws Exception {
377

    
378
		doTest(loadFromTransformationProfile("datasources_2_hbase.xsl"), load("datasourcePiwik.xml"));
379
	}
380

    
381
	@Test
382
	public void testParseDataDatasource() throws Exception {
383

    
384
		doTest(loadFromTransformationProfile("datasources_2_hbase.xsl"), load("dataDatasource.xml"));
385
	}
386

    
387
	@Test
388
	public void testFromMongodbCompressedDump() throws Exception {
389
		doTestJsonGz(loadFromTransformationProfile("oaf2hbase.xml"), load("mdstore_cleaned.json.gz"));
390
	}
391

    
392
	@Test
393
	public void testLoadFromTransformationProfile() throws IOException {
394
		InputStream in = loadFromTransformationProfile("oaf2hbase.xml");
395
		log.info(IOUtils.toString(in));
396
	}
397

    
398
	@Test
399
	public void test_template() throws Exception {
400
		final String xslt = IOUtils.toString(loadFromTransformationProfile("oaf2hbase.xml"));
401
		final XsltRowTransformer transformer = factory.getTransformer(xslt);
402
		assertNotNull(transformer);
403

    
404
		final String record = IOUtils.toString(load("record.xml"));
405
		final List<Row> rows = transformer.apply(record);
406

    
407
		System.out.println(rows);
408
	}
409

    
410
	@Test
411
	public void testWrongCharsOrganization() throws Exception {
412
		final List<Row> rows = Lists.newArrayList();
413
		rows.addAll(asRows(loadFromTransformationProfile("organizations_2_hbase.xsl"), load("organizationWrongChars.xml")));
414
		printAll(mapAll(buildTable(rows)));
415
	}
416

    
417
	@Test
418
	public void testParseProjectWithFunderOriginalName() throws Exception {
419

    
420
		doTest(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectWithFunderOriginalName.xml"));
421
	}
422
	@Test
423
	public void testLinkFunderOriginalName() throws Exception {
424

    
425
		final List<Row> rows = Lists.newArrayList();
426
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectWithFunderOriginalName.xml")));
427
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordFunderOriginalName.xml")));
428

    
429
		printAll(mapAll(buildTable(rows)));
430
	}
431

    
432
	@Test
433
	public void testProjectExtraInfo() throws Exception {
434
		final List<Row> rows = Lists.newArrayList();
435
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordExtraInfo.xml")));
436
		printAll(mapAll(buildTable(rows)));
437
	}
438

    
439
	@Test
440
	public void testParseSoftwareFromODF() throws Exception {
441
		final List<Row> rows = Lists.newArrayList();
442
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("softwareODF.xml")));
443
		rows.addAll(asRows(loadFromTransformationProfile("projects_2_hbase.xsl"), load("projectRecordCorda.xml")));
444
		printAll(mapAll(buildTable(rows)));
445
	}
446

    
447
	@Test
448
	public void testParseSoftwareFromOAF() throws Exception {
449
		final List<Row> rows = Lists.newArrayList();
450
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordOAFsoftwareCLOSED.xml")));
451
		printAll(mapAll(buildTable(rows)));
452
	}
453

    
454
	@Test
455
	public void testParsePubFromODF() throws Exception {
456
		final List<Row> rows = Lists.newArrayList();
457
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("recordODFPub.xml")));
458
		printAll(mapAll(buildTable(rows)));
459
	}
460

    
461
	@Test
462
	public void testParseSoftwareFromOAFOpen() throws Exception {
463
		final List<Row> rows = Lists.newArrayList();
464
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("recordOAFsoftwareOPEN.xml")));
465
		printAll(mapAll(buildTable(rows)));
466
	}
467

    
468
	@Test
469
	public void testParseSoftwareBiotool() throws Exception {
470
		final List<Row> rows = Lists.newArrayList();
471
		rows.addAll(asRows(loadFromTransformationProfile("odf2hbase.xml"), load("biotoolSw.xml")));
472
		printAll(mapAll(buildTable(rows)));
473
	}
474

    
475
	@Test
476
	public void testParseOafWithExternalRef() throws Exception {
477
		final List<Row> rows = Lists.newArrayList();
478
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("oafWithExternalReference.xml")));
479
		printAll(mapAll(buildTable(rows)));
480
	}
481

    
482
	@Test
483
	public void testParseOafWithCommunity() throws Exception {
484
		final List<Row> rows = Lists.newArrayList();
485
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("oafWithCommunity.xml")));
486
		printAll(mapAll(buildTable(rows)));
487
	}
488

    
489
	@Test
490
	public void testParseOafWithUpdates() throws Exception {
491
		final List<Row> rows = Lists.newArrayList();
492
		rows.addAll(asRows(loadFromTransformationProfile("oaf2hbase.xml"), load("oafWithCommunity.xml")));
493

    
494
		ActionFactory actionFactory = new ActionFactory();
495

    
496
		Map<String, Resource> xslts = Maps.newHashMap();
497

    
498
		xslts.put("oaf2hbase", new ByteArrayResource(IOUtils.toString(loadFromTransformationProfile("oaf2hbase.xml")).getBytes()));
499
		actionFactory.setXslts(xslts);
500

    
501
		XsltInfoPackageAction pa = actionFactory.generateInfoPackageAction(
502
				"oaf2hbase",
503
				"rawset-id",
504
				new Agent("agent-id", "agent-name", Agent.AGENT_TYPE.algo),
505
				Operation.UPDATE,
506
				IOUtils.toString(load("oafUpdateWithCommunity.xml")),
507
				Provenance.sysimport_mining_aggregator,
508
				"0.9");
509

    
510
		final String qualifier = "update_" + System.nanoTime();
511

    
512
		IOUtils.readLines(load("country_updates.json")).forEach(line -> {
513

    
514
			Oaf.Builder oaf = Oaf.newBuilder();
515

    
516
			try {
517
				JsonFormat.merge(line, oaf);
518
			} catch (JsonFormat.ParseException e) {
519
				throw new IllegalArgumentException(e);
520
			}
521

    
522
			Column<String, byte[]> col = new Column<>("update_" + System.nanoTime(), oaf.build().toByteArray());
523
			rows.add(new Row("result", oaf.getEntity().getId(), Lists.newArrayList(col)));
524
		});
525

    
526
		pa.asAtomicActions().forEach(a -> {
527
			Column<String, byte[]> col = new Column<>("update_" + System.nanoTime(), a.getTargetValue());
528
			rows.add(new Row(a.getTargetColumnFamily(), a.getTargetRowKey(), Lists.newArrayList(col)));
529
		});
530

    
531

    
532
		/*
533
		rows.forEach(r -> {
534
			log.info(r);
535
		});
536
		*/
537

    
538
		mapAll(buildTable(rows)).entrySet().forEach(b -> {
539
			log.info(b.getKey());
540
			log.info(b.getValue());
541
		});
542
	}
543

    
544
	@Test
545
	public void testParseCrisPub() throws Exception {
546
		doTest(loadFromTransformationProfile("odf2hbase.xml"), load("cris_pub1.xml"));
547
	}
548

    
549
	private void doTest(final InputStream xsltStream, final InputStream recordStream) throws Exception {
550
		try {
551
			final List<Row> rows = asRows(xsltStream, recordStream);
552

    
553
			log.info(rows);
554

    
555
			final Map<String, Map<String, Map<String, byte[]>>> table = buildTable(rows);
556

    
557
			// System.out.println("\n" + table.toString());
558

    
559
			final Map<String, XmlRecordFactory> builders = mapAll(table);
560

    
561
			printAll(builders);
562
		} catch (final InvalidProtocolBufferException e) {
563
			throw new Exception(e);
564
		} catch (final TransformerConfigurationException e) {
565
			throw new Exception(e);
566
		} catch (final TransformerFactoryConfigurationError e) {
567
			throw new Exception(e);
568
		} catch (final DocumentException e) {
569
			throw new Exception(e);
570
		}
571
	}
572

    
573
	private void doTestJsonGz(final InputStream xsltStream, final InputStream recordStream) throws Exception {
574

    
575
		final Iterator<List<Row>> rowsIterator = asRowsJsonGzip(xsltStream, recordStream);
576

    
577
		int i = 0;
578
		while (rowsIterator.hasNext()) {
579
			final List<Row> rows = rowsIterator.next();
580
			i++;
581

    
582
			if ((i % 10000) == 0) {
583
				System.out.println(i);
584
			}
585

    
586
			final Map<String, Map<String, Map<String, byte[]>>> table = buildTableDoaj(rows);
587

    
588
			for (final Map<String, Map<String, byte[]>> m : table.values()) {
589
				for (final Map<String, byte[]> mv : m.values()) {
590
					for (final byte[] v : mv.values()) {
591
						final OafDecoder d = OafDecoder.decode(v);
592
						assertNotNull(d);
593
						assertNotNull(d.getOaf());
594

    
595
						switch (d.getKind()) {
596
						case entity:
597
							assertNotNull(d.getMetadata());
598
							if (d.getOaf().getEntity().getType().equals(Type.result)) {
599
								System.out.println(d.getOaf());
600
							}
601
							break;
602
						case relation:
603
							assertNotNull(d.getRel());
604
							break;
605
						default:
606
							break;
607
						}
608
					}
609
				}
610
			}
611
		}
612
	}
613

    
614
	private List<Row> asRows(final InputStream xsltStream, final InputStream recordStream, final Function<Row, Row> p) throws Exception {
615
		return asRows(xsltStream, new HashMap<>(), recordStream, p);
616
	}
617

    
618
	private List<Row> asRows(final InputStream xsltStream, final InputStream recordStream) throws Exception {
619
		return asRows(xsltStream, new HashMap<>(), recordStream);
620
	}
621

    
622
	private List<Row> asRows(final InputStream xsltStream, final Map<String, Object> params, final InputStream recordStream) throws Exception {
623
		return asRows(xsltStream, params, recordStream, null);
624
	}
625

    
626
	private List<Row> asRows(final InputStream xsltStream, final Map<String, Object> params, final InputStream recordStream, final Function<Row, Row> p) throws Exception {
627
		final String xslt = IOUtils.toString(xsltStream);
628
		final XsltRowTransformer transformer = factory.getTransformer(xslt, params);
629
		assertNotNull(transformer);
630

    
631
		final String record = IOUtils.toString(recordStream);
632
		final List<Row> rows = transformer.apply(record);
633

    
634
		assertNotNull(rows);
635
		assertFalse(rows.isEmpty());
636
		return p == null ? rows : Lists.newArrayList(Iterables.transform(rows, p));
637
	}
638

    
639
	private Iterator<List<Row>> asRowsJsonGzip(final InputStream xsltStream, final InputStream recordStreamJsonGzip) throws Exception {
640
		final String xslt = IOUtils.toString(xsltStream);
641
		final XsltRowTransformer transformer = factory.getTransformer(xslt);
642
		assertNotNull(transformer);
643
		assertNotNull(recordStreamJsonGzip);
644

    
645
		final GZIPInputStream stream = new GZIPInputStream(recordStreamJsonGzip);
646
		assertNotNull(stream);
647
		final BufferedReader inStream = new BufferedReader(new InputStreamReader(stream));
648
		assertNotNull(inStream);
649
		return new Iterator<List<Row>>() {
650

    
651
			String jsonRecord = null;
652

    
653
			@Override
654
			public boolean hasNext() {
655
				try {
656
					return (jsonRecord = inStream.readLine()) != null;
657
				} catch (final IOException e) {
658
					throw new RuntimeException(e);
659
				}
660
			}
661

    
662
			@Override
663
			public List<Row> next() {
664

    
665
				final JSONObject jsonObj = new JSONObject(jsonRecord);
666
				final String body = jsonObj.getString("body");
667
				try {
668
					assertTrue(StringUtils.isNotBlank(body));
669
					// System.out.println(body);
670
					final List<Row> rows = transformer.apply(body);
671
					assertNotNull(rows);
672
					assertFalse(rows.isEmpty());
673
					return rows;
674
				} catch (final Throwable e) {
675
					System.err.println("error transforming document: " + body);
676
					throw new RuntimeException(e);
677
				}
678
			}
679

    
680
			@Override
681
			public void remove() {
682
				throw new UnsupportedOperationException();
683
			}
684

    
685
		};
686

    
687
	}
688

    
689
	private Map<String, Map<String, Map<String, byte[]>>> buildTableDoaj(final List<Row> rows) throws UnsupportedEncodingException {
690
		final Map<String, Map<String, Map<String, byte[]>>> table = Maps.newHashMap();
691

    
692
		for (final Row row : rows) {
693
			final String rowKey = row.getKey();
694
			final String cf = row.getColumnFamily();
695
			if (!table.containsKey(rowKey)) {
696
				table.put(rowKey, new HashMap<>());
697
			}
698
			if (!table.get(rowKey).containsKey(cf)) {
699
				table.get(rowKey).put(row.getColumnFamily(), new HashMap<>());
700
			}
701
			for (final Column<String, byte[]> c : row.getColumns()) {
702
				// System.out.println(String.format("ADDING K:%s CF:%s Q:%s", rowKey, cf, c.getName()));
703
				table.get(rowKey).get(cf).put(c.getName(), c.getValue());
704
				if (cf.equals("result") && c.getName().equals("body")) {
705
					// System.out.println(String.format("ADDING K:%s CF:%s Q:%s", rowKey, cf, c.getName()));
706
					assertTrue(StringUtils.isNotBlank(new String(c.getValue(), "UTF-8")));
707
				}
708
			}
709
		}
710
		return table;
711

    
712
	}
713

    
714
	protected Map<String, Map<String, Map<String, byte[]>>> buildTable(final List<Row> rows) throws UnsupportedEncodingException {
715
		final Map<String, Map<String, Map<String, byte[]>>> table = Maps.newHashMap();
716

    
717
		for (final Row row : rows) {
718
			final String rowKey = row.getKey();
719
			final String cf = row.getColumnFamily();
720
			if (!table.containsKey(rowKey)) {
721
				table.put(rowKey, new HashMap<>());
722
			}
723
			if (!table.get(rowKey).containsKey(cf)) {
724
				table.get(rowKey).put(row.getColumnFamily(), new HashMap<>());
725
			}
726
			for (final Column<String, byte[]> c : row.getColumns()) {
727
				System.out.println(String.format("ADDING K:%s CF:%s Q:%s", rowKey, cf, c.getName()));
728
				table.get(rowKey).get(cf).put(c.getName(), c.getValue());
729
				if (c.getName().equals("body")) {
730
					final String theBody = new String(c.getValue(), "UTF-8");
731
					assertTrue(StringUtils.isNotBlank(theBody));
732
					//System.out.println(theBody);
733
				}
734
			}
735
		}
736
		return table;
737

    
738
	}
739

    
740
	protected Map<String, XmlRecordFactory> mapAll(final Map<String, Map<String, Map<String, byte[]>>> table) throws Exception {
741

    
742
		final Map<String, XmlRecordFactory> builders = Maps.newHashMap();
743
		for (final Entry<String, Map<String, Map<String, byte[]>>> e : table.entrySet()) {
744
			map(builders, e.getKey(), e.getValue());
745
		}
746
		return builders;
747
	}
748

    
749
	// private Map<String, XmlRecordFactory> mapResultsOnly(final Map<String, Map<String, Map<String, byte[]>>> table) throws Exception {
750
	//
751
	// final Map<String, XmlRecordFactory> builders = Maps.newHashMap();
752
	// for (final Entry<String, Map<String, Map<String, byte[]>>> e : table.entrySet()) {
753
	// final Type type = OafRowKeyDecoder.decode(e.getKey()).getType();
754
	// if (type == Type.result) {
755
	// map(builders, e.getKey(), e.getValue());
756
	// }
757
	// }
758
	// return builders;
759
	// }
760

    
761
	private void map(final Map<String, XmlRecordFactory> builders, final String rowKey, final Map<String, Map<String, byte[]>> row) throws Exception {
762

    
763
		final Type type = OafRowKeyDecoder.decode(rowKey).getType();
764

    
765
		final Map<String, byte[]> familyMap = row.get(type.toString());
766

    
767
		if (familyMap == null) return;
768

    
769
		final byte[] bodyB = familyMap.get("body");
770

    
771
		if (bodyB != null) {
772
			ensureBuilder(builders, rowKey);
773

    
774
			final Oaf oaf = UpdateMerger.mergeBodyUpdates(familyMap);
775

    
776
			final OafDecoder mainEntity = OafDecoder.decode(oaf);
777

    
778
			builders.get(rowKey).setMainEntity(mainEntity);
779

    
780
			for (final LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
781

    
782
				final String it = ld.getRelDescriptor().getIt();
783
				final Map<String, byte[]> cols = row.get(it);
784

    
785
				if ((cols != null) && !cols.isEmpty()) {
786

    
787
					for (final byte[] oafB : cols.values()) {
788

    
789
						final Oaf.Builder relBuilder = Oaf.newBuilder(Oaf.parseFrom(oafB));
790

    
791
						if (ld.isSymmetric()) {
792
							final RelDescriptor rd = ld.getRelDescriptor();
793

    
794
							relBuilder.getRelBuilder().setCachedTarget(mainEntity.getEntity()).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
795
						}
796

    
797
						relBuilder.getRelBuilder().setChild(ld.isChild());
798

    
799
						final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(System.currentTimeMillis());
800
						oafBuilder.mergeFrom(relBuilder.build());
801

    
802
						final String targetId = ld.isSymmetric() ? oafBuilder.getRel().getTarget() : oafBuilder.getRel().getSource();
803
						ensureBuilder(builders, targetId);
804
						final OafDecoder decoder = OafDecoder.decode(oafBuilder.build());
805

    
806
						if (ld.isChild()) {
807
							builders.get(targetId).addChild(type, decoder);
808
						} else {
809
							builders.get(targetId).addRelation(type, decoder);
810
						}
811
					}
812

    
813
				}
814
			}
815
		}
816

    
817
	}
818

    
819
	private void printAll(final Map<String, XmlRecordFactory> builders) throws DocumentException {
820
		print(Sets.newHashSet(Type.values()), builders, null);
821
	}
822

    
823
	private void print(final Set<Type> types, final Map<String, XmlRecordFactory> builders, final Map<Type, Set<String>> xpaths) throws DocumentException {
824
		final SAXReader r = new SAXReader();
825

    
826
		for (final Entry<String, XmlRecordFactory> e : builders.entrySet()) {
827
			final OafRowKeyDecoder kd = OafRowKeyDecoder.decode(e.getKey());
828

    
829
			if (!e.getValue().isValid()) throw new IllegalArgumentException("invalid builder: " + e.getKey());
830
			if (types.contains(kd.getType())) {
831
				final String val = IndentXmlString.apply(e.getValue().build());
832

    
833
				if ((xpaths != null) && !xpaths.isEmpty() && (xpaths.get(kd.getType()) != null)) {
834
					final Document doc = r.read(new StringReader(val));
835

    
836
					log.debug("\n" + e.getKey());
837
					for (final String xpath : xpaths.get(kd.getType())) {
838
						log.debug(doc.valueOf(xpath));
839
					}
840
				} else {
841
					log.info(val);
842
				}
843
			}
844
		}
845
	}
846

    
847
	private void printNoIndent(final Map<String, XmlRecordFactory> builders) {
848
		for (final Entry<String, XmlRecordFactory> e : builders.entrySet()) {
849
			if (e.getValue().isValid()) {
850
				log.debug(e.getValue().build());
851
			} else {
852
				log.debug("invalid builder: " + e.getKey());
853
			}
854
		}
855
	}
856

    
857
	private void ensureBuilder(final Map<String, XmlRecordFactory> builders, final String rowKey) throws Exception {
858
		if (!builders.containsKey(rowKey)) {
859
			builders.put(rowKey, newBuilder());
860
		}
861
	}
862

    
863
	private XmlRecordFactory newBuilder() throws TransformerConfigurationException, TransformerFactoryConfigurationError, DocumentException {
864
		return new XmlRecordFactory(entityConfigTable, ContextMapper.fromXml(Context.xml),
865
				RelClasses.fromJSon(RelClassesTest.relClassesJson), XmlRecordFactoryTest.SCHEMA_LOCATION, true, false, false, XmlRecordFactoryTest.specialDatasourceTypes);
866
	}
867

    
868
	private InputStream load(final String fileName) {
869
		return getClass().getResourceAsStream(fileName);
870
	}
871

    
872
	private InputStream loadFromTransformationProfile(final String profilePath) {
873
		log.info("Loading xslt from: " + basePathProfiles + profilePath);
874
		InputStream profile = getClass().getResourceAsStream(basePathProfiles + profilePath);
875
		SAXReader saxReader = new SAXReader();
876
		Document doc = null;
877
		try {
878
			doc = saxReader.read(profile);
879
		} catch (DocumentException e) {
880
			e.printStackTrace();
881
			throw new RuntimeException(e);
882
		}
883
		String xslt = doc.selectSingleNode("//SCRIPT/CODE/*[local-name()='stylesheet']").asXML();
884
		//log.info(xslt);
885
		return IOUtils.toInputStream(xslt);
886
	}
887

    
888
}
    (1-1/1)