Project

General

Profile

1
package eu.dnetlib.pid.resolver.mdstore.plugin;
2

    
3
import com.mongodb.DBObject;
4
import com.mongodb.client.MongoCollection;
5
import eu.dnetlib.pid.resolver.PIDResolver;
6
import eu.dnetlib.pid.resolver.model.ObjectRelation;
7
import eu.dnetlib.pid.resolver.model.ObjectType;
8
import eu.dnetlib.pid.resolver.model.PID;
9
import eu.dnetlib.pid.resolver.model.ResolvedObject;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12

    
13
import java.util.List;
14
import java.util.Map;
15
import java.util.concurrent.BlockingQueue;
16

    
17
public abstract class AbstractRecordResolver implements RecordResolver {
18

    
19
    protected static final Log log = LogFactory.getLog(AbstractRecordResolver.class);
20
    protected final Long timestamp;
21
    protected List<PIDResolver> pluginResolver;
22
    protected BlockingQueue<DBObject> inputQueue;
23
    protected MongoCollection<DBObject> outputCollection;
24
    protected ResolverSerializer serializer;
25
    protected boolean offline;
26

    
27
    public AbstractRecordResolver(final long ts) {
28
        this.timestamp = ts;
29
    }
30

    
31
    protected String resolveRelations(String inputRecord, ResolvedObject inputObject, boolean shouldUpdate) {
32
        if (inputObject.getRelations() != null) {
33
            for (ObjectRelation rel : inputObject.getRelations()) {
34
                final Map<String, ObjectType> resolvedRelation = tryToResolveRelation(rel.getTargetPID());
35
                if (resolvedRelation != null && !resolvedRelation.isEmpty()) {
36
                    resolvedRelation.entrySet()
37
                            .forEach(e -> {
38
                                rel.setTargetPID(new PID(e.getKey(), "dnet"));
39
                                rel.setTargetType(e.getValue());
40
                            });
41
                    shouldUpdate = true;
42
                }
43
            }
44
            if (shouldUpdate) {
45
                final String newXML = serializer.serializeReplacingXML(inputRecord, inputObject);
46
                return newXML;
47
            }
48
        }
49
        return null;
50
    }
51

    
52
    @Override
53
    public Boolean call() throws Exception {
54

    
55
        log.info("START HERE! Ressolving offline:"+offline);
56

    
57
        DBObject currentObject = inputQueue.take();
58
        int i = 0;
59
        String currentRecord = null;
60
        double sumTotal = 0;
61
        while (currentObject != ResolverMDStorePlugin.DONE) {
62
            try {
63
                currentRecord = (String) currentObject.get("body");
64
                if (currentObject.get("resolved_ts") == null) {
65
                    final double start = System.currentTimeMillis();
66
                    final String resolvedRecord = resolve(currentRecord);
67
                    if (resolvedRecord != null) {
68
                        currentObject.put("body", resolvedRecord);
69
                        currentObject.removeField("_id");
70
                        currentObject.put("resolved_ts", timestamp);
71
                        outputCollection.insertOne(currentObject);
72
                    }
73
                    final double total = System.currentTimeMillis() - start;
74
                    sumTotal += total;
75

    
76
                }
77

    
78
                currentObject = inputQueue.take();
79
                if (i++ % 100 == 0) {
80
                    log.debug(Thread.currentThread().getId() + " total object resolved: " + i + "in average time " + (sumTotal / 100) + "ms");
81
                    sumTotal = 0;
82
                }
83
            } catch (Throwable e) {
84
                log.error("Error on resolving objects " + currentRecord, e);
85
                return false;
86
            }
87
        }
88
        if (currentObject == ResolverMDStorePlugin.DONE) {
89
            inputQueue.put(currentObject);
90
        }
91
        return true;
92

    
93
    }
94

    
95
    protected abstract Map<String, ObjectType> tryToResolveRelation(PID targetPID);
96

    
97
    public void setOffline(boolean offline) {
98
        this.offline = offline;
99
    }
100
}
(1-1/6)