作者:DemonVe
项目:elliptics-g
func (s *Session) CopyIteratorStart(id *DnetRawID, ranges []DnetIteratorRange,
groups []uint32, iflags uint64, timeFrame ...time.Time) <-chan IteratorResult {
ekey, onResultContext, onFinishContext, responseCh, err := iteratorHelper(id)
if err != nil {
return responseCh
}
defer ekey.Free()
var ctime_begin, ctime_end C.struct_dnet_time
if err := adjustTimeFrame(&ctime_begin, &ctime_end, timeFrame...); err != nil {
context, pool_err := Pool.Get(onFinishContext)
if pool_err != nil {
panic("Unable to find session number")
}
context.(func(error))(err)
return responseCh
}
if len(timeFrame) != 0 {
iflags |= DNET_IFLAGS_TS_RANGE
}
iflags |= DNET_IFLAGS_KEY_RANGE
cranges := convertRanges(ranges)
C.session_start_copy_iterator(s.session, C.context_t(onResultContext), C.context_t(onFinishContext),
(*C.struct_go_iterator_range)(&cranges[0]), C.size_t(len(cranges)),
(*C.uint32_t)(&groups[0]), (C.size_t)(len(groups)),
ekey.key,
C.uint64_t(iflags),
ctime_begin,
ctime_end)
return responseCh
}
作者:DemonVe
项目:elliptics-g
func (s *Session) setOrUpdateIndexes(operation int, key string, indexes map[string]string) <-chan Indexer {
ekey, err := NewKey(key)
if err != nil {
panic(err)
}
defer ekey.Free()
responseCh := make(chan Indexer, defaultVOLUME)
var cindexes []*C.char
var cdatas []C.struct_go_data_pointer
for index, data := range indexes {
cindex := C.CString(index) // free this
defer C.free(unsafe.Pointer(cindex))
cindexes = append(cindexes, cindex)
cdata := C.new_data_pointer(
C.CString(data), // freed by ellipics::data_pointer in std::vector ???
C.int(len(data)),
)
cdatas = append(cdatas, cdata)
}
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func() {
//It's never called. For the future.
}
onFinish := func(err error) {
if err != nil {
responseCh <- &indexResult{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
// TODO: Reimplement this with pointer on functions
switch operation {
case indexesSet:
C.session_set_indexes(s.session, C.context_t(onResultContext), C.context_t(onFinishContext),
ekey.key,
(**C.char)(&cindexes[0]),
(*C.struct_go_data_pointer)(&cdatas[0]),
C.uint64_t(len(cindexes)))
case indexesUpdate:
C.session_update_indexes(s.session, C.context_t(onResultContext), C.context_t(onFinishContext),
ekey.key,
(**C.char)(&cindexes[0]),
(*C.struct_go_data_pointer)(&cdatas[0]),
C.uint64_t(len(cindexes)))
}
return responseCh
}
作者:DemonVe
项目:elliptics-g
// ParallelLookup returns all information about given Key,
// it sends multiple lookup requests in parallel to all session groups
// and returns information about all specified group where given key has been found.
func (s *Session) ParallelLookup(kstr string) <-chan Lookuper {
responseCh := make(chan Lookuper, defaultVOLUME)
onResultContext := NextContext()
onFinishContext := NextContext()
key, err := NewKey(kstr)
if err != nil {
responseCh <- &lookupResult{err: err}
close(responseCh)
return responseCh
}
defer key.Free()
onResult := func(lookup *lookupResult) {
responseCh <- lookup
}
onFinish := func(err error) {
if err != nil {
responseCh <- &lookupResult{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
/* To keep callbacks alive */
C.session_parallel_lookup(s.session, C.context_t(onResultContext), C.context_t(onFinishContext), key.key)
return responseCh
}
作者:DemonVe
项目:elliptics-g
func (s *Session) ServerSend(keys *DnetRawIDKeys, flags uint64, groups []uint32) (<-chan IteratorResult, error) {
if len(groups) == 0 {
return nil, fmt.Errorf("server-send: invalid empty group set, must contain at least one group")
}
responseCh := make(chan IteratorResult, defaultVOLUME)
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(iterres *iteratorResult) {
responseCh <- iterres
}
onFinish := func(err error) {
if err != nil {
responseCh <- &iteratorResult{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_server_send(s.session, C.context_t(onResultContext), C.context_t(onFinishContext),
keys.keys,
C.uint64_t(flags),
(*C.uint32_t)(&groups[0]), (C.size_t)(len(groups)))
return responseCh, nil
}
作者:DemonVe
项目:elliptics-g
//ReadKey performs a read operation by key.
func (s *Session) ReadKey(key *Key, offset, size uint64) <-chan ReadResult {
responseCh := make(chan ReadResult, defaultVOLUME)
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(result *readResult) {
responseCh <- result
}
onFinish := func(err error) {
if err != nil {
responseCh <- &readResult{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_read_data(s.session,
C.context_t(onResultContext), C.context_t(onFinishContext),
key.key, C.uint64_t(offset), C.uint64_t(size))
return responseCh
}
作者:DemonVe
项目:elliptics-g
//ListIndexes gets list of all indxes, which are associated with key.
func (s *Session) ListIndexes(key string) <-chan IndexEntry {
responseCh := make(chan IndexEntry, defaultVOLUME)
ekey, err := NewKey(key)
if err != nil {
panic(err)
}
defer ekey.Free()
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(indexentry *IndexEntry) {
responseCh <- *indexentry
}
onFinish := func(err error) {
if err != nil {
responseCh <- IndexEntry{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_list_indexes(s.session, C.context_t(onResultContext), C.context_t(onFinishContext), ekey.key)
return responseCh
}
作者:DemonVe
项目:elliptics-g
//BulkRemove removes keys from array. It returns error for every key it could not delete.
func (s *Session) BulkRemove(keys_str []string) <-chan Remover {
responseCh := make(chan Remover, defaultVOLUME)
keys, err := NewKeys(keys_str)
if err != nil {
responseCh <- &removeResult{
key: "new keys allocation failed",
err: err,
}
close(responseCh)
return responseCh
}
defer keys.Free()
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(r *removeResult) {
if r.err != nil {
responseCh <- r
} else if r.cmd.Status != 0 {
key, err := keys.Find(r.Cmd().ID.ID)
if err != nil {
responseCh <- &removeResult{
key: "could not find key for replied ID",
err: err,
}
return
}
r.err = fmt.Errorf("remove status: %d", r.cmd.Status)
r.key = key
responseCh <- r
}
}
onFinish := func(err error) {
if err != nil {
responseCh <- &removeResult{
key: "overall operation result",
err: err,
}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_bulk_remove(s.session, C.context_t(onResultContext), C.context_t(onFinishContext), keys.keys)
return responseCh
}
作者:DemonVe
项目:elliptics-g
//FindAnyIndexes returns IndexEntries for keys, which were indexed with any of indexes.
func (s *Session) FindAnyIndexes(indexes []string) <-chan Finder {
responseCh := make(chan Finder, defaultVOLUME)
onResult, onFinish, cindexes := s.findIndexes(indexes, responseCh)
C.session_find_any_indexes(s.session, C.context_t(onResult), C.context_t(onFinish),
(**C.char)(&cindexes[0]), C.uint64_t(len(indexes)))
//Free cindexes
for _, item := range cindexes {
C.free(unsafe.Pointer(item))
}
return responseCh
}
作者:DemonVe
项目:elliptics-g
func (s *Session) IteratorCancel(id *DnetRawID, iteratorId uint64) <-chan IteratorResult {
ekey, onResultContext, onFinishContext, responseCh, err := iteratorHelper(id)
if err != nil {
return responseCh
}
defer ekey.Free()
C.session_cancel_iterator(s.session, C.context_t(onResultContext), C.context_t(onFinishContext),
ekey.key,
C.uint64_t(iteratorId))
return responseCh
}
作者:noxiou
项目:elliptics-g
func (s *Session) IteratorPause(id *DnetRawID, iteratorId uint64) *DChannel {
ekey, onResultContext, onFinishContext, responseCh, err := iteratorHelper(id)
if err != nil {
return responseCh
}
defer ekey.Free()
C.session_pause_iterator(s.session, C.context_t(onResultContext), C.context_t(onFinishContext),
ekey.key,
C.uint64_t(iteratorId))
return responseCh
}
作者:DemonVe
项目:elliptics-g
//WriteKey writes blob by Key.
func (s *Session) WriteKey(key *Key, input io.Reader, offset, total_size uint64) <-chan Lookuper {
responseCh := make(chan Lookuper, defaultVOLUME)
onWriteContext := NextContext()
onWriteFinishContext := NextContext()
chunk_context := NextContext()
onWriteResult := func(lookup *lookupResult) {
responseCh <- lookup
}
onWriteFinish := func(err error) {
if err != nil {
responseCh <- &lookupResult{err: err}
}
close(responseCh)
Pool.Delete(onWriteContext)
Pool.Delete(onWriteFinishContext)
Pool.Delete(chunk_context)
}
chunk, err := ioutil.ReadAll(input)
if err != nil {
responseCh <- &lookupResult{err: err}
close(responseCh)
return responseCh
}
if len(chunk) == 0 {
responseCh <- &lookupResult{
err: &DnetError{
Code: -22,
Flags: 0,
Message: "Invalid zero-length write request",
},
}
close(responseCh)
return responseCh
}
Pool.Store(onWriteContext, onWriteResult)
Pool.Store(onWriteFinishContext, onWriteFinish)
Pool.Store(chunk_context, chunk)
C.session_write_data(s.session,
C.context_t(onWriteContext), C.context_t(onWriteFinishContext),
key.key, C.uint64_t(offset), (*C.char)(unsafe.Pointer(&chunk[0])), C.uint64_t(len(chunk)))
return responseCh
}
作者:DemonVe
项目:elliptics-g
func (s *Session) DnetStat() *DnetStat {
response := make(chan *StatEntry, 10)
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(result *StatEntry) {
response <- result
}
onFinish := func(err error) {
if err != nil {
response <- &StatEntry{
err: err,
}
}
close(response)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
categories := StatCategoryBackend | StatCategoryProcFS | StatCategoryCommands
C.session_get_stats(s.session,
C.context_t(onResultContext), C.context_t(onFinishContext),
C.uint64_t(categories))
st := &DnetStat{
Group: make(map[uint32]*StatGroup),
}
s.GetRoutes(st)
// read stat results from the channel and update DnetStat
for se := range response {
st.AddStatEntry(se)
}
return st
}
作者:noxiou
项目:elliptics-g
func (s *Session) ServerSend(keys []DnetRawID, flags uint64, groups []uint32) (*DChannel, error) {
if len(groups) == 0 {
return nil, fmt.Errorf("server-send: invalid empty group set, must contain at least one group")
}
if len(keys) == 0 {
return nil, fmt.Errorf("server-send: invalid empty key set, must contain at least one key")
}
id_keys, err := NewDnetRawIDKeys(keys)
if err != nil {
return nil, fmt.Errorf("server-send: could not allocate vector of dnet_raw_id structures: %v", err)
}
defer id_keys.Free()
responseCh := NewDChannel()
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(iterres *iteratorResult) {
responseCh.In <- iterres
}
onFinish := func(err error) {
if err != nil {
responseCh.In <- &iteratorResult{err: err}
}
close(responseCh.In)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_server_send(s.session, C.context_t(onResultContext), C.context_t(onFinishContext),
id_keys.keys,
C.uint64_t(flags),
(*C.uint32_t)(&groups[0]), (C.size_t)(len(groups)))
return responseCh, nil
}
作者:DemonVe
项目:elliptics-g
func (s *Session) GetRoutes(stat *DnetStat) {
context := NextContext()
Pool.Store(context, stat)
C.session_get_routes(s.session, C.context_t(context))
stat.Finalize()
Pool.Delete(context)
return
}
作者:DemonVe
项目:elliptics-g
//RemoveIndexes removes indexes from a key.
func (s *Session) RemoveIndexes(key string, indexes []string) <-chan Indexer {
ekey, err := NewKey(key)
if err != nil {
panic(err)
}
defer ekey.Free()
responseCh := make(chan Indexer, defaultVOLUME)
var cindexes []*C.char
for _, index := range indexes {
cindex := C.CString(index) // free this
defer C.free(unsafe.Pointer(cindex))
cindexes = append(cindexes, cindex)
}
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func() {
//It's never called. For the future.
}
onFinish := func(err error) {
if err != nil {
responseCh <- &indexResult{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_remove_indexes(s.session,
C.context_t(onResultContext), C.context_t(onFinishContext),
ekey.key, (**C.char)(&cindexes[0]), C.uint64_t(len(cindexes)))
return responseCh
}
作者:DemonVe
项目:elliptics-g
//RemoveKey performs remove operation by key.
func (s *Session) RemoveKey(key *Key) <-chan Remover {
responseCh := make(chan Remover, defaultVOLUME)
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(r *removeResult) {
responseCh <- r
}
onFinish := func(err error) {
if err != nil {
responseCh <- &removeResult{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_remove(s.session, C.context_t(onResultContext), C.context_t(onFinishContext), key.key)
return responseCh
}
作者:DemonVe
项目:elliptics-g
// Lookup returns an information about given Key.
// It only returns the first group where key has been found.
func (s *Session) Lookup(key *Key) <-chan Lookuper {
responseCh := make(chan Lookuper, defaultVOLUME)
onResultContext := NextContext()
onFinishContext := NextContext()
onResult := func(lookup *lookupResult) {
responseCh <- lookup
}
onFinish := func(err error) {
if err != nil {
responseCh <- &lookupResult{err: err}
}
close(responseCh)
Pool.Delete(onResultContext)
Pool.Delete(onFinishContext)
}
Pool.Store(onResultContext, onResult)
Pool.Store(onFinishContext, onFinish)
C.session_lookup(s.session, C.context_t(onResultContext), C.context_t(onFinishContext), key.key)
return responseCh
}
作者:noxiou
项目:elliptics-g
func (s *Session) BackendMakeReadOnly(addr *DnetAddr, backend_id int32) *DChannel {
responseCh := NewDChannel()
context := NextContext()
onFinish := func(tmp *DnetBackendsStatus) {
responseCh.In <- tmp
close(responseCh.In)
Pool.Delete(context)
}
Pool.Store(context, onFinish)
var tmp *C.struct_dnet_addr = C.dnet_addr_alloc()
defer C.dnet_addr_free(tmp)
addr.CAddr(tmp)
C.session_backend_make_readonly(s.session, tmp, C.uint32_t(backend_id), C.context_t(context))
return responseCh
}
作者:DemonVe
项目:elliptics-g
func (s *Session) BackendMakeWritable(addr *DnetAddr, backend_id int32) <-chan *DnetBackendsStatus {
responseCh := make(chan *DnetBackendsStatus, defaultVOLUME)
context := NextContext()
onFinish := func(tmp *DnetBackendsStatus) {
responseCh <- tmp
close(responseCh)
Pool.Delete(context)
}
Pool.Store(context, onFinish)
var tmp *C.struct_dnet_addr = C.dnet_addr_alloc()
defer C.dnet_addr_free(tmp)
addr.CAddr(tmp)
C.session_backend_make_writable(s.session, tmp, C.uint32_t(backend_id), C.context_t(context))
return responseCh
}
作者:DemonVe
项目:elliptics-g
func (s *Session) WriteChunk(key string, input io.Reader, initial_offset, total_size uint64) <-chan Lookuper {
responseCh := make(chan Lookuper, defaultVOLUME)
onChunkContext := NextContext()
onFinishContext := NextContext()
chunk_context := NextContext()
chunk := make([]byte, max_chunk_size, max_chunk_size)
orig_total_size := total_size
offset := initial_offset
var n64 uint64
onChunkResult := func(lookup *lookupResult) {
if total_size == 0 {
responseCh <- lookup
}
}
var onChunkFinish func(err error)
onChunkFinish = func(err error) {
if err != nil {
responseCh <- &lookupResult{err: err}
close(responseCh)
Pool.Delete(onChunkContext)
Pool.Delete(onFinishContext)
Pool.Delete(chunk_context)
return
}
if total_size == 0 {
close(responseCh)
Pool.Delete(onChunkContext)
Pool.Delete(onFinishContext)
Pool.Delete(chunk_context)
return
}
n, err := input.Read(chunk)
if n <= 0 && err != nil {
responseCh <- &lookupResult{err: err}
close(responseCh)
Pool.Delete(onChunkContext)
Pool.Delete(onFinishContext)
Pool.Delete(chunk_context)
return
}
n64 = uint64(n)
total_size -= n64
offset += n64
ekey, err := NewKey(key)
if err != nil {
responseCh <- &lookupResult{err: err}
close(responseCh)
Pool.Delete(onChunkContext)
Pool.Delete(onFinishContext)
Pool.Delete(chunk_context)
return
}
defer ekey.Free()
if total_size != 0 {
C.session_write_plain(s.session,
C.context_t(onChunkContext), C.context_t(onFinishContext),
ekey.key, C.uint64_t(offset-n64),
(*C.char)(unsafe.Pointer(&chunk[0])), C.uint64_t(n))
} else {
C.session_write_commit(s.session,
C.context_t(onChunkContext), C.context_t(onFinishContext),
ekey.key, C.uint64_t(offset-n64), C.uint64_t(offset),
(*C.char)(unsafe.Pointer(&chunk[0])), C.uint64_t(n))
}
}
rest := total_size
if rest > max_chunk_size {
rest = max_chunk_size
}
n, err := input.Read(chunk)
if err != nil {
responseCh <- &lookupResult{err: err}
close(responseCh)
return responseCh
}
if n == 0 {
responseCh <- &lookupResult{
err: &DnetError{
Code: -22,
Flags: 0,
Message: fmt.Sprintf("Invalid zero-length write: current-offset: %d/%d, rest-size: %d/%d",
initial_offset, offset, total_size, orig_total_size),
},
}
}
n64 = uint64(n)
//.........这里部分代码省略.........