java类org.apache.hadoop.io.Writable的实例源码

DumpTypedBytes.java 文件源码 项目:hadoop 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf()); 
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
      fileStatus.getLen() * fileStatus.getBlockSize(),
      (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
Client.java 文件源码 项目:hadoop-oss 阅读 30 收藏 0 点赞 0 评论 0
/** Construct an IPC client whose values are of the given {@link Writable}
 * class. */
public Client(Class<? extends Writable> valueClass, Configuration conf, 
    SocketFactory factory) {
  this.valueClass = valueClass;
  this.conf = conf;
  this.socketFactory = factory;
  this.connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
      CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
  this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
  this.clientId = ClientId.getClientId();
  this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}
TestComparators.java 文件源码 项目:hadoop 阅读 20 收藏 0 点赞 0 评论 0
public void reduce(IntWritable key, Iterator<Writable> values, 
                   OutputCollector<IntWritable, Text> out,
                   Reporter reporter) throws IOException {
  int currentKey = key.get();
  // keys should be in descending order
  if (currentKey > lastKey) {
    fail("Keys not in sorted descending order");
  }
  lastKey = currentKey;
  out.collect(key, new Text("success"));
}
Writables.java 文件源码 项目:ditb 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Copy one Writable to another.  Copies bytes using data streams.
 * @param bytes Source Writable
 * @param tgt Target Writable
 * @return The target Writable.
 * @throws IOException e
 */
public static Writable copyWritable(final byte [] bytes, final Writable tgt)
throws IOException {
  DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
  try {
    tgt.readFields(dis);
  } finally {
    dis.close();
  }
  return tgt;
}
CompositeRecordReader.java 文件源码 项目:hadoop 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Close all child RRs.
 */
public void close() throws IOException {
  if (kids != null) {
    for (RecordReader<K,? extends Writable> rr : kids) {
      rr.close();
    }
  }
  if (jc != null) {
    jc.close();
  }
}
TestIPCServerResponder.java 文件源码 项目:hadoop-oss 阅读 20 收藏 0 点赞 0 评论 0
static Writable call(Client client, Writable param,
    InetSocketAddress address) throws IOException {
  final ConnectionId remoteId = ConnectionId.getConnectionId(address, null,
      null, 0, null, conf);
  return client.call(RpcKind.RPC_BUILTIN, param, remoteId,
      RPC.RPC_SERVICE_CLASS_DEFAULT, null);
}
TestIPCServerResponder.java 文件源码 项目:hadoop-oss 阅读 18 收藏 0 点赞 0 评论 0
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
    long receiveTime) throws IOException {
  if (sleep) {
    try {
      Thread.sleep(RANDOM.nextInt(20)); // sleep a bit
    } catch (InterruptedException e) {}
  }
  return param;
}
Server.java 文件源码 项目:hadoop 阅读 24 收藏 0 点赞 0 评论 0
protected Server(String bindAddress, int port,
                Class<? extends Writable> paramClass, int handlerCount, 
                Configuration conf)
  throws IOException 
{
  this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
      .toString(port), null, null);
}
TestSerializationFactory.java 文件源码 项目:hadoop-oss 阅读 15 收藏 0 点赞 0 评论 0
@Test
public void testGetDeserializer() {
  // Test that a valid serializer class is returned when its present
  assertNotNull("A valid class must be returned for default Writable SerDe",
      factory.getDeserializer(Writable.class));
  // Test that a null is returned when none can be found.
  assertNull("A null should be returned if there are no deserializers found",
      factory.getDeserializer(TestSerializationFactory.class));
}
WALFile.java 文件源码 项目:kafka-connect-hdfs 阅读 19 收藏 0 点赞 0 评论 0
/**
 * Read the next key/value pair in the file into <code>key</code> and <code>val</code>.  Returns
 * true if such a pair exists and false when at end of file
 */
public synchronized boolean next(Writable key, Writable val)
    throws IOException {
  if (val.getClass() != WALEntry.class) {
    throw new IOException("wrong value class: " + val + " is not " + WALEntry.class);
  }

  boolean more = next(key);

  if (more) {
    getCurrentValue(val);
  }

  return more;
}


问题


面经


文章

微信
公众号

扫码关注公众号