Project

General

Profile

« Previous | Next » 

Revision 54390

Updated resolver and trying to allow the mdstore plugin to print progress information on the UI

View differences:

webapps/dnet-dli-container/trunk/src/main/resources/eu/dnetlib/cnr-site.properties
3 3
#services.is.store.database.bean                = 	temporaryExistDatabase
4 4

  
5 5
dnet.data.path 									=	/var/lib/dnet_dli
6
#dnet.data.path 									=	/tmp/dnet
7
dnet.bootstrap.schemas=classpath*:/eu/dnetlib/bootstrap/schemas/**/*.xsd
8
dnet.bootstrap.profiles=classpath*:/eu/dnetlib/bootstrap/profiles/**/*.xml
6
#dnet.data.path 								=	/tmp/dnet
7
dnet.bootstrap.schemas                          =   classpath*:/eu/dnetlib/bootstrap/schemas/**/*.xsd
8
dnet.bootstrap.profiles                         =   classpath*:/eu/dnetlib/bootstrap/profiles/**/*.xml
9 9

  
10 10
services.msro.reindex.limit      				=   100
11 11
#services.msro.disabled.beans   				=   reindexOAIPostCleaningEFGHandler,reindexOAIPostEditEFGHandler,reindexPostEditEFGHandler
12 12

  
13
msro.wf.logger.name					=	wf_logs_dli
13
msro.wf.logger.name					            =	wf_logs_dli
14 14

  
15 15
services.aggregator.host						=	localhost
16 16
services.aggregator.name        				=   DRIVER
......
24 24
msro.wf.nodes.mdstoreSearch.interpretation 		=	cleaned
25 25
msro.wf.nodes.download.xpathMetadataId 			=	//*[local-name()='objIdentifier']/text()
26 26

  
27

  
28 27
dnet.modular.ui.authorization.default.superAdmin=	admin
29 28

  
30

  
31 29
# MSRO SPECIFIC
32 30
services.msro.graph.updateindex.incremental		=	classpath:/eu/dnetlib/enabling/manager/msro/wf/update-index-incremental.wf.xml
33 31
dnet.datasource.updater 						=   openaireplusDatasourceUpdater
34 32

  
35 33
hbase.actions.table 							=	db_actions_dli
36 34

  
37
services.datasourceManager.core=simpleDnetDataSourceManagerCore
35
services.datasourceManager.core                 =   simpleDnetDataSourceManagerCore
38 36

  
39
dnet.datasourcemanager.db.name=dnet_dli
37
dnet.datasourcemanager.db.name                  =   dnet_dli
40 38

  
41 39

  
42 40
# Mail configuration
43 41

  
44
msro.wf.mail.smtp.host = smtp.isti.cnr.it
45
msro.wf.mail.smtp.user = smtp-dnet
46
msro.wf.mail.smtp.password = hhr*7932
47
msro.wf.mail.cc = michele.artini@isti.cnr.it
48
services.pid.resolver.header=dli_resolver::
42
msro.wf.mail.smtp.host                          =   smtp.isti.cnr.it
43
msro.wf.mail.smtp.user                          =   smtp-dnet
44
msro.wf.mail.smtp.password                      =   hhr*7932
45
msro.wf.mail.cc                                 =   michele.artini@isti.cnr.it
46
services.pid.resolver.header                    =   dli_resolver::
49 47

  
50
service.index.solr.rank.enable=true
48
service.index.solr.rank.enable                  =   true
51 49

  
52
services.publisher.oai.host=localhost
53
services.publisher.oai.port=27017
54
services.objectstore.dao=gridFSObjectstoreDao
55
services.transformation.blacklist_api = http://localhost:8280/validator-service/worfklows?request=GetBlacklistedRecords&datasourceId=
50
services.publisher.oai.host                     =   localhost
51
services.publisher.oai.port                     =   27017
52
services.objectstore.dao                        =   gridFSObjectstoreDao
53
services.transformation.blacklist_api           = http://localhost:8280/validator-service/worfklows?request=GetBlacklistedRecords&datasourceId=
56 54
# AUTH
57 55

  
58 56
dnet.modular.ui.authorization.mongo.port=27017
......
70 68
#dnet.modular.ui.authorization.mongo.host=localhost
71 69
services.mdstore.mongodb.host=localhost
72 70
services.dli.resolver.store.DatabaseName=dliResolvedStore
73

  
74 71
services.dli.resolver.crossRef.dump=ES
modules/dnet-dli/trunk/src/test/java/eu/dnetlib/dli/collector/plugin/CrossRefIteratorTest.java
2 2

  
3 3
import org.antlr.stringtemplate.StringTemplate;
4 4
import org.apache.commons.io.IOUtils;
5
import org.junit.Ignore;
5 6
import org.junit.Test;
6 7

  
7 8
import java.io.IOException;
......
11 12

  
12 13

  
13 14
    @Test
15
    @Ignore
14 16
    public void CrossRefIteratorTest() throws IOException {
15 17
        final InputStream resourceAsStream = getClass().getResourceAsStream("/eu/dnetlib/dli/templates/Scholixv1.st");
16 18

  
modules/dnet-dli/trunk/src/test/resources/eu/dnetlib/dli/parser/inputANDS.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<oai:record xmlns:oai="http://www.openarchives.org/OAI/2.0/"
3
            xmlns:dri="http://www.driver-repository.eu/namespace/dri"
4
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
5
    <oai:header>
6
        <dri:objIdentifier>r3d100010464::000383753d5421f363c55070598e99f8</dri:objIdentifier>
7
        <dri:recordIdentifier>oai:ands.org.au::dd4da803-156c-5e80-a929-5f99213967f9</dri:recordIdentifier>
8
        <dri:dateOfCollection>2017-09-18T12:32:01.055+02:00</dri:dateOfCollection>
9
        <dri:repositoryId>ands_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=
10
        </dri:repositoryId>
11
        <dri:datasourceprefix>r3d100010464</dri:datasourceprefix>
12
        <identifier xmlns="http://www.openarchives.org/OAI/2.0/">oai:ands.org.au::dd4da803-156c-5e80-a929-5f99213967f9
13
        </identifier>
14
        <datestamp xmlns="http://www.openarchives.org/OAI/2.0/">2017-09-14T09:09:41Z</datestamp>
15
        <setSpec xmlns="http://www.openarchives.org/OAI/2.0/">datasource:66</setSpec>
1
<record xmlns="http://www.scholix.org" xmlns:oaf="http://namespace.dnet.eu/oaf"
2
        xmlns:dc="http://purl.org/dc/elements/1.1/"
3
        xmlns:dri="http://www.driver-repository.eu/namespace/dri">
4
    <oai:header xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5
                xmlns:oai="http://www.openarchives.org/OAI/2.0/">
6
        <dri:objIdentifier>crossref____::6b85fd4ba09dfc3cd0417e2f758755e5</dri:objIdentifier>
7
        <dri:resolvedDate>2018-12-13T12:29:43.770</dri:resolvedDate>
8
        <dri:resolvedDate>2018-12-13T11:13:45.043</dri:resolvedDate>
9
        <dri:recordIdentifier>10.1577/1548-8675(1998)018<0569:phbitw>2.0.co;2::doi</dri:recordIdentifier>
10
        <dri:dateOfCollection>2018-12-11T12:27:26.758+01:00</dri:dateOfCollection>
11
        <dri:repositoryId>0cf05336-894e-4793-a1f0-58d9705c4e6f_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=</dri:repositoryId>
12
        <dri:datasourceprefix>crossref____</dri:datasourceprefix>
16 13
    </oai:header>
17
    <metadata xmlns="http://www.openarchives.org/OAI/2.0/">
18
        <link xmlns="">
19
            <publicationDate>2013-09-16</publicationDate>
20
            <publisher>
21
                <name>James Cook University</name>
22
            </publisher>
23
            <linkProvider>
24
                <name>Australian National Data Service</name>
25
                <identifiers>
26
                    <identifier>http://nla.gov.au/nla.party-1508909</identifier>
27
                    <schema>url</schema>
28
                </identifiers>
29
            </linkProvider>
30
            <relationship>
31
                <name>hasAssociationWith</name>
32
                <schema>RIF-CS</schema>
33
                <inverseRelationship>hasAssociationWith</inverseRelationship>
34
            </relationship>
35
            <source>
36
                <identifier>
37
                    <identifier>
38
                        https://research.jcu.edu.au/researchdata/published/detail/b4984261068349a62c8479992e41f163
39
                    </identifier>
40
                    <schema>url</schema>
41
                </identifier>
42
                <objectType>
43
                    <type>dataset</type>
44
                </objectType>
45
                <title>Vertebrate monitoring in the Australian Wet Tropics rainforest at AU8A6 (145.62993124,
46
                    -17.60369044, 800.0m above MSL) collected by Reptile Surveys
47
                </title>
48
                <publicationDate>2017-09-14</publicationDate>
49
            </source>
50
            <target>
51
                <identifier>
52
                    <identifier>http://dx.doi.org/10.1890/09-1069.1</identifier>
53
                    <schema>url</schema>
54
                </identifier>
55
                <objectType>
56
                    <type>literature</type>
57
                </objectType>
58
                <title>Distributions, life history specialisation and phylogeny of the rainforest vertebrates in the
59
                    Australian Wet Tropics.
60
                </title>
61
            </target>
62
        </link>
14
    <metadata>
15
        <oaf:pid type ="doi">10.1577/1548-8675(1998)018&lt;0569:PHBITW&gt;2.0.CO;2</oaf:pid>
16
        <dc:identifier>http://dx.doi.org/10.1577/1548-8675(1998)018&lt;0569:PHBITW&gt;2.0.CO;2</dc:identifier>
17
        <dc:date></dc:date>
18
        <dc:description></dc:description>
19
        <dc:type>publication</dc:type>
20

  
21
        <oaf:relatedIdentifier relatedIdentifierType="dnet" relationType="References" inverseRelationType="" entityType="publication">dli_resolver::60f9cf6d317e38bd7081f7733d92d80a</oaf:relatedIdentifier>
63 22
    </metadata>
64
    <oai:about>
65
        <provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance"
66
                    xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
67
            <originDescription xmlns="" altered="true" harvestDate="2017-09-18T12:32:01.055+02:00">
68
                <baseURL>https%3A%2F%2Fresearchdata.ands.org.au%2Fapi%2Fregistry%2Foai%2F</baseURL>
69
                <identifier>oai:ands.org.au::dd4da803-156c-5e80-a929-5f99213967f9</identifier>
70
                <datestamp>2017-09-14T09:09:41Z</datestamp>
71
                <metadataNamespace/>
72
            </originDescription>
73
        </provenance>
74
    </oai:about>
75
</oai:record>
23
    <oaf:about>
24
        <oaf:datainfo>
25
            <oaf:completionStatus>incomplete</oaf:completionStatus>
26

  
27
            <oaf:collectedFrom id="dli_________::crossref" name="Crossref" completionStatus="incomplete"/>
28

  
29
        </oaf:datainfo>
30
    </oaf:about>
31

  
32

  
33
</record>
modules/dnet-dli/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/MergeDLIRecord.java
6 6
import eu.dnetlib.msro.workflows.procs.ProcessAware;
7 7
import eu.dnetlib.msro.workflows.procs.Token;
8 8
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
9
import eu.dnetlib.msro.workflows.util.ProgressProvider;
9 10
import eu.dnetlib.rmi.data.MDStoreService;
10 11
import eu.dnetlib.rmi.enabling.ISLookUpService;
11 12
import org.springframework.beans.factory.annotation.Autowired;
12 13

  
13 14
import java.util.List;
14 15

  
15
public class MergeDLIRecord extends BlackboardJobNode implements ProcessAware {
16
public class MergeDLIRecord extends BlackboardJobNode implements ProcessAware, ProgressProvider {
16 17

  
17 18
    private final static String queryTemplate = "for $x in collection(' /db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType') where $x//RESOURCE_IDENTIFIER/@value/string()='%s' return $x//FIELD[./key='NamespacePrefix']/value/text()";
18 19

  
......
28 29

  
29 30
    private String sparkPath;
30 31

  
32
    private String numExecutor;
33

  
31 34
    private boolean skipJob = false;
32 35

  
33 36
    private String sparkApplicationName;
......
62 65
            job.getParameters().put("sparkPath", getSparkPath());
63 66
            job.getParameters().put("sparkJobPath", getSparkJobPath());
64 67
            job.getParameters().put("mongoDBName", getMongoDBName());
68
            job.getParameters().put("numExecutor", getNumExecutor());
65 69
            job.getParameters().put("sparkApplicationName", process.getId());
66 70
        }
67 71

  
......
135 139
    public void setDsId(String dsId) {
136 140
        this.dsId = dsId;
137 141
    }
142

  
143
    public String getNumExecutor() {
144
        return numExecutor;
145
    }
146

  
147
    public void setNumExecutor(String numExecutor) {
148
        this.numExecutor = numExecutor;
149
    }
150

  
151
    @Override
152
    public String getProgressDescription() {
153
        return null;
154
    }
138 155
}
modules/dnet-dli/trunk/src/main/java/eu/dnetlib/resolver/parser/PMFResolverParser.java
122 122

  
123 123
			return parsedObject;
124 124
		} catch (Throwable e) {
125
			log.debug("Input record: " + record);
125
			log.error("Input record: " + record);
126 126
			log.error("Error on parsing record ", e);
127 127
			return null;
128 128
		}
modules/dnet-dli/trunk/src/main/java/eu/dnetlib/resolver/parser/DMFResolverParser.java
75 75

  
76 76

  
77 77
			final List<Node> relations =
78
                    VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='relatedIdentifier']", Arrays.asList("relatedIdentifierType", "relationType", "inverseRelationType"));
78
                    VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='relatedIdentifier']", Arrays.asList("relatedIdentifierType", "relationType", "inverseRelationType", "entityType"));
79 79

  
80 80
			if (relations != null && relations.size() > 0) {
81 81
				final List<ObjectRelation> relationsResult = new ArrayList<>();
......
83 83
					final String relationType = relationMap.getAttributes().get("relationType");
84 84
                    final String inverseRelationType = relationMap.getAttributes().get("inverseRelationType");
85 85
                    final String relatedIdentifierType = relationMap.getAttributes().get("relatedIdentifierType");
86
                    final String relatedEntityType = relationMap.getAttributes().get("entityType");
86 87
					final String relatedPid = relationMap.getTextValue();
87 88
                    final DLIObjectRelation currentRelation = new DLIObjectRelation();
89
                    currentRelation.setTargetType(ObjectType.valueOf(relatedEntityType));
88 90
                    currentRelation.setTargetPID(new PID(relatedPid, relatedIdentifierType));
89 91
					currentRelation.setRelationSemantics(relationType);
90 92
                    currentRelation.setInverseRelation(inverseRelationType);
modules/dnet-dli/trunk/src/main/java/eu/dnetlib/resolver/mdstore/plugin/DLIMergeRecord.java
42 42
        final String sparkJobPath= params.get("sparkJobPath");
43 43
        final String sparkApplicationName= params.get("sparkApplicationName");
44 44
        final String mongoDBName= params.get("mongoDBName");
45
        final String number_of_core= params.get("numExecutor");
46

  
45 47
        if (isNotBlank(id) && isNotBlank(host) && isNotBlank(nsPrefix) && isNotBlank(sparkJobPath) && isNotBlank(sparkPath)) {
46 48
            log.debug("starting spark job");
47 49
            final String mdStoreCollection = transactionManager.getMDStoreCollection(id);
48
            final String [] command= {sparkPath+"bin/spark-submit",sparkJobPath ,host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, sparkApplicationName };
50
            final String [] command= {sparkPath+"bin/spark-submit",sparkJobPath ,host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName };
49 51
            try {
50 52
                final ProcessBuilder builder = new ProcessBuilder(command);
51 53
                final Process p = builder.start();
......
84 86
        }
85 87

  
86 88
    }
89

  
90
    @Override
91
    public String getStatus() {
92
        return "30/100";
93
    }
87 94
}
modules/dnet-dli/trunk/src/main/java/eu/dnetlib/resolver/mdstore/plugin/DLIRecordResolver.java
25 25
import java.util.List;
26 26
import java.util.Map;
27 27
import java.util.concurrent.BlockingQueue;
28
import java.util.function.Function;
28 29

  
29 30

  
30 31
/**
......
57 58
		if (inputObject.getCompletionStatus() == null || !inputObject.getCompletionStatus().equals(CompletionStatus.complete.toString())) {
58 59
			shouldUpdate = tryToResolveRecord(inputObject);
59 60
		}
60
        return resolveRelations(inputRecord, inputObject, shouldUpdate);
61 61

  
62
        return resolveRelations(inputRecord, inputObject, shouldUpdate,s -> {
63

  
64
			try {
65
				return DLIUtils.getInverse(s);
66
			} catch (Exception e) {
67
				log.error("Error on getting Inverse relation from "+s, e);
68
				return "";
69
			}
70
		});
71

  
62 72
	}
63 73

  
64 74
    @Override
modules/dnet-dli/trunk/src/main/java/eu/dnetlib/dli/resolver/DataciteOfflineResolver.java
86 86
                    .collect(Collectors.toList())
87 87
            );
88 88
        }
89
        dli.setDatasourceProvenance(A);
89
        dli.setDatasourceProvenance(
90
                DnetStreamSupport.generateStreamFromIterator(root.getAsJsonArray("datasourceProvenance").iterator())
91
                        .map(JsonElement::getAsJsonObject)
92
                        .map(it -> {
93
                            final DLIObjectProvenance provenance =new DLIObjectProvenance();
94
                            provenance.setDatasourceId(it.get("datasourceId").getAsString());
95
                            provenance.setCompletionStatus(it.get("completionStatus").getAsString());
96
                            provenance.setProvisionMode(it.get("provisionMode").getAsString());
97
                            provenance.setDatasource(it.get("datasource").getAsString());
98
                            provenance.setPublisher(getStringValue(it, "publisher"));
99
                            return provenance;
100
                        })
101
                        .collect(Collectors.toList())
102
        );
90 103
        dli.setCompletionStatus(CompletionStatus.complete.toString());
91 104
        return dli;
92 105
    }
modules/dnet-dli/trunk/src/main/resources/eu/dnetlib/dli/templates/DMFXML.st
1 1
<metadata>
2 2
	<resource xmlns="http://datacite.org/schema/kernel-3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://datacite.org/schema/kernel-3 http://schema.datacite.org/meta/kernel-3/metadata.xsd">
3
		<identifier identifierType="$object.pidType$">$object.pid$</identifier>
3
		<identifier identifierType="$object.pidType$">$object.escapedXMLPid$</identifier>
4 4
		<creators> $object.escapedXMLAuthors:{<creator><creatorName>$it$</creatorName></creator>}$ </creators>
5 5
		<titles> $object.escapedXMLTitles:{<title>$it$</title> }$ </titles>
6 6
		$object.datasourceProvenance:{
modules/dnet-data-services/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/action/MDStorePlugin.java
7 7

  
8 8
public interface MDStorePlugin {
9 9

  
10
	public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback) throws MDStoreServiceException;
10
	void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback) throws MDStoreServiceException;
11 11

  
12
	default String getStatus() {
13
		return "";
14
	}
15

  
12 16
}
modules/dnet-data-services/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/action/PluginAction.java
1 1
package eu.dnetlib.data.mdstore.modular.action;
2 2

  
3 3
import java.util.Map;
4
import java.util.concurrent.Executors;
4 5

  
5 6
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
6 7
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
......
9 10
import org.apache.commons.logging.LogFactory;
10 11
import org.springframework.beans.factory.annotation.Autowired;
11 12

  
13

  
14
class StatusThread extends Thread {
15
	boolean shouldUpdate = false;
16
	final BlackboardServerHandler handler;
17
	final BlackboardJob job;
18
	final long timeout;
19
	final MDStorePlugin currentPlugin;
20

  
21
	public StatusThread(final BlackboardServerHandler handler, final BlackboardJob job, final long timeout, final  MDStorePlugin currentPlugin) {
22
		this.job = job;
23
		this.handler = handler;
24
		this.timeout = timeout;
25
		this.currentPlugin = currentPlugin;
26
	}
27

  
28
	@Override
29
	public void run() {
30
		while (shouldUpdate) {
31
			job.getParameters().put("ongoingPercentage",currentPlugin.getStatus());
32
			handler.ongoing(job);
33
			try {
34
				sleep(timeout);
35
			} catch (InterruptedException e) {
36

  
37
			}
38
		}
39
	}
40

  
41
	public boolean isShouldUpdate() {
42
		return shouldUpdate;
43
	}
44

  
45
	public void setShouldUpdate(boolean shouldUpdate) {
46
		this.shouldUpdate = shouldUpdate;
47
	}
48
}
49

  
12 50
public class PluginAction extends AbstractMDStoreAction {
13 51

  
14 52
	private static final Log log = LogFactory.getLog(PluginAction.class);
......
27 65

  
28 66
		log.info("running MDStore plugin: " + name);
29 67

  
68
		final StatusThread st = new StatusThread(handler, job,20000, plugins.get(name));
69
		st.setShouldUpdate(true);
70
		st.start();
30 71
		plugins.get(name).run(getDao(), job.getParameters(), params -> {
72
			st.setShouldUpdate(false);
73

  
31 74
			job.getParameters().putAll(params);
32 75
			handler.done(job);
33 76
		});
modules/dnet-pid-resolver/trunk/src/main/java/eu/dnetlib/pid/resolver/mdstore/plugin/AbstractRecordResolver.java
13 13
import java.util.List;
14 14
import java.util.Map;
15 15
import java.util.concurrent.BlockingQueue;
16
import java.util.function.Function;
16 17

  
17 18
public abstract class AbstractRecordResolver implements RecordResolver {
18 19

  
......
28 29
        this.timestamp = ts;
29 30
    }
30 31

  
31
    protected String resolveRelations(String inputRecord, ResolvedObject inputObject, boolean shouldUpdate) {
32
    protected String resolveRelations(String inputRecord, ResolvedObject inputObject, boolean shouldUpdate, Function<String,String> getInverseRelation) {
32 33
        if (inputObject.getRelations() != null) {
33 34
            for (ObjectRelation rel : inputObject.getRelations()) {
34 35
                final Map<String, ObjectType> resolvedRelation = tryToResolveRelation(rel.getTargetPID());
......
37 38
                            .forEach(e -> {
38 39
                                rel.setTargetPID(new PID(e.getKey(), "dnet"));
39 40
                                rel.setTargetType(e.getValue());
41
                                rel.setInverseRelation(getInverseRelation.apply(rel.getRelationSemantics()));
42

  
40 43
                            });
41 44
                    shouldUpdate = true;
42 45
                }
modules/dnet-pid-resolver/trunk/src/main/java/eu/dnetlib/pid/resolver/mdstore/plugin/ResolverMDStorePlugin.java
1 1
package eu.dnetlib.pid.resolver.mdstore.plugin;
2 2

  
3
import com.google.common.collect.Lists;
3 4
import com.mongodb.BasicDBObject;
4 5
import com.mongodb.BasicDBObjectBuilder;
5 6
import com.mongodb.DBObject;
6 7
import com.mongodb.client.FindIterable;
7 8
import com.mongodb.client.MongoCollection;
8 9
import com.mongodb.client.MongoDatabase;
9
import com.mongodb.client.model.Filters;
10
import com.mongodb.client.model.UpdateOptions;
10
import com.mongodb.client.model.*;
11 11
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
12 12
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
13 13
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
......
176 176
        final Bson queryByTs = Filters.gte("resolved_ts", Long.valueOf(timestamp));
177 177
        int i = 0;
178 178
        final FindIterable<DBObject> dbObjects = timestamp == 0 ? resolvedRecord.find() : resolvedRecord.find(queryByTs);
179
        for (DBObject object : dbObjects) {
180
			Bson query = Filters.eq("id", object.get("id").toString());
181
			final DBObject replacedObj = BasicDBObjectBuilder.start()
182
					.add("body", object.get("body").toString())
183
                    .add("resolved_ts", object.get("resolved_ts"))
184
                    .get();
185
			Bson newDocument = new Document("$set", replacedObj);
186
			currentMdStoreCollection.findOneAndUpdate(query, newDocument);
187
            i++;
179
		final UpdateOptions f = new UpdateOptions().upsert(true);
180
		final List<WriteModel<DBObject>> upsertList = new ArrayList<>();
181
		final BulkWriteOptions writeOptions = new BulkWriteOptions().ordered(false);
182
		final int bulkSize = 1000;
183
		long validOpCounter  =0;
184
		for (DBObject object : dbObjects) {
185
			if (StringUtils.isNotBlank(object.get("id").toString())) {
186
				final DBObject replacedObj = BasicDBObjectBuilder.start()
187
						.add("body", object.get("body").toString())
188
						.add("resolved_ts", object.get("resolved_ts"))
189
						.get();
188 190

  
191
				upsertList.add(new ReplaceOneModel(new BasicDBObject("id",  object.get("id").toString()), replacedObj, f));
192
				validOpCounter++;
193
				if (((validOpCounter % bulkSize) == 0) && (validOpCounter != 0)) {
194
					currentMdStoreCollection.bulkWrite(upsertList, writeOptions);
195
					upsertList.clear();
196
					log.info("Transaction commit: Upserting: "+validOpCounter);
197
				}
198
			}
189 199
		}
200
//		if (upsertList.size()>0){
201
//			mdstore.bulkWrite(upsertList, writeOptions);
202
//		}
203
//
204
//
205
//
206
//        for (DBObject object : dbObjects) {
207
//			Bson query = Filters.eq("id", object.get("id").toString());
208
//			final DBObject replacedObj = BasicDBObjectBuilder.start()
209
//					.add("body", object.get("body").toString())
210
//                    .add("resolved_ts", object.get("resolved_ts"))
211
//                    .get();
212
//			Bson newDocument = new Document("$set", replacedObj);
213
//			currentMdStoreCollection.findOneAndUpdate(query, newDocument);
214
//            i++;
215
//
216
//		}
190 217

  
191 218
        log.info("Updated " + i);
192 219
    }
220

  
221

  
193 222
}
modules/dnet-dli-spark/trunk/src/main/java/eu/dnetlib/dli/DLIMergeRecord.java
12 12
import eu.dnetlib.resolver.parser.DLIParser;
13 13
import org.antlr.stringtemplate.StringTemplate;
14 14
import org.apache.commons.io.IOUtils;
15
import org.apache.commons.lang3.StringEscapeUtils;
15 16
import org.apache.commons.lang3.StringUtils;
16 17
import org.apache.commons.lang3.tuple.ImmutablePair;
17 18
import org.apache.spark.api.java.JavaRDD;
......
55 56
    final static String mongoUrlTemplate = "mongodb://%s/%s.%s";
56 57

  
57 58
    public static void main(String[] args) throws Exception {
58
        if (args == null || args.length != 5) {
59
        if (args == null || args.length != 6) {
59 60
            throw new Exception("Error the number of args is incorrect");
60 61
        }
61 62

  
......
64 65
        final String db = args[1];
65 66
        final String collection = args[2];
66 67
        final String ns_prefix = args[3];
67
        final String applicationName = args[4];
68
        final String core_number = args[4];
69
        final String applicationName = args[5];
68 70

  
69 71
        DLIUtils.datasources.put("crossref____", new ImmutablePair<>("dli_________::crossref", "Crossref"));
70 72

  
71 73
        SparkSession spark = SparkSession.builder()
72
                .master("local[4]")
74
                .master("local["+core_number+"]")
73 75
                .appName("Mongo Reduce DLI Record"+ applicationName)
74 76
                .config("spark.mongodb.input.uri", String.format(mongoUrlTemplate, host, db, collection))
75 77
                .config("spark.mongodb.output.uri", String.format(mongoUrlTemplate, host, db, "out" + collection))
......
142 144
            }).collect(Collectors.toList()));
143 145
            final String objIdentifier = ns_prefix + "::" + parse.getIdentifier();
144 146
            String originalId = parse.getPid() + "::" + parse.getPidType();
145
            final String newBody = getSerializerInstance().serializeReplacingXML(resultObject.getBody(), parse, objIdentifier, originalId);
147
            final String newBody = getSerializerInstance().serializeReplacingXML(resultObject.getBody(), parse, objIdentifier, StringEscapeUtils.escapeXml11(originalId));
146 148
            final Document result = new Document();
147 149
            result.put("id", objIdentifier);
148 150
            result.put("originalId", originalId);
modules/dnet-dli-spark/trunk/pom.xml
59 59
            <groupId>org.apache.spark</groupId>
60 60
            <artifactId>spark-core_2.11</artifactId>
61 61
            <version>2.2.0</version>
62
            <!--<scope>provided</scope>-->
62
            <scope>provided</scope>
63 63
        </dependency>
64 64
        <dependency>
65 65
            <groupId>org.apache.spark</groupId>
66 66
            <artifactId>spark-sql_2.11</artifactId>
67 67
            <version>2.2.0</version>
68
            <!--<scope>provided</scope>-->
68
            <scope>provided</scope>
69 69
        </dependency>
70 70
        <dependency>
71 71
            <groupId>eu.dnetlib</groupId>

Also available in: Unified diff