Project

General

Profile

1
package eu.dnetlib.pid.resolver.mdstore.plugin;
2

    
3
import com.mongodb.DBObject;
4
import com.mongodb.client.MongoCollection;
5
import eu.dnetlib.pid.resolver.PIDResolver;
6
import eu.dnetlib.pid.resolver.model.ObjectRelation;
7
import eu.dnetlib.pid.resolver.model.ObjectType;
8
import eu.dnetlib.pid.resolver.model.PID;
9
import eu.dnetlib.pid.resolver.model.ResolvedObject;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12

    
13
import java.util.List;
14
import java.util.Map;
15
import java.util.concurrent.BlockingQueue;
16
import java.util.function.Function;
17

    
18
public abstract class AbstractRecordResolver implements RecordResolver {
19

    
20
    protected static final Log log = LogFactory.getLog(AbstractRecordResolver.class);
21
    protected final Long timestamp;
22
    protected List<PIDResolver> pluginResolver;
23
    protected BlockingQueue<DBObject> inputQueue;
24
    protected MongoCollection<DBObject> outputCollection;
25
    protected ResolverSerializer serializer;
26
    protected boolean offline;
27

    
28
    public AbstractRecordResolver(final long ts) {
29
        this.timestamp = ts;
30
    }
31

    
32
    protected String resolveRelations(String inputRecord, ResolvedObject inputObject, boolean shouldUpdate, Function<String,String> getInverseRelation) {
33
        if (inputObject.getRelations() != null) {
34
            for (ObjectRelation rel : inputObject.getRelations()) {
35
                final Map<String, ObjectType> resolvedRelation = tryToResolveRelation(rel.getTargetPID());
36
                if (resolvedRelation != null && !resolvedRelation.isEmpty()) {
37
                    resolvedRelation.entrySet()
38
                            .forEach(e -> {
39
                                rel.setTargetPID(new PID(e.getKey(), "dnet"));
40
                                rel.setTargetType(e.getValue());
41
                                rel.setInverseRelation(getInverseRelation.apply(rel.getRelationSemantics()));
42

    
43
                            });
44
                    shouldUpdate = true;
45
                }
46
            }
47
            if (shouldUpdate) {
48
                final String newXML = serializer.serializeReplacingXML(inputRecord, inputObject);
49
                return newXML;
50
            }
51
        }
52
        return null;
53
    }
54

    
55
    @Override
56
    public Boolean call() throws Exception {
57

    
58
        log.info("START HERE! Ressolving offline:"+offline);
59

    
60
        DBObject currentObject = inputQueue.take();
61
        int i = 0;
62
        String currentRecord = null;
63
        double sumTotal = 0;
64
        while (currentObject != ResolverMDStorePlugin.DONE) {
65
            try {
66
                currentRecord = (String) currentObject.get("body");
67
                if (currentObject.get("resolved_ts") == null) {
68
                    final double start = System.currentTimeMillis();
69
                    final String resolvedRecord = resolve(currentRecord);
70
                    if (resolvedRecord != null) {
71
                        currentObject.put("body", resolvedRecord);
72
                        currentObject.removeField("_id");
73
                        currentObject.put("resolved_ts", timestamp);
74
                        outputCollection.insertOne(currentObject);
75
                    }
76
                    final double total = System.currentTimeMillis() - start;
77
                    sumTotal += total;
78

    
79
                }
80

    
81
                currentObject = inputQueue.take();
82
                if (i++ % 100 == 0) {
83
                    log.debug(Thread.currentThread().getId() + " total object resolved: " + i + "in average time " + (sumTotal / 100) + "ms");
84
                    sumTotal = 0;
85
                }
86
            } catch (Throwable e) {
87
                log.error("Error on resolving objects " + currentRecord, e);
88
                return false;
89
            }
90
        }
91
        if (currentObject == ResolverMDStorePlugin.DONE) {
92
            inputQueue.put(currentObject);
93
        }
94
        return true;
95

    
96
    }
97

    
98
    protected abstract Map<String, ObjectType> tryToResolveRelation(PID targetPID);
99

    
100
    public void setOffline(boolean offline) {
101
        this.offline = offline;
102
    }
103
}
(1-1/6)