标签:
实现了 HDFS 最基本的功能。每一步的具体实现过程可以在 https://github.com/DanielJyc/HDFS 查看,
每一个 commit 都可以独立运行,算是记录了自己的实现过程。总体设计框图如下(原文为图片,此处未能保留):
# -*- coding: UTF-8 -*-
import os
import uuid
import math
import time
class Client(object):
    """Front end of the mini HDFS: splits files into chunks and stores them.

    Every chunk is stored twice: on the Datanode recorded for it in
    ``namenode.chunktable`` (the primary copy) and on the next Datanode in
    ring order (the replica). Datanode locations are 1-based.
    """

    def __init__(self, namenode):
        self.namenode = namenode

    def _replica_loc(self, chunkloc):
        """Return the location of the Datanode holding the replica of a chunk."""
        # The +1 keeps locations 1-based: location n wraps around to 1.
        return chunkloc % self.namenode.num_datanodes + 1

    def write(self, filename, data):
        """Store `data` under `filename`, writing two copies of every chunk.

        If `filename` already exists it is deleted first, so its old chunks
        are not left behind as orphans in the chunk table and on disk.
        """
        if self.namenode.exits(filename):
            self.delete(filename)
        size = self.namenode.chunksize
        chunks = [data[i:i + size] for i in range(0, len(data), size)]
        # alloc() registers the file and records each chunk's primary location.
        chunk_uuids = self.namenode.alloc(filename, self.get_num_chunks(data))
        for chunk_uuid, chunk in zip(chunk_uuids, chunks):
            primary = self.namenode.chunktable[chunk_uuid]
            self.namenode.datanodes[primary].write(chunk_uuid, chunk)
            # Second copy on the next Datanode.
            self.namenode.datanodes[self._replica_loc(primary)].write(chunk_uuid, chunk)

    def read(self, filename):
        """Reassemble and return the contents of `filename`.

        Each chunk is read from its primary Datanode. If that copy is missing
        (Datanode.read returns -1), the replica is read instead.

        Returns:
            The file contents, or None (after printing a message) when the
            file is unknown or a chunk is missing on both Datanodes.
        """
        if not self.namenode.exits(filename):
            print("The file: \"" + filename + "\" does not exist.")
            return None
        parts = []
        for chunk_uuid in self.namenode.filetable[filename]:
            chunkloc = self.namenode.chunktable[chunk_uuid]  # primary location
            part = self.namenode.datanodes[chunkloc].read(chunk_uuid)
            if part == -1:  # primary copy lost (e.g. a damaged Datanode)
                print('Current chunk is broken.')
                part = self.namenode.datanodes[self._replica_loc(chunkloc)].read(chunk_uuid)
            if part == -1:  # replica lost as well: the file cannot be rebuilt
                print('Both copies of chunk ' + str(chunk_uuid) + ' are missing.')
                return None
            parts.append(part)
        return ''.join(parts)

    def delete(self, filename):
        """Delete `filename`: both physical copies of each chunk, then its metadata."""
        if not self.namenode.exits(filename):
            print("The file: \"" + filename + "\" does not exist.")
            return
        for chunk_uuid in self.namenode.filetable[filename]:
            chunkloc = self.namenode.chunktable[chunk_uuid]
            self.namenode.datanodes[chunkloc].delete(chunk_uuid)  # first copy
            self.namenode.datanodes[self._replica_loc(chunkloc)].delete(chunk_uuid)  # second copy
        self.namenode.delete(filename)  # logical delete: drop the metadata

    def list_files(self):
        """Print the name of every file known to the Namenode."""
        print("Files:")
        for name in self.namenode.filetable:
            print(name)

    def get_num_chunks(self, data):
        """Return how many chunks of ``namenode.chunksize`` are needed for `data`."""
        size = self.namenode.chunksize
        return (len(data) + size - 1) // size  # integer ceiling division
class Namenode(object):
    """Metadata server of the mini HDFS.

    Attributes:
        num_datanodes: number of Datanodes; their locations are 1..num_datanodes.
        chunksize: maximum length of one chunk, in len() units of the data.
        filetable: filename -> list of chunk UUIDs, in file order.
        chunktable: chunk UUID -> location of the Datanode that holds the
            chunk's primary copy.
        datanodes: location -> Datanode instance.
    """

    def __init__(self, num_datanodes=3, chunksize=10):
        """Create empty metadata tables and one Datanode per location.

        Raises:
            ValueError: if num_datanodes or chunksize is smaller than 1.
        """
        if num_datanodes < 1:
            raise ValueError("num_datanodes must be >= 1")
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1")
        self.num_datanodes = num_datanodes
        self.chunksize = chunksize
        self.filetable = {}
        self.chunktable = {}
        self.datanodes = {}
        self.init_datanodes()  # build the location <-> server mapping

    def init_datanodes(self):
        """Create one Datanode for every location 1..num_datanodes."""
        for loc in range(1, self.num_datanodes + 1):
            self.datanodes[loc] = Datanode(loc)

    def alloc(self, filename, num_chunks):
        """Register `filename` with `num_chunks` fresh chunk UUIDs.

        Chunks are placed round-robin starting at location 1, and the mapping
        is stored in filetable and chunktable. An existing filetable entry for
        `filename` is overwritten. The whole file table is printed.

        Returns:
            The new chunk UUIDs, in file order.
        """
        chunk_uuids = [uuid.uuid1() for _ in range(num_chunks)]
        for i, chunk_uuid in enumerate(chunk_uuids):
            # i % n + 1 keeps locations 1-based and cycles 1, 2, ..., n, 1, ...
            self.chunktable[chunk_uuid] = i % self.num_datanodes + 1
        self.filetable[filename] = chunk_uuids
        print(self.filetable)
        return chunk_uuids

    def delete(self, filename):
        """Remove `filename` and its chunk entries from the metadata tables.

        Only metadata is touched; the chunk files themselves must be removed
        from the Datanodes by the caller.

        Raises:
            KeyError: if `filename` is not registered.
        """
        chunk_uuids = self.filetable.pop(filename)
        for chunk_uuid in chunk_uuids:
            self.chunktable.pop(chunk_uuid, None)

    def exits(self, filename):
        """Return True if `filename` is registered (name kept for existing callers)."""
        return filename in self.filetable
class Datanode(object):
    """Storage server that keeps every chunk as one file in a local directory.

    Each Datanode uses its own directory, which is how this demo simulates
    separate machines.
    """

    def __init__(self, chunkloc, root="D:/HDFSTemp"):
        """Create (if needed) the directory for the Datanode at `chunkloc`.

        Args:
            chunkloc: 1-based location of this Datanode.
            root: parent directory; the Datanode lives in
                ``root + "/Datanode" + str(chunkloc)``.
        """
        self.chunkloc = chunkloc
        self.local_fs_root = root + "/Datanode" + str(chunkloc)
        if not os.path.isdir(self.local_fs_root):
            os.makedirs(self.local_fs_root)

    def _chunk_path(self, chunk_uuid):
        """Return the path of the file that stores `chunk_uuid`."""
        return self.local_fs_root + "/" + str(chunk_uuid)

    def write(self, chunk_uuid, chunk):
        """Write `chunk` to its file; on I/O failure print a message instead of raising."""
        try:
            with open(self._chunk_path(chunk_uuid), "w") as fw:
                fw.write(chunk)
        except IOError:
            print("The HDFS is broken.")

    def read(self, chunk_uuid):
        """Return the stored chunk, or -1 if its file cannot be read.

        The -1 sentinel tells Client.read to fall back to the replica.
        """
        try:
            with open(self._chunk_path(chunk_uuid), "r") as fr:
                return fr.read()
        except IOError:
            return -1

    def delete(self, chunk_uuid):
        """Remove the chunk's file; print a message if it cannot be removed."""
        path = self._chunk_path(chunk_uuid)
        try:
            os.remove(path)
        except OSError:  # WindowsError exists only on Windows and subclasses OSError
            print("Filename:" + path + " does not exist.")
class Command(object):
    """Interactive shell that maps typed commands onto Client operations.

    Commands: upload, download, delete, ls, and exits (or exit) to quit.
    """

    def __init__(self, client):
        self.client = client

    @staticmethod
    def _ask(prompt):
        """Print `prompt` and return one line typed by the user."""
        try:
            # Python 2: input() evaluates the typed text, so use raw_input.
            reader = raw_input
        except NameError:  # Python 3 has no raw_input; input() returns text
            reader = input
        return reader(prompt)

    def command_line(self):
        """Read and run commands until "exits"/"exit" or end of input."""
        while True:
            try:
                cmd = self._ask('Input your command:\n').strip()
            except EOFError:  # stdin closed: leave instead of crashing
                break
            if cmd in ('exits', 'exit'):
                break
            if cmd == 'upload':
                self.upload_cmd()
            elif cmd == 'download':
                self.download_cmd()
            elif cmd == 'delete':
                filename = self._ask('Input the filename which you want to delete in HDFS:\n')
                self.client.delete(filename)
            elif cmd == 'ls':
                self.client.list_files()
            else:
                print("Wrong command. \n")

    def upload_cmd(self):
        """Read a local file and store it in HDFS under the same name."""
        filename = self._ask('Input the filename which you want to upload in local:\n')
        try:
            with open(filename, "r") as fr:  # read the local file
                data = fr.read()
        except IOError:
            print("No such file in local.")
            return
        self.client.write(filename, data)  # write it into HDFS

    def download_cmd(self):
        """Read a file from HDFS, print it, and save it locally under the same name."""
        filename = self._ask('Input the filename which you want to download in HDFS:\n')
        data = self.client.read(filename)
        if data is None:
            # read() already printed why; don't truncate the local file.
            return
        print(data)
        with open(filename, "w") as fw:  # write to the local file
            fw.write(data)
def main():
    """Start the interactive mini-HDFS shell.

    Builds the Namenode (which creates its Datanodes), a Client bound to it,
    and a Command shell bound to that client, then runs the shell loop.
    """
    Command(Client(Namenode())).command_line()
# Run the interactive shell only when executed as a script, not on import.
if __name__ == '__main__':
    main()
标签:
原文地址:http://www.cnblogs.com/danieljyc/p/3870539.html