Clover Coverage Report - contrib
Coverage timestamp: Fri Apr 27 2012 21:25:11 UTC
[Coverage distribution chart: 54% of files have more coverage]
TypedBytesRecordReader (Line # 56): 68.6% covered (0.6862745)
 
No Tests
 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.contrib.util.typedbytes;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.RecordReader;
import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * TypedBytesRecordReader reads rows serialized in the typed bytes format from
 * an InputStream, converts each field to the column type declared in the table
 * properties, and re-serializes the converted row as typed bytes.
 */
public class TypedBytesRecordReader implements RecordReader {

  private DataInputStream din;
  private TypedBytesWritableInput tbIn;

  // Buffer into which each converted row is re-serialized as typed bytes.
  private NonSyncDataOutputBuffer barrStr = new NonSyncDataOutputBuffer();
  private TypedBytesWritableOutput tbOut;

  private ArrayList<Writable> row = new ArrayList<Writable>(0);
  private ArrayList<String> rowTypeName = new ArrayList<String>(0);
  private List<String> columnTypes;

  // Object inspectors for the types observed on the stream (src) and for the
  // declared column types (dst), plus the converters between them.
  private ArrayList<ObjectInspector> srcOIns = new ArrayList<ObjectInspector>();
  private ArrayList<ObjectInspector> dstOIns = new ArrayList<ObjectInspector>();
  private ArrayList<Converter> converters = new ArrayList<Converter>();

  private static Map<Type, String> typedBytesToTypeName = new HashMap<Type, String>();

  // Map typed bytes type codes to the corresponding Hive primitive type names.
  static {
    typedBytesToTypeName.put(getType(1), Constants.TINYINT_TYPE_NAME);
    typedBytesToTypeName.put(getType(2), Constants.BOOLEAN_TYPE_NAME);
    typedBytesToTypeName.put(getType(3), Constants.INT_TYPE_NAME);
    typedBytesToTypeName.put(getType(4), Constants.BIGINT_TYPE_NAME);
    typedBytesToTypeName.put(getType(5), Constants.FLOAT_TYPE_NAME);
    typedBytesToTypeName.put(getType(6), Constants.DOUBLE_TYPE_NAME);
    typedBytesToTypeName.put(getType(7), Constants.STRING_TYPE_NAME);
    typedBytesToTypeName.put(getType(11), Constants.SMALLINT_TYPE_NAME);
  }

  public void initialize(InputStream in, Configuration conf, Properties tbl) throws IOException {
    din = new DataInputStream(in);
    tbIn = new TypedBytesWritableInput(din);
    tbOut = new TypedBytesWritableOutput(barrStr);
    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
    columnTypes = Arrays.asList(columnTypeProperty.split(","));
    // Build a writable object inspector for each declared column type; these
    // are the targets that values read from the stream are converted to.
    for (String columnType : columnTypes) {
      PrimitiveTypeEntry dstTypeEntry = PrimitiveObjectInspectorUtils
          .getTypeEntryFromTypeName(columnType);
      dstOIns.add(PrimitiveObjectInspectorFactory
          .getPrimitiveWritableObjectInspector(dstTypeEntry.primitiveCategory));
    }
  }

  public Writable createRow() throws IOException {
    BytesWritable retWrit = new BytesWritable();
    return retWrit;
  }

  private Writable allocateWritable(Type type) {
    switch (type) {
    case BYTE:
      return new ByteWritable();
    case BOOL:
      return new BooleanWritable();
    case INT:
      return new IntWritable();
    case SHORT:
      return new ShortWritable();
    case LONG:
      return new LongWritable();
    case FLOAT:
      return new FloatWritable();
    case DOUBLE:
      return new DoubleWritable();
    case STRING:
      return new Text();
    default:
      assert false; // not supported
    }
    return null;
  }

  public int next(Writable data) throws IOException {
    int pos = 0;
    barrStr.reset();

    // Read typed bytes values until the end-of-record marker, converting each
    // one to the declared column type and re-serializing it into barrStr.
    while (true) {
      Type type = tbIn.readTypeCode();

      // it was an empty stream
      if (type == null) {
        return -1;
      }

      if (type == Type.ENDOFRECORD) {
        tbOut.writeEndOfRecord();
        if (barrStr.getLength() > 0) {
          ((BytesWritable) data).set(barrStr.getData(), 0, barrStr.getLength());
        }
        return barrStr.getLength();
      }

      if (pos >= row.size()) {
        // First time this column position is seen: allocate a writable for it
        // and set up the converter from the stream type to the declared type.
        Writable wrt = allocateWritable(type);
        assert pos == row.size();
        assert pos == rowTypeName.size();
        row.add(wrt);
        rowTypeName.add(type.name());
        String typeName = typedBytesToTypeName.get(type);
        PrimitiveTypeEntry srcTypeEntry = PrimitiveObjectInspectorUtils
            .getTypeEntryFromTypeName(typeName);
        srcOIns.add(PrimitiveObjectInspectorFactory
            .getPrimitiveWritableObjectInspector(srcTypeEntry.primitiveCategory));
        converters.add(ObjectInspectorConverters.getConverter(srcOIns.get(pos),
            dstOIns.get(pos)));
      } else {
        if (!rowTypeName.get(pos).equals(type.name())) {
          throw new RuntimeException("datatype of row changed from "
              + rowTypeName.get(pos) + " to " + type.name());
        }
      }

      Writable w = row.get(pos);
      switch (type) {
      case BYTE:
        tbIn.readByte((ByteWritable) w);
        break;
      case BOOL:
        tbIn.readBoolean((BooleanWritable) w);
        break;
      case INT:
        tbIn.readInt((IntWritable) w);
        break;
      case SHORT:
        tbIn.readShort((ShortWritable) w);
        break;
      case LONG:
        tbIn.readLong((LongWritable) w);
        break;
      case FLOAT:
        tbIn.readFloat((FloatWritable) w);
        break;
      case DOUBLE:
        tbIn.readDouble((DoubleWritable) w);
        break;
      case STRING:
        tbIn.readText((Text) w);
        break;
      default:
        assert false; // should never come here
      }

      write(pos, w);
      pos++;
    }
  }

  private void write(int pos, Writable inpw) throws IOException {
    String typ = columnTypes.get(pos);

    // Convert the value to the declared column type and re-encode it.
    Writable w = (Writable) converters.get(pos).convert(inpw);

    if (typ.equalsIgnoreCase(Constants.BOOLEAN_TYPE_NAME)) {
      tbOut.writeBoolean((BooleanWritable) w);
    } else if (typ.equalsIgnoreCase(Constants.TINYINT_TYPE_NAME)) {
      tbOut.writeByte((ByteWritable) w);
    } else if (typ.equalsIgnoreCase(Constants.SMALLINT_TYPE_NAME)) {
      tbOut.writeShort((ShortWritable) w);
    } else if (typ.equalsIgnoreCase(Constants.INT_TYPE_NAME)) {
      tbOut.writeInt((IntWritable) w);
    } else if (typ.equalsIgnoreCase(Constants.BIGINT_TYPE_NAME)) {
      tbOut.writeLong((LongWritable) w);
    } else if (typ.equalsIgnoreCase(Constants.FLOAT_TYPE_NAME)) {
      tbOut.writeFloat((FloatWritable) w);
    } else if (typ.equalsIgnoreCase(Constants.DOUBLE_TYPE_NAME)) {
      tbOut.writeDouble((DoubleWritable) w);
    } else if (typ.equalsIgnoreCase(Constants.STRING_TYPE_NAME)) {
      tbOut.writeText((Text) w);
    } else {
      assert false;
    }
  }

  public void close() throws IOException {
    if (din != null) {
      din.close();
    }
  }

  public static Type getType(int code) {
    for (Type type : Type.values()) {
      if (type.code == code) {
        return type;
      }
    }
    return null;
  }
}
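
For context, below is a minimal sketch of how a caller might drive this reader. It is illustrative only: the driver class name, the input file path taken from args[0], and the "int,string" column list are assumptions, not part of the Hive source; inside Hive the reader is driven by the script/TRANSFORM operator, which supplies the table properties itself.

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.io.Writable;

// Hypothetical driver class, not part of Hive.
public class TypedBytesRecordReaderExample {
  public static void main(String[] args) throws Exception {
    // Assumed input: a file of typed-bytes-encoded records whose fields
    // match the declared column types below.
    InputStream in = new FileInputStream(args[0]);

    // Declare the destination column types, as the table properties would.
    Properties tbl = new Properties();
    tbl.setProperty(Constants.LIST_COLUMN_TYPES, "int,string");

    TypedBytesRecordReader reader = new TypedBytesRecordReader();
    reader.initialize(in, new Configuration(), tbl);

    // createRow() returns a reusable BytesWritable that next() fills with the
    // re-serialized row; next() returns -1 once the stream is exhausted.
    Writable row = reader.createRow();
    int len;
    while ((len = reader.next(row)) != -1) {
      System.out.println("read record of " + len + " bytes");
    }
    reader.close();
  }
}

The loop follows the contract visible in next(): each call returns the length of the row re-serialized into the BytesWritable (ENDOFRECORD included), and -1 signals that the stream held no further records.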