package eu.dnetlib.data.transform.xml.vtd;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import eu.dnetlib.data.transform.xml2.DatasetToProto;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.bson.Document;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

25
@RunWith(SpringJUnit4ClassRunner.class)
26
@ContextConfiguration(classes = { ConfigurationTestConfig.class })
27
public class VtdParserToProtoIT {
28

    
29
	private static final Log log = LogFactory.getLog(VtdParserToProtoIT.class);
30
	public static final String COLLECTION_NAME = "datacite";
31
	private static final int BATCH_SIZE = 10000;
32
	public static final int LOG_FREQ = 5000;
33

    
34
	@Autowired
35
	private MongoDatabase db;
36

    
37
	@Test
38
	public void testParseDatacite() throws IOException {
39

    
40
		final MongoCollection<Document> collection = db.getCollection(COLLECTION_NAME);
41

    
42
		final long collectionSize = collection.count();
43
		log.info(String.format("found %s records in collection '%s'", collectionSize, COLLECTION_NAME));
44

    
45
		final AtomicInteger read = new AtomicInteger(0);
46
		final DescriptiveStatistics stats = new DescriptiveStatistics();
47

    
48
		final StopWatch timer = new StopWatch();
49

    
50
		final DatasetToProto mapper = new DatasetToProto();
51
		StreamSupport.stream(collection.find().batchSize(BATCH_SIZE).spliterator(), false)
52
				.peek(d -> {
53
					if (read.addAndGet(1) % LOG_FREQ == 0) {
54
						log.info(String.format("records read so far %s", read.get()));
55
						log.info(String.format("stats so far %s", stats.toString()));
56
					}
57
				})
58
				.map(d -> (String) d.get("body"))
59
				.filter(Objects::nonNull)
60
				.peek(s -> timer.start())
61
				.map(mapper)
62
				.forEach(oaf -> {
63
					assertNotNull(oaf);
64
					assertTrue(oaf.hasEntity());
65

    
66
					timer.stop();
67
					stats.addValue(timer.getTime());
68
					timer.reset();
69
				});
70

    
71
		log.info(String.format("processed %s/%s records", read.get(), collectionSize));
72
	}
73

    
74

    
75
}