Project

General

Profile

« Previous | Next » 

Revision 47517

Stroing in mdstore fails if the VTD Parser cannot properly apply the xpaths on records to index

View differences:

modules/cnr-mongo-mdstore/trunk/src/test/java/eu/dnetlib/data/mdstore/modular/mongodb/IndexFieldParserTest.java
1 1
package eu.dnetlib.data.mdstore.modular.mongodb;
2 2

  
3
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
4
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser;
5
import org.apache.commons.io.IOUtils;
6
import org.junit.Test;
7

  
8 3
import java.io.IOException;
9 4
import java.io.InputStream;
10 5
import java.util.ArrayList;
11 6
import java.util.List;
12 7
import java.util.Map;
13 8

  
9
import eu.dnetlib.data.mdstore.MDStoreServiceException;
10
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
11
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser;
12
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException;
13
import org.apache.commons.io.IOUtils;
14
import org.junit.Test;
15

  
14 16
/**
15 17
 * Created by sandro on 11/29/16.
16 18
 */
......
18 20

  
19 21

  
20 22
    @Test
21
    public void parserTest() throws IOException {
23
    public void parserTest() throws IOException, MDStoreServiceException, IndexFieldRecordParserException {
22 24
        InputStream inputStream = this.getClass().getResourceAsStream("/eu/dnetlib/data/mdstore/modular/mongodb/inputRecord.xml");
23 25

  
24 26
        final String inputRecord = IOUtils.toString(inputStream);
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/utils/IndexFieldRecordParserException.java
1
package eu.dnetlib.data.mdstore.modular.mongodb.utils;
2

  
3
/**
4
 * Created by Alessia Bardi on 14/06/2017.
5
 *
6
 * @author Alessia Bardi
7
 */
8
public class IndexFieldRecordParserException extends Exception {
9

  
10
	public IndexFieldRecordParserException() {
11
		super();
12
	}
13

  
14
	public IndexFieldRecordParserException(final String message) {
15
		super(message);
16
	}
17

  
18
	public IndexFieldRecordParserException(final String message, final Throwable cause) {
19
		super(message, cause);
20
	}
21

  
22
	public IndexFieldRecordParserException(final Throwable cause) {
23
		super(cause);
24
	}
25
}
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/utils/IndexFieldRecordParser.java
1 1
package eu.dnetlib.data.mdstore.modular.mongodb.utils;
2 2

  
3
import java.util.ArrayList;
4
import java.util.HashMap;
5
import java.util.List;
6
import java.util.Map;
7

  
3 8
import com.ximpleware.AutoPilot;
4 9
import com.ximpleware.VTDGen;
5 10
import com.ximpleware.VTDNav;
......
7 12
import org.apache.commons.logging.Log;
8 13
import org.apache.commons.logging.LogFactory;
9 14

  
10
import java.util.ArrayList;
11
import java.util.HashMap;
12
import java.util.List;
13
import java.util.Map;
14

  
15 15
/**
16 16
 * Created by sandro on 11/29/16.
17 17
 */
......
19 19

  
20 20
    private static final Log log = LogFactory.getLog(IndexFieldRecordParser.class);
21 21

  
22
    public static List<String> getTextValue(final AutoPilot ap, final VTDNav vn, final String xpath) throws Exception {
22
    private static List<String> getTextValue(final AutoPilot ap, final VTDNav vn, final String xpath) throws Exception {
23 23
        List<String> results = new ArrayList<>();
24 24
        ap.selectXPath(xpath);
25 25
        while (ap.evalXPath() != -1) {
......
29 29
        return results;
30 30
    }
31 31

  
32
    public Map<String, List<String>> parseRecord(final String record, final List<MDFormatDescription> mdformats) {
32
    public Map<String, List<String>> parseRecord(final String record, final List<MDFormatDescription> mdformats) throws IndexFieldRecordParserException {
33 33
        if (mdformats == null || mdformats.size() == 0)
34 34
            return null;
35 35
        final Map<String, List<String>> result = new HashMap<>();
......
46 46
                result.put(description.getName(), xpathResult);
47 47
            }
48 48
            return result;
49

  
50 49
        } catch (Throwable e) {
51
            log.error("Error on parsing record " + record, e);
50
            throw new IndexFieldRecordParserException("Cannot index record", e);
52 51
        }
53
        return result;
54 52
    }
55 53

  
56 54

  
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoBulkWritesManager.java
1 1
package eu.dnetlib.data.mdstore.modular.mongodb;
2 2

  
3
import java.util.List;
4
import java.util.Map;
5

  
3 6
import com.google.common.collect.Lists;
4 7
import com.mongodb.BasicDBObject;
5 8
import com.mongodb.DBObject;
6 9
import com.mongodb.WriteConcern;
7 10
import com.mongodb.client.MongoCollection;
8 11
import com.mongodb.client.model.*;
12
import eu.dnetlib.data.mdstore.MDStoreServiceException;
9 13
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
10 14
import eu.dnetlib.data.mdstore.modular.RecordParser;
11 15
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser;
16
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException;
12 17
import org.apache.commons.logging.Log;
13 18
import org.apache.commons.logging.LogFactory;
14 19

  
15
import java.util.List;
16
import java.util.Map;
17

  
18 20
public class MongoBulkWritesManager {
19 21

  
20 22
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
21 23
	private final boolean discardRecords;
22
    private final boolean indexRecords;
23
    private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
24
    private final List<MDFormatDescription> mdref;
25
    private RecordParser recordParser;
24
	private final boolean indexRecords;
25
	private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
26
	private final List<MDFormatDescription> mdref;
27
	private RecordParser recordParser;
26 28
	private MongoCollection<DBObject> validCollection;
27 29
	private List<WriteModel<DBObject>> validBulkOperationList;
28 30
	private BulkWriteOptions writeOptions;
......
31 33
	private int bulkSize;
32 34

  
33 35
	public MongoBulkWritesManager(final MongoCollection<DBObject> collection,
34
                                  final MongoCollection<DBObject> discardedCollection,
35
                                  final List<MDFormatDescription> mdref,
36
                                  final int bulkSize,
37
                                  final RecordParser parser,
38
                                  final boolean discardRecords) {
36
			final MongoCollection<DBObject> discardedCollection,
37
			final List<MDFormatDescription> mdref,
38
			final int bulkSize,
39
			final RecordParser parser,
40
			final boolean discardRecords) {
39 41
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
40 42
		this.validBulkOperationList = Lists.newArrayList();
41 43

  
......
45 47
		this.bulkSize = bulkSize;
46 48
		this.recordParser = parser;
47 49
		this.discardRecords = discardRecords;
48
        this.mdref = mdref;
50
		this.mdref = mdref;
49 51

  
50
        this.indexRecords = this.mdref != null;
51
        this.writeOptions = new BulkWriteOptions().ordered(false);
52
		this.indexRecords = this.mdref != null;
53
		this.writeOptions = new BulkWriteOptions().ordered(false);
52 54
	}
53 55

  
54
	public void insert(final String record) {
56
	public void insert(final String record) throws MDStoreServiceException {
57
		Map<String, String> recordProperties = null;
55 58
		try {
56
			final Map<String, String> recordProperties = recordParser.parseRecord(record);
57
            Map<String, List<String>> indexRecordField = null;
58
            if (indexRecords) {
59
                indexRecordField = indexFieldRecordParser.parseRecord(record, mdref);
60
            }
61

  
62
			log.debug("found props: " + recordProperties);
63
			if (recordProperties.containsKey("id")) {
64
                final DBObject obj = buildDBObject(record, recordProperties, indexRecordField);
65
                if (log.isDebugEnabled()) {
66
					log.debug("Saving object" + obj);
67
				}
68
				validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true)));
69
				if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
70
					validCollection.bulkWrite(validBulkOperationList, writeOptions);
71
					validBulkOperationList.clear();
72
				}
73
			} else {
74
				if (discardRecords) {
75
					log.debug("parsed record seems invalid");
76
					discardRecord(record);
77
				}
78
			}
59
			recordProperties = recordParser.parseRecord(record);
79 60
		} catch (Throwable e) {
80 61
			if (discardRecords) {
81 62
				log.debug("unhandled exception: " + e.getMessage());
82 63
				discardRecord(record);
83 64
			}
84 65
		}
66
		Map<String, List<String>> indexRecordField = null;
67
		try {
68
			if (indexRecords) {
69
				indexRecordField = indexFieldRecordParser.parseRecord(record, mdref);
70
			}
71
		} catch (IndexFieldRecordParserException e) {
72
			// could not index record fields
73
			throw new MDStoreServiceException("Are you using the correct type of store / index definition for the records in " + validCollection.getNamespace() + " ?", e);
74
		}
75

  
76
		log.debug("found props: " + recordProperties);
77
		if (recordProperties.containsKey("id")) {
78
			final DBObject obj = buildDBObject(record, recordProperties, indexRecordField);
79
			if (log.isDebugEnabled()) {
80
				log.debug("Saving object" + obj);
81
			}
82
			validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true)));
83
			if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
84
				validCollection.bulkWrite(validBulkOperationList, writeOptions);
85
				validBulkOperationList.clear();
86
			}
87
		} else {
88
			if (discardRecords) {
89
				log.debug("parsed record seems invalid");
90
				discardRecord(record);
91
			}
92
		}
85 93
	}
86 94

  
87 95
	private void discardRecord(final String record) {
......
109 117
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
110 118
	}
111 119

  
112
    private DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
113
        final DBObject obj = new BasicDBObject();
120
	private DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
121
		final DBObject obj = new BasicDBObject();
114 122
		obj.put("id", recordProperties.get("id"));
115 123
		obj.put("originalId", recordProperties.get("originalId"));
116 124
		obj.put("body", record);
117 125
		obj.put("timestamp", System.currentTimeMillis());
118
        if (indexFieldProperties != null)
119
            obj.putAll(indexFieldProperties);
120
        return obj;
126
		if (indexFieldProperties != null)
127
			obj.putAll(indexFieldProperties);
128
		return obj;
121 129
	}
122 130

  
123 131
	private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) {

Also available in: Unified diff