1
|
from elasticsearch import Elasticsearch
|
2
|
from elasticsearch_dsl import *
|
3
|
from jinja2 import Template
|
4
|
import json
|
5
|
import hashlib
|
6
|
import time
|
7
|
from lxml import etree
|
8
|
import zlib, base64
|
9
|
|
10
|
def md5sum(str_input):
    """Return the hexadecimal MD5 digest of *str_input* (UTF-8 encoded)."""
    return hashlib.md5(str_input.encode('utf-8')).hexdigest()
|
14
|
|
15
|
|
16
|
def create_creator(item):
    """Reduce a raw Datacite creator entry to the shape the template expects.

    Returns a dict with 'name', 'identifier' and 'affiliation' keys.
    NOTE(review): when several nameIdentifiers are present, each loop pass
    overwrites 'identifier', so only the LAST one survives — preserved as-is,
    presumably the template renders a single identifier; confirm upstream.
    """
    creator = {
        'name': item.get('name', ''),
        'identifier': None,
        'affiliation': item.get('affiliation', ''),
    }
    for entry in item.get('nameIdentifiers', []):
        creator['identifier'] = {
            'schemename': entry.get('nameIdentifierScheme', ''),
            'value': entry.get('nameIdentifier', ''),
        }
    return creator
|
21
|
|
22
|
|
23
|
def create_record(x, t, matches):
    """Render a single Elasticsearch Datacite hit *x* through template *t*.

    Parameters
    ----------
    x : an ES hit exposing ``attributes`` (raw Datacite metadata), ``id``
        and ``relationships.client.data.id`` (the datacite client id).
    t : a template object with a ``render(**kwargs)`` method (jinja2).
    matches : dict mapping upper-cased client ids to hosting-datasource
        info; unknown providers fall back to "Unknown Repository".

    Returns the rendered string.
    """
    # Raw attribute dict as stored by elasticsearch_dsl ('_d_' is its
    # internal backing dict).
    record = x.attributes.__dict__['_d_']

    dataset = {
        'id': x.id,
        'publisher': record.get('publisher'),
        'publicationYear': record.get('publicationYear'),
        'subjects': record.get('subjects', []),
        'contributors': record.get('contributors', []),
        'dates': record.get('dates', []),
        'language': record.get('language'),
        'types': record.get('types'),
        'identifiers': record.get('identifiers', []),
        'relatedIdentifiers': record.get('relatedIdentifiers', []),
        'formats': record.get('formats', []),
        'version': record.get('version', ''),
        'sizes': record.get('sizes', []),
        'rightsList': record.get('rightsList', []),
        'descriptions': record.get('descriptions', []),
        'geoLocations': record.get('geoLocations', []),
        'titles': [],
        'creators': [],
    }

    raw_titles = record.get('titles')
    if raw_titles:
        # Keep only entries that actually carry a 'title' key.
        dataset['titles'] = [
            {'title': entry['title'], 'lang': entry.get('lang', None)}
            for entry in raw_titles
            if 'title' in entry
        ]

    raw_creators = record.get('creators')
    if raw_creators:
        dataset['creators'] = [create_creator(entry) for entry in raw_creators]

    provider = x.relationships.client.data.id.upper()
    hosted = matches.get(
        provider,
        dict(openaire_id="openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18",
             official_name="Unknown Repository"))
    return t.render(dataset=dataset, provider=provider, id=x.id,
                    hosted=hosted, objIdentifier=md5sum(x.id))
|
39
|
|
40
|
|
41
|
def scan_index(timestamp=None, active=True, scroll_id=None, page_size=None):
    """Scan the 'datacite' ES index and yield pages of transformed records.

    Parameters
    ----------
    timestamp : optional lower bound; when set, only documents with
        ``timestamp >= timestamp`` are scanned.
    active : match value for ``attributes.isActive`` (default True).
        BUG FIX: this parameter was previously ignored — the query always
        hardcoded ``isActive=True``; it is now honoured, with identical
        default behaviour.
    scroll_id : opaque value echoed back in every yielded page.
    page_size : records per yielded page (default 1000); also used as the
        ES scan batch size, which was previously hardcoded to 1000.

    Yields dicts with keys 'total', 'counter', 'result' (list of records:
    'originalId', 'timestamp', zlib+base64 'body', prefixed 'id') and
    'scroll_id'. A final, possibly partial (or empty) page is always
    yielded so callers see the final counter.
    """
    if not page_size:
        page_size = 1000
    es = Elasticsearch(hosts=['ip-90-147-167-25.ct1.garrservices.it',
                              'ip-90-147-167-26.ct1.garrservices.it',
                              'ip-90-147-167-27.ct1.garrservices.it',
                              'ip-90-147-167-28.ct1.garrservices.it'],
                       timeout=1000)
    s = Search(using=es, index='datacite').query(Q('match', attributes__isActive=active))
    if timestamp:
        print ("APPLICO FILTRO ", timestamp)
        s = s.filter('range', timestamp={'gte': timestamp})

    # Load the rendering template and the provider->datasource mapping once,
    # closing the files deterministically (previously left to the GC).
    with open('res/template.jinja') as template_file:
        t = Template(template_file.read())
    with open('res/matches.json') as matches_file:
        matches = json.load(matches_file)

    total = s.execute().hits.total
    result = dict(total=total, counter=0, result=[], scroll_id=scroll_id)
    for item in s.params(size=page_size).scan():
        record = dict(originalId=item.id, timestamp=item.timestamp)
        # Compress the rendered XML and ship it as base64 text.
        record['body'] = base64.b64encode(
            zlib.compress(create_record(item, t, matches).encode('utf-8'))).decode('ascii')
        record['id'] = "datacite____::" + md5sum(item.id)
        result['result'].append(record)
        result['counter'] += 1
        if len(result['result']) == page_size:
            yield result
            result['result'] = []
    # Final (partial) page — also carries the definitive counter/total.
    yield result
|
65
|
|
66
|
|