def gdt2pt(gdt):
"""Translate GDAL data type to WKT Raster pixel type."""
pixtypes = {
gdalc.GDT_Byte : { 'name': 'PT_8BUI', 'id': 4 },
gdalc.GDT_Int16 : { 'name': 'PT_16BSI', 'id': 5 },
gdalc.GDT_UInt16 : { 'name': 'PT_16BUI', 'id': 6 },
gdalc.GDT_Int32 : { 'name': 'PT_32BSI', 'id': 7 },
gdalc.GDT_UInt32 : { 'name': 'PT_32BUI', 'id': 8 },
gdalc.GDT_Float32 : { 'name': 'PT_32BF', 'id': 10 },
gdalc.GDT_Float64 : { 'name': 'PT_64BF', 'id': 11 }
}
# XXX: Uncomment these logs to debug types translation
#logit('MSG: Input GDAL pixel type: %s (%d)\n' % (gdal.GetDataTypeName(gdt), gdt))
#logit('MSG: Output WKTRaster pixel type: %(name)s (%(id)d)\n' % (pixtypes.get(gdt, 13)))
return pixtypes.get(gdt, 13)
python类GetDataTypeName()的实例源码
def average(input_dir, out_dir, size):
input_dir = abspath(input_dir)
log.info('Reading tifs from {}'.format(input_dir))
tifs = glob.glob(join(input_dir, '*.tif'))
for tif in tifs:
data_source = gdal.Open(tif, gdal.GA_ReadOnly)
band = data_source.GetRasterBand(1)
# data_type = gdal.GetDataTypeName(band.DataType)
data = band.ReadAsArray()
no_data_val = band.GetNoDataValue()
averaged_data = filter_data(data, size, no_data_val)
log.info('Calculated average for {}'.format(basename(tif)))
output_file = join(out_dir, 'average_' + basename(tif))
out_ds = gdal.GetDriverByName('GTiff').Create(
output_file, data_source.RasterXSize, data_source.RasterYSize,
1, band.DataType)
out_band = out_ds.GetRasterBand(1)
out_band.WriteArray(averaged_data)
out_ds.SetGeoTransform(data_source.GetGeoTransform())
out_ds.SetProjection(data_source.GetProjection())
out_band.FlushCache() # Write data to disc
out_ds = None # close out_ds
data_source = None # close dataset
log.info('Finished converting {}'.format(basename(tif)))
def gdalaverage(input_dir, out_dir, size):
"""
average data using gdal's averaging method.
Parameters
----------
input_dir: str
input dir name of the tifs that needs to be averaged
out_dir: str
output dir name
size: int, optional
size of kernel
Returns
-------
"""
input_dir = abspath(input_dir)
log.info('Reading tifs from {}'.format(input_dir))
tifs = glob.glob(join(input_dir, '*.tif'))
process_tifs = np.array_split(tifs, mpiops.chunks)[mpiops.chunk_index]
for tif in process_tifs:
data_set = gdal.Open(tif, gdal.GA_ReadOnly)
# band = data_set.GetRasterBand(1)
# data_type = gdal.GetDataTypeName(band.DataType)
# data = band.ReadAsArray()
# no_data_val = band.GetNoDataValue()
# averaged_data = filter_data(data, size, no_data_val)
log.info('Calculated average for {}'.format(basename(tif)))
output_file = join(out_dir, 'average_' + basename(tif))
src_gt = data_set.GetGeoTransform()
tmp_file = '/tmp/tmp_{}.tif'.format(mpiops.chunk_index)
resample_cmd = [TRANSLATE] + [tif, tmp_file] + \
['-tr', str(src_gt[1]*size), str(src_gt[1]*size)] + \
['-r', 'bilinear']
check_call(resample_cmd)
rollback_cmd = [TRANSLATE] + [tmp_file, output_file] + \
['-tr', str(src_gt[1]), str(src_gt[1])]
check_call(rollback_cmd)
log.info('Finished converting {}'.format(basename(tif)))
def rasterToJSON (infile, outfile, outproj):
#reproject dataset to the desired projection
subprocess.call(['gdalwarp', '-t_srs', 'EPSG:{}'.format(outproj), infile, 'outReproj.tif', '-overwrite'])
projRas = gdal.Open('outReproj.tif')
#get properties of new raster
nrows = projRas.RasterYSize
ncols = projRas.RasterXSize
band1 = projRas.GetRasterBand(1)
nodata = band1.GetNoDataValue()
gdata = band1.ReadAsArray()
x0, y0 = projRas.GetGeoTransform()[0], projRas.GetGeoTransform()[3]
cellX, cellY = projRas.GetGeoTransform()[1], projRas.GetGeoTransform()[5]
#get corners
ulcorner, llcorner = [x0, y0], [x0, y0 + nrows*cellY]
urcorner, lrcorner = [x0 + ncols*cellX, y0], [x0 + ncols*cellX, y0 + nrows*cellY]
dataType = gdal.GetDataTypeName(band1.DataType)
#convert to float
if dataType.startswith('Int'):
gdata = gdata.astype(numpy.float32, copy=False)
# new nodata value
gdata[gdata == nodata] = -9999
gdata = numpy.concatenate(gdata)
gdata = gdata.tolist()
#write to json
gdataJSON = {'upLeft': transPoint(ulcorner, outproj, 4326), 'loLeft': transPoint(llcorner, outproj, 4326),
'upRight': transPoint(urcorner, outproj, 4326), 'loRight': transPoint(lrcorner, outproj, 4326),
'projEPSG': outproj, 'width': ncols, 'height': nrows, 'data': gdata}
with open(outfile, 'w') as fp:
json.dump(gdataJSON, fp)
del gdata
subprocess.call(['rm', 'outReproj.tif'])
def changeRasterValues(self, band):
fmttypes = {'Byte': 'B', 'UInt16': 'H', 'Int16': 'h', 'UInt32': 'I', 'Int32': 'i', 'Float32': 'f',
'Float64': 'd'}
data_type = band.DataType
BandType = gdal.GetDataTypeName(band.DataType)
raster = []
for y in range(band.YSize):
scanline = band.ReadRaster(0, y, band.XSize, 1, band.XSize, 1, data_type)
values = struct.unpack(fmttypes[BandType] * band.XSize, scanline)
raster.append(values)
raster = [list(item) for item in raster]
# changing raster values
#for i, item in enumerate(raster):
# for j, value in enumerate(item):
# #if value > 98:
# # #print i, j
# # raster[i][j] = 0
# transforming list in array
raster = numpy.asarray(raster)
return raster
def __init__(self, filename):
if os.path.isfile(filename):
self.filename = filename if os.path.isabs(filename) else os.path.join(os.getcwd(), filename)
self.raster = gdal.Open(filename, GA_ReadOnly)
else:
raise IOError('file does not exist')
self.cols = self.raster.RasterXSize
self.rows = self.raster.RasterYSize
self.bands = self.raster.RasterCount
self.dim = [self.rows, self.cols, self.bands]
self.driver = self.raster.GetDriver()
self.format = self.driver.ShortName
self.dtype = gdal.GetDataTypeName(self.raster.GetRasterBand(1).DataType)
self.projection = self.raster.GetProjection()
self.srs = osr.SpatialReference(wkt=self.projection)
self.proj4 = self.srs.ExportToProj4()
try:
self.epsg = crsConvert(self.srs, 'epsg')
except RuntimeError:
self.epsg = None
self.geogcs = self.srs.GetAttrValue('geogcs')
self.projcs = self.srs.GetAttrValue('projcs') if self.srs.IsProjected() else None
self.geo = dict(
zip(['xmin', 'xres', 'rotation_x', 'ymax', 'rotation_y', 'yres'], self.raster.GetGeoTransform()))
# note: yres is negative!
self.geo['xmax'] = self.geo['xmin'] + self.geo['xres'] * self.cols
self.geo['ymin'] = self.geo['ymax'] + self.geo['yres'] * self.rows
self.res = [abs(float(self.geo['xres'])), abs(float(self.geo['yres']))]
self.nodata = self.raster.GetRasterBand(1).GetNoDataValue()
self.__data = []
def get_ndv_b(b):
"""Get NoData value for GDAL band.
If NoDataValue is not set in the band,
extract upper left and lower right pixel values.
Otherwise assume NoDataValue is 0.
Parameters
----------
b : GDALRasterBand object
This is the input band.
Returns
-------
b_ndv : float
NoData value
"""
b_ndv = b.GetNoDataValue()
if b_ndv is None:
#Check ul pixel for ndv
ns = b.XSize
nl = b.YSize
ul = float(b.ReadAsArray(0, 0, 1, 1))
#ur = float(b.ReadAsArray(ns-1, 0, 1, 1))
lr = float(b.ReadAsArray(ns-1, nl-1, 1, 1))
#ll = float(b.ReadAsArray(0, nl-1, 1, 1))
#Probably better to use 3/4 corner criterion
#if ul == ur == lr == ll:
if np.isnan(ul) or ul == lr:
b_ndv = ul
else:
#Assume ndv is 0
b_ndv = 0
elif np.isnan(b_ndv):
b_dt = gdal.GetDataTypeName(b.DataType)
if 'Float' in b_dt:
b_ndv = np.nan
else:
b_ndv = 0
return b_ndv
#Write out a recarray as a csv
def __init__(self,ds_filename,load_data=True,latlon=True,band=1,
spatial_ref=None,geo_transform=None,downsampl=1):
""" Construct object with raster from a single band dataset.
Parameters:
ds_filename : filename of the dataset to import
load_data : - True, to import the data into obj.r.
- False, to not load any data.
- tuple (left, right, bottom, top) to load subset;
obj.extent will be set to reflect subset area.
latlon : default True. Only used if load_data=tuple. Set as False
if tuple is projected coordinates, True if WGS84.
band : default 1. Specify GDAL band number to load. If you want to
load multiple bands at once use MultiBandRaster instead.
downsampl : default 1. Used to down-sample the image when loading it.
A value of 2 for example will multiply the resolution by 2.
Optionally, you can manually specify/override the georeferencing.
To do this you must set both of the following parameters:
spatial_ref : a OSR SpatialReference instance
geo_transform : a Geographic Transform tuple of the form
(top left x, w-e cell size, 0, top left y, 0,
n-s cell size (-ve))
"""
# Do basic dataset loading - set up georeferencing
self._load_ds(ds_filename,spatial_ref=spatial_ref,
geo_transform=geo_transform)
# Import band datatype
band_tmp = self.ds.GetRasterBand(band)
self.dtype = gdal.GetDataTypeName(band_tmp.DataType)
# Load entire image
if load_data == True:
self.r = self.read_single_band(band,downsampl=downsampl)
# Or load just a subset region
elif isinstance(load_data,tuple) or isinstance(load_data,list):
if len(load_data) == 4:
self.r = self.read_single_band_subset(load_data,latlon=latlon,
band=band,update_info=True,downsampl=downsampl)
elif load_data == False:
return
else:
print('Warning : load_data argument not understood. No data loaded.')