作者:Terry-Ma
项目:bf
// SetStore set the data into fpath.
func (z *Zookeeper) SetStore(s *meta.Store) (err error) {
var (
data []byte
stat *myzk.Stat
os = new(meta.Store)
)
s.Id = z.conf.Zookeeper.ServerId
s.Rack = z.conf.Zookeeper.Rack
s.Status = meta.StoreStatusInit
if data, stat, err = z.c.Get(z.fpath); err != nil {
log.Errorf("zk.Get(\"%s\") error(%v)", z.fpath, err)
return
}
if len(data) > 0 {
if err = json.Unmarshal(data, os); err != nil {
log.Errorf("json.Unmarshal() error(%v)", err)
return
}
log.Infof("\nold store meta: %s, \ncurrent store meta: %s", os, s)
s.Status = os.Status
}
// meta.Status not modifify, may update by pitchfork
if data, err = json.Marshal(s); err != nil {
log.Errorf("json.Marshal() error(%v)", err)
return
}
if _, err = z.c.Set(z.fpath, data, stat.Version); err != nil {
log.Errorf("zk.Set(\"%s\") error(%v)", z.fpath, err)
}
return
}
作者:yongleho
项目:bf
// DelStores get delable stores for http del
func (d *Directory) DelStores(bucket, filename string) (n *meta.Needle, stores []string, err error) {
var (
ok bool
store string
svrs []string
storeMeta *meta.Store
)
if n, err = d.hBase.Get(bucket, filename); err != nil {
log.Errorf("hBase.Get error(%v)", err)
if err != errors.ErrNeedleNotExist {
err = errors.ErrHBase
}
return
}
if n == nil {
err = errors.ErrNeedleNotExist
return
}
if svrs, ok = d.volumeStore[n.Vid]; !ok {
err = errors.ErrZookeeperDataError
return
}
stores = make([]string, 0, len(svrs))
for _, store = range svrs {
if storeMeta, ok = d.store[store]; !ok {
err = errors.ErrZookeeperDataError
return
}
if !storeMeta.CanWrite() {
err = errors.ErrStoreNotAvailable
return
}
stores = append(stores, storeMeta.Api)
}
if err = d.hBase.Del(bucket, filename); err != nil {
log.Errorf("hBase.Del error(%v)", err)
err = errors.ErrHBase
}
return
}
作者:yongleho
项目:bf
// GetStores get readable stores for http get
func (d *Directory) GetStores(bucket, filename string) (n *meta.Needle, stores []string, err error) {
var (
store string
svrs []string
storeMeta *meta.Store
ok bool
)
if n, err = d.hBase.Get(bucket, filename); err != nil {
log.Errorf("hBase.Get error(%v)", err)
if err != errors.ErrNeedleNotExist {
err = errors.ErrHBase
}
return
}
if n == nil {
err = errors.ErrNeedleNotExist
return
}
if svrs, ok = d.volumeStore[n.Vid]; !ok {
err = errors.ErrZookeeperDataError
return
}
stores = make([]string, 0, len(svrs))
for _, store = range svrs {
if storeMeta, ok = d.store[store]; !ok {
log.Errorf("store cannot match store:", store)
continue
}
if !storeMeta.CanRead() {
continue
}
stores = append(stores, storeMeta.Api)
}
if len(stores) == 0 {
err = errors.ErrStoreNotAvailable
}
return
}
作者:Terry-Ma
项目:bf
// Update when zk updates
func (d *Dispatcher) Update(group map[int][]string,
store map[string]*meta.Store, volume map[int32]*meta.VolumeState,
storeVolume map[string][]int32) (err error) {
var (
gid int
i int
vid int32
gids []int
sid string
stores []string
restSpace, minScore, score int
totalAdd, totalAddDelay uint64
write, ok bool
storeMeta *meta.Store
volumeState *meta.VolumeState
)
gids = []int{}
for gid, stores = range group {
write = true
// check all stores can writeable by the group.
for _, sid = range stores {
if storeMeta, ok = store[sid]; !ok {
log.Errorf("idStore cannot match store: %s", sid)
break
}
if storeMeta == nil {
log.Warningf("storeMeta is null, %s", sid)
return
}
if !storeMeta.CanWrite() {
write = false
break
}
}
if !write {
continue
}
// calc score
for _, sid = range stores {
totalAdd, totalAddDelay, restSpace, minScore = 0, 0, 0, 0
// get all volumes by the store.
for _, vid = range storeVolume[sid] {
volumeState = volume[vid]
if volumeState == nil {
log.Warningf("volumeState is nil, %d", vid)
return
}
totalAdd = totalAdd + volumeState.TotalWriteProcessed
restSpace = restSpace + int(volumeState.FreeSpace)
totalAddDelay = totalAddDelay + volumeState.TotalWriteDelay
}
score = d.calScore(int(totalAdd), int(totalAddDelay), restSpace)
if score < minScore || minScore == 0 {
minScore = score
}
}
for i = 0; i < minScore; i++ {
gids = append(gids, gid)
}
}
d.gids = gids
return
}