Golang github.com-absolute8511-nsq-internal-test.Equal类(方法)实例源码

下面列出了Golang github.com-absolute8511-nsq-internal-test.Equal 类(方法)源码代码实例,从而了解它的用法。

作者:absolute851    项目:ns   
func TestHTTPmpubEmpty(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	_, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_http_mpub_empty" + strconv.Itoa(int(time.Now().Unix()))
	nsqd.GetTopicIgnPart(topicName)

	msg := []byte("test message")
	msgs := make([][]byte, 4)
	for i := range msgs {
		msgs[i] = msg
	}
	buf := bytes.NewBuffer(bytes.Join(msgs, []byte("\n")))
	_, err := buf.Write([]byte("\n"))
	test.Equal(t, err, nil)

	url := fmt.Sprintf("http://%s/mpub?topic=%s", httpAddr, topicName)
	resp, err := http.Post(url, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, string(body), "OK")

	time.Sleep(5 * time.Millisecond)

}

作者:absolute851    项目:ns   
func TestTLSRequireVerifyExceptHTTP(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	opts.LogLevel = 3
	opts.TLSCert = "./test/certs/server.pem"
	opts.TLSKey = "./test/certs/server.key"
	opts.TLSRootCAFile = "./test/certs/ca.pem"
	opts.TLSClientAuthPolicy = "require-verify"
	opts.TLSRequired = TLSRequiredExceptHTTP
	_, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_http_req_verf_except_http" + strconv.Itoa(int(time.Now().Unix()))
	nsqd.GetTopicIgnPart(topicName)

	// no cert
	buf := bytes.NewBuffer([]byte("test message"))
	url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
	resp, err := http.Post(url, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, string(body), "OK")

	time.Sleep(5 * time.Millisecond)

}

作者:absolute851    项目:ns   
func TestStats(t *testing.T) {
	opts := nsqdNs.NewOptions()
	opts.Logger = newTestLogger(t)
	tcpAddr, _, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_stats" + strconv.Itoa(int(time.Now().Unix()))
	topic := nsqd.GetTopicIgnPart(topicName)
	msg := nsqdNs.NewMessage(0, []byte("test body"))
	topic.PutMessage(msg)

	conn, err := mustConnectNSQD(tcpAddr)
	test.Equal(t, err, nil)
	defer conn.Close()

	identify(t, conn, nil, frameTypeResponse)
	sub(t, conn, topicName, "ch")

	stats := nsqd.GetStats(false)
	t.Logf("stats: %+v", stats)

	test.Equal(t, len(stats), 1)
	test.Equal(t, len(stats[0].Channels), 1)
	test.Equal(t, len(stats[0].Channels[0].Clients), 1)
}

作者:absolute851    项目:ns   
func TestHTTPmpubBinary(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	_, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
	nsqd.GetTopicIgnPart(topicName)

	mpub := make([][]byte, 5)
	for i := range mpub {
		mpub[i] = make([]byte, 100)
	}
	cmd, _ := nsq.MultiPublish(topicName, mpub)
	buf := bytes.NewBuffer(cmd.Body)

	url := fmt.Sprintf("http://%s/mpub?topic=%s&binary=true", httpAddr, topicName)
	resp, err := http.Post(url, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, string(body), "OK")

	time.Sleep(5 * time.Millisecond)

}

作者:absolute851    项目:ns   
func TestNsqdRPCClient(t *testing.T) {
	SetCoordLogger(newTestLogger(t), 2)
	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(tmpDir)

	nsqdCoord := startNsqdCoord(t, "0", tmpDir, "", nil, true)
	nsqdCoord.Start()
	defer nsqdCoord.Stop()
	time.Sleep(time.Second * 2)
	client, err := NewNsqdRpcClient(nsqdCoord.rpcServer.rpcServer.Listener.ListenAddr().String(), time.Second)
	test.Nil(t, err)
	_, err = client.CallWithRetry("TestRpcCallNotExist", "req")
	test.NotNil(t, err)
	coordErr := client.CallRpcTestCoordErr("coorderr")
	test.NotNil(t, coordErr)
	test.NotEqual(t, coordErr.ErrType, CoordNetErr)
	test.Equal(t, coordErr.ErrMsg, "coorderr")
	test.Equal(t, coordErr.ErrCode, RpcCommonErr)
	test.Equal(t, coordErr.ErrType, CoordCommonErr)

	rsp, rpcErr := client.CallRpcTest("reqdata")
	test.NotNil(t, rpcErr)
	test.Equal(t, rsp, "reqdata")
	test.Equal(t, rpcErr.ErrCode, RpcNoErr)
	test.Equal(t, rpcErr.ErrMsg, "reqdata")
	test.Equal(t, rpcErr.ErrType, CoordCommonErr)
	timeoutErr := client.CallRpcTesttimeout("reqdata")
	test.NotNil(t, timeoutErr)
	test.Equal(t, timeoutErr.(*gorpc.ClientError).Timeout, true)
	time.Sleep(time.Second * 3)
	client.Close()
}

作者:absolute851    项目:ns   
func TestHTTPgetStatusText(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	_, httpAddr, _, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	url := fmt.Sprintf("http://%s/stats?format=text", httpAddr)
	resp, err := http.Get(url)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, resp.StatusCode, 200)
	test.NotNil(t, body)
}

作者:absolute851    项目:ns   
func TestHTTPpub(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.LogLevel = 2
	opts.Logger = newTestLogger(t)
	//opts.Logger = &levellogger.GLogger{}
	tcpAddr, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_http_pub" + strconv.Itoa(int(time.Now().Unix()))
	_ = nsqd.GetTopicIgnPart(topicName)
	conn, err := mustConnectNSQD(tcpAddr)
	test.Equal(t, err, nil)
	identify(t, conn, nil, frameTypeResponse)
	sub(t, conn, topicName, "ch")

	buf := bytes.NewBuffer([]byte("test message"))
	url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
	resp, err := http.Post(url, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, string(body), "OK")

	time.Sleep(5 * time.Millisecond)

	_, err = nsq.Ready(1).WriteTo(conn)
	test.Equal(t, err, nil)
	// sleep to allow the RDY state to take effect
	time.Sleep(50 * time.Millisecond)

	for {
		resp, _ := nsq.ReadResponse(conn)
		frameType, data, err := nsq.UnpackResponse(resp)
		test.Nil(t, err)
		test.NotEqual(t, frameTypeError, frameType)
		if frameType == frameTypeResponse {
			t.Logf("got response data: %v", string(data))
			continue
		}
		msgOut, err := nsq.DecodeMessage(data)
		test.Equal(t, []byte("test message"), msgOut.Body)
		_, err = nsq.Finish(msgOut.ID).WriteTo(conn)
		test.Nil(t, err)
		break
	}
	conn.Close()
}

作者:absolute851    项目:ns   
func TestHTTPSRequire(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	//opts.LogLevel = 2
	//opts.Logger = &levellogger.GLogger{}
	opts.LogLevel = 3
	opts.TLSCert = "./test/certs/server.pem"
	opts.TLSKey = "./test/certs/server.key"
	opts.TLSClientAuthPolicy = "require"
	_, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_http_pub_req" + strconv.Itoa(int(time.Now().Unix()))
	nsqd.GetTopicIgnPart(topicName)

	buf := bytes.NewBuffer([]byte("test message"))
	url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
	resp, err := http.Post(url, "application/octet-stream", buf)
	test.Equal(t, resp.StatusCode, 403)

	httpsAddr := nsqdServer.httpsListener.Addr().(*net.TCPAddr)
	cert, err := tls.LoadX509KeyPair("./test/certs/cert.pem", "./test/certs/key.pem")
	test.Equal(t, err, nil)
	tlsConfig := &tls.Config{
		Certificates:       []tls.Certificate{cert},
		InsecureSkipVerify: true,
		MinVersion:         0,
	}
	transport := &http.Transport{
		TLSClientConfig: tlsConfig,
	}
	client := &http.Client{Transport: transport}

	buf = bytes.NewBuffer([]byte("test message"))
	url = fmt.Sprintf("https://%s/pub?topic=%s", httpsAddr, topicName)
	resp, err = client.Post(url, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, string(body), "OK")

	time.Sleep(5 * time.Millisecond)

}

作者:absolute851    项目:ns   
func TestReconfigure(t *testing.T) {
	lopts := nsqlookupd.NewOptions()
	lopts.Logger = newTestLogger(t)
	nsqlookupd.SetLogger(lopts)
	_, _, lookupd1 := mustStartNSQLookupd(lopts)
	defer lookupd1.Exit()
	_, _, lookupd2 := mustStartNSQLookupd(lopts)
	defer lookupd2.Exit()
	_, _, lookupd3 := mustStartNSQLookupd(lopts)
	defer lookupd3.Exit()

	opts := nsqdNs.NewOptions()
	opts.Logger = newTestLogger(t)
	_, _, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	time.Sleep(50 * time.Millisecond)

	newOpts := *opts
	newOpts.NSQLookupdTCPAddresses = []string{lookupd1.RealTCPAddr().String()}
	nsqd.SwapOpts(&newOpts)
	nsqd.TriggerOptsNotification()
	test.Equal(t, len(nsqd.GetOpts().NSQLookupdTCPAddresses), 1)

	time.Sleep(50 * time.Millisecond)

	numLookupPeers := len(nsqdServer.lookupPeers.Load().([]*clusterinfo.LookupPeer))
	test.Equal(t, numLookupPeers, 1)

	newOpts = *opts
	newOpts.NSQLookupdTCPAddresses = []string{lookupd2.RealTCPAddr().String(), lookupd3.RealTCPAddr().String()}
	nsqd.SwapOpts(&newOpts)
	nsqd.TriggerOptsNotification()
	test.Equal(t, len(nsqd.GetOpts().NSQLookupdTCPAddresses), 2)

	time.Sleep(50 * time.Millisecond)

	var lookupPeers []string
	for _, lp := range nsqdServer.lookupPeers.Load().([]*clusterinfo.LookupPeer) {
		lookupPeers = append(lookupPeers, lp.String())
	}
	test.Equal(t, len(lookupPeers), 2)
	test.Equal(t, lookupPeers, newOpts.NSQLookupdTCPAddresses)
}

作者:absolute851    项目:ns   
func TestHTTPconfig(t *testing.T) {
	lopts := nsqlookupd.NewOptions()
	lopts.Logger = newTestLogger(t)
	nsqlookupd.SetLogger(lopts)
	_, _, lookupd1 := mustStartNSQLookupd(lopts)
	defer lookupd1.Exit()
	_, _, lookupd2 := mustStartNSQLookupd(lopts)
	defer lookupd2.Exit()

	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	_, httpAddr, _, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	url := fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr)
	resp, err := http.Get(url)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, resp.StatusCode, 200)
	test.Equal(t, string(body), "[]")

	client := http.Client{}
	addrs := fmt.Sprintf(`["%s","%s"]`, lookupd1.RealTCPAddr().String(), lookupd2.RealTCPAddr().String())
	url = fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr)
	req, err := http.NewRequest("PUT", url, bytes.NewBuffer([]byte(addrs)))
	test.Equal(t, err, nil)
	resp, err = client.Do(req)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ = ioutil.ReadAll(resp.Body)
	test.Equal(t, resp.StatusCode, 200)
	test.Equal(t, string(body), addrs)
}

作者:absolute851    项目:ns   
func TestGetTopic(t *testing.T) {
	opts := NewOptions()
	opts.Logger = newTestLogger(t)
	_, _, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()

	topic1 := nsqd.GetTopic("test", 0)
	test.NotNil(t, topic1)
	test.Equal(t, "test", topic1.GetTopicName())

	topic2 := nsqd.GetTopic("test", 0)
	test.Equal(t, topic1, topic2)

	topic3 := nsqd.GetTopic("test2", 1)
	test.Equal(t, "test2", topic3.GetTopicName())
	test.NotEqual(t, topic2, topic3)

	topic1_1 := nsqd.GetTopicIgnPart("test")
	test.Equal(t, "test", topic1_1.GetTopicName())
	test.Equal(t, 0, topic1_1.GetTopicPart())
	topic3_1 := nsqd.GetTopicIgnPart("test2")
	test.Equal(t, "test2", topic3_1.GetTopicName())
	test.Equal(t, 1, topic3_1.GetTopicPart())

}

作者:absolute851    项目:ns   
func TestGetChannel(t *testing.T) {
	opts := NewOptions()
	opts.Logger = newTestLogger(t)
	_, _, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()

	topic := nsqd.GetTopic("test", 0)

	channel1 := topic.GetChannel("ch1")
	test.NotNil(t, channel1)
	test.Equal(t, "ch1", channel1.name)

	channel2 := topic.GetChannel("ch2")

	test.Equal(t, channel1, topic.channelMap["ch1"])
	test.Equal(t, channel2, topic.channelMap["ch2"])
}

作者:absolute851    项目:ns   
func TestHTTPgetStatusJSON(t *testing.T) {
	testTime := time.Now()
	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	_, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	testTime = nsqd.GetStartTime()
	expectedJSON := fmt.Sprintf(`{"status_code":200,"status_txt":"OK","data":{"version":"%v","health":"OK","start_time":%v,"topics":[]}}`, version.Binary, testTime.Unix())

	url := fmt.Sprintf("http://%s/stats?format=json", httpAddr)
	resp, err := http.Get(url)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, resp.StatusCode, 200)
	test.Equal(t, string(body), expectedJSON)
}

作者:absolute851    项目:ns   
func TestTopicBackendMaxMsgSize(t *testing.T) {
	opts := NewOptions()
	opts.Logger = newTestLogger(t)
	_, _, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()

	topicName := "test_topic_backend_maxmsgsize" + strconv.Itoa(int(time.Now().Unix()))
	topic := nsqd.GetTopic(topicName, 0)

	test.Equal(t, topic.backend.maxMsgSize, int32(opts.MaxMsgSize+minValidMsgLength))
}

作者:absolute851    项目:ns   
func TestDeletes(t *testing.T) {
	opts := NewOptions()
	opts.Logger = newTestLogger(t)
	_, _, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()

	topic := nsqd.GetTopicIgnPart("test")
	oldMagicFile := path.Join(topic.dataPath, "magic"+strconv.Itoa(topic.partition))

	channel1 := topic.GetChannel("ch1")
	test.NotNil(t, channel1)

	err := topic.SetMagicCode(time.Now().UnixNano())
	_, err = os.Stat(oldMagicFile)
	test.Equal(t, nil, err)
	err = topic.DeleteExistingChannel("ch1")
	test.Equal(t, nil, err)
	test.Equal(t, 0, len(topic.channelMap))

	channel2 := topic.GetChannel("ch2")
	test.NotNil(t, channel2)

	err = nsqd.DeleteExistingTopic("test", topic.GetTopicPart())
	test.Equal(t, nil, err)
	test.Equal(t, 0, len(topic.channelMap))
	test.Equal(t, 0, len(nsqd.topicMap))
	_, err = os.Stat(oldMagicFile)
	test.NotNil(t, err)
}

作者:absolute851    项目:ns   
func TestHTTPpubEmpty(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.Logger = newTestLogger(t)
	_, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_http_pub_empty" + strconv.Itoa(int(time.Now().Unix()))
	nsqd.GetTopicIgnPart(topicName)

	buf := bytes.NewBuffer([]byte(""))
	url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
	resp, err := http.Post(url, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	test.Equal(t, resp.StatusCode, 500)
	test.Equal(t, string(body), `{"status_code":500,"status_txt":"MSG_EMPTY","data":null}`)

	time.Sleep(5 * time.Millisecond)

}

作者:absolute851    项目:ns   
func TestChannelEmptyConsumer(t *testing.T) {
	opts := nsqdNs.NewOptions()
	opts.Logger = newTestLogger(t)
	tcpAddr, _, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	conn, _ := mustConnectNSQD(tcpAddr)
	defer conn.Close()

	topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
	topic := nsqd.GetTopicIgnPart(topicName)
	channel := topic.GetChannel("channel")
	client := nsqdNs.NewClientV2(0, conn, opts, nil)
	client.SetReadyCount(25)
	channel.AddClient(client.ID, client)

	for i := 0; i < 25; i++ {
		msg := nsqdNs.NewMessage(nsqdNs.MessageID(i), []byte("test"))
		channel.StartInFlightTimeout(msg, 0, "", opts.MsgTimeout)
		client.SendingMessage()
	}

	for _, cl := range channel.GetClients() {
		stats := cl.Stats()
		test.Equal(t, stats.InFlightCount, int64(25))
	}

	channel.SetConsumeOffset(channel.GetChannelEnd().Offset(), channel.GetChannelEnd().TotalMsgCnt(), true)
	time.Sleep(time.Second)

	for _, cl := range channel.GetClients() {
		stats := cl.Stats()
		test.Equal(t, stats.InFlightCount, int64(0))
	}
}

作者:absolute851    项目:ns   
func TestHTTPpubtrace(t *testing.T) {
	opts := nsqd.NewOptions()
	opts.LogLevel = 2
	opts.Logger = newTestLogger(t)
	//opts.Logger = &levellogger.GLogger{}
	_, httpAddr, nsqd, nsqdServer := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqdServer.Exit()

	topicName := "test_http_pub_trace" + strconv.Itoa(int(time.Now().Unix()))
	_ = nsqd.GetTopicIgnPart(topicName)

	buf := bytes.NewBuffer([]byte("test message"))
	rawurl := fmt.Sprintf("http://%s/pubtrace?topic=%s", httpAddr, topicName)
	resp, err := http.Post(rawurl, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	test.Equal(t, resp.StatusCode, 400)
	test.Equal(t, string(body), `{"message":"INVALID_TRACE_ID"}`)

	time.Sleep(time.Second)
	// the buffer will be drained by the http post
	// so we need refill the buffer.
	buf = bytes.NewBuffer([]byte("test message 2"))
	rawurl = fmt.Sprintf("http://%s/pubtrace?topic=%s&partition=0&trace_id=11", httpAddr, topicName)
	resp, err = http.Post(rawurl, "application/octet-stream", buf)
	test.Equal(t, err, nil)
	body, _ = ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	test.Equal(t, resp.StatusCode, 200)

	type tmpResp struct {
		Status      string `json:"status"`
		ID          uint64 `json:"id"`
		TraceID     string `json:"trace_id"`
		QueueOffset uint64 `json:"queue_offset"`
		DataRawSize uint32 `json:"rawsize"`
	}
	var ret tmpResp
	json.Unmarshal(body, &ret)
	test.Equal(t, ret.Status, "OK")
	test.Equal(t, ret.TraceID, "11")
	time.Sleep(5 * time.Millisecond)

}

作者:absolute851    项目:ns   
func TestDiskQueueSnapshotReader(t *testing.T) {
	l := newTestLogger(t)
	nsqLog.Logger = l
	dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix()))
	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
	test.Nil(t, err)
	defer os.RemoveAll(tmpDir)
	queue, _ := newDiskQueueWriter(dqName, tmpDir, 1024, 4, 1<<10, 1)
	dqWriter := queue.(*diskQueueWriter)
	defer dqWriter.Close()
	test.NotNil(t, dqWriter)

	msg := []byte("test")
	msgNum := 2000
	var midEnd BackendQueueEnd
	var midEnd2 BackendQueueEnd
	for i := 0; i < msgNum; i++ {
		dqWriter.Put(msg)
		if i == msgNum/2 {
			midEnd = dqWriter.GetQueueWriteEnd()
		}
		if i == msgNum/4 {
			midEnd2 = dqWriter.GetQueueWriteEnd()
		}
	}
	dqWriter.Flush()
	end := dqWriter.GetQueueWriteEnd()
	test.Nil(t, err)

	dqReader := NewDiskQueueSnapshot(dqName, tmpDir, end)
	defer dqReader.Close()

	queueStart := dqReader.queueStart
	test.Equal(t, BackendOffset(0), queueStart.Offset())
	dqReader.SetQueueStart(midEnd2)
	test.Equal(t, midEnd2.Offset(), dqReader.readPos.Offset())
	result := dqReader.ReadOne()
	test.Nil(t, result.Err)
	test.Equal(t, midEnd2.Offset(), result.Offset)
	err = dqReader.SeekTo(midEnd.Offset())
	test.Nil(t, err)
	test.Equal(t, midEnd.Offset(), dqReader.readPos.virtualEnd)
	result = dqReader.ReadOne()
	test.Nil(t, result.Err)
	test.Equal(t, midEnd.Offset(), result.Offset)
	data, err := dqReader.ReadRaw(100)
	test.Nil(t, err)
	test.Equal(t, 100, len(data))
	// remove some begin of queue, and test queue start
}

作者:absolute851    项目:ns   
func TestDeleteLast(t *testing.T) {
	opts := NewOptions()
	opts.Logger = newTestLogger(t)
	_, _, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()

	topic := nsqd.GetTopic("test", 0)

	channel1 := topic.GetChannel("ch1")
	test.NotNil(t, channel1)

	err := topic.DeleteExistingChannel("ch1")
	test.Nil(t, err)
	test.Equal(t, 0, len(topic.channelMap))

	msg := NewMessage(0, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
	_, _, _, _, err = topic.PutMessage(msg)
	time.Sleep(100 * time.Millisecond)
	test.Nil(t, err)
}


问题


面经


文章

微信
公众号

扫码关注公众号