Hadoop Streaming Python的一个简单Wrapper

Map比较简单，不贴了。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
class Reduce:
    """Base class for a streaming reducer that groups consecutive lines by key.

    Consecutive input lines sharing the same key form one group. When the key
    changes (or input ends), ``reduce`` is called with the finished group's key
    and its list of values, and whatever strings it yields are written out.
    Subclasses override ``reduce``.
    """

    # Separator between the key and the value on each input line.
    KV_SEP = "\t"

    def __init__(self):
        # Key of the group currently being accumulated; None before any input.
        self.last_key = None
        # Values collected so far for ``last_key``.
        self.value_list = []

    def reduce(self, key, value_list):
        """Return an iterable of output strings for one finished group.

        The base implementation returns None, which ``processLine`` treats as
        "no output". ``value_list`` is cleared in place after the returned
        iterable has been consumed, so copy it if it must outlive the call.
        """

    def processLine(self, key, value):
        """Feed one (key, value) pair; yield the output of any finished group.

        This is a generator: nothing happens until it is iterated.
        Passing ``key=None`` flushes the pending group without starting a new
        one (``value`` is ignored in that case).
        """
        if self.last_key is None:
            # First record ever seen (or first after a flush).
            self.last_key = key
        elif self.last_key != key:
            # Key changed: the previous group is complete, emit it.
            results = self.reduce(self.last_key, self.value_list)
            if results is not None:
                for out in results:
                    yield out
            del self.value_list[:]
            self.last_key = key
        # The current value always belongs to the (possibly new) current
        # group; only the flush sentinel carries no value.
        if key is not None:
            self.value_list.append(value)

    def work(self):
        """Read ``key<KV_SEP>value`` lines from stdin and write results to stdout.

        Lines are split on the first ``KV_SEP`` only; lines without a
        separator are skipped. The value keeps whatever follows the separator,
        including the trailing newline.
        """
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            arr = line.split(Reduce.KV_SEP, 1)
            if len(arr) != 2:
                continue
            key, value = arr
            for out in self.processLine(key, value):
                sys.stdout.write(out)
        # End of input: flush the last pending group.
        for out in self.processLine(None, ""):
            sys.stdout.write(out)
class Reduce:
    """Base class for a streaming reducer that groups consecutive lines by key.

    Consecutive input lines sharing the same key form one group. When the key
    changes (or input ends), ``reduce`` is called with the finished group's key
    and its list of values, and whatever strings it yields are written out.
    Subclasses override ``reduce``.
    """

    # Separator between the key and the value on each input line.
    KV_SEP = "\t"

    def __init__(self):
        # Key of the group currently being accumulated; None before any input.
        self.last_key = None
        # Values collected so far for ``last_key``.
        self.value_list = []

    def reduce(self, key, value_list):
        """Return an iterable of output strings for one finished group.

        The base implementation returns None, which ``processLine`` treats as
        "no output". ``value_list`` is cleared in place after the returned
        iterable has been consumed, so copy it if it must outlive the call.
        """

    def processLine(self, key, value):
        """Feed one (key, value) pair; yield the output of any finished group.

        This is a generator: nothing happens until it is iterated.
        Passing ``key=None`` flushes the pending group without starting a new
        one (``value`` is ignored in that case).
        """
        if self.last_key is None:
            # First record ever seen (or first after a flush).
            self.last_key = key
        elif self.last_key != key:
            # Key changed: the previous group is complete, emit it.
            results = self.reduce(self.last_key, self.value_list)
            if results is not None:
                for out in results:
                    yield out
            del self.value_list[:]
            self.last_key = key
        # The current value always belongs to the (possibly new) current
        # group; only the flush sentinel carries no value.
        if key is not None:
            self.value_list.append(value)

    def work(self):
        """Read ``key<KV_SEP>value`` lines from stdin and write results to stdout.

        Lines are split on the first ``KV_SEP`` only; lines without a
        separator are skipped. The value keeps whatever follows the separator,
        including the trailing newline.
        """
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            arr = line.split(Reduce.KV_SEP, 1)
            if len(arr) != 2:
                continue
            key, value = arr
            for out in self.processLine(key, value):
                sys.stdout.write(out)
        # End of input: flush the last pending group.
        for out in self.processLine(None, ""):
            sys.stdout.write(out)
class Reduce:
    """Base class for a streaming reducer that groups consecutive lines by key.

    Consecutive input lines sharing the same key form one group. When the key
    changes (or input ends), ``reduce`` is called with the finished group's key
    and its list of values, and whatever strings it yields are written out.
    Subclasses override ``reduce``.
    """

    # Separator between the key and the value on each input line.
    KV_SEP = "\t"

    def __init__(self):
        # Key of the group currently being accumulated; None before any input.
        self.last_key = None
        # Values collected so far for ``last_key``.
        self.value_list = []

    def reduce(self, key, value_list):
        """Return an iterable of output strings for one finished group.

        The base implementation returns None, which ``processLine`` treats as
        "no output". ``value_list`` is cleared in place after the returned
        iterable has been consumed, so copy it if it must outlive the call.
        """

    def processLine(self, key, value):
        """Feed one (key, value) pair; yield the output of any finished group.

        This is a generator: nothing happens until it is iterated.
        Passing ``key=None`` flushes the pending group without starting a new
        one (``value`` is ignored in that case).
        """
        if self.last_key is None:
            # First record ever seen (or first after a flush).
            self.last_key = key
        elif self.last_key != key:
            # Key changed: the previous group is complete, emit it.
            results = self.reduce(self.last_key, self.value_list)
            if results is not None:
                for out in results:
                    yield out
            del self.value_list[:]
            self.last_key = key
        # The current value always belongs to the (possibly new) current
        # group; only the flush sentinel carries no value.
        if key is not None:
            self.value_list.append(value)

    def work(self):
        """Read ``key<KV_SEP>value`` lines from stdin and write results to stdout.

        Lines are split on the first ``KV_SEP`` only; lines without a
        separator are skipped. The value keeps whatever follows the separator,
        including the trailing newline.
        """
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            arr = line.split(Reduce.KV_SEP, 1)
            if len(arr) != 2:
                continue
            key, value = arr
            for out in self.processLine(key, value):
                sys.stdout.write(out)
        # End of input: flush the last pending group.
        for out in self.processLine(None, ""):
            sys.stdout.write(out)

 

Leave a Reply

Your email address will not be published. Required fields are marked *