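"""Migrate an objectStore from the local filesystem to S3.

For every object store listed in the 'metadataObjectStore' collection, each
object whose metadata carries an 'fsPath' is uploaded to the given bucket
under '<obsId>/<objectId>', and a copy of its metadata (with 'fsPath'
replaced by the resulting s3:// URI) is written to an 's3_<obsId>'
collection. Stores recorded in a local 'store_done' file are skipped, and
finished stores are appended to 'store_done_1'.

Usage: python scriptName.py s3cfgPath objectstoreBucket
"""
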
import boto
import boto.s3.connection
import sys
import pymongo
from pymongo import MongoClient
import os.path
import time


def file_exist(bucket, file_path):
    """Return True if the key exists in the bucket and is non-empty."""
    key = bucket.get_key(file_path)
    return key is not None and key.size > 0


def exportItemForMongoCollection(obsId, db, bucket, log_file):
    """Upload every file of one object store to S3 and mirror its metadata.

    Source documents live in the collection named after the first 36
    characters of obsId; migrated copies go to the 's3_'-prefixed collection,
    with 'fsPath' replaced by the new S3 URI.
    """
    destination_collection = db['s3_' + obsId[:36]]
    source_collection = db[obsId[:36]]
    print('Start exporting objectStore: %s' % obsId)
    i = 0
    skip = 0
    total = source_collection.estimated_document_count()
    for item in source_collection.find(no_cursor_timeout=True):
        fs_path = item['fsPath']
        objectId = item['id']
        dest_item = destination_collection.find_one({'id': objectId})
        if dest_item is None:
            if os.path.isfile(fs_path):
                i += 1
                if not file_exist(bucket, '%s/%s' % (obsId, objectId)):
                    key = bucket.new_key('%s/%s' % (obsId, objectId))
                    try:
                        key.set_contents_from_filename(fs_path)
                    except Exception:
                        # Retry up to 10 times, sleeping 10 seconds before
                        # each attempt; stop as soon as one upload succeeds.
                        for _ in range(10):
                            try:
                                time.sleep(10)
                                key.set_contents_from_filename(fs_path)
                                break
                            except Exception as e:
                                print("Error on saving object on S3")
                                print(e)
                                print("Sleeping 10 seconds and retrying")
                    # Store the migrated metadata with the S3 URI instead of
                    # the filesystem path.
                    item.pop('_id', None)
                    item.pop('fsPath')
                    item['uri'] = 's3://%s/%s' % (bucket.name, key.name)
                    destination_collection.insert_one(item)
                if i % 100 == 0:
                    print("Exported %i/%i (skipped %i)" % (i, total, skip))
            else:
                log_file.write('Missing file for objectStoreId: %s objectId: %s path: %s\n'
                               % (obsId, objectId, fs_path))
        else:
            skip += 1
            print("Skipping item")


def start_import(metadataCollection, bucket, log_file, done_file, skip_store):
    """Walk the metadata collection and export every store not yet done."""
    client = MongoClient()
    db = client['objectStore']
    metadata = db[metadataCollection]
    print(skip_store)
    for item in metadata.find(no_cursor_timeout=True):
        obsId = item['obsId']
        if obsId not in skip_store:
            destination_collection = db['s3_' + obsId[:36]]
            destination_collection.create_index([('id', pymongo.ASCENDING)])
            exportItemForMongoCollection(obsId, db, bucket, log_file)
            # Record the finished store so a rerun can skip it.
            done_file.write('{}\n'.format(obsId))


if __name__ == '__main__':
    args = sys.argv
    if len(args) != 3:
        print("Usage: python scriptName.py s3cfgPath objectstoreBucket")
        sys.exit(1)
    # Read the s3cmd-style config file: plain 'key = value' lines.
    props = {}
    with open(args[1]) as f:
        for line in f:
            d = line.split('=')
            if len(d) == 2:
                props[d[0].strip()] = d[1].strip()
    # Stores listed in 'store_done' were migrated by a previous run.
    skip_store = []
    if os.path.isfile('store_done'):
        with open('store_done') as f:
            skip_store = [line.strip() for line in f if line.strip()]

    bname = args[2]
    conn = boto.connect_s3(
        aws_access_key_id=props['access_key'],
        aws_secret_access_key=props['secret_key'],
        host=props['host_base'],
        calling_format=boto.s3.connection.OrdinaryCallingFormat())
    bucket = conn.get_bucket(bname, validate=True)
    log_file = open('s3_migration.log', 'w')
    done_file = open('store_done_1', 'w')
    start_import('metadataObjectStore', bucket, log_file, done_file, skip_store)
    log_file.close()
    done_file.close()
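
# Notes (assumptions inferred from the fields this script reads, not from a
# schema): the s3cfg file is expected to contain s3cmd-style lines such as
#
#   access_key = <access key>
#   secret_key = <secret key>
#   host_base = <S3 endpoint hostname>
#
# and each source metadata document is expected to look roughly like
# {'id': '<objectId>', 'fsPath': '/path/to/file', ...}, while
# 'metadataObjectStore' documents carry at least {'obsId': '<store id>'}.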