-
Notifications
You must be signed in to change notification settings - Fork 0
/
Product_Occurance.java
138 lines (130 loc) · 4.14 KB
/
Product_Occurance.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*@name Ali Mousa
 *Using the Pair approach for both mapper and reducer
 */
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
//import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Hadoop MapReduce job implementing the "pairs" approach with in-mapper
 * combining: for each input line of product IDs it counts every ordered
 * co-occurrence pair "(item, other)" plus a marginal "(item, *)", and the
 * reducer emits each pair's relative frequency pairCount / marginalCount.
 */
public class Product_Occurance
{
    /**
     * Mapper: tokenizes each line into product IDs and aggregates pair
     * counts in a task-local HashMap (in-mapper combiner), flushing the
     * totals once in {@link #cleanup}.
     */
    public static class Map extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable>
    {
        // Key format "left, right"; right is either another item or "*".
        private static final String TEMPLATE = "%s, %s";
        // In-mapper combiner: pair key -> running count, emitted in cleanup().
        private final HashMap<String, Integer> assoc = new HashMap<String, Integer>();

        /**
         * Processes one input line. Emits nothing directly; counts are
         * accumulated in {@code assoc} and written out in cleanup().
         */
        @Override
        public void map(LongWritable FID, Text value, Context output) throws IOException, InterruptedException
        {
            List<String> seen = new ArrayList<String>();
            // try-with-resources closes the Scanner (the original leaked it).
            try (Scanner scanner = new Scanner(value.toString()))
            {
                while (scanner.hasNext())
                {
                    String num = scanner.next();
                    boolean duplicate = false;
                    for (String item : seen)
                    {
                        if (num.equals(item))
                        {
                            // Repeated ID: never pair an item with itself and
                            // never add it to the seen-list a second time.
                            duplicate = true;
                            continue;
                        }
                        // "(item, *)" is the marginal count of all pairs the
                        // item participates in; the reducer divides by it.
                        increment(String.format(TEMPLATE, item, "*"));
                        increment(String.format(TEMPLATE, item, num));
                    }
                    if (!duplicate)
                    {
                        seen.add(num);
                    }
                }
            }
        }

        /** Adds 1 to the count stored for {@code pairKey}. */
        private void increment(String pairKey)
        {
            Integer current = assoc.get(pairKey);
            assoc.put(pairKey, current == null ? 1 : current + 1);
        }

        /** Flushes the aggregated counts once per map task. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException
        {
            // java.util.Map must be fully qualified: the enclosing class is
            // itself named Map.
            for (java.util.Map.Entry<String, Integer> e : assoc.entrySet())
            {
                context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
            }
        }
    }

    /**
     * Routes every key with the same left item ("item, x") to the same
     * reducer, so the marginal "(item, *)" count arrives at the reducer
     * that also receives all of that item's pairs.
     */
    public static class CustomPartitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, IntWritable>
    {
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks)
        {
            // Mask off the sign bit: String.hashCode() may be negative, and
            // getPartition must return a value in [0, numReduceTasks).
            return (key.toString().split(",")[0].hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

    /**
     * Reducer: for a given item the marginal key "item, *" sorts before
     * every "item, digit..." key ('*' = ASCII 42 precedes '0' = 48), so
     * {@code total} is always set before the item's pairs are processed.
     * Each pair is emitted as pairCount / total.
     */
    public static class Reduce extends org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, FloatWritable>
    {
        // Denominator taken from the most recent "(item, *)" key.
        private int total;

        @Override
        public void reduce(Text key, Iterable<IntWritable> list, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable v : list)
            {
                sum += v.get();
            }
            if (key.toString().indexOf('*') != -1)
            {
                total = sum;
            }
            else if (total != 0) // guard: missing marginal would divide by zero
            {
                context.write(key, new FloatWritable((float) sum / total));
            }
        }
    }

    /** Configures and submits the job. args[0] = input path, args[1] = output path. */
    public static void main(String[] args) throws Exception
    {
        // Job.getInstance replaces the deprecated new Job() constructor.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Product_Occurance.class);
        job.setJobName("Co existance");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(CustomPartitioner.class);
        // The map output types (Text, IntWritable) differ from the final
        // reduce output types (Text, FloatWritable). The original set
        // IntWritable as the job output value class, which makes the
        // reducer's FloatWritable writes fail at runtime.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}