def parse_instances(self, instances, prediction=False):
"""Parses input instances according to the associated schema.
Arguments:
instances: The tensor containing input strings.
prediction: Whether the instances are being parsed for producing predictions or not.
Returns:
A dictionary of tensors keyed by field names.
"""
# Convert the schema into an equivalent Example schema (expressed as features in Example
# terminology).
features = {}
for field in self.schema:
if field.type == SchemaFieldType.integer:
dtype = tf.int64
default_value = [0]
elif field.type == SchemaFieldType.real:
dtype = tf.float32
default_value = [0.0]
else:
# discrete
dtype = tf.string
default_value = ['']
if field.length == 0:
feature = tf.VarLenFeature(dtype=dtype)
else:
if field.length != 1:
default_value = default_value * field.length
feature = tf.FixedLenFeature(shape=[field.length], dtype=dtype, default_value=default_value)
features[field.name] = feature
return tf.parse_example(instances, features, name='examples')
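A minimal usage sketch, assuming a hypothetical `model` object that owns the schema; the placeholder name and shape are illustrative, not from the source:
# `model` is a hypothetical object exposing parse_instances and its schema.
# Feed a batch of serialized tf.Example strings and parse them per the schema.
serialized_examples = tf.placeholder(dtype=tf.string, shape=[None], name='instances')
parsed = model.parse_instances(serialized_examples, prediction=True)
# `parsed` is a dict of tensors keyed by the schema field names.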
def _TextGenerator(self, example_gen):
"""Generates article and abstract text from tf.Example."""
while True:
e = example_gen.next()
try:
article_text = self._GetExFeatureText(e, self._article_key)
abstract_text = self._GetExFeatureText(e, self._abstract_key)
except ValueError:
tf.logging.error('Failed to get article or abstract from example')
continue
yield (article_text, abstract_text)
def _GetExFeatureText(self, ex, key):
"""Extract text for a feature from td.Example.
Args:
ex: tf.Example.
key: key of the feature to be extracted.
Returns:
feature: the text extracted for the given feature key.
"""
return ex.features.feature[key].bytes_list.value[0]
def tf_Examples(data_path, num_epochs=None):
"""Generates tf.Examples from path of data files.
Binary data format: <length><blob>. <length> represents the byte size
of <blob>. <blob> is serialized tf.Example proto. The tf.Example contains
the tokenized article text and summary.
Args:
data_path: path to tf.Example data files.
num_epochs: Number of times to go through the data. None means infinite.
Yields:
Deserialized tf.Example.
If multiple files are specified, they are accessed in a random order.
"""
epoch = 0
while True:
if num_epochs is not None and epoch >= num_epochs:
break
filelist = glob.glob(data_path)
assert filelist, 'Empty filelist.'
shuffle(filelist)
for f in filelist:
reader = open(f, 'rb')
while True:
len_bytes = reader.read(8)
if not len_bytes: break
str_len = struct.unpack('q', len_bytes)[0]
example_str = struct.unpack('%ds' % str_len, reader.read(str_len))[0]
yield example_pb2.Example.FromString(example_str)
epoch += 1
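For reference, a minimal sketch of the writer side of this <length><blob> format; the helper name is hypothetical, and only the struct.pack('q', ...) length prefix must match the 8-byte read above:
import struct

def write_binary_examples(examples, output_path):
  """Hypothetical helper: writes serialized tf.Example protos in the
  <length><blob> format that tf_Examples() reads back."""
  with open(output_path, 'wb') as writer:
    for example in examples:
      blob = example.SerializeToString()
      # 'q' packs the blob size as an 8-byte signed integer, matching reader.read(8).
      writer.write(struct.pack('q', len(blob)))
      writer.write(struct.pack('%ds' % len(blob), blob))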
def write_to_tfrecord(label, shape, binary_image, tfrecord_file):
""" This example is to write a sample to TFRecord file. If you want to write
more samples, just use a loop.
"""
writer = tf.python_io.TFRecordWriter(tfrecord_file)
# write label, shape, and image content to the TFRecord file
example = tf.train.Example(features=tf.train.Features(feature={
'label': _int64_feature(label),
'shape': _bytes_feature(shape),
'image': _bytes_feature(binary_image)
}))
writer.write(example.SerializeToString())
writer.close()
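The snippet above relies on _int64_feature and _bytes_feature helpers that are not shown here; a minimal sketch of the usual definitions (assumed, not copied from this source):
import tensorflow as tf

def _int64_feature(value):
  # Wrap an int (or list of ints) in an Int64List feature.
  if not isinstance(value, (list, tuple)):
    value = [value]
  return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

def _bytes_feature(value):
  # Wrap a bytes string in a BytesList feature.
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))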
def loadTFRecords(sc, input_dir, binary_features=[]):
"""Load TFRecords from disk into a Spark DataFrame.
This will attempt to automatically convert the tf.train.Example features into Spark DataFrame columns of equivalent types.
Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to
disambiguate these types for Spark DataFrames DTypes (StringType and BinaryType), so we require a "hint"
from the caller in the ``binary_features`` argument.
Args:
:sc: SparkContext
:input_dir: location of TFRecords on disk.
:binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.
Returns:
A Spark DataFrame mirroring the tf.train.Example schema.
"""
import tensorflow as tf
tfr_rdd = sc.newAPIHadoopFile(input_dir, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
# infer Spark SQL types from tf.Example
record = tfr_rdd.take(1)[0]
example = tf.train.Example()
example.ParseFromString(bytes(record[0]))
schema = infer_schema(example, binary_features)
# convert serialized protobuf to tf.Example to Row
example_rdd = tfr_rdd.mapPartitions(lambda x: fromTFExample(x, binary_features))
# create a Spark DataFrame from RDD[Row]
df = example_rdd.toDF(schema)
# save reference of this dataframe
loadedDF[df] = input_dir
return df
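A short usage sketch, assuming a SparkContext with the tensorflow-hadoop connector on the classpath; the path and feature name below are placeholders:
# Load TFRecords into a DataFrame, telling the loader which BytesList
# features should stay binary rather than be decoded as strings.
df = loadTFRecords(sc, 'hdfs:///data/tfrecords/train', binary_features=['image_raw'])
df.printSchema()  # columns inferred from the tf.train.Example features
df.show(5)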
def infer_schema(example, binary_features=[]):
"""Given a tf.train.Example, infer the Spark DataFrame schema (StructFields).
Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to
disambiguate these types for Spark DataFrames DTypes (StringType and BinaryType), so we require a "hint"
from the caller in the ``binary_features`` argument.
Args:
:example: a tf.train.Example
:binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.
Returns:
A DataFrame StructType schema
"""
def _infer_sql_type(k, v):
# special handling for binary features
if k in binary_features:
return BinaryType()
if v.int64_list.value:
result = v.int64_list.value
sql_type = LongType()
elif v.float_list.value:
result = v.float_list.value
sql_type = DoubleType()
else:
result = v.bytes_list.value
sql_type = StringType()
if len(result) > 1: # represent multi-item tensors as Spark SQL ArrayType() of base types
return ArrayType(sql_type)
else: # represent everything else as base types (and empty tensors as StringType())
return sql_type
return StructType([ StructField(k, _infer_sql_type(k, v), True) for k,v in sorted(example.features.feature.items()) ])
def get_input_fn(filename):
"""Returns an `input_fn` for train and eval."""
def input_fn(params):
"""A simple input_fn using the experimental input pipeline."""
# Retrieves the batch size for the current shard. The # of shards is
# computed according to the input pipeline deployment. See
# `tf.contrib.tpu.RunConfig` for details.
batch_size = params["batch_size"]
def parser(serialized_example):
"""Parses a single tf.Example into image and label tensors."""
features = tf.parse_single_example(
serialized_example,
features={
"image_raw": tf.FixedLenFeature([], tf.string),
"label": tf.FixedLenFeature([], tf.int64),
})
image = tf.decode_raw(features["image_raw"], tf.uint8)
image.set_shape([28 * 28])
# Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]
image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
label = tf.cast(features["label"], tf.int32)
return image, label
dataset = tf.data.TFRecordDataset(
filename, buffer_size=FLAGS.dataset_reader_buffer_size)
dataset = dataset.map(parser).cache().repeat()
dataset = dataset.apply(
tf.contrib.data.batch_and_drop_remainder(batch_size))
images, labels = dataset.make_one_shot_iterator().get_next()
return images, labels
return input_fn
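For context, a hedged sketch of how records matching this parser could be produced; the assumed layout (a 28x28 uint8 image serialized with tobytes() plus an int64 label) follows from the parser above, not from the original source:
import numpy as np
import tensorflow as tf

def write_mnist_tfrecord(images, labels, filename):
  # images: uint8 array of shape [N, 28, 28]; labels: int array of shape [N].
  with tf.python_io.TFRecordWriter(filename) as writer:
    for image, label in zip(images, labels):
      example = tf.train.Example(features=tf.train.Features(feature={
          'image_raw': tf.train.Feature(
              bytes_list=tf.train.BytesList(value=[image.astype(np.uint8).tobytes()])),
          'label': tf.train.Feature(
              int64_list=tf.train.Int64List(value=[int(label)])),
      }))
      writer.write(example.SerializeToString())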
def __init__(self, dataGenerator, bucketing=True, truncate_input=False):
"""Batcher constructor.
Args:
dataGenerator: data generator that supplies the vocabulary (`vocab`) and the
Seq2SeqAttention hyperparameters (`_hps`) used by the batcher.
bucketing: Whether to bucket articles of similar length into the same batch.
truncate_input: Whether to truncate input that is too long. The alternative is
to discard such examples.
"""
self._data_generator = dataGenerator
self._vocab = dataGenerator.vocab
self._hps = dataGenerator._hps
# self._max_article_sentences = self.
# self._max_abstract_sentences = max_abstract_sentences
self._bucketing = bucketing
self._truncate_input = truncate_input
self._input_queue = Queue.Queue(QUEUE_NUM_BATCH * self._hps.batch_size)
self._bucket_input_queue = Queue.Queue(QUEUE_NUM_BATCH)
self._input_threads = []
for _ in xrange(8):
self._input_threads.append(Thread(target=self._FillInputQueue))
self._input_threads[-1].daemon = True
self._input_threads[-1].start()
self._bucketing_threads = []
for _ in xrange(2):
self._bucketing_threads.append(Thread(target=self._FillBucketInputQueue))
self._bucketing_threads[-1].daemon = True
self._bucketing_threads[-1].start()
# self._watch_thread = Thread(target=self._WatchThreads)
# self._watch_thread.daemon = True
# self._watch_thread.start()
def generate_files(generator, output_filenames, max_cases=None):
"""Generate cases from a generator and save as TFRecord files.
Generated cases are transformed to tf.Example protos and saved as TFRecords
in the sharded files given by `output_filenames` (conventionally named
output_dir/output_name-00..N-of-00..M, where M = num_shards).
Args:
generator: a generator yielding (string -> int/float/str list) dictionaries.
output_filenames: List of output file paths.
max_cases: maximum number of cases to get from the generator;
if None (default), we use the generator until StopIteration is raised.
"""
if outputs_exist(output_filenames):
tf.logging.info("Skipping generator because outputs files exist")
return
num_shards = len(output_filenames)
writers = [tf.python_io.TFRecordWriter(fname) for fname in output_filenames]
counter, shard = 0, 0
for case in generator:
if counter > 0 and counter % 100000 == 0:
tf.logging.info("Generating case %d." % counter)
counter += 1
if max_cases and counter > max_cases:
break
sequence_example = to_example(case)
writers[shard].write(sequence_example.SerializeToString())
shard = (shard + 1) % num_shards
for writer in writers:
writer.close()
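The to_example call above converts each generated dictionary into a tf.train.Example; a simplified sketch of such a conversion, inferred from the docstring (values are int, float, or str/bytes lists) rather than copied from the source:
import tensorflow as tf

def to_example_sketch(dictionary):
  """Builds a tf.train.Example from {name: list of int/float/str} (sketch)."""
  features = {}
  for name, values in dictionary.items():
    if not values:
      raise ValueError('Empty feature list for %s.' % name)
    if isinstance(values[0], int):
      features[name] = tf.train.Feature(int64_list=tf.train.Int64List(value=values))
    elif isinstance(values[0], float):
      features[name] = tf.train.Feature(float_list=tf.train.FloatList(value=values))
    else:
      byte_values = [v.encode('utf-8') if isinstance(v, str) else v for v in values]
      features[name] = tf.train.Feature(bytes_list=tf.train.BytesList(value=byte_values))
  return tf.train.Example(features=tf.train.Features(feature=features))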
Source: preprocess_measurements.py (project: scalable_analytics, author: broadinstitute)
def sample_measurements_to_example(sample, sample_measurements):
"""Convert sparse measurements to TensorFlow Example protocol buffers.
See also
https://www.tensorflow.org/versions/r0.10/how_tos/reading_data/index.html
Args:
sample: the identifier for the sample
sample_measurements: list of the sample's sparse measurements
Returns:
A filled in TensorFlow Example proto for this sample.
"""
feature_tuples = [(str(cnt[MEASUREMENT_COLUMN]), cnt[VALUE_COLUMN])
for cnt in sample_measurements]
measurements, values = map(list, zip(*feature_tuples))
features = {
SAMPLE_NAME_FEATURE:
tf.train.Feature(bytes_list=tf.train.BytesList(value=[str(sample)])),
# These are tf.VarLenFeature.
MEASUREMENTS_FEATURE:
tf.train.Feature(bytes_list=tf.train.BytesList(value=measurements)),
VALUES_FEATURE:
tf.train.Feature(float_list=tf.train.FloatList(value=values))
}
return tf.train.Example(features=tf.train.Features(feature=features))
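Because the measurement and value features vary in length per sample, the read side would parse them back with tf.VarLenFeature specs. A hedged sketch (the feature-name constants come from the code above; `serialized_example` is a placeholder tensor, and the actual parsing code is not part of this snippet):
# TF 1.x parse spec: SparseTensors for the variable-length features,
# a dense scalar string for the sample name.
feature_spec = {
    SAMPLE_NAME_FEATURE: tf.FixedLenFeature([], tf.string),
    MEASUREMENTS_FEATURE: tf.VarLenFeature(tf.string),
    VALUES_FEATURE: tf.VarLenFeature(tf.float32),
}
parsed = tf.parse_single_example(serialized_example, feature_spec)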
Source: preprocess_measurements.py (project: scalable_analytics, author: broadinstitute)
def run(argv=None):
"""Runs the sparse measurements preprocess pipeline.
Args:
argv: Pipeline options as a list of arguments.
"""
pipeline_options = PipelineOptions(flags=argv)
preprocess_options = pipeline_options.view_as(PreprocessOptions)
cloud_options = pipeline_options.view_as(GoogleCloudOptions)
output_dir = os.path.join(preprocess_options.output,
datetime.datetime.now().strftime('%Y%m%d-%H%M%S'))
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(
WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'
cloud_options.staging_location = os.path.join(output_dir, 'tmp', 'staging')
cloud_options.temp_location = os.path.join(output_dir, 'tmp')
cloud_options.job_name = 'preprocess-measurements-%s' % (
datetime.datetime.now().strftime('%y%m%d-%H%M%S'))
data_query = str(
Template(open(preprocess_options.input, 'r').read()).render(
DATA_QUERY_REPLACEMENTS))
logging.info('data query : %s', data_query)
with beam.Pipeline(options=pipeline_options) as p:
# Read the table rows into a PCollection.
rows = p | 'ReadMeasurements' >> beam.io.Read(
beam.io.BigQuerySource(query=data_query, use_standard_sql=True))
# Convert the data into TensorFlow Example Protocol Buffers.
examples = measurements_to_examples(rows)
# Write the serialized compressed protocol buffers to Cloud Storage.
_ = (examples
| 'EncodeExamples'
>> beam.Map(lambda example: example.SerializeToString())
| 'WriteExamples' >> tfrecordio.WriteToTFRecord(
file_path_prefix=os.path.join(output_dir, 'examples'),
compression_type=CompressionTypes.GZIP,
file_name_suffix='.tfrecord.gz'))
def get_class_name_from_filename(file_name):
"""Gets the class name from a file.
Args:
file_name: The file name to get the class name from,
e.g. "american_pit_bull_terrier_105.jpg".
Returns:
The class name extracted from the file name, e.g. "american_pit_bull_terrier".
"""
match = re.match(r'([A-Za-z_]+)(_[0-9]+\.jpg)', file_name, re.I)
return match.groups()[0]
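For example, illustrating the regex above:
get_class_name_from_filename('american_pit_bull_terrier_105.jpg')
# -> 'american_pit_bull_terrier'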
Source: revise_preprocessed_data.py (project: cloudml-examples, author: googlegenomics)
def filter_and_revise_example(serialized_example, samples_metadata):
"""Filter and revise a collection of existing TensorFlow examples.
Args:
serialized_example: the example to be revised and/or filtered
samples_metadata: dictionary of metadata for all samples
Returns:
A list containing the revised example or the empty list if the
example should be removed from the collection.
"""
example = tf.train.Example.FromString(serialized_example)
sample_name = example.features.feature[
encoder.SAMPLE_NAME_FEATURE].bytes_list.value[0]
logging.info('Checking ' + sample_name)
if sample_name not in samples_metadata:
logging.info('Omitting ' + sample_name)
return []
revised_features = {}
# Initialize with current example features.
revised_features.update(example.features.feature)
# Overwrite metadata features.
revised_features.update(
metadata_encoder.metadata_to_ancestry_features(
samples_metadata[sample_name]))
return [
tf.train.Example(features=tf.train.Features(feature=revised_features))
]
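A hedged sketch of how this function could plug into a Beam pipeline; the file pattern and the samples_metadata dict are placeholders, and the surrounding pipeline is not part of this snippet:
import apache_beam as beam
from apache_beam.io import tfrecordio

samples_metadata = {}  # placeholder: sample_name -> metadata dict

with beam.Pipeline() as p:
  revised = (
      p
      | 'ReadExamples' >> tfrecordio.ReadFromTFRecord('gs://my-bucket/examples*.tfrecord.gz')
      # FlatMap lets filter_and_revise_example drop an example by returning [].
      | 'ReviseExamples' >> beam.FlatMap(filter_and_revise_example, samples_metadata)
      | 'Serialize' >> beam.Map(lambda example: example.SerializeToString()))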
def read_input(self, data_path, batch_size, randomize_input=True,
distort_inputs=True, name="read_input"):
"""Read input labeled images and make a batch of examples.
Labeled images are read from files of tf.Example protos. This proto has
to contain two features: `image` and `label`, corresponding to an image
and its label. After being read, the labeled images are put into queues
to make a batch of examples every time the batching op is executed.
Args:
data_path: a string, path to files of tf.Example protos containing
labeled images.
batch_size: an int, the number of labeled images in a batch.
randomize_input: a bool, whether the images in the batch are randomized.
distort_inputs: a bool, whether to distort the images.
name: a string, name of the op.
Returns:
keys: a tensorflow op, the keys of tf.Example protos.
examples: a tensorflow op, a batch of examples containing labeled
images. After being materialized, this op becomes a dict, in which the
`decoded_observation` key is an image and the `decoded_label` is the
label of that image.
"""
feature_types = {}
feature_types["image"] = tf.FixedLenFeature(
shape=[3072,], dtype=tf.int64, default_value=None)
feature_types["label"] = tf.FixedLenFeature(
shape=[1,], dtype=tf.int64, default_value=None)
keys, examples = tf.contrib.learn.io.graph_io.read_keyed_batch_examples(
file_pattern=data_path,
batch_size=batch_size,
reader=tf.TFRecordReader,
randomize_input=randomize_input,
queue_capacity=batch_size * 4,
num_threads=10 if randomize_input else 1,
parse_fn=lambda example_proto: self._preprocess_input(example_proto,
feature_types,
distort_inputs),
name=name)
return keys, examples
def _preprocess_input(self, example_proto, feature_types, distort_inputs):
"""Parse an tf.Example proto and preprocess its image and label.
Args:
example_proto: a tensorflow op, a tf.Example proto.
feature_types: a dict, used for parsing a tf.Example proto. This is the
same `feature_types` dict constructed in the `read_input` method.
distort_inputs: a bool, whether to distort the images.
Returns:
example: a tensorflow op; after being materialized it becomes a dict, in
which the `decoded_observation` key is a processed image, a tensor
of size InputReaderCifar10.IMAGE_SIZE x
InputReaderCifar10.IMAGE_SIZE x InputReaderCifar10.NUM_CHANNELS and
the `decoded_label` is the label of that image, a vector of size
InputReaderCifar10.NUM_CLASSES.
"""
example = tf.parse_single_example(example_proto, feature_types)
image = tf.reshape(example["image"], [InputReaderCifar10.NUM_CHANNELS,
InputReaderCifar10.IMAGE_SIZE,
InputReaderCifar10.IMAGE_SIZE])
image = tf.transpose(image, perm=[1, 2, 0])
image = tf.cast(image, tf.float32)
if distort_inputs:
image = tf.random_crop(image, [InputReaderCifar10.IMAGE_CROPPED_SIZE,
InputReaderCifar10.IMAGE_CROPPED_SIZE,
3])
image = tf.image.random_flip_left_right(image)
image = tf.image.random_brightness(image, max_delta=63)
image = tf.image.random_contrast(image, lower=0.2, upper=1.8)
else:
image = tf.image.resize_image_with_crop_or_pad(image,
InputReaderCifar10.IMAGE_CROPPED_SIZE,
InputReaderCifar10.IMAGE_CROPPED_SIZE)
image = tf.image.per_image_whitening(image)
example["decoded_observation"] = image
label = tf.one_hot(example["label"], InputReaderCifar10.NUM_CLASSES, on_value=1, off_value=0)
label = tf.reshape(label, [InputReaderCifar10.NUM_CLASSES,])
label = tf.cast(label, tf.int64)
example["decoded_label"] = label
return example
def __init__(self, schema):
"""Build an ExampleProtoCoder.
Args:
schema: A `Schema` object.
Raises:
ValueError: If `schema` is invalid.
"""
self._schema = schema
# Using pre-allocated tf.train.Example objects for performance reasons.
#
# The _encode_example_cache is used solely by "encode" paths while the
# _decode_example_cache is used solely by "decode" paths, since the
# caching strategies are incompatible with each other (due to proto
# parsing/merging implementation).
#
# Since the output of both "encode" and "decode" are deep as opposed to
# shallow transformations, and since the schema always fully defines the
# Example's FeatureMap (i.e. all fields are always cleared/assigned or
# copied), the optimizations and implementation are correct and
# thread-compatible.
#
# Due to pickling issues actual initialization of this will happen lazily
# in encode or decode respectively.
self._encode_example_cache = None
self._decode_example_cache = None
self._feature_handlers = []
for name, feature_spec in six.iteritems(schema.as_feature_spec()):
if isinstance(feature_spec, tf.FixedLenFeature):
self._feature_handlers.append(
_FixedLenFeatureHandler(name, feature_spec))
elif isinstance(feature_spec, tf.VarLenFeature):
self._feature_handlers.append(
_VarLenFeatureHandler(name, feature_spec))
elif isinstance(feature_spec, tf.SparseFeature):
self._feature_handlers.append(
_SparseFeatureHandler(name, feature_spec))
else:
raise ValueError('feature_spec should be one of tf.FixedLenFeature, '
'tf.VarLenFeature or tf.SparseFeature: %s was %s' %
(name, type(feature_spec)))
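A hedged usage sketch, assuming a tf.Transform Schema built from an ordinary feature spec; the module path and value formats follow the tf.Transform releases contemporary with this code and may differ in newer versions:
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_schema

# Build a schema, construct the coder, and round-trip one record (sketch).
schema = dataset_schema.from_feature_spec({
    'age': tf.FixedLenFeature([], tf.int64),
    'query': tf.VarLenFeature(tf.string),
})
coder = ExampleProtoCoder(schema)
serialized = coder.encode({'age': 29, 'query': [b'hello', b'world']})
decoded = coder.decode(serialized)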
def tfidf(x, vocab_size, smooth=True, name=None):
"""Maps the terms in x to their term frequency * inverse document frequency.
The inverse document frequency of a term is calculated as
1 + log((corpus size + 1) / (document frequency of term + 1)) by default.
Example usage:
example strings [["I", "like", "pie", "pie", "pie"], ["yum", "yum", "pie]]
in: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [0, 3], [0, 4],
[1, 0], [1, 1], [1, 2]],
values=[1, 2, 0, 0, 0, 3, 3, 0])
out: SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [1, 0], [1, 1]],
values=[1, 2, 0, 3, 0])
SparseTensor(indices=[[0, 0], [0, 1], [0, 2], [1, 0], [1, 1]],
values=[(1/5)*(log(3/2)+1), (1/5)*(log(3/2)+1), (3/5),
(2/3)*(log(3/2)+1), (1/3)])
NOTE that the first doc's duplicate "pie" strings have been combined to
one output, as have the second doc's duplicate "yum" strings.
Args:
x: A `SparseTensor` representing int64 values (most likely that are the
result of calling string_to_int on a tokenized string).
vocab_size: An int - the count of vocab used to turn the string into int64s
including any OOV buckets.
smooth: A bool indicating if the inverse document frequency should be
smoothed. If True, which is the default, then the idf is calculated as
1 + log((corpus size + 1) / (document frequency of term + 1)).
Otherwise, the idf is
1 + log((corpus size) / (document frequency of term)), which could
result in a division by zero error.
name: (Optional) A name for this operation.
Returns:
Two `SparseTensor`s with indices [index_in_batch, index_in_bag_of_words].
The first has values vocab_index, which is taken from input `x`.
The second has values tfidf_weight.
"""
def _to_vocab_range(x):
"""Enforces that the vocab_ids in x are positive."""
return tf.SparseTensor(
indices=x.indices,
values=tf.mod(x.values, vocab_size),
dense_shape=x.dense_shape)
with tf.name_scope(name, 'tfidf'):
cleaned_input = _to_vocab_range(x)
term_frequencies = _to_term_frequency(cleaned_input, vocab_size)
count_docs_with_term_column = _count_docs_with_term(term_frequencies)
# Expand dims to get around the min_tensor_rank checks
sizes = tf.expand_dims(tf.shape(cleaned_input)[0], 0)
# [batch, vocab] - tfidf
tfidfs = _to_tfidf(term_frequencies,
analyzers.sum(count_docs_with_term_column,
reduce_instance_dims=False),
analyzers.sum(sizes),
smooth)
return _split_tfidfs_to_outputs(tfidfs)
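A hedged sketch of typical usage inside a tf.Transform preprocessing_fn; VOCAB_SIZE and the 'terms' column are placeholders, and string_to_int is the mapper the docstring refers to:
VOCAB_SIZE = 10000  # placeholder

def preprocessing_fn(inputs):
  # inputs['terms'] is assumed to be a SparseTensor of tokenized strings.
  term_indices = string_to_int(inputs['terms'], top_k=VOCAB_SIZE)
  # Add one to the vocab size for the OOV bucket created by string_to_int.
  tfidf_indices, tfidf_weights = tfidf(term_indices, VOCAB_SIZE + 1)
  return {'tfidf_indices': tfidf_indices, 'tfidf_weights': tfidf_weights}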
def bucketize(x, num_buckets, epsilon=None, name=None):
"""Returns a bucketized column, with a bucket index assigned to each input.
Args:
x: A numeric input `Tensor` whose values should be mapped to buckets.
num_buckets: Values in the input `x` are divided into approximately
equal-sized buckets, where the number of buckets is num_buckets.
epsilon: (Optional) Error tolerance, typically a small fraction close to
zero. If a value is not specified by the caller, a suitable value is
computed based on experimental results. For `num_buckets` less than 100,
the value of 0.01 is chosen to handle a dataset of up to ~1 trillion input
data values. If `num_buckets` is larger, then epsilon is set to
(1/`num_buckets`) to enforce a stricter error tolerance, because more
buckets will result in a smaller range for each bucket, and so we want
the boundaries to be less fuzzy.
See analyzers.quantiles() for details.
name: (Optional) A name for this operation.
Returns:
A `Tensor` of the same shape as `x`, with each element in the
returned tensor representing the bucketized value. Bucketized value is
in the range [0, num_buckets).
Raises:
ValueError: If the value of num_buckets is less than 1.
"""
with tf.name_scope(name, 'bucketize'):
if not isinstance(num_buckets, int):
raise TypeError('num_buckets must be an int, got %s' % type(num_buckets))
if num_buckets < 1:
raise ValueError('Invalid num_buckets %d' % num_buckets)
if epsilon is None:
# See explanation in args documentation for epsilon.
epsilon = min(1.0 / num_buckets, 0.01)
bucket_boundaries = analyzers.quantiles(x, num_buckets, epsilon)
buckets = quantile_ops.bucketize_with_input_boundaries(
x,
boundaries=bucket_boundaries,
name='assign_buckets')
# Convert to int64 because int32 is not compatible with tf.Example parser.
# See _TF_EXAMPLE_ALLOWED_TYPES in FixedColumnRepresentation()
# in tf_metadata/dataset_schema.py
return tf.to_int64(buckets)
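And a short usage sketch within a preprocessing_fn; the column names are placeholders:
def preprocessing_fn(inputs):
  # Quantile-bucketize a numeric column into 10 roughly equal-sized buckets.
  return {'price_bucket': bucketize(inputs['price'], num_buckets=10)}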
def __init__(self, data_path, vocab, hps,
article_key, abstract_key, max_article_sentences,
max_abstract_sentences, bucketing=True, truncate_input=False):
"""Batcher constructor.
Args:
data_path: tf.Example filepattern.
vocab: Vocabulary.
hps: Seq2SeqAttention model hyperparameters.
article_key: article feature key in tf.Example.
abstract_key: abstract feature key in tf.Example.
max_article_sentences: Max number of sentences used from article.
max_abstract_sentences: Max number of sentences used from abstract.
bucketing: Whether to bucket articles of similar length into the same batch.
truncate_input: Whether to truncate input that is too long. The alternative is
to discard such examples.
"""
self._data_path = data_path
self._vocab = vocab
self._hps = hps
self._article_key = article_key
self._abstract_key = abstract_key
self._max_article_sentences = max_article_sentences
self._max_abstract_sentences = max_abstract_sentences
self._bucketing = bucketing
self._truncate_input = truncate_input
self._input_queue = Queue.Queue(QUEUE_NUM_BATCH * self._hps.batch_size)
self._bucket_input_queue = Queue.Queue(QUEUE_NUM_BATCH)
self._input_threads = []
for _ in xrange(16):
self._input_threads.append(Thread(target=self._FillInputQueue))
self._input_threads[-1].daemon = True
self._input_threads[-1].start()
self._bucketing_threads = []
for _ in xrange(4):
self._bucketing_threads.append(Thread(target=self._FillBucketInputQueue))
self._bucketing_threads[-1].daemon = True
self._bucketing_threads[-1].start()
self._watch_thread = Thread(target=self._WatchThreads)
self._watch_thread.daemon = True
self._watch_thread.start()