1 |
54585
|
sandro.lab
|
import boto
|
2 |
|
|
import boto.s3.connection
|
3 |
|
|
import sys
|
4 |
54591
|
sandro.lab
|
import pymongo
|
5 |
54585
|
sandro.lab
|
from pymongo import MongoClient
|
6 |
|
|
import os.path
|
7 |
54586
|
sandro.lab
|
import time
|
8 |
54585
|
sandro.lab
|
|
9 |
|
|
|
10 |
54586
|
sandro.lab
|
|
11 |
|
|
def file_exist(bucket, file_path):
    """Tell whether ``file_path`` is already stored in ``bucket`` with content.

    The key is looked up through ``bucket.get_key``. The result is True only
    when a key is found and its ``size`` is greater than zero; a missing key
    or a zero-sized object gives False.
    """
    stored_key = bucket.get_key(file_path)
    if stored_key is None:
        return False
    if stored_key.size > 0:
        return True
    return False
|
16 |
54586
|
sandro.lab
|
|
17 |
54585
|
sandro.lab
|
def exportItemForMongoCollection(obsId, db, bucket, log_file):
    """Upload the files of one object store to S3 and record them in MongoDB.

    Documents are read from the collection ``db[obsId[:36]]``. For each
    document whose ``id`` is not yet present in ``db['s3_' + obsId[:36]]``:

    * if its ``fsPath`` is not a regular file, a "Missing file" line is
      written to ``log_file`` and the document is skipped;
    * otherwise the file is uploaded under the key ``<obsId>/<id>`` (unless
      ``file_exist`` reports it is already in the bucket), and a copy of the
      document without ``_id`` and ``fsPath`` but with an ``uri`` field is
      inserted into the destination collection.

    Args:
        obsId: object store identifier; its first 36 characters name the
            source collection.
        db: database-like object indexable by collection name.
        bucket: bucket object exposing ``get_key``, ``new_key`` and ``name``.
        log_file: writable text file-like object for missing-file reports.
    """
    collection_name = obsId[:36]
    destination_collection = db['s3_' + collection_name]
    source_collection = db[collection_name]
    print('Start to exporting objectStore :%s ' % obsId)
    i = 0
    total = source_collection.estimated_document_count()
    cursor = source_collection.find(no_cursor_timeout=True)
    try:
        for item in cursor:
            fs_path = item['fsPath']
            objectId = item['id']
            # Already migrated in a previous run: nothing to do.
            if destination_collection.find_one({'id': objectId}) is not None:
                continue
            if not os.path.isfile(fs_path):
                # File objects have no ``writeline``; write the line explicitly.
                log_file.write(
                    'Missing file for objectStoreid: %s ObjectId:%s path: %s\n'
                    % (obsId, objectId, fs_path))
                continue
            i += 1
            key_name = '%s/%s' % (obsId, objectId)
            if not file_exist(bucket, key_name):
                key = bucket.new_key(key_name)
                try:
                    key.set_contents_from_filename(fs_path)
                except Exception:
                    # Single best-effort retry after a pause; a second
                    # failure propagates to the caller.
                    time.sleep(10)
                    key.set_contents_from_filename(fs_path)
            item.pop('_id', None)
            item.pop('fsPath')
            # Build the URI from the key name itself so it is also correct
            # when the object was already in the bucket and no new key
            # object was created during this iteration.
            item['uri'] = 's3://%s/%s' % (bucket.name, key_name)
            destination_collection.insert_one(item)
            if i % 1000 == 0:
                print("Exported %i/%i" % (i, total))
    finally:
        # A cursor opened with no_cursor_timeout is never reaped by the
        # server, so it must be closed explicitly.
        cursor.close()
|
47 |
|
|
|
48 |
54585
|
sandro.lab
|
|
49 |
|
|
def start_import(metadataCollection, bucket, log_file):
    """Export every object store listed in the metadata collection.

    Connects to MongoDB with the default ``MongoClient()`` settings, opens
    the ``objectStore`` database and, for each document of the collection
    named ``metadataCollection``, ensures an ascending index on ``id`` in the
    matching ``s3_<obsId[:36]>`` collection and then exports that object
    store with ``exportItemForMongoCollection``.

    Args:
        metadataCollection: name of the collection whose documents carry an
            ``obsId`` field.
        bucket: destination bucket, passed through to the export.
        log_file: writable file-like object, passed through to the export.
    """
    client = MongoClient()
    try:
        db = client['objectStore']
        cursor = db[metadataCollection].find(no_cursor_timeout=True)
        try:
            for item in cursor:
                obsId = item['obsId']
                destination_collection = db['s3_' + obsId[:36]]
                # Create the index before exporting: the export looks up
                # every item by 'id' in this collection, which is a full
                # scan per item without it.
                print("creating Index on ID")
                destination_collection.create_index([('id', pymongo.ASCENDING)])
                exportItemForMongoCollection(obsId, db, bucket, log_file)
        finally:
            # Cursors opened with no_cursor_timeout must be closed explicitly.
            cursor.close()
    finally:
        client.close()
|
59 |
54585
|
sandro.lab
|
|
60 |
|
|
|
61 |
|
|
|
62 |
|
|
|
63 |
|
|
|
64 |
|
|
if __name__ == '__main__':
    # Script entry point.
    # Usage: python scriptName.py s3cfgPath objectstoreBucket
    #   s3cfgPath          file of "name = value" lines providing access_key,
    #                      secret_key and host_base
    #   objectstoreBucket  name of the destination bucket
    args = sys.argv
    # The original test ("not len(args) is not 3") was inverted and compared
    # integers by identity; it also let the script continue after the error.
    if len(args) != 3:
        print("Error on applying script usage: python scriptName.py s3cfgPath objectstoreBucket")
        sys.exit(1)

    props = {}
    with open(args[1]) as cfg_file:
        for line in cfg_file:
            # Split on the first '=' only, so values that themselves contain
            # '=' are kept instead of being silently dropped.
            d = line.split('=', 1)
            if len(d) == 2:
                props[d[0].strip()] = d[1].strip()

    bname = args[2]
    conn = boto.connect_s3(
        aws_access_key_id=props['access_key'],
        aws_secret_access_key=props['secret_key'],
        host=props['host_base'],
        calling_format=boto.s3.connection.OrdinaryCallingFormat())
    bucket = conn.get_bucket(bname, validate=True)
    # The context manager closes the log even if the import fails midway.
    with open('s3_migration.log', 'w') as log_file:
        start_import('metadataObjectStore', bucket, log_file)
|